fix compiler warning about uninitialized variable
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  (C) 2010-2014 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66
67 /**
68  * Closure for #append_port().
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * DLL
74    */
75   struct PrettyPrinterContext *next;
76
77   /**
78    * DLL
79    */
80   struct PrettyPrinterContext *prev;
81
82   /**
83    * Our plugin.
84    */
85   struct Plugin *plugin;
86
87   /**
88    * Resolver handle
89    */
90   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
91
92   /**
93    * Function to call with the result.
94    */
95   GNUNET_TRANSPORT_AddressStringCallback asc;
96
97   /**
98    * Clsoure for @e asc.
99    */
100   void *asc_cls;
101
102   /**
103    * Timeout task
104    */
105   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
106
107   /**
108    * IPv6 address
109    */
110   int ipv6;
111
112   /**
113    * Options
114    */
115   uint32_t options;
116
117   /**
118    * Port to add after the IP address.
119    */
120   uint16_t port;
121
122 };
123
124
125 /**
126  * Session with another peer.
127  */
128 struct Session
129 {
130   /**
131    * Which peer is this session for?
132    */
133   struct GNUNET_PeerIdentity target;
134
135   /**
136    * Plugin this session belongs to.
137    */
138   struct Plugin *plugin;
139
140   /**
141    * Context for dealing with fragments.
142    */
143   struct UDP_FragmentationContext *frag_ctx;
144
145   /**
146    * Desired delay for next sending we send to other peer
147    */
148   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
149
150   /**
151    * Desired delay for next sending we received from other peer
152    */
153   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
154
155   /**
156    * Session timeout task
157    */
158   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
159
160   /**
161    * When does this session time out?
162    */
163   struct GNUNET_TIME_Absolute timeout;
164
165   /**
166    * expected delay for ACKs
167    */
168   struct GNUNET_TIME_Relative last_expected_ack_delay;
169
170   /**
171    * desired delay between UDP messages
172    */
173   struct GNUNET_TIME_Relative last_expected_msg_delay;
174
175   /**
176    * Address metrics (as set by the "update_address_metrics" by
177    * the environment).
178    */
179   struct GNUNET_ATS_Information ats;
180
181   /**
182    * Our own address.
183    */
184   struct GNUNET_HELLO_Address *address;
185
186   /**
187    * Number of bytes waiting for transmission to this peer.
188    */
189   unsigned long long bytes_in_queue;
190
191   /**
192    * Number of messages waiting for transmission to this peer.
193    */
194   unsigned int msgs_in_queue;
195
196   /**
197    * Reference counter to indicate that this session is
198    * currently being used and must not be destroyed;
199    * setting @e in_destroy will destroy it as soon as
200    * possible.
201    */
202   unsigned int rc;
203
204   /**
205    * Is this session about to be destroyed (sometimes we cannot
206    * destroy a session immediately as below us on the stack
207    * there might be code that still uses it; in this case,
208    * @e rc is non-zero).
209    */
210   int in_destroy;
211 };
212
213
214 /**
215  * Closure for #session_cmp_it().
216  */
217 struct SessionCompareContext
218 {
219   /**
220    * Set to session matching the address.
221    */
222   struct Session *res;
223
224   /**
225    * Address we are looking for.
226    */
227   const struct GNUNET_HELLO_Address *address;
228 };
229
230
231 /**
232  * Closure for #process_inbound_tokenized_messages().
233  */
234 struct SourceInformation
235 {
236   /**
237    * Sender identity.
238    */
239   struct GNUNET_PeerIdentity sender;
240
241   /**
242    * Source address.
243    */
244   const void *arg;
245
246   /**
247    * Associated session.
248    */
249   struct Session *session;
250
251   /**
252    * Number of bytes in source address.
253    */
254   size_t args;
255
256 };
257
258 /**
259  * Closure for #find_receive_context().
260  */
261 struct FindReceiveContext
262 {
263   /**
264    * Where to store the result.
265    */
266   struct DefragContext *rc;
267
268   /**
269    * Address to find.
270    */
271   const struct sockaddr *addr;
272
273   /**
274    *
275    */
276   struct Session *session;
277
278   /**
279    * Number of bytes in @e addr.
280    */
281   socklen_t addr_len;
282
283 };
284
285 /**
286  * Data structure to track defragmentation contexts based
287  * on the source of the UDP traffic.
288  */
289 struct DefragContext
290 {
291
292   /**
293    * Defragmentation context.
294    */
295   struct GNUNET_DEFRAGMENT_Context *defrag;
296
297   /**
298    * Source address this receive context is for (allocated at the
299    * end of the struct).
300    */
301   const struct sockaddr *src_addr;
302
303   /**
304    * Reference to master plugin struct.
305    */
306   struct Plugin *plugin;
307
308   /**
309    * Node in the defrag heap.
310    */
311   struct GNUNET_CONTAINER_HeapNode *hnode;
312
313   /**
314    * Length of @e src_addr.
315    */
316   size_t addr_len;
317 };
318
319
320 /**
321  * Context to send fragmented messages
322  */
323 struct UDP_FragmentationContext
324 {
325   /**
326    * Next in linked list
327    */
328   struct UDP_FragmentationContext *next;
329
330   /**
331    * Previous in linked list
332    */
333   struct UDP_FragmentationContext *prev;
334
335   /**
336    * The plugin
337    */
338   struct Plugin *plugin;
339
340   /**
341    * Handle for GNUNET_FRAGMENT context
342    */
343   struct GNUNET_FRAGMENT_Context *frag;
344
345   /**
346    * The session this fragmentation context belongs to
347    */
348   struct Session *session;
349
350   /**
351    * Function to call upon completion of the transmission.
352    */
353   GNUNET_TRANSPORT_TransmitContinuation cont;
354
355   /**
356    * Closure for @e cont.
357    */
358   void *cont_cls;
359
360   /**
361    * Message timeout
362    */
363   struct GNUNET_TIME_Absolute timeout;
364
365   /**
366    * Payload size of original unfragmented message
367    */
368   size_t payload_size;
369
370   /**
371    * Bytes used to send all fragments on wire including UDP overhead
372    */
373   size_t on_wire_size;
374
375   /**
376    * FIXME.
377    */
378   unsigned int fragments_used;
379
380 };
381
382
383 /**
384  * Message types included in a `struct UDP_MessageWrapper`
385  */
386 enum UDP_MessageType
387 {
388   /**
389    * Uninitialized (error)
390    */
391   UMT_UNDEFINED = 0,
392
393   /**
394    * Fragment of a message.
395    */
396   UMT_MSG_FRAGMENTED = 1,
397
398   /**
399    *
400    */
401   UMT_MSG_FRAGMENTED_COMPLETE = 2,
402
403   /**
404    * Unfragmented message.
405    */
406   UMT_MSG_UNFRAGMENTED = 3,
407
408   /**
409    * Receipt confirmation.
410    */
411   UMT_MSG_ACK = 4
412
413 };
414
415
416 /**
417  * Information we track for each message in the queue.
418  */
419 struct UDP_MessageWrapper
420 {
421   /**
422    * Session this message belongs to
423    */
424   struct Session *session;
425
426   /**
427    * DLL of messages
428    * previous element
429    */
430   struct UDP_MessageWrapper *prev;
431
432   /**
433    * DLL of messages
434    * previous element
435    */
436   struct UDP_MessageWrapper *next;
437
438   /**
439    * Message with size msg_size including UDP specific overhead
440    */
441   char *msg_buf;
442
443   /**
444    * Function to call upon completion of the transmission.
445    */
446   GNUNET_TRANSPORT_TransmitContinuation cont;
447
448   /**
449    * Closure for @e cont.
450    */
451   void *cont_cls;
452
453   /**
454    * Fragmentation context
455    * frag_ctx == NULL if transport <= MTU
456    * frag_ctx != NULL if transport > MTU
457    */
458   struct UDP_FragmentationContext *frag_ctx;
459
460   /**
461    * Message timeout
462    */
463   struct GNUNET_TIME_Absolute timeout;
464
465   /**
466    * Size of UDP message to send including UDP specific overhead
467    */
468   size_t msg_size;
469
470   /**
471    * Payload size of original message
472    */
473   size_t payload_size;
474
475   /**
476    * Message type
477    */
478   enum UDP_MessageType msg_type;
479
480 };
481
482
483 /**
484  * UDP ACK Message-Packet header (after defragmentation).
485  */
486 struct UDP_ACK_Message
487 {
488   /**
489    * Message header.
490    */
491   struct GNUNET_MessageHeader header;
492
493   /**
494    * Desired delay for flow control
495    */
496   uint32_t delay;
497
498   /**
499    * What is the identity of the sender
500    */
501   struct GNUNET_PeerIdentity sender;
502
503 };
504
505
506 /**
507  * If a session monitor is attached, notify it about the new
508  * session state.
509  *
510  * @param plugin our plugin
511  * @param session session that changed state
512  * @param state new state of the session
513  */
514 static void
515 notify_session_monitor (struct Plugin *plugin,
516                         struct Session *session,
517                         enum GNUNET_TRANSPORT_SessionState state)
518 {
519   struct GNUNET_TRANSPORT_SessionInfo info;
520
521   if (NULL == plugin->sic)
522     return;
523   memset (&info, 0, sizeof (info));
524   info.state = state;
525   info.is_inbound = GNUNET_SYSERR; /* hard to say */
526   info.num_msg_pending = session->msgs_in_queue;
527   info.num_bytes_pending = session->bytes_in_queue;
528   /* info.receive_delay remains zero as this is not supported by UDP
529      (cannot selectively not receive from 'some' peer while continuing
530      to receive from others) */
531   info.session_timeout = session->timeout;
532   info.address = session->address;
533   plugin->sic (plugin->sic_cls,
534                session,
535                &info);
536 }
537
538
539 /**
540  * We have been notified that our readset has something to read.  We don't
541  * know which socket needs to be read, so we have to check each one
542  * Then reschedule this function to be called again once more is available.
543  *
544  * @param cls the plugin handle
545  * @param tc the scheduling context (for rescheduling this function again)
546  */
547 static void
548 udp_plugin_select (void *cls,
549                    const struct GNUNET_SCHEDULER_TaskContext *tc);
550
551
552 /**
553  * We have been notified that our readset has something to read.  We don't
554  * know which socket needs to be read, so we have to check each one
555  * Then reschedule this function to be called again once more is available.
556  *
557  * @param cls the plugin handle
558  * @param tc the scheduling context (for rescheduling this function again)
559  */
560 static void
561 udp_plugin_select_v6 (void *cls,
562                       const struct GNUNET_SCHEDULER_TaskContext *tc);
563
564
565 /**
566  * (re)schedule select tasks for this plugin.
567  *
568  * @param plugin plugin to reschedule
569  */
570 static void
571 schedule_select (struct Plugin *plugin)
572 {
573   struct GNUNET_TIME_Relative min_delay;
574   struct UDP_MessageWrapper *udpw;
575
576   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
577   {
578     /* Find a message ready to send:
579      * Flow delay from other peer is expired or not set (0) */
580     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
581     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
582       min_delay = GNUNET_TIME_relative_min (min_delay,
583           GNUNET_TIME_absolute_get_remaining (
584               udpw->session->flow_delay_from_other_peer));
585
586     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK )
587       GNUNET_SCHEDULER_cancel (plugin->select_task);
588
589     /* Schedule with:
590      * - write active set if message is ready
591      * - timeout minimum delay */
592     plugin->select_task = GNUNET_SCHEDULER_add_select (
593         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
594         (0 == min_delay.rel_value_us) ?
595             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
596         (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
597         &udp_plugin_select, plugin);
598   }
599   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
600   {
601     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
602     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
603       min_delay = GNUNET_TIME_relative_min (min_delay,
604           GNUNET_TIME_absolute_get_remaining (
605               udpw->session->flow_delay_from_other_peer));
606
607     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
608       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
609     plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
610         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
611         (0 == min_delay.rel_value_us) ?
612             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
613         (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
614         &udp_plugin_select_v6, plugin);
615   }
616 }
617
618
619 /**
620  * Function called for a quick conversion of the binary address to
621  * a numeric address.  Note that the caller must not free the
622  * address and that the next call to this function is allowed
623  * to override the address again.
624  *
625  * @param cls closure
626  * @param addr binary address
627  * @param addrlen length of the address
628  * @return string representing the same address
629  */
630 const char *
631 udp_address_to_string (void *cls,
632                        const void *addr,
633                        size_t addrlen)
634 {
635   static char rbuf[INET6_ADDRSTRLEN + 10];
636   char buf[INET6_ADDRSTRLEN];
637   const void *sb;
638   struct in_addr a4;
639   struct in6_addr a6;
640   const struct IPv4UdpAddress *t4;
641   const struct IPv6UdpAddress *t6;
642   int af;
643   uint16_t port;
644   uint32_t options;
645
646   if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress)))
647   {
648     t6 = addr;
649     af = AF_INET6;
650     options = ntohl (t6->options);
651     port = ntohs (t6->u6_port);
652     memcpy (&a6, &t6->ipv6_addr, sizeof(a6));
653     sb = &a6;
654   }
655   else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress)))
656   {
657     t4 = addr;
658     af = AF_INET;
659     options = ntohl (t4->options);
660     port = ntohs (t4->u4_port);
661     memcpy (&a4, &t4->ipv4_addr, sizeof(a4));
662     sb = &a4;
663   }
664   else
665   {
666     return NULL;
667   }
668   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
669
670   GNUNET_snprintf (rbuf, sizeof(rbuf),
671       (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u", PLUGIN_NAME, options,
672       buf, port);
673   return rbuf;
674 }
675
676
677 /**
678  * Function called to convert a string address to
679  * a binary address.
680  *
681  * @param cls closure ('struct Plugin*')
682  * @param addr string address
683  * @param addrlen length of the address
684  * @param buf location to store the buffer
685  * @param added location to store the number of bytes in the buffer.
686  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
687  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
688  */
689 static int
690 udp_string_to_address (void *cls,
691                        const char *addr,
692                        uint16_t addrlen,
693                        void **buf,
694                        size_t *added)
695 {
696   struct sockaddr_storage socket_address;
697   char *address;
698   char *plugin;
699   char *optionstr;
700   uint32_t options;
701
702   /* Format tcp.options.address:port */
703   address = NULL;
704   plugin = NULL;
705   optionstr = NULL;
706
707   if ((NULL == addr) || (addrlen == 0))
708   {
709     GNUNET_break(0);
710     return GNUNET_SYSERR;
711   }
712   if ('\0' != addr[addrlen - 1])
713   {
714     GNUNET_break(0);
715     return GNUNET_SYSERR;
716   }
717   if (strlen (addr) != addrlen - 1)
718   {
719     GNUNET_break(0);
720     return GNUNET_SYSERR;
721   }
722   plugin = GNUNET_strdup (addr);
723   optionstr = strchr (plugin, '.');
724   if (NULL == optionstr)
725   {
726     GNUNET_break(0);
727     GNUNET_free(plugin);
728     return GNUNET_SYSERR;
729   }
730   optionstr[0] = '\0';
731   optionstr++;
732   options = atol (optionstr);
733   address = strchr (optionstr, '.');
734   if (NULL == address)
735   {
736     GNUNET_break(0);
737     GNUNET_free(plugin);
738     return GNUNET_SYSERR;
739   }
740   address[0] = '\0';
741   address++;
742
743   if (GNUNET_OK !=
744       GNUNET_STRINGS_to_address_ip (address, strlen (address),
745                                     &socket_address))
746   {
747     GNUNET_break(0);
748     GNUNET_free(plugin);
749     return GNUNET_SYSERR;
750   }
751
752   GNUNET_free(plugin);
753
754   switch (socket_address.ss_family)
755   {
756   case AF_INET:
757   {
758     struct IPv4UdpAddress *u4;
759     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
760     u4 = GNUNET_new (struct IPv4UdpAddress);
761     u4->options = htonl (options);
762     u4->ipv4_addr = in4->sin_addr.s_addr;
763     u4->u4_port = in4->sin_port;
764     *buf = u4;
765     *added = sizeof(struct IPv4UdpAddress);
766     return GNUNET_OK;
767   }
768   case AF_INET6:
769   {
770     struct IPv6UdpAddress *u6;
771     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
772     u6 = GNUNET_new (struct IPv6UdpAddress);
773     u6->options = htonl (options);
774     u6->ipv6_addr = in6->sin6_addr;
775     u6->u6_port = in6->sin6_port;
776     *buf = u6;
777     *added = sizeof(struct IPv6UdpAddress);
778     return GNUNET_OK;
779   }
780   default:
781     GNUNET_break(0);
782     return GNUNET_SYSERR;
783   }
784 }
785
786
787 /**
788  * Append our port and forward the result.
789  *
790  * @param cls a `struct PrettyPrinterContext *`
791  * @param hostname result from DNS resolver
792  */
793 static void
794 append_port (void *cls,
795              const char *hostname)
796 {
797   struct PrettyPrinterContext *ppc = cls;
798   struct Plugin *plugin = ppc->plugin;
799   char *ret;
800
801   if (NULL == hostname)
802   {
803     /* Final call, done */
804     ppc->asc (ppc->asc_cls,
805               NULL,
806               GNUNET_OK);
807     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
808                                  plugin->ppc_dll_tail,
809                                  ppc);
810     ppc->resolver_handle = NULL;
811     GNUNET_free (ppc);
812     return;
813   }
814   if (GNUNET_YES == ppc->ipv6)
815     GNUNET_asprintf (&ret,
816                      "%s.%u.[%s]:%d",
817                      PLUGIN_NAME,
818                      ppc->options,
819                      hostname,
820                      ppc->port);
821   else
822     GNUNET_asprintf (&ret,
823                      "%s.%u.%s:%d",
824                      PLUGIN_NAME,
825                      ppc->options,
826                      hostname,
827                      ppc->port);
828   ppc->asc (ppc->asc_cls,
829             ret,
830             GNUNET_OK);
831   GNUNET_free (ret);
832 }
833
834
835 /**
836  * Convert the transports address to a nice, human-readable
837  * format.
838  *
839  * @param cls closure with the `struct Plugin *`
840  * @param type name of the transport that generated the address
841  * @param addr one of the addresses of the host, NULL for the last address
842  *        the specific address format depends on the transport
843  * @param addrlen length of the address
844  * @param numeric should (IP) addresses be displayed in numeric form?
845  * @param timeout after how long should we give up?
846  * @param asc function to call on each string
847  * @param asc_cls closure for @a asc
848  */
849 static void
850 udp_plugin_address_pretty_printer (void *cls,
851                                    const char *type,
852                                    const void *addr,
853                                    size_t addrlen,
854                                    int numeric,
855                                    struct GNUNET_TIME_Relative timeout,
856                                    GNUNET_TRANSPORT_AddressStringCallback asc,
857                                    void *asc_cls)
858 {
859   struct Plugin *plugin = cls;
860   struct PrettyPrinterContext *ppc;
861   const void *sb;
862   size_t sbs;
863   struct sockaddr_in a4;
864   struct sockaddr_in6 a6;
865   const struct IPv4UdpAddress *u4;
866   const struct IPv6UdpAddress *u6;
867   uint16_t port;
868   uint32_t options;
869
870   if (addrlen == sizeof(struct IPv6UdpAddress))
871   {
872     u6 = addr;
873     memset (&a6, 0, sizeof(a6));
874     a6.sin6_family = AF_INET6;
875 #if HAVE_SOCKADDR_IN_SIN_LEN
876     a6.sin6_len = sizeof (a6);
877 #endif
878     a6.sin6_port = u6->u6_port;
879     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
880     port = ntohs (u6->u6_port);
881     options = ntohl (u6->options);
882     sb = &a6;
883     sbs = sizeof(a6);
884   }
885   else if (addrlen == sizeof(struct IPv4UdpAddress))
886   {
887     u4 = addr;
888     memset (&a4, 0, sizeof(a4));
889     a4.sin_family = AF_INET;
890 #if HAVE_SOCKADDR_IN_SIN_LEN
891     a4.sin_len = sizeof (a4);
892 #endif
893     a4.sin_port = u4->u4_port;
894     a4.sin_addr.s_addr = u4->ipv4_addr;
895     port = ntohs (u4->u4_port);
896     options = ntohl (u4->options);
897     sb = &a4;
898     sbs = sizeof(a4);
899   }
900   else
901   {
902     /* invalid address */
903     GNUNET_break_op (0);
904     asc (asc_cls, NULL , GNUNET_SYSERR);
905     asc (asc_cls, NULL, GNUNET_OK);
906     return;
907   }
908   ppc = GNUNET_new (struct PrettyPrinterContext);
909   ppc->plugin = plugin;
910   ppc->asc = asc;
911   ppc->asc_cls = asc_cls;
912   ppc->port = port;
913   ppc->options = options;
914   if (addrlen == sizeof(struct IPv6UdpAddress))
915     ppc->ipv6 = GNUNET_YES;
916   else
917     ppc->ipv6 = GNUNET_NO;
918   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
919                                plugin->ppc_dll_tail,
920                                ppc);
921   ppc->resolver_handle
922     = GNUNET_RESOLVER_hostname_get (sb,
923                                     sbs,
924                                     ! numeric,
925                                     timeout,
926                                     &append_port, ppc);
927 }
928
929
930 /**
931  * FIXME.
932  */
933 static void
934 call_continuation (struct UDP_MessageWrapper *udpw,
935                    int result)
936 {
937   struct Session *session = udpw->session;
938   struct Plugin *plugin = session->plugin;
939   size_t overhead;
940
941   LOG (GNUNET_ERROR_TYPE_DEBUG,
942        "Calling continuation for %u byte message to `%s' with result %s\n",
943        udpw->payload_size, GNUNET_i2s (&udpw->session->target),
944        (GNUNET_OK == result) ? "OK" : "SYSERR");
945
946   if (udpw->msg_size >= udpw->payload_size)
947     overhead = udpw->msg_size - udpw->payload_size;
948   else
949     overhead = udpw->msg_size;
950
951   switch (result)
952   {
953   case GNUNET_OK:
954     switch (udpw->msg_type)
955     {
956     case UMT_MSG_UNFRAGMENTED:
957       if (NULL != udpw->cont)
958       {
959         /* Transport continuation */
960         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
961             udpw->payload_size, udpw->msg_size);
962       }
963       GNUNET_STATISTICS_update (plugin->env->stats,
964           "# UDP, unfragmented msgs, messages, sent, success", 1, GNUNET_NO);
965       GNUNET_STATISTICS_update (plugin->env->stats,
966           "# UDP, unfragmented msgs, bytes payload, sent, success",
967           udpw->payload_size, GNUNET_NO);
968       GNUNET_STATISTICS_update (plugin->env->stats,
969           "# UDP, unfragmented msgs, bytes overhead, sent, success", overhead,
970           GNUNET_NO);
971       GNUNET_STATISTICS_update (plugin->env->stats,
972           "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
973       GNUNET_STATISTICS_update (plugin->env->stats,
974           "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO);
975       break;
976     case UMT_MSG_FRAGMENTED_COMPLETE:
977       GNUNET_assert(NULL != udpw->frag_ctx);
978       if (udpw->frag_ctx->cont != NULL )
979         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target,
980             GNUNET_OK, udpw->frag_ctx->payload_size,
981             udpw->frag_ctx->on_wire_size);
982       GNUNET_STATISTICS_update (plugin->env->stats,
983           "# UDP, fragmented msgs, messages, sent, success", 1, GNUNET_NO);
984       GNUNET_STATISTICS_update (plugin->env->stats,
985           "# UDP, fragmented msgs, bytes payload, sent, success",
986           udpw->payload_size, GNUNET_NO);
987       GNUNET_STATISTICS_update (plugin->env->stats,
988           "# UDP, fragmented msgs, bytes overhead, sent, success", overhead,
989           GNUNET_NO);
990       GNUNET_STATISTICS_update (plugin->env->stats,
991           "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
992       GNUNET_STATISTICS_update (plugin->env->stats,
993           "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO);
994       GNUNET_STATISTICS_update (plugin->env->stats,
995           "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO);
996       break;
997     case UMT_MSG_FRAGMENTED:
998       /* Fragmented message: enqueue next fragment */
999       if (NULL != udpw->cont)
1000         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
1001             udpw->payload_size, udpw->msg_size);
1002       GNUNET_STATISTICS_update (plugin->env->stats,
1003           "# UDP, fragmented msgs, fragments, sent, success", 1, GNUNET_NO);
1004       GNUNET_STATISTICS_update (plugin->env->stats,
1005           "# UDP, fragmented msgs, fragments bytes, sent, success",
1006           udpw->msg_size, GNUNET_NO);
1007       break;
1008     case UMT_MSG_ACK:
1009       /* No continuation */
1010       GNUNET_STATISTICS_update (plugin->env->stats,
1011           "# UDP, ACK msgs, messages, sent, success", 1, GNUNET_NO);
1012       GNUNET_STATISTICS_update (plugin->env->stats,
1013           "# UDP, ACK msgs, bytes overhead, sent, success", overhead,
1014           GNUNET_NO);
1015       GNUNET_STATISTICS_update (plugin->env->stats,
1016           "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
1017       break;
1018     default:
1019       GNUNET_break(0);
1020       break;
1021     }
1022     break;
1023   case GNUNET_SYSERR:
1024     switch (udpw->msg_type)
1025     {
1026     case UMT_MSG_UNFRAGMENTED:
1027       /* Unfragmented message: failed to send */
1028       if (NULL != udpw->cont)
1029         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
1030             udpw->payload_size, overhead);
1031       GNUNET_STATISTICS_update (plugin->env->stats,
1032           "# UDP, unfragmented msgs, messages, sent, failure", 1, GNUNET_NO);
1033       GNUNET_STATISTICS_update (plugin->env->stats,
1034           "# UDP, unfragmented msgs, bytes payload, sent, failure",
1035           udpw->payload_size, GNUNET_NO);
1036       GNUNET_STATISTICS_update (plugin->env->stats,
1037           "# UDP, unfragmented msgs, bytes overhead, sent, failure", overhead,
1038           GNUNET_NO);
1039       break;
1040     case UMT_MSG_FRAGMENTED_COMPLETE:
1041       GNUNET_assert(NULL != udpw->frag_ctx);
1042       if (udpw->frag_ctx->cont != NULL )
1043         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target,
1044             GNUNET_SYSERR, udpw->frag_ctx->payload_size,
1045             udpw->frag_ctx->on_wire_size);
1046       GNUNET_STATISTICS_update (plugin->env->stats,
1047           "# UDP, fragmented msgs, messages, sent, failure", 1, GNUNET_NO);
1048       GNUNET_STATISTICS_update (plugin->env->stats,
1049           "# UDP, fragmented msgs, bytes payload, sent, failure",
1050           udpw->payload_size, GNUNET_NO);
1051       GNUNET_STATISTICS_update (plugin->env->stats,
1052           "# UDP, fragmented msgs, bytes payload, sent, failure", overhead,
1053           GNUNET_NO);
1054       GNUNET_STATISTICS_update (plugin->env->stats,
1055           "# UDP, fragmented msgs, bytes payload, sent, failure", overhead,
1056           GNUNET_NO);
1057       GNUNET_STATISTICS_update (plugin->env->stats,
1058           "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO);
1059       break;
1060     case UMT_MSG_FRAGMENTED:
1061       GNUNET_assert(NULL != udpw->frag_ctx);
1062       /* Fragmented message: failed to send */
1063       GNUNET_STATISTICS_update (plugin->env->stats,
1064           "# UDP, fragmented msgs, fragments, sent, failure", 1, GNUNET_NO);
1065       GNUNET_STATISTICS_update (plugin->env->stats,
1066           "# UDP, fragmented msgs, fragments bytes, sent, failure",
1067           udpw->msg_size, GNUNET_NO);
1068       break;
1069     case UMT_MSG_ACK:
1070       /* ACK message: failed to send */
1071       GNUNET_STATISTICS_update (plugin->env->stats,
1072           "# UDP, ACK msgs, messages, sent, failure", 1, GNUNET_NO);
1073       break;
1074     default:
1075       GNUNET_break(0);
1076       break;
1077     }
1078     break;
1079   default:
1080     GNUNET_break(0);
1081     break;
1082   }
1083 }
1084
1085
1086 /**
1087  * Check if the given port is plausible (must be either our listen
1088  * port or our advertised port).  If it is neither, we return
1089  * #GNUNET_SYSERR.
1090  *
1091  * @param plugin global variables
1092  * @param in_port port number to check
1093  * @return #GNUNET_OK if port is either open_port or adv_port
1094  */
1095 static int
1096 check_port (struct Plugin *plugin,
1097             uint16_t in_port)
1098 {
1099   if ((in_port == plugin->port) || (in_port == plugin->aport))
1100     return GNUNET_OK;
1101   return GNUNET_SYSERR;
1102 }
1103
1104
1105 /**
1106  * Function that will be called to check if a binary address for this
1107  * plugin is well-formed and corresponds to an address for THIS peer
1108  * (as per our configuration).  Naturally, if absolutely necessary,
1109  * plugins can be a bit conservative in their answer, but in general
1110  * plugins should make sure that the address does not redirect
1111  * traffic to a 3rd party that might try to man-in-the-middle our
1112  * traffic.
1113  *
1114  * @param cls closure, should be our handle to the Plugin
1115  * @param addr pointer to the address
1116  * @param addrlen length of @a addr
1117  * @return #GNUNET_OK if this is a plausible address for this peer
1118  *         and transport, #GNUNET_SYSERR if not
1119  */
1120 static int
1121 udp_plugin_check_address (void *cls,
1122                           const void *addr,
1123                           size_t addrlen)
1124 {
1125   struct Plugin *plugin = cls;
1126   struct IPv4UdpAddress *v4;
1127   struct IPv6UdpAddress *v6;
1128
1129   if ((addrlen != sizeof(struct IPv4UdpAddress))
1130       && (addrlen != sizeof(struct IPv6UdpAddress)))
1131   {
1132     GNUNET_break_op(0);
1133     return GNUNET_SYSERR;
1134   }
1135   if (addrlen == sizeof(struct IPv4UdpAddress))
1136   {
1137     v4 = (struct IPv4UdpAddress *) addr;
1138     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1139       return GNUNET_SYSERR;
1140     if (GNUNET_OK
1141         != GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
1142             sizeof(struct in_addr)))
1143       return GNUNET_SYSERR;
1144   }
1145   else
1146   {
1147     v6 = (struct IPv6UdpAddress *) addr;
1148     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1149     {
1150       GNUNET_break_op(0);
1151       return GNUNET_SYSERR;
1152     }
1153     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1154       return GNUNET_SYSERR;
1155     if (GNUNET_OK !=
1156         GNUNET_NAT_test_address (plugin->nat,
1157                                  &v6->ipv6_addr,
1158                                  sizeof(struct in6_addr)))
1159       return GNUNET_SYSERR;
1160   }
1161   return GNUNET_OK;
1162 }
1163
1164
1165 /**
1166  * Function to free last resources associated with a session.
1167  *
1168  * @param s session to free
1169  */
1170 static void
1171 free_session (struct Session *s)
1172 {
1173   if (NULL != s->frag_ctx)
1174   {
1175     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL );
1176     GNUNET_free(s->frag_ctx);
1177     s->frag_ctx = NULL;
1178   }
1179   GNUNET_free(s);
1180 }
1181
1182
1183 /**
1184  * Remove a message from the transmission queue.
1185  *
1186  * @param plugin the UDP plugin
1187  * @param udpw message wrapper to queue
1188  */
1189 static void
1190 dequeue (struct Plugin *plugin,
1191          struct UDP_MessageWrapper *udpw)
1192 {
1193   struct Session *session = udpw->session;
1194
1195   if (plugin->bytes_in_buffer < udpw->msg_size)
1196   {
1197     GNUNET_break (0);
1198   }
1199   else
1200   {
1201     GNUNET_STATISTICS_update (plugin->env->stats,
1202                               "# UDP, total, bytes in buffers",
1203                               -(long long) udpw->msg_size,
1204                               GNUNET_NO);
1205     plugin->bytes_in_buffer -= udpw->msg_size;
1206   }
1207   GNUNET_STATISTICS_update (plugin->env->stats,
1208                             "# UDP, total, msgs in buffers",
1209                             -1, GNUNET_NO);
1210   if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
1211     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1212                                  plugin->ipv4_queue_tail,
1213                                  udpw);
1214   else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
1215     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1216                                  plugin->ipv6_queue_tail,
1217                                  udpw);
1218   else
1219   {
1220     GNUNET_break (0);
1221     return;
1222   }
1223   GNUNET_assert (session->msgs_in_queue > 0);
1224   session->msgs_in_queue--;
1225   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1226   session->bytes_in_queue -= udpw->msg_size;
1227 }
1228
1229
1230 /**
1231  * FIXME.
1232  */
1233 static void
1234 fragmented_message_done (struct UDP_FragmentationContext *fc,
1235                          int result)
1236 {
1237   struct Plugin *plugin = fc->plugin;
1238   struct Session *s = fc->session;
1239   struct UDP_MessageWrapper *udpw;
1240   struct UDP_MessageWrapper *tmp;
1241   struct UDP_MessageWrapper dummy;
1242
1243   LOG (GNUNET_ERROR_TYPE_DEBUG,
1244        "%p : Fragmented message removed with result %s\n",
1245        fc,
1246        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1247
1248   /* Call continuation for fragmented message */
1249   memset (&dummy, 0, sizeof(dummy));
1250   dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE;
1251   dummy.msg_size = s->frag_ctx->on_wire_size;
1252   dummy.payload_size = s->frag_ctx->payload_size;
1253   dummy.frag_ctx = s->frag_ctx;
1254   dummy.cont = NULL;
1255   dummy.cont_cls = NULL;
1256   dummy.session = s;
1257   call_continuation (&dummy, result);
1258   /* Remove leftover fragments from queue */
1259   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1260   {
1261     udpw = plugin->ipv6_queue_head;
1262     while (NULL != udpw)
1263     {
1264       tmp = udpw->next;
1265       if ((udpw->frag_ctx != NULL )&& (udpw->frag_ctx == s->frag_ctx)){
1266       dequeue (plugin, udpw);
1267       call_continuation (udpw, GNUNET_SYSERR);
1268       GNUNET_free (udpw);
1269     }
1270       udpw = tmp;
1271     }
1272   }
1273   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1274   {
1275     udpw = plugin->ipv4_queue_head;
1276     while (udpw != NULL )
1277     {
1278       tmp = udpw->next;
1279       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1280       {
1281         dequeue (plugin, udpw);
1282         call_continuation (udpw, GNUNET_SYSERR);
1283         GNUNET_free(udpw);
1284       }
1285       udpw = tmp;
1286     }
1287   }
1288   notify_session_monitor (s->plugin,
1289                           s,
1290                           GNUNET_TRANSPORT_SS_UP);
1291   /* Destroy fragmentation context */
1292   GNUNET_FRAGMENT_context_destroy (fc->frag,
1293                                    &s->last_expected_msg_delay,
1294                                    &s->last_expected_ack_delay);
1295   s->frag_ctx = NULL;
1296   GNUNET_free (fc);
1297 }
1298
1299
1300 /**
1301  * Scan the heap for a receive context with the given address.
1302  *
1303  * @param cls the `struct FindReceiveContext`
1304  * @param node internal node of the heap
1305  * @param element value stored at the node (a 'struct ReceiveContext')
1306  * @param cost cost associated with the node
1307  * @return #GNUNET_YES if we should continue to iterate,
1308  *         #GNUNET_NO if not.
1309  */
1310 static int
1311 find_receive_context (void *cls,
1312                       struct GNUNET_CONTAINER_HeapNode *node,
1313                       void *element,
1314                       GNUNET_CONTAINER_HeapCostType cost)
1315 {
1316   struct FindReceiveContext *frc = cls;
1317   struct DefragContext *e = element;
1318
1319   if ((frc->addr_len == e->addr_len)
1320       && (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1321   {
1322     frc->rc = e;
1323     return GNUNET_NO;
1324   }
1325   return GNUNET_YES;
1326 }
1327
1328
1329 /**
1330  * Functions with this signature are called whenever we need
1331  * to close a session due to a disconnect or failure to
1332  * establish a connection.
1333  *
1334  * @param cls closure with the `struct Plugin`
1335  * @param s session to close down
1336  * @return #GNUNET_OK on success
1337  */
1338 static int
1339 udp_disconnect_session (void *cls,
1340                         struct Session *s)
1341 {
1342   struct Plugin *plugin = cls;
1343   struct UDP_MessageWrapper *udpw;
1344   struct UDP_MessageWrapper *next;
1345   struct FindReceiveContext frc;
1346
1347   GNUNET_assert(GNUNET_YES != s->in_destroy);
1348   LOG(GNUNET_ERROR_TYPE_DEBUG, "Session %p to peer `%s' address ended\n", s,
1349       GNUNET_i2s (&s->target),
1350       udp_address_to_string (NULL, s->address->address, s->address->address_length));
1351   /* stop timeout task */
1352   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1353   {
1354     GNUNET_SCHEDULER_cancel (s->timeout_task);
1355     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1356   }
1357   if (NULL != s->frag_ctx)
1358   {
1359     /* Remove fragmented message due to disconnect */
1360     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1361   }
1362
1363   frc.rc = NULL;
1364   frc.addr = s->address->address;
1365   frc.addr_len = s->address->address_length;
1366   /* Lookup existing receive context for this address */
1367   if (NULL != plugin->defrag_ctxs)
1368   {
1369     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1370                                    &find_receive_context,
1371                                    &frc);
1372     if (NULL != frc.rc)
1373     {
1374       struct DefragContext *d_ctx = frc.rc;
1375
1376       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
1377       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1378       GNUNET_free (d_ctx);
1379     }
1380   }
1381   next = plugin->ipv4_queue_head;
1382   while (NULL != (udpw = next))
1383   {
1384     next = udpw->next;
1385     if (udpw->session == s)
1386     {
1387       dequeue (plugin, udpw);
1388       call_continuation (udpw, GNUNET_SYSERR);
1389       GNUNET_free(udpw);
1390     }
1391   }
1392   next = plugin->ipv6_queue_head;
1393   while (NULL != (udpw = next))
1394   {
1395     next = udpw->next;
1396     if (udpw->session == s)
1397     {
1398       dequeue (plugin, udpw);
1399       call_continuation (udpw, GNUNET_SYSERR);
1400       GNUNET_free(udpw);
1401     }
1402   }
1403   notify_session_monitor (s->plugin,
1404                           s,
1405                           GNUNET_TRANSPORT_SS_DOWN);
1406   plugin->env->session_end (plugin->env->cls,
1407                             s->address,
1408                             s);
1409
1410   if (NULL != s->frag_ctx)
1411   {
1412     if (NULL != s->frag_ctx->cont)
1413     {
1414       s->frag_ctx->cont (s->frag_ctx->cont_cls,
1415                          &s->target,
1416                          GNUNET_SYSERR,
1417                          s->frag_ctx->payload_size,
1418                          s->frag_ctx->on_wire_size);
1419       LOG (GNUNET_ERROR_TYPE_DEBUG,
1420            "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1421            GNUNET_i2s (&s->target));
1422     }
1423   }
1424
1425   GNUNET_assert(GNUNET_YES ==
1426                 GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
1427                                                       &s->target,
1428                                                       s));
1429   GNUNET_STATISTICS_set (plugin->env->stats,
1430                          "# UDP sessions active",
1431                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1432                          GNUNET_NO);
1433   if (s->rc > 0)
1434   {
1435     s->in_destroy = GNUNET_YES;
1436   }
1437   else
1438   {
1439     GNUNET_HELLO_address_free (s->address);
1440     free_session (s);
1441   }
1442   return GNUNET_OK;
1443 }
1444
1445
1446 /**
1447  * Function that is called to get the keepalive factor.
1448  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
1449  * calculate the interval between keepalive packets.
1450  *
1451  * @param cls closure with the `struct Plugin`
1452  * @return keepalive factor
1453  */
1454 static unsigned int
1455 udp_query_keepalive_factor (void *cls)
1456 {
1457   return 15;
1458 }
1459
1460
1461 /**
1462  * Destroy a session, plugin is being unloaded.
1463  *
1464  * @param cls the `struct Plugin`
1465  * @param key hash of public key of target peer
1466  * @param value a `struct PeerSession *` to clean up
1467  * @return #GNUNET_OK (continue to iterate)
1468  */
1469 static int
1470 disconnect_and_free_it (void *cls,
1471                         const struct GNUNET_PeerIdentity *key,
1472                         void *value)
1473 {
1474   struct Plugin *plugin = cls;
1475
1476   udp_disconnect_session (plugin, value);
1477   return GNUNET_OK;
1478 }
1479
1480
1481 /**
1482  * Disconnect from a remote node.  Clean up session if we have one for
1483  * this peer.
1484  *
1485  * @param cls closure for this call (should be handle to Plugin)
1486  * @param target the peeridentity of the peer to disconnect
1487  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
1488  */
1489 static void
1490 udp_disconnect (void *cls,
1491                 const struct GNUNET_PeerIdentity *target)
1492 {
1493   struct Plugin *plugin = cls;
1494
1495   LOG (GNUNET_ERROR_TYPE_DEBUG,
1496        "Disconnecting from peer `%s'\n",
1497        GNUNET_i2s (target));
1498   /* Clean up sessions */
1499   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1500                                               target,
1501                                               &disconnect_and_free_it,
1502                                               plugin);
1503 }
1504
1505
1506 /**
1507  * Session was idle, so disconnect it
1508  *
1509  * @param cls the `struct Session` to time out
1510  * @param tc scheduler context
1511  */
1512 static void
1513 session_timeout (void *cls,
1514                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1515 {
1516   struct Session *s = cls;
1517   struct Plugin *plugin = s->plugin;
1518   struct GNUNET_TIME_Relative left;
1519
1520   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1521   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
1522   if (left.rel_value_us > 0)
1523   {
1524     /* not actually our turn yet, but let's at least update
1525        the monitor, it may think we're about to die ... */
1526     notify_session_monitor (s->plugin,
1527                             s,
1528                             GNUNET_TRANSPORT_SS_UP);
1529     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
1530                                                     &session_timeout,
1531                                                     s);
1532     return;
1533   }
1534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1535               "Session %p was idle for %s, disconnecting\n",
1536               s,
1537               GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
1538                                                       GNUNET_YES));
1539   /* call session destroy function */
1540   udp_disconnect_session (plugin, s);
1541 }
1542
1543
1544 /**
1545  * Increment session timeout due to activity
1546  *
1547  * @param s session to reschedule timeout activity for
1548  */
1549 static void
1550 reschedule_session_timeout (struct Session *s)
1551 {
1552   if (GNUNET_YES == s->in_destroy)
1553     return;
1554   GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1555   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1556 }
1557
1558
1559 /**
1560  * FIXME.
1561  */
1562 static struct Session *
1563 create_session (struct Plugin *plugin,
1564                 const struct GNUNET_HELLO_Address *address)
1565 {
1566   struct Session *s;
1567
1568   s = GNUNET_new (struct Session);
1569   s->plugin = plugin;
1570   s->address = GNUNET_HELLO_address_copy (address);
1571   s->target = address->peer;
1572   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (
1573       GNUNET_TIME_UNIT_MILLISECONDS, 250);
1574   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1575   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1576   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1577   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1578   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1579                                                   &session_timeout, s);
1580   return s;
1581 }
1582
1583
1584
1585 /**
1586  * Function obtain the network type for a session
1587  *
1588  * @param cls closure ('struct Plugin*')
1589  * @param session the session
1590  * @return the network type
1591  */
1592 static enum GNUNET_ATS_Network_Type
1593 udp_get_network (void *cls,
1594                  struct Session *session)
1595 {
1596   return ntohl (session->ats.value);
1597 }
1598
1599
1600 /**
1601  * Find a session with a matching address.
1602  *
1603  * @param cls the `struct SessionCompareContext *`
1604  * @param key peer identity (unused)
1605  * @param value the `struct Session *`
1606  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1607  */
1608 static int
1609 session_cmp_it (void *cls,
1610                 const struct GNUNET_PeerIdentity *key,
1611                 void *value)
1612 {
1613   struct SessionCompareContext *cctx = cls;
1614   const struct GNUNET_HELLO_Address *address = cctx->address;
1615   struct Session *s = value;
1616
1617   LOG (GNUNET_ERROR_TYPE_DEBUG,
1618        "Comparing address %s <-> %s\n",
1619        udp_address_to_string (NULL,
1620                               address->address,
1621                               address->address_length),
1622        udp_address_to_string (NULL,
1623                               s->address->address,
1624                               s->address->address_length));
1625   if (0 == GNUNET_HELLO_address_cmp(s->address, cctx->address))
1626   {
1627     cctx->res = s;
1628     return GNUNET_NO;
1629   }
1630   return GNUNET_YES;
1631 }
1632
1633
1634 /**
1635  * Creates a new outbound session the transport service will use to
1636  * send data to the peer
1637  *
1638  * @param cls the plugin
1639  * @param address the address
1640  * @return the session or NULL of max connections exceeded
1641  */
1642 static struct Session *
1643 udp_plugin_lookup_session (void *cls,
1644                            const struct GNUNET_HELLO_Address *address)
1645 {
1646   struct Plugin * plugin = cls;
1647   struct IPv6UdpAddress *udp_a6;
1648   struct IPv4UdpAddress *udp_a4;
1649   struct SessionCompareContext cctx;
1650
1651   if ( (NULL == address->address) ||
1652        ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1653         (address->address_length != sizeof (struct IPv6UdpAddress))))
1654   {
1655     LOG (GNUNET_ERROR_TYPE_WARNING,
1656          _("Trying to create session for address of unexpected length %u (should be %u or %u)\n"),
1657          address->address_length,
1658          sizeof (struct IPv4UdpAddress),
1659          sizeof (struct IPv6UdpAddress));
1660     return NULL;
1661   }
1662
1663   if (address->address_length == sizeof(struct IPv4UdpAddress))
1664   {
1665     if (plugin->sockv4 == NULL)
1666       return NULL;
1667     udp_a4 = (struct IPv4UdpAddress *) address->address;
1668     if (udp_a4->u4_port == 0)
1669       return NULL;
1670   }
1671
1672   if (address->address_length == sizeof(struct IPv6UdpAddress))
1673   {
1674     if (plugin->sockv6 == NULL)
1675       return NULL;
1676     udp_a6 = (struct IPv6UdpAddress *) address->address;
1677     if (udp_a6->u6_port == 0)
1678       return NULL;
1679   }
1680
1681   /* check if session already exists */
1682   cctx.address = address;
1683   cctx.res = NULL;
1684   LOG (GNUNET_ERROR_TYPE_DEBUG,
1685        "Looking for existing session for peer `%s' `%s' \n",
1686        GNUNET_i2s (&address->peer),
1687        udp_address_to_string(NULL, address->address, address->address_length));
1688   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, &address->peer,
1689       session_cmp_it, &cctx);
1690   if (cctx.res != NULL )
1691   {
1692     LOG (GNUNET_ERROR_TYPE_DEBUG,
1693          "Found existing session %p\n",
1694          cctx.res);
1695     return cctx.res;
1696   }
1697   return NULL;
1698 }
1699
1700
1701 /**
1702  * Context to lookup a session based on a IP address
1703  */
1704 struct LookupContext
1705 {
1706   /**
1707    * The result
1708    */
1709   struct Session *res;
1710
1711   /**
1712    * The socket address
1713    */
1714   const struct sockaddr *address;
1715
1716   /**
1717    * The socket address length
1718    */
1719   size_t addr_len;
1720
1721   /**
1722    * Is a fragmentation context required for the session
1723    */
1724   int must_have_frag_ctx;
1725 };
1726
1727
1728 /**
1729  * Find a session with a matching address.
1730  * FIXME: very similar code to #udp_plugin_lookup_session() above.
1731  * Unify?
1732  *
1733  * @param cls the `struct LookupContext *`
1734  * @param key peer identity (unused)
1735  * @param value the `struct Session *`
1736  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1737  */
1738 static int
1739 lookup_session_by_sockaddr_it (void *cls,
1740                                const struct GNUNET_PeerIdentity *key,
1741                                void *value)
1742 {
1743   struct LookupContext *l_ctx = cls;
1744   struct Session *s = value;
1745   struct IPv4UdpAddress u4;
1746   struct IPv6UdpAddress u6;
1747   void *arg;
1748   size_t args;
1749
1750   /* convert address */
1751   switch (l_ctx->address->sa_family)
1752   {
1753   case AF_INET:
1754     GNUNET_assert(l_ctx->addr_len == sizeof(struct sockaddr_in));
1755     memset (&u4, 0, sizeof(u4));
1756     u6.options = htonl (0);
1757     u4.ipv4_addr = ((struct sockaddr_in *) l_ctx->address)->sin_addr.s_addr;
1758     u4.u4_port = ((struct sockaddr_in *) l_ctx->address)->sin_port;
1759     arg = &u4;
1760     args = sizeof(u4);
1761     break;
1762   case AF_INET6:
1763     GNUNET_assert(l_ctx->addr_len == sizeof(struct sockaddr_in6));
1764     memset (&u6, 0, sizeof(u6));
1765     u6.options = htonl (0);
1766     u6.ipv6_addr = ((struct sockaddr_in6 *) l_ctx->address)->sin6_addr;
1767     u6.u6_port = ((struct sockaddr_in6 *) l_ctx->address)->sin6_port;
1768     arg = &u6;
1769     args = sizeof(u6);
1770     break;
1771   default:
1772     GNUNET_break(0);
1773     return GNUNET_YES;
1774   }
1775   if ( (GNUNET_YES == l_ctx->must_have_frag_ctx) &&
1776        (NULL == s->frag_ctx))
1777     return GNUNET_YES;
1778
1779   /* Does not compare peer identities but addresses */
1780   if ((args == s->address->address_length) &&
1781       (0 == memcmp (arg, s->address->address, args)))
1782   {
1783     l_ctx->res = s;
1784     return GNUNET_NO;
1785   }
1786   return GNUNET_YES;
1787 }
1788
1789
1790 static struct Session *
1791 udp_plugin_create_session (void *cls,
1792                            const struct GNUNET_HELLO_Address *address)
1793 {
1794   struct Plugin *plugin = cls;
1795   struct Session *s;
1796   struct IPv4UdpAddress *udp_v4;
1797   struct IPv6UdpAddress *udp_v6;
1798
1799   s = create_session (plugin, address);
1800   if (sizeof (struct IPv4UdpAddress) == address->address_length)
1801   {
1802     struct sockaddr_in v4;
1803     udp_v4 = (struct IPv4UdpAddress *) address->address;
1804     memset (&v4, '\0', sizeof (v4));
1805     v4.sin_family = AF_INET;
1806 #if HAVE_SOCKADDR_IN_SIN_LEN
1807     v4.sin_len = sizeof (struct sockaddr_in);
1808 #endif
1809     v4.sin_port = udp_v4->u4_port;
1810     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
1811     s->ats = plugin->env->get_address_type (plugin->env->cls,
1812                                             (const struct sockaddr *) &v4,
1813                                             sizeof (v4));
1814   }
1815   else if (sizeof (struct IPv6UdpAddress) == address->address_length)
1816   {
1817     struct sockaddr_in6 v6;
1818     udp_v6 = (struct IPv6UdpAddress *) address->address;
1819     memset (&v6, '\0', sizeof (v6));
1820     v6.sin6_family = AF_INET6;
1821 #if HAVE_SOCKADDR_IN_SIN_LEN
1822     v6.sin6_len = sizeof (struct sockaddr_in6);
1823 #endif
1824     v6.sin6_port = udp_v6->u6_port;
1825     v6.sin6_addr = udp_v6->ipv6_addr;
1826     s->ats = plugin->env->get_address_type (plugin->env->cls,
1827         (const struct sockaddr *) &v6, sizeof (v6));
1828   }
1829
1830   if (NULL == s)
1831     return NULL; /* protocol not supported or address invalid */
1832   LOG(GNUNET_ERROR_TYPE_DEBUG,
1833       "Creating new %s session %p for peer `%s' address `%s'\n",
1834       GNUNET_HELLO_address_check_option (address, GNUNET_HELLO_ADDRESS_INFO_INBOUND) ? "inbound" : "outbound",
1835       s, GNUNET_i2s (&address->peer),
1836       udp_address_to_string( NULL,address->address,address->address_length));
1837   GNUNET_assert(
1838       GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (plugin->sessions, &s->target, s, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1839   GNUNET_STATISTICS_set (plugin->env->stats, "# UDP sessions active",
1840       GNUNET_CONTAINER_multipeermap_size (plugin->sessions), GNUNET_NO);
1841   return s;
1842 }
1843
1844
1845 /**
1846  * Function that will be called whenever the transport service wants to
1847  * notify the plugin that a session is still active and in use and
1848  * therefore the session timeout for this session has to be updated
1849  *
1850  * @param cls closure
1851  * @param peer which peer was the session for
1852  * @param session which session is being updated
1853  */
1854 static void
1855 udp_plugin_update_session_timeout (void *cls,
1856                                    const struct GNUNET_PeerIdentity *peer,
1857                                    struct Session *session)
1858 {
1859   struct Plugin *plugin = cls;
1860
1861   if (GNUNET_YES !=
1862       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1863                                                     peer,
1864                                                     session))
1865   {
1866     GNUNET_break(0);
1867     return;
1868   }
1869   /* Reschedule session timeout */
1870   reschedule_session_timeout (session);
1871 }
1872
1873
1874 /**
1875  * Creates a new outbound session the transport service will use to send data to the
1876  * peer
1877  *
1878  * @param cls the plugin
1879  * @param address the address
1880  * @return the session or NULL of max connections exceeded
1881  */
1882 static struct Session *
1883 udp_plugin_get_session (void *cls,
1884                         const struct GNUNET_HELLO_Address *address)
1885 {
1886   struct Session *s;
1887
1888   if (NULL == address)
1889   {
1890     GNUNET_break(0);
1891     return NULL;
1892   }
1893   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
1894        (address->address_length != sizeof(struct IPv6UdpAddress)) )
1895     return NULL;
1896
1897   /* otherwise create new */
1898   if (NULL != (s = udp_plugin_lookup_session (cls, address)))
1899     return s;
1900   return udp_plugin_create_session (cls, address);
1901 }
1902
1903
1904 /**
1905  * Enqueue a message for transmission.
1906  *
1907  * @param plugin the UDP plugin
1908  * @param udpw message wrapper to queue
1909  */
1910 static void
1911 enqueue (struct Plugin *plugin,
1912          struct UDP_MessageWrapper *udpw)
1913 {
1914   struct Session *session = udpw->session;
1915
1916   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1917   {
1918     GNUNET_break (0);
1919   }
1920   else
1921   {
1922     GNUNET_STATISTICS_update (plugin->env->stats,
1923         "# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
1924     plugin->bytes_in_buffer += udpw->msg_size;
1925   }
1926   GNUNET_STATISTICS_update (plugin->env->stats,
1927                             "# UDP, total, msgs in buffers",
1928                             1, GNUNET_NO);
1929   if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
1930     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1931                                 plugin->ipv4_queue_tail,
1932                                 udpw);
1933   else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
1934     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1935                                  plugin->ipv6_queue_tail,
1936                                  udpw);
1937   else
1938   {
1939     GNUNET_break (0);
1940     return;
1941   }
1942   session->msgs_in_queue++;
1943   session->bytes_in_queue += udpw->msg_size;
1944 }
1945
1946
1947 /**
1948  * Fragment message was transmitted via UDP, let fragmentation know
1949  * to send the next fragment now.
1950  *
1951  * @param cls the `struct UDPMessageWrapper *` of the fragment
1952  * @param target destination peer (ignored)
1953  * @param result #GNUNET_OK on success (ignored)
1954  * @param payload bytes payload sent
1955  * @param physical bytes physical sent
1956  */
1957 static void
1958 send_next_fragment (void *cls,
1959                     const struct GNUNET_PeerIdentity *target,
1960                     int result,
1961                     size_t payload,
1962                     size_t physical)
1963 {
1964   struct UDP_MessageWrapper *udpw = cls;
1965
1966   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1967 }
1968
1969
1970 /**
1971  * Function that is called with messages created by the fragmentation
1972  * module.  In the case of the 'proc' callback of the
1973  * #GNUNET_FRAGMENT_context_create() function, this function must
1974  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1975  *
1976  * @param cls closure, the 'struct FragmentationContext'
1977  * @param msg the message that was created
1978  */
1979 static void
1980 enqueue_fragment (void *cls,
1981                   const struct GNUNET_MessageHeader *msg)
1982 {
1983   struct UDP_FragmentationContext *frag_ctx = cls;
1984   struct Plugin *plugin = frag_ctx->plugin;
1985   struct UDP_MessageWrapper * udpw;
1986   size_t msg_len = ntohs (msg->size);
1987
1988   LOG (GNUNET_ERROR_TYPE_DEBUG,
1989        "Enqueuing fragment with %u bytes\n",
1990        msg_len);
1991   frag_ctx->fragments_used++;
1992   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1993   udpw->session = frag_ctx->session;
1994   udpw->msg_buf = (char *) &udpw[1];
1995   udpw->msg_size = msg_len;
1996   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1997   udpw->cont = &send_next_fragment;
1998   udpw->cont_cls = udpw;
1999   udpw->timeout = frag_ctx->timeout;
2000   udpw->frag_ctx = frag_ctx;
2001   udpw->msg_type = UMT_MSG_FRAGMENTED;
2002   memcpy (udpw->msg_buf, msg, msg_len);
2003   enqueue (plugin, udpw);
2004   schedule_select (plugin);
2005 }
2006
2007
2008 /**
2009  * Function that can be used by the transport service to transmit
2010  * a message using the plugin.   Note that in the case of a
2011  * peer disconnecting, the continuation MUST be called
2012  * prior to the disconnect notification itself.  This function
2013  * will be called with this peer's HELLO message to initiate
2014  * a fresh connection to another peer.
2015  *
2016  * @param cls closure
2017  * @param s which session must be used
2018  * @param msgbuf the message to transmit
2019  * @param msgbuf_size number of bytes in 'msgbuf'
2020  * @param priority how important is the message (most plugins will
2021  *                 ignore message priority and just FIFO)
2022  * @param to how long to wait at most for the transmission (does not
2023  *                require plugins to discard the message after the timeout,
2024  *                just advisory for the desired delay; most plugins will ignore
2025  *                this as well)
2026  * @param cont continuation to call once the message has
2027  *        been transmitted (or if the transport is ready
2028  *        for the next transmission call; or if the
2029  *        peer disconnected...); can be NULL
2030  * @param cont_cls closure for cont
2031  * @return number of bytes used (on the physical network, with overheads);
2032  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
2033  *         and does NOT mean that the message was not transmitted (DV)
2034  */
2035 static ssize_t
2036 udp_plugin_send (void *cls,
2037                  struct Session *s,
2038                  const char *msgbuf,
2039                  size_t msgbuf_size,
2040                  unsigned int priority,
2041                  struct GNUNET_TIME_Relative to,
2042                  GNUNET_TRANSPORT_TransmitContinuation cont,
2043                  void *cont_cls)
2044 {
2045   struct Plugin *plugin = cls;
2046   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2047   struct UDP_FragmentationContext * frag_ctx;
2048   struct UDP_MessageWrapper * udpw;
2049   struct UDPMessage *udp;
2050   char mbuf[udpmlen];
2051   GNUNET_assert(plugin != NULL);
2052   GNUNET_assert(s != NULL);
2053
2054   if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) &&
2055        (plugin->sockv6 == NULL) )
2056     return GNUNET_SYSERR;
2057   if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) &&
2058        (plugin->sockv4 == NULL) )
2059     return GNUNET_SYSERR;
2060   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2061   {
2062     GNUNET_break(0);
2063     return GNUNET_SYSERR;
2064   }
2065   if (GNUNET_YES !=
2066       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2067                                                     &s->target,
2068                                                     s))
2069   {
2070     GNUNET_break(0);
2071     return GNUNET_SYSERR;
2072   }
2073   LOG (GNUNET_ERROR_TYPE_DEBUG,
2074        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2075        udpmlen,
2076        GNUNET_i2s (&s->target),
2077        udp_address_to_string (NULL,
2078                               s->address->address,
2079                               s->address->address_length));
2080
2081   /* Message */
2082   udp = (struct UDPMessage *) mbuf;
2083   udp->header.size = htons (udpmlen);
2084   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2085   udp->reserved = htonl (0);
2086   udp->sender = *plugin->env->my_identity;
2087
2088   /* We do not update the session time out here!
2089    * Otherwise this session will not timeout since we send keep alive before
2090    * session can timeout
2091    *
2092    * For UDP we update session timeout only on receive, this will cover keep
2093    * alives, since remote peer will reply with keep alive response!
2094    */
2095   if (udpmlen <= UDP_MTU)
2096   {
2097     /* unfragmented message */
2098     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2099     udpw->session = s;
2100     udpw->msg_buf = (char *) &udpw[1];
2101     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2102     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2103     udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
2104     udpw->cont = cont;
2105     udpw->cont_cls = cont_cls;
2106     udpw->frag_ctx = NULL;
2107     udpw->msg_type = UMT_MSG_UNFRAGMENTED;
2108     memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2109     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
2110     enqueue (plugin, udpw);
2111
2112     GNUNET_STATISTICS_update (plugin->env->stats,
2113         "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
2114     GNUNET_STATISTICS_update (plugin->env->stats,
2115                               "# UDP, unfragmented msgs, bytes payload, attempt",
2116                               udpw->payload_size,
2117                               GNUNET_NO);
2118   }
2119   else
2120   {
2121     /* fragmented message */
2122     if (s->frag_ctx != NULL)
2123       return GNUNET_SYSERR;
2124     memcpy (&udp[1], msgbuf, msgbuf_size);
2125     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2126     frag_ctx->plugin = plugin;
2127     frag_ctx->session = s;
2128     frag_ctx->cont = cont;
2129     frag_ctx->cont_cls = cont_cls;
2130     frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
2131         to);
2132     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2133     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2134     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2135                                                      UDP_MTU,
2136                                                      &plugin->tracker,
2137                                                      s->last_expected_msg_delay,
2138                                                      s->last_expected_ack_delay,
2139                                                      &udp->header,
2140                                                      &enqueue_fragment,
2141                                                      frag_ctx);
2142     s->frag_ctx = frag_ctx;
2143     GNUNET_STATISTICS_update (plugin->env->stats,
2144                               "# UDP, fragmented msgs, messages, pending",
2145                               1,
2146                               GNUNET_NO);
2147     GNUNET_STATISTICS_update (plugin->env->stats,
2148                               "# UDP, fragmented msgs, messages, attempt",
2149                               1,
2150                               GNUNET_NO);
2151     GNUNET_STATISTICS_update (plugin->env->stats,
2152                               "# UDP, fragmented msgs, bytes payload, attempt",
2153                               frag_ctx->payload_size,
2154                               GNUNET_NO);
2155   }
2156   notify_session_monitor (s->plugin,
2157                           s,
2158                           GNUNET_TRANSPORT_SS_UP);
2159   schedule_select (plugin);
2160   return udpmlen;
2161 }
2162
2163
2164 /**
2165  * Our external IP address/port mapping has changed.
2166  *
2167  * @param cls closure, the `struct LocalAddrList`
2168  * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean
2169  *     the previous (now invalid) one
2170  * @param addr either the previous or the new public IP address
2171  * @param addrlen actual lenght of the address
2172  */
2173 static void
2174 udp_nat_port_map_callback (void *cls,
2175                            int add_remove,
2176                            const struct sockaddr *addr,
2177                            socklen_t addrlen)
2178 {
2179   struct Plugin *plugin = cls;
2180   struct GNUNET_HELLO_Address *address;
2181   struct IPv4UdpAddress u4;
2182   struct IPv6UdpAddress u6;
2183   void *arg;
2184   size_t args;
2185
2186   LOG (GNUNET_ERROR_TYPE_INFO,
2187        "NAT notification to %s address `%s'\n",
2188        (GNUNET_YES == add_remove) ? "add" : "remove",
2189        GNUNET_a2s (addr, addrlen));
2190
2191   /* convert 'address' to our internal format */
2192   switch (addr->sa_family)
2193   {
2194   case AF_INET:
2195     GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
2196     memset (&u4, 0, sizeof(u4));
2197     u4.options = htonl (plugin->myoptions);
2198     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
2199     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
2200     if (0 == ((struct sockaddr_in *) addr)->sin_port)
2201       return;
2202     arg = &u4;
2203     args = sizeof(struct IPv4UdpAddress);
2204     break;
2205   case AF_INET6:
2206     GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
2207     memset (&u6, 0, sizeof(u6));
2208     u6.options = htonl (plugin->myoptions);
2209     if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
2210       return;
2211     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
2212         sizeof(struct in6_addr));
2213     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
2214     arg = &u6;
2215     args = sizeof(struct IPv6UdpAddress);
2216     break;
2217   default:
2218     GNUNET_break(0);
2219     return;
2220   }
2221   /* modify our published address list */
2222   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
2223                                            PLUGIN_NAME,
2224                                            arg, args,
2225                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2226   plugin->env->notify_address (plugin->env->cls, add_remove, address);
2227   GNUNET_HELLO_address_free (address);
2228 }
2229
2230
2231 /**
2232  * Message tokenizer has broken up an incomming message. Pass it on
2233  * to the service.
2234  *
2235  * @param cls the `struct Plugin *`
2236  * @param client the `struct SourceInformation *`
2237  * @param hdr the actual message
2238  * @return #GNUNET_OK (always)
2239  */
2240 static int
2241 process_inbound_tokenized_messages (void *cls,
2242                                     void *client,
2243                                     const struct GNUNET_MessageHeader *hdr)
2244 {
2245   struct Plugin *plugin = cls;
2246   struct SourceInformation *si = client;
2247   struct GNUNET_TIME_Relative delay;
2248
2249   GNUNET_assert(si->session != NULL);
2250   if (GNUNET_YES == si->session->in_destroy)
2251     return GNUNET_OK;
2252   /* setup ATS */
2253   GNUNET_break (ntohl (si->session->ats.value) != GNUNET_ATS_NET_UNSPECIFIED);
2254   reschedule_session_timeout (si->session);
2255   delay = plugin->env->receive (plugin->env->cls,
2256                                 si->session->address,
2257                                 si->session,
2258                                 hdr);
2259   plugin->env->update_address_metrics (plugin->env->cls,
2260                                        si->session->address,
2261                                        si->session,
2262                                        &si->session->ats, 1);
2263   si->session->flow_delay_for_other_peer = delay;
2264   return GNUNET_OK;
2265 }
2266
2267
2268 /**
2269  * We've received a UDP Message.  Process it (pass contents to main service).
2270  *
2271  * @param plugin plugin context
2272  * @param msg the message
2273  * @param sender_addr sender address
2274  * @param sender_addr_len number of bytes in @a sender_addr
2275  */
2276 static void
2277 process_udp_message (struct Plugin *plugin,
2278                      const struct UDPMessage *msg,
2279                      const struct sockaddr *sender_addr,
2280                      socklen_t sender_addr_len)
2281 {
2282   struct SourceInformation si;
2283   struct Session * s;
2284   struct GNUNET_HELLO_Address *address;
2285   struct IPv4UdpAddress u4;
2286   struct IPv6UdpAddress u6;
2287   const void *arg;
2288   size_t args;
2289
2290   if (0 != ntohl (msg->reserved))
2291   {
2292     GNUNET_break_op(0);
2293     return;
2294   }
2295   if (ntohs (msg->header.size)
2296       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2297   {
2298     GNUNET_break_op(0);
2299     return;
2300   }
2301
2302   /* convert address */
2303   switch (sender_addr->sa_family)
2304   {
2305   case AF_INET:
2306     GNUNET_assert(sender_addr_len == sizeof(struct sockaddr_in));
2307     memset (&u4, 0, sizeof(u4));
2308     u6.options = htonl (0);
2309     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
2310     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
2311     arg = &u4;
2312     args = sizeof(u4);
2313     break;
2314   case AF_INET6:
2315     GNUNET_assert(sender_addr_len == sizeof(struct sockaddr_in6));
2316     memset (&u6, 0, sizeof(u6));
2317     u6.options = htonl (0);
2318     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
2319     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
2320     arg = &u6;
2321     args = sizeof(u6);
2322     break;
2323   default:
2324     GNUNET_break(0);
2325     return;
2326   }
2327   LOG(GNUNET_ERROR_TYPE_DEBUG,
2328       "Received message with %u bytes from peer `%s' at `%s'\n",
2329       (unsigned int ) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
2330       GNUNET_a2s (sender_addr, sender_addr_len));
2331
2332   address = GNUNET_HELLO_address_allocate ( &msg->sender, PLUGIN_NAME,
2333                                             arg, args,
2334                                             GNUNET_HELLO_ADDRESS_INFO_INBOUND);
2335   if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
2336   {
2337     s = udp_plugin_create_session (plugin, address);
2338     plugin->env->session_start (NULL, address, s, NULL, 0);
2339   }
2340   GNUNET_free(address);
2341
2342   /* iterate over all embedded messages */
2343   si.session = s;
2344   si.sender = msg->sender;
2345   si.arg = arg;
2346   si.args = args;
2347   s->rc++;
2348   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
2349       ntohs (msg->header.size) - sizeof(struct UDPMessage), GNUNET_YES,
2350       GNUNET_NO);
2351   s->rc--;
2352   if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
2353     free_session (s);
2354 }
2355
2356
2357 /**
2358  * Process a defragmented message.
2359  *
2360  * @param cls the `struct DefragContext *`
2361  * @param msg the message
2362  */
2363 static void
2364 fragment_msg_proc (void *cls,
2365                    const struct GNUNET_MessageHeader *msg)
2366 {
2367   struct DefragContext *rc = cls;
2368
2369   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2370   {
2371     GNUNET_break(0);
2372     return;
2373   }
2374   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2375   {
2376     GNUNET_break(0);
2377     return;
2378   }
2379   process_udp_message (rc->plugin,
2380                        (const struct UDPMessage *) msg,
2381                        rc->src_addr,
2382                        rc->addr_len);
2383 }
2384
2385
2386 /**
2387  * Transmit an acknowledgement.
2388  *
2389  * @param cls the `struct DefragContext *`
2390  * @param id message ID (unused)
2391  * @param msg ack to transmit
2392  */
2393 static void
2394 ack_proc (void *cls,
2395           uint32_t id,
2396           const struct GNUNET_MessageHeader *msg)
2397 {
2398   struct DefragContext *rc = cls;
2399   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2400   struct UDP_ACK_Message *udp_ack;
2401   uint32_t delay = 0;
2402   struct UDP_MessageWrapper *udpw;
2403   struct Session *s;
2404   struct LookupContext l_ctx;
2405
2406   l_ctx.address = rc->src_addr;
2407   l_ctx.addr_len = rc->addr_len;
2408   l_ctx.must_have_frag_ctx = GNUNET_NO;
2409   l_ctx.res = NULL;
2410   GNUNET_CONTAINER_multipeermap_iterate (rc->plugin->sessions,
2411                                          &lookup_session_by_sockaddr_it,
2412                                          &l_ctx);
2413   s = l_ctx.res;
2414   if (NULL == s)
2415   {
2416     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2417         "Trying to transmit ACK to peer `%s' but not session found!\n",
2418         GNUNET_a2s(rc->src_addr, rc->addr_len));
2419
2420     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2421     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2422     GNUNET_free (rc);
2423
2424     return;
2425   }
2426   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2427     delay = s->flow_delay_for_other_peer.rel_value_us;
2428
2429   LOG (GNUNET_ERROR_TYPE_DEBUG,
2430        "Sending ACK to `%s' including delay of %s\n",
2431        GNUNET_a2s (rc->src_addr, (rc->src_addr->sa_family == AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct sockaddr_in6)),
2432        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2433                                                GNUNET_YES));
2434   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2435   udpw->msg_size = msize;
2436   udpw->payload_size = 0;
2437   udpw->session = s;
2438   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2439   udpw->msg_buf = (char *) &udpw[1];
2440   udpw->msg_type = UMT_MSG_ACK;
2441   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2442   udp_ack->header.size = htons ((uint16_t) msize);
2443   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2444   udp_ack->delay = htonl (delay);
2445   udp_ack->sender = *rc->plugin->env->my_identity;
2446   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2447   enqueue (rc->plugin, udpw);
2448   notify_session_monitor (s->plugin,
2449                           s,
2450                           GNUNET_TRANSPORT_SS_UP);
2451   schedule_select (rc->plugin);
2452 }
2453
2454
2455 /**
2456  * FIXME.
2457  */
2458 static void
2459 read_process_msg (struct Plugin *plugin,
2460                   const struct GNUNET_MessageHeader *msg,
2461                   const struct sockaddr *addr,
2462                   socklen_t fromlen)
2463 {
2464   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2465   {
2466     GNUNET_break_op(0);
2467     return;
2468   }
2469   process_udp_message (plugin,
2470                        (const struct UDPMessage *) msg,
2471                        addr,
2472                        fromlen);
2473 }
2474
2475
2476 /**
2477  * FIXME.
2478  */
2479 static void
2480 read_process_ack (struct Plugin *plugin,
2481                   const struct GNUNET_MessageHeader *msg,
2482                   const struct sockaddr *addr,
2483                   socklen_t fromlen)
2484 {
2485   const struct GNUNET_MessageHeader *ack;
2486   const struct UDP_ACK_Message *udp_ack;
2487   struct LookupContext l_ctx;
2488   struct Session *s;
2489   struct GNUNET_TIME_Relative flow_delay;
2490
2491   if (ntohs (msg->size)
2492       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2493   {
2494     GNUNET_break_op(0);
2495     return;
2496   }
2497   udp_ack = (const struct UDP_ACK_Message *) msg;
2498
2499   /* Lookup session based on sockaddr */
2500   l_ctx.address = addr;
2501   l_ctx.addr_len = fromlen;
2502   l_ctx.res = NULL;
2503   l_ctx.must_have_frag_ctx = GNUNET_YES;
2504   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
2505       &lookup_session_by_sockaddr_it, &l_ctx);
2506   s = l_ctx.res;
2507   if ((NULL == s) || (NULL == s->frag_ctx))
2508   {
2509     return;
2510   }
2511
2512   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2513   LOG(GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %s\n",
2514       GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES));
2515   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2516
2517   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2518   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2519   {
2520     GNUNET_break_op(0);
2521     return;
2522   }
2523
2524   if (0
2525       != memcmp (&l_ctx.res->target, &udp_ack->sender,
2526           sizeof(struct GNUNET_PeerIdentity)))
2527     GNUNET_break(0);
2528   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2529   {
2530     LOG(GNUNET_ERROR_TYPE_DEBUG,
2531         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2532         (unsigned int ) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2533         GNUNET_a2s (addr, fromlen));
2534     /* Expect more ACKs to arrive */
2535     return;
2536   }
2537
2538   LOG(GNUNET_ERROR_TYPE_DEBUG, "Message full ACK'ed\n",
2539       (unsigned int ) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2540       GNUNET_a2s (addr, fromlen));
2541
2542   /* Remove fragmented message after successful sending */
2543   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2544 }
2545
2546
2547 /**
2548  * FIXME.
2549  */
2550 static void
2551 read_process_fragment (struct Plugin *plugin,
2552                        const struct GNUNET_MessageHeader *msg,
2553                        const struct sockaddr *addr,
2554                        socklen_t fromlen)
2555 {
2556   struct DefragContext *d_ctx;
2557   struct GNUNET_TIME_Absolute now;
2558   struct FindReceiveContext frc;
2559
2560   frc.rc = NULL;
2561   frc.addr = addr;
2562   frc.addr_len = fromlen;
2563
2564   LOG(GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2565       (unsigned int ) ntohs (msg->size), GNUNET_a2s (addr, fromlen));
2566   /* Lookup existing receive context for this address */
2567   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2568       &find_receive_context, &frc);
2569   now = GNUNET_TIME_absolute_get ();
2570   d_ctx = frc.rc;
2571
2572   if (d_ctx == NULL )
2573   {
2574     /* Create a new defragmentation context */
2575     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2576     memcpy (&d_ctx[1], addr, fromlen);
2577     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2578     d_ctx->addr_len = fromlen;
2579     d_ctx->plugin = plugin;
2580     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2581         UDP_MTU, UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx, &fragment_msg_proc,
2582         &ack_proc);
2583     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2584         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2585     LOG(GNUNET_ERROR_TYPE_DEBUG,
2586         "Created new defragmentation context for %u-byte fragment from `%s'\n",
2587         (unsigned int ) ntohs (msg->size), GNUNET_a2s (addr, fromlen));
2588   }
2589   else
2590   {
2591     LOG(GNUNET_ERROR_TYPE_DEBUG,
2592         "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2593         (unsigned int ) ntohs (msg->size), GNUNET_a2s (addr, fromlen));
2594   }
2595
2596   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2597   {
2598     /* keep this 'rc' from expiring */
2599     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2600         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2601   }
2602   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2603   UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2604   {
2605     /* remove 'rc' that was inactive the longest */
2606     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2607     GNUNET_assert(NULL != d_ctx);
2608     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2609     GNUNET_free(d_ctx);
2610   }
2611 }
2612
2613
2614 /**
2615  * Read and process a message from the given socket.
2616  *
2617  * @param plugin the overall plugin
2618  * @param rsock socket to read from
2619  */
2620 static void
2621 udp_select_read (struct Plugin *plugin,
2622                  struct GNUNET_NETWORK_Handle *rsock)
2623 {
2624   socklen_t fromlen;
2625   struct sockaddr_storage addr;
2626   char buf[65536] GNUNET_ALIGN;
2627   ssize_t size;
2628   const struct GNUNET_MessageHeader *msg;
2629
2630   fromlen = sizeof(addr);
2631   memset (&addr, 0, sizeof(addr));
2632   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf),
2633       (struct sockaddr *) &addr, &fromlen);
2634 #if MINGW
2635   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2636    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2637    * on this socket has failed.
2638    * Quote from MSDN:
2639    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2640    *   executing a hard or abortive close. The application should close
2641    *   the socket; it is no longer usable. On a UDP-datagram socket this
2642    *   error indicates a previous send operation resulted in an ICMP Port
2643    *   Unreachable message.
2644    */
2645   if ( (-1 == size) && (ECONNRESET == errno) )
2646   return;
2647 #endif
2648   if (-1 == size)
2649   {
2650     LOG(GNUNET_ERROR_TYPE_DEBUG, "UDP failed to receive data: %s\n",
2651         STRERROR (errno));
2652     /* Connection failure or something. Not a protocol violation. */
2653     return;
2654   }
2655   if (size < sizeof(struct GNUNET_MessageHeader))
2656   {
2657     LOG(GNUNET_ERROR_TYPE_WARNING,
2658         "UDP got %u bytes, which is not enough for a GNUnet message header\n",
2659         (unsigned int ) size);
2660     /* _MAY_ be a connection failure (got partial message) */
2661     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2662     GNUNET_break_op(0);
2663     return;
2664   }
2665   msg = (const struct GNUNET_MessageHeader *) buf;
2666
2667   LOG(GNUNET_ERROR_TYPE_DEBUG,
2668       "UDP received %u-byte message from `%s' type %u\n", (unsigned int ) size,
2669       GNUNET_a2s ((const struct sockaddr * ) &addr, fromlen),
2670       ntohs (msg->type));
2671
2672   if (size != ntohs (msg->size))
2673   {
2674     GNUNET_break_op(0);
2675     return;
2676   }
2677   GNUNET_STATISTICS_update (plugin->env->stats,
2678                             "# UDP, total, bytes, received",
2679                             size,
2680                             GNUNET_NO);
2681
2682   switch (ntohs (msg->type))
2683   {
2684   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2685     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2686       udp_broadcast_receive (plugin, buf, size, (const struct sockaddr *) &addr,
2687           fromlen);
2688     return;
2689   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2690     read_process_msg (plugin, msg, (const struct sockaddr *) &addr, fromlen);
2691     return;
2692   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2693     read_process_ack (plugin, msg, (const struct sockaddr *) &addr, fromlen);
2694     return;
2695   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2696     read_process_fragment (plugin, msg, (const struct sockaddr *) &addr,
2697         fromlen);
2698     return;
2699   default:
2700     GNUNET_break_op(0);
2701     return;
2702   }
2703 }
2704
2705
2706 /**
2707  * FIXME.
2708  */
2709 static struct UDP_MessageWrapper *
2710 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2711                                     struct GNUNET_NETWORK_Handle *sock)
2712 {
2713   struct UDP_MessageWrapper *udpw = NULL;
2714   struct GNUNET_TIME_Relative remaining;
2715   struct Session *session;
2716   struct Plugin *plugin;
2717   int removed;
2718
2719   removed = GNUNET_NO;
2720   udpw = head;
2721   while (NULL != udpw)
2722   {
2723     session = udpw->session;
2724     plugin = session->plugin;
2725     /* Find messages with timeout */
2726     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2727     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2728     {
2729       /* Message timed out */
2730       switch (udpw->msg_type)
2731       {
2732       case UMT_MSG_UNFRAGMENTED:
2733         GNUNET_STATISTICS_update (plugin->env->stats,
2734                                   "# UDP, total, bytes, sent, timeout",
2735                                   udpw->msg_size,
2736                                   GNUNET_NO);
2737         GNUNET_STATISTICS_update (plugin->env->stats,
2738                                   "# UDP, total, messages, sent, timeout",
2739                                   1,
2740                                   GNUNET_NO);
2741         GNUNET_STATISTICS_update (plugin->env->stats,
2742                                   "# UDP, unfragmented msgs, messages, sent, timeout",
2743                                   1,
2744                                   GNUNET_NO);
2745         GNUNET_STATISTICS_update (plugin->env->stats,
2746                                   "# UDP, unfragmented msgs, bytes, sent, timeout",
2747                                   udpw->payload_size,
2748                                   GNUNET_NO);
2749         /* Not fragmented message */
2750         LOG (GNUNET_ERROR_TYPE_DEBUG,
2751              "Message for peer `%s' with size %u timed out\n",
2752              GNUNET_i2s (&udpw->session->target),
2753              udpw->payload_size);
2754         call_continuation (udpw, GNUNET_SYSERR);
2755         /* Remove message */
2756         removed = GNUNET_YES;
2757         dequeue (plugin, udpw);
2758         GNUNET_free(udpw);
2759         break;
2760       case UMT_MSG_FRAGMENTED:
2761         /* Fragmented message */
2762         GNUNET_STATISTICS_update (plugin->env->stats,
2763                                   "# UDP, total, bytes, sent, timeout",
2764                                   udpw->frag_ctx->on_wire_size,
2765                                   GNUNET_NO);
2766         GNUNET_STATISTICS_update (plugin->env->stats,
2767                                   "# UDP, total, messages, sent, timeout",
2768                                   1,
2769                                   GNUNET_NO);
2770         call_continuation (udpw, GNUNET_SYSERR);
2771         LOG (GNUNET_ERROR_TYPE_DEBUG,
2772              "Fragment for message for peer `%s' with size %u timed out\n",
2773              GNUNET_i2s (&udpw->session->target),
2774             udpw->frag_ctx->payload_size);
2775
2776         GNUNET_STATISTICS_update (plugin->env->stats,
2777                                   "# UDP, fragmented msgs, messages, sent, timeout",
2778                                   1,
2779                                   GNUNET_NO);
2780         GNUNET_STATISTICS_update (plugin->env->stats,
2781                                   "# UDP, fragmented msgs, bytes, sent, timeout",
2782                                   udpw->frag_ctx->payload_size,
2783                                   GNUNET_NO);
2784         /* Remove fragmented message due to timeout */
2785         fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2786         break;
2787       case UMT_MSG_ACK:
2788         GNUNET_STATISTICS_update (plugin->env->stats,
2789                                   "# UDP, total, bytes, sent, timeout",
2790                                   udpw->msg_size,
2791                                   GNUNET_NO);
2792         GNUNET_STATISTICS_update (plugin->env->stats,
2793                                   "# UDP, total, messages, sent, timeout",
2794                                   1,
2795                                   GNUNET_NO);
2796         LOG (GNUNET_ERROR_TYPE_DEBUG,
2797              "ACK Message for peer `%s' with size %u timed out\n",
2798              GNUNET_i2s (&udpw->session->target),
2799              udpw->payload_size);
2800         call_continuation (udpw, GNUNET_SYSERR);
2801         removed = GNUNET_YES;
2802         dequeue (plugin, udpw);
2803         GNUNET_free(udpw);
2804         break;
2805       default:
2806         break;
2807       }
2808       if (sock == plugin->sockv4)
2809         udpw = plugin->ipv4_queue_head;
2810       else if (sock == plugin->sockv6)
2811         udpw = plugin->ipv6_queue_head;
2812       else
2813       {
2814         GNUNET_break(0); /* should never happen */
2815         udpw = NULL;
2816       }
2817       GNUNET_STATISTICS_update (plugin->env->stats,
2818                                 "# messages discarded due to timeout",
2819                                 1,
2820                                 GNUNET_NO);
2821     }
2822     else
2823     {
2824       /* Message did not time out, check flow delay */
2825       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2826       if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2827       {
2828         /* this message is not delayed */
2829         LOG (GNUNET_ERROR_TYPE_DEBUG,
2830              "Message for peer `%s' (%u bytes) is not delayed \n",
2831              GNUNET_i2s (&udpw->session->target),
2832              udpw->payload_size);
2833         break; /* Found message to send, break */
2834       }
2835       else
2836       {
2837         /* Message is delayed, try next */
2838         LOG (GNUNET_ERROR_TYPE_DEBUG,
2839              "Message for peer `%s' (%u bytes) is delayed for %s\n",
2840              GNUNET_i2s (&udpw->session->target), udpw->payload_size,
2841              GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
2842         udpw = udpw->next;
2843       }
2844     }
2845   }
2846   if (GNUNET_YES == removed)
2847     notify_session_monitor (session->plugin,
2848                             session,
2849                             GNUNET_TRANSPORT_SS_UP);
2850   return udpw;
2851 }
2852
2853
2854 /**
2855  * FIXME.
2856  */
2857 static void
2858 analyze_send_error (struct Plugin *plugin,
2859                     const struct sockaddr *sa,
2860                     socklen_t slen, int error)
2861 {
2862   struct GNUNET_ATS_Information type;
2863
2864   type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
2865   if (((GNUNET_ATS_NET_LAN == ntohl (type.value))
2866       || (GNUNET_ATS_NET_WAN == ntohl (type.value)))
2867       && ((ENETUNREACH == errno)|| (ENETDOWN == errno)))
2868   {
2869     if (slen == sizeof (struct sockaddr_in))
2870     {
2871       /* IPv4: "Network unreachable" or "Network down"
2872        *
2873        * This indicates we do not have connectivity
2874        */
2875       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2876            _("UDP could not transmit message to `%s': "
2877              "Network seems down, please check your network configuration\n"),
2878            GNUNET_a2s (sa, slen));
2879     }
2880     if (slen == sizeof (struct sockaddr_in6))
2881     {
2882       /* IPv6: "Network unreachable" or "Network down"
2883        *
2884        * This indicates that this system is IPv6 enabled, but does not
2885        * have a valid global IPv6 address assigned or we do not have
2886        * connectivity
2887        */
2888       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2889            _("UDP could not transmit IPv6 message! "
2890              "Please check your network configuration and disable IPv6 if your "
2891              "connection does not have a global IPv6 address\n"));
2892     }
2893   }
2894   else
2895   {
2896     LOG (GNUNET_ERROR_TYPE_WARNING,
2897          "UDP could not transmit message to `%s': `%s'\n",
2898          GNUNET_a2s (sa, slen), STRERROR (error));
2899   }
2900 }
2901
2902
2903 /**
2904  * FIXME.
2905  */
2906 static size_t
2907 udp_select_send (struct Plugin *plugin,
2908                  struct GNUNET_NETWORK_Handle *sock)
2909 {
2910   ssize_t sent;
2911   socklen_t slen;
2912   struct sockaddr *a;
2913   const struct IPv4UdpAddress *u4;
2914   struct sockaddr_in a4;
2915   const struct IPv6UdpAddress *u6;
2916   struct sockaddr_in6 a6;
2917   struct UDP_MessageWrapper *udpw;
2918
2919   /* Find message to send */
2920   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4)
2921                                              ? plugin->ipv4_queue_head
2922                                              : plugin->ipv6_queue_head,
2923                                              sock);
2924   if (NULL == udpw)
2925     return 0; /* No message to send */
2926
2927   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
2928   {
2929     u4 = udpw->session->address->address;
2930     memset (&a4, 0, sizeof(a4));
2931     a4.sin_family = AF_INET;
2932 #if HAVE_SOCKADDR_IN_SIN_LEN
2933     a4.sin_len = sizeof (a4);
2934 #endif
2935     a4.sin_port = u4->u4_port;
2936     memcpy (&a4.sin_addr, &u4->ipv4_addr, sizeof(struct in_addr));
2937     a = (struct sockaddr *) &a4;
2938     slen = sizeof (a4);
2939   }
2940   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
2941   {
2942     u6 = udpw->session->address->address;
2943     memset (&a6, 0, sizeof(a6));
2944     a6.sin6_family = AF_INET6;
2945 #if HAVE_SOCKADDR_IN_SIN_LEN
2946     a6.sin6_len = sizeof (a6);
2947 #endif
2948     a6.sin6_port = u6->u6_port;
2949     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
2950     a = (struct sockaddr *) &a6;
2951     slen = sizeof (a6);
2952   }
2953   else
2954   {
2955     call_continuation (udpw, GNUNET_OK);
2956     dequeue (plugin, udpw);
2957     notify_session_monitor (plugin,
2958                             udpw->session,
2959                             GNUNET_TRANSPORT_SS_UP);
2960     GNUNET_free (udpw);
2961     return GNUNET_SYSERR;
2962   }
2963
2964   sent = GNUNET_NETWORK_socket_sendto (sock,
2965                                        udpw->msg_buf,
2966                                        udpw->msg_size,
2967                                        a,
2968                                        slen);
2969   if (GNUNET_SYSERR == sent)
2970   {
2971     /* Failure */
2972     analyze_send_error (plugin, a, slen, errno);
2973     call_continuation (udpw, GNUNET_SYSERR);
2974     GNUNET_STATISTICS_update (plugin->env->stats,
2975         "# UDP, total, bytes, sent, failure", sent, GNUNET_NO);
2976     GNUNET_STATISTICS_update (plugin->env->stats,
2977         "# UDP, total, messages, sent, failure", 1, GNUNET_NO);
2978   }
2979   else
2980   {
2981     /* Success */
2982     LOG(GNUNET_ERROR_TYPE_DEBUG,
2983         "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2984         (unsigned int ) (udpw->msg_size), GNUNET_i2s (&udpw->session->target),
2985         GNUNET_a2s (a, slen), (int ) sent,
2986         (sent < 0) ? STRERROR (errno) : "ok");
2987     GNUNET_STATISTICS_update (plugin->env->stats,
2988         "# UDP, total, bytes, sent, success", sent, GNUNET_NO);
2989     GNUNET_STATISTICS_update (plugin->env->stats,
2990         "# UDP, total, messages, sent, success", 1, GNUNET_NO);
2991     if (NULL != udpw->frag_ctx)
2992       udpw->frag_ctx->on_wire_size += udpw->msg_size;
2993     call_continuation (udpw, GNUNET_OK);
2994   }
2995   dequeue (plugin, udpw);
2996   notify_session_monitor (plugin,
2997                           udpw->session,
2998                           GNUNET_TRANSPORT_SS_UP);
2999   GNUNET_free(udpw);
3000   return sent;
3001 }
3002
3003
3004 /**
3005  * We have been notified that our readset has something to read.  We don't
3006  * know which socket needs to be read, so we have to check each one
3007  * Then reschedule this function to be called again once more is available.
3008  *
3009  * @param cls the plugin handle
3010  * @param tc the scheduling context (for rescheduling this function again)
3011  */
3012 static void
3013 udp_plugin_select (void *cls,
3014                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3015 {
3016   struct Plugin *plugin = cls;
3017
3018   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
3019   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3020     return;
3021   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
3022       && (NULL != plugin->sockv4)
3023       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
3024     udp_select_read (plugin, plugin->sockv4);
3025   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3026       && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
3027       && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
3028     udp_select_send (plugin, plugin->sockv4);
3029   schedule_select (plugin);
3030 }
3031
3032
3033 /**
3034  * We have been notified that our readset has something to read.  We don't
3035  * know which socket needs to be read, so we have to check each one
3036  * Then reschedule this function to be called again once more is available.
3037  *
3038  * @param cls the plugin handle
3039  * @param tc the scheduling context (for rescheduling this function again)
3040  */
3041 static void
3042 udp_plugin_select_v6 (void *cls,
3043                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3044 {
3045   struct Plugin *plugin = cls;
3046
3047   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
3048   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3049     return;
3050   if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
3051       && (NULL != plugin->sockv6)
3052       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
3053     udp_select_read (plugin, plugin->sockv6);
3054   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3055       && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
3056       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
3057   schedule_select (plugin);
3058 }
3059
3060
3061 /**
3062  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3063  *
3064  * @param plugin the plugin to initialize
3065  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3066  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3067  * @return number of sockets that were successfully bound
3068  */
3069 static int
3070 setup_sockets (struct Plugin *plugin,
3071                const struct sockaddr_in6 *bind_v6,
3072                const struct sockaddr_in *bind_v4)
3073 {
3074   int tries;
3075   int sockets_created = 0;
3076   struct sockaddr_in6 server_addrv6;
3077   struct sockaddr_in server_addrv4;
3078   struct sockaddr *server_addr;
3079   struct sockaddr *addrs[2];
3080   socklen_t addrlens[2];
3081   socklen_t addrlen;
3082   int eno;
3083
3084   /* Create IPv6 socket */
3085   eno = EINVAL;
3086   if (GNUNET_YES == plugin->enable_ipv6)
3087   {
3088     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
3089     if (NULL == plugin->sockv6)
3090     {
3091       LOG(GNUNET_ERROR_TYPE_WARNING,
3092           "Disabling IPv6 since it is not supported on this system!\n");
3093       plugin->enable_ipv6 = GNUNET_NO;
3094     }
3095     else
3096     {
3097       memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6));
3098 #if HAVE_SOCKADDR_IN_SIN_LEN
3099       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3100 #endif
3101       server_addrv6.sin6_family = AF_INET6;
3102       if (NULL != bind_v6)
3103         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3104       else
3105         server_addrv6.sin6_addr = in6addr_any;
3106
3107       if (0 == plugin->port) /* autodetect */
3108         server_addrv6.sin6_port = htons (
3109             GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3110                 + 32000);
3111       else
3112         server_addrv6.sin6_port = htons (plugin->port);
3113       addrlen = sizeof(struct sockaddr_in6);
3114       server_addr = (struct sockaddr *) &server_addrv6;
3115
3116       tries = 0;
3117       while (tries < 10)
3118       {
3119         LOG(GNUNET_ERROR_TYPE_DEBUG,
3120             "Binding to IPv6 `%s'\n",
3121             GNUNET_a2s (server_addr, addrlen));
3122         /* binding */
3123         if (GNUNET_OK
3124             == GNUNET_NETWORK_socket_bind (plugin->sockv6, server_addr,
3125                 addrlen))
3126           break;
3127         eno = errno;
3128         if (0 != plugin->port)
3129         {
3130           tries = 10; /* fail */
3131           break; /* bind failed on specific port */
3132         }
3133         /* autodetect */
3134         server_addrv6.sin6_port = htons (
3135             GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3136                 + 32000);
3137         tries++;
3138       }
3139       if (tries >= 10)
3140       {
3141         GNUNET_NETWORK_socket_close (plugin->sockv6);
3142         plugin->enable_ipv6 = GNUNET_NO;
3143         plugin->sockv6 = NULL;
3144       }
3145
3146       if (plugin->sockv6 != NULL )
3147       {
3148         LOG (GNUNET_ERROR_TYPE_DEBUG,
3149              "IPv6 socket created on port %s\n",
3150              GNUNET_a2s (server_addr, addrlen));
3151         addrs[sockets_created] = (struct sockaddr *) &server_addrv6;
3152         addrlens[sockets_created] = sizeof(struct sockaddr_in6);
3153         sockets_created++;
3154       }
3155       else
3156       {
3157         LOG (GNUNET_ERROR_TYPE_ERROR,
3158              "Failed to bind UDP socket to %s: %s\n",
3159              GNUNET_a2s (server_addr, addrlen),
3160              STRERROR (eno));
3161       }
3162     }
3163   }
3164
3165   /* Create IPv4 socket */
3166   eno = EINVAL;
3167   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
3168   if (NULL == plugin->sockv4)
3169   {
3170     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3171                          "socket");
3172     LOG(GNUNET_ERROR_TYPE_WARNING,
3173         "Disabling IPv4 since it is not supported on this system!\n");
3174     plugin->enable_ipv4 = GNUNET_NO;
3175   }
3176   else
3177   {
3178     memset (&server_addrv4, '\0', sizeof(struct sockaddr_in));
3179 #if HAVE_SOCKADDR_IN_SIN_LEN
3180     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3181 #endif
3182     server_addrv4.sin_family = AF_INET;
3183     if (NULL != bind_v4)
3184       server_addrv4.sin_addr = bind_v4->sin_addr;
3185     else
3186       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3187
3188     if (0 == plugin->port)
3189       /* autodetect */
3190       server_addrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3191                                                                 33537)
3192                                       + 32000);
3193     else
3194       server_addrv4.sin_port = htons (plugin->port);
3195
3196     addrlen = sizeof(struct sockaddr_in);
3197     server_addr = (struct sockaddr *) &server_addrv4;
3198
3199     tries = 0;
3200     while (tries < 10)
3201     {
3202       LOG (GNUNET_ERROR_TYPE_DEBUG,
3203            "Binding to IPv4 `%s'\n",
3204            GNUNET_a2s (server_addr, addrlen));
3205
3206       /* binding */
3207       if (GNUNET_OK
3208           == GNUNET_NETWORK_socket_bind (plugin->sockv4, server_addr, addrlen))
3209         break;
3210       eno = errno;
3211       if (0 != plugin->port)
3212       {
3213         tries = 10; /* fail */
3214         break; /* bind failed on specific port */
3215       }
3216
3217       /* autodetect */
3218       server_addrv4.sin_port = htons (
3219           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3220               + 32000);
3221       tries++;
3222     }
3223
3224     if (tries >= 10)
3225     {
3226       GNUNET_NETWORK_socket_close (plugin->sockv4);
3227       plugin->enable_ipv4 = GNUNET_NO;
3228       plugin->sockv4 = NULL;
3229     }
3230
3231     if (NULL != plugin->sockv4)
3232     {
3233       LOG(GNUNET_ERROR_TYPE_DEBUG, "IPv4 socket created on port %s\n",
3234           GNUNET_a2s (server_addr, addrlen));
3235       addrs[sockets_created] = (struct sockaddr *) &server_addrv4;
3236       addrlens[sockets_created] = sizeof(struct sockaddr_in);
3237       sockets_created++;
3238     }
3239     else
3240     {
3241       LOG (GNUNET_ERROR_TYPE_ERROR,
3242            _("Failed to bind UDP socket to %s: %s\n"),
3243            GNUNET_a2s (server_addr, addrlen),
3244            STRERROR (eno));
3245     }
3246   }
3247
3248   if (0 == sockets_created)
3249   {
3250     LOG(GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
3251     return 0; /* No sockets created, return */
3252   }
3253
3254   /* Create file descriptors */
3255   if (plugin->enable_ipv4 == GNUNET_YES)
3256   {
3257     plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
3258     plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
3259     GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
3260     GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
3261     if (NULL != plugin->sockv4)
3262     {
3263       GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
3264       GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
3265     }
3266   }
3267
3268   if (plugin->enable_ipv6 == GNUNET_YES)
3269   {
3270     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
3271     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
3272     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
3273     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
3274     if (NULL != plugin->sockv6)
3275     {
3276       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
3277       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
3278     }
3279   }
3280
3281   schedule_select (plugin);
3282   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3283                                      GNUNET_NO,
3284                                      plugin->port,
3285                                      sockets_created,
3286                                      (const struct sockaddr **) addrs,
3287                                      addrlens,
3288                                      &udp_nat_port_map_callback,
3289                                      NULL,
3290                                      plugin);
3291
3292   return sockets_created;
3293 }
3294
3295
3296 /**
3297  * Return information about the given session to the
3298  * monitor callback.
3299  *
3300  * @param cls the `struct Plugin` with the monitor callback (`sic`)
3301  * @param peer peer we send information about
3302  * @param value our `struct Session` to send information about
3303  * @return #GNUNET_OK (continue to iterate)
3304  */
3305 static int
3306 send_session_info_iter (void *cls,
3307                         const struct GNUNET_PeerIdentity *peer,
3308                         void *value)
3309 {
3310   struct Plugin *plugin = cls;
3311   struct Session *session = value;
3312
3313   notify_session_monitor (plugin,
3314                           session,
3315                           GNUNET_TRANSPORT_SS_UP);
3316   return GNUNET_OK;
3317 }
3318
3319
3320 /**
3321  * Begin monitoring sessions of a plugin.  There can only
3322  * be one active monitor per plugin (i.e. if there are
3323  * multiple monitors, the transport service needs to
3324  * multiplex the generated events over all of them).
3325  *
3326  * @param cls closure of the plugin
3327  * @param sic callback to invoke, NULL to disable monitor;
3328  *            plugin will being by iterating over all active
3329  *            sessions immediately and then enter monitor mode
3330  * @param sic_cls closure for @a sic
3331  */
3332 static void
3333 udp_plugin_setup_monitor (void *cls,
3334                           GNUNET_TRANSPORT_SessionInfoCallback sic,
3335                           void *sic_cls)
3336 {
3337   struct Plugin *plugin = cls;
3338
3339   plugin->sic = sic;
3340   plugin->sic_cls = sic_cls;
3341   if (NULL != sic)
3342   {
3343     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3344                                            &send_session_info_iter,
3345                                            plugin);
3346     /* signal end of first iteration */
3347     sic (sic_cls, NULL, NULL);
3348   }
3349 }
3350
3351
3352 /**
3353  * The exported method. Makes the core api available via a global and
3354  * returns the udp transport API.
3355  *
3356  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3357  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3358  */
3359 void *
3360 libgnunet_plugin_transport_udp_init (void *cls)
3361 {
3362   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3363   struct GNUNET_TRANSPORT_PluginFunctions *api;
3364   struct Plugin *p;
3365   unsigned long long port;
3366   unsigned long long aport;
3367   unsigned long long udp_max_bps;
3368   unsigned long long enable_v6;
3369   unsigned long long enable_broadcasting;
3370   unsigned long long enable_broadcasting_recv;
3371   char *bind4_address;
3372   char *bind6_address;
3373   char *fancy_interval;
3374   struct GNUNET_TIME_Relative interval;
3375   struct sockaddr_in server_addrv4;
3376   struct sockaddr_in6 server_addrv6;
3377   int res;
3378   int have_bind4;
3379   int have_bind6;
3380
3381   if (NULL == env->receive)
3382   {
3383     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3384      initialze the plugin or the API */
3385     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3386     api->cls = NULL;
3387     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3388     api->address_to_string = &udp_address_to_string;
3389     api->string_to_address = &udp_string_to_address;
3390     return api;
3391   }
3392
3393   /* Get port number: port == 0 : autodetect a port,
3394    * > 0 : use this port, not given : 2086 default */
3395   if (GNUNET_OK !=
3396       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3397                                              "PORT", &port))
3398     port = 2086;
3399   if (GNUNET_OK !=
3400       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3401                                              "ADVERTISED_PORT", &aport))
3402     aport = port;
3403   if (port > 65535)
3404   {
3405     LOG (GNUNET_ERROR_TYPE_WARNING,
3406          _("Given `%s' option is out of range: %llu > %u\n"),
3407          "PORT", port,
3408          65535);
3409     return NULL;
3410   }
3411
3412   /* Protocols */
3413   if (GNUNET_YES ==
3414       GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
3415     enable_v6 = GNUNET_NO;
3416   else
3417     enable_v6 = GNUNET_YES;
3418
3419   /* Addresses */
3420   have_bind4 = GNUNET_NO;
3421   memset (&server_addrv4, 0, sizeof(server_addrv4));
3422   if (GNUNET_YES ==
3423       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3424                                              "BINDTO", &bind4_address))
3425   {
3426     LOG (GNUNET_ERROR_TYPE_DEBUG,
3427          "Binding udp plugin to specific address: `%s'\n",
3428          bind4_address);
3429     if (1 != inet_pton (AF_INET,
3430                         bind4_address,
3431                         &server_addrv4.sin_addr))
3432     {
3433       GNUNET_free (bind4_address);
3434       return NULL;
3435     }
3436     have_bind4 = GNUNET_YES;
3437   }
3438   GNUNET_free_non_null(bind4_address);
3439   have_bind6 = GNUNET_NO;
3440   memset (&server_addrv6, 0, sizeof(server_addrv6));
3441   if (GNUNET_YES ==
3442       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3443                                              "BINDTO6", &bind6_address))
3444   {
3445     LOG (GNUNET_ERROR_TYPE_DEBUG,
3446          "Binding udp plugin to specific address: `%s'\n",
3447          bind6_address);
3448     if (1 != inet_pton (AF_INET6,
3449                         bind6_address,
3450                         &server_addrv6.sin6_addr))
3451     {
3452       LOG (GNUNET_ERROR_TYPE_ERROR,
3453            _("Invalid IPv6 address: `%s'\n"),
3454            bind6_address);
3455       GNUNET_free (bind6_address);
3456       return NULL;
3457     }
3458     have_bind6 = GNUNET_YES;
3459   }
3460   GNUNET_free_non_null (bind6_address);
3461
3462   /* Enable neighbour discovery */
3463   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3464       "transport-udp", "BROADCAST");
3465   if (enable_broadcasting == GNUNET_SYSERR)
3466     enable_broadcasting = GNUNET_NO;
3467
3468   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3469       "transport-udp", "BROADCAST_RECEIVE");
3470   if (enable_broadcasting_recv == GNUNET_SYSERR)
3471     enable_broadcasting_recv = GNUNET_YES;
3472
3473   if (GNUNET_SYSERR ==
3474       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3475                                              "BROADCAST_INTERVAL",
3476                                              &fancy_interval))
3477   {
3478     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3479   }
3480   else
3481   {
3482     if (GNUNET_SYSERR ==
3483         GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval))
3484     {
3485       interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
3486     }
3487     GNUNET_free(fancy_interval);
3488   }
3489
3490   /* Maximum datarate */
3491   if (GNUNET_OK !=
3492       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3493                                              "MAX_BPS", &udp_max_bps))
3494   {
3495     udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
3496   }
3497
3498   p = GNUNET_new (struct Plugin);
3499   p->port = port;
3500   p->aport = aport;
3501   p->broadcast_interval = interval;
3502   p->enable_ipv6 = enable_v6;
3503   p->enable_ipv4 = GNUNET_YES; /* default */
3504   p->enable_broadcasting = enable_broadcasting;
3505   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3506   p->env = env;
3507   p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
3508   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (
3509       GNUNET_CONTAINER_HEAP_ORDER_MIN);
3510   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
3511   GNUNET_BANDWIDTH_tracker_init (&p->tracker, NULL, NULL,
3512       GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps), 30);
3513   LOG(GNUNET_ERROR_TYPE_DEBUG,
3514       "Setting up sockets\n");
3515   res = setup_sockets (p,
3516                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3517                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3518   if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL)))
3519   {
3520     LOG (GNUNET_ERROR_TYPE_ERROR,
3521         _("Failed to create network sockets, plugin failed\n"));
3522     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3523     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3524     GNUNET_SERVER_mst_destroy (p->mst);
3525     GNUNET_free (p);
3526     return NULL;
3527   }
3528
3529   /* Setup broadcasting and receiving beacons */
3530   setup_broadcast (p, &server_addrv6, &server_addrv4);
3531
3532   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3533   api->cls = p;
3534   api->send = NULL;
3535   api->disconnect_session = &udp_disconnect_session;
3536   api->query_keepalive_factor = &udp_query_keepalive_factor;
3537   api->disconnect_peer = &udp_disconnect;
3538   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3539   api->address_to_string = &udp_address_to_string;
3540   api->string_to_address = &udp_string_to_address;
3541   api->check_address = &udp_plugin_check_address;
3542   api->get_session = &udp_plugin_get_session;
3543   api->send = &udp_plugin_send;
3544   api->get_network = &udp_get_network;
3545   api->update_session_timeout = &udp_plugin_update_session_timeout;
3546   api->setup_monitor = &udp_plugin_setup_monitor;
3547   return api;
3548 }
3549
3550
3551 /**
3552  * Function called on each entry in the defragmentation heap to
3553  * clean it up.
3554  *
3555  * @param cls NULL
3556  * @param node node in the heap (to be removed)
3557  * @param element a `struct DefragContext` to be cleaned up
3558  * @param cost unused
3559  * @return #GNUNET_YES
3560  */
3561 static int
3562 heap_cleanup_iterator (void *cls,
3563                        struct GNUNET_CONTAINER_HeapNode *node,
3564                        void *element,
3565                        GNUNET_CONTAINER_HeapCostType cost)
3566 {
3567   struct DefragContext *d_ctx = element;
3568
3569   GNUNET_CONTAINER_heap_remove_node (node);
3570   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3571   GNUNET_free (d_ctx);
3572   return GNUNET_YES;
3573 }
3574
3575
3576 /**
3577  * The exported method. Makes the core api available via a global and
3578  * returns the udp transport API.
3579  *
3580  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3581  * @return NULL
3582  */
3583 void *
3584 libgnunet_plugin_transport_udp_done (void *cls)
3585 {
3586   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3587   struct Plugin *plugin = api->cls;
3588   struct PrettyPrinterContext *cur;
3589   struct PrettyPrinterContext *next;
3590   struct UDP_MessageWrapper *udpw;
3591
3592   if (NULL == plugin)
3593   {
3594     GNUNET_free(api);
3595     return NULL;
3596   }
3597   stop_broadcast (plugin);
3598   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK )
3599   {
3600     GNUNET_SCHEDULER_cancel (plugin->select_task);
3601     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
3602   }
3603   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK )
3604   {
3605     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3606     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
3607   }
3608
3609   /* Closing sockets */
3610   if (GNUNET_YES == plugin->enable_ipv4)
3611   {
3612     if (NULL != plugin->sockv4)
3613     {
3614       GNUNET_break (GNUNET_OK ==
3615                     GNUNET_NETWORK_socket_close (plugin->sockv4));
3616       plugin->sockv4 = NULL;
3617     }
3618     GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3619     GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3620   }
3621   if (GNUNET_YES == plugin->enable_ipv6)
3622   {
3623     if (NULL != plugin->sockv6)
3624     {
3625       GNUNET_break (GNUNET_OK ==
3626                     GNUNET_NETWORK_socket_close (plugin->sockv6));
3627       plugin->sockv6 = NULL;
3628
3629       GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3630       GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3631     }
3632   }
3633   if (NULL != plugin->nat)
3634   {
3635     GNUNET_NAT_unregister (plugin->nat);
3636     plugin->nat = NULL;
3637   }
3638   if (NULL != plugin->defrag_ctxs)
3639   {
3640     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3641                                    &heap_cleanup_iterator, NULL);
3642     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3643     plugin->defrag_ctxs = NULL;
3644   }
3645   if (NULL != plugin->mst)
3646   {
3647     GNUNET_SERVER_mst_destroy (plugin->mst);
3648     plugin->mst = NULL;
3649   }
3650
3651   /* Clean up leftover messages */
3652   udpw = plugin->ipv4_queue_head;
3653   while (NULL != udpw)
3654   {
3655     struct UDP_MessageWrapper *tmp = udpw->next;
3656     dequeue (plugin, udpw);
3657     call_continuation (udpw, GNUNET_SYSERR);
3658     GNUNET_free(udpw);
3659     udpw = tmp;
3660   }
3661   udpw = plugin->ipv6_queue_head;
3662   while (NULL != udpw)
3663   {
3664     struct UDP_MessageWrapper *tmp = udpw->next;
3665     dequeue (plugin, udpw);
3666     call_continuation (udpw, GNUNET_SYSERR);
3667     GNUNET_free(udpw);
3668     udpw = tmp;
3669   }
3670
3671   /* Clean up sessions */
3672   LOG (GNUNET_ERROR_TYPE_DEBUG,
3673        "Cleaning up sessions\n");
3674   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3675                                          &disconnect_and_free_it, plugin);
3676   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3677
3678   next = plugin->ppc_dll_head;
3679   for (cur = next; NULL != cur; cur = next)
3680   {
3681     GNUNET_break(0);
3682     next = cur->next;
3683     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3684                                  plugin->ppc_dll_tail,
3685                                  cur);
3686     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3687     GNUNET_free (cur);
3688   }
3689   GNUNET_free (plugin);
3690   GNUNET_free (api);
3691   return NULL;
3692 }
3693
3694 /* end of plugin_transport_udp.c */