-fix
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88 struct Session
89 {
90   /**
91    * Which peer is this session for?
92    */
93   struct GNUNET_PeerIdentity target;
94
95   /**
96    * Address of the other peer
97    */
98   const struct sockaddr *sock_addr;
99
100   size_t addrlen;
101
102   /**
103    * Desired delay for next sending we send to other peer
104    */
105   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
106
107   /**
108    * Desired delay for next sending we received from other peer
109    */
110   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
111
112   /**
113    * expected delay for ACKs
114    */
115   struct GNUNET_TIME_Relative last_expected_delay;
116
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121 };
122
123
124 struct SessionCompareContext
125 {
126   struct Session *res;
127   const struct GNUNET_HELLO_Address *addr;
128 };
129
130
131 /**
132  * Closure for 'process_inbound_tokenized_messages'
133  */
134 struct SourceInformation
135 {
136   /**
137    * Sender identity.
138    */
139   struct GNUNET_PeerIdentity sender;
140
141   /**
142    * Source address.
143    */
144   const void *arg;
145
146   /**
147    * Number of bytes in source address.
148    */
149   size_t args;
150
151   struct Session *session;
152 };
153
154
155 /**
156  * Closure for 'find_receive_context'.
157  */
158 struct FindReceiveContext
159 {
160   /**
161    * Where to store the result.
162    */
163   struct DefragContext *rc;
164
165   /**
166    * Address to find.
167    */
168   const struct sockaddr *addr;
169
170   /**
171    * Number of bytes in 'addr'.
172    */
173   socklen_t addr_len;
174
175   struct Session *session;
176 };
177
178
179
180 /**
181  * Data structure to track defragmentation contexts based
182  * on the source of the UDP traffic.
183  */
184 struct DefragContext
185 {
186
187   /**
188    * Defragmentation context.
189    */
190   struct GNUNET_DEFRAGMENT_Context *defrag;
191
192   /**
193    * Source address this receive context is for (allocated at the
194    * end of the struct).
195    */
196   const struct sockaddr *src_addr;
197
198   /**
199    * Reference to master plugin struct.
200    */
201   struct Plugin *plugin;
202
203   /**
204    * Node in the defrag heap.
205    */
206   struct GNUNET_CONTAINER_HeapNode *hnode;
207
208   /**
209    * Length of 'src_addr'
210    */
211   size_t addr_len;
212 };
213
214
215
216 /**
217  * Closure for 'process_inbound_tokenized_messages'
218  */
219 struct FragmentationContext
220 {
221   struct FragmentationContext * next;
222   struct FragmentationContext * prev;
223
224   struct Plugin * plugin;
225   struct GNUNET_FRAGMENT_Context * frag;
226   struct Session * session;
227
228   struct GNUNET_TIME_Absolute timeout;
229
230
231   /**
232    * Function to call upon completion of the transmission.
233    */
234   GNUNET_TRANSPORT_TransmitContinuation cont;
235
236   /**
237    * Closure for 'cont'.
238    */
239   void *cont_cls;
240
241   size_t bytes_to_send;
242 };
243
244
245 struct UDPMessageWrapper
246 {
247   struct Session *session;
248   struct UDPMessageWrapper *prev;
249   struct UDPMessageWrapper *next;
250   char *udp;
251   size_t msg_size;
252
253   struct GNUNET_TIME_Absolute timeout;
254
255   /**
256    * Function to call upon completion of the transmission.
257    */
258   GNUNET_TRANSPORT_TransmitContinuation cont;
259
260   /**
261    * Closure for 'cont'.
262    */
263   void *cont_cls;
264
265   struct FragmentationContext *frag_ctx;
266
267 };
268
269
270 /**
271  * UDP ACK Message-Packet header (after defragmentation).
272  */
273 struct UDP_ACK_Message
274 {
275   /**
276    * Message header.
277    */
278   struct GNUNET_MessageHeader header;
279
280   /**
281    * Desired delay for flow control
282    */
283   uint32_t delay;
284
285   /**
286    * What is the identity of the sender
287    */
288   struct GNUNET_PeerIdentity sender;
289
290 };
291
292 /**
293  * We have been notified that our readset has something to read.  We don't
294  * know which socket needs to be read, so we have to check each one
295  * Then reschedule this function to be called again once more is available.
296  *
297  * @param cls the plugin handle
298  * @param tc the scheduling context (for rescheduling this function again)
299  */
300 static void
301 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
302
303 /**
304  * We have been notified that our readset has something to read.  We don't
305  * know which socket needs to be read, so we have to check each one
306  * Then reschedule this function to be called again once more is available.
307  *
308  * @param cls the plugin handle
309  * @param tc the scheduling context (for rescheduling this function again)
310  */
311 static void
312 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
313
314 /**
315  * Function called for a quick conversion of the binary address to
316  * a numeric address.  Note that the caller must not free the
317  * address and that the next call to this function is allowed
318  * to override the address again.
319  *
320  * @param cls closure
321  * @param addr binary address
322  * @param addrlen length of the address
323  * @return string representing the same address
324  */
325 const char *
326 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
327 {
328   static char rbuf[INET6_ADDRSTRLEN + 10];
329   char buf[INET6_ADDRSTRLEN];
330   const void *sb;
331   struct in_addr a4;
332   struct in6_addr a6;
333   const struct IPv4UdpAddress *t4;
334   const struct IPv6UdpAddress *t6;
335   int af;
336   uint16_t port;
337
338   if (addrlen == sizeof (struct IPv6UdpAddress))
339   {
340     t6 = addr;
341     af = AF_INET6;
342     port = ntohs (t6->u6_port);
343     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
344     sb = &a6;
345   }
346   else if (addrlen == sizeof (struct IPv4UdpAddress))
347   {
348     t4 = addr;
349     af = AF_INET;
350     port = ntohs (t4->u4_port);
351     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
352     sb = &a4;
353   }
354   else
355   {
356     GNUNET_break_op (0);
357     return NULL;
358   }
359   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
360   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
361                    buf, port);
362   return rbuf;
363 }
364
365
366 /**
367  * Append our port and forward the result.
368  *
369  * @param cls a 'struct PrettyPrinterContext'
370  * @param hostname result from DNS resolver
371  */
372 static void
373 append_port (void *cls, const char *hostname)
374 {
375   struct PrettyPrinterContext *ppc = cls;
376   char *ret;
377
378   if (hostname == NULL)
379   {
380     ppc->asc (ppc->asc_cls, NULL);
381     GNUNET_free (ppc);
382     return;
383   }
384   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
385   ppc->asc (ppc->asc_cls, ret);
386   GNUNET_free (ret);
387 }
388
389
390 /**
391  * Convert the transports address to a nice, human-readable
392  * format.
393  *
394  * @param cls closure
395  * @param type name of the transport that generated the address
396  * @param addr one of the addresses of the host, NULL for the last address
397  *        the specific address format depends on the transport
398  * @param addrlen length of the address
399  * @param numeric should (IP) addresses be displayed in numeric form?
400  * @param timeout after how long should we give up?
401  * @param asc function to call on each string
402  * @param asc_cls closure for asc
403  */
404 static void
405 udp_plugin_address_pretty_printer (void *cls, const char *type,
406                                    const void *addr, size_t addrlen,
407                                    int numeric,
408                                    struct GNUNET_TIME_Relative timeout,
409                                    GNUNET_TRANSPORT_AddressStringCallback asc,
410                                    void *asc_cls)
411 {
412   struct PrettyPrinterContext *ppc;
413   const void *sb;
414   size_t sbs;
415   struct sockaddr_in a4;
416   struct sockaddr_in6 a6;
417   const struct IPv4UdpAddress *u4;
418   const struct IPv6UdpAddress *u6;
419   uint16_t port;
420
421   if (addrlen == sizeof (struct IPv6UdpAddress))
422   {
423     u6 = addr;
424     memset (&a6, 0, sizeof (a6));
425     a6.sin6_family = AF_INET6;
426 #if HAVE_SOCKADDR_IN_SIN_LEN
427     a6.sin6_len = sizeof (a6);
428 #endif
429     a6.sin6_port = u6->u6_port;
430     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
431     port = ntohs (u6->u6_port);
432     sb = &a6;
433     sbs = sizeof (a6);
434   }
435   else if (addrlen == sizeof (struct IPv4UdpAddress))
436   {
437     u4 = addr;
438     memset (&a4, 0, sizeof (a4));
439     a4.sin_family = AF_INET;
440 #if HAVE_SOCKADDR_IN_SIN_LEN
441     a4.sin_len = sizeof (a4);
442 #endif
443     a4.sin_port = u4->u4_port;
444     a4.sin_addr.s_addr = u4->ipv4_addr;
445     port = ntohs (u4->u4_port);
446     sb = &a4;
447     sbs = sizeof (a4);
448   }
449   else
450   {
451     /* invalid address */
452     GNUNET_break_op (0);
453     asc (asc_cls, NULL);
454     return;
455   }
456   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
457   ppc->asc = asc;
458   ppc->asc_cls = asc_cls;
459   ppc->port = port;
460   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
461 }
462
463
464 /**
465  * Check if the given port is plausible (must be either our listen
466  * port or our advertised port).  If it is neither, we return
467  * GNUNET_SYSERR.
468  *
469  * @param plugin global variables
470  * @param in_port port number to check
471  * @return GNUNET_OK if port is either open_port or adv_port
472  */
473 static int
474 check_port (struct Plugin *plugin, uint16_t in_port)
475 {
476   if ((in_port == plugin->port) || (in_port == plugin->aport))
477     return GNUNET_OK;
478   return GNUNET_SYSERR;
479 }
480
481
482
483 /**
484  * Function that will be called to check if a binary address for this
485  * plugin is well-formed and corresponds to an address for THIS peer
486  * (as per our configuration).  Naturally, if absolutely necessary,
487  * plugins can be a bit conservative in their answer, but in general
488  * plugins should make sure that the address does not redirect
489  * traffic to a 3rd party that might try to man-in-the-middle our
490  * traffic.
491  *
492  * @param cls closure, should be our handle to the Plugin
493  * @param addr pointer to the address
494  * @param addrlen length of addr
495  * @return GNUNET_OK if this is a plausible address for this peer
496  *         and transport, GNUNET_SYSERR if not
497  *
498  */
499 static int
500 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
501 {
502   struct Plugin *plugin = cls;
503   struct IPv4UdpAddress *v4;
504   struct IPv6UdpAddress *v6;
505
506   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
507       (addrlen != sizeof (struct IPv6UdpAddress)))
508   {
509     GNUNET_break_op (0);
510     return GNUNET_SYSERR;
511   }
512   if (addrlen == sizeof (struct IPv4UdpAddress))
513   {
514     v4 = (struct IPv4UdpAddress *) addr;
515     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
516       return GNUNET_SYSERR;
517     if (GNUNET_OK !=
518         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
519                                  sizeof (struct in_addr)))
520       return GNUNET_SYSERR;
521   }
522   else
523   {
524     v6 = (struct IPv6UdpAddress *) addr;
525     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
526     {
527       GNUNET_break_op (0);
528       return GNUNET_SYSERR;
529     }
530     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
531       return GNUNET_SYSERR;
532     if (GNUNET_OK !=
533         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
534                                  sizeof (struct in6_addr)))
535       return GNUNET_SYSERR;
536   }
537   return GNUNET_OK;
538 }
539
540
541 /**
542  * Destroy a session, plugin is being unloaded.
543  *
544  * @param cls unused
545  * @param key hash of public key of target peer
546  * @param value a 'struct PeerSession*' to clean up
547  * @return GNUNET_OK (continue to iterate)
548  */
549 static int
550 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
551 {
552   struct Plugin *plugin = cls;
553   struct Session *s = value;
554   struct UDPMessageWrapper *udpw;
555   struct UDPMessageWrapper *next;
556
557 #if DEBUG_UDP
558   LOG (GNUNET_ERROR_TYPE_DEBUG,
559        "Session %p to peer `%s' address ended \n",
560          s,
561          GNUNET_i2s (&s->target),
562          GNUNET_a2s (s->sock_addr, s->addrlen));
563 #endif
564
565   if (s->frag_ctx != NULL)
566   {
567     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
568     GNUNET_free (s->frag_ctx);
569     s->frag_ctx = NULL;
570   }
571
572   udpw = plugin->ipv4_queue_head;
573   while (udpw != NULL)
574   {
575     next = udpw->next;
576     if (udpw->session == s)
577     {
578       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
579
580       if (udpw->cont != NULL)
581         udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
582       GNUNET_free (udpw);
583     }
584     udpw = next;
585   }
586
587   udpw = plugin->ipv6_queue_head;
588   while (udpw != NULL)
589   {
590     next = udpw->next;
591     if (udpw->session == s)
592     {
593       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
594
595       if (udpw->cont != NULL)
596         udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
597       GNUNET_free (udpw);
598     }
599     udpw = next;
600   }
601
602   plugin->env->session_end (plugin->env->cls, &s->target, s);
603
604   GNUNET_assert (GNUNET_YES ==
605                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
606                                                        &s->target.hashPubKey,
607                                                        s));
608
609
610   GNUNET_free (s);
611   return GNUNET_OK;
612 }
613
614
615 /**
616  * Disconnect from a remote node.  Clean up session if we have one for this peer
617  *
618  * @param cls closure for this call (should be handle to Plugin)
619  * @param target the peeridentity of the peer to disconnect
620  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
621  */
622 static void
623 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
624 {
625   struct Plugin *plugin = cls;
626   GNUNET_assert (plugin != NULL);
627
628   GNUNET_assert (target != NULL);
629 #if DEBUG_UDP
630   LOG (GNUNET_ERROR_TYPE_DEBUG,
631        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
632 #endif
633   /* Clean up sessions */
634   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
635 }
636
637 static struct Session *
638 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
639                 const void *addr, size_t addrlen,
640                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
641 {
642   struct Session *s;
643   const struct IPv4UdpAddress *t4;
644   const struct IPv6UdpAddress *t6;
645   struct sockaddr_in *v4;
646   struct sockaddr_in6 *v6;
647   size_t len;
648
649   switch (addrlen)
650   {
651   case sizeof (struct IPv4UdpAddress):
652     if (NULL == plugin->sockv4)
653     {
654       return NULL;
655     }
656     t4 = addr;
657     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
658     len = sizeof (struct sockaddr_in);
659     v4 = (struct sockaddr_in *) &s[1];
660     v4->sin_family = AF_INET;
661 #if HAVE_SOCKADDR_IN_SIN_LEN
662     v4->sin_len = sizeof (struct sockaddr_in);
663 #endif
664     v4->sin_port = t4->u4_port;
665     v4->sin_addr.s_addr = t4->ipv4_addr;
666     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
667     break;
668   case sizeof (struct IPv6UdpAddress):
669     if (NULL == plugin->sockv6)
670     {
671       return NULL;
672     }
673     t6 = addr;
674     s =
675         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
676     len = sizeof (struct sockaddr_in6);
677     v6 = (struct sockaddr_in6 *) &s[1];
678     v6->sin6_family = AF_INET6;
679 #if HAVE_SOCKADDR_IN_SIN_LEN
680     v6->sin6_len = sizeof (struct sockaddr_in6);
681 #endif
682     v6->sin6_port = t6->u6_port;
683     v6->sin6_addr = t6->ipv6_addr;
684     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
685     break;
686   default:
687     /* Must have a valid address to send to */
688     GNUNET_break_op (0);
689     return NULL;
690   }
691
692   s->addrlen = len;
693   s->target = *target;
694   s->sock_addr = (const struct sockaddr *) &s[1];
695   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
696   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
697   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
698
699   return s;
700 }
701
702 static int session_cmp_it (void *cls,
703                            const GNUNET_HashCode * key,
704                            void *value)
705 {
706   struct SessionCompareContext * cctx = cls;
707   const struct GNUNET_HELLO_Address *address = cctx->addr;
708   struct Session *s = value;
709
710   socklen_t s_addrlen = s->addrlen;
711
712 #if VERBOSE_UDP
713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
714       udp_address_to_string (NULL, (void *) address->address, address->address_length),
715       GNUNET_a2s (s->sock_addr, s->addrlen));
716 #endif
717
718   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
719       (s_addrlen == sizeof (struct sockaddr_in)))
720   {
721     struct IPv4UdpAddress * u4 = NULL;
722     u4 = (struct IPv4UdpAddress *) address->address;
723     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
724     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
725         (u4->u4_port == s4->sin_port))
726     {
727       cctx->res = s;
728       return GNUNET_NO;
729     }
730
731   }
732   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
733       (s_addrlen == sizeof (struct sockaddr_in6)))
734   {
735     struct IPv6UdpAddress * u6 = NULL;
736     u6 = (struct IPv6UdpAddress *) address->address;
737     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
738     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
739         (u6->u6_port == s6->sin6_port))
740     {
741       cctx->res = s;
742       return GNUNET_NO;
743     }
744   }
745
746
747   return GNUNET_YES;
748 }
749
750
751 /**
752  * Creates a new outbound session the transport service will use to send data to the
753  * peer
754  *
755  * @param cls the plugin
756  * @param address the address
757  * @return the session or NULL of max connections exceeded
758  */
759 static struct Session *
760 udp_plugin_get_session (void *cls,
761                   const struct GNUNET_HELLO_Address *address)
762 {
763   struct Session * s = NULL;
764   struct Plugin * plugin = cls;
765
766   GNUNET_assert (plugin != NULL);
767   GNUNET_assert (address != NULL);
768
769
770   if ((address->address == NULL) ||
771       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
772       (address->address_length != sizeof (struct IPv6UdpAddress))))
773   {
774     GNUNET_break (0);
775     return NULL;
776   }
777
778   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
779       (plugin->sockv4 == NULL))
780     return NULL;
781
782   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
783       (plugin->sockv6 == NULL))
784     return NULL;
785
786
787   /* check if session already exists */
788   struct SessionCompareContext cctx;
789   cctx.addr = address;
790   cctx.res = NULL;
791 #if VERBOSE_UDP
792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
793 #endif
794   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
795   if (cctx.res != NULL)
796   {
797 #if VERBOSE_UDP
798     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
799 #endif
800     return cctx.res;
801   }
802
803   /* otherwise create new */
804   s = create_session (plugin,
805       &address->peer,
806       address->address,
807       address->address_length,
808       NULL, NULL);
809 #if VERBOSE
810     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811               "Creating new session %p for peer `%s' address `%s'\n",
812               s,
813               GNUNET_i2s(&address->peer),
814               udp_address_to_string(NULL,address->address,address->address_length));
815 #endif
816   GNUNET_assert (GNUNET_OK ==
817                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
818                                                     &s->target.hashPubKey,
819                                                     s,
820                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
821
822   return s;
823 }
824
825 static void enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
826 {
827
828   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
829     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
830   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
831     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
832 }
833
834 /**
835  * Function that is called with messages created by the fragmentation
836  * module.  In the case of the 'proc' callback of the
837  * GNUNET_FRAGMENT_context_create function, this function must
838  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
839  *
840  * @param cls closure, the 'struct FragmentationContext'
841  * @param msg the message that was created
842  */
843 static void
844 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
845 {
846   struct FragmentationContext *frag_ctx = cls;
847   struct Plugin *plugin = frag_ctx->plugin;
848   struct UDPMessageWrapper * udpw;
849   struct Session *s;
850
851   size_t msg_len = ntohs (msg->size);
852
853 #if VERBOSE_UDP
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
855 #endif
856
857   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
858   udpw->session = frag_ctx->session;
859   s = udpw->session;
860   udpw->udp = (char *) &udpw[1];
861
862   udpw->msg_size = msg_len;
863   udpw->cont = frag_ctx->cont;
864   udpw->cont_cls = frag_ctx->cont_cls;
865   udpw->timeout = frag_ctx->timeout;
866   udpw->frag_ctx = frag_ctx;
867   memcpy (udpw->udp, msg, msg_len);
868
869   enqueue (plugin, udpw);
870
871
872   if (s->addrlen == sizeof (struct sockaddr_in))
873   {
874     if (plugin->with_v4_ws == GNUNET_NO)
875     {
876       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
877         GNUNET_SCHEDULER_cancel(plugin->select_task);
878
879       plugin->select_task =
880           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
881                                        GNUNET_SCHEDULER_NO_TASK,
882                                        GNUNET_TIME_UNIT_FOREVER_REL,
883                                        plugin->rs_v4,
884                                        plugin->ws_v4,
885                                        &udp_plugin_select, plugin);
886       plugin->with_v4_ws = GNUNET_YES;
887     }
888   }
889
890   else if (s->addrlen == sizeof (struct sockaddr_in6))
891   {
892     if (plugin->with_v6_ws == GNUNET_NO)
893     {
894       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
895         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
896
897       plugin->select_task_v6 =
898           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
899                                        GNUNET_SCHEDULER_NO_TASK,
900                                        GNUNET_TIME_UNIT_FOREVER_REL,
901                                        plugin->rs_v6,
902                                        plugin->ws_v6,
903                                        &udp_plugin_select_v6, plugin);
904       plugin->with_v6_ws = GNUNET_YES;
905     }
906   }
907
908 }
909
910
911
912
913 /**
914  * Function that can be used by the transport service to transmit
915  * a message using the plugin.   Note that in the case of a
916  * peer disconnecting, the continuation MUST be called
917  * prior to the disconnect notification itself.  This function
918  * will be called with this peer's HELLO message to initiate
919  * a fresh connection to another peer.
920  *
921  * @param cls closure
922  * @param s which session must be used
923  * @param msgbuf the message to transmit
924  * @param msgbuf_size number of bytes in 'msgbuf'
925  * @param priority how important is the message (most plugins will
926  *                 ignore message priority and just FIFO)
927  * @param to how long to wait at most for the transmission (does not
928  *                require plugins to discard the message after the timeout,
929  *                just advisory for the desired delay; most plugins will ignore
930  *                this as well)
931  * @param cont continuation to call once the message has
932  *        been transmitted (or if the transport is ready
933  *        for the next transmission call; or if the
934  *        peer disconnected...); can be NULL
935  * @param cont_cls closure for cont
936  * @return number of bytes used (on the physical network, with overheads);
937  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
938  *         and does NOT mean that the message was not transmitted (DV)
939  */
940 static ssize_t
941 udp_plugin_send (void *cls,
942                   struct Session *s,
943                   const char *msgbuf, size_t msgbuf_size,
944                   unsigned int priority,
945                   struct GNUNET_TIME_Relative to,
946                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
947 {
948   struct Plugin *plugin = cls;
949   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
950
951   struct UDPMessageWrapper * udpw;
952   struct UDPMessage *udp;
953   char mbuf[mlen];
954   GNUNET_assert (plugin != NULL);
955   GNUNET_assert (s != NULL);
956
957   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
958     return GNUNET_SYSERR;
959
960    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
961      return GNUNET_SYSERR;
962
963
964   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
965   {
966     GNUNET_break (0);
967     return GNUNET_SYSERR;
968   }
969
970   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
971   {
972     GNUNET_break (0);
973     return GNUNET_SYSERR;
974   }
975
976   LOG (GNUNET_ERROR_TYPE_DEBUG,
977        "UDP transmits %u-byte message to `%s' using address `%s'\n",
978          msgbuf_size,
979          GNUNET_i2s (&s->target),
980          GNUNET_a2s(s->sock_addr, s->addrlen));
981
982   /* Message */
983   udp = (struct UDPMessage *) mbuf;
984   udp->header.size = htons (mlen);
985   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
986   udp->reserved = htonl (0);
987   udp->sender = *plugin->env->my_identity;
988
989   if (mlen <= UDP_MTU)
990   {
991     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
992     udpw->session = s;
993     udpw->udp = (char *) &udpw[1];
994     udpw->msg_size = mlen;
995     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
996     udpw->cont = cont;
997     udpw->cont_cls = cont_cls;
998     udpw->frag_ctx = NULL;
999
1000     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1001     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1002
1003     enqueue (plugin, udpw);
1004   }
1005   else
1006   {
1007     LOG (GNUNET_ERROR_TYPE_DEBUG,
1008          "UDP has to fragment message \n");
1009     if  (s->frag_ctx != NULL)
1010       return GNUNET_SYSERR;
1011     memcpy (&udp[1], msgbuf, msgbuf_size);
1012     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1013
1014     frag_ctx->plugin = plugin;
1015     frag_ctx->session = s;
1016     frag_ctx->cont = cont;
1017     frag_ctx->cont_cls = cont_cls;
1018     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1019     frag_ctx->bytes_to_send = mlen;
1020     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1021               UDP_MTU,
1022               &plugin->tracker,
1023               s->last_expected_delay,
1024               &udp->header,
1025               &enqueue_fragment,
1026               frag_ctx);
1027
1028     s->frag_ctx = frag_ctx;
1029
1030   }
1031
1032   if (s->addrlen == sizeof (struct sockaddr_in))
1033   {
1034     if (plugin->with_v4_ws == GNUNET_NO)
1035     {
1036       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1037         GNUNET_SCHEDULER_cancel(plugin->select_task);
1038
1039       plugin->select_task =
1040           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1041                                        GNUNET_SCHEDULER_NO_TASK,
1042                                        GNUNET_TIME_UNIT_FOREVER_REL,
1043                                        plugin->rs_v4,
1044                                        plugin->ws_v4,
1045                                        &udp_plugin_select, plugin);
1046       plugin->with_v4_ws = GNUNET_YES;
1047     }
1048   }
1049
1050   else if (s->addrlen == sizeof (struct sockaddr_in6))
1051   {
1052     if (plugin->with_v6_ws == GNUNET_NO)
1053     {
1054       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1055         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1056
1057       plugin->select_task_v6 =
1058         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1059                                      GNUNET_SCHEDULER_NO_TASK,
1060                                      GNUNET_TIME_UNIT_FOREVER_REL,
1061                                      plugin->rs_v6,
1062                                      plugin->ws_v6,
1063                                      &udp_plugin_select_v6, plugin);
1064       plugin->with_v6_ws = GNUNET_YES;
1065     }
1066   }
1067
1068   return mlen;
1069 }
1070
1071
1072 /**
1073  * Our external IP address/port mapping has changed.
1074  *
1075  * @param cls closure, the 'struct LocalAddrList'
1076  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1077  *     the previous (now invalid) one
1078  * @param addr either the previous or the new public IP address
1079  * @param addrlen actual lenght of the address
1080  */
1081 static void
1082 udp_nat_port_map_callback (void *cls, int add_remove,
1083                            const struct sockaddr *addr, socklen_t addrlen)
1084 {
1085   struct Plugin *plugin = cls;
1086   struct IPv4UdpAddress u4;
1087   struct IPv6UdpAddress u6;
1088   void *arg;
1089   size_t args;
1090
1091   /* convert 'addr' to our internal format */
1092   switch (addr->sa_family)
1093   {
1094   case AF_INET:
1095     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1096     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1097     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1098     arg = &u4;
1099     args = sizeof (u4);
1100     break;
1101   case AF_INET6:
1102     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1103     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1104             sizeof (struct in6_addr));
1105     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1106     arg = &u6;
1107     args = sizeof (u6);
1108     break;
1109   default:
1110     GNUNET_break (0);
1111     return;
1112   }
1113   /* modify our published address list */
1114   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1115 }
1116
1117
1118
1119 /**
1120  * Message tokenizer has broken up an incomming message. Pass it on
1121  * to the service.
1122  *
1123  * @param cls the 'struct Plugin'
1124  * @param client the 'struct SourceInformation'
1125  * @param hdr the actual message
1126  */
1127 static void
1128 process_inbound_tokenized_messages (void *cls, void *client,
1129                                     const struct GNUNET_MessageHeader *hdr)
1130 {
1131   struct Plugin *plugin = cls;
1132   struct SourceInformation *si = client;
1133   struct GNUNET_ATS_Information ats[2];
1134   struct GNUNET_TIME_Relative delay;
1135
1136   GNUNET_assert (si->session != NULL);
1137   /* setup ATS */
1138   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1139   ats[0].value = htonl (1);
1140   ats[1] = si->session->ats;
1141   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1142
1143   delay = plugin->env->receive (plugin->env->cls,
1144                 &si->sender,
1145                 hdr,
1146                 (const struct GNUNET_ATS_Information *) &ats, 2,
1147                 NULL,
1148                 si->arg,
1149                 si->args);
1150   si->session->flow_delay_for_other_peer = delay;
1151 }
1152
1153
1154 /**
1155  * We've received a UDP Message.  Process it (pass contents to main service).
1156  *
1157  * @param plugin plugin context
1158  * @param msg the message
1159  * @param sender_addr sender address
1160  * @param sender_addr_len number of bytes in sender_addr
1161  */
1162 static void
1163 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1164                      const struct sockaddr *sender_addr,
1165                      socklen_t sender_addr_len)
1166 {
1167   struct SourceInformation si;
1168   struct Session * s = NULL;
1169   struct IPv4UdpAddress u4;
1170   struct IPv6UdpAddress u6;
1171   const void *arg;
1172   size_t args;
1173
1174   if (0 != ntohl (msg->reserved))
1175   {
1176     GNUNET_break_op (0);
1177     return;
1178   }
1179   if (ntohs (msg->header.size) <
1180       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1181   {
1182     GNUNET_break_op (0);
1183     return;
1184   }
1185
1186   /* convert address */
1187   switch (sender_addr->sa_family)
1188   {
1189   case AF_INET:
1190     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1191     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1192     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1193     arg = &u4;
1194     args = sizeof (u4);
1195     break;
1196   case AF_INET6:
1197     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1198     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1199     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1200     arg = &u6;
1201     args = sizeof (u6);
1202     break;
1203   default:
1204     GNUNET_break (0);
1205     return;
1206   }
1207 #if DEBUG_UDP
1208   LOG (GNUNET_ERROR_TYPE_DEBUG,
1209        "Received message with %u bytes from peer `%s' at `%s'\n",
1210        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1211        GNUNET_a2s (sender_addr, sender_addr_len));
1212 #endif
1213
1214   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1215   s = udp_plugin_get_session(plugin, address);
1216   GNUNET_free (address);
1217
1218   /* iterate over all embedded messages */
1219   si.session = s;
1220   si.sender = msg->sender;
1221   si.arg = arg;
1222   si.args = args;
1223
1224   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1225                              ntohs (msg->header.size) -
1226                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1227 }
1228
1229
1230 /**
1231  * Scan the heap for a receive context with the given address.
1232  *
1233  * @param cls the 'struct FindReceiveContext'
1234  * @param node internal node of the heap
1235  * @param element value stored at the node (a 'struct ReceiveContext')
1236  * @param cost cost associated with the node
1237  * @return GNUNET_YES if we should continue to iterate,
1238  *         GNUNET_NO if not.
1239  */
1240 static int
1241 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1242                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1243 {
1244   struct FindReceiveContext *frc = cls;
1245   struct DefragContext *e = element;
1246
1247   if ((frc->addr_len == e->addr_len) &&
1248       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1249   {
1250     frc->rc = e;
1251     return GNUNET_NO;
1252   }
1253   return GNUNET_YES;
1254 }
1255
1256
1257 /**
1258  * Process a defragmented message.
1259  *
1260  * @param cls the 'struct ReceiveContext'
1261  * @param msg the message
1262  */
1263 static void
1264 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1265 {
1266   struct DefragContext *rc = cls;
1267
1268   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1269   {
1270     GNUNET_break (0);
1271     return;
1272   }
1273   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1274   {
1275     GNUNET_break (0);
1276     return;
1277   }
1278   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1279                        rc->src_addr, rc->addr_len);
1280 }
1281
1282 struct LookupContext
1283 {
1284   const struct sockaddr * addr;
1285   size_t addrlen;
1286
1287   struct Session *res;
1288 };
1289
1290 static int
1291 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1292 {
1293   struct LookupContext *l_ctx = cls;
1294   struct Session * s = value;
1295
1296   if ((s->addrlen == l_ctx->addrlen) &&
1297       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1298   {
1299     l_ctx->res = s;
1300     return GNUNET_NO;
1301   }
1302   return GNUNET_YES;
1303 }
1304
1305 /**
1306  * Transmit an acknowledgement.
1307  *
1308  * @param cls the 'struct ReceiveContext'
1309  * @param id message ID (unused)
1310  * @param msg ack to transmit
1311  */
1312 static void
1313 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1314 {
1315   struct DefragContext *rc = cls;
1316
1317   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1318   struct UDP_ACK_Message *udp_ack;
1319   uint32_t delay = 0;
1320   struct UDPMessageWrapper *udpw;
1321   struct Session *s;
1322
1323   struct LookupContext l_ctx;
1324   l_ctx.addr = rc->src_addr;
1325   l_ctx.addrlen = rc->addr_len;
1326   l_ctx.res = NULL;
1327   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1328       &lookup_session_by_addr_it,
1329       &l_ctx);
1330   s = l_ctx.res;
1331
1332   GNUNET_assert (s != NULL);
1333
1334   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1335     delay = s->flow_delay_for_other_peer.rel_value;
1336
1337 #if DEBUG_UDP
1338   LOG (GNUNET_ERROR_TYPE_DEBUG,
1339        "Sending ACK to `%s' including delay of %u ms\n",
1340        GNUNET_a2s (rc->src_addr,
1341                    (rc->src_addr->sa_family ==
1342                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1343                                                                      sockaddr_in6)),
1344        delay);
1345 #endif
1346   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1347   udpw->cont = NULL;
1348   udpw->cont_cls = NULL;
1349   udpw->frag_ctx = NULL;
1350   udpw->msg_size = msize;
1351   udpw->session = s;
1352   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1353   udpw->udp = (char *)&udpw[1];
1354
1355   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1356   udp_ack->header.size = htons ((uint16_t) msize);
1357   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1358   udp_ack->delay = htonl (delay);
1359   udp_ack->sender = *rc->plugin->env->my_identity;
1360   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1361
1362   enqueue (rc->plugin, udpw);
1363 }
1364
1365
1366 static void read_process_msg (struct Plugin *plugin,
1367     const struct GNUNET_MessageHeader *msg,
1368     char *addr,
1369     socklen_t fromlen)
1370 {
1371   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1372   {
1373     GNUNET_break_op (0);
1374     return;
1375   }
1376   process_udp_message (plugin, (const struct UDPMessage *) msg,
1377                        (const struct sockaddr *) addr, fromlen);
1378   return;
1379 }
1380
1381 static void read_process_ack (struct Plugin *plugin,
1382     const struct GNUNET_MessageHeader *msg,
1383     char *addr,
1384     socklen_t fromlen)
1385 {
1386   const struct GNUNET_MessageHeader *ack;
1387   const struct UDP_ACK_Message *udp_ack;
1388   struct LookupContext l_ctx;
1389   struct Session *s = NULL;
1390   struct GNUNET_TIME_Relative flow_delay;
1391
1392   if (ntohs (msg->size) <
1393       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1394   {
1395     GNUNET_break_op (0);
1396     return;
1397   }
1398
1399   udp_ack = (const struct UDP_ACK_Message *) msg;
1400
1401   l_ctx.addr = (const struct sockaddr *) addr;
1402   l_ctx.addrlen = fromlen;
1403   l_ctx.res = NULL;
1404   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1405       &lookup_session_by_addr_it,
1406       &l_ctx);
1407   s = l_ctx.res;
1408
1409   if ((s == NULL) || (s->frag_ctx == NULL))
1410     return;
1411
1412   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1413   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1414        flow_delay.rel_value);
1415   s->flow_delay_from_other_peer =
1416       GNUNET_TIME_relative_to_absolute (flow_delay);
1417
1418   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1419   if (ntohs (ack->size) !=
1420       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1421   {
1422     GNUNET_break_op (0);
1423     return;
1424   }
1425
1426   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1427   {
1428 #if DEBUG_UDP
1429   LOG (GNUNET_ERROR_TYPE_DEBUG,
1430        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1431        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1432        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1433 #endif
1434     return;
1435   }
1436
1437 #if DEBUG_UDP
1438   LOG (GNUNET_ERROR_TYPE_DEBUG,
1439        "FULL MESSAGE ACKed\n",
1440        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1441        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1442 #endif
1443   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1444
1445   struct UDPMessageWrapper * udpw = NULL;
1446   if (s->addrlen == sizeof (struct sockaddr_in6))
1447   {
1448     udpw = plugin->ipv6_queue_head;
1449     while (udpw!= NULL)
1450     {
1451       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1452       {
1453         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1454         GNUNET_free (udpw);
1455       }
1456       udpw = udpw->next;
1457     }
1458   }
1459   if (s->addrlen == sizeof (struct sockaddr_in))
1460   {
1461     udpw = plugin->ipv4_queue_head;
1462     while (udpw!= NULL)
1463     {
1464       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1465       {
1466         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1467         GNUNET_free (udpw);
1468       }
1469       udpw = udpw->next;
1470     }
1471   }
1472
1473   if (s->frag_ctx->cont != NULL)
1474     s->frag_ctx->cont
1475     (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1476   GNUNET_free (s->frag_ctx);
1477   s->frag_ctx = NULL;
1478   return;
1479 }
1480
1481 static void read_process_fragment (struct Plugin *plugin,
1482     const struct GNUNET_MessageHeader *msg,
1483     char *addr,
1484     socklen_t fromlen)
1485 {
1486   struct DefragContext *d_ctx;
1487   struct GNUNET_TIME_Absolute now;
1488   struct FindReceiveContext frc;
1489
1490
1491   frc.rc = NULL;
1492   frc.addr = (const struct sockaddr *) addr;
1493   frc.addr_len = fromlen;
1494
1495 #if DEBUG_UDP
1496   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1497        (unsigned int) ntohs (msg->size),
1498        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1499 #endif
1500
1501   /* Lookup existing receive context for this address */
1502   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1503                                  &find_receive_context,
1504                                  &frc);
1505   now = GNUNET_TIME_absolute_get ();
1506   d_ctx = frc.rc;
1507
1508   if (d_ctx == NULL)
1509   {
1510     /* Create a new defragmentation context */
1511     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1512     memcpy (&d_ctx[1], addr, fromlen);
1513     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1514     d_ctx->addr_len = fromlen;
1515     d_ctx->plugin = plugin;
1516     d_ctx->defrag =
1517         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1518                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1519                                           &fragment_msg_proc, &ack_proc);
1520     d_ctx->hnode =
1521         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1522                                       (GNUNET_CONTAINER_HeapCostType)
1523                                       now.abs_value);
1524 #if DEBUG_UDP
1525   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1526        (unsigned int) ntohs (msg->size),
1527        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1528 #endif
1529   }
1530   else
1531   {
1532 #if DEBUG_UDP
1533   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1534        (unsigned int) ntohs (msg->size),
1535        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1536 #endif
1537   }
1538
1539   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1540   {
1541     /* keep this 'rc' from expiring */
1542     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1543                                        (GNUNET_CONTAINER_HeapCostType)
1544                                        now.abs_value);
1545   }
1546   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1547       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1548   {
1549     /* remove 'rc' that was inactive the longest */
1550     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1551     GNUNET_assert (NULL != d_ctx);
1552     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1553     GNUNET_free (d_ctx);
1554   }
1555 }
1556
1557 /**
1558  * Read and process a message from the given socket.
1559  *
1560  * @param plugin the overall plugin
1561  * @param rsock socket to read from
1562  */
1563 static void
1564 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1565 {
1566   socklen_t fromlen;
1567   char addr[32];
1568   char buf[65536];
1569   ssize_t size;
1570   const struct GNUNET_MessageHeader *msg;
1571
1572   fromlen = sizeof (addr);
1573   memset (&addr, 0, sizeof (addr));
1574   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1575                                       (struct sockaddr *) &addr, &fromlen);
1576
1577   if (size < sizeof (struct GNUNET_MessageHeader))
1578   {
1579     GNUNET_break_op (0);
1580     return;
1581   }
1582   msg = (const struct GNUNET_MessageHeader *) buf;
1583
1584   LOG (GNUNET_ERROR_TYPE_DEBUG,
1585        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1586        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1587
1588   if (size != ntohs (msg->size))
1589   {
1590     GNUNET_break_op (0);
1591     return;
1592   }
1593
1594   switch (ntohs (msg->type))
1595   {
1596   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1597     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1598     return;
1599
1600   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1601     read_process_msg (plugin, msg, addr, fromlen);
1602     return;
1603
1604   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1605     read_process_ack (plugin, msg, addr, fromlen);;
1606     return;
1607
1608   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1609     read_process_fragment (plugin, msg, addr, fromlen);
1610     return;
1611
1612   default:
1613     GNUNET_break_op (0);
1614     return;
1615   }
1616 }
1617
1618 size_t
1619 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1620 {
1621   ssize_t sent;
1622   size_t slen;
1623   struct GNUNET_TIME_Absolute max;
1624   struct GNUNET_TIME_Absolute ;
1625
1626   struct UDPMessageWrapper *udpw = NULL;
1627
1628   if (sock == plugin->sockv4)
1629   {
1630     udpw = plugin->ipv4_queue_head;
1631   }
1632   else if (sock == plugin->sockv6)
1633   {
1634     udpw = plugin->ipv6_queue_head;
1635   }
1636   else
1637     GNUNET_break (0);
1638
1639   const struct sockaddr * sa = udpw->session->sock_addr;
1640   slen = udpw->session->addrlen;
1641
1642   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1643
1644   while (udpw != NULL)
1645   {
1646     if (max.abs_value != udpw->timeout.abs_value)
1647     {
1648       /* Message timed out */
1649
1650       if (udpw->cont != NULL)
1651         udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
1652       if (udpw->frag_ctx != NULL)
1653       {
1654 #if DEBUG_UDP
1655         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1656             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1657 #endif
1658         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1659         GNUNET_free (udpw->frag_ctx);
1660         udpw->session->frag_ctx = NULL;
1661       }
1662       else
1663       {
1664 #if DEBUG_UDP
1665         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1666             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1667 #endif
1668       }
1669
1670       if (sock == plugin->sockv4)
1671       {
1672         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1673         GNUNET_free (udpw);
1674         udpw = plugin->ipv4_queue_head;
1675       }
1676       else if (sock == plugin->sockv6)
1677       {
1678         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1679         GNUNET_free (udpw);
1680         udpw = plugin->ipv6_queue_head;
1681       }
1682     }
1683     else
1684     {
1685       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1686       if (delta.rel_value == 0)
1687       {
1688         /* this message is not delayed */
1689         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1690             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1691         break;
1692       }
1693       else
1694       {
1695         /* this message is delayed, try next */
1696         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1697             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1698             delta);
1699         udpw = udpw->next;
1700       }
1701     }
1702
1703   }
1704
1705   if (udpw == NULL)
1706   {
1707     /* No message left */
1708     return 0;
1709   }
1710
1711   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1712
1713   if (GNUNET_SYSERR == sent)
1714   {
1715     GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto");
1716     LOG (GNUNET_ERROR_TYPE_DEBUG,
1717          "UDP transmitted %u-byte message to %s (%d: %s)\n",
1718          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1719          (sent < 0) ? STRERROR (errno) : "ok");
1720     if (udpw->cont != NULL)
1721       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
1722   }
1723   LOG (GNUNET_ERROR_TYPE_DEBUG,
1724        "UDP transmitted %u-byte message to %s (%d: %s)\n",
1725        (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1726        (sent < 0) ? STRERROR (errno) : "ok");
1727
1728   /* This was just a message fragment */
1729   if (udpw->frag_ctx != NULL)
1730   {
1731     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1732   }
1733   /* This was a complete message*/
1734   else
1735   {
1736     if (udpw->cont != NULL)
1737       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
1738   }
1739
1740   if (sock == plugin->sockv4)
1741     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1742   else if (sock == plugin->sockv6)
1743     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1744   GNUNET_free (udpw);
1745   udpw = NULL;
1746
1747   return sent;
1748 }
1749
1750 /**
1751  * We have been notified that our readset has something to read.  We don't
1752  * know which socket needs to be read, so we have to check each one
1753  * Then reschedule this function to be called again once more is available.
1754  *
1755  * @param cls the plugin handle
1756  * @param tc the scheduling context (for rescheduling this function again)
1757  */
1758 static void
1759 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1760 {
1761   struct Plugin *plugin = cls;
1762
1763   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1764   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1765     return;
1766   plugin->with_v4_ws = GNUNET_NO;
1767
1768   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1769   {
1770     if ((NULL != plugin->sockv4) &&
1771       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1772         udp_select_read (plugin, plugin->sockv4);
1773
1774   }
1775
1776   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1777   {
1778     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1779       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1780       {
1781         udp_select_send (plugin, plugin->sockv4);
1782       }
1783   }
1784
1785   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1786     GNUNET_SCHEDULER_cancel (plugin->select_task);
1787   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1788                                    GNUNET_SCHEDULER_NO_TASK,
1789                                    GNUNET_TIME_UNIT_FOREVER_REL,
1790                                    plugin->rs_v4,
1791                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1792                                    &udp_plugin_select, plugin);
1793   if (plugin->ipv4_queue_head != NULL)
1794     plugin->with_v4_ws = GNUNET_YES;
1795   else
1796     plugin->with_v4_ws = GNUNET_NO;
1797 }
1798
1799
1800 /**
1801  * We have been notified that our readset has something to read.  We don't
1802  * know which socket needs to be read, so we have to check each one
1803  * Then reschedule this function to be called again once more is available.
1804  *
1805  * @param cls the plugin handle
1806  * @param tc the scheduling context (for rescheduling this function again)
1807  */
1808 static void
1809 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1810 {
1811   struct Plugin *plugin = cls;
1812
1813   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1814   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1815     return;
1816
1817   plugin->with_v6_ws = GNUNET_NO;
1818   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1819   {
1820     if ((NULL != plugin->sockv6) &&
1821       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1822         udp_select_read (plugin, plugin->sockv6);
1823   }
1824
1825   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1826   {
1827     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1828       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1829       {
1830         udp_select_send (plugin, plugin->sockv6);
1831       }
1832   }
1833   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1834     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1835   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1836                                    GNUNET_SCHEDULER_NO_TASK,
1837                                    GNUNET_TIME_UNIT_FOREVER_REL,
1838                                    plugin->rs_v6,
1839                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1840                                    &udp_plugin_select_v6, plugin);
1841   if (plugin->ipv6_queue_head != NULL)
1842     plugin->with_v6_ws = GNUNET_YES;
1843   else
1844     plugin->with_v6_ws = GNUNET_NO;
1845 }
1846
1847
1848 static int
1849 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1850 {
1851   int tries;
1852   int sockets_created = 0;
1853   struct sockaddr *serverAddr;
1854   struct sockaddr *addrs[2];
1855   socklen_t addrlens[2];
1856   socklen_t addrlen;
1857
1858   /* Create IPv6 socket */
1859   if (plugin->enable_ipv6 == GNUNET_YES)
1860   {
1861     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
1862     if (NULL == plugin->sockv6)
1863     {
1864       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
1865       plugin->enable_ipv6 = GNUNET_NO;
1866     }
1867     else
1868     {
1869 #if HAVE_SOCKADDR_IN_SIN_LEN
1870       serverAddrv6->sin6_len = sizeof (serverAddrv6);
1871 #endif
1872       serverAddrv6->sin6_family = AF_INET6;
1873       serverAddrv6->sin6_addr = in6addr_any;
1874       serverAddrv6->sin6_port = htons (plugin->port);
1875       addrlen = sizeof (struct sockaddr_in6);
1876       serverAddr = (struct sockaddr *) serverAddrv6;
1877 #if DEBUG_UDP
1878       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
1879            ntohs (serverAddrv6->sin6_port));
1880 #endif
1881       tries = 0;
1882       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
1883              GNUNET_OK)
1884       {
1885         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
1886 #if DEBUG_UDP
1887         LOG (GNUNET_ERROR_TYPE_DEBUG,
1888              "IPv6 Binding failed, trying new port %d\n",
1889              ntohs (serverAddrv6->sin6_port));
1890 #endif
1891         tries++;
1892         if (tries > 10)
1893         {
1894           GNUNET_NETWORK_socket_close (plugin->sockv6);
1895           plugin->sockv6 = NULL;
1896           break;
1897         }
1898       }
1899       if (plugin->sockv6 != NULL)
1900       {
1901 #if DEBUG_UDP
1902         LOG (GNUNET_ERROR_TYPE_DEBUG,
1903              "IPv6 socket created on port %d\n",
1904              ntohs (serverAddrv6->sin6_port));
1905 #endif
1906         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
1907         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
1908         sockets_created++;
1909       }
1910     }
1911   }
1912
1913   /* Create IPv4 socket */
1914   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
1915   if (NULL == plugin->sockv4)
1916   {
1917     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
1918   }
1919   else
1920   {
1921 #if HAVE_SOCKADDR_IN_SIN_LEN
1922     serverAddrv4->sin_len = sizeof (serverAddrv4);
1923 #endif
1924     serverAddrv4->sin_family = AF_INET;
1925     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
1926     serverAddrv4->sin_port = htons (plugin->port);
1927     addrlen = sizeof (struct sockaddr_in);
1928     serverAddr = (struct sockaddr *) serverAddrv4;
1929
1930 #if DEBUG_UDP
1931     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
1932          ntohs (serverAddrv4->sin_port));
1933 #endif
1934     tries = 0;
1935     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
1936            GNUNET_OK)
1937     {
1938       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
1939 #if DEBUG_UDP
1940       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
1941            ntohs (serverAddrv4->sin_port));
1942 #endif
1943       tries++;
1944       if (tries > 10)
1945       {
1946         GNUNET_NETWORK_socket_close (plugin->sockv4);
1947         plugin->sockv4 = NULL;
1948         break;
1949       }
1950     }
1951     if (plugin->sockv4 != NULL)
1952     {
1953       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
1954       addrlens[sockets_created] = sizeof (struct sockaddr_in);
1955       sockets_created++;
1956     }
1957   }
1958
1959   /* Create file descriptors */
1960   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
1961   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
1962   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
1963   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
1964   if (NULL != plugin->sockv4)
1965   {
1966     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
1967     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
1968   }
1969
1970   if (sockets_created == 0)
1971     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
1972
1973   plugin->select_task =
1974       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1975                                    GNUNET_SCHEDULER_NO_TASK,
1976                                    GNUNET_TIME_UNIT_FOREVER_REL,
1977                                    plugin->rs_v4,
1978                                    NULL,
1979                                    &udp_plugin_select, plugin);
1980   plugin->with_v4_ws = GNUNET_NO;
1981
1982   if (plugin->enable_ipv6 == GNUNET_YES)
1983   {
1984     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
1985     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
1986     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
1987     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
1988     if (NULL != plugin->sockv6)
1989     {
1990       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
1991       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
1992     }
1993
1994     plugin->select_task_v6 =
1995         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1996                                      GNUNET_SCHEDULER_NO_TASK,
1997                                      GNUNET_TIME_UNIT_FOREVER_REL,
1998                                      plugin->rs_v6,
1999                                      NULL,
2000                                      &udp_plugin_select_v6, plugin);
2001     plugin->with_v6_ws = GNUNET_NO;
2002   }
2003
2004   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2005                            GNUNET_NO, plugin->port,
2006                            sockets_created,
2007                            (const struct sockaddr **) addrs, addrlens,
2008                            &udp_nat_port_map_callback, NULL, plugin);
2009
2010   return sockets_created;
2011 }
2012
2013
2014 /**
2015  * The exported method. Makes the core api available via a global and
2016  * returns the udp transport API.
2017  *
2018  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2019  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2020  */
2021 void *
2022 libgnunet_plugin_transport_udp_init (void *cls)
2023 {
2024   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2025   struct GNUNET_TRANSPORT_PluginFunctions *api;
2026   struct Plugin *plugin;
2027
2028   unsigned long long port;
2029   unsigned long long aport;
2030   unsigned long long broadcast;
2031   unsigned long long udp_max_bps;
2032   unsigned long long enable_v6;
2033   char * bind4_address;
2034   char * bind6_address;
2035   struct GNUNET_TIME_Relative interval;
2036
2037   struct sockaddr_in serverAddrv4;
2038   struct sockaddr_in6 serverAddrv6;
2039
2040   int res;
2041
2042   /* Get port number */
2043   if (GNUNET_OK !=
2044       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2045                                              &port))
2046     port = 2086;
2047   if (GNUNET_OK !=
2048       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2049                                              "ADVERTISED_PORT", &aport))
2050     aport = port;
2051   if (port > 65535)
2052   {
2053     LOG (GNUNET_ERROR_TYPE_WARNING,
2054          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2055          65535);
2056     return NULL;
2057   }
2058
2059   /* Protocols */
2060   if ((GNUNET_YES ==
2061        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2062                                              "DISABLEV6")))
2063   {
2064     enable_v6 = GNUNET_NO;
2065   }
2066   else
2067     enable_v6 = GNUNET_YES;
2068
2069
2070   /* Addresses */
2071   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2072   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2073
2074   if (GNUNET_YES ==
2075       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2076                                              "BINDTO", &bind4_address))
2077   {
2078     LOG (GNUNET_ERROR_TYPE_DEBUG,
2079          "Binding udp plugin to specific address: `%s'\n",
2080          bind4_address);
2081     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2082     {
2083       GNUNET_free (bind4_address);
2084       return NULL;
2085     }
2086   }
2087
2088   if (GNUNET_YES ==
2089       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2090                                              "BINDTO6", &bind6_address))
2091   {
2092     LOG (GNUNET_ERROR_TYPE_DEBUG,
2093          "Binding udp plugin to specific address: `%s'\n",
2094          bind6_address);
2095     if (1 !=
2096         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2097     {
2098       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2099            bind6_address);
2100       GNUNET_free_non_null (bind4_address);
2101       GNUNET_free (bind6_address);
2102       return NULL;
2103     }
2104   }
2105
2106
2107   /* Enable neighbour discovery */
2108   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2109                                             "BROADCAST");
2110   if (broadcast == GNUNET_SYSERR)
2111     broadcast = GNUNET_NO;
2112
2113   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2114                                            "BROADCAST_INTERVAL", &interval))
2115   {
2116     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2117   }
2118
2119   /* Maximum datarate */
2120   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2121                                              "MAX_BPS", &udp_max_bps))
2122   {
2123     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2124   }
2125
2126   plugin = GNUNET_malloc (sizeof (struct Plugin));
2127   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2128
2129   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2130                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2131
2132
2133   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2134   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2135   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2136   plugin->port = port;
2137   plugin->aport = aport;
2138   plugin->broadcast_interval = interval;
2139   plugin->enable_ipv6 = enable_v6;
2140   plugin->env = env;
2141
2142   api->cls = plugin;
2143   api->send = NULL;
2144   api->disconnect = &udp_disconnect;
2145   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2146   api->address_to_string = &udp_address_to_string;
2147   api->check_address = &udp_plugin_check_address;
2148   api->get_session = &udp_plugin_get_session;
2149   api->send = &udp_plugin_send;
2150
2151   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2152   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2153   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2154   {
2155     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2156     GNUNET_free (plugin);
2157     GNUNET_free (api);
2158     return NULL;
2159   }
2160
2161   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2162   if (broadcast == GNUNET_YES)
2163     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2164
2165
2166   GNUNET_free_non_null (bind4_address);
2167   GNUNET_free_non_null (bind6_address);
2168   return api;
2169 }
2170
2171 int heap_cleanup_iterator (void *cls,
2172                           struct GNUNET_CONTAINER_HeapNode *
2173                           node, void *element,
2174                           GNUNET_CONTAINER_HeapCostType
2175                           cost)
2176 {
2177   struct DefragContext * d_ctx = element;
2178
2179   GNUNET_CONTAINER_heap_remove_node (node);
2180   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2181   GNUNET_free (d_ctx);
2182
2183   return GNUNET_YES;
2184 }
2185
2186
2187 /**
2188  * The exported method. Makes the core api available via a global and
2189  * returns the udp transport API.
2190  *
2191  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2192  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2193  */
2194 void *
2195 libgnunet_plugin_transport_udp_done (void *cls)
2196 {
2197   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2198   struct Plugin *plugin = api->cls;
2199   stop_broadcast (plugin);
2200
2201   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2202   {
2203     GNUNET_SCHEDULER_cancel (plugin->select_task);
2204     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2205   }
2206   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2207   {
2208     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2209     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2210   }
2211
2212   /* Closing sockets */
2213   if (plugin->sockv4 != NULL)
2214   {
2215     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2216     plugin->sockv4 = NULL;
2217   }
2218   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2219   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2220
2221   if (plugin->sockv6 != NULL)
2222   {
2223     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2224     plugin->sockv6 = NULL;
2225
2226     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2227     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2228   }
2229
2230   GNUNET_NAT_unregister (plugin->nat);
2231
2232   if (plugin->defrag_ctxs != NULL)
2233   {
2234     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2235         heap_cleanup_iterator, NULL);
2236     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2237     plugin->defrag_ctxs = NULL;
2238   }
2239   if (plugin->mst != NULL)
2240   {
2241     GNUNET_SERVER_mst_destroy(plugin->mst);
2242     plugin->mst = NULL;
2243   }
2244
2245   /* Clean up leftover messages */
2246   struct UDPMessageWrapper * udpw;
2247   udpw = plugin->ipv4_queue_head;
2248   while (udpw != NULL)
2249   {
2250     struct UDPMessageWrapper *tmp = udpw->next;
2251     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2252     if (udpw->cont != NULL)
2253       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
2254     GNUNET_free (udpw);
2255     udpw = tmp;
2256   }
2257   udpw = plugin->ipv6_queue_head;
2258   while (udpw != NULL)
2259   {
2260     struct UDPMessageWrapper *tmp = udpw->next;
2261     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2262     if (udpw->cont != NULL)
2263       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
2264     GNUNET_free (udpw);
2265     udpw = tmp;
2266   }
2267
2268   /* Clean up sessions */
2269 #if DEBUG_UDP
2270   LOG (GNUNET_ERROR_TYPE_DEBUG,
2271        "Cleaning up sessions\n");
2272 #endif
2273   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2274   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2275
2276   plugin->nat = NULL;
2277   GNUNET_free (plugin);
2278   GNUNET_free (api);
2279   return NULL;
2280 }
2281
2282
2283 /* end of plugin_transport_udp.c */