- improved nat handling
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88 struct Session
89 {
90   /**
91    * Which peer is this session for?
92    */
93   struct GNUNET_PeerIdentity target;
94
95   /**
96    * Address of the other peer
97    */
98   const struct sockaddr *sock_addr;
99
100   size_t addrlen;
101
102   /**
103    * Desired delay for next sending we send to other peer
104    */
105   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
106
107   /**
108    * Desired delay for next sending we received from other peer
109    */
110   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
111
112   /**
113    * expected delay for ACKs
114    */
115   struct GNUNET_TIME_Relative last_expected_delay;
116
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121 };
122
123
124 struct SessionCompareContext
125 {
126   struct Session *res;
127   const struct GNUNET_HELLO_Address *addr;
128 };
129
130
131 /**
132  * Closure for 'process_inbound_tokenized_messages'
133  */
134 struct SourceInformation
135 {
136   /**
137    * Sender identity.
138    */
139   struct GNUNET_PeerIdentity sender;
140
141   /**
142    * Source address.
143    */
144   const void *arg;
145
146   /**
147    * Number of bytes in source address.
148    */
149   size_t args;
150
151   struct Session *session;
152 };
153
154
155 /**
156  * Closure for 'find_receive_context'.
157  */
158 struct FindReceiveContext
159 {
160   /**
161    * Where to store the result.
162    */
163   struct DefragContext *rc;
164
165   /**
166    * Address to find.
167    */
168   const struct sockaddr *addr;
169
170   /**
171    * Number of bytes in 'addr'.
172    */
173   socklen_t addr_len;
174
175   struct Session *session;
176 };
177
178
179
180 /**
181  * Data structure to track defragmentation contexts based
182  * on the source of the UDP traffic.
183  */
184 struct DefragContext
185 {
186
187   /**
188    * Defragmentation context.
189    */
190   struct GNUNET_DEFRAGMENT_Context *defrag;
191
192   /**
193    * Source address this receive context is for (allocated at the
194    * end of the struct).
195    */
196   const struct sockaddr *src_addr;
197
198   /**
199    * Reference to master plugin struct.
200    */
201   struct Plugin *plugin;
202
203   /**
204    * Node in the defrag heap.
205    */
206   struct GNUNET_CONTAINER_HeapNode *hnode;
207
208   /**
209    * Length of 'src_addr'
210    */
211   size_t addr_len;
212 };
213
214
215
216 /**
217  * Closure for 'process_inbound_tokenized_messages'
218  */
219 struct FragmentationContext
220 {
221   struct FragmentationContext * next;
222   struct FragmentationContext * prev;
223
224   struct Plugin * plugin;
225   struct GNUNET_FRAGMENT_Context * frag;
226   struct Session * session;
227
228   struct GNUNET_TIME_Absolute timeout;
229
230
231   /**
232    * Function to call upon completion of the transmission.
233    */
234   GNUNET_TRANSPORT_TransmitContinuation cont;
235
236   /**
237    * Closure for 'cont'.
238    */
239   void *cont_cls;
240
241   size_t bytes_to_send;
242 };
243
244
245 struct UDPMessageWrapper
246 {
247   struct Session *session;
248   struct UDPMessageWrapper *prev;
249   struct UDPMessageWrapper *next;
250   char *udp;
251   size_t msg_size;
252
253   struct GNUNET_TIME_Absolute timeout;
254
255   /**
256    * Function to call upon completion of the transmission.
257    */
258   GNUNET_TRANSPORT_TransmitContinuation cont;
259
260   /**
261    * Closure for 'cont'.
262    */
263   void *cont_cls;
264
265   struct FragmentationContext *frag_ctx;
266
267 };
268
269
270 /**
271  * UDP ACK Message-Packet header (after defragmentation).
272  */
273 struct UDP_ACK_Message
274 {
275   /**
276    * Message header.
277    */
278   struct GNUNET_MessageHeader header;
279
280   /**
281    * Desired delay for flow control
282    */
283   uint32_t delay;
284
285   /**
286    * What is the identity of the sender
287    */
288   struct GNUNET_PeerIdentity sender;
289
290 };
291
292 /**
293  * We have been notified that our readset has something to read.  We don't
294  * know which socket needs to be read, so we have to check each one
295  * Then reschedule this function to be called again once more is available.
296  *
297  * @param cls the plugin handle
298  * @param tc the scheduling context (for rescheduling this function again)
299  */
300 static void
301 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
302
303 /**
304  * We have been notified that our readset has something to read.  We don't
305  * know which socket needs to be read, so we have to check each one
306  * Then reschedule this function to be called again once more is available.
307  *
308  * @param cls the plugin handle
309  * @param tc the scheduling context (for rescheduling this function again)
310  */
311 static void
312 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
313
314 /**
315  * Function called for a quick conversion of the binary address to
316  * a numeric address.  Note that the caller must not free the
317  * address and that the next call to this function is allowed
318  * to override the address again.
319  *
320  * @param cls closure
321  * @param addr binary address
322  * @param addrlen length of the address
323  * @return string representing the same address
324  */
325 const char *
326 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
327 {
328   static char rbuf[INET6_ADDRSTRLEN + 10];
329   char buf[INET6_ADDRSTRLEN];
330   const void *sb;
331   struct in_addr a4;
332   struct in6_addr a6;
333   const struct IPv4UdpAddress *t4;
334   const struct IPv6UdpAddress *t6;
335   int af;
336   uint16_t port;
337
338   if (addrlen == sizeof (struct IPv6UdpAddress))
339   {
340     t6 = addr;
341     af = AF_INET6;
342     port = ntohs (t6->u6_port);
343     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
344     sb = &a6;
345   }
346   else if (addrlen == sizeof (struct IPv4UdpAddress))
347   {
348     t4 = addr;
349     af = AF_INET;
350     port = ntohs (t4->u4_port);
351     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
352     sb = &a4;
353   }
354   else
355   {
356     GNUNET_break_op (0);
357     return NULL;
358   }
359   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
360   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
361                    buf, port);
362   return rbuf;
363 }
364
365
366 /**
367  * Append our port and forward the result.
368  *
369  * @param cls a 'struct PrettyPrinterContext'
370  * @param hostname result from DNS resolver
371  */
372 static void
373 append_port (void *cls, const char *hostname)
374 {
375   struct PrettyPrinterContext *ppc = cls;
376   char *ret;
377
378   if (hostname == NULL)
379   {
380     ppc->asc (ppc->asc_cls, NULL);
381     GNUNET_free (ppc);
382     return;
383   }
384   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
385   ppc->asc (ppc->asc_cls, ret);
386   GNUNET_free (ret);
387 }
388
389
390 /**
391  * Convert the transports address to a nice, human-readable
392  * format.
393  *
394  * @param cls closure
395  * @param type name of the transport that generated the address
396  * @param addr one of the addresses of the host, NULL for the last address
397  *        the specific address format depends on the transport
398  * @param addrlen length of the address
399  * @param numeric should (IP) addresses be displayed in numeric form?
400  * @param timeout after how long should we give up?
401  * @param asc function to call on each string
402  * @param asc_cls closure for asc
403  */
404 static void
405 udp_plugin_address_pretty_printer (void *cls, const char *type,
406                                    const void *addr, size_t addrlen,
407                                    int numeric,
408                                    struct GNUNET_TIME_Relative timeout,
409                                    GNUNET_TRANSPORT_AddressStringCallback asc,
410                                    void *asc_cls)
411 {
412   struct PrettyPrinterContext *ppc;
413   const void *sb;
414   size_t sbs;
415   struct sockaddr_in a4;
416   struct sockaddr_in6 a6;
417   const struct IPv4UdpAddress *u4;
418   const struct IPv6UdpAddress *u6;
419   uint16_t port;
420
421   if (addrlen == sizeof (struct IPv6UdpAddress))
422   {
423     u6 = addr;
424     memset (&a6, 0, sizeof (a6));
425     a6.sin6_family = AF_INET6;
426 #if HAVE_SOCKADDR_IN_SIN_LEN
427     a6.sin6_len = sizeof (a6);
428 #endif
429     a6.sin6_port = u6->u6_port;
430     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
431     port = ntohs (u6->u6_port);
432     sb = &a6;
433     sbs = sizeof (a6);
434   }
435   else if (addrlen == sizeof (struct IPv4UdpAddress))
436   {
437     u4 = addr;
438     memset (&a4, 0, sizeof (a4));
439     a4.sin_family = AF_INET;
440 #if HAVE_SOCKADDR_IN_SIN_LEN
441     a4.sin_len = sizeof (a4);
442 #endif
443     a4.sin_port = u4->u4_port;
444     a4.sin_addr.s_addr = u4->ipv4_addr;
445     port = ntohs (u4->u4_port);
446     sb = &a4;
447     sbs = sizeof (a4);
448   }
449   else
450   {
451     /* invalid address */
452     GNUNET_break_op (0);
453     asc (asc_cls, NULL);
454     return;
455   }
456   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
457   ppc->asc = asc;
458   ppc->asc_cls = asc_cls;
459   ppc->port = port;
460   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
461 }
462
463
464 /**
465  * Check if the given port is plausible (must be either our listen
466  * port or our advertised port).  If it is neither, we return
467  * GNUNET_SYSERR.
468  *
469  * @param plugin global variables
470  * @param in_port port number to check
471  * @return GNUNET_OK if port is either open_port or adv_port
472  */
473 static int
474 check_port (struct Plugin *plugin, uint16_t in_port)
475 {
476   if ((in_port == plugin->port) || (in_port == plugin->aport))
477     return GNUNET_OK;
478   return GNUNET_SYSERR;
479 }
480
481
482
483 /**
484  * Function that will be called to check if a binary address for this
485  * plugin is well-formed and corresponds to an address for THIS peer
486  * (as per our configuration).  Naturally, if absolutely necessary,
487  * plugins can be a bit conservative in their answer, but in general
488  * plugins should make sure that the address does not redirect
489  * traffic to a 3rd party that might try to man-in-the-middle our
490  * traffic.
491  *
492  * @param cls closure, should be our handle to the Plugin
493  * @param addr pointer to the address
494  * @param addrlen length of addr
495  * @return GNUNET_OK if this is a plausible address for this peer
496  *         and transport, GNUNET_SYSERR if not
497  *
498  */
499 static int
500 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
501 {
502   struct Plugin *plugin = cls;
503   struct IPv4UdpAddress *v4;
504   struct IPv6UdpAddress *v6;
505
506   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
507       (addrlen != sizeof (struct IPv6UdpAddress)))
508   {
509     GNUNET_break_op (0);
510     return GNUNET_SYSERR;
511   }
512   if (addrlen == sizeof (struct IPv4UdpAddress))
513   {
514     v4 = (struct IPv4UdpAddress *) addr;
515     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
516       return GNUNET_SYSERR;
517     if (GNUNET_OK !=
518         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
519                                  sizeof (struct in_addr)))
520       return GNUNET_SYSERR;
521   }
522   else
523   {
524     v6 = (struct IPv6UdpAddress *) addr;
525     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
526     {
527       GNUNET_break_op (0);
528       return GNUNET_SYSERR;
529     }
530     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
531       return GNUNET_SYSERR;
532     if (GNUNET_OK !=
533         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
534                                  sizeof (struct in6_addr)))
535       return GNUNET_SYSERR;
536   }
537   return GNUNET_OK;
538 }
539
540
541 /**
542  * Destroy a session, plugin is being unloaded.
543  *
544  * @param cls unused
545  * @param key hash of public key of target peer
546  * @param value a 'struct PeerSession*' to clean up
547  * @return GNUNET_OK (continue to iterate)
548  */
549 static int
550 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
551 {
552   struct Plugin *plugin = cls;
553   struct Session *s = value;
554   struct UDPMessageWrapper *udpw;
555   struct UDPMessageWrapper *next;
556
557 #if DEBUG_UDP
558   LOG (GNUNET_ERROR_TYPE_DEBUG,
559        "Session %p to peer `%s' address ended \n",
560          s,
561          GNUNET_i2s (&s->target),
562          GNUNET_a2s (s->sock_addr, s->addrlen));
563 #endif
564
565   if (s->frag_ctx != NULL)
566   {
567     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
568     GNUNET_free (s->frag_ctx);
569     s->frag_ctx = NULL;
570   }
571
572   udpw = plugin->ipv4_queue_head;
573   while (udpw != NULL)
574   {
575     next = udpw->next;
576     if (udpw->session == s)
577     {
578       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
579
580       if (udpw->cont != NULL)
581         udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
582       GNUNET_free (udpw);
583     }
584     udpw = next;
585   }
586
587   udpw = plugin->ipv6_queue_head;
588   while (udpw != NULL)
589   {
590     next = udpw->next;
591     if (udpw->session == s)
592     {
593       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
594
595       if (udpw->cont != NULL)
596         udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
597       GNUNET_free (udpw);
598     }
599     udpw = next;
600   }
601
602   plugin->env->session_end (plugin->env->cls, &s->target, s);
603
604   GNUNET_assert (GNUNET_YES ==
605                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
606                                                        &s->target.hashPubKey,
607                                                        s));
608
609
610   GNUNET_free (s);
611   return GNUNET_OK;
612 }
613
614
615 /**
616  * Disconnect from a remote node.  Clean up session if we have one for this peer
617  *
618  * @param cls closure for this call (should be handle to Plugin)
619  * @param target the peeridentity of the peer to disconnect
620  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
621  */
622 static void
623 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
624 {
625   struct Plugin *plugin = cls;
626   GNUNET_assert (plugin != NULL);
627
628   GNUNET_assert (target != NULL);
629 #if DEBUG_UDP
630   LOG (GNUNET_ERROR_TYPE_DEBUG,
631        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
632 #endif
633   /* Clean up sessions */
634   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
635 }
636
637 static struct Session *
638 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
639                 const void *addr, size_t addrlen,
640                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
641 {
642   struct Session *s;
643   const struct IPv4UdpAddress *t4;
644   const struct IPv6UdpAddress *t6;
645   struct sockaddr_in *v4;
646   struct sockaddr_in6 *v6;
647   size_t len;
648
649   switch (addrlen)
650   {
651   case sizeof (struct IPv4UdpAddress):
652     if (NULL == plugin->sockv4)
653     {
654       return NULL;
655     }
656     t4 = addr;
657     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
658     len = sizeof (struct sockaddr_in);
659     v4 = (struct sockaddr_in *) &s[1];
660     v4->sin_family = AF_INET;
661 #if HAVE_SOCKADDR_IN_SIN_LEN
662     v4->sin_len = sizeof (struct sockaddr_in);
663 #endif
664     v4->sin_port = t4->u4_port;
665     v4->sin_addr.s_addr = t4->ipv4_addr;
666     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
667     break;
668   case sizeof (struct IPv6UdpAddress):
669     if (NULL == plugin->sockv6)
670     {
671       return NULL;
672     }
673     t6 = addr;
674     s =
675         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
676     len = sizeof (struct sockaddr_in6);
677     v6 = (struct sockaddr_in6 *) &s[1];
678     v6->sin6_family = AF_INET6;
679 #if HAVE_SOCKADDR_IN_SIN_LEN
680     v6->sin6_len = sizeof (struct sockaddr_in6);
681 #endif
682     v6->sin6_port = t6->u6_port;
683     v6->sin6_addr = t6->ipv6_addr;
684     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
685     break;
686   default:
687     /* Must have a valid address to send to */
688     GNUNET_break_op (0);
689     return NULL;
690   }
691
692   s->addrlen = len;
693   s->target = *target;
694   s->sock_addr = (const struct sockaddr *) &s[1];
695   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
696   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
697   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
698
699   return s;
700 }
701
702 static int session_cmp_it (void *cls,
703                            const GNUNET_HashCode * key,
704                            void *value)
705 {
706   struct SessionCompareContext * cctx = cls;
707   const struct GNUNET_HELLO_Address *address = cctx->addr;
708   struct Session *s = value;
709
710   socklen_t s_addrlen = s->addrlen;
711
712 #if VERBOSE_UDP
713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
714       udp_address_to_string (NULL, (void *) address->address, address->address_length),
715       GNUNET_a2s (s->sock_addr, s->addrlen));
716 #endif
717
718   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
719       (s_addrlen == sizeof (struct sockaddr_in)))
720   {
721     struct IPv4UdpAddress * u4 = NULL;
722     u4 = (struct IPv4UdpAddress *) address->address;
723     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
724     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
725         (u4->u4_port == s4->sin_port))
726     {
727       cctx->res = s;
728       return GNUNET_NO;
729     }
730
731   }
732   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
733       (s_addrlen == sizeof (struct sockaddr_in6)))
734   {
735     struct IPv6UdpAddress * u6 = NULL;
736     u6 = (struct IPv6UdpAddress *) address->address;
737     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
738     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
739         (u6->u6_port == s6->sin6_port))
740     {
741       cctx->res = s;
742       return GNUNET_NO;
743     }
744   }
745
746
747   return GNUNET_YES;
748 }
749
750
751 /**
752  * Creates a new outbound session the transport service will use to send data to the
753  * peer
754  *
755  * @param cls the plugin
756  * @param address the address
757  * @return the session or NULL of max connections exceeded
758  */
759 static struct Session *
760 udp_plugin_get_session (void *cls,
761                   const struct GNUNET_HELLO_Address *address)
762 {
763   struct Session * s = NULL;
764   struct Plugin * plugin = cls;
765   struct IPv6UdpAddress * udp_a6;
766   struct IPv4UdpAddress * udp_a4;
767
768   GNUNET_assert (plugin != NULL);
769   GNUNET_assert (address != NULL);
770
771
772   if ((address->address == NULL) ||
773       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
774       (address->address_length != sizeof (struct IPv6UdpAddress))))
775   {
776     GNUNET_break (0);
777     return NULL;
778   }
779
780   if (address->address_length == sizeof (struct IPv4UdpAddress))
781   {
782     if (plugin->sockv4 == NULL)
783       return NULL;
784     udp_a4 = (struct IPv4UdpAddress *) address->address;
785     if (udp_a4->u4_port == 0)
786       return NULL;
787   }
788
789   if (address->address_length == sizeof (struct IPv6UdpAddress))
790   {
791     if (plugin->sockv6 == NULL)
792       return NULL;
793     udp_a6 = (struct IPv6UdpAddress *) address->address;
794     if (udp_a6->u6_port == 0)
795       return NULL;
796   }
797
798   /* check if session already exists */
799   struct SessionCompareContext cctx;
800   cctx.addr = address;
801   cctx.res = NULL;
802 #if VERBOSE_UDP
803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
804 #endif
805   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
806   if (cctx.res != NULL)
807   {
808 #if VERBOSE_UDP
809     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
810 #endif
811     return cctx.res;
812   }
813
814   /* otherwise create new */
815   s = create_session (plugin,
816       &address->peer,
817       address->address,
818       address->address_length,
819       NULL, NULL);
820 #if VERBOSE
821     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822               "Creating new session %p for peer `%s' address `%s'\n",
823               s,
824               GNUNET_i2s(&address->peer),
825               udp_address_to_string(NULL,address->address,address->address_length));
826 #endif
827   GNUNET_assert (GNUNET_OK ==
828                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
829                                                     &s->target.hashPubKey,
830                                                     s,
831                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
832
833   return s;
834 }
835
836 static void enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
837 {
838
839   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
840     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
841   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
842     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
843 }
844
845 /**
846  * Function that is called with messages created by the fragmentation
847  * module.  In the case of the 'proc' callback of the
848  * GNUNET_FRAGMENT_context_create function, this function must
849  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
850  *
851  * @param cls closure, the 'struct FragmentationContext'
852  * @param msg the message that was created
853  */
854 static void
855 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
856 {
857   struct FragmentationContext *frag_ctx = cls;
858   struct Plugin *plugin = frag_ctx->plugin;
859   struct UDPMessageWrapper * udpw;
860   struct Session *s;
861
862   size_t msg_len = ntohs (msg->size);
863
864 #if VERBOSE_UDP
865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
866 #endif
867
868   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
869   udpw->session = frag_ctx->session;
870   s = udpw->session;
871   udpw->udp = (char *) &udpw[1];
872
873   udpw->msg_size = msg_len;
874   udpw->cont = frag_ctx->cont;
875   udpw->cont_cls = frag_ctx->cont_cls;
876   udpw->timeout = frag_ctx->timeout;
877   udpw->frag_ctx = frag_ctx;
878   memcpy (udpw->udp, msg, msg_len);
879
880   enqueue (plugin, udpw);
881
882
883   if (s->addrlen == sizeof (struct sockaddr_in))
884   {
885     if (plugin->with_v4_ws == GNUNET_NO)
886     {
887       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
888         GNUNET_SCHEDULER_cancel(plugin->select_task);
889
890       plugin->select_task =
891           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
892                                        GNUNET_SCHEDULER_NO_TASK,
893                                        GNUNET_TIME_UNIT_FOREVER_REL,
894                                        plugin->rs_v4,
895                                        plugin->ws_v4,
896                                        &udp_plugin_select, plugin);
897       plugin->with_v4_ws = GNUNET_YES;
898     }
899   }
900
901   else if (s->addrlen == sizeof (struct sockaddr_in6))
902   {
903     if (plugin->with_v6_ws == GNUNET_NO)
904     {
905       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
906         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
907
908       plugin->select_task_v6 =
909           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
910                                        GNUNET_SCHEDULER_NO_TASK,
911                                        GNUNET_TIME_UNIT_FOREVER_REL,
912                                        plugin->rs_v6,
913                                        plugin->ws_v6,
914                                        &udp_plugin_select_v6, plugin);
915       plugin->with_v6_ws = GNUNET_YES;
916     }
917   }
918
919 }
920
921
922
923
924 /**
925  * Function that can be used by the transport service to transmit
926  * a message using the plugin.   Note that in the case of a
927  * peer disconnecting, the continuation MUST be called
928  * prior to the disconnect notification itself.  This function
929  * will be called with this peer's HELLO message to initiate
930  * a fresh connection to another peer.
931  *
932  * @param cls closure
933  * @param s which session must be used
934  * @param msgbuf the message to transmit
935  * @param msgbuf_size number of bytes in 'msgbuf'
936  * @param priority how important is the message (most plugins will
937  *                 ignore message priority and just FIFO)
938  * @param to how long to wait at most for the transmission (does not
939  *                require plugins to discard the message after the timeout,
940  *                just advisory for the desired delay; most plugins will ignore
941  *                this as well)
942  * @param cont continuation to call once the message has
943  *        been transmitted (or if the transport is ready
944  *        for the next transmission call; or if the
945  *        peer disconnected...); can be NULL
946  * @param cont_cls closure for cont
947  * @return number of bytes used (on the physical network, with overheads);
948  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
949  *         and does NOT mean that the message was not transmitted (DV)
950  */
951 static ssize_t
952 udp_plugin_send (void *cls,
953                   struct Session *s,
954                   const char *msgbuf, size_t msgbuf_size,
955                   unsigned int priority,
956                   struct GNUNET_TIME_Relative to,
957                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
958 {
959   struct Plugin *plugin = cls;
960   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
961
962   struct UDPMessageWrapper * udpw;
963   struct UDPMessage *udp;
964   char mbuf[mlen];
965   GNUNET_assert (plugin != NULL);
966   GNUNET_assert (s != NULL);
967
968   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
969     return GNUNET_SYSERR;
970
971    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
972      return GNUNET_SYSERR;
973
974
975   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
976   {
977     GNUNET_break (0);
978     return GNUNET_SYSERR;
979   }
980
981   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
982   {
983     GNUNET_break (0);
984     return GNUNET_SYSERR;
985   }
986
987   LOG (GNUNET_ERROR_TYPE_DEBUG,
988        "UDP transmits %u-byte message to `%s' using address `%s'\n",
989          msgbuf_size,
990          GNUNET_i2s (&s->target),
991          GNUNET_a2s(s->sock_addr, s->addrlen));
992
993   /* Message */
994   udp = (struct UDPMessage *) mbuf;
995   udp->header.size = htons (mlen);
996   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
997   udp->reserved = htonl (0);
998   udp->sender = *plugin->env->my_identity;
999
1000   if (mlen <= UDP_MTU)
1001   {
1002     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1003     udpw->session = s;
1004     udpw->udp = (char *) &udpw[1];
1005     udpw->msg_size = mlen;
1006     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1007     udpw->cont = cont;
1008     udpw->cont_cls = cont_cls;
1009     udpw->frag_ctx = NULL;
1010
1011     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1012     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1013
1014     enqueue (plugin, udpw);
1015   }
1016   else
1017   {
1018     LOG (GNUNET_ERROR_TYPE_DEBUG,
1019          "UDP has to fragment message \n");
1020     if  (s->frag_ctx != NULL)
1021       return GNUNET_SYSERR;
1022     memcpy (&udp[1], msgbuf, msgbuf_size);
1023     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1024
1025     frag_ctx->plugin = plugin;
1026     frag_ctx->session = s;
1027     frag_ctx->cont = cont;
1028     frag_ctx->cont_cls = cont_cls;
1029     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1030     frag_ctx->bytes_to_send = mlen;
1031     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1032               UDP_MTU,
1033               &plugin->tracker,
1034               s->last_expected_delay,
1035               &udp->header,
1036               &enqueue_fragment,
1037               frag_ctx);
1038
1039     s->frag_ctx = frag_ctx;
1040
1041   }
1042
1043   if (s->addrlen == sizeof (struct sockaddr_in))
1044   {
1045     if (plugin->with_v4_ws == GNUNET_NO)
1046     {
1047       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1048         GNUNET_SCHEDULER_cancel(plugin->select_task);
1049
1050       plugin->select_task =
1051           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1052                                        GNUNET_SCHEDULER_NO_TASK,
1053                                        GNUNET_TIME_UNIT_FOREVER_REL,
1054                                        plugin->rs_v4,
1055                                        plugin->ws_v4,
1056                                        &udp_plugin_select, plugin);
1057       plugin->with_v4_ws = GNUNET_YES;
1058     }
1059   }
1060
1061   else if (s->addrlen == sizeof (struct sockaddr_in6))
1062   {
1063     if (plugin->with_v6_ws == GNUNET_NO)
1064     {
1065       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1066         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1067
1068       plugin->select_task_v6 =
1069         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1070                                      GNUNET_SCHEDULER_NO_TASK,
1071                                      GNUNET_TIME_UNIT_FOREVER_REL,
1072                                      plugin->rs_v6,
1073                                      plugin->ws_v6,
1074                                      &udp_plugin_select_v6, plugin);
1075       plugin->with_v6_ws = GNUNET_YES;
1076     }
1077   }
1078
1079   return mlen;
1080 }
1081
1082
1083 /**
1084  * Our external IP address/port mapping has changed.
1085  *
1086  * @param cls closure, the 'struct LocalAddrList'
1087  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1088  *     the previous (now invalid) one
1089  * @param addr either the previous or the new public IP address
1090  * @param addrlen actual lenght of the address
1091  */
1092 static void
1093 udp_nat_port_map_callback (void *cls, int add_remove,
1094                            const struct sockaddr *addr, socklen_t addrlen)
1095 {
1096   struct Plugin *plugin = cls;
1097   struct IPv4UdpAddress u4;
1098   struct IPv6UdpAddress u6;
1099   void *arg;
1100   size_t args;
1101
1102   /* convert 'addr' to our internal format */
1103   switch (addr->sa_family)
1104   {
1105   case AF_INET:
1106     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1107     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1108     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1109     arg = &u4;
1110     args = sizeof (u4);
1111     break;
1112   case AF_INET6:
1113     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1114     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1115             sizeof (struct in6_addr));
1116     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1117     arg = &u6;
1118     args = sizeof (u6);
1119     break;
1120   default:
1121     GNUNET_break (0);
1122     return;
1123   }
1124   /* modify our published address list */
1125   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1126 }
1127
1128
1129
1130 /**
1131  * Message tokenizer has broken up an incomming message. Pass it on
1132  * to the service.
1133  *
1134  * @param cls the 'struct Plugin'
1135  * @param client the 'struct SourceInformation'
1136  * @param hdr the actual message
1137  */
1138 static void
1139 process_inbound_tokenized_messages (void *cls, void *client,
1140                                     const struct GNUNET_MessageHeader *hdr)
1141 {
1142   struct Plugin *plugin = cls;
1143   struct SourceInformation *si = client;
1144   struct GNUNET_ATS_Information ats[2];
1145   struct GNUNET_TIME_Relative delay;
1146
1147   GNUNET_assert (si->session != NULL);
1148   /* setup ATS */
1149   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1150   ats[0].value = htonl (1);
1151   ats[1] = si->session->ats;
1152   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1153
1154   delay = plugin->env->receive (plugin->env->cls,
1155                 &si->sender,
1156                 hdr,
1157                 (const struct GNUNET_ATS_Information *) &ats, 2,
1158                 NULL,
1159                 si->arg,
1160                 si->args);
1161   si->session->flow_delay_for_other_peer = delay;
1162 }
1163
1164
1165 /**
1166  * We've received a UDP Message.  Process it (pass contents to main service).
1167  *
1168  * @param plugin plugin context
1169  * @param msg the message
1170  * @param sender_addr sender address
1171  * @param sender_addr_len number of bytes in sender_addr
1172  */
1173 static void
1174 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1175                      const struct sockaddr *sender_addr,
1176                      socklen_t sender_addr_len)
1177 {
1178   struct SourceInformation si;
1179   struct Session * s = NULL;
1180   struct IPv4UdpAddress u4;
1181   struct IPv6UdpAddress u6;
1182   const void *arg;
1183   size_t args;
1184
1185   if (0 != ntohl (msg->reserved))
1186   {
1187     GNUNET_break_op (0);
1188     return;
1189   }
1190   if (ntohs (msg->header.size) <
1191       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1192   {
1193     GNUNET_break_op (0);
1194     return;
1195   }
1196
1197   /* convert address */
1198   switch (sender_addr->sa_family)
1199   {
1200   case AF_INET:
1201     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1202     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1203     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1204     arg = &u4;
1205     args = sizeof (u4);
1206     break;
1207   case AF_INET6:
1208     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1209     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1210     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1211     arg = &u6;
1212     args = sizeof (u6);
1213     break;
1214   default:
1215     GNUNET_break (0);
1216     return;
1217   }
1218 #if DEBUG_UDP
1219   LOG (GNUNET_ERROR_TYPE_DEBUG,
1220        "Received message with %u bytes from peer `%s' at `%s'\n",
1221        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1222        GNUNET_a2s (sender_addr, sender_addr_len));
1223 #endif
1224
1225   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1226   s = udp_plugin_get_session(plugin, address);
1227   GNUNET_free (address);
1228
1229   /* iterate over all embedded messages */
1230   si.session = s;
1231   si.sender = msg->sender;
1232   si.arg = arg;
1233   si.args = args;
1234
1235   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1236                              ntohs (msg->header.size) -
1237                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1238 }
1239
1240
1241 /**
1242  * Scan the heap for a receive context with the given address.
1243  *
1244  * @param cls the 'struct FindReceiveContext'
1245  * @param node internal node of the heap
1246  * @param element value stored at the node (a 'struct ReceiveContext')
1247  * @param cost cost associated with the node
1248  * @return GNUNET_YES if we should continue to iterate,
1249  *         GNUNET_NO if not.
1250  */
1251 static int
1252 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1253                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1254 {
1255   struct FindReceiveContext *frc = cls;
1256   struct DefragContext *e = element;
1257
1258   if ((frc->addr_len == e->addr_len) &&
1259       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1260   {
1261     frc->rc = e;
1262     return GNUNET_NO;
1263   }
1264   return GNUNET_YES;
1265 }
1266
1267
1268 /**
1269  * Process a defragmented message.
1270  *
1271  * @param cls the 'struct ReceiveContext'
1272  * @param msg the message
1273  */
1274 static void
1275 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1276 {
1277   struct DefragContext *rc = cls;
1278
1279   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1280   {
1281     GNUNET_break (0);
1282     return;
1283   }
1284   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1285   {
1286     GNUNET_break (0);
1287     return;
1288   }
1289   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1290                        rc->src_addr, rc->addr_len);
1291 }
1292
1293 struct LookupContext
1294 {
1295   const struct sockaddr * addr;
1296   size_t addrlen;
1297
1298   struct Session *res;
1299 };
1300
1301 static int
1302 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1303 {
1304   struct LookupContext *l_ctx = cls;
1305   struct Session * s = value;
1306
1307   if ((s->addrlen == l_ctx->addrlen) &&
1308       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1309   {
1310     l_ctx->res = s;
1311     return GNUNET_NO;
1312   }
1313   return GNUNET_YES;
1314 }
1315
1316 /**
1317  * Transmit an acknowledgement.
1318  *
1319  * @param cls the 'struct ReceiveContext'
1320  * @param id message ID (unused)
1321  * @param msg ack to transmit
1322  */
1323 static void
1324 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1325 {
1326   struct DefragContext *rc = cls;
1327
1328   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1329   struct UDP_ACK_Message *udp_ack;
1330   uint32_t delay = 0;
1331   struct UDPMessageWrapper *udpw;
1332   struct Session *s;
1333
1334   struct LookupContext l_ctx;
1335   l_ctx.addr = rc->src_addr;
1336   l_ctx.addrlen = rc->addr_len;
1337   l_ctx.res = NULL;
1338   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1339       &lookup_session_by_addr_it,
1340       &l_ctx);
1341   s = l_ctx.res;
1342
1343   GNUNET_assert (s != NULL);
1344
1345   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1346     delay = s->flow_delay_for_other_peer.rel_value;
1347
1348 #if DEBUG_UDP
1349   LOG (GNUNET_ERROR_TYPE_DEBUG,
1350        "Sending ACK to `%s' including delay of %u ms\n",
1351        GNUNET_a2s (rc->src_addr,
1352                    (rc->src_addr->sa_family ==
1353                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1354                                                                      sockaddr_in6)),
1355        delay);
1356 #endif
1357   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1358   udpw->cont = NULL;
1359   udpw->cont_cls = NULL;
1360   udpw->frag_ctx = NULL;
1361   udpw->msg_size = msize;
1362   udpw->session = s;
1363   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1364   udpw->udp = (char *)&udpw[1];
1365
1366   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1367   udp_ack->header.size = htons ((uint16_t) msize);
1368   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1369   udp_ack->delay = htonl (delay);
1370   udp_ack->sender = *rc->plugin->env->my_identity;
1371   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1372
1373   enqueue (rc->plugin, udpw);
1374 }
1375
1376
1377 static void read_process_msg (struct Plugin *plugin,
1378     const struct GNUNET_MessageHeader *msg,
1379     char *addr,
1380     socklen_t fromlen)
1381 {
1382   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1383   {
1384     GNUNET_break_op (0);
1385     return;
1386   }
1387   process_udp_message (plugin, (const struct UDPMessage *) msg,
1388                        (const struct sockaddr *) addr, fromlen);
1389   return;
1390 }
1391
1392 static void read_process_ack (struct Plugin *plugin,
1393     const struct GNUNET_MessageHeader *msg,
1394     char *addr,
1395     socklen_t fromlen)
1396 {
1397   const struct GNUNET_MessageHeader *ack;
1398   const struct UDP_ACK_Message *udp_ack;
1399   struct LookupContext l_ctx;
1400   struct Session *s = NULL;
1401   struct GNUNET_TIME_Relative flow_delay;
1402
1403   if (ntohs (msg->size) <
1404       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1405   {
1406     GNUNET_break_op (0);
1407     return;
1408   }
1409
1410   udp_ack = (const struct UDP_ACK_Message *) msg;
1411
1412   l_ctx.addr = (const struct sockaddr *) addr;
1413   l_ctx.addrlen = fromlen;
1414   l_ctx.res = NULL;
1415   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1416       &lookup_session_by_addr_it,
1417       &l_ctx);
1418   s = l_ctx.res;
1419
1420   if ((s == NULL) || (s->frag_ctx == NULL))
1421     return;
1422
1423   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1424   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1425        flow_delay.rel_value);
1426   s->flow_delay_from_other_peer =
1427       GNUNET_TIME_relative_to_absolute (flow_delay);
1428
1429   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1430   if (ntohs (ack->size) !=
1431       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1432   {
1433     GNUNET_break_op (0);
1434     return;
1435   }
1436
1437   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1438   {
1439 #if DEBUG_UDP
1440   LOG (GNUNET_ERROR_TYPE_DEBUG,
1441        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1442        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1443        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1444 #endif
1445     return;
1446   }
1447
1448 #if DEBUG_UDP
1449   LOG (GNUNET_ERROR_TYPE_DEBUG,
1450        "FULL MESSAGE ACKed\n",
1451        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1452        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1453 #endif
1454   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1455
1456   struct UDPMessageWrapper * udpw = NULL;
1457   if (s->addrlen == sizeof (struct sockaddr_in6))
1458   {
1459     udpw = plugin->ipv6_queue_head;
1460     while (udpw!= NULL)
1461     {
1462       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1463       {
1464         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1465         GNUNET_free (udpw);
1466       }
1467       udpw = udpw->next;
1468     }
1469   }
1470   if (s->addrlen == sizeof (struct sockaddr_in))
1471   {
1472     udpw = plugin->ipv4_queue_head;
1473     while (udpw!= NULL)
1474     {
1475       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1476       {
1477         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1478         GNUNET_free (udpw);
1479       }
1480       udpw = udpw->next;
1481     }
1482   }
1483
1484   if (s->frag_ctx->cont != NULL)
1485     s->frag_ctx->cont
1486     (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1487   GNUNET_free (s->frag_ctx);
1488   s->frag_ctx = NULL;
1489   return;
1490 }
1491
1492 static void read_process_fragment (struct Plugin *plugin,
1493     const struct GNUNET_MessageHeader *msg,
1494     char *addr,
1495     socklen_t fromlen)
1496 {
1497   struct DefragContext *d_ctx;
1498   struct GNUNET_TIME_Absolute now;
1499   struct FindReceiveContext frc;
1500
1501
1502   frc.rc = NULL;
1503   frc.addr = (const struct sockaddr *) addr;
1504   frc.addr_len = fromlen;
1505
1506 #if DEBUG_UDP
1507   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1508        (unsigned int) ntohs (msg->size),
1509        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1510 #endif
1511
1512   /* Lookup existing receive context for this address */
1513   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1514                                  &find_receive_context,
1515                                  &frc);
1516   now = GNUNET_TIME_absolute_get ();
1517   d_ctx = frc.rc;
1518
1519   if (d_ctx == NULL)
1520   {
1521     /* Create a new defragmentation context */
1522     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1523     memcpy (&d_ctx[1], addr, fromlen);
1524     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1525     d_ctx->addr_len = fromlen;
1526     d_ctx->plugin = plugin;
1527     d_ctx->defrag =
1528         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1529                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1530                                           &fragment_msg_proc, &ack_proc);
1531     d_ctx->hnode =
1532         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1533                                       (GNUNET_CONTAINER_HeapCostType)
1534                                       now.abs_value);
1535 #if DEBUG_UDP
1536   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1537        (unsigned int) ntohs (msg->size),
1538        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1539 #endif
1540   }
1541   else
1542   {
1543 #if DEBUG_UDP
1544   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1545        (unsigned int) ntohs (msg->size),
1546        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1547 #endif
1548   }
1549
1550   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1551   {
1552     /* keep this 'rc' from expiring */
1553     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1554                                        (GNUNET_CONTAINER_HeapCostType)
1555                                        now.abs_value);
1556   }
1557   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1558       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1559   {
1560     /* remove 'rc' that was inactive the longest */
1561     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1562     GNUNET_assert (NULL != d_ctx);
1563     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1564     GNUNET_free (d_ctx);
1565   }
1566 }
1567
1568 /**
1569  * Read and process a message from the given socket.
1570  *
1571  * @param plugin the overall plugin
1572  * @param rsock socket to read from
1573  */
1574 static void
1575 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1576 {
1577   socklen_t fromlen;
1578   char addr[32];
1579   char buf[65536];
1580   ssize_t size;
1581   const struct GNUNET_MessageHeader *msg;
1582
1583   fromlen = sizeof (addr);
1584   memset (&addr, 0, sizeof (addr));
1585   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1586                                       (struct sockaddr *) &addr, &fromlen);
1587
1588   if (size < sizeof (struct GNUNET_MessageHeader))
1589   {
1590     GNUNET_break_op (0);
1591     return;
1592   }
1593   msg = (const struct GNUNET_MessageHeader *) buf;
1594
1595   LOG (GNUNET_ERROR_TYPE_DEBUG,
1596        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1597        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1598
1599   if (size != ntohs (msg->size))
1600   {
1601     GNUNET_break_op (0);
1602     return;
1603   }
1604
1605   switch (ntohs (msg->type))
1606   {
1607   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1608     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1609     return;
1610
1611   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1612     read_process_msg (plugin, msg, addr, fromlen);
1613     return;
1614
1615   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1616     read_process_ack (plugin, msg, addr, fromlen);;
1617     return;
1618
1619   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1620     read_process_fragment (plugin, msg, addr, fromlen);
1621     return;
1622
1623   default:
1624     GNUNET_break_op (0);
1625     return;
1626   }
1627 }
1628
1629 size_t
1630 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1631 {
1632   ssize_t sent;
1633   size_t slen;
1634   struct GNUNET_TIME_Absolute max;
1635   struct GNUNET_TIME_Absolute ;
1636
1637   struct UDPMessageWrapper *udpw = NULL;
1638
1639   if (sock == plugin->sockv4)
1640   {
1641     udpw = plugin->ipv4_queue_head;
1642   }
1643   else if (sock == plugin->sockv6)
1644   {
1645     udpw = plugin->ipv6_queue_head;
1646   }
1647   else
1648   {
1649     GNUNET_break (0);
1650     return 0;
1651   }
1652
1653   const struct sockaddr * sa = udpw->session->sock_addr;
1654   slen = udpw->session->addrlen;
1655
1656   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1657
1658   while (udpw != NULL)
1659   {
1660     if (max.abs_value != udpw->timeout.abs_value)
1661     {
1662       /* Message timed out */
1663
1664       if (udpw->cont != NULL)
1665         udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
1666       if (udpw->frag_ctx != NULL)
1667       {
1668 #if DEBUG_UDP
1669         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1670             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1671 #endif
1672         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1673         GNUNET_free (udpw->frag_ctx);
1674         udpw->session->frag_ctx = NULL;
1675       }
1676       else
1677       {
1678 #if DEBUG_UDP
1679         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1680             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1681 #endif
1682       }
1683
1684       if (sock == plugin->sockv4)
1685       {
1686         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1687         GNUNET_free (udpw);
1688         udpw = plugin->ipv4_queue_head;
1689       }
1690       else if (sock == plugin->sockv6)
1691       {
1692         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1693         GNUNET_free (udpw);
1694         udpw = plugin->ipv6_queue_head;
1695       }
1696     }
1697     else
1698     {
1699       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1700       if (delta.rel_value == 0)
1701       {
1702         /* this message is not delayed */
1703         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1704             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1705         break;
1706       }
1707       else
1708       {
1709         /* this message is delayed, try next */
1710         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1711             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1712             delta);
1713         udpw = udpw->next;
1714       }
1715     }
1716
1717   }
1718
1719   if (udpw == NULL)
1720   {
1721     /* No message left */
1722     return 0;
1723   }
1724
1725   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1726
1727   if (GNUNET_SYSERR == sent)
1728   {
1729     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto");
1730     LOG (GNUNET_ERROR_TYPE_DEBUG,
1731          "UDP transmitted %u-byte message to %s (%d: %s)\n",
1732          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1733          (sent < 0) ? STRERROR (errno) : "ok");
1734     if (udpw->cont != NULL)
1735       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
1736   }
1737   LOG (GNUNET_ERROR_TYPE_DEBUG,
1738        "UDP transmitted %u-byte message to %s (%d: %s)\n",
1739        (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1740        (sent < 0) ? STRERROR (errno) : "ok");
1741
1742   /* This was just a message fragment */
1743   if (udpw->frag_ctx != NULL)
1744   {
1745     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1746   }
1747   /* This was a complete message*/
1748   else
1749   {
1750     if (udpw->cont != NULL)
1751       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
1752   }
1753
1754   if (sock == plugin->sockv4)
1755     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1756   else if (sock == plugin->sockv6)
1757     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1758   GNUNET_free (udpw);
1759   udpw = NULL;
1760
1761   return sent;
1762 }
1763
1764 /**
1765  * We have been notified that our readset has something to read.  We don't
1766  * know which socket needs to be read, so we have to check each one
1767  * Then reschedule this function to be called again once more is available.
1768  *
1769  * @param cls the plugin handle
1770  * @param tc the scheduling context (for rescheduling this function again)
1771  */
1772 static void
1773 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1774 {
1775   struct Plugin *plugin = cls;
1776
1777   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1778   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1779     return;
1780   plugin->with_v4_ws = GNUNET_NO;
1781
1782   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1783   {
1784     if ((NULL != plugin->sockv4) &&
1785       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1786         udp_select_read (plugin, plugin->sockv4);
1787
1788   }
1789
1790   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1791   {
1792     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1793       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1794       {
1795         udp_select_send (plugin, plugin->sockv4);
1796       }
1797   }
1798
1799   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1800     GNUNET_SCHEDULER_cancel (plugin->select_task);
1801   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1802                                    GNUNET_SCHEDULER_NO_TASK,
1803                                    GNUNET_TIME_UNIT_FOREVER_REL,
1804                                    plugin->rs_v4,
1805                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1806                                    &udp_plugin_select, plugin);
1807   if (plugin->ipv4_queue_head != NULL)
1808     plugin->with_v4_ws = GNUNET_YES;
1809   else
1810     plugin->with_v4_ws = GNUNET_NO;
1811 }
1812
1813
1814 /**
1815  * We have been notified that our readset has something to read.  We don't
1816  * know which socket needs to be read, so we have to check each one
1817  * Then reschedule this function to be called again once more is available.
1818  *
1819  * @param cls the plugin handle
1820  * @param tc the scheduling context (for rescheduling this function again)
1821  */
1822 static void
1823 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1824 {
1825   struct Plugin *plugin = cls;
1826
1827   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1828   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1829     return;
1830
1831   plugin->with_v6_ws = GNUNET_NO;
1832   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1833   {
1834     if ((NULL != plugin->sockv6) &&
1835       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1836         udp_select_read (plugin, plugin->sockv6);
1837   }
1838
1839   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1840   {
1841     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1842       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1843       {
1844         udp_select_send (plugin, plugin->sockv6);
1845       }
1846   }
1847   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1848     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1849   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1850                                    GNUNET_SCHEDULER_NO_TASK,
1851                                    GNUNET_TIME_UNIT_FOREVER_REL,
1852                                    plugin->rs_v6,
1853                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1854                                    &udp_plugin_select_v6, plugin);
1855   if (plugin->ipv6_queue_head != NULL)
1856     plugin->with_v6_ws = GNUNET_YES;
1857   else
1858     plugin->with_v6_ws = GNUNET_NO;
1859 }
1860
1861
1862 static int
1863 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1864 {
1865   int tries;
1866   int sockets_created = 0;
1867   struct sockaddr *serverAddr;
1868   struct sockaddr *addrs[2];
1869   socklen_t addrlens[2];
1870   socklen_t addrlen;
1871
1872   /* Create IPv6 socket */
1873   if (plugin->enable_ipv6 == GNUNET_YES)
1874   {
1875     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
1876     if (NULL == plugin->sockv6)
1877     {
1878       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
1879       plugin->enable_ipv6 = GNUNET_NO;
1880     }
1881     else
1882     {
1883 #if HAVE_SOCKADDR_IN_SIN_LEN
1884       serverAddrv6->sin6_len = sizeof (serverAddrv6);
1885 #endif
1886       serverAddrv6->sin6_family = AF_INET6;
1887       serverAddrv6->sin6_addr = in6addr_any;
1888       serverAddrv6->sin6_port = htons (plugin->port);
1889       addrlen = sizeof (struct sockaddr_in6);
1890       serverAddr = (struct sockaddr *) serverAddrv6;
1891 #if DEBUG_UDP
1892       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
1893            ntohs (serverAddrv6->sin6_port));
1894 #endif
1895       tries = 0;
1896       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
1897              GNUNET_OK)
1898       {
1899         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
1900 #if DEBUG_UDP
1901         LOG (GNUNET_ERROR_TYPE_DEBUG,
1902              "IPv6 Binding failed, trying new port %d\n",
1903              ntohs (serverAddrv6->sin6_port));
1904 #endif
1905         tries++;
1906         if (tries > 10)
1907         {
1908           GNUNET_NETWORK_socket_close (plugin->sockv6);
1909           plugin->sockv6 = NULL;
1910           break;
1911         }
1912       }
1913       if (plugin->sockv6 != NULL)
1914       {
1915 #if DEBUG_UDP
1916         LOG (GNUNET_ERROR_TYPE_DEBUG,
1917              "IPv6 socket created on port %d\n",
1918              ntohs (serverAddrv6->sin6_port));
1919 #endif
1920         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
1921         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
1922         sockets_created++;
1923       }
1924     }
1925   }
1926
1927   /* Create IPv4 socket */
1928   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
1929   if (NULL == plugin->sockv4)
1930   {
1931     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
1932   }
1933   else
1934   {
1935 #if HAVE_SOCKADDR_IN_SIN_LEN
1936     serverAddrv4->sin_len = sizeof (serverAddrv4);
1937 #endif
1938     serverAddrv4->sin_family = AF_INET;
1939     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
1940     serverAddrv4->sin_port = htons (plugin->port);
1941     addrlen = sizeof (struct sockaddr_in);
1942     serverAddr = (struct sockaddr *) serverAddrv4;
1943
1944 #if DEBUG_UDP
1945     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
1946          ntohs (serverAddrv4->sin_port));
1947 #endif
1948     tries = 0;
1949     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
1950            GNUNET_OK)
1951     {
1952       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
1953 #if DEBUG_UDP
1954       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
1955            ntohs (serverAddrv4->sin_port));
1956 #endif
1957       tries++;
1958       if (tries > 10)
1959       {
1960         GNUNET_NETWORK_socket_close (plugin->sockv4);
1961         plugin->sockv4 = NULL;
1962         break;
1963       }
1964     }
1965     if (plugin->sockv4 != NULL)
1966     {
1967       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
1968       addrlens[sockets_created] = sizeof (struct sockaddr_in);
1969       sockets_created++;
1970     }
1971   }
1972
1973   /* Create file descriptors */
1974   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
1975   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
1976   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
1977   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
1978   if (NULL != plugin->sockv4)
1979   {
1980     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
1981     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
1982   }
1983
1984   if (sockets_created == 0)
1985     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
1986
1987   plugin->select_task =
1988       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1989                                    GNUNET_SCHEDULER_NO_TASK,
1990                                    GNUNET_TIME_UNIT_FOREVER_REL,
1991                                    plugin->rs_v4,
1992                                    NULL,
1993                                    &udp_plugin_select, plugin);
1994   plugin->with_v4_ws = GNUNET_NO;
1995
1996   if (plugin->enable_ipv6 == GNUNET_YES)
1997   {
1998     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
1999     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2000     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2001     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2002     if (NULL != plugin->sockv6)
2003     {
2004       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2005       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2006     }
2007
2008     plugin->select_task_v6 =
2009         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2010                                      GNUNET_SCHEDULER_NO_TASK,
2011                                      GNUNET_TIME_UNIT_FOREVER_REL,
2012                                      plugin->rs_v6,
2013                                      NULL,
2014                                      &udp_plugin_select_v6, plugin);
2015     plugin->with_v6_ws = GNUNET_NO;
2016   }
2017
2018   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2019                            GNUNET_NO, plugin->port,
2020                            sockets_created,
2021                            (const struct sockaddr **) addrs, addrlens,
2022                            &udp_nat_port_map_callback, NULL, plugin);
2023
2024   return sockets_created;
2025 }
2026
2027
2028 /**
2029  * The exported method. Makes the core api available via a global and
2030  * returns the udp transport API.
2031  *
2032  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2033  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2034  */
2035 void *
2036 libgnunet_plugin_transport_udp_init (void *cls)
2037 {
2038   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2039   struct GNUNET_TRANSPORT_PluginFunctions *api;
2040   struct Plugin *plugin;
2041
2042   unsigned long long port;
2043   unsigned long long aport;
2044   unsigned long long broadcast;
2045   unsigned long long udp_max_bps;
2046   unsigned long long enable_v6;
2047   char * bind4_address;
2048   char * bind6_address;
2049   struct GNUNET_TIME_Relative interval;
2050
2051   struct sockaddr_in serverAddrv4;
2052   struct sockaddr_in6 serverAddrv6;
2053
2054   int res;
2055
2056   /* Get port number */
2057   if (GNUNET_OK !=
2058       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2059                                              &port))
2060     port = 2086;
2061   if (GNUNET_OK !=
2062       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2063                                              "ADVERTISED_PORT", &aport))
2064     aport = port;
2065   if (port > 65535)
2066   {
2067     LOG (GNUNET_ERROR_TYPE_WARNING,
2068          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2069          65535);
2070     return NULL;
2071   }
2072
2073   /* Protocols */
2074   if ((GNUNET_YES ==
2075        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2076                                              "DISABLEV6")))
2077   {
2078     enable_v6 = GNUNET_NO;
2079   }
2080   else
2081     enable_v6 = GNUNET_YES;
2082
2083
2084   /* Addresses */
2085   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2086   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2087
2088   if (GNUNET_YES ==
2089       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2090                                              "BINDTO", &bind4_address))
2091   {
2092     LOG (GNUNET_ERROR_TYPE_DEBUG,
2093          "Binding udp plugin to specific address: `%s'\n",
2094          bind4_address);
2095     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2096     {
2097       GNUNET_free (bind4_address);
2098       return NULL;
2099     }
2100   }
2101
2102   if (GNUNET_YES ==
2103       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2104                                              "BINDTO6", &bind6_address))
2105   {
2106     LOG (GNUNET_ERROR_TYPE_DEBUG,
2107          "Binding udp plugin to specific address: `%s'\n",
2108          bind6_address);
2109     if (1 !=
2110         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2111     {
2112       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2113            bind6_address);
2114       GNUNET_free_non_null (bind4_address);
2115       GNUNET_free (bind6_address);
2116       return NULL;
2117     }
2118   }
2119
2120
2121   /* Enable neighbour discovery */
2122   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2123                                             "BROADCAST");
2124   if (broadcast == GNUNET_SYSERR)
2125     broadcast = GNUNET_NO;
2126
2127   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2128                                            "BROADCAST_INTERVAL", &interval))
2129   {
2130     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2131   }
2132
2133   /* Maximum datarate */
2134   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2135                                              "MAX_BPS", &udp_max_bps))
2136   {
2137     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2138   }
2139
2140   plugin = GNUNET_malloc (sizeof (struct Plugin));
2141   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2142
2143   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2144                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2145
2146
2147   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2148   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2149   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2150   plugin->port = port;
2151   plugin->aport = aport;
2152   plugin->broadcast_interval = interval;
2153   plugin->enable_ipv6 = enable_v6;
2154   plugin->env = env;
2155
2156   api->cls = plugin;
2157   api->send = NULL;
2158   api->disconnect = &udp_disconnect;
2159   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2160   api->address_to_string = &udp_address_to_string;
2161   api->check_address = &udp_plugin_check_address;
2162   api->get_session = &udp_plugin_get_session;
2163   api->send = &udp_plugin_send;
2164
2165   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2166   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2167   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2168   {
2169     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2170     GNUNET_free (plugin);
2171     GNUNET_free (api);
2172     return NULL;
2173   }
2174
2175   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2176   if (broadcast == GNUNET_YES)
2177     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2178
2179
2180   GNUNET_free_non_null (bind4_address);
2181   GNUNET_free_non_null (bind6_address);
2182   return api;
2183 }
2184
2185 int heap_cleanup_iterator (void *cls,
2186                           struct GNUNET_CONTAINER_HeapNode *
2187                           node, void *element,
2188                           GNUNET_CONTAINER_HeapCostType
2189                           cost)
2190 {
2191   struct DefragContext * d_ctx = element;
2192
2193   GNUNET_CONTAINER_heap_remove_node (node);
2194   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2195   GNUNET_free (d_ctx);
2196
2197   return GNUNET_YES;
2198 }
2199
2200
2201 /**
2202  * The exported method. Makes the core api available via a global and
2203  * returns the udp transport API.
2204  *
2205  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2206  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2207  */
2208 void *
2209 libgnunet_plugin_transport_udp_done (void *cls)
2210 {
2211   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2212   struct Plugin *plugin = api->cls;
2213   stop_broadcast (plugin);
2214
2215   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2216   {
2217     GNUNET_SCHEDULER_cancel (plugin->select_task);
2218     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2219   }
2220   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2221   {
2222     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2223     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2224   }
2225
2226   /* Closing sockets */
2227   if (plugin->sockv4 != NULL)
2228   {
2229     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2230     plugin->sockv4 = NULL;
2231   }
2232   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2233   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2234
2235   if (plugin->sockv6 != NULL)
2236   {
2237     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2238     plugin->sockv6 = NULL;
2239
2240     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2241     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2242   }
2243
2244   GNUNET_NAT_unregister (plugin->nat);
2245
2246   if (plugin->defrag_ctxs != NULL)
2247   {
2248     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2249         heap_cleanup_iterator, NULL);
2250     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2251     plugin->defrag_ctxs = NULL;
2252   }
2253   if (plugin->mst != NULL)
2254   {
2255     GNUNET_SERVER_mst_destroy(plugin->mst);
2256     plugin->mst = NULL;
2257   }
2258
2259   /* Clean up leftover messages */
2260   struct UDPMessageWrapper * udpw;
2261   udpw = plugin->ipv4_queue_head;
2262   while (udpw != NULL)
2263   {
2264     struct UDPMessageWrapper *tmp = udpw->next;
2265     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2266     if (udpw->cont != NULL)
2267       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
2268     GNUNET_free (udpw);
2269     udpw = tmp;
2270   }
2271   udpw = plugin->ipv6_queue_head;
2272   while (udpw != NULL)
2273   {
2274     struct UDPMessageWrapper *tmp = udpw->next;
2275     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2276     if (udpw->cont != NULL)
2277       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
2278     GNUNET_free (udpw);
2279     udpw = tmp;
2280   }
2281
2282   /* Clean up sessions */
2283 #if DEBUG_UDP
2284   LOG (GNUNET_ERROR_TYPE_DEBUG,
2285        "Cleaning up sessions\n");
2286 #endif
2287   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2288   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2289
2290   plugin->nat = NULL;
2291   GNUNET_free (plugin);
2292   GNUNET_free (api);
2293   return NULL;
2294 }
2295
2296
2297 /* end of plugin_transport_udp.c */