call GNUNET_SERVER_receive_done() also on internal error paths
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66
67 /**
68  * Closure for #append_port().
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * DLL
74    */
75   struct PrettyPrinterContext *next;
76
77   /**
78    * DLL
79    */
80   struct PrettyPrinterContext *prev;
81
82   /**
83    * Our plugin.
84    */
85   struct Plugin *plugin;
86
87   /**
88    * Resolver handle
89    */
90   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
91
92   /**
93    * Function to call with the result.
94    */
95   GNUNET_TRANSPORT_AddressStringCallback asc;
96
97   /**
98    * Clsoure for @e asc.
99    */
100   void *asc_cls;
101
102   /**
103    * Timeout task
104    */
105   struct GNUNET_SCHEDULER_Task *timeout_task;
106
107   /**
108    * Is this an IPv6 address?
109    */
110   int ipv6;
111
112   /**
113    * Options
114    */
115   uint32_t options;
116
117   /**
118    * Port to add after the IP address.
119    */
120   uint16_t port;
121
122 };
123
124
125 /**
126  * Session with another peer.
127  */
128 struct Session
129 {
130   /**
131    * Which peer is this session for?
132    */
133   struct GNUNET_PeerIdentity target;
134
135   /**
136    * Plugin this session belongs to.
137    */
138   struct Plugin *plugin;
139
140   /**
141    * Context for dealing with fragments.
142    */
143   struct UDP_FragmentationContext *frag_ctx;
144
145   /**
146    * Desired delay for next sending we send to other peer
147    */
148   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
149
150   /**
151    * Desired delay for next sending we received from other peer
152    */
153   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
154
155   /**
156    * Session timeout task
157    */
158   struct GNUNET_SCHEDULER_Task *timeout_task;
159
160   /**
161    * When does this session time out?
162    */
163   struct GNUNET_TIME_Absolute timeout;
164
165   /**
166    * expected delay for ACKs
167    */
168   struct GNUNET_TIME_Relative last_expected_ack_delay;
169
170   /**
171    * desired delay between UDP messages
172    */
173   struct GNUNET_TIME_Relative last_expected_msg_delay;
174
175   /**
176    * Our own address.
177    */
178   struct GNUNET_HELLO_Address *address;
179
180   /**
181    * Number of bytes waiting for transmission to this peer.
182    */
183   unsigned long long bytes_in_queue;
184
185   /**
186    * Number of messages waiting for transmission to this peer.
187    */
188   unsigned int msgs_in_queue;
189
190   /**
191    * Reference counter to indicate that this session is
192    * currently being used and must not be destroyed;
193    * setting @e in_destroy will destroy it as soon as
194    * possible.
195    */
196   unsigned int rc;
197
198   /**
199    * Network type of the address.
200    */
201   enum GNUNET_ATS_Network_Type scope;
202
203   /**
204    * Is this session about to be destroyed (sometimes we cannot
205    * destroy a session immediately as below us on the stack
206    * there might be code that still uses it; in this case,
207    * @e rc is non-zero).
208    */
209   int in_destroy;
210 };
211
212
213 /**
214  * Closure for #process_inbound_tokenized_messages().
215  */
216 struct SourceInformation
217 {
218   /**
219    * Sender identity.
220    */
221   struct GNUNET_PeerIdentity sender;
222
223   /**
224    * Associated session.
225    */
226   struct Session *session;
227
228 };
229
230 /**
231  * Closure for #find_receive_context().
232  */
233 struct FindReceiveContext
234 {
235   /**
236    * Where to store the result.
237    */
238   struct DefragContext *rc;
239
240   /**
241    * Session associated with this context.
242    */
243   struct Session *session;
244
245   /**
246    * Address to find.
247    */
248   const union UdpAddress *udp_addr;
249
250   /**
251    * Number of bytes in @e udp_addr.
252    */
253   size_t udp_addr_len;
254
255 };
256
257 /**
258  * Data structure to track defragmentation contexts based
259  * on the source of the UDP traffic.
260  */
261 struct DefragContext
262 {
263
264   /**
265    * Defragmentation context.
266    */
267   struct GNUNET_DEFRAGMENT_Context *defrag;
268
269   /**
270    * Reference to master plugin struct.
271    */
272   struct Plugin *plugin;
273
274   /**
275    * Node in the defrag heap.
276    */
277   struct GNUNET_CONTAINER_HeapNode *hnode;
278
279   /**
280    * Who's message(s) are we defragmenting here?
281    * Only initialized once we succeeded and
282    * @e have_sender is set.
283    */
284   struct GNUNET_PeerIdentity sender;
285
286   /**
287    * Source address this receive context is for (allocated at the
288    * end of the struct).
289    */
290   const union UdpAddress *udp_addr;
291
292   /**
293    * Length of @e udp_addr.
294    */
295   size_t udp_addr_len;
296
297   /**
298    * Network type the address belongs to.
299    */
300   enum GNUNET_ATS_Network_Type network_type;
301
302   /**
303    * Has the @e sender field been initialized yet?
304    */
305   int have_sender;
306 };
307
308
309 /**
310  * Context to send fragmented messages
311  */
312 struct UDP_FragmentationContext
313 {
314   /**
315    * Next in linked list
316    */
317   struct UDP_FragmentationContext *next;
318
319   /**
320    * Previous in linked list
321    */
322   struct UDP_FragmentationContext *prev;
323
324   /**
325    * The plugin
326    */
327   struct Plugin *plugin;
328
329   /**
330    * Handle for GNUNET_FRAGMENT context
331    */
332   struct GNUNET_FRAGMENT_Context *frag;
333
334   /**
335    * The session this fragmentation context belongs to
336    */
337   struct Session *session;
338
339   /**
340    * Function to call upon completion of the transmission.
341    */
342   GNUNET_TRANSPORT_TransmitContinuation cont;
343
344   /**
345    * Closure for @e cont.
346    */
347   void *cont_cls;
348
349   /**
350    * Message timeout
351    */
352   struct GNUNET_TIME_Absolute timeout;
353
354   /**
355    * Payload size of original unfragmented message
356    */
357   size_t payload_size;
358
359   /**
360    * Bytes used to send all fragments on wire including UDP overhead
361    */
362   size_t on_wire_size;
363
364   /**
365    * FIXME.
366    */
367   unsigned int fragments_used;
368
369 };
370
371
372 /**
373  * Message types included in a `struct UDP_MessageWrapper`
374  */
375 enum UDP_MessageType
376 {
377   /**
378    * Uninitialized (error)
379    */
380   UMT_UNDEFINED = 0,
381
382   /**
383    * Fragment of a message.
384    */
385   UMT_MSG_FRAGMENTED = 1,
386
387   /**
388    *
389    */
390   UMT_MSG_FRAGMENTED_COMPLETE = 2,
391
392   /**
393    * Unfragmented message.
394    */
395   UMT_MSG_UNFRAGMENTED = 3,
396
397   /**
398    * Receipt confirmation.
399    */
400   UMT_MSG_ACK = 4
401
402 };
403
404
405 /**
406  * Information we track for each message in the queue.
407  */
408 struct UDP_MessageWrapper
409 {
410   /**
411    * Session this message belongs to
412    */
413   struct Session *session;
414
415   /**
416    * DLL of messages
417    * previous element
418    */
419   struct UDP_MessageWrapper *prev;
420
421   /**
422    * DLL of messages
423    * previous element
424    */
425   struct UDP_MessageWrapper *next;
426
427   /**
428    * Message with size msg_size including UDP specific overhead
429    */
430   char *msg_buf;
431
432   /**
433    * Function to call upon completion of the transmission.
434    */
435   GNUNET_TRANSPORT_TransmitContinuation cont;
436
437   /**
438    * Closure for @e cont.
439    */
440   void *cont_cls;
441
442   /**
443    * Fragmentation context
444    * frag_ctx == NULL if transport <= MTU
445    * frag_ctx != NULL if transport > MTU
446    */
447   struct UDP_FragmentationContext *frag_ctx;
448
449   /**
450    * Message timeout
451    */
452   struct GNUNET_TIME_Absolute timeout;
453
454   /**
455    * Size of UDP message to send including UDP specific overhead
456    */
457   size_t msg_size;
458
459   /**
460    * Payload size of original message
461    */
462   size_t payload_size;
463
464   /**
465    * Message type
466    */
467   enum UDP_MessageType msg_type;
468
469 };
470
471
472 /**
473  * UDP ACK Message-Packet header.
474  */
475 struct UDP_ACK_Message
476 {
477   /**
478    * Message header.
479    */
480   struct GNUNET_MessageHeader header;
481
482   /**
483    * Desired delay for flow control
484    */
485   uint32_t delay;
486
487   /**
488    * What is the identity of the sender
489    */
490   struct GNUNET_PeerIdentity sender;
491
492 };
493
494
495 /**
496  * If a session monitor is attached, notify it about the new
497  * session state.
498  *
499  * @param plugin our plugin
500  * @param session session that changed state
501  * @param state new state of the session
502  */
503 static void
504 notify_session_monitor (struct Plugin *plugin,
505                         struct Session *session,
506                         enum GNUNET_TRANSPORT_SessionState state)
507 {
508   struct GNUNET_TRANSPORT_SessionInfo info;
509
510   if (NULL == plugin->sic)
511     return;
512   if (GNUNET_YES == session->in_destroy)
513     return; /* already destroyed, just RC>0 left-over actions */
514   memset (&info, 0, sizeof (info));
515   info.state = state;
516   info.is_inbound = GNUNET_SYSERR; /* hard to say */
517   info.num_msg_pending = session->msgs_in_queue;
518   info.num_bytes_pending = session->bytes_in_queue;
519   /* info.receive_delay remains zero as this is not supported by UDP
520      (cannot selectively not receive from 'some' peer while continuing
521      to receive from others) */
522   info.session_timeout = session->timeout;
523   info.address = session->address;
524   plugin->sic (plugin->sic_cls,
525                session,
526                &info);
527 }
528
529
530 /**
531  * We have been notified that our readset has something to read.  We don't
532  * know which socket needs to be read, so we have to check each one
533  * Then reschedule this function to be called again once more is available.
534  *
535  * @param cls the plugin handle
536  * @param tc the scheduling context (for rescheduling this function again)
537  */
538 static void
539 udp_plugin_select (void *cls,
540                    const struct GNUNET_SCHEDULER_TaskContext *tc);
541
542
543 /**
544  * We have been notified that our readset has something to read.  We don't
545  * know which socket needs to be read, so we have to check each one
546  * Then reschedule this function to be called again once more is available.
547  *
548  * @param cls the plugin handle
549  * @param tc the scheduling context (for rescheduling this function again)
550  */
551 static void
552 udp_plugin_select_v6 (void *cls,
553                       const struct GNUNET_SCHEDULER_TaskContext *tc);
554
555
556 /**
557  * (re)schedule select tasks for this plugin.
558  *
559  * @param plugin plugin to reschedule
560  */
561 static void
562 schedule_select (struct Plugin *plugin)
563 {
564   struct GNUNET_TIME_Relative min_delay;
565   struct UDP_MessageWrapper *udpw;
566
567   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
568   {
569     /* Find a message ready to send:
570      * Flow delay from other peer is expired or not set (0) */
571     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
572     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
573       min_delay = GNUNET_TIME_relative_min (min_delay,
574           GNUNET_TIME_absolute_get_remaining (
575               udpw->session->flow_delay_from_other_peer));
576
577     if (plugin->select_task != NULL )
578       GNUNET_SCHEDULER_cancel (plugin->select_task);
579
580     /* Schedule with:
581      * - write active set if message is ready
582      * - timeout minimum delay */
583     plugin->select_task = GNUNET_SCHEDULER_add_select (
584         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
585         (0 == min_delay.rel_value_us) ?
586             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
587         (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
588         &udp_plugin_select, plugin);
589   }
590   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
591   {
592     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
593     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
594       min_delay = GNUNET_TIME_relative_min (min_delay,
595           GNUNET_TIME_absolute_get_remaining (
596               udpw->session->flow_delay_from_other_peer));
597
598     if (NULL != plugin->select_task_v6)
599       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
600     plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
601         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
602         (0 == min_delay.rel_value_us) ?
603             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
604         (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
605         &udp_plugin_select_v6, plugin);
606   }
607 }
608
609
610 /**
611  * Function called for a quick conversion of the binary address to
612  * a numeric address.  Note that the caller must not free the
613  * address and that the next call to this function is allowed
614  * to override the address again.
615  *
616  * @param cls closure
617  * @param addr binary address (a `union UdpAddress`)
618  * @param addrlen length of the @a addr
619  * @return string representing the same address
620  */
621 const char *
622 udp_address_to_string (void *cls,
623                        const void *addr,
624                        size_t addrlen)
625 {
626   static char rbuf[INET6_ADDRSTRLEN + 10];
627   char buf[INET6_ADDRSTRLEN];
628   const void *sb;
629   struct in_addr a4;
630   struct in6_addr a6;
631   const struct IPv4UdpAddress *t4;
632   const struct IPv6UdpAddress *t6;
633   int af;
634   uint16_t port;
635   uint32_t options;
636
637   if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress)))
638   {
639     t6 = addr;
640     af = AF_INET6;
641     options = ntohl (t6->options);
642     port = ntohs (t6->u6_port);
643     memcpy (&a6, &t6->ipv6_addr, sizeof(a6));
644     sb = &a6;
645   }
646   else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress)))
647   {
648     t4 = addr;
649     af = AF_INET;
650     options = ntohl (t4->options);
651     port = ntohs (t4->u4_port);
652     memcpy (&a4, &t4->ipv4_addr, sizeof(a4));
653     sb = &a4;
654   }
655   else
656   {
657     return NULL;
658   }
659   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
660
661   GNUNET_snprintf (rbuf, sizeof(rbuf),
662                    (af == AF_INET6)
663                    ? "%s.%u.[%s]:%u"
664                    : "%s.%u.%s:%u",
665                    PLUGIN_NAME,
666                    options,
667                    buf,
668                    port);
669   return rbuf;
670 }
671
672
673 /**
674  * Function called to convert a string address to
675  * a binary address.
676  *
677  * @param cls closure (`struct Plugin *`)
678  * @param addr string address
679  * @param addrlen length of the address
680  * @param buf location to store the buffer
681  * @param added location to store the number of bytes in the buffer.
682  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
683  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
684  */
685 static int
686 udp_string_to_address (void *cls,
687                        const char *addr,
688                        uint16_t addrlen,
689                        void **buf,
690                        size_t *added)
691 {
692   struct sockaddr_storage socket_address;
693   char *address;
694   char *plugin;
695   char *optionstr;
696   uint32_t options;
697
698   /* Format tcp.options.address:port */
699   address = NULL;
700   plugin = NULL;
701   optionstr = NULL;
702
703   if ((NULL == addr) || (addrlen == 0))
704   {
705     GNUNET_break(0);
706     return GNUNET_SYSERR;
707   }
708   if ('\0' != addr[addrlen - 1])
709   {
710     GNUNET_break(0);
711     return GNUNET_SYSERR;
712   }
713   if (strlen (addr) != addrlen - 1)
714   {
715     GNUNET_break(0);
716     return GNUNET_SYSERR;
717   }
718   plugin = GNUNET_strdup (addr);
719   optionstr = strchr (plugin, '.');
720   if (NULL == optionstr)
721   {
722     GNUNET_break(0);
723     GNUNET_free(plugin);
724     return GNUNET_SYSERR;
725   }
726   optionstr[0] = '\0';
727   optionstr++;
728   options = atol (optionstr);
729   address = strchr (optionstr, '.');
730   if (NULL == address)
731   {
732     GNUNET_break(0);
733     GNUNET_free(plugin);
734     return GNUNET_SYSERR;
735   }
736   address[0] = '\0';
737   address++;
738
739   if (GNUNET_OK !=
740       GNUNET_STRINGS_to_address_ip (address, strlen (address),
741                                     &socket_address))
742   {
743     GNUNET_break(0);
744     GNUNET_free(plugin);
745     return GNUNET_SYSERR;
746   }
747
748   GNUNET_free(plugin);
749
750   switch (socket_address.ss_family)
751   {
752   case AF_INET:
753   {
754     struct IPv4UdpAddress *u4;
755     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
756     u4 = GNUNET_new (struct IPv4UdpAddress);
757     u4->options = htonl (options);
758     u4->ipv4_addr = in4->sin_addr.s_addr;
759     u4->u4_port = in4->sin_port;
760     *buf = u4;
761     *added = sizeof(struct IPv4UdpAddress);
762     return GNUNET_OK;
763   }
764   case AF_INET6:
765   {
766     struct IPv6UdpAddress *u6;
767     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
768     u6 = GNUNET_new (struct IPv6UdpAddress);
769     u6->options = htonl (options);
770     u6->ipv6_addr = in6->sin6_addr;
771     u6->u6_port = in6->sin6_port;
772     *buf = u6;
773     *added = sizeof(struct IPv6UdpAddress);
774     return GNUNET_OK;
775   }
776   default:
777     GNUNET_break(0);
778     return GNUNET_SYSERR;
779   }
780 }
781
782
783 /**
784  * Append our port and forward the result.
785  *
786  * @param cls a `struct PrettyPrinterContext *`
787  * @param hostname result from DNS resolver
788  */
789 static void
790 append_port (void *cls,
791              const char *hostname)
792 {
793   struct PrettyPrinterContext *ppc = cls;
794   struct Plugin *plugin = ppc->plugin;
795   char *ret;
796
797   if (NULL == hostname)
798   {
799     /* Final call, done */
800     ppc->asc (ppc->asc_cls,
801               NULL,
802               GNUNET_OK);
803     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
804                                  plugin->ppc_dll_tail,
805                                  ppc);
806     ppc->resolver_handle = NULL;
807     GNUNET_free (ppc);
808     return;
809   }
810   if (GNUNET_YES == ppc->ipv6)
811     GNUNET_asprintf (&ret,
812                      "%s.%u.[%s]:%d",
813                      PLUGIN_NAME,
814                      ppc->options,
815                      hostname,
816                      ppc->port);
817   else
818     GNUNET_asprintf (&ret,
819                      "%s.%u.%s:%d",
820                      PLUGIN_NAME,
821                      ppc->options,
822                      hostname,
823                      ppc->port);
824   ppc->asc (ppc->asc_cls,
825             ret,
826             GNUNET_OK);
827   GNUNET_free (ret);
828 }
829
830
831 /**
832  * Convert the transports address to a nice, human-readable
833  * format.
834  *
835  * @param cls closure with the `struct Plugin *`
836  * @param type name of the transport that generated the address
837  * @param addr one of the addresses of the host, NULL for the last address
838  *        the specific address format depends on the transport;
839  *        a `union UdpAddress`
840  * @param addrlen length of the address
841  * @param numeric should (IP) addresses be displayed in numeric form?
842  * @param timeout after how long should we give up?
843  * @param asc function to call on each string
844  * @param asc_cls closure for @a asc
845  */
846 static void
847 udp_plugin_address_pretty_printer (void *cls,
848                                    const char *type,
849                                    const void *addr,
850                                    size_t addrlen,
851                                    int numeric,
852                                    struct GNUNET_TIME_Relative timeout,
853                                    GNUNET_TRANSPORT_AddressStringCallback asc,
854                                    void *asc_cls)
855 {
856   struct Plugin *plugin = cls;
857   struct PrettyPrinterContext *ppc;
858   const void *sb;
859   size_t sbs;
860   struct sockaddr_in a4;
861   struct sockaddr_in6 a6;
862   const struct IPv4UdpAddress *u4;
863   const struct IPv6UdpAddress *u6;
864   uint16_t port;
865   uint32_t options;
866
867   if (addrlen == sizeof(struct IPv6UdpAddress))
868   {
869     u6 = addr;
870     memset (&a6, 0, sizeof(a6));
871     a6.sin6_family = AF_INET6;
872 #if HAVE_SOCKADDR_IN_SIN_LEN
873     a6.sin6_len = sizeof (a6);
874 #endif
875     a6.sin6_port = u6->u6_port;
876     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
877     port = ntohs (u6->u6_port);
878     options = ntohl (u6->options);
879     sb = &a6;
880     sbs = sizeof(a6);
881   }
882   else if (addrlen == sizeof(struct IPv4UdpAddress))
883   {
884     u4 = addr;
885     memset (&a4, 0, sizeof(a4));
886     a4.sin_family = AF_INET;
887 #if HAVE_SOCKADDR_IN_SIN_LEN
888     a4.sin_len = sizeof (a4);
889 #endif
890     a4.sin_port = u4->u4_port;
891     a4.sin_addr.s_addr = u4->ipv4_addr;
892     port = ntohs (u4->u4_port);
893     options = ntohl (u4->options);
894     sb = &a4;
895     sbs = sizeof(a4);
896   }
897   else
898   {
899     /* invalid address */
900     GNUNET_break_op (0);
901     asc (asc_cls, NULL , GNUNET_SYSERR);
902     asc (asc_cls, NULL, GNUNET_OK);
903     return;
904   }
905   ppc = GNUNET_new (struct PrettyPrinterContext);
906   ppc->plugin = plugin;
907   ppc->asc = asc;
908   ppc->asc_cls = asc_cls;
909   ppc->port = port;
910   ppc->options = options;
911   if (addrlen == sizeof(struct IPv6UdpAddress))
912     ppc->ipv6 = GNUNET_YES;
913   else
914     ppc->ipv6 = GNUNET_NO;
915   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
916                                plugin->ppc_dll_tail,
917                                ppc);
918   ppc->resolver_handle
919     = GNUNET_RESOLVER_hostname_get (sb,
920                                     sbs,
921                                     ! numeric,
922                                     timeout,
923                                     &append_port, ppc);
924 }
925
926
927 /**
928  * FIXME.
929  */
930 static void
931 call_continuation (struct UDP_MessageWrapper *udpw,
932                    int result)
933 {
934   struct Session *session = udpw->session;
935   struct Plugin *plugin = session->plugin;
936   size_t overhead;
937
938   LOG (GNUNET_ERROR_TYPE_DEBUG,
939        "Calling continuation for %u byte message to `%s' with result %s\n",
940        udpw->payload_size,
941        GNUNET_i2s (&udpw->session->target),
942        (GNUNET_OK == result) ? "OK" : "SYSERR");
943
944   if (udpw->msg_size >= udpw->payload_size)
945     overhead = udpw->msg_size - udpw->payload_size;
946   else
947     overhead = udpw->msg_size;
948
949   switch (result)
950   {
951   case GNUNET_OK:
952     switch (udpw->msg_type)
953     {
954     case UMT_MSG_UNFRAGMENTED:
955       if (NULL != udpw->cont)
956       {
957         /* Transport continuation */
958         udpw->cont (udpw->cont_cls,
959                     &udpw->session->target,
960                     result,
961                     udpw->payload_size,
962                     udpw->msg_size);
963       }
964       GNUNET_STATISTICS_update (plugin->env->stats,
965                                 "# UDP, unfragmented msgs, messages, sent, success",
966                                 1,
967                                 GNUNET_NO);
968       GNUNET_STATISTICS_update (plugin->env->stats,
969                                 "# UDP, unfragmented msgs, bytes payload, sent, success",
970                                 udpw->payload_size,
971                                 GNUNET_NO);
972       GNUNET_STATISTICS_update (plugin->env->stats,
973                                 "# UDP, unfragmented msgs, bytes overhead, sent, success",
974                                 overhead,
975                                 GNUNET_NO);
976       GNUNET_STATISTICS_update (plugin->env->stats,
977                                 "# UDP, total, bytes overhead, sent",
978                                 overhead,
979                                 GNUNET_NO);
980       GNUNET_STATISTICS_update (plugin->env->stats,
981                                 "# UDP, total, bytes payload, sent",
982                                 udpw->payload_size,
983                                 GNUNET_NO);
984       break;
985     case UMT_MSG_FRAGMENTED_COMPLETE:
986       GNUNET_assert(NULL != udpw->frag_ctx);
987       if (udpw->frag_ctx->cont != NULL )
988         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
989                               &udpw->session->target,
990                               GNUNET_OK,
991                               udpw->frag_ctx->payload_size,
992                               udpw->frag_ctx->on_wire_size);
993       GNUNET_STATISTICS_update (plugin->env->stats,
994                                 "# UDP, fragmented msgs, messages, sent, success",
995                                 1,
996                                 GNUNET_NO);
997       GNUNET_STATISTICS_update (plugin->env->stats,
998                                 "# UDP, fragmented msgs, bytes payload, sent, success",
999                                 udpw->payload_size,
1000                                 GNUNET_NO);
1001       GNUNET_STATISTICS_update (plugin->env->stats,
1002                                 "# UDP, fragmented msgs, bytes overhead, sent, success",
1003                                 overhead,
1004                                 GNUNET_NO);
1005       GNUNET_STATISTICS_update (plugin->env->stats,
1006                                 "# UDP, total, bytes overhead, sent",
1007                                 overhead,
1008                                 GNUNET_NO);
1009       GNUNET_STATISTICS_update (plugin->env->stats,
1010                                 "# UDP, total, bytes payload, sent",
1011                                 udpw->payload_size,
1012                                 GNUNET_NO);
1013       GNUNET_STATISTICS_update (plugin->env->stats,
1014                                 "# UDP, fragmented msgs, messages, pending",
1015                                 -1,
1016                                 GNUNET_NO);
1017       break;
1018     case UMT_MSG_FRAGMENTED:
1019       /* Fragmented message: enqueue next fragment */
1020       if (NULL != udpw->cont)
1021         udpw->cont (udpw->cont_cls,
1022                     &udpw->session->target,
1023                     result,
1024                     udpw->payload_size,
1025                     udpw->msg_size);
1026       GNUNET_STATISTICS_update (plugin->env->stats,
1027                                 "# UDP, fragmented msgs, fragments, sent, success",
1028                                 1,
1029                                 GNUNET_NO);
1030       GNUNET_STATISTICS_update (plugin->env->stats,
1031                                 "# UDP, fragmented msgs, fragments bytes, sent, success",
1032                                 udpw->msg_size,
1033                                 GNUNET_NO);
1034       break;
1035     case UMT_MSG_ACK:
1036       /* No continuation */
1037       GNUNET_STATISTICS_update (plugin->env->stats,
1038                                 "# UDP, ACK msgs, messages, sent, success",
1039                                 1,
1040                                 GNUNET_NO);
1041       GNUNET_STATISTICS_update (plugin->env->stats,
1042                                 "# UDP, ACK msgs, bytes overhead, sent, success",
1043                                 overhead,
1044                                 GNUNET_NO);
1045       GNUNET_STATISTICS_update (plugin->env->stats,
1046                                 "# UDP, total, bytes overhead, sent",
1047                                 overhead,
1048                                 GNUNET_NO);
1049       break;
1050     default:
1051       GNUNET_break(0);
1052       break;
1053     }
1054     break;
1055   case GNUNET_SYSERR:
1056     switch (udpw->msg_type)
1057     {
1058     case UMT_MSG_UNFRAGMENTED:
1059       /* Unfragmented message: failed to send */
1060       if (NULL != udpw->cont)
1061         udpw->cont (udpw->cont_cls,
1062                     &udpw->session->target,
1063                     result,
1064                     udpw->payload_size,
1065                     overhead);
1066       GNUNET_STATISTICS_update (plugin->env->stats,
1067                                 "# UDP, unfragmented msgs, messages, sent, failure",
1068                                 1,
1069                                 GNUNET_NO);
1070       GNUNET_STATISTICS_update (plugin->env->stats,
1071                                 "# UDP, unfragmented msgs, bytes payload, sent, failure",
1072                                 udpw->payload_size,
1073                                 GNUNET_NO);
1074       GNUNET_STATISTICS_update (plugin->env->stats,
1075                                 "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1076                                 overhead,
1077                                 GNUNET_NO);
1078       break;
1079     case UMT_MSG_FRAGMENTED_COMPLETE:
1080       GNUNET_assert (NULL != udpw->frag_ctx);
1081       if (udpw->frag_ctx->cont != NULL)
1082         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
1083                               &udpw->session->target,
1084                               GNUNET_SYSERR,
1085                               udpw->frag_ctx->payload_size,
1086                               udpw->frag_ctx->on_wire_size);
1087       GNUNET_STATISTICS_update (plugin->env->stats,
1088                                 "# UDP, fragmented msgs, messages, sent, failure",
1089                                 1,
1090                                 GNUNET_NO);
1091       GNUNET_STATISTICS_update (plugin->env->stats,
1092                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1093                                 udpw->payload_size,
1094                                 GNUNET_NO);
1095       GNUNET_STATISTICS_update (plugin->env->stats,
1096                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1097                                 overhead,
1098                                 GNUNET_NO);
1099       GNUNET_STATISTICS_update (plugin->env->stats,
1100                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1101                                 overhead,
1102                                 GNUNET_NO);
1103       GNUNET_STATISTICS_update (plugin->env->stats,
1104                                 "# UDP, fragmented msgs, messages, pending",
1105                                 -1,
1106                                 GNUNET_NO);
1107       break;
1108     case UMT_MSG_FRAGMENTED:
1109       GNUNET_assert (NULL != udpw->frag_ctx);
1110       /* Fragmented message: failed to send */
1111       GNUNET_STATISTICS_update (plugin->env->stats,
1112                                 "# UDP, fragmented msgs, fragments, sent, failure",
1113                                 1,
1114                                 GNUNET_NO);
1115       GNUNET_STATISTICS_update (plugin->env->stats,
1116                                 "# UDP, fragmented msgs, fragments bytes, sent, failure",
1117                                 udpw->msg_size,
1118                                 GNUNET_NO);
1119       break;
1120     case UMT_MSG_ACK:
1121       /* ACK message: failed to send */
1122       GNUNET_STATISTICS_update (plugin->env->stats,
1123                                 "# UDP, ACK msgs, messages, sent, failure",
1124                                 1,
1125                                 GNUNET_NO);
1126       break;
1127     default:
1128       GNUNET_break(0);
1129       break;
1130     }
1131     break;
1132   default:
1133     GNUNET_break(0);
1134     break;
1135   }
1136 }
1137
1138
1139 /**
1140  * Check if the given port is plausible (must be either our listen
1141  * port or our advertised port).  If it is neither, we return
1142  * #GNUNET_SYSERR.
1143  *
1144  * @param plugin global variables
1145  * @param in_port port number to check
1146  * @return #GNUNET_OK if port is either open_port or adv_port
1147  */
1148 static int
1149 check_port (struct Plugin *plugin,
1150             uint16_t in_port)
1151 {
1152   if ((in_port == plugin->port) || (in_port == plugin->aport))
1153     return GNUNET_OK;
1154   return GNUNET_SYSERR;
1155 }
1156
1157
1158 /**
1159  * Function that will be called to check if a binary address for this
1160  * plugin is well-formed and corresponds to an address for THIS peer
1161  * (as per our configuration).  Naturally, if absolutely necessary,
1162  * plugins can be a bit conservative in their answer, but in general
1163  * plugins should make sure that the address does not redirect
1164  * traffic to a 3rd party that might try to man-in-the-middle our
1165  * traffic.
1166  *
1167  * @param cls closure, should be our handle to the Plugin
1168  * @param addr pointer to a `union UdpAddress`
1169  * @param addrlen length of @a addr
1170  * @return #GNUNET_OK if this is a plausible address for this peer
1171  *         and transport, #GNUNET_SYSERR if not
1172  */
1173 static int
1174 udp_plugin_check_address (void *cls,
1175                           const void *addr,
1176                           size_t addrlen)
1177 {
1178   struct Plugin *plugin = cls;
1179   struct IPv4UdpAddress *v4;
1180   struct IPv6UdpAddress *v6;
1181
1182   if ( (addrlen != sizeof(struct IPv4UdpAddress)) &&
1183        (addrlen != sizeof(struct IPv6UdpAddress)) )
1184   {
1185     GNUNET_break_op(0);
1186     return GNUNET_SYSERR;
1187   }
1188   if (addrlen == sizeof(struct IPv4UdpAddress))
1189   {
1190     v4 = (struct IPv4UdpAddress *) addr;
1191     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1192       return GNUNET_SYSERR;
1193     if (GNUNET_OK !=
1194         GNUNET_NAT_test_address (plugin->nat,
1195                                  &v4->ipv4_addr,
1196                                  sizeof (struct in_addr)))
1197       return GNUNET_SYSERR;
1198   }
1199   else
1200   {
1201     v6 = (struct IPv6UdpAddress *) addr;
1202     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1203     {
1204       GNUNET_break_op(0);
1205       return GNUNET_SYSERR;
1206     }
1207     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1208       return GNUNET_SYSERR;
1209     if (GNUNET_OK !=
1210         GNUNET_NAT_test_address (plugin->nat,
1211                                  &v6->ipv6_addr,
1212                                  sizeof(struct in6_addr)))
1213       return GNUNET_SYSERR;
1214   }
1215   return GNUNET_OK;
1216 }
1217
1218
1219 /**
1220  * Function to free last resources associated with a session.
1221  *
1222  * @param s session to free
1223  */
1224 static void
1225 free_session (struct Session *s)
1226 {
1227   if (NULL != s->frag_ctx)
1228   {
1229     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL );
1230     GNUNET_free(s->frag_ctx);
1231     s->frag_ctx = NULL;
1232   }
1233   GNUNET_free(s);
1234 }
1235
1236
1237 /**
1238  * Remove a message from the transmission queue.
1239  *
1240  * @param plugin the UDP plugin
1241  * @param udpw message wrapper to queue
1242  */
1243 static void
1244 dequeue (struct Plugin *plugin,
1245          struct UDP_MessageWrapper *udpw)
1246 {
1247   struct Session *session = udpw->session;
1248
1249   if (plugin->bytes_in_buffer < udpw->msg_size)
1250   {
1251     GNUNET_break (0);
1252   }
1253   else
1254   {
1255     GNUNET_STATISTICS_update (plugin->env->stats,
1256                               "# UDP, total, bytes in buffers",
1257                               -(long long) udpw->msg_size,
1258                               GNUNET_NO);
1259     plugin->bytes_in_buffer -= udpw->msg_size;
1260   }
1261   GNUNET_STATISTICS_update (plugin->env->stats,
1262                             "# UDP, total, msgs in buffers",
1263                             -1, GNUNET_NO);
1264   if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
1265     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1266                                  plugin->ipv4_queue_tail,
1267                                  udpw);
1268   else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
1269     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1270                                  plugin->ipv6_queue_tail,
1271                                  udpw);
1272   else
1273   {
1274     GNUNET_break (0);
1275     return;
1276   }
1277   GNUNET_assert (session->msgs_in_queue > 0);
1278   session->msgs_in_queue--;
1279   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1280   session->bytes_in_queue -= udpw->msg_size;
1281 }
1282
1283
1284 /**
1285  * We have completed our (attempt) to transmit a message
1286  * that had to be fragmented -- either because we got an
1287  * ACK saying that all fragments were received, or because
1288  * of timeout / disconnect.  Clean up our state.
1289  *
1290  * @param fc fragmentation context to clean up
1291  * @param result #GNUNET_OK if we succeeded (got ACK),
1292  *               #GNUNET_SYSERR if the transmission failed
1293  */
1294 static void
1295 fragmented_message_done (struct UDP_FragmentationContext *fc,
1296                          int result)
1297 {
1298   struct Plugin *plugin = fc->plugin;
1299   struct Session *s = fc->session;
1300   struct UDP_MessageWrapper *udpw;
1301   struct UDP_MessageWrapper *tmp;
1302   struct UDP_MessageWrapper dummy;
1303
1304   LOG (GNUNET_ERROR_TYPE_DEBUG,
1305        "%p : Fragmented message removed with result %s\n",
1306        fc,
1307        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1308
1309   /* Call continuation for fragmented message */
1310   memset (&dummy, 0, sizeof(dummy));
1311   dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE;
1312   dummy.msg_size = s->frag_ctx->on_wire_size;
1313   dummy.payload_size = s->frag_ctx->payload_size;
1314   dummy.frag_ctx = s->frag_ctx;
1315   dummy.cont = NULL;
1316   dummy.cont_cls = NULL;
1317   dummy.session = s;
1318   call_continuation (&dummy, result);
1319   /* Remove leftover fragments from queue */
1320   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1321   {
1322     udpw = plugin->ipv6_queue_head;
1323     while (NULL != udpw)
1324     {
1325       tmp = udpw->next;
1326       if ( (udpw->frag_ctx != NULL) &&
1327            (udpw->frag_ctx == s->frag_ctx) )
1328       {
1329         dequeue (plugin, udpw);
1330         call_continuation (udpw, GNUNET_SYSERR);
1331         GNUNET_free (udpw);
1332       }
1333       udpw = tmp;
1334     }
1335   }
1336   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1337   {
1338     udpw = plugin->ipv4_queue_head;
1339     while (udpw != NULL )
1340     {
1341       tmp = udpw->next;
1342       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1343       {
1344         dequeue (plugin, udpw);
1345         call_continuation (udpw, GNUNET_SYSERR);
1346         GNUNET_free(udpw);
1347       }
1348       udpw = tmp;
1349     }
1350   }
1351   notify_session_monitor (s->plugin,
1352                           s,
1353                           GNUNET_TRANSPORT_SS_UPDATE);
1354   /* Destroy fragmentation context */
1355   GNUNET_FRAGMENT_context_destroy (fc->frag,
1356                                    &s->last_expected_msg_delay,
1357                                    &s->last_expected_ack_delay);
1358   s->frag_ctx = NULL;
1359   GNUNET_free (fc);
1360 }
1361
1362
1363 /**
1364  * Scan the heap for a receive context with the given address.
1365  *
1366  * @param cls the `struct FindReceiveContext`
1367  * @param node internal node of the heap
1368  * @param element value stored at the node (a `struct ReceiveContext`)
1369  * @param cost cost associated with the node
1370  * @return #GNUNET_YES if we should continue to iterate,
1371  *         #GNUNET_NO if not.
1372  */
1373 static int
1374 find_receive_context (void *cls,
1375                       struct GNUNET_CONTAINER_HeapNode *node,
1376                       void *element,
1377                       GNUNET_CONTAINER_HeapCostType cost)
1378 {
1379   struct FindReceiveContext *frc = cls;
1380   struct DefragContext *e = element;
1381
1382   if ( (frc->udp_addr_len == e->udp_addr_len) &&
1383        (0 == memcmp (frc->udp_addr,
1384                      e->udp_addr,
1385                      frc->udp_addr_len)) )
1386   {
1387     frc->rc = e;
1388     return GNUNET_NO;
1389   }
1390   return GNUNET_YES;
1391 }
1392
1393
1394 /**
1395  * Functions with this signature are called whenever we need
1396  * to close a session due to a disconnect or failure to
1397  * establish a connection.
1398  *
1399  * @param cls closure with the `struct Plugin`
1400  * @param s session to close down
1401  * @return #GNUNET_OK on success
1402  */
1403 static int
1404 udp_disconnect_session (void *cls,
1405                         struct Session *s)
1406 {
1407   struct Plugin *plugin = cls;
1408   struct UDP_MessageWrapper *udpw;
1409   struct UDP_MessageWrapper *next;
1410   struct FindReceiveContext frc;
1411
1412   GNUNET_assert (GNUNET_YES != s->in_destroy);
1413   LOG (GNUNET_ERROR_TYPE_DEBUG,
1414        "Session %p to peer `%s' address ended\n",
1415        s,
1416        GNUNET_i2s (&s->target),
1417        udp_address_to_string (plugin,
1418                               s->address->address,
1419                               s->address->address_length));
1420   /* stop timeout task */
1421   if (NULL != s->timeout_task)
1422   {
1423     GNUNET_SCHEDULER_cancel (s->timeout_task);
1424     s->timeout_task = NULL;
1425   }
1426   if (NULL != s->frag_ctx)
1427   {
1428     /* Remove fragmented message due to disconnect */
1429     fragmented_message_done (s->frag_ctx,
1430                              GNUNET_SYSERR);
1431   }
1432
1433   frc.rc = NULL;
1434   frc.udp_addr = s->address->address;
1435   frc.udp_addr_len = s->address->address_length;
1436   /* Lookup existing receive context for this address */
1437   if (NULL != plugin->defrag_ctxs)
1438   {
1439     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1440                                    &find_receive_context,
1441                                    &frc);
1442     if (NULL != frc.rc)
1443     {
1444       struct DefragContext *d_ctx = frc.rc;
1445
1446       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
1447       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1448       GNUNET_free (d_ctx);
1449     }
1450   }
1451   next = plugin->ipv4_queue_head;
1452   while (NULL != (udpw = next))
1453   {
1454     next = udpw->next;
1455     if (udpw->session == s)
1456     {
1457       dequeue (plugin, udpw);
1458       call_continuation (udpw, GNUNET_SYSERR);
1459       GNUNET_free(udpw);
1460     }
1461   }
1462   next = plugin->ipv6_queue_head;
1463   while (NULL != (udpw = next))
1464   {
1465     next = udpw->next;
1466     if (udpw->session == s)
1467     {
1468       dequeue (plugin, udpw);
1469       call_continuation (udpw, GNUNET_SYSERR);
1470       GNUNET_free(udpw);
1471     }
1472   }
1473   notify_session_monitor (s->plugin,
1474                           s,
1475                           GNUNET_TRANSPORT_SS_DONE);
1476   plugin->env->session_end (plugin->env->cls,
1477                             s->address,
1478                             s);
1479
1480   if (NULL != s->frag_ctx)
1481   {
1482     if (NULL != s->frag_ctx->cont)
1483     {
1484       s->frag_ctx->cont (s->frag_ctx->cont_cls,
1485                          &s->target,
1486                          GNUNET_SYSERR,
1487                          s->frag_ctx->payload_size,
1488                          s->frag_ctx->on_wire_size);
1489       LOG (GNUNET_ERROR_TYPE_DEBUG,
1490            "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1491            GNUNET_i2s (&s->target));
1492     }
1493   }
1494
1495   GNUNET_assert (GNUNET_YES ==
1496                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
1497                                                        &s->target,
1498                                                        s));
1499   GNUNET_STATISTICS_set (plugin->env->stats,
1500                          "# UDP sessions active",
1501                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1502                          GNUNET_NO);
1503   if (s->rc > 0)
1504   {
1505     s->in_destroy = GNUNET_YES;
1506   }
1507   else
1508   {
1509     GNUNET_HELLO_address_free (s->address);
1510     free_session (s);
1511   }
1512   return GNUNET_OK;
1513 }
1514
1515
1516 /**
1517  * Function that is called to get the keepalive factor.
1518  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
1519  * calculate the interval between keepalive packets.
1520  *
1521  * @param cls closure with the `struct Plugin`
1522  * @return keepalive factor
1523  */
1524 static unsigned int
1525 udp_query_keepalive_factor (void *cls)
1526 {
1527   return 15;
1528 }
1529
1530
1531 /**
1532  * Destroy a session, plugin is being unloaded.
1533  *
1534  * @param cls the `struct Plugin`
1535  * @param key hash of public key of target peer
1536  * @param value a `struct PeerSession *` to clean up
1537  * @return #GNUNET_OK (continue to iterate)
1538  */
1539 static int
1540 disconnect_and_free_it (void *cls,
1541                         const struct GNUNET_PeerIdentity *key,
1542                         void *value)
1543 {
1544   struct Plugin *plugin = cls;
1545
1546   udp_disconnect_session (plugin, value);
1547   return GNUNET_OK;
1548 }
1549
1550
1551 /**
1552  * Disconnect from a remote node.  Clean up session if we have one for
1553  * this peer.
1554  *
1555  * @param cls closure for this call (should be handle to Plugin)
1556  * @param target the peeridentity of the peer to disconnect
1557  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
1558  */
1559 static void
1560 udp_disconnect (void *cls,
1561                 const struct GNUNET_PeerIdentity *target)
1562 {
1563   struct Plugin *plugin = cls;
1564
1565   LOG (GNUNET_ERROR_TYPE_DEBUG,
1566        "Disconnecting from peer `%s'\n",
1567        GNUNET_i2s (target));
1568   /* Clean up sessions */
1569   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1570                                               target,
1571                                               &disconnect_and_free_it,
1572                                               plugin);
1573 }
1574
1575
1576 /**
1577  * Session was idle, so disconnect it
1578  *
1579  * @param cls the `struct Session` to time out
1580  * @param tc scheduler context
1581  */
1582 static void
1583 session_timeout (void *cls,
1584                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1585 {
1586   struct Session *s = cls;
1587   struct Plugin *plugin = s->plugin;
1588   struct GNUNET_TIME_Relative left;
1589
1590   s->timeout_task = NULL;
1591   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
1592   if (left.rel_value_us > 0)
1593   {
1594     /* not actually our turn yet, but let's at least update
1595        the monitor, it may think we're about to die ... */
1596     notify_session_monitor (s->plugin,
1597                             s,
1598                             GNUNET_TRANSPORT_SS_UPDATE);
1599     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
1600                                                     &session_timeout,
1601                                                     s);
1602     return;
1603   }
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605        "Session %p was idle for %s, disconnecting\n",
1606        s,
1607        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
1608                                                GNUNET_YES));
1609   /* call session destroy function */
1610   udp_disconnect_session (plugin, s);
1611 }
1612
1613
1614 /**
1615  * Increment session timeout due to activity
1616  *
1617  * @param s session to reschedule timeout activity for
1618  */
1619 static void
1620 reschedule_session_timeout (struct Session *s)
1621 {
1622   if (GNUNET_YES == s->in_destroy)
1623     return;
1624   GNUNET_assert(NULL != s->timeout_task);
1625   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1626 }
1627
1628
1629 /**
1630  * Function obtain the network type for a session
1631  *
1632  * @param cls closure (`struct Plugin *`)
1633  * @param session the session
1634  * @return the network type
1635  */
1636 static enum GNUNET_ATS_Network_Type
1637 udp_get_network (void *cls,
1638                  struct Session *session)
1639 {
1640   return session->scope;
1641 }
1642
1643
1644 /**
1645  * Closure for #session_cmp_it().
1646  */
1647 struct SessionCompareContext
1648 {
1649   /**
1650    * Set to session matching the address.
1651    */
1652   struct Session *res;
1653
1654   /**
1655    * Address we are looking for.
1656    */
1657   const struct GNUNET_HELLO_Address *address;
1658 };
1659
1660
1661 /**
1662  * Find a session with a matching address.
1663  *
1664  * @param cls the `struct SessionCompareContext *`
1665  * @param key peer identity (unused)
1666  * @param value the `struct Session *`
1667  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1668  */
1669 static int
1670 session_cmp_it (void *cls,
1671                 const struct GNUNET_PeerIdentity *key,
1672                 void *value)
1673 {
1674   struct SessionCompareContext *cctx = cls;
1675   struct Session *s = value;
1676
1677   if (0 == GNUNET_HELLO_address_cmp (s->address,
1678                                      cctx->address))
1679   {
1680     cctx->res = s;
1681     return GNUNET_NO;
1682   }
1683   return GNUNET_YES;
1684 }
1685
1686
1687 /**
1688  * Locate an existing session the transport service is using to
1689  * send data to another peer.  Performs some basic sanity checks
1690  * on the address and then tries to locate a matching session.
1691  *
1692  * @param cls the plugin
1693  * @param address the address we should locate the session by
1694  * @return the session if it exists, or NULL if it is not found
1695  */
1696 static struct Session *
1697 udp_plugin_lookup_session (void *cls,
1698                            const struct GNUNET_HELLO_Address *address)
1699 {
1700   struct Plugin *plugin = cls;
1701   const struct IPv6UdpAddress *udp_a6;
1702   const struct IPv4UdpAddress *udp_a4;
1703   struct SessionCompareContext cctx;
1704
1705   if ( (NULL == address->address) ||
1706        ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1707         (address->address_length != sizeof (struct IPv6UdpAddress))))
1708   {
1709     LOG (GNUNET_ERROR_TYPE_WARNING,
1710          "Trying to locate session for address of unexpected length %u (should be %u or %u)\n",
1711          address->address_length,
1712          sizeof (struct IPv4UdpAddress),
1713          sizeof (struct IPv6UdpAddress));
1714     return NULL;
1715   }
1716
1717   if (address->address_length == sizeof(struct IPv4UdpAddress))
1718   {
1719     if (NULL == plugin->sockv4)
1720       return NULL;
1721     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1722     if (0 == udp_a4->u4_port)
1723       return NULL;
1724   }
1725
1726   if (address->address_length == sizeof(struct IPv6UdpAddress))
1727   {
1728     if (NULL == plugin->sockv6)
1729       return NULL;
1730     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1731     if (0 == udp_a6->u6_port)
1732       return NULL;
1733   }
1734
1735   /* check if session already exists */
1736   cctx.address = address;
1737   cctx.res = NULL;
1738   LOG (GNUNET_ERROR_TYPE_DEBUG,
1739        "Looking for existing session for peer `%s' `%s' \n",
1740        GNUNET_i2s (&address->peer),
1741        udp_address_to_string (plugin,
1742                               address->address,
1743                               address->address_length));
1744   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1745                                               &address->peer,
1746                                               &session_cmp_it,
1747                                               &cctx);
1748   if (NULL != cctx.res)
1749   {
1750     LOG (GNUNET_ERROR_TYPE_DEBUG,
1751          "Found existing session %p\n",
1752          cctx.res);
1753     return cctx.res;
1754   }
1755   return NULL;
1756 }
1757
1758
1759 /**
1760  * Allocate a new session for the given endpoint address.
1761  * Note that this function does not inform the service
1762  * of the new session, this is the responsibility of the
1763  * caller (if needed).
1764  *
1765  * @param cls the `struct Plugin`
1766  * @param address address of the other peer to use
1767  * @param network_type network type the address belongs to
1768  * @return NULL on error, otherwise session handle
1769  */
1770 static struct Session *
1771 udp_plugin_create_session (void *cls,
1772                            const struct GNUNET_HELLO_Address *address,
1773                            enum GNUNET_ATS_Network_Type network_type)
1774 {
1775   struct Plugin *plugin = cls;
1776   struct Session *s;
1777
1778   s = GNUNET_new (struct Session);
1779   s->plugin = plugin;
1780   s->address = GNUNET_HELLO_address_copy (address);
1781   s->target = address->peer;
1782   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1783                                                               250);
1784   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1785   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1786   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1787   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1788   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1789                                                   &session_timeout, s);
1790   s->scope = network_type;
1791
1792   LOG (GNUNET_ERROR_TYPE_DEBUG,
1793        "Creating new session %p for peer `%s' address `%s'\n",
1794        s,
1795        GNUNET_i2s (&address->peer),
1796        udp_address_to_string (plugin,
1797                               address->address,
1798                               address->address_length));
1799   GNUNET_assert(GNUNET_OK ==
1800                 GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
1801                                                    &s->target,
1802                                                    s,
1803                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1804   GNUNET_STATISTICS_set (plugin->env->stats,
1805                          "# UDP sessions active",
1806                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1807                          GNUNET_NO);
1808   return s;
1809 }
1810
1811
1812 /**
1813  * Function that will be called whenever the transport service wants to
1814  * notify the plugin that a session is still active and in use and
1815  * therefore the session timeout for this session has to be updated
1816  *
1817  * @param cls closure with the `struct Plugin`
1818  * @param peer which peer was the session for
1819  * @param session which session is being updated
1820  */
1821 static void
1822 udp_plugin_update_session_timeout (void *cls,
1823                                    const struct GNUNET_PeerIdentity *peer,
1824                                    struct Session *session)
1825 {
1826   struct Plugin *plugin = cls;
1827
1828   if (GNUNET_YES !=
1829       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1830                                                     peer,
1831                                                     session))
1832   {
1833     GNUNET_break(0);
1834     return;
1835   }
1836   /* Reschedule session timeout */
1837   reschedule_session_timeout (session);
1838 }
1839
1840
1841 /**
1842  * Creates a new outbound session the transport service will use to
1843  * send data to the peer.
1844  *
1845  * @param cls the `struct Plugin *`
1846  * @param address the address
1847  * @return the session or NULL of max connections exceeded
1848  */
1849 static struct Session *
1850 udp_plugin_get_session (void *cls,
1851                         const struct GNUNET_HELLO_Address *address)
1852 {
1853   struct Plugin *plugin = cls;
1854   struct Session *s;
1855   enum GNUNET_ATS_Network_Type network_type;
1856   struct IPv4UdpAddress *udp_v4;
1857   struct IPv6UdpAddress *udp_v6;
1858
1859   if (NULL == address)
1860   {
1861     GNUNET_break (0);
1862     return NULL;
1863   }
1864   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
1865        (address->address_length != sizeof(struct IPv6UdpAddress)) )
1866   {
1867     GNUNET_break_op (0);
1868     return NULL;
1869   }
1870   if (NULL != (s = udp_plugin_lookup_session (cls,
1871                                               address)))
1872     return s;
1873
1874   /* need to create new session */
1875   if (sizeof (struct IPv4UdpAddress) == address->address_length)
1876   {
1877     struct sockaddr_in v4;
1878
1879     udp_v4 = (struct IPv4UdpAddress *) address->address;
1880     memset (&v4, '\0', sizeof (v4));
1881     v4.sin_family = AF_INET;
1882 #if HAVE_SOCKADDR_IN_SIN_LEN
1883     v4.sin_len = sizeof (struct sockaddr_in);
1884 #endif
1885     v4.sin_port = udp_v4->u4_port;
1886     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
1887     network_type = plugin->env->get_address_type (plugin->env->cls,
1888                                                   (const struct sockaddr *) &v4,
1889                                                   sizeof (v4));
1890   }
1891   if (sizeof (struct IPv6UdpAddress) == address->address_length)
1892   {
1893     struct sockaddr_in6 v6;
1894
1895     udp_v6 = (struct IPv6UdpAddress *) address->address;
1896     memset (&v6, '\0', sizeof (v6));
1897     v6.sin6_family = AF_INET6;
1898 #if HAVE_SOCKADDR_IN_SIN_LEN
1899     v6.sin6_len = sizeof (struct sockaddr_in6);
1900 #endif
1901     v6.sin6_port = udp_v6->u6_port;
1902     v6.sin6_addr = udp_v6->ipv6_addr;
1903     network_type = plugin->env->get_address_type (plugin->env->cls,
1904                                                   (const struct sockaddr *) &v6,
1905                                                   sizeof (v6));
1906   }
1907   return udp_plugin_create_session (cls,
1908                                     address,
1909                                     network_type);
1910 }
1911
1912
1913 /**
1914  * Enqueue a message for transmission.
1915  *
1916  * @param plugin the UDP plugin
1917  * @param udpw message wrapper to queue
1918  */
1919 static void
1920 enqueue (struct Plugin *plugin,
1921          struct UDP_MessageWrapper *udpw)
1922 {
1923   struct Session *session = udpw->session;
1924
1925   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1926   {
1927     GNUNET_break (0);
1928   }
1929   else
1930   {
1931     GNUNET_STATISTICS_update (plugin->env->stats,
1932                               "# UDP, total, bytes in buffers",
1933                               udpw->msg_size,
1934                               GNUNET_NO);
1935     plugin->bytes_in_buffer += udpw->msg_size;
1936   }
1937   GNUNET_STATISTICS_update (plugin->env->stats,
1938                             "# UDP, total, msgs in buffers",
1939                             1, GNUNET_NO);
1940   if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
1941     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1942                                 plugin->ipv4_queue_tail,
1943                                 udpw);
1944   else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
1945     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1946                                  plugin->ipv6_queue_tail,
1947                                  udpw);
1948   else
1949   {
1950     GNUNET_break (0);
1951     return;
1952   }
1953   session->msgs_in_queue++;
1954   session->bytes_in_queue += udpw->msg_size;
1955 }
1956
1957
1958 /**
1959  * Fragment message was transmitted via UDP, let fragmentation know
1960  * to send the next fragment now.
1961  *
1962  * @param cls the `struct UDPMessageWrapper *` of the fragment
1963  * @param target destination peer (ignored)
1964  * @param result #GNUNET_OK on success (ignored)
1965  * @param payload bytes payload sent
1966  * @param physical bytes physical sent
1967  */
1968 static void
1969 send_next_fragment (void *cls,
1970                     const struct GNUNET_PeerIdentity *target,
1971                     int result,
1972                     size_t payload,
1973                     size_t physical)
1974 {
1975   struct UDP_MessageWrapper *udpw = cls;
1976
1977   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1978 }
1979
1980
1981 /**
1982  * Function that is called with messages created by the fragmentation
1983  * module.  In the case of the 'proc' callback of the
1984  * #GNUNET_FRAGMENT_context_create() function, this function must
1985  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1986  *
1987  * @param cls closure, the 'struct FragmentationContext'
1988  * @param msg the message that was created
1989  */
1990 static void
1991 enqueue_fragment (void *cls,
1992                   const struct GNUNET_MessageHeader *msg)
1993 {
1994   struct UDP_FragmentationContext *frag_ctx = cls;
1995   struct Plugin *plugin = frag_ctx->plugin;
1996   struct UDP_MessageWrapper * udpw;
1997   size_t msg_len = ntohs (msg->size);
1998
1999   LOG (GNUNET_ERROR_TYPE_DEBUG,
2000        "Enqueuing fragment with %u bytes\n",
2001        msg_len);
2002   frag_ctx->fragments_used++;
2003   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
2004   udpw->session = frag_ctx->session;
2005   udpw->msg_buf = (char *) &udpw[1];
2006   udpw->msg_size = msg_len;
2007   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
2008   udpw->cont = &send_next_fragment;
2009   udpw->cont_cls = udpw;
2010   udpw->timeout = frag_ctx->timeout;
2011   udpw->frag_ctx = frag_ctx;
2012   udpw->msg_type = UMT_MSG_FRAGMENTED;
2013   memcpy (udpw->msg_buf, msg, msg_len);
2014   enqueue (plugin, udpw);
2015   schedule_select (plugin);
2016 }
2017
2018
2019 /**
2020  * Function that can be used by the transport service to transmit
2021  * a message using the plugin.   Note that in the case of a
2022  * peer disconnecting, the continuation MUST be called
2023  * prior to the disconnect notification itself.  This function
2024  * will be called with this peer's HELLO message to initiate
2025  * a fresh connection to another peer.
2026  *
2027  * @param cls closure
2028  * @param s which session must be used
2029  * @param msgbuf the message to transmit
2030  * @param msgbuf_size number of bytes in 'msgbuf'
2031  * @param priority how important is the message (most plugins will
2032  *                 ignore message priority and just FIFO)
2033  * @param to how long to wait at most for the transmission (does not
2034  *                require plugins to discard the message after the timeout,
2035  *                just advisory for the desired delay; most plugins will ignore
2036  *                this as well)
2037  * @param cont continuation to call once the message has
2038  *        been transmitted (or if the transport is ready
2039  *        for the next transmission call; or if the
2040  *        peer disconnected...); can be NULL
2041  * @param cont_cls closure for cont
2042  * @return number of bytes used (on the physical network, with overheads);
2043  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
2044  *         and does NOT mean that the message was not transmitted (DV)
2045  */
2046 static ssize_t
2047 udp_plugin_send (void *cls,
2048                  struct Session *s,
2049                  const char *msgbuf,
2050                  size_t msgbuf_size,
2051                  unsigned int priority,
2052                  struct GNUNET_TIME_Relative to,
2053                  GNUNET_TRANSPORT_TransmitContinuation cont,
2054                  void *cont_cls)
2055 {
2056   struct Plugin *plugin = cls;
2057   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2058   struct UDP_FragmentationContext * frag_ctx;
2059   struct UDP_MessageWrapper * udpw;
2060   struct UDPMessage *udp;
2061   char mbuf[udpmlen];
2062
2063   if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) &&
2064        (plugin->sockv6 == NULL) )
2065     return GNUNET_SYSERR;
2066   if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) &&
2067        (plugin->sockv4 == NULL) )
2068     return GNUNET_SYSERR;
2069   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2070   {
2071     GNUNET_break(0);
2072     return GNUNET_SYSERR;
2073   }
2074   if (GNUNET_YES !=
2075       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2076                                                     &s->target,
2077                                                     s))
2078   {
2079     GNUNET_break(0);
2080     return GNUNET_SYSERR;
2081   }
2082   LOG (GNUNET_ERROR_TYPE_DEBUG,
2083        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2084        udpmlen,
2085        GNUNET_i2s (&s->target),
2086        udp_address_to_string (plugin,
2087                               s->address->address,
2088                               s->address->address_length));
2089
2090   /* Message */
2091   udp = (struct UDPMessage *) mbuf;
2092   udp->header.size = htons (udpmlen);
2093   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2094   udp->reserved = htonl (0);
2095   udp->sender = *plugin->env->my_identity;
2096
2097   /* We do not update the session time out here!
2098    * Otherwise this session will not timeout since we send keep alive before
2099    * session can timeout
2100    *
2101    * For UDP we update session timeout only on receive, this will cover keep
2102    * alives, since remote peer will reply with keep alive response!
2103    */
2104   if (udpmlen <= UDP_MTU)
2105   {
2106     /* unfragmented message */
2107     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2108     udpw->session = s;
2109     udpw->msg_buf = (char *) &udpw[1];
2110     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2111     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2112     udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
2113     udpw->cont = cont;
2114     udpw->cont_cls = cont_cls;
2115     udpw->frag_ctx = NULL;
2116     udpw->msg_type = UMT_MSG_UNFRAGMENTED;
2117     memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2118     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
2119     enqueue (plugin, udpw);
2120
2121     GNUNET_STATISTICS_update (plugin->env->stats,
2122         "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
2123     GNUNET_STATISTICS_update (plugin->env->stats,
2124                               "# UDP, unfragmented msgs, bytes payload, attempt",
2125                               udpw->payload_size,
2126                               GNUNET_NO);
2127   }
2128   else
2129   {
2130     /* fragmented message */
2131     if (s->frag_ctx != NULL)
2132       return GNUNET_SYSERR;
2133     memcpy (&udp[1], msgbuf, msgbuf_size);
2134     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2135     frag_ctx->plugin = plugin;
2136     frag_ctx->session = s;
2137     frag_ctx->cont = cont;
2138     frag_ctx->cont_cls = cont_cls;
2139     frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
2140         to);
2141     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2142     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2143     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2144                                                      UDP_MTU,
2145                                                      &plugin->tracker,
2146                                                      s->last_expected_msg_delay,
2147                                                      s->last_expected_ack_delay,
2148                                                      &udp->header,
2149                                                      &enqueue_fragment,
2150                                                      frag_ctx);
2151     s->frag_ctx = frag_ctx;
2152     GNUNET_STATISTICS_update (plugin->env->stats,
2153                               "# UDP, fragmented msgs, messages, pending",
2154                               1,
2155                               GNUNET_NO);
2156     GNUNET_STATISTICS_update (plugin->env->stats,
2157                               "# UDP, fragmented msgs, messages, attempt",
2158                               1,
2159                               GNUNET_NO);
2160     GNUNET_STATISTICS_update (plugin->env->stats,
2161                               "# UDP, fragmented msgs, bytes payload, attempt",
2162                               frag_ctx->payload_size,
2163                               GNUNET_NO);
2164   }
2165   notify_session_monitor (s->plugin,
2166                           s,
2167                           GNUNET_TRANSPORT_SS_UPDATE);
2168   schedule_select (plugin);
2169   return udpmlen;
2170 }
2171
2172
2173 /**
2174  * Our external IP address/port mapping has changed.
2175  *
2176  * @param cls closure, the `struct LocalAddrList`
2177  * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean
2178  *     the previous (now invalid) one
2179  * @param addr either the previous or the new public IP address
2180  * @param addrlen actual lenght of the address
2181  */
2182 static void
2183 udp_nat_port_map_callback (void *cls,
2184                            int add_remove,
2185                            const struct sockaddr *addr,
2186                            socklen_t addrlen)
2187 {
2188   struct Plugin *plugin = cls;
2189   struct GNUNET_HELLO_Address *address;
2190   struct IPv4UdpAddress u4;
2191   struct IPv6UdpAddress u6;
2192   void *arg;
2193   size_t args;
2194
2195   LOG (GNUNET_ERROR_TYPE_INFO,
2196        "NAT notification to %s address `%s'\n",
2197        (GNUNET_YES == add_remove) ? "add" : "remove",
2198        GNUNET_a2s (addr, addrlen));
2199
2200   /* convert 'address' to our internal format */
2201   switch (addr->sa_family)
2202   {
2203   case AF_INET:
2204     GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
2205     memset (&u4, 0, sizeof(u4));
2206     u4.options = htonl (plugin->myoptions);
2207     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
2208     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
2209     if (0 == ((struct sockaddr_in *) addr)->sin_port)
2210       return;
2211     arg = &u4;
2212     args = sizeof(struct IPv4UdpAddress);
2213     break;
2214   case AF_INET6:
2215     GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
2216     memset (&u6, 0, sizeof(u6));
2217     u6.options = htonl (plugin->myoptions);
2218     if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
2219       return;
2220     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
2221         sizeof(struct in6_addr));
2222     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
2223     arg = &u6;
2224     args = sizeof(struct IPv6UdpAddress);
2225     break;
2226   default:
2227     GNUNET_break(0);
2228     return;
2229   }
2230   /* modify our published address list */
2231   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
2232                                            PLUGIN_NAME,
2233                                            arg, args,
2234                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2235   plugin->env->notify_address (plugin->env->cls, add_remove, address);
2236   GNUNET_HELLO_address_free (address);
2237 }
2238
2239
2240 /**
2241  * Message tokenizer has broken up an incomming message. Pass it on
2242  * to the service.
2243  *
2244  * @param cls the `struct Plugin *`
2245  * @param client the `struct SourceInformation *`
2246  * @param hdr the actual message
2247  * @return #GNUNET_OK (always)
2248  */
2249 static int
2250 process_inbound_tokenized_messages (void *cls,
2251                                     void *client,
2252                                     const struct GNUNET_MessageHeader *hdr)
2253 {
2254   struct Plugin *plugin = cls;
2255   struct SourceInformation *si = client;
2256   struct GNUNET_TIME_Relative delay;
2257
2258   GNUNET_assert (NULL != si->session);
2259   if (GNUNET_YES == si->session->in_destroy)
2260     return GNUNET_OK;
2261   /* setup ATS */
2262   reschedule_session_timeout (si->session);
2263   delay = plugin->env->receive (plugin->env->cls,
2264                                 si->session->address,
2265                                 si->session,
2266                                 hdr);
2267   si->session->flow_delay_for_other_peer = delay;
2268   return GNUNET_OK;
2269 }
2270
2271
2272 /**
2273  * We've received a UDP Message.  Process it (pass contents to main service).
2274  *
2275  * @param plugin plugin context
2276  * @param msg the message
2277  * @param udp_addr sender address
2278  * @param udp_addr_len number of bytes in @a udp_addr
2279  * @param network_type network type the address belongs to
2280  */
2281 static void
2282 process_udp_message (struct Plugin *plugin,
2283                      const struct UDPMessage *msg,
2284                      const union UdpAddress *udp_addr,
2285                      size_t udp_addr_len,
2286                      enum GNUNET_ATS_Network_Type network_type)
2287 {
2288   struct SourceInformation si;
2289   struct Session *s;
2290   struct GNUNET_HELLO_Address *address;
2291
2292   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2293   if (0 != ntohl (msg->reserved))
2294   {
2295     GNUNET_break_op(0);
2296     return;
2297   }
2298   if (ntohs (msg->header.size)
2299       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2300   {
2301     GNUNET_break_op(0);
2302     return;
2303   }
2304
2305   address = GNUNET_HELLO_address_allocate (&msg->sender,
2306                                            PLUGIN_NAME,
2307                                            udp_addr,
2308                                            udp_addr_len,
2309                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2310   if (NULL ==
2311       (s = udp_plugin_lookup_session (plugin, address)))
2312   {
2313     s = udp_plugin_create_session (plugin,
2314                                    address,
2315                                    network_type);
2316     plugin->env->session_start (plugin->env->cls,
2317                                 address,
2318                                 s,
2319                                 s->scope);
2320     notify_session_monitor (s->plugin,
2321                             s,
2322                             GNUNET_TRANSPORT_SS_INIT);
2323     notify_session_monitor (s->plugin,
2324                             s,
2325                             GNUNET_TRANSPORT_SS_UP);
2326   }
2327   GNUNET_free (address);
2328
2329   /* iterate over all embedded messages */
2330   si.session = s;
2331   si.sender = msg->sender;
2332   s->rc++;
2333   GNUNET_SERVER_mst_receive (plugin->mst,
2334                              &si,
2335                              (const char *) &msg[1],
2336                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2337                              GNUNET_YES,
2338                              GNUNET_NO);
2339   s->rc--;
2340   if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
2341     free_session (s);
2342 }
2343
2344
2345 /**
2346  * Process a defragmented message.
2347  *
2348  * @param cls the `struct DefragContext *`
2349  * @param msg the message
2350  */
2351 static void
2352 fragment_msg_proc (void *cls,
2353                    const struct GNUNET_MessageHeader *msg)
2354 {
2355   struct DefragContext *rc = cls;
2356   const struct UDPMessage *um;
2357
2358   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2359   {
2360     GNUNET_break(0);
2361     return;
2362   }
2363   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2364   {
2365     GNUNET_break(0);
2366     return;
2367   }
2368   um = (const struct UDPMessage *) msg;
2369   rc->sender = um->sender;
2370   rc->have_sender = GNUNET_YES;
2371   process_udp_message (rc->plugin,
2372                        um,
2373                        rc->udp_addr,
2374                        rc->udp_addr_len,
2375                        rc->network_type);
2376 }
2377
2378
2379 /**
2380  * Transmit an acknowledgement.
2381  *
2382  * @param cls the `struct DefragContext *`
2383  * @param id message ID (unused)
2384  * @param msg ack to transmit
2385  */
2386 static void
2387 ack_proc (void *cls,
2388           uint32_t id,
2389           const struct GNUNET_MessageHeader *msg)
2390 {
2391   struct DefragContext *rc = cls;
2392   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2393   struct UDP_ACK_Message *udp_ack;
2394   uint32_t delay = 0;
2395   struct UDP_MessageWrapper *udpw;
2396   struct Session *s;
2397   struct GNUNET_HELLO_Address *address;
2398
2399   if (GNUNET_NO == rc->have_sender)
2400   {
2401     /* tried to defragment but never succeeded, hence will not ACK */
2402     GNUNET_break_op (0);
2403     return;
2404   }
2405   address = GNUNET_HELLO_address_allocate (&rc->sender,
2406                                            PLUGIN_NAME,
2407                                            rc->udp_addr,
2408                                            rc->udp_addr_len,
2409                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2410   s = udp_plugin_lookup_session (rc->plugin,
2411                                  address);
2412   GNUNET_HELLO_address_free (address);
2413   if (NULL == s)
2414   {
2415     LOG (GNUNET_ERROR_TYPE_ERROR,
2416          "Trying to transmit ACK to peer `%s' but no session found!\n",
2417          udp_address_to_string (rc->plugin,
2418                                 rc->udp_addr,
2419                                 rc->udp_addr_len));
2420     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2421     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2422     GNUNET_free (rc);
2423     return;
2424   }
2425   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2426     delay = s->flow_delay_for_other_peer.rel_value_us;
2427
2428   LOG (GNUNET_ERROR_TYPE_DEBUG,
2429        "Sending ACK to `%s' including delay of %s\n",
2430        udp_address_to_string (rc->plugin,
2431                               rc->udp_addr,
2432                               rc->udp_addr_len),
2433        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2434                                                GNUNET_YES));
2435   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2436   udpw->msg_size = msize;
2437   udpw->payload_size = 0;
2438   udpw->session = s;
2439   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2440   udpw->msg_buf = (char *) &udpw[1];
2441   udpw->msg_type = UMT_MSG_ACK;
2442   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2443   udp_ack->header.size = htons ((uint16_t) msize);
2444   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2445   udp_ack->delay = htonl (delay);
2446   udp_ack->sender = *rc->plugin->env->my_identity;
2447   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2448   enqueue (rc->plugin, udpw);
2449   notify_session_monitor (s->plugin,
2450                           s,
2451                           GNUNET_TRANSPORT_SS_UPDATE);
2452   schedule_select (rc->plugin);
2453 }
2454
2455
2456 /**
2457  * Handle an ACK message.
2458  *
2459  * @param plugin the UDP plugin
2460  * @param msg the (presumed) UDP ACK message
2461  * @param udp_addr sender address
2462  * @param udp_addr_len number of bytes in @a udp_addr
2463  */
2464 static void
2465 read_process_ack (struct Plugin *plugin,
2466                   const struct GNUNET_MessageHeader *msg,
2467                   const union UdpAddress *udp_addr,
2468                   socklen_t udp_addr_len)
2469 {
2470   const struct GNUNET_MessageHeader *ack;
2471   const struct UDP_ACK_Message *udp_ack;
2472   struct GNUNET_HELLO_Address *address;
2473   struct Session *s;
2474   struct GNUNET_TIME_Relative flow_delay;
2475
2476   if (ntohs (msg->size)
2477       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2478   {
2479     GNUNET_break_op(0);
2480     return;
2481   }
2482   udp_ack = (const struct UDP_ACK_Message *) msg;
2483   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2484                                            PLUGIN_NAME,
2485                                            udp_addr,
2486                                            udp_addr_len,
2487                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2488   s = udp_plugin_lookup_session (plugin,
2489                                  address);
2490   if (NULL == s)
2491   {
2492     LOG (GNUNET_ERROR_TYPE_WARNING,
2493          "UDP session of address %s for ACK not found\n",
2494          udp_address_to_string (plugin,
2495                                 address->address,
2496                                 address->address_length));
2497     GNUNET_HELLO_address_free (address);
2498     return;
2499   }
2500   if (NULL == s->frag_ctx)
2501   {
2502     LOG (GNUNET_ERROR_TYPE_WARNING,
2503          "Fragmentation context of address %s for ACK not found\n",
2504          udp_address_to_string (plugin,
2505                                 address->address,
2506                                 address->address_length));
2507     GNUNET_HELLO_address_free (address);
2508     return;
2509   }
2510   GNUNET_HELLO_address_free (address);
2511
2512   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2513   LOG (GNUNET_ERROR_TYPE_DEBUG,
2514        "We received a sending delay of %s for %s\n",
2515        GNUNET_STRINGS_relative_time_to_string (flow_delay,
2516                                                GNUNET_YES),
2517        GNUNET_i2s (&udp_ack->sender));
2518   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2519
2520   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2521   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2522   {
2523     GNUNET_break_op(0);
2524     return;
2525   }
2526
2527   if (GNUNET_OK !=
2528       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2529                                    ack))
2530   {
2531     LOG(GNUNET_ERROR_TYPE_DEBUG,
2532         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2533         (unsigned int ) ntohs (msg->size),
2534         GNUNET_i2s (&udp_ack->sender),
2535         udp_address_to_string (plugin,
2536                                udp_addr,
2537                                udp_addr_len));
2538     /* Expect more ACKs to arrive */
2539     return;
2540   }
2541
2542   LOG (GNUNET_ERROR_TYPE_DEBUG,
2543        "Message from %s at %s full ACK'ed\n",
2544        GNUNET_i2s (&udp_ack->sender),
2545        udp_address_to_string (plugin,
2546                               udp_addr,
2547                               udp_addr_len));
2548
2549   /* Remove fragmented message after successful sending */
2550   fragmented_message_done (s->frag_ctx,
2551                            GNUNET_OK);
2552 }
2553
2554
2555 /**
2556  * We received a fragment, process it.
2557  *
2558  * @param plugin our plugin
2559  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2560  * @param udp_addr sender address
2561  * @param udp_addr_len number of bytes in @a udp_addr
2562  * @param network_type network type the address belongs to
2563  */
2564 static void
2565 read_process_fragment (struct Plugin *plugin,
2566                        const struct GNUNET_MessageHeader *msg,
2567                        const union UdpAddress *udp_addr,
2568                        size_t udp_addr_len,
2569                        enum GNUNET_ATS_Network_Type network_type)
2570 {
2571   struct DefragContext *d_ctx;
2572   struct GNUNET_TIME_Absolute now;
2573   struct FindReceiveContext frc;
2574
2575   frc.rc = NULL;
2576   frc.udp_addr = udp_addr;
2577   frc.udp_addr_len = udp_addr_len;
2578
2579   /* Lookup existing receive context for this address */
2580   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2581                                  &find_receive_context,
2582                                  &frc);
2583   now = GNUNET_TIME_absolute_get ();
2584   d_ctx = frc.rc;
2585
2586   if (NULL == d_ctx)
2587   {
2588     /* Create a new defragmentation context */
2589     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2590     memcpy (&d_ctx[1],
2591             udp_addr,
2592             udp_addr_len);
2593     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2594     d_ctx->udp_addr_len = udp_addr_len;
2595     d_ctx->network_type = network_type;
2596     d_ctx->plugin = plugin;
2597     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2598                                                       UDP_MTU,
2599                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2600                                                       d_ctx,
2601                                                       &fragment_msg_proc,
2602                                                       &ack_proc);
2603     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2604                                                  d_ctx,
2605         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2606     LOG (GNUNET_ERROR_TYPE_DEBUG,
2607          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2608          (unsigned int ) ntohs (msg->size),
2609          udp_address_to_string (plugin,
2610                                 udp_addr,
2611                                 udp_addr_len));
2612   }
2613   else
2614   {
2615     LOG (GNUNET_ERROR_TYPE_DEBUG,
2616          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2617          (unsigned int ) ntohs (msg->size),
2618          udp_address_to_string (plugin,
2619                                 udp_addr,
2620                                 udp_addr_len));
2621   }
2622
2623   if (GNUNET_OK ==
2624       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2625   {
2626     /* keep this 'rc' from expiring */
2627     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2628                                        d_ctx->hnode,
2629         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2630   }
2631   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2632       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2633   {
2634     /* remove 'rc' that was inactive the longest */
2635     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2636     GNUNET_assert (NULL != d_ctx);
2637     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2638     GNUNET_free (d_ctx);
2639   }
2640 }
2641
2642
2643 /**
2644  * Read and process a message from the given socket.
2645  *
2646  * @param plugin the overall plugin
2647  * @param rsock socket to read from
2648  */
2649 static void
2650 udp_select_read (struct Plugin *plugin,
2651                  struct GNUNET_NETWORK_Handle *rsock)
2652 {
2653   socklen_t fromlen;
2654   struct sockaddr_storage addr;
2655   char buf[65536] GNUNET_ALIGN;
2656   ssize_t size;
2657   const struct GNUNET_MessageHeader *msg;
2658   struct IPv4UdpAddress v4;
2659   struct IPv6UdpAddress v6;
2660   const struct sockaddr *sa;
2661   const struct sockaddr_in *sa4;
2662   const struct sockaddr_in6 *sa6;
2663   const union UdpAddress *int_addr;
2664   size_t int_addr_len;
2665   enum GNUNET_ATS_Network_Type network_type;
2666
2667   fromlen = sizeof(addr);
2668   memset (&addr, 0, sizeof(addr));
2669   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf),
2670                                          (struct sockaddr *) &addr, &fromlen);
2671   sa = (const struct sockaddr *) &addr;
2672 #if MINGW
2673   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2674    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2675    * on this socket has failed.
2676    * Quote from MSDN:
2677    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2678    *   executing a hard or abortive close. The application should close
2679    *   the socket; it is no longer usable. On a UDP-datagram socket this
2680    *   error indicates a previous send operation resulted in an ICMP Port
2681    *   Unreachable message.
2682    */
2683   if ( (-1 == size) && (ECONNRESET == errno) )
2684   return;
2685 #endif
2686   if (-1 == size)
2687   {
2688     LOG (GNUNET_ERROR_TYPE_DEBUG,
2689          "UDP failed to receive data: %s\n",
2690          STRERROR (errno));
2691     /* Connection failure or something. Not a protocol violation. */
2692     return;
2693   }
2694   if (size < sizeof(struct GNUNET_MessageHeader))
2695   {
2696     LOG (GNUNET_ERROR_TYPE_WARNING,
2697          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2698          (unsigned int ) size,
2699          GNUNET_a2s (sa, fromlen));
2700     /* _MAY_ be a connection failure (got partial message) */
2701     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2702     GNUNET_break_op(0);
2703     return;
2704   }
2705   msg = (const struct GNUNET_MessageHeader *) buf;
2706   LOG (GNUNET_ERROR_TYPE_DEBUG,
2707        "UDP received %u-byte message from `%s' type %u\n",
2708        (unsigned int) size,
2709        GNUNET_a2s (sa,
2710                    fromlen),
2711        ntohs (msg->type));
2712   if (size != ntohs (msg->size))
2713   {
2714     LOG (GNUNET_ERROR_TYPE_WARNING,
2715          "UDP malformed message header from %s\n",
2716          (unsigned int) size,
2717          GNUNET_a2s (sa,
2718                      fromlen));
2719     GNUNET_break_op (0);
2720     return;
2721   }
2722   GNUNET_STATISTICS_update (plugin->env->stats,
2723                             "# UDP, total, bytes, received",
2724                             size,
2725                             GNUNET_NO);
2726   network_type = plugin->env->get_address_type (plugin->env->cls,
2727                                                 sa,
2728                                                 fromlen);
2729   switch (sa->sa_family)
2730   {
2731   case AF_INET:
2732     sa4 = (const struct sockaddr_in *) &addr;
2733     v4.options = 0;
2734     v4.ipv4_addr = sa4->sin_addr.s_addr;
2735     v4.u4_port = sa4->sin_port;
2736     int_addr = (union UdpAddress *) &v4;
2737     int_addr_len = sizeof (v4);
2738     break;
2739   case AF_INET6:
2740     sa6 = (const struct sockaddr_in6 *) &addr;
2741     v6.options = 0;
2742     v6.ipv6_addr = sa6->sin6_addr;
2743     v6.u6_port = sa6->sin6_port;
2744     int_addr = (union UdpAddress *) &v6;
2745     int_addr_len = sizeof (v6);
2746     break;
2747   default:
2748     GNUNET_break (0);
2749     return;
2750   }
2751
2752   switch (ntohs (msg->type))
2753   {
2754   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2755     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2756       udp_broadcast_receive (plugin,
2757                              buf,
2758                              size,
2759                              int_addr,
2760                              int_addr_len,
2761                              network_type);
2762     return;
2763   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2764     if (ntohs (msg->size) < sizeof(struct UDPMessage))
2765     {
2766       GNUNET_break_op(0);
2767       return;
2768     }
2769     process_udp_message (plugin,
2770                          (const struct UDPMessage *) msg,
2771                          int_addr,
2772                          int_addr_len,
2773                          network_type);
2774     return;
2775   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2776     read_process_ack (plugin,
2777                       msg,
2778                       int_addr,
2779                       int_addr_len);
2780     return;
2781   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2782     read_process_fragment (plugin,
2783                            msg,
2784                            int_addr,
2785                            int_addr_len,
2786                            network_type);
2787     return;
2788   default:
2789     GNUNET_break_op(0);
2790     return;
2791   }
2792 }
2793
2794
2795 /**
2796  * Removes messages from the transmission queue that have
2797  * timed out, and then selects a message that should be
2798  * transmitted next.
2799  *
2800  * @param plugin the UDP plugin
2801  * @param sock which socket should we process the queue for (v4 or v6)
2802  * @return message selected for transmission, or NULL for none
2803  */
2804 static struct UDP_MessageWrapper *
2805 remove_timeout_messages_and_select (struct Plugin *plugin,
2806                                     struct GNUNET_NETWORK_Handle *sock)
2807 {
2808   struct UDP_MessageWrapper *udpw = NULL;
2809   struct GNUNET_TIME_Relative remaining;
2810   struct Session *session;
2811   int removed;
2812
2813   removed = GNUNET_NO;
2814   udpw = (sock == plugin->sockv4)
2815     ? plugin->ipv4_queue_head
2816     : plugin->ipv6_queue_head;
2817   while (NULL != udpw)
2818   {
2819     session = udpw->session;
2820     /* Find messages with timeout */
2821     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2822     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2823     {
2824       /* Message timed out */
2825       switch (udpw->msg_type)
2826       {
2827       case UMT_MSG_UNFRAGMENTED:
2828         GNUNET_STATISTICS_update (plugin->env->stats,
2829                                   "# UDP, total, bytes, sent, timeout",
2830                                   udpw->msg_size,
2831                                   GNUNET_NO);
2832         GNUNET_STATISTICS_update (plugin->env->stats,
2833                                   "# UDP, total, messages, sent, timeout",
2834                                   1,
2835                                   GNUNET_NO);
2836         GNUNET_STATISTICS_update (plugin->env->stats,
2837                                   "# UDP, unfragmented msgs, messages, sent, timeout",
2838                                   1,
2839                                   GNUNET_NO);
2840         GNUNET_STATISTICS_update (plugin->env->stats,
2841                                   "# UDP, unfragmented msgs, bytes, sent, timeout",
2842                                   udpw->payload_size,
2843                                   GNUNET_NO);
2844         /* Not fragmented message */
2845         LOG (GNUNET_ERROR_TYPE_DEBUG,
2846              "Message for peer `%s' with size %u timed out\n",
2847              GNUNET_i2s (&udpw->session->target),
2848              udpw->payload_size);
2849         call_continuation (udpw, GNUNET_SYSERR);
2850         /* Remove message */
2851         removed = GNUNET_YES;
2852         dequeue (plugin, udpw);
2853         GNUNET_free(udpw);
2854         break;
2855       case UMT_MSG_FRAGMENTED:
2856         /* Fragmented message */
2857         GNUNET_STATISTICS_update (plugin->env->stats,
2858                                   "# UDP, total, bytes, sent, timeout",
2859                                   udpw->frag_ctx->on_wire_size,
2860                                   GNUNET_NO);
2861         GNUNET_STATISTICS_update (plugin->env->stats,
2862                                   "# UDP, total, messages, sent, timeout",
2863                                   1,
2864                                   GNUNET_NO);
2865         call_continuation (udpw,
2866                            GNUNET_SYSERR);
2867         LOG (GNUNET_ERROR_TYPE_DEBUG,
2868              "Fragment for message for peer `%s' with size %u timed out\n",
2869              GNUNET_i2s (&udpw->session->target),
2870             udpw->frag_ctx->payload_size);
2871
2872         GNUNET_STATISTICS_update (plugin->env->stats,
2873                                   "# UDP, fragmented msgs, messages, sent, timeout",
2874                                   1,
2875                                   GNUNET_NO);
2876         GNUNET_STATISTICS_update (plugin->env->stats,
2877                                   "# UDP, fragmented msgs, bytes, sent, timeout",
2878                                   udpw->frag_ctx->payload_size,
2879                                   GNUNET_NO);
2880         /* Remove fragmented message due to timeout */
2881         fragmented_message_done (udpw->frag_ctx,
2882                                  GNUNET_SYSERR);
2883         break;
2884       case UMT_MSG_ACK:
2885         GNUNET_STATISTICS_update (plugin->env->stats,
2886                                   "# UDP, total, bytes, sent, timeout",
2887                                   udpw->msg_size,
2888                                   GNUNET_NO);
2889         GNUNET_STATISTICS_update (plugin->env->stats,
2890                                   "# UDP, total, messages, sent, timeout",
2891                                   1,
2892                                   GNUNET_NO);
2893         LOG (GNUNET_ERROR_TYPE_DEBUG,
2894              "ACK Message for peer `%s' with size %u timed out\n",
2895              GNUNET_i2s (&udpw->session->target),
2896              udpw->payload_size);
2897         call_continuation (udpw,
2898                            GNUNET_SYSERR);
2899         removed = GNUNET_YES;
2900         dequeue (plugin,
2901                  udpw);
2902         GNUNET_free (udpw);
2903         break;
2904       default:
2905         break;
2906       }
2907       if (sock == plugin->sockv4)
2908         udpw = plugin->ipv4_queue_head;
2909       else if (sock == plugin->sockv6)
2910         udpw = plugin->ipv6_queue_head;
2911       else
2912       {
2913         GNUNET_break(0); /* should never happen */
2914         udpw = NULL;
2915       }
2916       GNUNET_STATISTICS_update (plugin->env->stats,
2917                                 "# messages discarded due to timeout",
2918                                 1,
2919                                 GNUNET_NO);
2920     }
2921     else
2922     {
2923       /* Message did not time out, check flow delay */
2924       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2925       if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2926       {
2927         /* this message is not delayed */
2928         LOG (GNUNET_ERROR_TYPE_DEBUG,
2929              "Message for peer `%s' (%u bytes) is not delayed \n",
2930              GNUNET_i2s (&udpw->session->target),
2931              udpw->payload_size);
2932         break; /* Found message to send, break */
2933       }
2934       else
2935       {
2936         /* Message is delayed, try next */
2937         LOG (GNUNET_ERROR_TYPE_DEBUG,
2938              "Message for peer `%s' (%u bytes) is delayed for %s\n",
2939              GNUNET_i2s (&udpw->session->target),
2940              udpw->payload_size,
2941              GNUNET_STRINGS_relative_time_to_string (remaining,
2942                                                      GNUNET_YES));
2943         udpw = udpw->next;
2944       }
2945     }
2946   }
2947   if (GNUNET_YES == removed)
2948     notify_session_monitor (session->plugin,
2949                             session,
2950                             GNUNET_TRANSPORT_SS_UPDATE);
2951   return udpw;
2952 }
2953
2954
2955 /**
2956  * FIXME.
2957  */
2958 static void
2959 analyze_send_error (struct Plugin *plugin,
2960                     const struct sockaddr *sa,
2961                     socklen_t slen, int error)
2962 {
2963   enum GNUNET_ATS_Network_Type type;
2964
2965   type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
2966   if (((GNUNET_ATS_NET_LAN == type)
2967        || (GNUNET_ATS_NET_WAN == type))
2968       && ((ENETUNREACH == errno)|| (ENETDOWN == errno)))
2969   {
2970     if (slen == sizeof (struct sockaddr_in))
2971     {
2972       /* IPv4: "Network unreachable" or "Network down"
2973        *
2974        * This indicates we do not have connectivity
2975        */
2976       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2977            _("UDP could not transmit message to `%s': "
2978              "Network seems down, please check your network configuration\n"),
2979            GNUNET_a2s (sa, slen));
2980     }
2981     if (slen == sizeof (struct sockaddr_in6))
2982     {
2983       /* IPv6: "Network unreachable" or "Network down"
2984        *
2985        * This indicates that this system is IPv6 enabled, but does not
2986        * have a valid global IPv6 address assigned or we do not have
2987        * connectivity
2988        */
2989       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2990            _("UDP could not transmit IPv6 message! "
2991              "Please check your network configuration and disable IPv6 if your "
2992              "connection does not have a global IPv6 address\n"));
2993     }
2994   }
2995   else
2996   {
2997     LOG (GNUNET_ERROR_TYPE_WARNING,
2998          "UDP could not transmit message to `%s': `%s'\n",
2999          GNUNET_a2s (sa, slen), STRERROR (error));
3000   }
3001 }
3002
3003
3004 /**
3005  * It is time to try to transmit a UDP message.  Select one
3006  * and send.
3007  *
3008  * @param plugin the plugin
3009  * @param sock which socket (v4/v6) to send on
3010  * @return number of bytes transmitted, #GNUNET_SYSERR on failure
3011  */
3012 static size_t
3013 udp_select_send (struct Plugin *plugin,
3014                  struct GNUNET_NETWORK_Handle *sock)
3015 {
3016   ssize_t sent;
3017   socklen_t slen;
3018   struct sockaddr *a;
3019   const struct IPv4UdpAddress *u4;
3020   struct sockaddr_in a4;
3021   const struct IPv6UdpAddress *u6;
3022   struct sockaddr_in6 a6;
3023   struct UDP_MessageWrapper *udpw;
3024
3025   /* Find message to send */
3026   udpw = remove_timeout_messages_and_select (plugin,
3027                                              sock);
3028   if (NULL == udpw)
3029     return 0; /* No message to send */
3030
3031   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3032   {
3033     u4 = udpw->session->address->address;
3034     memset (&a4, 0, sizeof(a4));
3035     a4.sin_family = AF_INET;
3036 #if HAVE_SOCKADDR_IN_SIN_LEN
3037     a4.sin_len = sizeof (a4);
3038 #endif
3039     a4.sin_port = u4->u4_port;
3040     memcpy (&a4.sin_addr,
3041             &u4->ipv4_addr,
3042             sizeof(struct in_addr));
3043     a = (struct sockaddr *) &a4;
3044     slen = sizeof (a4);
3045   }
3046   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3047   {
3048     u6 = udpw->session->address->address;
3049     memset (&a6, 0, sizeof(a6));
3050     a6.sin6_family = AF_INET6;
3051 #if HAVE_SOCKADDR_IN_SIN_LEN
3052     a6.sin6_len = sizeof (a6);
3053 #endif
3054     a6.sin6_port = u6->u6_port;
3055     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
3056     a = (struct sockaddr *) &a6;
3057     slen = sizeof (a6);
3058   }
3059   else
3060   {
3061     call_continuation (udpw,
3062                        GNUNET_OK);
3063     dequeue (plugin,
3064              udpw);
3065     notify_session_monitor (plugin,
3066                             udpw->session,
3067                             GNUNET_TRANSPORT_SS_UPDATE);
3068     GNUNET_free (udpw);
3069     return GNUNET_SYSERR;
3070   }
3071   sent = GNUNET_NETWORK_socket_sendto (sock,
3072                                        udpw->msg_buf,
3073                                        udpw->msg_size,
3074                                        a,
3075                                        slen);
3076   if (GNUNET_SYSERR == sent)
3077   {
3078     /* Failure */
3079     analyze_send_error (plugin,
3080                         a,
3081                         slen,
3082                         errno);
3083     call_continuation (udpw,
3084                        GNUNET_SYSERR);
3085     GNUNET_STATISTICS_update (plugin->env->stats,
3086                               "# UDP, total, bytes, sent, failure",
3087                               sent,
3088                               GNUNET_NO);
3089     GNUNET_STATISTICS_update (plugin->env->stats,
3090                               "# UDP, total, messages, sent, failure",
3091                               1,
3092                               GNUNET_NO);
3093   }
3094   else
3095   {
3096     /* Success */
3097     LOG (GNUNET_ERROR_TYPE_DEBUG,
3098          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3099          (unsigned int) (udpw->msg_size),
3100          GNUNET_i2s (&udpw->session->target),
3101          GNUNET_a2s (a, slen),
3102          (int ) sent,
3103          (sent < 0) ? STRERROR (errno) : "ok");
3104     GNUNET_STATISTICS_update (plugin->env->stats,
3105                               "# UDP, total, bytes, sent, success",
3106                               sent,
3107                               GNUNET_NO);
3108     GNUNET_STATISTICS_update (plugin->env->stats,
3109                               "# UDP, total, messages, sent, success",
3110                               1,
3111                               GNUNET_NO);
3112     if (NULL != udpw->frag_ctx)
3113       udpw->frag_ctx->on_wire_size += udpw->msg_size;
3114     call_continuation (udpw, GNUNET_OK);
3115   }
3116   dequeue (plugin, udpw);
3117   notify_session_monitor (plugin,
3118                           udpw->session,
3119                           GNUNET_TRANSPORT_SS_UPDATE);
3120   GNUNET_free (udpw);
3121   return sent;
3122 }
3123
3124
3125 /**
3126  * We have been notified that our readset has something to read.  We don't
3127  * know which socket needs to be read, so we have to check each one
3128  * Then reschedule this function to be called again once more is available.
3129  *
3130  * @param cls the plugin handle
3131  * @param tc the scheduling context (for rescheduling this function again)
3132  */
3133 static void
3134 udp_plugin_select (void *cls,
3135                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3136 {
3137   struct Plugin *plugin = cls;
3138
3139   plugin->select_task = NULL;
3140   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3141     return;
3142   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
3143       && (NULL != plugin->sockv4)
3144       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
3145     udp_select_read (plugin, plugin->sockv4);
3146   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3147       && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
3148       && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
3149     udp_select_send (plugin, plugin->sockv4);
3150   schedule_select (plugin);
3151 }
3152
3153
3154 /**
3155  * We have been notified that our readset has something to read.  We don't
3156  * know which socket needs to be read, so we have to check each one
3157  * Then reschedule this function to be called again once more is available.
3158  *
3159  * @param cls the plugin handle
3160  * @param tc the scheduling context (for rescheduling this function again)
3161  */
3162 static void
3163 udp_plugin_select_v6 (void *cls,
3164                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3165 {
3166   struct Plugin *plugin = cls;
3167
3168   plugin->select_task_v6 = NULL;
3169   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3170     return;
3171   if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
3172       && (NULL != plugin->sockv6)
3173       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
3174     udp_select_read (plugin, plugin->sockv6);
3175   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3176       && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
3177       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
3178   schedule_select (plugin);
3179 }
3180
3181
3182 /**
3183  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3184  *
3185  * @param plugin the plugin to initialize
3186  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3187  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3188  * @return number of sockets that were successfully bound
3189  */
3190 static int
3191 setup_sockets (struct Plugin *plugin,
3192                const struct sockaddr_in6 *bind_v6,
3193                const struct sockaddr_in *bind_v4)
3194 {
3195   int tries;
3196   int sockets_created = 0;
3197   struct sockaddr_in6 server_addrv6;
3198   struct sockaddr_in server_addrv4;
3199   struct sockaddr *server_addr;
3200   struct sockaddr *addrs[2];
3201   socklen_t addrlens[2];
3202   socklen_t addrlen;
3203   int eno;
3204
3205   /* Create IPv6 socket */
3206   eno = EINVAL;
3207   if (GNUNET_YES == plugin->enable_ipv6)
3208   {
3209     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
3210     if (NULL == plugin->sockv6)
3211     {
3212       LOG(GNUNET_ERROR_TYPE_WARNING,
3213           "Disabling IPv6 since it is not supported on this system!\n");
3214       plugin->enable_ipv6 = GNUNET_NO;
3215     }
3216     else
3217     {
3218       memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6));
3219 #if HAVE_SOCKADDR_IN_SIN_LEN
3220       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3221 #endif
3222       server_addrv6.sin6_family = AF_INET6;
3223       if (NULL != bind_v6)
3224         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3225       else
3226         server_addrv6.sin6_addr = in6addr_any;
3227
3228       if (0 == plugin->port) /* autodetect */
3229         server_addrv6.sin6_port
3230           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3231                    + 32000);
3232       else
3233         server_addrv6.sin6_port = htons (plugin->port);
3234       addrlen = sizeof(struct sockaddr_in6);
3235       server_addr = (struct sockaddr *) &server_addrv6;
3236
3237       tries = 0;
3238       while (tries < 10)
3239       {
3240         LOG(GNUNET_ERROR_TYPE_DEBUG,
3241             "Binding to IPv6 `%s'\n",
3242             GNUNET_a2s (server_addr, addrlen));
3243         /* binding */
3244         if (GNUNET_OK ==
3245             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3246                                         server_addr,
3247                                         addrlen))
3248           break;
3249         eno = errno;
3250         if (0 != plugin->port)
3251         {
3252           tries = 10; /* fail immediately */
3253           break; /* bind failed on specific port */
3254         }
3255         /* autodetect */
3256         server_addrv6.sin6_port
3257           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3258                    + 32000);
3259         tries++;
3260       }
3261       if (tries >= 10)
3262       {
3263         GNUNET_NETWORK_socket_close (plugin->sockv6);
3264         plugin->enable_ipv6 = GNUNET_NO;
3265         plugin->sockv6 = NULL;
3266       }
3267       else
3268       {
3269         plugin->port = ntohs (server_addrv6.sin6_port);
3270       }
3271       if (NULL != plugin->sockv6)
3272       {
3273         LOG (GNUNET_ERROR_TYPE_DEBUG,
3274              "IPv6 socket created on port %s\n",
3275              GNUNET_a2s (server_addr, addrlen));
3276         addrs[sockets_created] = (struct sockaddr *) &server_addrv6;
3277         addrlens[sockets_created] = sizeof(struct sockaddr_in6);
3278         sockets_created++;
3279       }
3280       else
3281       {
3282         LOG (GNUNET_ERROR_TYPE_ERROR,
3283              "Failed to bind UDP socket to %s: %s\n",
3284              GNUNET_a2s (server_addr, addrlen),
3285              STRERROR (eno));
3286       }
3287     }
3288   }
3289
3290   /* Create IPv4 socket */
3291   eno = EINVAL;
3292   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
3293   if (NULL == plugin->sockv4)
3294   {
3295     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3296                          "socket");
3297     LOG (GNUNET_ERROR_TYPE_WARNING,
3298          "Disabling IPv4 since it is not supported on this system!\n");
3299     plugin->enable_ipv4 = GNUNET_NO;
3300   }
3301   else
3302   {
3303     memset (&server_addrv4, '\0', sizeof(struct sockaddr_in));
3304 #if HAVE_SOCKADDR_IN_SIN_LEN
3305     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3306 #endif
3307     server_addrv4.sin_family = AF_INET;
3308     if (NULL != bind_v4)
3309       server_addrv4.sin_addr = bind_v4->sin_addr;
3310     else
3311       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3312
3313     if (0 == plugin->port)
3314       /* autodetect */
3315       server_addrv4.sin_port
3316         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3317                                            33537)
3318                  + 32000);
3319     else
3320       server_addrv4.sin_port = htons (plugin->port);
3321
3322     addrlen = sizeof(struct sockaddr_in);
3323     server_addr = (struct sockaddr *) &server_addrv4;
3324
3325     tries = 0;
3326     while (tries < 10)
3327     {
3328       LOG (GNUNET_ERROR_TYPE_DEBUG,
3329            "Binding to IPv4 `%s'\n",
3330            GNUNET_a2s (server_addr, addrlen));
3331
3332       /* binding */
3333       if (GNUNET_OK ==
3334           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3335                                       server_addr,
3336                                       addrlen))
3337         break;
3338       eno = errno;
3339       if (0 != plugin->port)
3340       {
3341         tries = 10; /* fail */
3342         break; /* bind failed on specific port */
3343       }
3344
3345       /* autodetect */
3346       server_addrv4.sin_port
3347         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3348                  + 32000);
3349       tries++;
3350     }
3351     if (tries >= 10)
3352     {
3353       GNUNET_NETWORK_socket_close (plugin->sockv4);
3354       plugin->enable_ipv4 = GNUNET_NO;
3355       plugin->sockv4 = NULL;
3356     }
3357     else
3358     {
3359       plugin->port = ntohs (server_addrv4.sin_port);
3360     }
3361
3362     if (NULL != plugin->sockv4)
3363     {
3364       LOG (GNUNET_ERROR_TYPE_DEBUG,
3365            "IPv4 socket created on port %s\n",
3366            GNUNET_a2s (server_addr, addrlen));
3367       addrs[sockets_created] = (struct sockaddr *) &server_addrv4;
3368       addrlens[sockets_created] = sizeof(struct sockaddr_in);
3369       sockets_created++;
3370     }
3371     else
3372     {
3373       LOG (GNUNET_ERROR_TYPE_ERROR,
3374            _("Failed to bind UDP socket to %s: %s\n"),
3375            GNUNET_a2s (server_addr, addrlen),
3376            STRERROR (eno));
3377     }
3378   }
3379
3380   if (0 == sockets_created)
3381   {
3382     LOG (GNUNET_ERROR_TYPE_WARNING,
3383          _("Failed to open UDP sockets\n"));
3384     return 0; /* No sockets created, return */
3385   }
3386
3387   /* Create file descriptors */
3388   if (plugin->enable_ipv4 == GNUNET_YES)
3389   {
3390     plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
3391     plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
3392     GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
3393     GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
3394     if (NULL != plugin->sockv4)
3395     {
3396       GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
3397       GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
3398     }
3399   }
3400
3401   if (plugin->enable_ipv6 == GNUNET_YES)
3402   {
3403     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
3404     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
3405     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
3406     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
3407     if (NULL != plugin->sockv6)
3408     {
3409       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
3410       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
3411     }
3412   }
3413
3414   schedule_select (plugin);
3415   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3416                                      GNUNET_NO,
3417                                      plugin->port,
3418                                      sockets_created,
3419                                      (const struct sockaddr **) addrs,
3420                                      addrlens,
3421                                      &udp_nat_port_map_callback,
3422                                      NULL,
3423                                      plugin);
3424
3425   return sockets_created;
3426 }
3427
3428
3429 /**
3430  * Return information about the given session to the
3431  * monitor callback.
3432  *
3433  * @param cls the `struct Plugin` with the monitor callback (`sic`)
3434  * @param peer peer we send information about
3435  * @param value our `struct Session` to send information about
3436  * @return #GNUNET_OK (continue to iterate)
3437  */
3438 static int
3439 send_session_info_iter (void *cls,
3440                         const struct GNUNET_PeerIdentity *peer,
3441                         void *value)
3442 {
3443   struct Plugin *plugin = cls;
3444   struct Session *session = value;
3445
3446   notify_session_monitor (plugin,
3447                           session,
3448                           GNUNET_TRANSPORT_SS_INIT);
3449   notify_session_monitor (plugin,
3450                           session,
3451                           GNUNET_TRANSPORT_SS_UP);
3452   return GNUNET_OK;
3453 }
3454
3455
3456 /**
3457  * Begin monitoring sessions of a plugin.  There can only
3458  * be one active monitor per plugin (i.e. if there are
3459  * multiple monitors, the transport service needs to
3460  * multiplex the generated events over all of them).
3461  *
3462  * @param cls closure of the plugin
3463  * @param sic callback to invoke, NULL to disable monitor;
3464  *            plugin will being by iterating over all active
3465  *            sessions immediately and then enter monitor mode
3466  * @param sic_cls closure for @a sic
3467  */
3468 static void
3469 udp_plugin_setup_monitor (void *cls,
3470                           GNUNET_TRANSPORT_SessionInfoCallback sic,
3471                           void *sic_cls)
3472 {
3473   struct Plugin *plugin = cls;
3474
3475   plugin->sic = sic;
3476   plugin->sic_cls = sic_cls;
3477   if (NULL != sic)
3478   {
3479     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3480                                            &send_session_info_iter,
3481                                            plugin);
3482     /* signal end of first iteration */
3483     sic (sic_cls, NULL, NULL);
3484   }
3485 }
3486
3487
3488 /**
3489  * The exported method. Makes the core api available via a global and
3490  * returns the udp transport API.
3491  *
3492  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3493  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3494  */
3495 void *
3496 libgnunet_plugin_transport_udp_init (void *cls)
3497 {
3498   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3499   struct GNUNET_TRANSPORT_PluginFunctions *api;
3500   struct Plugin *p;
3501   unsigned long long port;
3502   unsigned long long aport;
3503   unsigned long long udp_max_bps;
3504   unsigned long long enable_v6;
3505   unsigned long long enable_broadcasting;
3506   unsigned long long enable_broadcasting_recv;
3507   char *bind4_address;
3508   char *bind6_address;
3509   char *fancy_interval;
3510   struct GNUNET_TIME_Relative interval;
3511   struct sockaddr_in server_addrv4;
3512   struct sockaddr_in6 server_addrv6;
3513   int res;
3514   int have_bind4;
3515   int have_bind6;
3516
3517   if (NULL == env->receive)
3518   {
3519     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3520      initialze the plugin or the API */
3521     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3522     api->cls = NULL;
3523     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3524     api->address_to_string = &udp_address_to_string;
3525     api->string_to_address = &udp_string_to_address;
3526     return api;
3527   }
3528
3529   /* Get port number: port == 0 : autodetect a port,
3530    * > 0 : use this port, not given : 2086 default */
3531   if (GNUNET_OK !=
3532       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3533                                              "transport-udp",
3534                                              "PORT", &port))
3535     port = 2086;
3536   if (GNUNET_OK !=
3537       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3538                                              "transport-udp",
3539                                              "ADVERTISED_PORT", &aport))
3540     aport = port;
3541   if (port > 65535)
3542   {
3543     LOG (GNUNET_ERROR_TYPE_WARNING,
3544          _("Given `%s' option is out of range: %llu > %u\n"),
3545          "PORT", port,
3546          65535);
3547     return NULL;
3548   }
3549
3550   /* Protocols */
3551   if (GNUNET_YES ==
3552       GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
3553     enable_v6 = GNUNET_NO;
3554   else
3555     enable_v6 = GNUNET_YES;
3556
3557   /* Addresses */
3558   have_bind4 = GNUNET_NO;
3559   memset (&server_addrv4, 0, sizeof(server_addrv4));
3560   if (GNUNET_YES ==
3561       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3562                                              "BINDTO", &bind4_address))
3563   {
3564     LOG (GNUNET_ERROR_TYPE_DEBUG,
3565          "Binding udp plugin to specific address: `%s'\n",
3566          bind4_address);
3567     if (1 != inet_pton (AF_INET,
3568                         bind4_address,
3569                         &server_addrv4.sin_addr))
3570     {
3571       GNUNET_free (bind4_address);
3572       return NULL;
3573     }
3574     have_bind4 = GNUNET_YES;
3575   }
3576   GNUNET_free_non_null(bind4_address);
3577   have_bind6 = GNUNET_NO;
3578   memset (&server_addrv6, 0, sizeof(server_addrv6));
3579   if (GNUNET_YES ==
3580       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3581                                              "BINDTO6", &bind6_address))
3582   {
3583     LOG (GNUNET_ERROR_TYPE_DEBUG,
3584          "Binding udp plugin to specific address: `%s'\n",
3585          bind6_address);
3586     if (1 != inet_pton (AF_INET6,
3587                         bind6_address,
3588                         &server_addrv6.sin6_addr))
3589     {
3590       LOG (GNUNET_ERROR_TYPE_ERROR,
3591            _("Invalid IPv6 address: `%s'\n"),
3592            bind6_address);
3593       GNUNET_free (bind6_address);
3594       return NULL;
3595     }
3596     have_bind6 = GNUNET_YES;
3597   }
3598   GNUNET_free_non_null (bind6_address);
3599
3600   /* Enable neighbour discovery */
3601   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3602       "transport-udp", "BROADCAST");
3603   if (enable_broadcasting == GNUNET_SYSERR)
3604     enable_broadcasting = GNUNET_NO;
3605
3606   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3607       "transport-udp", "BROADCAST_RECEIVE");
3608   if (enable_broadcasting_recv == GNUNET_SYSERR)
3609     enable_broadcasting_recv = GNUNET_YES;
3610
3611   if (GNUNET_SYSERR ==
3612       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3613                                              "BROADCAST_INTERVAL",
3614                                              &fancy_interval))
3615   {
3616     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3617   }
3618   else
3619   {
3620     if (GNUNET_SYSERR ==
3621         GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval))
3622     {
3623       interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
3624     }
3625     GNUNET_free(fancy_interval);
3626   }
3627
3628   /* Maximum datarate */
3629   if (GNUNET_OK !=
3630       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3631                                              "MAX_BPS", &udp_max_bps))
3632   {
3633     udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
3634   }
3635
3636   p = GNUNET_new (struct Plugin);
3637   p->port = port;
3638   p->aport = aport;
3639   p->broadcast_interval = interval;
3640   p->enable_ipv6 = enable_v6;
3641   p->enable_ipv4 = GNUNET_YES; /* default */
3642   p->enable_broadcasting = enable_broadcasting;
3643   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3644   p->env = env;
3645   p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
3646   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (
3647       GNUNET_CONTAINER_HEAP_ORDER_MIN);
3648   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3649                                      p);
3650   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3651                                  NULL,
3652                                  NULL,
3653                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3654                                  30);
3655   LOG(GNUNET_ERROR_TYPE_DEBUG,
3656       "Setting up sockets\n");
3657   res = setup_sockets (p,
3658                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3659                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3660   if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL)))
3661   {
3662     LOG (GNUNET_ERROR_TYPE_ERROR,
3663         _("Failed to create network sockets, plugin failed\n"));
3664     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3665     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3666     GNUNET_SERVER_mst_destroy (p->mst);
3667     GNUNET_free (p);
3668     return NULL;
3669   }
3670
3671   /* Setup broadcasting and receiving beacons */
3672   setup_broadcast (p, &server_addrv6, &server_addrv4);
3673
3674   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3675   api->cls = p;
3676   api->send = NULL;
3677   api->disconnect_session = &udp_disconnect_session;
3678   api->query_keepalive_factor = &udp_query_keepalive_factor;
3679   api->disconnect_peer = &udp_disconnect;
3680   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3681   api->address_to_string = &udp_address_to_string;
3682   api->string_to_address = &udp_string_to_address;
3683   api->check_address = &udp_plugin_check_address;
3684   api->get_session = &udp_plugin_get_session;
3685   api->send = &udp_plugin_send;
3686   api->get_network = &udp_get_network;
3687   api->update_session_timeout = &udp_plugin_update_session_timeout;
3688   api->setup_monitor = &udp_plugin_setup_monitor;
3689   return api;
3690 }
3691
3692
3693 /**
3694  * Function called on each entry in the defragmentation heap to
3695  * clean it up.
3696  *
3697  * @param cls NULL
3698  * @param node node in the heap (to be removed)
3699  * @param element a `struct DefragContext` to be cleaned up
3700  * @param cost unused
3701  * @return #GNUNET_YES
3702  */
3703 static int
3704 heap_cleanup_iterator (void *cls,
3705                        struct GNUNET_CONTAINER_HeapNode *node,
3706                        void *element,
3707                        GNUNET_CONTAINER_HeapCostType cost)
3708 {
3709   struct DefragContext *d_ctx = element;
3710
3711   GNUNET_CONTAINER_heap_remove_node (node);
3712   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3713   GNUNET_free (d_ctx);
3714   return GNUNET_YES;
3715 }
3716
3717
3718 /**
3719  * The exported method. Makes the core api available via a global and
3720  * returns the udp transport API.
3721  *
3722  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3723  * @return NULL
3724  */
3725 void *
3726 libgnunet_plugin_transport_udp_done (void *cls)
3727 {
3728   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3729   struct Plugin *plugin = api->cls;
3730   struct PrettyPrinterContext *cur;
3731   struct PrettyPrinterContext *next;
3732   struct UDP_MessageWrapper *udpw;
3733
3734   if (NULL == plugin)
3735   {
3736     GNUNET_free(api);
3737     return NULL;
3738   }
3739   stop_broadcast (plugin);
3740   if (plugin->select_task != NULL)
3741   {
3742     GNUNET_SCHEDULER_cancel (plugin->select_task);
3743     plugin->select_task = NULL;
3744   }
3745   if (plugin->select_task_v6 != NULL)
3746   {
3747     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3748     plugin->select_task_v6 = NULL;
3749   }
3750
3751   /* Closing sockets */
3752   if (GNUNET_YES == plugin->enable_ipv4)
3753   {
3754     if (NULL != plugin->sockv4)
3755     {
3756       GNUNET_break (GNUNET_OK ==
3757                     GNUNET_NETWORK_socket_close (plugin->sockv4));
3758       plugin->sockv4 = NULL;
3759     }
3760     GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3761     GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3762   }
3763   if (GNUNET_YES == plugin->enable_ipv6)
3764   {
3765     if (NULL != plugin->sockv6)
3766     {
3767       GNUNET_break (GNUNET_OK ==
3768                     GNUNET_NETWORK_socket_close (plugin->sockv6));
3769       plugin->sockv6 = NULL;
3770
3771       GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3772       GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3773     }
3774   }
3775   if (NULL != plugin->nat)
3776   {
3777     GNUNET_NAT_unregister (plugin->nat);
3778     plugin->nat = NULL;
3779   }
3780   if (NULL != plugin->defrag_ctxs)
3781   {
3782     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3783                                    &heap_cleanup_iterator, NULL);
3784     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3785     plugin->defrag_ctxs = NULL;
3786   }
3787   if (NULL != plugin->mst)
3788   {
3789     GNUNET_SERVER_mst_destroy (plugin->mst);
3790     plugin->mst = NULL;
3791   }
3792
3793   /* Clean up leftover messages */
3794   udpw = plugin->ipv4_queue_head;
3795   while (NULL != udpw)
3796   {
3797     struct UDP_MessageWrapper *tmp = udpw->next;
3798     dequeue (plugin, udpw);
3799     call_continuation (udpw, GNUNET_SYSERR);
3800     GNUNET_free(udpw);
3801     udpw = tmp;
3802   }
3803   udpw = plugin->ipv6_queue_head;
3804   while (NULL != udpw)
3805   {
3806     struct UDP_MessageWrapper *tmp = udpw->next;
3807     dequeue (plugin, udpw);
3808     call_continuation (udpw, GNUNET_SYSERR);
3809     GNUNET_free(udpw);
3810     udpw = tmp;
3811   }
3812
3813   /* Clean up sessions */
3814   LOG (GNUNET_ERROR_TYPE_DEBUG,
3815        "Cleaning up sessions\n");
3816   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3817                                          &disconnect_and_free_it, plugin);
3818   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3819
3820   next = plugin->ppc_dll_head;
3821   for (cur = next; NULL != cur; cur = next)
3822   {
3823     GNUNET_break(0);
3824     next = cur->next;
3825     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3826                                  plugin->ppc_dll_tail,
3827                                  cur);
3828     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3829     GNUNET_free (cur);
3830   }
3831   GNUNET_free (plugin);
3832   GNUNET_free (api);
3833   return NULL;
3834 }
3835
3836 /* end of plugin_transport_udp.c */