- client-side implementation of peer queries
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  (C) 2010-2013 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66 /**
67  * Running pretty printers: head
68  */
69 static struct PrettyPrinterContext *ppc_dll_head;
70
71 /**
72  * Running pretty printers: tail
73  */
74 static struct PrettyPrinterContext *ppc_dll_tail;
75
76 /**
77  * Closure for 'append_port'.
78  */
79 struct PrettyPrinterContext
80 {
81   /**
82    * DLL
83    */
84   struct PrettyPrinterContext *next;
85
86   /**
87    * DLL
88    */
89   struct PrettyPrinterContext *prev;
90
91   /**
92    * Timeout task
93    */
94   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
95
96   /**
97    * Resolver handle
98    */
99   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
100
101   /**
102    * Function to call with the result.
103    */
104   GNUNET_TRANSPORT_AddressStringCallback asc;
105
106   /**
107    * Clsoure for 'asc'.
108    */
109   void *asc_cls;
110
111   /**
112    * Port to add after the IP address.
113    */
114   uint16_t port;
115
116   /**
117    * IPv6 address
118    */
119
120   int ipv6;
121
122   /**
123    * Options
124    */
125   uint32_t options;
126 };
127
128 enum UDP_MessageType
129 {
130   UNDEFINED = 0,
131   MSG_FRAGMENTED = 1,
132   MSG_FRAGMENTED_COMPLETE = 2,
133   MSG_UNFRAGMENTED = 3,
134   MSG_ACK = 4,
135   MSG_BEACON = 5
136 };
137
138 struct Session
139 {
140   /**
141    * Which peer is this session for?
142    */
143   struct GNUNET_PeerIdentity target;
144
145   /**
146    * Plugin this session belongs to.
147    */
148   struct Plugin *plugin;
149
150   /**
151    * Context for dealing with fragments.
152    */
153   struct UDP_FragmentationContext *frag_ctx;
154
155   /**
156    * Desired delay for next sending we send to other peer
157    */
158   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
159
160   /**
161    * Desired delay for next sending we received from other peer
162    */
163   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
164
165   /**
166    * Session timeout task
167    */
168   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
169
170   /**
171    * expected delay for ACKs
172    */
173   struct GNUNET_TIME_Relative last_expected_ack_delay;
174
175   /**
176    * desired delay between UDP messages
177    */
178   struct GNUNET_TIME_Relative last_expected_msg_delay;
179
180   struct GNUNET_ATS_Information ats;
181
182   struct GNUNET_HELLO_Address *address;
183
184   /**
185    * Reference counter to indicate that this session is
186    * currently being used and must not be destroyed;
187    * setting @e in_destroy will destroy it as soon as
188    * possible.
189    */
190   unsigned int rc;
191
192   /**
193    * Is this session about to be destroyed (sometimes we cannot
194    * destroy a session immediately as below us on the stack
195    * there might be code that still uses it; in this case,
196    * @e rc is non-zero).
197    */
198   int in_destroy;
199 };
200
201 /**
202  * Closure for #session_cmp_it().
203  */
204 struct SessionCompareContext
205 {
206   struct Session *res;
207   const struct GNUNET_HELLO_Address *address;
208 };
209
210 /**
211  * Closure for #process_inbound_tokenized_messages().
212  */
213 struct SourceInformation
214 {
215   /**
216    * Sender identity.
217    */
218   struct GNUNET_PeerIdentity sender;
219
220   /**
221    * Source address.
222    */
223   const void *arg;
224
225   /**
226    * Associated session.
227    */
228   struct Session *session;
229
230   /**
231    * Number of bytes in source address.
232    */
233   size_t args;
234
235 };
236
237 /**
238  * Closure for #find_receive_context().
239  */
240 struct FindReceiveContext
241 {
242   /**
243    * Where to store the result.
244    */
245   struct DefragContext *rc;
246
247   /**
248    * Address to find.
249    */
250   const struct sockaddr *addr;
251
252   struct Session *session;
253
254   /**
255    * Number of bytes in @e addr.
256    */
257   socklen_t addr_len;
258
259 };
260
261 /**
262  * Data structure to track defragmentation contexts based
263  * on the source of the UDP traffic.
264  */
265 struct DefragContext
266 {
267
268   /**
269    * Defragmentation context.
270    */
271   struct GNUNET_DEFRAGMENT_Context *defrag;
272
273   /**
274    * Source address this receive context is for (allocated at the
275    * end of the struct).
276    */
277   const struct sockaddr *src_addr;
278
279   /**
280    * Reference to master plugin struct.
281    */
282   struct Plugin *plugin;
283
284   /**
285    * Node in the defrag heap.
286    */
287   struct GNUNET_CONTAINER_HeapNode *hnode;
288
289   /**
290    * Length of 'src_addr'
291    */
292   size_t addr_len;
293 };
294
295 /**
296  * Context to send fragmented messages
297  */
298 struct UDP_FragmentationContext
299 {
300   /**
301    * Next in linked list
302    */
303   struct UDP_FragmentationContext *next;
304
305   /**
306    * Previous in linked list
307    */
308   struct UDP_FragmentationContext *prev;
309
310   /**
311    * The plugin
312    */
313   struct Plugin *plugin;
314
315   /**
316    * Handle for GNUNET_FRAGMENT context
317    */
318   struct GNUNET_FRAGMENT_Context *frag;
319
320   /**
321    * The session this fragmentation context belongs to
322    */
323   struct Session *session;
324
325   /**
326    * Function to call upon completion of the transmission.
327    */
328   GNUNET_TRANSPORT_TransmitContinuation cont;
329
330   /**
331    * Closure for @e cont.
332    */
333   void *cont_cls;
334
335   /**
336    * Message timeout
337    */
338   struct GNUNET_TIME_Absolute timeout;
339
340   /**
341    * Payload size of original unfragmented message
342    */
343   size_t payload_size;
344
345   /**
346    * Bytes used to send all fragments on wire including UDP overhead
347    */
348   size_t on_wire_size;
349
350   unsigned int fragments_used;
351
352 };
353
354 struct UDP_MessageWrapper
355 {
356   /**
357    * Session this message belongs to
358    */
359   struct Session *session;
360
361   /**
362    * DLL of messages
363    * previous element
364    */
365   struct UDP_MessageWrapper *prev;
366
367   /**
368    * DLL of messages
369    * previous element
370    */
371   struct UDP_MessageWrapper *next;
372
373   /**
374    * Message type
375    * According to UDP_MessageType
376    */
377   int msg_type;
378
379   /**
380    * Message with size msg_size including UDP specific overhead
381    */
382   char *msg_buf;
383
384   /**
385    * Size of UDP message to send including UDP specific overhead
386    */
387   size_t msg_size;
388
389   /**
390    * Payload size of original message
391    */
392   size_t payload_size;
393
394   /**
395    * Message timeout
396    */
397   struct GNUNET_TIME_Absolute timeout;
398
399   /**
400    * Function to call upon completion of the transmission.
401    */
402   GNUNET_TRANSPORT_TransmitContinuation cont;
403
404   /**
405    * Closure for 'cont'.
406    */
407   void *cont_cls;
408
409   /**
410    * Fragmentation context
411    * frag_ctx == NULL if transport <= MTU
412    * frag_ctx != NULL if transport > MTU
413    */
414   struct UDP_FragmentationContext *frag_ctx;
415 };
416
417 /**
418  * UDP ACK Message-Packet header (after defragmentation).
419  */
420 struct UDP_ACK_Message
421 {
422   /**
423    * Message header.
424    */
425   struct GNUNET_MessageHeader header;
426
427   /**
428    * Desired delay for flow control
429    */
430   uint32_t delay;
431
432   /**
433    * What is the identity of the sender
434    */
435   struct GNUNET_PeerIdentity sender;
436
437 };
438
439 /**
440  * Address options
441  */
442 static uint32_t myoptions;
443
444 /**
445  * Encapsulation of all of the state of the plugin.
446  */
447 struct Plugin * plugin;
448
449 /**
450  * We have been notified that our readset has something to read.  We don't
451  * know which socket needs to be read, so we have to check each one
452  * Then reschedule this function to be called again once more is available.
453  *
454  * @param cls the plugin handle
455  * @param tc the scheduling context (for rescheduling this function again)
456  */
457 static void
458 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
459
460 /**
461  * We have been notified that our readset has something to read.  We don't
462  * know which socket needs to be read, so we have to check each one
463  * Then reschedule this function to be called again once more is available.
464  *
465  * @param cls the plugin handle
466  * @param tc the scheduling context (for rescheduling this function again)
467  */
468 static void
469 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
470
471 /**
472  * (re)schedule select tasks for this plugin.
473  *
474  * @param plugin plugin to reschedule
475  */
476 static void
477 schedule_select (struct Plugin *plugin)
478 {
479   struct GNUNET_TIME_Relative min_delay;
480   struct UDP_MessageWrapper *udpw;
481
482   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
483   {
484     /* Find a message ready to send:
485      * Flow delay from other peer is expired or not set (0) */
486     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
487     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
488       min_delay = GNUNET_TIME_relative_min (min_delay,
489           GNUNET_TIME_absolute_get_remaining (
490               udpw->session->flow_delay_from_other_peer));
491
492     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK )
493       GNUNET_SCHEDULER_cancel (plugin->select_task);
494
495     /* Schedule with:
496      * - write active set if message is ready
497      * - timeout minimum delay */
498     plugin->select_task = GNUNET_SCHEDULER_add_select (
499         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
500         (0 == min_delay.rel_value_us) ?
501             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
502         (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
503         &udp_plugin_select, plugin);
504   }
505   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
506   {
507     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
508     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
509       min_delay = GNUNET_TIME_relative_min (min_delay,
510           GNUNET_TIME_absolute_get_remaining (
511               udpw->session->flow_delay_from_other_peer));
512
513     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
514       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
515     plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
516         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
517         (0 == min_delay.rel_value_us) ?
518             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
519         (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
520         &udp_plugin_select_v6, plugin);
521   }
522 }
523
524 /**
525  * Function called for a quick conversion of the binary address to
526  * a numeric address.  Note that the caller must not free the
527  * address and that the next call to this function is allowed
528  * to override the address again.
529  *
530  * @param cls closure
531  * @param addr binary address
532  * @param addrlen length of the address
533  * @return string representing the same address
534  */
535 const char *
536 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
537 {
538   static char rbuf[INET6_ADDRSTRLEN + 10];
539   char buf[INET6_ADDRSTRLEN];
540   const void *sb;
541   struct in_addr a4;
542   struct in6_addr a6;
543   const struct IPv4UdpAddress *t4;
544   const struct IPv6UdpAddress *t6;
545   int af;
546   uint16_t port;
547   uint32_t options;
548
549   if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress)))
550   {
551     t6 = addr;
552     af = AF_INET6;
553     options = ntohl (t6->options);
554     port = ntohs (t6->u6_port);
555     memcpy (&a6, &t6->ipv6_addr, sizeof(a6));
556     sb = &a6;
557   }
558   else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress)))
559   {
560     t4 = addr;
561     af = AF_INET;
562     options = ntohl (t4->options);
563     port = ntohs (t4->u4_port);
564     memcpy (&a4, &t4->ipv4_addr, sizeof(a4));
565     sb = &a4;
566   }
567   else
568   {
569     return NULL ;
570   }
571   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
572
573   GNUNET_snprintf (rbuf, sizeof(rbuf),
574       (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u", PLUGIN_NAME, options,
575       buf, port);
576   return rbuf;
577 }
578
579 /**
580  * Function called to convert a string address to
581  * a binary address.
582  *
583  * @param cls closure ('struct Plugin*')
584  * @param addr string address
585  * @param addrlen length of the address
586  * @param buf location to store the buffer
587  * @param added location to store the number of bytes in the buffer.
588  *        If the function returns GNUNET_SYSERR, its contents are undefined.
589  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
590  */
591 static int
592 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
593     void **buf, size_t *added)
594 {
595   struct sockaddr_storage socket_address;
596   char *address;
597   char *plugin;
598   char *optionstr;
599   uint32_t options;
600
601   /* Format tcp.options.address:port */
602   address = NULL;
603   plugin = NULL;
604   optionstr = NULL;
605
606   if ((NULL == addr) || (addrlen == 0))
607   {
608     GNUNET_break(0);
609     return GNUNET_SYSERR;
610   }
611   if ('\0' != addr[addrlen - 1])
612   {
613     GNUNET_break(0);
614     return GNUNET_SYSERR;
615   }
616   if (strlen (addr) != addrlen - 1)
617   {
618     GNUNET_break(0);
619     return GNUNET_SYSERR;
620   }
621   plugin = GNUNET_strdup (addr);
622   optionstr = strchr (plugin, '.');
623   if (NULL == optionstr)
624   {
625     GNUNET_break(0);
626     GNUNET_free(plugin);
627     return GNUNET_SYSERR;
628   }
629   optionstr[0] = '\0';
630   optionstr++;
631   options = atol (optionstr);
632   address = strchr (optionstr, '.');
633   if (NULL == address)
634   {
635     GNUNET_break(0);
636     GNUNET_free(plugin);
637     return GNUNET_SYSERR;
638   }
639   address[0] = '\0';
640   address++;
641
642   if (GNUNET_OK
643       != GNUNET_STRINGS_to_address_ip (address, strlen (address),
644           &socket_address))
645   {
646     GNUNET_break(0);
647     GNUNET_free(plugin);
648     return GNUNET_SYSERR;
649   }
650
651   GNUNET_free(plugin);
652
653   switch (socket_address.ss_family)
654   {
655   case AF_INET:
656   {
657     struct IPv4UdpAddress *u4;
658     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
659     u4 = GNUNET_new (struct IPv4UdpAddress);
660     u4->options = htonl (options);
661     u4->ipv4_addr = in4->sin_addr.s_addr;
662     u4->u4_port = in4->sin_port;
663     *buf = u4;
664     *added = sizeof(struct IPv4UdpAddress);
665     return GNUNET_OK;
666   }
667   case AF_INET6:
668   {
669     struct IPv6UdpAddress *u6;
670     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
671     u6 = GNUNET_new (struct IPv6UdpAddress);
672     u6->options = htonl (options);
673     u6->ipv6_addr = in6->sin6_addr;
674     u6->u6_port = in6->sin6_port;
675     *buf = u6;
676     *added = sizeof(struct IPv6UdpAddress);
677     return GNUNET_OK;
678   }
679   default:
680     GNUNET_break(0);
681     return GNUNET_SYSERR;
682   }
683 }
684
685 static void
686 ppc_cancel_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
687 {
688   struct PrettyPrinterContext *ppc = cls;
689
690   ppc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
691   if (NULL != ppc->resolver_handle)
692   {
693     GNUNET_RESOLVER_request_cancel (ppc->resolver_handle);
694     ppc->resolver_handle = NULL;
695   }
696   GNUNET_CONTAINER_DLL_remove(ppc_dll_head, ppc_dll_tail, ppc);
697   GNUNET_free(ppc);
698 }
699
700 /**
701  * Append our port and forward the result.
702  *
703  * @param cls a 'struct PrettyPrinterContext'
704  * @param hostname result from DNS resolver
705  */
706 static void
707 append_port (void *cls, const char *hostname)
708 {
709   struct PrettyPrinterContext *ppc = cls;
710   struct PrettyPrinterContext *cur;
711   char *ret;
712
713   if (hostname == NULL )
714   {
715     ppc->asc (ppc->asc_cls, NULL );
716     GNUNET_CONTAINER_DLL_remove(ppc_dll_head, ppc_dll_tail, ppc);
717     GNUNET_SCHEDULER_cancel (ppc->timeout_task);
718     ppc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
719     ppc->resolver_handle = NULL;
720     GNUNET_free(ppc);
721     return;
722   }
723   for (cur = ppc_dll_head; (NULL != cur); cur = cur->next)
724   {
725     if (cur == ppc)
726       break;
727   }
728   if (NULL == cur)
729   {
730     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Invalid callback for PPC %p \n", ppc);
731     return;
732   }
733
734   if (GNUNET_YES == ppc->ipv6)
735     GNUNET_asprintf (&ret, "%s.%u.[%s]:%d", PLUGIN_NAME, ppc->options, hostname,
736         ppc->port);
737   else
738     GNUNET_asprintf (&ret, "%s.%u.%s:%d", PLUGIN_NAME, ppc->options, hostname,
739         ppc->port);
740   ppc->asc (ppc->asc_cls, ret);
741   GNUNET_free(ret);
742 }
743
744 /**
745  * Convert the transports address to a nice, human-readable
746  * format.
747  *
748  * @param cls closure
749  * @param type name of the transport that generated the address
750  * @param addr one of the addresses of the host, NULL for the last address
751  *        the specific address format depends on the transport
752  * @param addrlen length of the address
753  * @param numeric should (IP) addresses be displayed in numeric form?
754  * @param timeout after how long should we give up?
755  * @param asc function to call on each string
756  * @param asc_cls closure for @a asc
757  */
758 static void
759 udp_plugin_address_pretty_printer (void *cls, const char *type,
760     const void *addr, size_t addrlen, int numeric,
761     struct GNUNET_TIME_Relative timeout,
762     GNUNET_TRANSPORT_AddressStringCallback asc, void *asc_cls)
763 {
764   struct PrettyPrinterContext *ppc;
765   const void *sb;
766   size_t sbs;
767   struct sockaddr_in a4;
768   struct sockaddr_in6 a6;
769   const struct IPv4UdpAddress *u4;
770   const struct IPv6UdpAddress *u6;
771   uint16_t port;
772   uint32_t options;
773
774   if (addrlen == sizeof(struct IPv6UdpAddress))
775   {
776     u6 = addr;
777     memset (&a6, 0, sizeof(a6));
778     a6.sin6_family = AF_INET6;
779 #if HAVE_SOCKADDR_IN_SIN_LEN
780     a6.sin6_len = sizeof (a6);
781 #endif
782     a6.sin6_port = u6->u6_port;
783     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
784     port = ntohs (u6->u6_port);
785     options = ntohl (u6->options);
786     sb = &a6;
787     sbs = sizeof(a6);
788   }
789   else if (addrlen == sizeof(struct IPv4UdpAddress))
790   {
791     u4 = addr;
792     memset (&a4, 0, sizeof(a4));
793     a4.sin_family = AF_INET;
794 #if HAVE_SOCKADDR_IN_SIN_LEN
795     a4.sin_len = sizeof (a4);
796 #endif
797     a4.sin_port = u4->u4_port;
798     a4.sin_addr.s_addr = u4->ipv4_addr;
799     port = ntohs (u4->u4_port);
800     options = ntohl (u4->options);
801     sb = &a4;
802     sbs = sizeof(a4);
803   }
804   else
805   {
806     /* invalid address */
807     GNUNET_break_op(0);
808     asc (asc_cls, NULL );
809     return;
810   }
811   ppc = GNUNET_new (struct PrettyPrinterContext);
812   ppc->asc = asc;
813   ppc->asc_cls = asc_cls;
814   ppc->port = port;
815   ppc->options = options;
816   if (addrlen == sizeof(struct IPv6UdpAddress))
817     ppc->ipv6 = GNUNET_YES;
818   else
819     ppc->ipv6 = GNUNET_NO;
820   ppc->timeout_task = GNUNET_SCHEDULER_add_delayed (
821       GNUNET_TIME_relative_multiply (timeout, 2), &ppc_cancel_task, ppc);
822   GNUNET_CONTAINER_DLL_insert(ppc_dll_head, ppc_dll_tail, ppc);
823   ppc->resolver_handle = GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric,
824       timeout, &append_port, ppc);
825 }
826
827 static void
828 call_continuation (struct UDP_MessageWrapper *udpw, int result)
829 {
830   size_t overhead;
831
832   LOG(GNUNET_ERROR_TYPE_DEBUG,
833       "Calling continuation for %u byte message to `%s' with result %s\n",
834       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
835       (GNUNET_OK == result) ? "OK" : "SYSERR");
836
837   if (udpw->msg_size >= udpw->payload_size)
838     overhead = udpw->msg_size - udpw->payload_size;
839   else
840     overhead = udpw->msg_size;
841
842   switch (result)
843   {
844   case GNUNET_OK:
845     switch (udpw->msg_type)
846     {
847     case MSG_UNFRAGMENTED:
848       if (NULL != udpw->cont)
849       {
850         /* Transport continuation */
851         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
852             udpw->payload_size, udpw->msg_size);
853       }
854       GNUNET_STATISTICS_update (plugin->env->stats,
855           "# UDP, unfragmented msgs, messages, sent, success", 1, GNUNET_NO);
856       GNUNET_STATISTICS_update (plugin->env->stats,
857           "# UDP, unfragmented msgs, bytes payload, sent, success",
858           udpw->payload_size, GNUNET_NO);
859       GNUNET_STATISTICS_update (plugin->env->stats,
860           "# UDP, unfragmented msgs, bytes overhead, sent, success", overhead,
861           GNUNET_NO);
862       GNUNET_STATISTICS_update (plugin->env->stats,
863           "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
864       GNUNET_STATISTICS_update (plugin->env->stats,
865           "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO);
866       break;
867     case MSG_FRAGMENTED_COMPLETE:
868       GNUNET_assert(NULL != udpw->frag_ctx);
869       if (udpw->frag_ctx->cont != NULL )
870         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target,
871             GNUNET_OK, udpw->frag_ctx->payload_size,
872             udpw->frag_ctx->on_wire_size);
873       GNUNET_STATISTICS_update (plugin->env->stats,
874           "# UDP, fragmented msgs, messages, sent, success", 1, GNUNET_NO);
875       GNUNET_STATISTICS_update (plugin->env->stats,
876           "# UDP, fragmented msgs, bytes payload, sent, success",
877           udpw->payload_size, GNUNET_NO);
878       GNUNET_STATISTICS_update (plugin->env->stats,
879           "# UDP, fragmented msgs, bytes overhead, sent, success", overhead,
880           GNUNET_NO);
881       GNUNET_STATISTICS_update (plugin->env->stats,
882           "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
883       GNUNET_STATISTICS_update (plugin->env->stats,
884           "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO);
885       GNUNET_STATISTICS_update (plugin->env->stats,
886           "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO);
887       break;
888     case MSG_FRAGMENTED:
889       /* Fragmented message: enqueue next fragment */
890       if (NULL != udpw->cont)
891         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
892             udpw->payload_size, udpw->msg_size);
893       GNUNET_STATISTICS_update (plugin->env->stats,
894           "# UDP, fragmented msgs, fragments, sent, success", 1, GNUNET_NO);
895       GNUNET_STATISTICS_update (plugin->env->stats,
896           "# UDP, fragmented msgs, fragments bytes, sent, success",
897           udpw->msg_size, GNUNET_NO);
898       break;
899     case MSG_ACK:
900       /* No continuation */
901       GNUNET_STATISTICS_update (plugin->env->stats,
902           "# UDP, ACK msgs, messages, sent, success", 1, GNUNET_NO);
903       GNUNET_STATISTICS_update (plugin->env->stats,
904           "# UDP, ACK msgs, bytes overhead, sent, success", overhead,
905           GNUNET_NO);
906       GNUNET_STATISTICS_update (plugin->env->stats,
907           "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
908       break;
909     case MSG_BEACON:
910       GNUNET_break(0);
911       break;
912     default:
913       LOG(GNUNET_ERROR_TYPE_ERROR, "ERROR: %u\n", udpw->msg_type);
914       GNUNET_break(0);
915       break;
916     }
917     break;
918   case GNUNET_SYSERR:
919     switch (udpw->msg_type)
920     {
921     case MSG_UNFRAGMENTED:
922       /* Unfragmented message: failed to send */
923       if (NULL != udpw->cont)
924         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
925             udpw->payload_size, overhead);
926       GNUNET_STATISTICS_update (plugin->env->stats,
927           "# UDP, unfragmented msgs, messages, sent, failure", 1, GNUNET_NO);
928       GNUNET_STATISTICS_update (plugin->env->stats,
929           "# UDP, unfragmented msgs, bytes payload, sent, failure",
930           udpw->payload_size, GNUNET_NO);
931       GNUNET_STATISTICS_update (plugin->env->stats,
932           "# UDP, unfragmented msgs, bytes overhead, sent, failure", overhead,
933           GNUNET_NO);
934       break;
935     case MSG_FRAGMENTED_COMPLETE:
936       GNUNET_assert(NULL != udpw->frag_ctx);
937       if (udpw->frag_ctx->cont != NULL )
938         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target,
939             GNUNET_SYSERR, udpw->frag_ctx->payload_size,
940             udpw->frag_ctx->on_wire_size);
941       GNUNET_STATISTICS_update (plugin->env->stats,
942           "# UDP, fragmented msgs, messages, sent, failure", 1, GNUNET_NO);
943       GNUNET_STATISTICS_update (plugin->env->stats,
944           "# UDP, fragmented msgs, bytes payload, sent, failure",
945           udpw->payload_size, GNUNET_NO);
946       GNUNET_STATISTICS_update (plugin->env->stats,
947           "# UDP, fragmented msgs, bytes payload, sent, failure", overhead,
948           GNUNET_NO);
949       GNUNET_STATISTICS_update (plugin->env->stats,
950           "# UDP, fragmented msgs, bytes payload, sent, failure", overhead,
951           GNUNET_NO);
952       GNUNET_STATISTICS_update (plugin->env->stats,
953           "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO);
954       break;
955     case MSG_FRAGMENTED:
956       GNUNET_assert(NULL != udpw->frag_ctx);
957       /* Fragmented message: failed to send */
958       GNUNET_STATISTICS_update (plugin->env->stats,
959           "# UDP, fragmented msgs, fragments, sent, failure", 1, GNUNET_NO);
960       GNUNET_STATISTICS_update (plugin->env->stats,
961           "# UDP, fragmented msgs, fragments bytes, sent, failure",
962           udpw->msg_size, GNUNET_NO);
963       break;
964     case MSG_ACK:
965       /* ACK message: failed to send */
966       GNUNET_STATISTICS_update (plugin->env->stats,
967           "# UDP, ACK msgs, messages, sent, failure", 1, GNUNET_NO);
968       break;
969     case MSG_BEACON:
970       /* Beacon message: failed to send */
971       GNUNET_break(0);
972       break;
973     default:
974       GNUNET_break(0);
975       break;
976     }
977     break;
978   default:
979     GNUNET_break(0);
980     break;
981   }
982 }
983
984 /**
985  * Check if the given port is plausible (must be either our listen
986  * port or our advertised port).  If it is neither, we return
987  * #GNUNET_SYSERR.
988  *
989  * @param plugin global variables
990  * @param in_port port number to check
991  * @return #GNUNET_OK if port is either open_port or adv_port
992  */
993 static int
994 check_port (struct Plugin *plugin, uint16_t in_port)
995 {
996   if ((in_port == plugin->port) || (in_port == plugin->aport))
997     return GNUNET_OK;
998   return GNUNET_SYSERR;
999 }
1000
1001 /**
1002  * Function that will be called to check if a binary address for this
1003  * plugin is well-formed and corresponds to an address for THIS peer
1004  * (as per our configuration).  Naturally, if absolutely necessary,
1005  * plugins can be a bit conservative in their answer, but in general
1006  * plugins should make sure that the address does not redirect
1007  * traffic to a 3rd party that might try to man-in-the-middle our
1008  * traffic.
1009  *
1010  * @param cls closure, should be our handle to the Plugin
1011  * @param addr pointer to the address
1012  * @param addrlen length of @a addr
1013  * @return #GNUNET_OK if this is a plausible address for this peer
1014  *         and transport, #GNUNET_SYSERR if not
1015  *
1016  */
1017 static int
1018 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
1019 {
1020   struct Plugin *plugin = cls;
1021   struct IPv4UdpAddress *v4;
1022   struct IPv6UdpAddress *v6;
1023
1024   if ((addrlen != sizeof(struct IPv4UdpAddress))
1025       && (addrlen != sizeof(struct IPv6UdpAddress)))
1026   {
1027     GNUNET_break_op(0);
1028     return GNUNET_SYSERR;
1029   }
1030   if (addrlen == sizeof(struct IPv4UdpAddress))
1031   {
1032     v4 = (struct IPv4UdpAddress *) addr;
1033     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1034       return GNUNET_SYSERR;
1035     if (GNUNET_OK
1036         != GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
1037             sizeof(struct in_addr)))
1038       return GNUNET_SYSERR;
1039   }
1040   else
1041   {
1042     v6 = (struct IPv6UdpAddress *) addr;
1043     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1044     {
1045       GNUNET_break_op(0);
1046       return GNUNET_SYSERR;
1047     }
1048     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1049       return GNUNET_SYSERR;
1050     if (GNUNET_OK
1051         != GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
1052             sizeof(struct in6_addr)))
1053       return GNUNET_SYSERR;
1054   }
1055   return GNUNET_OK;
1056 }
1057
1058 /**
1059  * Function to free last resources associated with a session.
1060  *
1061  * @param s session to free
1062  */
1063 static void
1064 free_session (struct Session *s)
1065 {
1066   if (NULL != s->frag_ctx)
1067   {
1068     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL );
1069     GNUNET_free(s->frag_ctx);
1070     s->frag_ctx = NULL;
1071   }
1072   GNUNET_free(s);
1073 }
1074
1075 static void
1076 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1077 {
1078   if (plugin->bytes_in_buffer < udpw->msg_size)
1079     GNUNET_break(0);
1080   else
1081   {
1082     GNUNET_STATISTICS_update (plugin->env->stats,
1083         "# UDP, total, bytes in buffers", -(long long) udpw->msg_size,
1084         GNUNET_NO);
1085     plugin->bytes_in_buffer -= udpw->msg_size;
1086   }
1087   GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, total, msgs in buffers",
1088       -1, GNUNET_NO);
1089   if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
1090     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head,
1091         plugin->ipv4_queue_tail, udpw);
1092   else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
1093     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head,
1094         plugin->ipv6_queue_tail, udpw);
1095   else
1096     GNUNET_break (0);
1097 }
1098
1099 static void
1100 fragmented_message_done (struct UDP_FragmentationContext *fc, int result)
1101 {
1102   struct UDP_MessageWrapper *udpw;
1103   struct UDP_MessageWrapper *tmp;
1104   struct UDP_MessageWrapper dummy;
1105   struct Session *s = fc->session;
1106
1107   LOG(GNUNET_ERROR_TYPE_DEBUG,
1108       "%p : Fragmented message removed with result %s\n", fc,
1109       (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1110
1111   /* Call continuation for fragmented message */
1112   memset (&dummy, 0, sizeof(dummy));
1113   dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
1114   dummy.msg_size = s->frag_ctx->on_wire_size;
1115   dummy.payload_size = s->frag_ctx->payload_size;
1116   dummy.frag_ctx = s->frag_ctx;
1117   dummy.cont = NULL;
1118   dummy.cont_cls = NULL;
1119   dummy.session = s;
1120
1121   call_continuation (&dummy, result);
1122
1123   /* Remove leftover fragments from queue */
1124   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1125   {
1126     udpw = plugin->ipv6_queue_head;
1127     while (NULL != udpw)
1128     {
1129       tmp = udpw->next;
1130       if ((udpw->frag_ctx != NULL )&& (udpw->frag_ctx == s->frag_ctx)){
1131       dequeue (plugin, udpw);
1132       call_continuation (udpw, GNUNET_SYSERR);
1133       GNUNET_free (udpw);
1134     }
1135       udpw = tmp;
1136     }
1137   }
1138   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1139   {
1140     udpw = plugin->ipv4_queue_head;
1141     while (udpw != NULL )
1142     {
1143       tmp = udpw->next;
1144       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1145       {
1146         dequeue (plugin, udpw);
1147         call_continuation (udpw, GNUNET_SYSERR);
1148         GNUNET_free(udpw);
1149       }
1150       udpw = tmp;
1151     }
1152   }
1153
1154   /* Destroy fragmentation context */
1155   GNUNET_FRAGMENT_context_destroy (fc->frag, &s->last_expected_msg_delay,
1156       &s->last_expected_ack_delay);
1157   s->frag_ctx = NULL;
1158   GNUNET_free(fc);
1159 }
1160
1161 /**
1162  * Functions with this signature are called whenever we need
1163  * to close a session due to a disconnect or failure to
1164  * establish a connection.
1165  *
1166  * @param cls closure with the `struct Plugin`
1167  * @param s session to close down
1168  * @return #GNUNET_OK on success
1169  */
1170 static int
1171 udp_disconnect_session (void *cls, struct Session *s)
1172 {
1173   struct Plugin *plugin = cls;
1174   struct UDP_MessageWrapper *udpw;
1175   struct UDP_MessageWrapper *next;
1176
1177   GNUNET_assert(GNUNET_YES != s->in_destroy);
1178   LOG(GNUNET_ERROR_TYPE_DEBUG, "Session %p to peer `%s' address ended\n", s,
1179       GNUNET_i2s (&s->target),
1180       udp_address_to_string (NULL, s->address->address, s->address->address_length));
1181   /* stop timeout task */
1182   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1183   {
1184     GNUNET_SCHEDULER_cancel (s->timeout_task);
1185     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1186   }
1187   if (NULL != s->frag_ctx)
1188   {
1189     /* Remove fragmented message due to disconnect */
1190     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1191   }
1192
1193   next = plugin->ipv4_queue_head;
1194   while (NULL != (udpw = next))
1195   {
1196     next = udpw->next;
1197     if (udpw->session == s)
1198     {
1199       dequeue (plugin, udpw);
1200       call_continuation (udpw, GNUNET_SYSERR);
1201       GNUNET_free(udpw);
1202     }
1203   }
1204   next = plugin->ipv6_queue_head;
1205   while (NULL != (udpw = next))
1206   {
1207     next = udpw->next;
1208     if (udpw->session == s)
1209     {
1210       dequeue (plugin, udpw);
1211       call_continuation (udpw, GNUNET_SYSERR);
1212       GNUNET_free(udpw);
1213     }
1214   }
1215   plugin->env->session_end (plugin->env->cls, &s->target, s);
1216
1217   if (NULL != s->frag_ctx)
1218   {
1219     if (NULL != s->frag_ctx->cont)
1220     {
1221       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
1222           s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1223       LOG(GNUNET_ERROR_TYPE_DEBUG,
1224           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1225           GNUNET_i2s (&s->target));
1226     }
1227   }
1228
1229   GNUNET_assert(
1230       GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, &s->target, s));
1231   GNUNET_STATISTICS_set (plugin->env->stats, "# UDP, sessions active",
1232       GNUNET_CONTAINER_multipeermap_size (plugin->sessions), GNUNET_NO);
1233   if (s->rc > 0)
1234     s->in_destroy = GNUNET_YES;
1235   else
1236   {
1237     GNUNET_HELLO_address_free (s->address);
1238     free_session (s);
1239   }
1240   return GNUNET_OK;
1241 }
1242
1243 /**
1244  * Function that is called to get the keepalive factor.
1245  * GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
1246  * calculate the interval between keepalive packets.
1247  *
1248  * @param cls closure with the `struct Plugin`
1249  * @return keepalive factor
1250  */
1251 static unsigned int
1252 udp_query_keepalive_factor (void *cls)
1253 {
1254   return 15;
1255 }
1256
1257 /**
1258  * Destroy a session, plugin is being unloaded.
1259  *
1260  * @param cls the `struct Plugin`
1261  * @param key hash of public key of target peer
1262  * @param value a `struct PeerSession *` to clean up
1263  * @return #GNUNET_OK (continue to iterate)
1264  */
1265 static int
1266 disconnect_and_free_it (void *cls, const struct GNUNET_PeerIdentity *key,
1267     void *value)
1268 {
1269   struct Plugin *plugin = cls;
1270
1271   udp_disconnect_session (plugin, value);
1272   return GNUNET_OK;
1273 }
1274
1275 /**
1276  * Disconnect from a remote node.  Clean up session if we have one for
1277  * this peer.
1278  *
1279  * @param cls closure for this call (should be handle to Plugin)
1280  * @param target the peeridentity of the peer to disconnect
1281  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
1282  */
1283 static void
1284 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1285 {
1286   struct Plugin *plugin = cls;
1287
1288   LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from peer `%s'\n",
1289       GNUNET_i2s (target));
1290   /* Clean up sessions */
1291   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, target,
1292       &disconnect_and_free_it, plugin);
1293 }
1294
1295 /**
1296  * Session was idle, so disconnect it
1297  *
1298  * @param cls the `struct Session` to time out
1299  * @param tc scheduler context
1300  */
1301 static void
1302 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1303 {
1304   struct Session *s = cls;
1305
1306   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1307   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1308       "Session %p was idle for %s, disconnecting\n", s,
1309       GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT, GNUNET_YES));
1310   /* call session destroy function */
1311   udp_disconnect_session (plugin, s);
1312 }
1313
1314 /**
1315  * Increment session timeout due to activity
1316  *
1317  * @param s session to reschedule timeout activity for
1318  */
1319 static void
1320 reschedule_session_timeout (struct Session *s)
1321 {
1322   if (GNUNET_YES == s->in_destroy)
1323     return;
1324   GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1325   GNUNET_SCHEDULER_cancel (s->timeout_task);
1326   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1327       &session_timeout, s);
1328   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Timeout restarted for session %p\n", s);
1329 }
1330
1331 static struct Session *
1332 create_session (struct Plugin *plugin,
1333     const struct GNUNET_HELLO_Address *address)
1334 {
1335   struct Session *s;
1336
1337   s = GNUNET_new (struct Session);
1338   s->address = GNUNET_HELLO_address_copy (address);
1339   s->target = address->peer;
1340   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (
1341       GNUNET_TIME_UNIT_MILLISECONDS, 250);
1342   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1343   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1344   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1345   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1346       &session_timeout, s);
1347   return s;
1348 }
1349
1350 static int
1351 session_cmp_it (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
1352 {
1353   struct SessionCompareContext * cctx = cls;
1354   const struct GNUNET_HELLO_Address *address = cctx->address;
1355   struct Session *s = value;
1356
1357   LOG(GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n",
1358       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1359       udp_address_to_string (NULL, s->address->address, s->address->address_length));
1360
1361   if (0 == GNUNET_HELLO_address_cmp(s->address, cctx->address))
1362   {
1363     cctx->res = s;
1364     return GNUNET_NO;
1365   }
1366   return GNUNET_YES;
1367 }
1368
1369 /**
1370  * Function obtain the network type for a session
1371  *
1372  * @param cls closure ('struct Plugin*')
1373  * @param session the session
1374  * @return the network type
1375  */
1376 static enum GNUNET_ATS_Network_Type
1377 udp_get_network (void *cls, struct Session *session)
1378 {
1379   return ntohl (session->ats.value);
1380 }
1381
1382 /**
1383  * Creates a new outbound session the transport service will use to
1384  * send data to the peer
1385  *
1386  * @param cls the plugin
1387  * @param address the address
1388  * @return the session or NULL of max connections exceeded
1389  */
1390 static struct Session *
1391 udp_plugin_lookup_session (void *cls,
1392     const struct GNUNET_HELLO_Address *address)
1393 {
1394   struct Plugin * plugin = cls;
1395   struct IPv6UdpAddress * udp_a6;
1396   struct IPv4UdpAddress * udp_a4;
1397
1398   GNUNET_assert(plugin != NULL);
1399   GNUNET_assert(address != NULL);
1400
1401   if ((address->address == NULL )||
1402   ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1403       (address->address_length != sizeof (struct IPv6UdpAddress)))){
1404   LOG (GNUNET_ERROR_TYPE_WARNING,
1405       _("Trying to create session for address of unexpected length %u (should be %u or %u)\n"),
1406       address->address_length,
1407       sizeof (struct IPv4UdpAddress),
1408       sizeof (struct IPv6UdpAddress));
1409   return NULL;
1410 }
1411
1412   if (address->address_length == sizeof(struct IPv4UdpAddress))
1413   {
1414     if (plugin->sockv4 == NULL )
1415       return NULL ;
1416     udp_a4 = (struct IPv4UdpAddress *) address->address;
1417     if (udp_a4->u4_port == 0)
1418       return NULL ;
1419   }
1420
1421   if (address->address_length == sizeof(struct IPv6UdpAddress))
1422   {
1423     if (plugin->sockv6 == NULL )
1424       return NULL ;
1425     udp_a6 = (struct IPv6UdpAddress *) address->address;
1426     if (udp_a6->u6_port == 0)
1427       return NULL ;
1428   }
1429
1430   /* check if session already exists */
1431   struct SessionCompareContext cctx;
1432   cctx.address = address;
1433   cctx.res = NULL;
1434   LOG(GNUNET_ERROR_TYPE_DEBUG,
1435       "Looking for existing session for peer `%s' `%s' \n",
1436       GNUNET_i2s (&address->peer),
1437       udp_address_to_string(NULL, address->address, address->address_length));
1438   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, &address->peer,
1439       session_cmp_it, &cctx);
1440   if (cctx.res != NULL )
1441   {
1442     LOG(GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1443     return cctx.res;
1444   }
1445   return NULL ;
1446 }
1447
1448
1449 static struct Session *
1450 udp_plugin_create_session (void *cls,
1451     const struct GNUNET_HELLO_Address *address)
1452 {
1453   struct Session *s;
1454   struct IPv4UdpAddress *udp_v4;
1455   struct IPv6UdpAddress *udp_v6;
1456
1457   s = create_session (plugin, address);
1458   if (sizeof (struct IPv4UdpAddress) == address->address_length)
1459   {
1460     struct sockaddr_in v4;
1461     udp_v4 = (struct IPv4UdpAddress *) address->address;
1462     memset (&v4, '\0', sizeof (v4));
1463     v4.sin_family = AF_INET;
1464 #if HAVE_SOCKADDR_IN_SIN_LEN
1465     v4.sin_len = sizeof (struct sockaddr_in);
1466 #endif
1467     v4.sin_port = udp_v4->u4_port;
1468     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
1469     s->ats = plugin->env->get_address_type (plugin->env->cls,
1470         (const struct sockaddr *) &v4, sizeof (v4));
1471   }
1472   else if (sizeof (struct IPv6UdpAddress) == address->address_length)
1473   {
1474     struct sockaddr_in6 v6;
1475     udp_v6 = (struct IPv6UdpAddress *) address->address;
1476     memset (&v6, '\0', sizeof (v6));
1477     v6.sin6_family = AF_INET6;
1478 #if HAVE_SOCKADDR_IN_SIN_LEN
1479     v6.sin6_len = sizeof (struct sockaddr_in6);
1480 #endif
1481     v6.sin6_port = udp_v6->u6_port;
1482     v6.sin6_addr = udp_v6->ipv6_addr;
1483     s->ats = plugin->env->get_address_type (plugin->env->cls,
1484         (const struct sockaddr *) &v6, sizeof (v6));
1485   }
1486
1487   if (NULL == s)
1488     return NULL ; /* protocol not supported or address invalid */
1489   LOG(GNUNET_ERROR_TYPE_DEBUG,
1490       "Creating new %s session %p for peer `%s' address `%s'\n",
1491       GNUNET_HELLO_address_check_option (address, GNUNET_HELLO_ADDRESS_INFO_INBOUND) ? "inbound" : "outbound",
1492       s, GNUNET_i2s (&address->peer),
1493       udp_address_to_string( NULL,address->address,address->address_length));
1494   GNUNET_assert(
1495       GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (plugin->sessions, &s->target, s, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1496   GNUNET_STATISTICS_set (plugin->env->stats, "# UDP, sessions active",
1497       GNUNET_CONTAINER_multipeermap_size (plugin->sessions), GNUNET_NO);
1498   return s;
1499 }
1500
1501 static void
1502 udp_plugin_update_session_timeout (void *cls,
1503     const struct GNUNET_PeerIdentity *peer, struct Session *session)
1504 {
1505   if (GNUNET_YES
1506       != GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions, peer,
1507           session))
1508   {
1509     GNUNET_break(0);
1510     return;
1511   }
1512
1513   /* Reschedule session timeout */
1514   reschedule_session_timeout (session);
1515 }
1516
1517 /**
1518  * Creates a new outbound session the transport service will use to send data to the
1519  * peer
1520  *
1521  * @param cls the plugin
1522  * @param address the address
1523  * @return the session or NULL of max connections exceeded
1524  */
1525 static struct Session *
1526 udp_plugin_get_session (void *cls, const struct GNUNET_HELLO_Address *address)
1527 {
1528   struct Session *s;
1529
1530   if (NULL == address)
1531   {
1532     GNUNET_break(0);
1533     return NULL ;
1534   }
1535   if ((address->address_length != sizeof(struct IPv4UdpAddress))
1536       && (address->address_length != sizeof(struct IPv6UdpAddress)))
1537     return NULL ;
1538
1539   /* otherwise create new */
1540   if (NULL != (s = udp_plugin_lookup_session (cls, address)))
1541     return s;
1542   return udp_plugin_create_session (cls, address);
1543 }
1544
1545 static void
1546 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1547 {
1548   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1549     GNUNET_break(0);
1550   else
1551   {
1552     GNUNET_STATISTICS_update (plugin->env->stats,
1553         "# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
1554     plugin->bytes_in_buffer += udpw->msg_size;
1555   }
1556   GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, total, msgs in buffers",
1557       1, GNUNET_NO);
1558   if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
1559     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1560         plugin->ipv4_queue_tail, udpw);
1561   else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
1562     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head,
1563         plugin->ipv6_queue_tail, udpw);
1564   else
1565     GNUNET_break (0);
1566 }
1567
1568 /**
1569  * Fragment message was transmitted via UDP, let fragmentation know
1570  * to send the next fragment now.
1571  *
1572  * @param cls the 'struct UDPMessageWrapper' of the fragment
1573  * @param target destination peer (ignored)
1574  * @param result GNUNET_OK on success (ignored)
1575  * @param payload bytes payload sent
1576  * @param physical bytes physical sent
1577  */
1578 static void
1579 send_next_fragment (void *cls, const struct GNUNET_PeerIdentity *target,
1580     int result, size_t payload, size_t physical)
1581 {
1582   struct UDP_MessageWrapper *udpw = cls;
1583
1584   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1585 }
1586
1587 /**
1588  * Function that is called with messages created by the fragmentation
1589  * module.  In the case of the 'proc' callback of the
1590  * GNUNET_FRAGMENT_context_create function, this function must
1591  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1592  *
1593  * @param cls closure, the 'struct FragmentationContext'
1594  * @param msg the message that was created
1595  */
1596 static void
1597 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1598 {
1599   struct UDP_FragmentationContext *frag_ctx = cls;
1600   struct Plugin *plugin = frag_ctx->plugin;
1601   struct UDP_MessageWrapper * udpw;
1602   size_t msg_len = ntohs (msg->size);
1603
1604   LOG(GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes\n", msg_len);
1605   frag_ctx->fragments_used++;
1606   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1607   udpw->session = frag_ctx->session;
1608   udpw->msg_buf = (char *) &udpw[1];
1609   udpw->msg_size = msg_len;
1610   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1611   udpw->cont = &send_next_fragment;
1612   udpw->cont_cls = udpw;
1613   udpw->timeout = frag_ctx->timeout;
1614   udpw->frag_ctx = frag_ctx;
1615   udpw->msg_type = MSG_FRAGMENTED;
1616   memcpy (udpw->msg_buf, msg, msg_len);
1617   enqueue (plugin, udpw);
1618   schedule_select (plugin);
1619 }
1620
1621 /**
1622  * Function that can be used by the transport service to transmit
1623  * a message using the plugin.   Note that in the case of a
1624  * peer disconnecting, the continuation MUST be called
1625  * prior to the disconnect notification itself.  This function
1626  * will be called with this peer's HELLO message to initiate
1627  * a fresh connection to another peer.
1628  *
1629  * @param cls closure
1630  * @param s which session must be used
1631  * @param msgbuf the message to transmit
1632  * @param msgbuf_size number of bytes in 'msgbuf'
1633  * @param priority how important is the message (most plugins will
1634  *                 ignore message priority and just FIFO)
1635  * @param to how long to wait at most for the transmission (does not
1636  *                require plugins to discard the message after the timeout,
1637  *                just advisory for the desired delay; most plugins will ignore
1638  *                this as well)
1639  * @param cont continuation to call once the message has
1640  *        been transmitted (or if the transport is ready
1641  *        for the next transmission call; or if the
1642  *        peer disconnected...); can be NULL
1643  * @param cont_cls closure for cont
1644  * @return number of bytes used (on the physical network, with overheads);
1645  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1646  *         and does NOT mean that the message was not transmitted (DV)
1647  */
1648 static ssize_t
1649 udp_plugin_send (void *cls, struct Session *s, const char *msgbuf,
1650     size_t msgbuf_size, unsigned int priority, struct GNUNET_TIME_Relative to,
1651     GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1652 {
1653   struct Plugin *plugin = cls;
1654   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1655   struct UDP_FragmentationContext * frag_ctx;
1656   struct UDP_MessageWrapper * udpw;
1657   struct UDPMessage *udp;
1658   char mbuf[udpmlen];
1659   GNUNET_assert(plugin != NULL);
1660   GNUNET_assert(s != NULL);
1661
1662   if ((s->address->address_length == sizeof(struct IPv6UdpAddress)) && (plugin->sockv6 == NULL ))
1663   {
1664     return GNUNET_SYSERR;
1665   }
1666   if ((s->address->address_length == sizeof(struct IPv4UdpAddress)) && (plugin->sockv4 == NULL ))
1667   {
1668     return GNUNET_SYSERR;
1669   }
1670   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1671   {
1672     GNUNET_break(0);
1673     return GNUNET_SYSERR;
1674   }
1675   if (GNUNET_YES
1676       != GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1677           &s->target, s))
1678   {
1679     GNUNET_break(0);
1680     return GNUNET_SYSERR;
1681   }
1682   LOG(GNUNET_ERROR_TYPE_DEBUG,
1683       "UDP transmits %u-byte message to `%s' using address `%s'\n", udpmlen,
1684       GNUNET_i2s (&s->target), udp_address_to_string (NULL, s->address->address, s->address->address_length));
1685
1686   /* Message */
1687   udp = (struct UDPMessage *) mbuf;
1688   udp->header.size = htons (udpmlen);
1689   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1690   udp->reserved = htonl (0);
1691   udp->sender = *plugin->env->my_identity;
1692
1693   /* We do not update the session time out here!
1694    * Otherwise this session will not timeout since we send keep alive before
1695    * session can timeout
1696    *
1697    * For UDP we update session timeout only on receive, this will cover keep
1698    * alives, since remote peer will reply with keep alive response!
1699    */
1700   if (udpmlen <= UDP_MTU)
1701   {
1702     /* unfragmented message */
1703     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1704     udpw->session = s;
1705     udpw->msg_buf = (char *) &udpw[1];
1706     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1707     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1708     udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
1709     udpw->cont = cont;
1710     udpw->cont_cls = cont_cls;
1711     udpw->frag_ctx = NULL;
1712     udpw->msg_type = MSG_UNFRAGMENTED;
1713     memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
1714     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
1715     enqueue (plugin, udpw);
1716
1717     GNUNET_STATISTICS_update (plugin->env->stats,
1718         "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
1719     GNUNET_STATISTICS_update (plugin->env->stats,
1720         "# UDP, unfragmented msgs, bytes payload, attempt", udpw->payload_size,
1721         GNUNET_NO);
1722   }
1723   else
1724   {
1725     /* fragmented message */
1726     if (s->frag_ctx != NULL )
1727       return GNUNET_SYSERR;
1728     memcpy (&udp[1], msgbuf, msgbuf_size);
1729     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
1730     frag_ctx->plugin = plugin;
1731     frag_ctx->session = s;
1732     frag_ctx->cont = cont;
1733     frag_ctx->cont_cls = cont_cls;
1734     frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
1735         to);
1736     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1737     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1738     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1739         UDP_MTU, &plugin->tracker, s->last_expected_msg_delay,
1740         s->last_expected_ack_delay, &udp->header, &enqueue_fragment, frag_ctx);
1741     s->frag_ctx = frag_ctx;
1742     GNUNET_STATISTICS_update (plugin->env->stats,
1743         "# UDP, fragmented msgs, messages, pending", 1, GNUNET_NO);
1744     GNUNET_STATISTICS_update (plugin->env->stats,
1745         "# UDP, fragmented msgs, messages, attempt", 1, GNUNET_NO);
1746     GNUNET_STATISTICS_update (plugin->env->stats,
1747         "# UDP, fragmented msgs, bytes payload, attempt",
1748         frag_ctx->payload_size, GNUNET_NO);
1749   }
1750   schedule_select (plugin);
1751   return udpmlen;
1752 }
1753
1754 /**
1755  * Our external IP address/port mapping has changed.
1756  *
1757  * @param cls closure, the 'struct LocalAddrList'
1758  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1759  *     the previous (now invalid) one
1760  * @param addr either the previous or the new public IP address
1761  * @param addrlen actual lenght of the address
1762  */
1763 static void
1764 udp_nat_port_map_callback (void *cls, int add_remove,
1765     const struct sockaddr *addr, socklen_t addrlen)
1766 {
1767   struct Plugin *plugin = cls;
1768   struct GNUNET_HELLO_Address *address;
1769   struct IPv4UdpAddress u4;
1770   struct IPv6UdpAddress u6;
1771   void *arg;
1772   size_t args;
1773
1774   LOG(GNUNET_ERROR_TYPE_INFO, "NAT notification to %s address `%s'\n",
1775       (GNUNET_YES == add_remove) ? "add" : "remove",
1776       GNUNET_a2s (addr, addrlen));
1777
1778   /* convert 'address' to our internal format */
1779   switch (addr->sa_family)
1780   {
1781   case AF_INET:
1782     GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
1783     memset (&u4, 0, sizeof(u4));
1784     u4.options = htonl (myoptions);
1785     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1786     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1787     if (0 == ((struct sockaddr_in *) addr)->sin_port)
1788       return;
1789     arg = &u4;
1790     args = sizeof(struct IPv4UdpAddress);
1791     break;
1792   case AF_INET6:
1793     GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
1794     memset (&u6, 0, sizeof(u6));
1795     u6.options = htonl (myoptions);
1796     if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
1797       return;
1798     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1799         sizeof(struct in6_addr));
1800     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1801     arg = &u6;
1802     args = sizeof(struct IPv6UdpAddress);
1803     break;
1804   default:
1805     GNUNET_break(0);
1806     return;
1807   }
1808   /* modify our published address list */
1809   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1810       PLUGIN_NAME, arg, args, GNUNET_HELLO_ADDRESS_INFO_NONE);
1811   plugin->env->notify_address (plugin->env->cls, add_remove, address);
1812   GNUNET_HELLO_address_free (address);
1813 }
1814
1815 /**
1816  * Message tokenizer has broken up an incomming message. Pass it on
1817  * to the service.
1818  *
1819  * @param cls the 'struct Plugin'
1820  * @param client the `struct SourceInformation`
1821  * @param hdr the actual message
1822  * @return #GNUNET_OK (always)
1823  */
1824 static int
1825 process_inbound_tokenized_messages (void *cls, void *client,
1826     const struct GNUNET_MessageHeader *hdr)
1827 {
1828   struct Plugin *plugin = cls;
1829   struct SourceInformation *si = client;
1830   struct GNUNET_TIME_Relative delay;
1831
1832   GNUNET_assert(si->session != NULL);
1833   if (GNUNET_YES == si->session->in_destroy)
1834     return GNUNET_OK;
1835   /* setup ATS */
1836   GNUNET_break(ntohl (si->session->ats.value) != GNUNET_ATS_NET_UNSPECIFIED);
1837   reschedule_session_timeout (si->session);
1838   delay = plugin->env->receive (plugin->env->cls, si->session->address, si->session, hdr);
1839   plugin->env->update_address_metrics (plugin->env->cls,
1840       si->session->address, si->session,
1841       &si->session->ats, 1);
1842   si->session->flow_delay_for_other_peer = delay;
1843   return GNUNET_OK;
1844 }
1845
1846 /**
1847  * We've received a UDP Message.  Process it (pass contents to main service).
1848  *
1849  * @param plugin plugin context
1850  * @param msg the message
1851  * @param sender_addr sender address
1852  * @param sender_addr_len number of bytes in sender_addr
1853  */
1854 static void
1855 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1856     const struct sockaddr *sender_addr, socklen_t sender_addr_len)
1857 {
1858   struct SourceInformation si;
1859   struct Session * s;
1860   struct GNUNET_HELLO_Address *address;
1861   struct IPv4UdpAddress u4;
1862   struct IPv6UdpAddress u6;
1863   const void *arg;
1864   size_t args;
1865
1866   if (0 != ntohl (msg->reserved))
1867   {
1868     GNUNET_break_op(0);
1869     return;
1870   }
1871   if (ntohs (msg->header.size)
1872       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
1873   {
1874     GNUNET_break_op(0);
1875     return;
1876   }
1877
1878   /* convert address */
1879   switch (sender_addr->sa_family)
1880   {
1881   case AF_INET:
1882     GNUNET_assert(sender_addr_len == sizeof(struct sockaddr_in));
1883     memset (&u4, 0, sizeof(u4));
1884     u6.options = htonl (0);
1885     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1886     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1887     arg = &u4;
1888     args = sizeof(u4);
1889     break;
1890   case AF_INET6:
1891     GNUNET_assert(sender_addr_len == sizeof(struct sockaddr_in6));
1892     memset (&u6, 0, sizeof(u6));
1893     u6.options = htonl (0);
1894     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1895     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1896     arg = &u6;
1897     args = sizeof(u6);
1898     break;
1899   default:
1900     GNUNET_break(0);
1901     return;
1902   }
1903   LOG(GNUNET_ERROR_TYPE_DEBUG,
1904       "Received message with %u bytes from peer `%s' at `%s'\n",
1905       (unsigned int ) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1906       GNUNET_a2s (sender_addr, sender_addr_len));
1907
1908   address = GNUNET_HELLO_address_allocate ( &msg->sender, PLUGIN_NAME,
1909       arg, args, GNUNET_HELLO_ADDRESS_INFO_INBOUND);
1910   if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
1911   {
1912     s = udp_plugin_create_session (plugin, address);
1913     plugin->env->session_start (NULL, address, s, NULL, 0);
1914   }
1915   GNUNET_free(address);
1916
1917   /* iterate over all embedded messages */
1918   si.session = s;
1919   si.sender = msg->sender;
1920   si.arg = arg;
1921   si.args = args;
1922   s->rc++;
1923   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1924       ntohs (msg->header.size) - sizeof(struct UDPMessage), GNUNET_YES,
1925       GNUNET_NO);
1926   s->rc--;
1927   if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
1928     free_session (s);
1929 }
1930
1931 /**
1932  * Scan the heap for a receive context with the given address.
1933  *
1934  * @param cls the `struct FindReceiveContext`
1935  * @param node internal node of the heap
1936  * @param element value stored at the node (a 'struct ReceiveContext')
1937  * @param cost cost associated with the node
1938  * @return #GNUNET_YES if we should continue to iterate,
1939  *         #GNUNET_NO if not.
1940  */
1941 static int
1942 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1943     void *element, GNUNET_CONTAINER_HeapCostType cost)
1944 {
1945   struct FindReceiveContext *frc = cls;
1946   struct DefragContext *e = element;
1947
1948   if ((frc->addr_len == e->addr_len)
1949       && (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1950   {
1951     frc->rc = e;
1952     return GNUNET_NO;
1953   }
1954   return GNUNET_YES;
1955 }
1956
1957 /**
1958  * Process a defragmented message.
1959  *
1960  * @param cls the 'struct ReceiveContext'
1961  * @param msg the message
1962  */
1963 static void
1964 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1965 {
1966   struct DefragContext *rc = cls;
1967
1968   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1969   {
1970     GNUNET_break(0);
1971     return;
1972   }
1973   if (ntohs (msg->size) < sizeof(struct UDPMessage))
1974   {
1975     GNUNET_break(0);
1976     return;
1977   }
1978   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1979       rc->src_addr, rc->addr_len);
1980 }
1981
1982
1983 /**
1984  * Context to lookup a session based on a IP address
1985  */
1986 struct LookupContext
1987 {
1988   /**
1989    * The result
1990    */
1991   struct Session *res;
1992
1993   /**
1994    * The socket address
1995    */
1996   const struct sockaddr *address;
1997
1998   /**
1999    * The socket address length
2000    */
2001   size_t addr_len;
2002
2003   /**
2004    * Is a fragmentation context required for the session
2005    */
2006   int must_have_frag_ctx;
2007 };
2008
2009 static int
2010 lookup_session_by_sockaddr_it (void *cls, const struct GNUNET_PeerIdentity *key,
2011     void *value)
2012 {
2013   struct LookupContext *l_ctx = cls;
2014   struct Session * s = value;
2015   struct IPv4UdpAddress u4;
2016   struct IPv6UdpAddress u6;
2017   void *arg;
2018   size_t args;
2019
2020   /* convert address */
2021   switch (l_ctx->address->sa_family)
2022   {
2023   case AF_INET:
2024     GNUNET_assert(l_ctx->addr_len == sizeof(struct sockaddr_in));
2025     memset (&u4, 0, sizeof(u4));
2026     u6.options = htonl (0);
2027     u4.ipv4_addr = ((struct sockaddr_in *) l_ctx->address)->sin_addr.s_addr;
2028     u4.u4_port = ((struct sockaddr_in *) l_ctx->address)->sin_port;
2029     arg = &u4;
2030     args = sizeof(u4);
2031     break;
2032   case AF_INET6:
2033     GNUNET_assert(l_ctx->addr_len == sizeof(struct sockaddr_in6));
2034     memset (&u6, 0, sizeof(u6));
2035     u6.options = htonl (0);
2036     u6.ipv6_addr = ((struct sockaddr_in6 *) l_ctx->address)->sin6_addr;
2037     u6.u6_port = ((struct sockaddr_in6 *) l_ctx->address)->sin6_port;
2038     arg = &u6;
2039     args = sizeof(u6);
2040     break;
2041   default:
2042     GNUNET_break(0);
2043     return GNUNET_YES;
2044   }
2045
2046
2047   if ((GNUNET_YES == l_ctx->must_have_frag_ctx) && (NULL == s->frag_ctx))
2048     return GNUNET_YES;
2049
2050   /* Does not compare peer identities but addresses */
2051   if ((args == s->address->address_length) &&
2052       (0 == memcmp (arg, s->address->address, args)))
2053   {
2054     l_ctx->res = s;
2055     return GNUNET_YES;
2056   }
2057   return GNUNET_YES;
2058 }
2059
2060 /**
2061  * Transmit an acknowledgement.
2062  *
2063  * @param cls the 'struct ReceiveContext'
2064  * @param id message ID (unused)
2065  * @param msg ack to transmit
2066  */
2067 static void
2068 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
2069 {
2070   struct DefragContext *rc = cls;
2071   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2072   struct UDP_ACK_Message *udp_ack;
2073   uint32_t delay = 0;
2074   struct UDP_MessageWrapper *udpw;
2075   struct Session *s;
2076   struct LookupContext l_ctx;
2077
2078   l_ctx.address = rc->src_addr;
2079   l_ctx.addr_len = rc->addr_len;
2080   l_ctx.must_have_frag_ctx = GNUNET_NO;
2081   l_ctx.res = NULL;
2082   GNUNET_CONTAINER_multipeermap_iterate (rc->plugin->sessions,
2083       &lookup_session_by_sockaddr_it, &l_ctx);
2084   s = l_ctx.res;
2085   if (NULL == s)
2086   {
2087     GNUNET_break (0);
2088     return;
2089   }
2090   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2091     delay = s->flow_delay_for_other_peer.rel_value_us;
2092
2093   LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %s\n",
2094       GNUNET_a2s (rc->src_addr, (rc->src_addr->sa_family == AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct sockaddr_in6)),
2095       GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer, GNUNET_YES));
2096   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2097   udpw->msg_size = msize;
2098   udpw->payload_size = 0;
2099   udpw->session = s;
2100   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2101   udpw->msg_buf = (char *) &udpw[1];
2102   udpw->msg_type = MSG_ACK;
2103   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2104   udp_ack->header.size = htons ((uint16_t) msize);
2105   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2106   udp_ack->delay = htonl (delay);
2107   udp_ack->sender = *rc->plugin->env->my_identity;
2108   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2109   enqueue (rc->plugin, udpw);
2110   schedule_select (rc->plugin);
2111 }
2112
2113 static void
2114 read_process_msg (struct Plugin *plugin, const struct GNUNET_MessageHeader *msg,
2115     const struct sockaddr *addr, socklen_t fromlen)
2116 {
2117   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2118   {
2119     GNUNET_break_op(0);
2120     return;
2121   }
2122   process_udp_message (plugin, (const struct UDPMessage *) msg, addr, fromlen);
2123 }
2124
2125 static void
2126 read_process_ack (struct Plugin *plugin, const struct GNUNET_MessageHeader *msg,
2127     const struct sockaddr *addr, socklen_t fromlen)
2128 {
2129   const struct GNUNET_MessageHeader *ack;
2130   const struct UDP_ACK_Message *udp_ack;
2131   struct LookupContext l_ctx;
2132   struct Session *s;
2133   struct GNUNET_TIME_Relative flow_delay;
2134
2135   if (ntohs (msg->size)
2136       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2137   {
2138     GNUNET_break_op(0);
2139     return;
2140   }
2141   udp_ack = (const struct UDP_ACK_Message *) msg;
2142
2143   /* Lookup session based on sockaddr */
2144   l_ctx.address = addr;
2145   l_ctx.addr_len = fromlen;
2146   l_ctx.res = NULL;
2147   l_ctx.must_have_frag_ctx = GNUNET_YES;
2148   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
2149       &lookup_session_by_sockaddr_it, &l_ctx);
2150   s = l_ctx.res;
2151   if ((NULL == s) || (NULL == s->frag_ctx))
2152   {
2153     return;
2154   }
2155
2156   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2157   LOG(GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %s\n",
2158       GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES));
2159   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2160
2161   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2162   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2163   {
2164     GNUNET_break_op(0);
2165     return;
2166   }
2167
2168   if (0
2169       != memcmp (&l_ctx.res->target, &udp_ack->sender,
2170           sizeof(struct GNUNET_PeerIdentity)))
2171     GNUNET_break(0);
2172   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2173   {
2174     LOG(GNUNET_ERROR_TYPE_DEBUG,
2175         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2176         (unsigned int ) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2177         GNUNET_a2s (addr, fromlen));
2178     /* Expect more ACKs to arrive */
2179     return;
2180   }
2181
2182   LOG(GNUNET_ERROR_TYPE_DEBUG, "Message full ACK'ed\n",
2183       (unsigned int ) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2184       GNUNET_a2s (addr, fromlen));
2185
2186   /* Remove fragmented message after successful sending */
2187   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2188 }
2189
2190 static void
2191 read_process_fragment (struct Plugin *plugin,
2192     const struct GNUNET_MessageHeader *msg, const struct sockaddr *addr,
2193     socklen_t fromlen)
2194 {
2195   struct DefragContext *d_ctx;
2196   struct GNUNET_TIME_Absolute now;
2197   struct FindReceiveContext frc;
2198
2199   frc.rc = NULL;
2200   frc.addr = addr;
2201   frc.addr_len = fromlen;
2202
2203   LOG(GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2204       (unsigned int ) ntohs (msg->size), GNUNET_a2s (addr, fromlen));
2205   /* Lookup existing receive context for this address */
2206   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2207       &find_receive_context, &frc);
2208   now = GNUNET_TIME_absolute_get ();
2209   d_ctx = frc.rc;
2210
2211   if (d_ctx == NULL )
2212   {
2213     /* Create a new defragmentation context */
2214     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2215     memcpy (&d_ctx[1], addr, fromlen);
2216     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2217     d_ctx->addr_len = fromlen;
2218     d_ctx->plugin = plugin;
2219     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2220         UDP_MTU, UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx, &fragment_msg_proc,
2221         &ack_proc);
2222     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2223         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2224     LOG(GNUNET_ERROR_TYPE_DEBUG,
2225         "Created new defragmentation context for %u-byte fragment from `%s'\n",
2226         (unsigned int ) ntohs (msg->size), GNUNET_a2s (addr, fromlen));
2227   }
2228   else
2229   {
2230     LOG(GNUNET_ERROR_TYPE_DEBUG,
2231         "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2232         (unsigned int ) ntohs (msg->size), GNUNET_a2s (addr, fromlen));
2233   }
2234
2235   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2236   {
2237     /* keep this 'rc' from expiring */
2238     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2239         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2240   }
2241   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2242   UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2243   {
2244     /* remove 'rc' that was inactive the longest */
2245     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2246     GNUNET_assert(NULL != d_ctx);
2247     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2248     GNUNET_free(d_ctx);
2249   }
2250 }
2251
2252 /**
2253  * Read and process a message from the given socket.
2254  *
2255  * @param plugin the overall plugin
2256  * @param rsock socket to read from
2257  */
2258 static void
2259 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2260 {
2261   socklen_t fromlen;
2262   struct sockaddr_storage addr;
2263   char buf[65536] GNUNET_ALIGN;
2264   ssize_t size;
2265   const struct GNUNET_MessageHeader *msg;
2266
2267   fromlen = sizeof(addr);
2268   memset (&addr, 0, sizeof(addr));
2269   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf),
2270       (struct sockaddr *) &addr, &fromlen);
2271 #if MINGW
2272   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2273    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2274    * on this socket has failed.
2275    * Quote from MSDN:
2276    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2277    *   executing a hard or abortive close. The application should close
2278    *   the socket; it is no longer usable. On a UDP-datagram socket this
2279    *   error indicates a previous send operation resulted in an ICMP Port
2280    *   Unreachable message.
2281    */
2282   if ( (-1 == size) && (ECONNRESET == errno) )
2283   return;
2284 #endif
2285   if (-1 == size)
2286   {
2287     LOG(GNUNET_ERROR_TYPE_DEBUG, "UDP failed to receive data: %s\n",
2288         STRERROR (errno));
2289     /* Connection failure or something. Not a protocol violation. */
2290     return;
2291   }
2292   if (size < sizeof(struct GNUNET_MessageHeader))
2293   {
2294     LOG(GNUNET_ERROR_TYPE_WARNING,
2295         "UDP got %u bytes, which is not enough for a GNUnet message header\n",
2296         (unsigned int ) size);
2297     /* _MAY_ be a connection failure (got partial message) */
2298     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2299     GNUNET_break_op(0);
2300     return;
2301   }
2302   msg = (const struct GNUNET_MessageHeader *) buf;
2303
2304   LOG(GNUNET_ERROR_TYPE_DEBUG,
2305       "UDP received %u-byte message from `%s' type %u\n", (unsigned int ) size,
2306       GNUNET_a2s ((const struct sockaddr * ) &addr, fromlen),
2307       ntohs (msg->type));
2308
2309   if (size != ntohs (msg->size))
2310   {
2311     GNUNET_break_op(0);
2312     return;
2313   }
2314
2315   GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, total, bytes, received",
2316       size, GNUNET_NO);
2317
2318
2319
2320
2321   switch (ntohs (msg->type))
2322   {
2323   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2324     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2325       udp_broadcast_receive (plugin, buf, size, (const struct sockaddr *) &addr,
2326           fromlen);
2327     return;
2328   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2329     read_process_msg (plugin, msg, (const struct sockaddr *) &addr, fromlen);
2330     return;
2331   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2332     read_process_ack (plugin, msg, (const struct sockaddr *) &addr, fromlen);
2333     return;
2334   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2335     read_process_fragment (plugin, msg, (const struct sockaddr *) &addr,
2336         fromlen);
2337     return;
2338   default:
2339     GNUNET_break_op(0);
2340     return;
2341   }
2342 }
2343
2344 static struct UDP_MessageWrapper *
2345 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2346     struct GNUNET_NETWORK_Handle *sock)
2347 {
2348   struct UDP_MessageWrapper *udpw = NULL;
2349   struct GNUNET_TIME_Relative remaining;
2350
2351   udpw = head;
2352   while (udpw != NULL )
2353   {
2354     /* Find messages with timeout */
2355     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2356     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2357     {
2358       /* Message timed out */
2359       switch (udpw->msg_type)
2360       {
2361       case MSG_UNFRAGMENTED:
2362         GNUNET_STATISTICS_update (plugin->env->stats,
2363             "# UDP, total, bytes, sent, timeout", udpw->msg_size, GNUNET_NO);
2364         GNUNET_STATISTICS_update (plugin->env->stats,
2365             "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
2366         GNUNET_STATISTICS_update (plugin->env->stats,
2367             "# UDP, unfragmented msgs, messages, sent, timeout", 1, GNUNET_NO);
2368         GNUNET_STATISTICS_update (plugin->env->stats,
2369             "# UDP, unfragmented msgs, bytes, sent, timeout",
2370             udpw->payload_size, GNUNET_NO);
2371         /* Not fragmented message */
2372         LOG(GNUNET_ERROR_TYPE_DEBUG,
2373             "Message for peer `%s' with size %u timed out\n",
2374             GNUNET_i2s (&udpw->session->target), udpw->payload_size);
2375         call_continuation (udpw, GNUNET_SYSERR);
2376         /* Remove message */
2377         dequeue (plugin, udpw);
2378         GNUNET_free(udpw);
2379         break;
2380       case MSG_FRAGMENTED:
2381         /* Fragmented message */
2382         GNUNET_STATISTICS_update (plugin->env->stats,
2383             "# UDP, total, bytes, sent, timeout", udpw->frag_ctx->on_wire_size,
2384             GNUNET_NO);
2385         GNUNET_STATISTICS_update (plugin->env->stats,
2386             "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
2387         call_continuation (udpw, GNUNET_SYSERR);
2388         LOG(GNUNET_ERROR_TYPE_DEBUG,
2389             "Fragment for message for peer `%s' with size %u timed out\n",
2390             GNUNET_i2s (&udpw->session->target), udpw->frag_ctx->payload_size);
2391
2392         GNUNET_STATISTICS_update (plugin->env->stats,
2393             "# UDP, fragmented msgs, messages, sent, timeout", 1, GNUNET_NO);
2394         GNUNET_STATISTICS_update (plugin->env->stats,
2395             "# UDP, fragmented msgs, bytes, sent, timeout",
2396             udpw->frag_ctx->payload_size, GNUNET_NO);
2397         /* Remove fragmented message due to timeout */
2398         fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2399         break;
2400       case MSG_ACK:
2401         GNUNET_STATISTICS_update (plugin->env->stats,
2402             "# UDP, total, bytes, sent, timeout", udpw->msg_size, GNUNET_NO);
2403         GNUNET_STATISTICS_update (plugin->env->stats,
2404             "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
2405         LOG(GNUNET_ERROR_TYPE_DEBUG,
2406             "ACK Message for peer `%s' with size %u timed out\n",
2407             GNUNET_i2s (&udpw->session->target), udpw->payload_size);
2408         call_continuation (udpw, GNUNET_SYSERR);
2409         dequeue (plugin, udpw);
2410         GNUNET_free(udpw);
2411         break;
2412       default:
2413         break;
2414       }
2415       if (sock == plugin->sockv4)
2416         udpw = plugin->ipv4_queue_head;
2417       else if (sock == plugin->sockv6)
2418         udpw = plugin->ipv6_queue_head;
2419       else
2420       {
2421         GNUNET_break(0); /* should never happen */
2422         udpw = NULL;
2423       }
2424       GNUNET_STATISTICS_update (plugin->env->stats,
2425           "# messages dismissed due to timeout", 1, GNUNET_NO);
2426     }
2427     else
2428     {
2429       /* Message did not time out, check flow delay */
2430       remaining = GNUNET_TIME_absolute_get_remaining (
2431           udpw->session->flow_delay_from_other_peer);
2432       if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2433       {
2434         /* this message is not delayed */
2435         LOG(GNUNET_ERROR_TYPE_DEBUG,
2436             "Message for peer `%s' (%u bytes) is not delayed \n",
2437             GNUNET_i2s (&udpw->session->target), udpw->payload_size);
2438         break; /* Found message to send, break */
2439       }
2440       else
2441       {
2442         /* Message is delayed, try next */
2443         LOG(GNUNET_ERROR_TYPE_DEBUG,
2444             "Message for peer `%s' (%u bytes) is delayed for %s\n",
2445             GNUNET_i2s (&udpw->session->target), udpw->payload_size,
2446             GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
2447         udpw = udpw->next;
2448       }
2449     }
2450   }
2451   return udpw;
2452 }
2453
2454 static void
2455 analyze_send_error (struct Plugin *plugin, const struct sockaddr * sa,
2456     socklen_t slen, int error)
2457 {
2458   static int network_down_error;
2459   struct GNUNET_ATS_Information type;
2460
2461   type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
2462   if (((GNUNET_ATS_NET_LAN == ntohl (type.value))
2463       || (GNUNET_ATS_NET_WAN == ntohl (type.value)))
2464       && ((ENETUNREACH == errno)|| (ENETDOWN == errno)))
2465       {
2466         if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2467         {
2468           /* IPv4: "Network unreachable" or "Network down"
2469            *
2470            * This indicates we do not have connectivity
2471            */
2472           LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2473               _("UDP could not transmit message to `%s': "
2474                   "Network seems down, please check your network configuration\n"),
2475               GNUNET_a2s (sa, slen));
2476         }
2477         if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2478         {
2479           /* IPv6: "Network unreachable" or "Network down"
2480            *
2481            * This indicates that this system is IPv6 enabled, but does not
2482            * have a valid global IPv6 address assigned or we do not have
2483            * connectivity
2484            */
2485
2486           LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2487               _("UDP could not transmit IPv6 message! "
2488                   "Please check your network configuration and disable IPv6 if your "
2489                   "connection does not have a global IPv6 address\n"));
2490         }
2491       }
2492       else
2493       {
2494         LOG (GNUNET_ERROR_TYPE_WARNING,
2495             "UDP could not transmit message to `%s': `%s'\n",
2496             GNUNET_a2s (sa, slen), STRERROR (error));
2497       }
2498     }
2499
2500 static size_t
2501 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2502 {
2503   ssize_t sent;
2504   socklen_t slen;
2505   struct sockaddr *a;
2506   const struct IPv4UdpAddress *u4;
2507   struct sockaddr_in a4;
2508   const struct IPv6UdpAddress *u6;
2509   struct sockaddr_in6 a6;
2510
2511
2512   struct UDP_MessageWrapper *udpw = NULL;
2513
2514   /* Find message to send */
2515   udpw = remove_timeout_messages_and_select (
2516       (sock == plugin->sockv4) ?
2517           plugin->ipv4_queue_head : plugin->ipv6_queue_head, sock);
2518   if (NULL == udpw)
2519     return 0; /* No message to send */
2520
2521   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
2522   {
2523     u4 = udpw->session->address->address;
2524     memset (&a4, 0, sizeof(a4));
2525     a4.sin_family = AF_INET;
2526 #if HAVE_SOCKADDR_IN_SIN_LEN
2527     a4.sin_len = sizeof (a4);
2528 #endif
2529     a4.sin_port = u4->u4_port;
2530     memcpy (&a4.sin_addr, &u4->ipv4_addr, sizeof(struct in_addr));
2531     a = (struct sockaddr *) &a4;
2532     slen = sizeof (a4);
2533   }
2534   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
2535   {
2536     u6 = udpw->session->address->address;
2537     memset (&a6, 0, sizeof(a6));
2538     a6.sin6_family = AF_INET6;
2539 #if HAVE_SOCKADDR_IN_SIN_LEN
2540     a6.sin6_len = sizeof (a6);
2541 #endif
2542     a6.sin6_port = u6->u6_port;
2543     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
2544     a = (struct sockaddr *) &a6;
2545     slen = sizeof (a6);
2546   }
2547   else
2548   {
2549     GNUNET_break (0);
2550   }
2551   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, a,
2552       slen);
2553
2554   if (GNUNET_SYSERR == sent)
2555   {
2556     /* Failure */
2557     analyze_send_error (plugin, a, slen, errno);
2558     call_continuation (udpw, GNUNET_SYSERR);
2559     GNUNET_STATISTICS_update (plugin->env->stats,
2560         "# UDP, total, bytes, sent, failure", sent, GNUNET_NO);
2561     GNUNET_STATISTICS_update (plugin->env->stats,
2562         "# UDP, total, messages, sent, failure", 1, GNUNET_NO);
2563   }
2564   else
2565   {
2566     /* Success */
2567     LOG(GNUNET_ERROR_TYPE_DEBUG,
2568         "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2569         (unsigned int ) (udpw->msg_size), GNUNET_i2s (&udpw->session->target),
2570         GNUNET_a2s (a, slen), (int ) sent,
2571         (sent < 0) ? STRERROR (errno) : "ok");
2572     GNUNET_STATISTICS_update (plugin->env->stats,
2573         "# UDP, total, bytes, sent, success", sent, GNUNET_NO);
2574     GNUNET_STATISTICS_update (plugin->env->stats,
2575         "# UDP, total, messages, sent, success", 1, GNUNET_NO);
2576     if (NULL != udpw->frag_ctx)
2577       udpw->frag_ctx->on_wire_size += udpw->msg_size;
2578     call_continuation (udpw, GNUNET_OK);
2579   }
2580   dequeue (plugin, udpw);
2581   GNUNET_free(udpw);
2582   udpw = NULL;
2583
2584   return sent;
2585 }
2586
2587 /**
2588  * We have been notified that our readset has something to read.  We don't
2589  * know which socket needs to be read, so we have to check each one
2590  * Then reschedule this function to be called again once more is available.
2591  *
2592  * @param cls the plugin handle
2593  * @param tc the scheduling context (for rescheduling this function again)
2594  */
2595 static void
2596 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2597 {
2598   struct Plugin *plugin = cls;
2599
2600   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2601   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2602     return;
2603   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
2604       && (NULL != plugin->sockv4)
2605       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
2606     udp_select_read (plugin, plugin->sockv4);
2607   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
2608       && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
2609       && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
2610     udp_select_send (plugin, plugin->sockv4);
2611   schedule_select (plugin);
2612 }
2613
2614 /**
2615  * We have been notified that our readset has something to read.  We don't
2616  * know which socket needs to be read, so we have to check each one
2617  * Then reschedule this function to be called again once more is available.
2618  *
2619  * @param cls the plugin handle
2620  * @param tc the scheduling context (for rescheduling this function again)
2621  */
2622 static void
2623 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2624 {
2625   struct Plugin *plugin = cls;
2626
2627   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2628   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2629     return;
2630   if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
2631       && (NULL != plugin->sockv6)
2632       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
2633     udp_select_read (plugin, plugin->sockv6);
2634   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
2635       && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
2636       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
2637   schedule_select (plugin);
2638 }
2639
2640 /**
2641  *
2642  * @return number of sockets that were successfully bound
2643  */
2644 static int
2645 setup_sockets (struct Plugin *plugin, const struct sockaddr_in6 *bind_v6,
2646     const struct sockaddr_in *bind_v4)
2647 {
2648   int tries;
2649   int sockets_created = 0;
2650   struct sockaddr_in6 server_addrv6;
2651   struct sockaddr_in server_addrv4;
2652   struct sockaddr *server_addr;
2653   struct sockaddr *addrs[2];
2654   socklen_t addrlens[2];
2655   socklen_t addrlen;
2656   int eno;
2657
2658   /* Create IPv6 socket */
2659   eno = EINVAL;
2660   if (plugin->enable_ipv6 == GNUNET_YES)
2661   {
2662     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2663     if (NULL == plugin->sockv6)
2664     {
2665       LOG(GNUNET_ERROR_TYPE_WARNING,
2666           "Disabling IPv6 since it is not supported on this system!\n");
2667       plugin->enable_ipv6 = GNUNET_NO;
2668     }
2669     else
2670     {
2671       memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6));
2672 #if HAVE_SOCKADDR_IN_SIN_LEN
2673       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
2674 #endif
2675       server_addrv6.sin6_family = AF_INET6;
2676       if (NULL != bind_v6)
2677         server_addrv6.sin6_addr = bind_v6->sin6_addr;
2678       else
2679         server_addrv6.sin6_addr = in6addr_any;
2680
2681       if (0 == plugin->port) /* autodetect */
2682         server_addrv6.sin6_port = htons (
2683             GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
2684                 + 32000);
2685       else
2686         server_addrv6.sin6_port = htons (plugin->port);
2687       addrlen = sizeof(struct sockaddr_in6);
2688       server_addr = (struct sockaddr *) &server_addrv6;
2689
2690       tries = 0;
2691       while (tries < 10)
2692       {
2693         LOG(GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 `%s'\n",
2694             GNUNET_a2s (server_addr, addrlen));
2695         /* binding */
2696         if (GNUNET_OK
2697             == GNUNET_NETWORK_socket_bind (plugin->sockv6, server_addr,
2698                 addrlen))
2699           break;
2700         eno = errno;
2701         if (0 != plugin->port)
2702         {
2703           tries = 10; /* fail */
2704           break; /* bind failed on specific port */
2705         }
2706         /* autodetect */
2707         server_addrv6.sin6_port = htons (
2708             GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
2709                 + 32000);
2710         tries++;
2711       }
2712       if (tries >= 10)
2713       {
2714         GNUNET_NETWORK_socket_close (plugin->sockv6);
2715         plugin->enable_ipv6 = GNUNET_NO;
2716         plugin->sockv6 = NULL;
2717       }
2718
2719       if (plugin->sockv6 != NULL )
2720       {
2721         LOG(GNUNET_ERROR_TYPE_DEBUG, "IPv6 socket created on port %s\n",
2722             GNUNET_a2s (server_addr, addrlen));
2723         addrs[sockets_created] = (struct sockaddr *) &server_addrv6;
2724         addrlens[sockets_created] = sizeof(struct sockaddr_in6);
2725         sockets_created++;
2726       }
2727       else
2728       {
2729         LOG(GNUNET_ERROR_TYPE_ERROR, "Failed to bind UDP socket to %s: %s\n",
2730             GNUNET_a2s (server_addr, addrlen), STRERROR (eno));
2731       }
2732     }
2733   }
2734
2735   /* Create IPv4 socket */
2736   eno = EINVAL;
2737   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2738   if (NULL == plugin->sockv4)
2739   {
2740     GNUNET_log_strerror(GNUNET_ERROR_TYPE_WARNING, "socket");
2741     LOG(GNUNET_ERROR_TYPE_WARNING,
2742         "Disabling IPv4 since it is not supported on this system!\n");
2743     plugin->enable_ipv4 = GNUNET_NO;
2744   }
2745   else
2746   {
2747     memset (&server_addrv4, '\0', sizeof(struct sockaddr_in));
2748 #if HAVE_SOCKADDR_IN_SIN_LEN
2749     server_addrv4.sin_len = sizeof (struct sockaddr_in);
2750 #endif
2751     server_addrv4.sin_family = AF_INET;
2752     if (NULL != bind_v4)
2753       server_addrv4.sin_addr = bind_v4->sin_addr;
2754     else
2755       server_addrv4.sin_addr.s_addr = INADDR_ANY;
2756
2757     if (0 == plugin->port)
2758       /* autodetect */
2759       server_addrv4.sin_port = htons (
2760           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
2761               + 32000);
2762     else
2763       server_addrv4.sin_port = htons (plugin->port);
2764
2765     addrlen = sizeof(struct sockaddr_in);
2766     server_addr = (struct sockaddr *) &server_addrv4;
2767
2768     tries = 0;
2769     while (tries < 10)
2770     {
2771       LOG(GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 `%s'\n",
2772           GNUNET_a2s (server_addr, addrlen));
2773
2774       /* binding */
2775       if (GNUNET_OK
2776           == GNUNET_NETWORK_socket_bind (plugin->sockv4, server_addr, addrlen))
2777         break;
2778       eno = errno;
2779       if (0 != plugin->port)
2780       {
2781         tries = 10; /* fail */
2782         break; /* bind failed on specific port */
2783       }
2784
2785       /* autodetect */
2786       server_addrv4.sin_port = htons (
2787           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
2788               + 32000);
2789       tries++;
2790     }
2791
2792     if (tries >= 10)
2793     {
2794       GNUNET_NETWORK_socket_close (plugin->sockv4);
2795       plugin->enable_ipv4 = GNUNET_NO;
2796       plugin->sockv4 = NULL;
2797     }
2798
2799     if (plugin->sockv4 != NULL )
2800     {
2801       LOG(GNUNET_ERROR_TYPE_DEBUG, "IPv4 socket created on port %s\n",
2802           GNUNET_a2s (server_addr, addrlen));
2803       addrs[sockets_created] = (struct sockaddr *) &server_addrv4;
2804       addrlens[sockets_created] = sizeof(struct sockaddr_in);
2805       sockets_created++;
2806     }
2807     else
2808     {
2809       LOG(GNUNET_ERROR_TYPE_ERROR, "Failed to bind UDP socket to %s: %s\n",
2810           GNUNET_a2s (server_addr, addrlen), STRERROR (eno));
2811     }
2812   }
2813
2814   if (0 == sockets_created)
2815   {
2816     LOG(GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2817     return 0; /* No sockets created, return */
2818   }
2819
2820   /* Create file descriptors */
2821   if (plugin->enable_ipv4 == GNUNET_YES)
2822   {
2823     plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2824     plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2825     GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2826     GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2827     if (NULL != plugin->sockv4)
2828     {
2829       GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2830       GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2831     }
2832   }
2833
2834   if (plugin->enable_ipv6 == GNUNET_YES)
2835   {
2836     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2837     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2838     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2839     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2840     if (NULL != plugin->sockv6)
2841     {
2842       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2843       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2844     }
2845   }
2846
2847   schedule_select (plugin);
2848   plugin->nat = GNUNET_NAT_register (plugin->env->cfg, GNUNET_NO, plugin->port,
2849       sockets_created, (const struct sockaddr **) addrs, addrlens,
2850       &udp_nat_port_map_callback, NULL, plugin);
2851
2852   return sockets_created;
2853 }
2854
2855 /**
2856  * The exported method. Makes the core api available via a global and
2857  * returns the udp transport API.
2858  *
2859  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2860  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2861  */
2862 void *
2863 libgnunet_plugin_transport_udp_init (void *cls)
2864 {
2865   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2866   struct GNUNET_TRANSPORT_PluginFunctions *api;
2867   struct Plugin *p;
2868   unsigned long long port;
2869   unsigned long long aport;
2870   unsigned long long udp_max_bps;
2871   unsigned long long enable_v6;
2872   unsigned long long enable_broadcasting;
2873   unsigned long long enable_broadcasting_recv;
2874   char * bind4_address;
2875   char * bind6_address;
2876   char * fancy_interval;
2877   struct GNUNET_TIME_Relative interval;
2878   struct sockaddr_in server_addrv4;
2879   struct sockaddr_in6 server_addrv6;
2880   int res;
2881   int have_bind4;
2882   int have_bind6;
2883
2884   if (NULL == env->receive)
2885   {
2886     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2887      initialze the plugin or the API */
2888     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
2889     api->cls = NULL;
2890     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2891     api->address_to_string = &udp_address_to_string;
2892     api->string_to_address = &udp_string_to_address;
2893     return api;
2894   }
2895
2896   GNUNET_assert(NULL != env->stats);
2897
2898   /* Get port number: port == 0 : autodetect a port,
2899    * > 0 : use this port, not given : 2086 default */
2900   if (GNUNET_OK
2901       != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2902           "PORT", &port))
2903     port = 2086;
2904   if (GNUNET_OK
2905       != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2906           "ADVERTISED_PORT", &aport))
2907     aport = port;
2908   if (port > 65535)
2909   {
2910     LOG(GNUNET_ERROR_TYPE_WARNING,
2911         _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2912         65535);
2913     return NULL ;
2914   }
2915
2916   /* Protocols */
2917   if ((GNUNET_YES
2918       == GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6")))
2919     enable_v6 = GNUNET_NO;
2920   else
2921     enable_v6 = GNUNET_YES;
2922
2923   /* Addresses */
2924   have_bind4 = GNUNET_NO;
2925   memset (&server_addrv4, 0, sizeof(server_addrv4));
2926   if (GNUNET_YES
2927       == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2928           "BINDTO", &bind4_address))
2929   {
2930     LOG(GNUNET_ERROR_TYPE_DEBUG,
2931         "Binding udp plugin to specific address: `%s'\n", bind4_address);
2932     if (1 != inet_pton (AF_INET, bind4_address, &server_addrv4.sin_addr))
2933     {
2934       GNUNET_free(bind4_address);
2935       return NULL ;
2936     }
2937     have_bind4 = GNUNET_YES;
2938   }
2939   GNUNET_free_non_null(bind4_address);
2940   have_bind6 = GNUNET_NO;
2941   memset (&server_addrv6, 0, sizeof(server_addrv6));
2942   if (GNUNET_YES
2943       == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2944           "BINDTO6", &bind6_address))
2945   {
2946     LOG(GNUNET_ERROR_TYPE_DEBUG,
2947         "Binding udp plugin to specific address: `%s'\n", bind6_address);
2948     if (1 != inet_pton (AF_INET6, bind6_address, &server_addrv6.sin6_addr))
2949     {
2950       LOG(GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2951           bind6_address);
2952       GNUNET_free(bind6_address);
2953       return NULL ;
2954     }
2955     have_bind6 = GNUNET_YES;
2956   }
2957   GNUNET_free_non_null(bind6_address);
2958
2959   /* Initialize my flags */
2960   myoptions = 0;
2961
2962   /* Enable neighbour discovery */
2963   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
2964       "transport-udp", "BROADCAST");
2965   if (enable_broadcasting == GNUNET_SYSERR)
2966     enable_broadcasting = GNUNET_NO;
2967
2968   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
2969       "transport-udp", "BROADCAST_RECEIVE");
2970   if (enable_broadcasting_recv == GNUNET_SYSERR)
2971     enable_broadcasting_recv = GNUNET_YES;
2972
2973   if (GNUNET_SYSERR
2974       == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2975           "BROADCAST_INTERVAL", &fancy_interval))
2976   {
2977     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2978   }
2979   else
2980   {
2981     if (GNUNET_SYSERR
2982         == GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval))
2983     {
2984       interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2985     }
2986     GNUNET_free(fancy_interval);
2987   }
2988
2989   /* Maximum datarate */
2990   if (GNUNET_OK
2991       != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2992           "MAX_BPS", &udp_max_bps))
2993   {
2994     udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
2995   }
2996
2997   p = GNUNET_new (struct Plugin);
2998   p->port = port;
2999   p->aport = aport;
3000   p->broadcast_interval = interval;
3001   p->enable_ipv6 = enable_v6;
3002   p->enable_ipv4 = GNUNET_YES; /* default */
3003   p->enable_broadcasting = enable_broadcasting;
3004   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3005   p->env = env;
3006   p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
3007   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (
3008       GNUNET_CONTAINER_HEAP_ORDER_MIN);
3009   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
3010   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3011       GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps), 30);
3012   plugin = p;
3013
3014   LOG(GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
3015   res = setup_sockets (p, (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3016       (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL );
3017   if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL)))
3018   {
3019     LOG (GNUNET_ERROR_TYPE_ERROR,
3020         _("Failed to create network sockets, plugin failed\n"));
3021     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3022     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3023     GNUNET_SERVER_mst_destroy (p->mst);
3024     GNUNET_free (p);
3025     return NULL;
3026   }
3027
3028   /* Setup broadcasting and receiving beacons */
3029   setup_broadcast (p, &server_addrv6, &server_addrv4);
3030
3031   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3032   api->cls = p;
3033   api->send = NULL;
3034   api->disconnect_session = &udp_disconnect_session;
3035   api->query_keepalive_factor = &udp_query_keepalive_factor;
3036   api->disconnect_peer = &udp_disconnect;
3037   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3038   api->address_to_string = &udp_address_to_string;
3039   api->string_to_address = &udp_string_to_address;
3040   api->check_address = &udp_plugin_check_address;
3041   api->get_session = &udp_plugin_get_session;
3042   api->send = &udp_plugin_send;
3043   api->get_network = &udp_get_network;
3044   api->update_session_timeout = &udp_plugin_update_session_timeout;
3045   return api;
3046 }
3047
3048 static int
3049 heap_cleanup_iterator (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
3050     void *element, GNUNET_CONTAINER_HeapCostType cost)
3051 {
3052   struct DefragContext * d_ctx = element;
3053
3054   GNUNET_CONTAINER_heap_remove_node (node);
3055   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3056   GNUNET_free(d_ctx);
3057
3058   return GNUNET_YES;
3059 }
3060
3061 /**
3062  * The exported method. Makes the core api available via a global and
3063  * returns the udp transport API.
3064  *
3065  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
3066  * @return NULL
3067  */
3068 void *
3069 libgnunet_plugin_transport_udp_done (void *cls)
3070 {
3071   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3072   struct Plugin *plugin = api->cls;
3073   struct PrettyPrinterContext *cur;
3074   struct PrettyPrinterContext *next;
3075
3076   if (NULL == plugin)
3077   {
3078     GNUNET_free(api);
3079     return NULL ;
3080   }
3081
3082   stop_broadcast (plugin);
3083   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK )
3084   {
3085     GNUNET_SCHEDULER_cancel (plugin->select_task);
3086     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
3087   }
3088   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK )
3089   {
3090     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3091     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
3092   }
3093
3094   /* Closing sockets */
3095   if (GNUNET_YES == plugin->enable_ipv4)
3096   {
3097     if (NULL != plugin->sockv4)
3098     {
3099       GNUNET_break(GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
3100       plugin->sockv4 = NULL;
3101     }
3102     GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3103     GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3104   }
3105   if (GNUNET_YES == plugin->enable_ipv6)
3106   {
3107     if (NULL != plugin->sockv6)
3108     {
3109       GNUNET_break(GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
3110       plugin->sockv6 = NULL;
3111
3112       GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3113       GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3114     }
3115   }
3116   if (NULL != plugin->nat)
3117   {
3118     GNUNET_NAT_unregister (plugin->nat);
3119     plugin->nat = NULL;
3120   }
3121   if (NULL != plugin->defrag_ctxs)
3122   {
3123     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs, heap_cleanup_iterator,
3124         NULL );
3125     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3126     plugin->defrag_ctxs = NULL;
3127   }
3128   if (plugin->mst != NULL )
3129   {
3130     GNUNET_SERVER_mst_destroy (plugin->mst);
3131     plugin->mst = NULL;
3132   }
3133
3134   /* Clean up leftover messages */
3135   struct UDP_MessageWrapper * udpw;
3136   udpw = plugin->ipv4_queue_head;
3137   while (udpw != NULL )
3138   {
3139     struct UDP_MessageWrapper *tmp = udpw->next;
3140     dequeue (plugin, udpw);
3141     call_continuation (udpw, GNUNET_SYSERR);
3142     GNUNET_free(udpw);
3143
3144     udpw = tmp;
3145   }
3146   udpw = plugin->ipv6_queue_head;
3147   while (udpw != NULL )
3148   {
3149     struct UDP_MessageWrapper *tmp = udpw->next;
3150     dequeue (plugin, udpw);
3151     call_continuation (udpw, GNUNET_SYSERR);
3152     GNUNET_free(udpw);
3153
3154     udpw = tmp;
3155   }
3156
3157   /* Clean up sessions */
3158   LOG(GNUNET_ERROR_TYPE_DEBUG, "Cleaning up sessions\n");
3159   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3160       &disconnect_and_free_it, plugin);
3161   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3162
3163   next = ppc_dll_head;
3164   for (cur = next; NULL != cur; cur = next)
3165   {
3166     next = cur->next;
3167     GNUNET_CONTAINER_DLL_remove(ppc_dll_head, ppc_dll_tail, cur);
3168     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3169     GNUNET_SCHEDULER_cancel (cur->timeout_task);
3170     GNUNET_free(cur);
3171     GNUNET_break(0);
3172   }
3173
3174   plugin->nat = NULL;
3175   GNUNET_free(plugin);
3176   GNUNET_free(api);
3177 #if DEBUG_MALLOC
3178   struct Allocation *allocation;
3179   while (NULL != ahead)
3180   {
3181     allocation = ahead;
3182     GNUNET_CONTAINER_DLL_remove (ahead, atail, allocation);
3183     GNUNET_free (allocation);
3184   }
3185   struct Allocator *allocator;
3186   while (NULL != aehead)
3187   {
3188     allocator = aehead;
3189     GNUNET_CONTAINER_DLL_remove (aehead, aetail, allocator);
3190     GNUNET_free (allocator);
3191   }
3192 #endif
3193   return NULL ;
3194 }
3195
3196 /* end of plugin_transport_udp.c */