- remove msg
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   /**
97    * Address of the other peer
98    */
99   const struct sockaddr *sock_addr;
100
101   size_t addrlen;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * expected delay for ACKs
115    */
116   struct GNUNET_TIME_Relative last_expected_delay;
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121
122   unsigned int rc;
123
124   int in_destroy;
125 };
126
127
128 struct SessionCompareContext
129 {
130   struct Session *res;
131   const struct GNUNET_HELLO_Address *addr;
132 };
133
134
135 /**
136  * Closure for 'process_inbound_tokenized_messages'
137  */
138 struct SourceInformation
139 {
140   /**
141    * Sender identity.
142    */
143   struct GNUNET_PeerIdentity sender;
144
145   /**
146    * Source address.
147    */
148   const void *arg;
149
150   /**
151    * Number of bytes in source address.
152    */
153   size_t args;
154
155   struct Session *session;
156 };
157
158
159 /**
160  * Closure for 'find_receive_context'.
161  */
162 struct FindReceiveContext
163 {
164   /**
165    * Where to store the result.
166    */
167   struct DefragContext *rc;
168
169   /**
170    * Address to find.
171    */
172   const struct sockaddr *addr;
173
174   /**
175    * Number of bytes in 'addr'.
176    */
177   socklen_t addr_len;
178
179   struct Session *session;
180 };
181
182
183
184 /**
185  * Data structure to track defragmentation contexts based
186  * on the source of the UDP traffic.
187  */
188 struct DefragContext
189 {
190
191   /**
192    * Defragmentation context.
193    */
194   struct GNUNET_DEFRAGMENT_Context *defrag;
195
196   /**
197    * Source address this receive context is for (allocated at the
198    * end of the struct).
199    */
200   const struct sockaddr *src_addr;
201
202   /**
203    * Reference to master plugin struct.
204    */
205   struct Plugin *plugin;
206
207   /**
208    * Node in the defrag heap.
209    */
210   struct GNUNET_CONTAINER_HeapNode *hnode;
211
212   /**
213    * Length of 'src_addr'
214    */
215   size_t addr_len;
216 };
217
218
219
220 /**
221  * Closure for 'process_inbound_tokenized_messages'
222  */
223 struct FragmentationContext
224 {
225   struct FragmentationContext * next;
226   struct FragmentationContext * prev;
227
228   struct Plugin * plugin;
229   struct GNUNET_FRAGMENT_Context * frag;
230   struct Session * session;
231
232   struct GNUNET_TIME_Absolute timeout;
233
234
235   /**
236    * Function to call upon completion of the transmission.
237    */
238   GNUNET_TRANSPORT_TransmitContinuation cont;
239
240   /**
241    * Closure for 'cont'.
242    */
243   void *cont_cls;
244
245   size_t bytes_to_send;
246 };
247
248
249 struct UDPMessageWrapper
250 {
251   struct Session *session;
252   struct UDPMessageWrapper *prev;
253   struct UDPMessageWrapper *next;
254   char *udp;
255   size_t msg_size;
256
257   struct GNUNET_TIME_Absolute timeout;
258
259   /**
260    * Function to call upon completion of the transmission.
261    */
262   GNUNET_TRANSPORT_TransmitContinuation cont;
263
264   /**
265    * Closure for 'cont'.
266    */
267   void *cont_cls;
268
269   struct FragmentationContext *frag_ctx;
270
271 };
272
273
274 /**
275  * UDP ACK Message-Packet header (after defragmentation).
276  */
277 struct UDP_ACK_Message
278 {
279   /**
280    * Message header.
281    */
282   struct GNUNET_MessageHeader header;
283
284   /**
285    * Desired delay for flow control
286    */
287   uint32_t delay;
288
289   /**
290    * What is the identity of the sender
291    */
292   struct GNUNET_PeerIdentity sender;
293
294 };
295
296
297 /**
298  * We have been notified that our readset has something to read.  We don't
299  * know which socket needs to be read, so we have to check each one
300  * Then reschedule this function to be called again once more is available.
301  *
302  * @param cls the plugin handle
303  * @param tc the scheduling context (for rescheduling this function again)
304  */
305 static void
306 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
307
308 /**
309  * We have been notified that our readset has something to read.  We don't
310  * know which socket needs to be read, so we have to check each one
311  * Then reschedule this function to be called again once more is available.
312  *
313  * @param cls the plugin handle
314  * @param tc the scheduling context (for rescheduling this function again)
315  */
316 static void
317 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
318
319 /**
320  * Function called for a quick conversion of the binary address to
321  * a numeric address.  Note that the caller must not free the
322  * address and that the next call to this function is allowed
323  * to override the address again.
324  *
325  * @param cls closure
326  * @param addr binary address
327  * @param addrlen length of the address
328  * @return string representing the same address
329  */
330 const char *
331 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
332 {
333   static char rbuf[INET6_ADDRSTRLEN + 10];
334   char buf[INET6_ADDRSTRLEN];
335   const void *sb;
336   struct in_addr a4;
337   struct in6_addr a6;
338   const struct IPv4UdpAddress *t4;
339   const struct IPv6UdpAddress *t6;
340   int af;
341   uint16_t port;
342
343   if (addrlen == sizeof (struct IPv6UdpAddress))
344   {
345     t6 = addr;
346     af = AF_INET6;
347     port = ntohs (t6->u6_port);
348     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
349     sb = &a6;
350   }
351   else if (addrlen == sizeof (struct IPv4UdpAddress))
352   {
353     t4 = addr;
354     af = AF_INET;
355     port = ntohs (t4->u4_port);
356     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
357     sb = &a4;
358   }
359   else
360   {
361     GNUNET_break_op (0);
362     return NULL;
363   }
364   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
365   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
366                    buf, port);
367   return rbuf;
368 }
369
370
371 /**
372  * Function called to convert a string address to
373  * a binary address.
374  *
375  * @param cls closure ('struct Plugin*')
376  * @param addr string address
377  * @param addrlen length of the address
378  * @param buf location to store the buffer
379  * @param added location to store the number of bytes in the buffer.
380  *        If the function returns GNUNET_SYSERR, its contents are undefined.
381  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
382  */
383 int
384 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
385     void **buf, size_t *added)
386 {
387   struct sockaddr_storage socket_address;
388
389   if ((NULL == addr) || (addrlen == 0))
390   {
391     GNUNET_break (0);
392     return GNUNET_SYSERR;
393   }
394
395   if ('\0' != addr[addrlen - 1])
396   {
397     return GNUNET_SYSERR;
398   }
399
400   if (strlen (addr) != addrlen - 1)
401   {
402     return GNUNET_SYSERR;
403   }
404
405   int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
406     &socket_address);
407
408   if (ret != GNUNET_OK)
409   {
410     return GNUNET_SYSERR;
411   }
412
413   if (socket_address.ss_family == AF_INET)
414   {
415     struct IPv4UdpAddress *u4;
416     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
417     u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
418     u4->ipv4_addr = in4->sin_addr.s_addr;
419     u4->u4_port = in4->sin_port;
420     *buf = u4;
421     *added = sizeof (struct IPv4UdpAddress);
422     return GNUNET_OK;
423   }
424   else if (socket_address.ss_family == AF_INET6)
425   {
426     struct IPv6UdpAddress *u6;
427     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
428     u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
429     u6->ipv6_addr = in6->sin6_addr;
430     u6->u6_port = in6->sin6_port;
431     *buf = u6;
432     *added = sizeof (struct IPv6UdpAddress);
433     return GNUNET_OK;
434   }
435   return GNUNET_SYSERR;
436 }
437
438
439 /**
440  * Append our port and forward the result.
441  *
442  * @param cls a 'struct PrettyPrinterContext'
443  * @param hostname result from DNS resolver
444  */
445 static void
446 append_port (void *cls, const char *hostname)
447 {
448   struct PrettyPrinterContext *ppc = cls;
449   char *ret;
450
451   if (hostname == NULL)
452   {
453     ppc->asc (ppc->asc_cls, NULL);
454     GNUNET_free (ppc);
455     return;
456   }
457   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
458   ppc->asc (ppc->asc_cls, ret);
459   GNUNET_free (ret);
460 }
461
462
463 /**
464  * Convert the transports address to a nice, human-readable
465  * format.
466  *
467  * @param cls closure
468  * @param type name of the transport that generated the address
469  * @param addr one of the addresses of the host, NULL for the last address
470  *        the specific address format depends on the transport
471  * @param addrlen length of the address
472  * @param numeric should (IP) addresses be displayed in numeric form?
473  * @param timeout after how long should we give up?
474  * @param asc function to call on each string
475  * @param asc_cls closure for asc
476  */
477 static void
478 udp_plugin_address_pretty_printer (void *cls, const char *type,
479                                    const void *addr, size_t addrlen,
480                                    int numeric,
481                                    struct GNUNET_TIME_Relative timeout,
482                                    GNUNET_TRANSPORT_AddressStringCallback asc,
483                                    void *asc_cls)
484 {
485   struct PrettyPrinterContext *ppc;
486   const void *sb;
487   size_t sbs;
488   struct sockaddr_in a4;
489   struct sockaddr_in6 a6;
490   const struct IPv4UdpAddress *u4;
491   const struct IPv6UdpAddress *u6;
492   uint16_t port;
493
494   if (addrlen == sizeof (struct IPv6UdpAddress))
495   {
496     u6 = addr;
497     memset (&a6, 0, sizeof (a6));
498     a6.sin6_family = AF_INET6;
499 #if HAVE_SOCKADDR_IN_SIN_LEN
500     a6.sin6_len = sizeof (a6);
501 #endif
502     a6.sin6_port = u6->u6_port;
503     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
504     port = ntohs (u6->u6_port);
505     sb = &a6;
506     sbs = sizeof (a6);
507   }
508   else if (addrlen == sizeof (struct IPv4UdpAddress))
509   {
510     u4 = addr;
511     memset (&a4, 0, sizeof (a4));
512     a4.sin_family = AF_INET;
513 #if HAVE_SOCKADDR_IN_SIN_LEN
514     a4.sin_len = sizeof (a4);
515 #endif
516     a4.sin_port = u4->u4_port;
517     a4.sin_addr.s_addr = u4->ipv4_addr;
518     port = ntohs (u4->u4_port);
519     sb = &a4;
520     sbs = sizeof (a4);
521   }
522   else if (0 == addrlen)
523   {
524     asc (asc_cls, "<inbound connection>");
525     asc (asc_cls, NULL);
526     return;
527   }
528   else
529   {
530     /* invalid address */
531     GNUNET_break_op (0);
532     asc (asc_cls, NULL);
533     return;
534   }
535   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
536   ppc->asc = asc;
537   ppc->asc_cls = asc_cls;
538   ppc->port = port;
539   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
540 }
541
542 static void
543 call_continuation (struct UDPMessageWrapper *udpw, int result)
544 {
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546       "Calling continuation for %u byte message to `%s' with result %s\n",
547       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
548       (GNUNET_OK == result) ? "OK" : "SYSERR");
549   if (NULL != udpw->cont)
550   {
551     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
552   }
553
554 }
555
556 /**
557  * Check if the given port is plausible (must be either our listen
558  * port or our advertised port).  If it is neither, we return
559  * GNUNET_SYSERR.
560  *
561  * @param plugin global variables
562  * @param in_port port number to check
563  * @return GNUNET_OK if port is either open_port or adv_port
564  */
565 static int
566 check_port (struct Plugin *plugin, uint16_t in_port)
567 {
568   if ((in_port == plugin->port) || (in_port == plugin->aport))
569     return GNUNET_OK;
570   return GNUNET_SYSERR;
571 }
572
573
574
575 /**
576  * Function that will be called to check if a binary address for this
577  * plugin is well-formed and corresponds to an address for THIS peer
578  * (as per our configuration).  Naturally, if absolutely necessary,
579  * plugins can be a bit conservative in their answer, but in general
580  * plugins should make sure that the address does not redirect
581  * traffic to a 3rd party that might try to man-in-the-middle our
582  * traffic.
583  *
584  * @param cls closure, should be our handle to the Plugin
585  * @param addr pointer to the address
586  * @param addrlen length of addr
587  * @return GNUNET_OK if this is a plausible address for this peer
588  *         and transport, GNUNET_SYSERR if not
589  *
590  */
591 static int
592 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
593 {
594   struct Plugin *plugin = cls;
595   struct IPv4UdpAddress *v4;
596   struct IPv6UdpAddress *v6;
597
598   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
599       (addrlen != sizeof (struct IPv6UdpAddress)))
600   {
601     GNUNET_break_op (0);
602     return GNUNET_SYSERR;
603   }
604   if (addrlen == sizeof (struct IPv4UdpAddress))
605   {
606     v4 = (struct IPv4UdpAddress *) addr;
607     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
608       return GNUNET_SYSERR;
609     if (GNUNET_OK !=
610         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
611                                  sizeof (struct in_addr)))
612       return GNUNET_SYSERR;
613   }
614   else
615   {
616     v6 = (struct IPv6UdpAddress *) addr;
617     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
618     {
619       GNUNET_break_op (0);
620       return GNUNET_SYSERR;
621     }
622     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
623       return GNUNET_SYSERR;
624     if (GNUNET_OK !=
625         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
626                                  sizeof (struct in6_addr)))
627       return GNUNET_SYSERR;
628   }
629   return GNUNET_OK;
630 }
631
632
633 /**
634  * Task to free resources associated with a session.
635  *
636  * @param s session to free
637  */
638 static void
639 free_session (struct Session *s)
640 {
641   if (s->frag_ctx != NULL)
642   {
643     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
644     GNUNET_free (s->frag_ctx);
645     s->frag_ctx = NULL;
646   }
647   GNUNET_free (s);
648 }
649
650
651 /**
652  * Destroy a session, plugin is being unloaded.
653  *
654  * @param cls unused
655  * @param key hash of public key of target peer
656  * @param value a 'struct PeerSession*' to clean up
657  * @return GNUNET_OK (continue to iterate)
658  */
659 static int
660 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
661 {
662   struct Plugin *plugin = cls;
663   struct Session *s = value;
664   struct UDPMessageWrapper *udpw;
665   struct UDPMessageWrapper *next;
666
667   GNUNET_assert (GNUNET_YES != s->in_destroy);
668   LOG (GNUNET_ERROR_TYPE_DEBUG,
669        "Session %p to peer `%s' address ended \n",
670          s,
671          GNUNET_i2s (&s->target),
672          GNUNET_a2s (s->sock_addr, s->addrlen));
673   next = plugin->ipv4_queue_head;
674   while (NULL != (udpw = next))
675   {
676     next = udpw->next;
677     if (udpw->session == s)
678     {
679       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
680       call_continuation(udpw, GNUNET_SYSERR);
681       GNUNET_free (udpw);
682     }
683   }
684   next = plugin->ipv6_queue_head;
685   while (NULL != (udpw = next))
686   {
687     next = udpw->next;
688     if (udpw->session == s)
689     {
690       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
691       call_continuation(udpw, GNUNET_SYSERR);
692       GNUNET_free (udpw);
693     }
694     udpw = next;
695   }
696   plugin->env->session_end (plugin->env->cls, &s->target, s);
697
698   if (NULL != s->frag_ctx)
699   {
700     if (NULL != s->frag_ctx->cont)
701     {
702       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
703       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
705           GNUNET_i2s (&s->target));
706     }
707   }
708
709   GNUNET_assert (GNUNET_YES ==
710                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
711                                                        &s->target.hashPubKey,
712                                                        s));
713   GNUNET_STATISTICS_set(plugin->env->stats,
714                         "# UDP sessions active",
715                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
716                         GNUNET_NO);
717   if (s->rc > 0)
718     s->in_destroy = GNUNET_YES;
719   else
720     free_session (s);
721   return GNUNET_OK;
722 }
723
724
725 /**
726  * Disconnect from a remote node.  Clean up session if we have one for this peer
727  *
728  * @param cls closure for this call (should be handle to Plugin)
729  * @param target the peeridentity of the peer to disconnect
730  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
731  */
732 static void
733 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
734 {
735   struct Plugin *plugin = cls;
736   GNUNET_assert (plugin != NULL);
737
738   GNUNET_assert (target != NULL);
739   LOG (GNUNET_ERROR_TYPE_DEBUG,
740        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
741   /* Clean up sessions */
742   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
743 }
744
745 static struct Session *
746 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
747                 const void *addr, size_t addrlen,
748                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
749 {
750   struct Session *s;
751   const struct IPv4UdpAddress *t4;
752   const struct IPv6UdpAddress *t6;
753   struct sockaddr_in *v4;
754   struct sockaddr_in6 *v6;
755   size_t len;
756
757   switch (addrlen)
758   {
759   case sizeof (struct IPv4UdpAddress):
760     if (NULL == plugin->sockv4)
761     {
762       return NULL;
763     }
764     t4 = addr;
765     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
766     len = sizeof (struct sockaddr_in);
767     v4 = (struct sockaddr_in *) &s[1];
768     v4->sin_family = AF_INET;
769 #if HAVE_SOCKADDR_IN_SIN_LEN
770     v4->sin_len = sizeof (struct sockaddr_in);
771 #endif
772     v4->sin_port = t4->u4_port;
773     v4->sin_addr.s_addr = t4->ipv4_addr;
774     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
775     break;
776   case sizeof (struct IPv6UdpAddress):
777     if (NULL == plugin->sockv6)
778     {
779       return NULL;
780     }
781     t6 = addr;
782     s =
783         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
784     len = sizeof (struct sockaddr_in6);
785     v6 = (struct sockaddr_in6 *) &s[1];
786     v6->sin6_family = AF_INET6;
787 #if HAVE_SOCKADDR_IN_SIN_LEN
788     v6->sin6_len = sizeof (struct sockaddr_in6);
789 #endif
790     v6->sin6_port = t6->u6_port;
791     v6->sin6_addr = t6->ipv6_addr;
792     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
793     break;
794   default:
795     /* Must have a valid address to send to */
796     GNUNET_break_op (0);
797     return NULL;
798   }
799
800   s->addrlen = len;
801   s->target = *target;
802   s->sock_addr = (const struct sockaddr *) &s[1];
803   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
804   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
805   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
806
807   return s;
808 }
809
810
811 static int
812 session_cmp_it (void *cls,
813                 const GNUNET_HashCode * key,
814                 void *value)
815 {
816   struct SessionCompareContext * cctx = cls;
817   const struct GNUNET_HELLO_Address *address = cctx->addr;
818   struct Session *s = value;
819
820   socklen_t s_addrlen = s->addrlen;
821
822 #if VERBOSE_UDP
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
824       udp_address_to_string (NULL, (void *) address->address, address->address_length),
825       GNUNET_a2s (s->sock_addr, s->addrlen));
826 #endif
827
828   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
829       (s_addrlen == sizeof (struct sockaddr_in)))
830   {
831     struct IPv4UdpAddress * u4 = NULL;
832     u4 = (struct IPv4UdpAddress *) address->address;
833     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
834     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
835         (u4->u4_port == s4->sin_port))
836     {
837       cctx->res = s;
838       return GNUNET_NO;
839     }
840
841   }
842   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
843       (s_addrlen == sizeof (struct sockaddr_in6)))
844   {
845     struct IPv6UdpAddress * u6 = NULL;
846     u6 = (struct IPv6UdpAddress *) address->address;
847     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
848     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
849         (u6->u6_port == s6->sin6_port))
850     {
851       cctx->res = s;
852       return GNUNET_NO;
853     }
854   }
855
856
857   return GNUNET_YES;
858 }
859
860
861 /**
862  * Creates a new outbound session the transport service will use to send data to the
863  * peer
864  *
865  * @param cls the plugin
866  * @param address the address
867  * @return the session or NULL of max connections exceeded
868  */
869 static struct Session *
870 udp_plugin_get_session (void *cls,
871                   const struct GNUNET_HELLO_Address *address)
872 {
873   struct Session * s = NULL;
874   struct Plugin * plugin = cls;
875   struct IPv6UdpAddress * udp_a6;
876   struct IPv4UdpAddress * udp_a4;
877
878   GNUNET_assert (plugin != NULL);
879   GNUNET_assert (address != NULL);
880
881
882   if ((address->address == NULL) ||
883       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
884       (address->address_length != sizeof (struct IPv6UdpAddress))))
885   {
886     GNUNET_break (0);
887     return NULL;
888   }
889
890   if (address->address_length == sizeof (struct IPv4UdpAddress))
891   {
892     if (plugin->sockv4 == NULL)
893       return NULL;
894     udp_a4 = (struct IPv4UdpAddress *) address->address;
895     if (udp_a4->u4_port == 0)
896       return NULL;
897   }
898
899   if (address->address_length == sizeof (struct IPv6UdpAddress))
900   {
901     if (plugin->sockv6 == NULL)
902       return NULL;
903     udp_a6 = (struct IPv6UdpAddress *) address->address;
904     if (udp_a6->u6_port == 0)
905       return NULL;
906   }
907
908   /* check if session already exists */
909   struct SessionCompareContext cctx;
910   cctx.addr = address;
911   cctx.res = NULL;
912 #if VERBOSE_UDP
913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
914 #endif
915   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
916   if (cctx.res != NULL)
917   {
918 #if VERBOSE_UDP
919     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
920 #endif
921     return cctx.res;
922   }
923
924   /* otherwise create new */
925   s = create_session (plugin,
926       &address->peer,
927       address->address,
928       address->address_length,
929       NULL, NULL);
930 #if VERBOSE
931     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Creating new session %p for peer `%s' address `%s'\n",
933               s,
934               GNUNET_i2s(&address->peer),
935               udp_address_to_string(NULL,address->address,address->address_length));
936 #endif
937   GNUNET_assert (GNUNET_OK ==
938                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
939                                                     &s->target.hashPubKey,
940                                                     s,
941                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
942
943   GNUNET_STATISTICS_set(plugin->env->stats,
944                         "# UDP sessions active",
945                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
946                         GNUNET_NO);
947
948   return s;
949 }
950
951
952 static void 
953 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
954 {
955
956   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
957     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
958   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
959     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
960 }
961
962
963 /**
964  * Fragment message was transmitted via UDP, let fragmentation know
965  * to send the next fragment now.
966  *
967  * @param cls the 'struct UDPMessageWrapper' of the fragment
968  * @param target destination peer (ignored)
969  * @param result GNUNET_OK on success (ignored)
970  */
971 static void
972 send_next_fragment (void *cls,
973                     const struct GNUNET_PeerIdentity *target,
974                     int result)
975 {
976   struct UDPMessageWrapper *udpw = cls;
977
978   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
979 }
980
981
982 /**
983  * Function that is called with messages created by the fragmentation
984  * module.  In the case of the 'proc' callback of the
985  * GNUNET_FRAGMENT_context_create function, this function must
986  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
987  *
988  * @param cls closure, the 'struct FragmentationContext'
989  * @param msg the message that was created
990  */
991 static void
992 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
993 {
994   struct FragmentationContext *frag_ctx = cls;
995   struct Plugin *plugin = frag_ctx->plugin;
996   struct UDPMessageWrapper * udpw;
997   struct Session *s;
998
999   size_t msg_len = ntohs (msg->size);
1000
1001 #if VERBOSE_UDP
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1003 #endif
1004
1005   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1006   udpw->session = frag_ctx->session;
1007   s = udpw->session;
1008   udpw->udp = (char *) &udpw[1];
1009
1010   udpw->msg_size = msg_len;
1011   udpw->cont = &send_next_fragment;
1012   udpw->cont_cls = udpw;
1013   udpw->timeout = frag_ctx->timeout;
1014   udpw->frag_ctx = frag_ctx;
1015   memcpy (udpw->udp, msg, msg_len);
1016
1017   enqueue (plugin, udpw);
1018
1019
1020   if (s->addrlen == sizeof (struct sockaddr_in))
1021   {
1022     if (plugin->with_v4_ws == GNUNET_NO)
1023     {
1024       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1025         GNUNET_SCHEDULER_cancel(plugin->select_task);
1026
1027       plugin->select_task =
1028           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1029                                        GNUNET_TIME_UNIT_FOREVER_REL,
1030                                        plugin->rs_v4,
1031                                        plugin->ws_v4,
1032                                        &udp_plugin_select, plugin);
1033       plugin->with_v4_ws = GNUNET_YES;
1034     }
1035   }
1036
1037   else if (s->addrlen == sizeof (struct sockaddr_in6))
1038   {
1039     if (plugin->with_v6_ws == GNUNET_NO)
1040     {
1041       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1042         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1043
1044       plugin->select_task_v6 =
1045           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1046                                        GNUNET_TIME_UNIT_FOREVER_REL,
1047                                        plugin->rs_v6,
1048                                        plugin->ws_v6,
1049                                        &udp_plugin_select_v6, plugin);
1050       plugin->with_v6_ws = GNUNET_YES;
1051     }
1052   }
1053 }
1054
1055
1056 /**
1057  * Function that can be used by the transport service to transmit
1058  * a message using the plugin.   Note that in the case of a
1059  * peer disconnecting, the continuation MUST be called
1060  * prior to the disconnect notification itself.  This function
1061  * will be called with this peer's HELLO message to initiate
1062  * a fresh connection to another peer.
1063  *
1064  * @param cls closure
1065  * @param s which session must be used
1066  * @param msgbuf the message to transmit
1067  * @param msgbuf_size number of bytes in 'msgbuf'
1068  * @param priority how important is the message (most plugins will
1069  *                 ignore message priority and just FIFO)
1070  * @param to how long to wait at most for the transmission (does not
1071  *                require plugins to discard the message after the timeout,
1072  *                just advisory for the desired delay; most plugins will ignore
1073  *                this as well)
1074  * @param cont continuation to call once the message has
1075  *        been transmitted (or if the transport is ready
1076  *        for the next transmission call; or if the
1077  *        peer disconnected...); can be NULL
1078  * @param cont_cls closure for cont
1079  * @return number of bytes used (on the physical network, with overheads);
1080  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1081  *         and does NOT mean that the message was not transmitted (DV)
1082  */
1083 static ssize_t
1084 udp_plugin_send (void *cls,
1085                   struct Session *s,
1086                   const char *msgbuf, size_t msgbuf_size,
1087                   unsigned int priority,
1088                   struct GNUNET_TIME_Relative to,
1089                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1090 {
1091   struct Plugin *plugin = cls;
1092   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1093
1094   struct UDPMessageWrapper * udpw;
1095   struct UDPMessage *udp;
1096   char mbuf[mlen];
1097   GNUNET_assert (plugin != NULL);
1098   GNUNET_assert (s != NULL);
1099
1100   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1101     return GNUNET_SYSERR;
1102
1103    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1104      return GNUNET_SYSERR;
1105
1106
1107   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1108   {
1109     GNUNET_break (0);
1110     return GNUNET_SYSERR;
1111   }
1112
1113   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1114   {
1115     GNUNET_break (0);
1116     return GNUNET_SYSERR;
1117   }
1118
1119   LOG (GNUNET_ERROR_TYPE_DEBUG,
1120        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1121          mlen,
1122          GNUNET_i2s (&s->target),
1123          GNUNET_a2s(s->sock_addr, s->addrlen));
1124
1125   /* Message */
1126   udp = (struct UDPMessage *) mbuf;
1127   udp->header.size = htons (mlen);
1128   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1129   udp->reserved = htonl (0);
1130   udp->sender = *plugin->env->my_identity;
1131
1132   if (mlen <= UDP_MTU)
1133   {
1134     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1135     udpw->session = s;
1136     udpw->udp = (char *) &udpw[1];
1137     udpw->msg_size = mlen;
1138     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1139     udpw->cont = cont;
1140     udpw->cont_cls = cont_cls;
1141     udpw->frag_ctx = NULL;
1142     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1143     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1144
1145     enqueue (plugin, udpw);
1146   }
1147   else
1148   {
1149     LOG (GNUNET_ERROR_TYPE_DEBUG,
1150          "UDP has to fragment message \n");
1151     if  (s->frag_ctx != NULL)
1152       return GNUNET_SYSERR;
1153     memcpy (&udp[1], msgbuf, msgbuf_size);
1154     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1155
1156     frag_ctx->plugin = plugin;
1157     frag_ctx->session = s;
1158     frag_ctx->cont = cont;
1159     frag_ctx->cont_cls = cont_cls;
1160     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1161     frag_ctx->bytes_to_send = mlen;
1162     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1163               UDP_MTU,
1164               &plugin->tracker,
1165               s->last_expected_delay,
1166               &udp->header,
1167               &enqueue_fragment,
1168               frag_ctx);
1169
1170     s->frag_ctx = frag_ctx;
1171   }
1172
1173   if (s->addrlen == sizeof (struct sockaddr_in))
1174   {
1175     if (plugin->with_v4_ws == GNUNET_NO)
1176     {
1177       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1178         GNUNET_SCHEDULER_cancel(plugin->select_task);
1179
1180       plugin->select_task =
1181           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1182                                        GNUNET_TIME_UNIT_FOREVER_REL,
1183                                        plugin->rs_v4,
1184                                        plugin->ws_v4,
1185                                        &udp_plugin_select, plugin);
1186       plugin->with_v4_ws = GNUNET_YES;
1187     }
1188   }
1189
1190   else if (s->addrlen == sizeof (struct sockaddr_in6))
1191   {
1192     if (plugin->with_v6_ws == GNUNET_NO)
1193     {
1194       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1195         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1196
1197       plugin->select_task_v6 =
1198         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1199                                      GNUNET_TIME_UNIT_FOREVER_REL,
1200                                      plugin->rs_v6,
1201                                      plugin->ws_v6,
1202                                      &udp_plugin_select_v6, plugin);
1203       plugin->with_v6_ws = GNUNET_YES;
1204     }
1205   }
1206
1207   return mlen;
1208 }
1209
1210
1211 /**
1212  * Our external IP address/port mapping has changed.
1213  *
1214  * @param cls closure, the 'struct LocalAddrList'
1215  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1216  *     the previous (now invalid) one
1217  * @param addr either the previous or the new public IP address
1218  * @param addrlen actual lenght of the address
1219  */
1220 static void
1221 udp_nat_port_map_callback (void *cls, int add_remove,
1222                            const struct sockaddr *addr, socklen_t addrlen)
1223 {
1224   struct Plugin *plugin = cls;
1225   struct IPv4UdpAddress u4;
1226   struct IPv6UdpAddress u6;
1227   void *arg;
1228   size_t args;
1229
1230   /* convert 'addr' to our internal format */
1231   switch (addr->sa_family)
1232   {
1233   case AF_INET:
1234     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1235     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1236     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1237     arg = &u4;
1238     args = sizeof (u4);
1239     break;
1240   case AF_INET6:
1241     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1242     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1243             sizeof (struct in6_addr));
1244     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1245     arg = &u6;
1246     args = sizeof (u6);
1247     break;
1248   default:
1249     GNUNET_break (0);
1250     return;
1251   }
1252   /* modify our published address list */
1253   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1254 }
1255
1256
1257
1258 /**
1259  * Message tokenizer has broken up an incomming message. Pass it on
1260  * to the service.
1261  *
1262  * @param cls the 'struct Plugin'
1263  * @param client the 'struct SourceInformation'
1264  * @param hdr the actual message
1265  */
1266 static int
1267 process_inbound_tokenized_messages (void *cls, void *client,
1268                                     const struct GNUNET_MessageHeader *hdr)
1269 {
1270   struct Plugin *plugin = cls;
1271   struct SourceInformation *si = client;
1272   struct GNUNET_ATS_Information ats[2];
1273   struct GNUNET_TIME_Relative delay;
1274
1275   GNUNET_assert (si->session != NULL);
1276   if (GNUNET_YES == si->session->in_destroy)
1277     return GNUNET_OK;
1278   /* setup ATS */
1279   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1280   ats[0].value = htonl (1);
1281   ats[1] = si->session->ats;
1282   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1283
1284   delay = plugin->env->receive (plugin->env->cls,
1285                 &si->sender,
1286                 hdr,
1287                 (const struct GNUNET_ATS_Information *) &ats, 2,
1288                 NULL,
1289                 si->arg,
1290                 si->args);
1291   si->session->flow_delay_for_other_peer = delay;
1292   return GNUNET_OK;
1293 }
1294
1295
1296 /**
1297  * We've received a UDP Message.  Process it (pass contents to main service).
1298  *
1299  * @param plugin plugin context
1300  * @param msg the message
1301  * @param sender_addr sender address
1302  * @param sender_addr_len number of bytes in sender_addr
1303  */
1304 static void
1305 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1306                      const struct sockaddr *sender_addr,
1307                      socklen_t sender_addr_len)
1308 {
1309   struct SourceInformation si;
1310   struct Session * s;
1311   struct IPv4UdpAddress u4;
1312   struct IPv6UdpAddress u6;
1313   const void *arg;
1314   size_t args;
1315
1316   if (0 != ntohl (msg->reserved))
1317   {
1318     GNUNET_break_op (0);
1319     return;
1320   }
1321   if (ntohs (msg->header.size) <
1322       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1323   {
1324     GNUNET_break_op (0);
1325     return;
1326   }
1327
1328   /* convert address */
1329   switch (sender_addr->sa_family)
1330   {
1331   case AF_INET:
1332     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1333     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1334     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1335     arg = &u4;
1336     args = sizeof (u4);
1337     break;
1338   case AF_INET6:
1339     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1340     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1341     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1342     arg = &u6;
1343     args = sizeof (u6);
1344     break;
1345   default:
1346     GNUNET_break (0);
1347     return;
1348   }
1349   LOG (GNUNET_ERROR_TYPE_DEBUG,
1350        "Received message with %u bytes from peer `%s' at `%s'\n",
1351        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1352        GNUNET_a2s (sender_addr, sender_addr_len));
1353
1354   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1355   s = udp_plugin_get_session(plugin, address);
1356   GNUNET_free (address);
1357
1358   /* iterate over all embedded messages */
1359   si.session = s;
1360   si.sender = msg->sender;
1361   si.arg = arg;
1362   si.args = args;
1363   s->rc++;
1364   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1365                              ntohs (msg->header.size) -
1366                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1367   s->rc--;
1368   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1369     free_session (s);
1370 }
1371
1372
1373 /**
1374  * Scan the heap for a receive context with the given address.
1375  *
1376  * @param cls the 'struct FindReceiveContext'
1377  * @param node internal node of the heap
1378  * @param element value stored at the node (a 'struct ReceiveContext')
1379  * @param cost cost associated with the node
1380  * @return GNUNET_YES if we should continue to iterate,
1381  *         GNUNET_NO if not.
1382  */
1383 static int
1384 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1385                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1386 {
1387   struct FindReceiveContext *frc = cls;
1388   struct DefragContext *e = element;
1389
1390   if ((frc->addr_len == e->addr_len) &&
1391       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1392   {
1393     frc->rc = e;
1394     return GNUNET_NO;
1395   }
1396   return GNUNET_YES;
1397 }
1398
1399
1400 /**
1401  * Process a defragmented message.
1402  *
1403  * @param cls the 'struct ReceiveContext'
1404  * @param msg the message
1405  */
1406 static void
1407 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1408 {
1409   struct DefragContext *rc = cls;
1410
1411   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1412   {
1413     GNUNET_break (0);
1414     return;
1415   }
1416   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1417   {
1418     GNUNET_break (0);
1419     return;
1420   }
1421   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1422                        rc->src_addr, rc->addr_len);
1423 }
1424
1425 struct LookupContext
1426 {
1427   const struct sockaddr * addr;
1428   size_t addrlen;
1429
1430   struct Session *res;
1431 };
1432
1433 static int
1434 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1435 {
1436   struct LookupContext *l_ctx = cls;
1437   struct Session * s = value;
1438
1439   if ((s->addrlen == l_ctx->addrlen) &&
1440       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1441   {
1442     l_ctx->res = s;
1443     return GNUNET_NO;
1444   }
1445   return GNUNET_YES;
1446 }
1447
1448 /**
1449  * Transmit an acknowledgement.
1450  *
1451  * @param cls the 'struct ReceiveContext'
1452  * @param id message ID (unused)
1453  * @param msg ack to transmit
1454  */
1455 static void
1456 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1457 {
1458   struct DefragContext *rc = cls;
1459
1460   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1461   struct UDP_ACK_Message *udp_ack;
1462   uint32_t delay = 0;
1463   struct UDPMessageWrapper *udpw;
1464   struct Session *s;
1465
1466   struct LookupContext l_ctx;
1467   l_ctx.addr = rc->src_addr;
1468   l_ctx.addrlen = rc->addr_len;
1469   l_ctx.res = NULL;
1470   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1471       &lookup_session_by_addr_it,
1472       &l_ctx);
1473   s = l_ctx.res;
1474
1475   if (NULL == s)
1476     return;
1477
1478   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1479     delay = s->flow_delay_for_other_peer.rel_value;
1480
1481   LOG (GNUNET_ERROR_TYPE_DEBUG,
1482        "Sending ACK to `%s' including delay of %u ms\n",
1483        GNUNET_a2s (rc->src_addr,
1484                    (rc->src_addr->sa_family ==
1485                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1486                                                                      sockaddr_in6)),
1487        delay);
1488   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1489   udpw->cont = NULL;
1490   udpw->cont_cls = NULL;
1491   udpw->frag_ctx = NULL;
1492   udpw->msg_size = msize;
1493   udpw->session = s;
1494   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1495   udpw->udp = (char *)&udpw[1];
1496
1497   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1498   udp_ack->header.size = htons ((uint16_t) msize);
1499   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1500   udp_ack->delay = htonl (delay);
1501   udp_ack->sender = *rc->plugin->env->my_identity;
1502   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1503
1504   enqueue (rc->plugin, udpw);
1505 }
1506
1507
1508 static void read_process_msg (struct Plugin *plugin,
1509     const struct GNUNET_MessageHeader *msg,
1510     char *addr,
1511     socklen_t fromlen)
1512 {
1513   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1514   {
1515     GNUNET_break_op (0);
1516     return;
1517   }
1518   process_udp_message (plugin, (const struct UDPMessage *) msg,
1519                        (const struct sockaddr *) addr, fromlen);
1520   return;
1521 }
1522
1523 static void read_process_ack (struct Plugin *plugin,
1524     const struct GNUNET_MessageHeader *msg,
1525     char *addr,
1526     socklen_t fromlen)
1527 {
1528   const struct GNUNET_MessageHeader *ack;
1529   const struct UDP_ACK_Message *udp_ack;
1530   struct LookupContext l_ctx;
1531   struct Session *s = NULL;
1532   struct GNUNET_TIME_Relative flow_delay;
1533
1534   if (ntohs (msg->size) <
1535       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1536   {
1537     GNUNET_break_op (0);
1538     return;
1539   }
1540
1541   udp_ack = (const struct UDP_ACK_Message *) msg;
1542
1543   l_ctx.addr = (const struct sockaddr *) addr;
1544   l_ctx.addrlen = fromlen;
1545   l_ctx.res = NULL;
1546   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1547       &lookup_session_by_addr_it,
1548       &l_ctx);
1549   s = l_ctx.res;
1550
1551   if ((s == NULL) || (s->frag_ctx == NULL))
1552     return;
1553
1554   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1555   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1556        flow_delay.rel_value);
1557   s->flow_delay_from_other_peer =
1558       GNUNET_TIME_relative_to_absolute (flow_delay);
1559
1560   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1561   if (ntohs (ack->size) !=
1562       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1563   {
1564     GNUNET_break_op (0);
1565     return;
1566   }
1567
1568   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1569   {
1570   LOG (GNUNET_ERROR_TYPE_DEBUG,
1571        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1572        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1573        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1574     return;
1575   }
1576
1577   LOG (GNUNET_ERROR_TYPE_DEBUG,
1578        "FULL MESSAGE ACKed\n",
1579        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1580        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1581   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1582
1583   struct UDPMessageWrapper * udpw = NULL;
1584   struct UDPMessageWrapper * tmp = NULL;
1585   if (s->addrlen == sizeof (struct sockaddr_in6))
1586   {
1587     udpw = plugin->ipv6_queue_head;
1588     while (udpw!= NULL)
1589     {
1590       tmp = udpw->next;
1591       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1592       {
1593         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1594         GNUNET_free (udpw);
1595       }
1596       udpw = tmp;
1597     }
1598   }
1599   if (s->addrlen == sizeof (struct sockaddr_in))
1600   {
1601     udpw = plugin->ipv4_queue_head;
1602     while (udpw!= NULL)
1603     {
1604       tmp = udpw->next;
1605       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1606       {
1607         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1608         GNUNET_free (udpw);
1609       }
1610       udpw = tmp;
1611     }
1612   }
1613
1614   if (s->frag_ctx->cont != NULL)
1615   {
1616     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617         "Calling continuation for fragmented message to `%s' with result %s\n",
1618         GNUNET_i2s (&s->target), "OK");
1619     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1620   }
1621
1622   GNUNET_free (s->frag_ctx);
1623   s->frag_ctx = NULL;
1624   return;
1625 }
1626
1627 static void read_process_fragment (struct Plugin *plugin,
1628     const struct GNUNET_MessageHeader *msg,
1629     char *addr,
1630     socklen_t fromlen)
1631 {
1632   struct DefragContext *d_ctx;
1633   struct GNUNET_TIME_Absolute now;
1634   struct FindReceiveContext frc;
1635
1636
1637   frc.rc = NULL;
1638   frc.addr = (const struct sockaddr *) addr;
1639   frc.addr_len = fromlen;
1640
1641   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1642        (unsigned int) ntohs (msg->size),
1643        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1644   /* Lookup existing receive context for this address */
1645   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1646                                  &find_receive_context,
1647                                  &frc);
1648   now = GNUNET_TIME_absolute_get ();
1649   d_ctx = frc.rc;
1650
1651   if (d_ctx == NULL)
1652   {
1653     /* Create a new defragmentation context */
1654     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1655     memcpy (&d_ctx[1], addr, fromlen);
1656     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1657     d_ctx->addr_len = fromlen;
1658     d_ctx->plugin = plugin;
1659     d_ctx->defrag =
1660         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1661                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1662                                           &fragment_msg_proc, &ack_proc);
1663     d_ctx->hnode =
1664         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1665                                       (GNUNET_CONTAINER_HeapCostType)
1666                                       now.abs_value);
1667   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1668        (unsigned int) ntohs (msg->size),
1669        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1670   }
1671   else
1672   {
1673   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1674        (unsigned int) ntohs (msg->size),
1675        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1676   }
1677
1678   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1679   {
1680     /* keep this 'rc' from expiring */
1681     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1682                                        (GNUNET_CONTAINER_HeapCostType)
1683                                        now.abs_value);
1684   }
1685   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1686       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1687   {
1688     /* remove 'rc' that was inactive the longest */
1689     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1690     GNUNET_assert (NULL != d_ctx);
1691     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1692     GNUNET_free (d_ctx);
1693   }
1694 }
1695
1696 /**
1697  * Read and process a message from the given socket.
1698  *
1699  * @param plugin the overall plugin
1700  * @param rsock socket to read from
1701  */
1702 static void
1703 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1704 {
1705   socklen_t fromlen;
1706   char addr[32];
1707   char buf[65536] GNUNET_ALIGN;
1708   ssize_t size;
1709   const struct GNUNET_MessageHeader *msg;
1710
1711   fromlen = sizeof (addr);
1712   memset (&addr, 0, sizeof (addr));
1713   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1714                                       (struct sockaddr *) &addr, &fromlen);
1715
1716   if (size < sizeof (struct GNUNET_MessageHeader))
1717   {
1718     GNUNET_break_op (0);
1719     return;
1720   }
1721   msg = (const struct GNUNET_MessageHeader *) buf;
1722
1723   LOG (GNUNET_ERROR_TYPE_DEBUG,
1724        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1725        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1726
1727   if (size != ntohs (msg->size))
1728   {
1729     GNUNET_break_op (0);
1730     return;
1731   }
1732
1733   switch (ntohs (msg->type))
1734   {
1735   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1736     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1737     return;
1738
1739   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1740     read_process_msg (plugin, msg, addr, fromlen);
1741     return;
1742
1743   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1744     read_process_ack (plugin, msg, addr, fromlen);
1745     return;
1746
1747   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1748     read_process_fragment (plugin, msg, addr, fromlen);
1749     return;
1750
1751   default:
1752     GNUNET_break_op (0);
1753     return;
1754   }
1755 }
1756
1757
1758 static size_t
1759 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1760 {
1761   ssize_t sent;
1762   size_t slen;
1763   struct GNUNET_TIME_Absolute max;
1764   struct UDPMessageWrapper *udpw = NULL;
1765
1766   if (sock == plugin->sockv4)
1767   {
1768     udpw = plugin->ipv4_queue_head;
1769   }
1770   else if (sock == plugin->sockv6)
1771   {
1772     udpw = plugin->ipv6_queue_head;
1773   }
1774   else
1775   {
1776     GNUNET_break (0);
1777     return 0;
1778   }
1779
1780   const struct sockaddr * sa = udpw->session->sock_addr;
1781   slen = udpw->session->addrlen;
1782
1783   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1784
1785   while (udpw != NULL)
1786   {
1787     if (max.abs_value != udpw->timeout.abs_value)
1788     {
1789       /* Message timed out */
1790       call_continuation(udpw, GNUNET_SYSERR);
1791       if (udpw->frag_ctx != NULL)
1792       {
1793         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1794             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1795         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1796         GNUNET_free (udpw->frag_ctx);
1797         udpw->session->frag_ctx = NULL;
1798       }
1799       else
1800       {
1801         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1802             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1803       }
1804
1805       if (sock == plugin->sockv4)
1806       {
1807         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1808         GNUNET_free (udpw);
1809         udpw = plugin->ipv4_queue_head;
1810       }
1811       else if (sock == plugin->sockv6)
1812       {
1813         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1814         GNUNET_free (udpw);
1815         udpw = plugin->ipv6_queue_head;
1816       }
1817     }
1818     else
1819     {
1820       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1821       if (delta.rel_value == 0)
1822       {
1823         /* this message is not delayed */
1824         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1825             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1826         break;
1827       }
1828       else
1829       {
1830         /* this message is delayed, try next */
1831         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1832             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1833             delta);
1834         udpw = udpw->next;
1835       }
1836     }
1837   }
1838
1839   if (udpw == NULL)
1840   {
1841     /* No message left */
1842     return 0;
1843   }
1844
1845   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1846
1847   if (GNUNET_SYSERR == sent)
1848   {
1849     const struct GNUNET_ATS_Information type = plugin->env->get_address_type
1850         (plugin->env->cls,sa, slen);
1851
1852     if ((GNUNET_ATS_NET_WAN == type.value) &&
1853         ((ENETUNREACH == errno) || (ENETDOWN == errno)))
1854     {
1855       /* "Network unreachable" or "Network down" */
1856       /*
1857        * This indicates that this system is IPv6 enabled, but does not
1858        * have a valid global IPv6 address assigned
1859        */
1860        LOG (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1861            _("UDP could not message to `%s': `%s'. "
1862              "Please check your network configuration and disable IPv6 if your "
1863              "connection does not have a global IPv6 address\n"),
1864            GNUNET_a2s (sa, slen),
1865            STRERROR (errno));
1866     }
1867     else
1868     {
1869       LOG (GNUNET_ERROR_TYPE_ERROR,
1870          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1871          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1872          STRERROR (errno));
1873     }
1874     call_continuation(udpw, GNUNET_SYSERR);
1875   }
1876   else
1877   {
1878     LOG (GNUNET_ERROR_TYPE_DEBUG,
1879          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1880          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1881          (sent < 0) ? STRERROR (errno) : "ok");
1882     call_continuation(udpw, GNUNET_OK);
1883   }
1884
1885   if (sock == plugin->sockv4)
1886     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1887   else if (sock == plugin->sockv6)
1888     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1889   GNUNET_free (udpw);
1890   udpw = NULL;
1891
1892   return sent;
1893 }
1894
1895 /**
1896  * We have been notified that our readset has something to read.  We don't
1897  * know which socket needs to be read, so we have to check each one
1898  * Then reschedule this function to be called again once more is available.
1899  *
1900  * @param cls the plugin handle
1901  * @param tc the scheduling context (for rescheduling this function again)
1902  */
1903 static void
1904 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1905 {
1906   struct Plugin *plugin = cls;
1907
1908   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1909   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1910     return;
1911   plugin->with_v4_ws = GNUNET_NO;
1912
1913   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1914   {
1915     if ((NULL != plugin->sockv4) &&
1916       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1917         udp_select_read (plugin, plugin->sockv4);
1918
1919   }
1920
1921   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1922   {
1923     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1924       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1925       {
1926         udp_select_send (plugin, plugin->sockv4);
1927       }
1928   }
1929
1930   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1931     GNUNET_SCHEDULER_cancel (plugin->select_task);
1932   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1933                                    GNUNET_TIME_UNIT_FOREVER_REL,
1934                                    plugin->rs_v4,
1935                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1936                                    &udp_plugin_select, plugin);
1937   if (plugin->ipv4_queue_head != NULL)
1938     plugin->with_v4_ws = GNUNET_YES;
1939   else
1940     plugin->with_v4_ws = GNUNET_NO;
1941 }
1942
1943
1944 /**
1945  * We have been notified that our readset has something to read.  We don't
1946  * know which socket needs to be read, so we have to check each one
1947  * Then reschedule this function to be called again once more is available.
1948  *
1949  * @param cls the plugin handle
1950  * @param tc the scheduling context (for rescheduling this function again)
1951  */
1952 static void
1953 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1954 {
1955   struct Plugin *plugin = cls;
1956
1957   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1958   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1959     return;
1960
1961   plugin->with_v6_ws = GNUNET_NO;
1962   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1963   {
1964     if ((NULL != plugin->sockv6) &&
1965       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1966         udp_select_read (plugin, plugin->sockv6);
1967   }
1968
1969   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1970   {
1971     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1972       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1973       {
1974         udp_select_send (plugin, plugin->sockv6);
1975       }
1976   }
1977   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1978     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1979   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1980                                    GNUNET_TIME_UNIT_FOREVER_REL,
1981                                    plugin->rs_v6,
1982                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1983                                    &udp_plugin_select_v6, plugin);
1984   if (plugin->ipv6_queue_head != NULL)
1985     plugin->with_v6_ws = GNUNET_YES;
1986   else
1987     plugin->with_v6_ws = GNUNET_NO;
1988 }
1989
1990
1991 static int
1992 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1993 {
1994   int tries;
1995   int sockets_created = 0;
1996   struct sockaddr *serverAddr;
1997   struct sockaddr *addrs[2];
1998   socklen_t addrlens[2];
1999   socklen_t addrlen;
2000
2001   /* Create IPv6 socket */
2002   if (plugin->enable_ipv6 == GNUNET_YES)
2003   {
2004     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2005     if (NULL == plugin->sockv6)
2006     {
2007       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2008       plugin->enable_ipv6 = GNUNET_NO;
2009     }
2010     else
2011     {
2012 #if HAVE_SOCKADDR_IN_SIN_LEN
2013       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2014 #endif
2015       serverAddrv6->sin6_family = AF_INET6;
2016       serverAddrv6->sin6_addr = in6addr_any;
2017       serverAddrv6->sin6_port = htons (plugin->port);
2018       addrlen = sizeof (struct sockaddr_in6);
2019       serverAddr = (struct sockaddr *) serverAddrv6;
2020       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2021            ntohs (serverAddrv6->sin6_port));
2022       tries = 0;
2023       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2024              GNUNET_OK)
2025       {
2026         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2027         LOG (GNUNET_ERROR_TYPE_DEBUG,
2028              "IPv6 Binding failed, trying new port %d\n",
2029              ntohs (serverAddrv6->sin6_port));
2030         tries++;
2031         if (tries > 10)
2032         {
2033           GNUNET_NETWORK_socket_close (plugin->sockv6);
2034           plugin->sockv6 = NULL;
2035           break;
2036         }
2037       }
2038       if (plugin->sockv6 != NULL)
2039       {
2040         LOG (GNUNET_ERROR_TYPE_DEBUG,
2041              "IPv6 socket created on port %d\n",
2042              ntohs (serverAddrv6->sin6_port));
2043         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2044         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2045         sockets_created++;
2046       }
2047     }
2048   }
2049
2050   /* Create IPv4 socket */
2051   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2052   if (NULL == plugin->sockv4)
2053   {
2054     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2055   }
2056   else
2057   {
2058 #if HAVE_SOCKADDR_IN_SIN_LEN
2059     serverAddrv4->sin_len = sizeof (serverAddrv4);
2060 #endif
2061     serverAddrv4->sin_family = AF_INET;
2062     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2063     serverAddrv4->sin_port = htons (plugin->port);
2064     addrlen = sizeof (struct sockaddr_in);
2065     serverAddr = (struct sockaddr *) serverAddrv4;
2066
2067     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2068          ntohs (serverAddrv4->sin_port));
2069     tries = 0;
2070     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2071            GNUNET_OK)
2072     {
2073       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2074       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2075            ntohs (serverAddrv4->sin_port));
2076       tries++;
2077       if (tries > 10)
2078       {
2079         GNUNET_NETWORK_socket_close (plugin->sockv4);
2080         plugin->sockv4 = NULL;
2081         break;
2082       }
2083     }
2084     if (plugin->sockv4 != NULL)
2085     {
2086       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2087       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2088       sockets_created++;
2089     }
2090   }
2091
2092   /* Create file descriptors */
2093   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2094   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2095   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2096   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2097   if (NULL != plugin->sockv4)
2098   {
2099     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2100     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2101   }
2102
2103   if (sockets_created == 0)
2104     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2105
2106   plugin->select_task =
2107       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2108                                    GNUNET_TIME_UNIT_FOREVER_REL,
2109                                    plugin->rs_v4,
2110                                    NULL,
2111                                    &udp_plugin_select, plugin);
2112   plugin->with_v4_ws = GNUNET_NO;
2113
2114   if (plugin->enable_ipv6 == GNUNET_YES)
2115   {
2116     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2117     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2118     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2119     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2120     if (NULL != plugin->sockv6)
2121     {
2122       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2123       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2124     }
2125
2126     plugin->select_task_v6 =
2127         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2128                                      GNUNET_TIME_UNIT_FOREVER_REL,
2129                                      plugin->rs_v6,
2130                                      NULL,
2131                                      &udp_plugin_select_v6, plugin);
2132     plugin->with_v6_ws = GNUNET_NO;
2133   }
2134
2135   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2136                            GNUNET_NO, plugin->port,
2137                            sockets_created,
2138                            (const struct sockaddr **) addrs, addrlens,
2139                            &udp_nat_port_map_callback, NULL, plugin);
2140
2141   return sockets_created;
2142 }
2143
2144
2145 /**
2146  * The exported method. Makes the core api available via a global and
2147  * returns the udp transport API.
2148  *
2149  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2150  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2151  */
2152 void *
2153 libgnunet_plugin_transport_udp_init (void *cls)
2154 {
2155   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2156   struct GNUNET_TRANSPORT_PluginFunctions *api;
2157   struct Plugin *plugin;
2158   unsigned long long port;
2159   unsigned long long aport;
2160   unsigned long long broadcast;
2161   unsigned long long udp_max_bps;
2162   unsigned long long enable_v6;
2163   char * bind4_address;
2164   char * bind6_address;
2165   struct GNUNET_TIME_Relative interval;
2166   struct sockaddr_in serverAddrv4;
2167   struct sockaddr_in6 serverAddrv6;
2168   int res;
2169
2170   if (NULL == env->receive)
2171   {
2172     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2173        initialze the plugin or the API */
2174     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2175     api->cls = NULL;
2176     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2177     api->address_to_string = &udp_address_to_string;
2178     api->string_to_address = &udp_string_to_address;
2179     return api;
2180   }
2181
2182   GNUNET_assert( NULL != env->stats);
2183
2184   /* Get port number */
2185   if (GNUNET_OK !=
2186       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2187                                              &port))
2188     port = 2086;
2189   if (GNUNET_OK !=
2190       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2191                                              "ADVERTISED_PORT", &aport))
2192     aport = port;
2193   if (port > 65535)
2194   {
2195     LOG (GNUNET_ERROR_TYPE_WARNING,
2196          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2197          65535);
2198     return NULL;
2199   }
2200
2201   /* Protocols */
2202   if ((GNUNET_YES ==
2203        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2204                                              "DISABLEV6")))
2205   {
2206     enable_v6 = GNUNET_NO;
2207   }
2208   else
2209     enable_v6 = GNUNET_YES;
2210
2211   /* Addresses */
2212   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2213   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2214
2215   if (GNUNET_YES ==
2216       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2217                                              "BINDTO", &bind4_address))
2218   {
2219     LOG (GNUNET_ERROR_TYPE_DEBUG,
2220          "Binding udp plugin to specific address: `%s'\n",
2221          bind4_address);
2222     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2223     {
2224       GNUNET_free (bind4_address);
2225       return NULL;
2226     }
2227   }
2228
2229   if (GNUNET_YES ==
2230       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2231                                              "BINDTO6", &bind6_address))
2232   {
2233     LOG (GNUNET_ERROR_TYPE_DEBUG,
2234          "Binding udp plugin to specific address: `%s'\n",
2235          bind6_address);
2236     if (1 !=
2237         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2238     {
2239       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2240            bind6_address);
2241       GNUNET_free_non_null (bind4_address);
2242       GNUNET_free (bind6_address);
2243       return NULL;
2244     }
2245   }
2246
2247   /* Enable neighbour discovery */
2248   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2249                                             "BROADCAST");
2250   if (broadcast == GNUNET_SYSERR)
2251     broadcast = GNUNET_NO;
2252
2253   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2254                                            "BROADCAST_INTERVAL", &interval))
2255   {
2256     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2257   }
2258
2259   /* Maximum datarate */
2260   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2261                                              "MAX_BPS", &udp_max_bps))
2262   {
2263     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2264   }
2265
2266   plugin = GNUNET_malloc (sizeof (struct Plugin));
2267   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2268
2269   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2270                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2271   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2272   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2273   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2274   plugin->port = port;
2275   plugin->aport = aport;
2276   plugin->broadcast_interval = interval;
2277   plugin->enable_ipv6 = enable_v6;
2278   plugin->env = env;
2279
2280   api->cls = plugin;
2281   api->send = NULL;
2282   api->disconnect = &udp_disconnect;
2283   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2284   api->address_to_string = &udp_address_to_string;
2285   api->string_to_address = &udp_string_to_address;
2286   api->check_address = &udp_plugin_check_address;
2287   api->get_session = &udp_plugin_get_session;
2288   api->send = &udp_plugin_send;
2289
2290   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2291   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2292   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2293   {
2294     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2295     GNUNET_free (plugin);
2296     GNUNET_free (api);
2297     return NULL;
2298   }
2299
2300   if (broadcast == GNUNET_YES)
2301   {
2302     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2303     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2304   }
2305
2306   GNUNET_free_non_null (bind4_address);
2307   GNUNET_free_non_null (bind6_address);
2308   return api;
2309 }
2310
2311
2312 static int
2313 heap_cleanup_iterator (void *cls,
2314                        struct GNUNET_CONTAINER_HeapNode *
2315                        node, void *element,
2316                        GNUNET_CONTAINER_HeapCostType
2317                        cost)
2318 {
2319   struct DefragContext * d_ctx = element;
2320
2321   GNUNET_CONTAINER_heap_remove_node (node);
2322   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2323   GNUNET_free (d_ctx);
2324
2325   return GNUNET_YES;
2326 }
2327
2328
2329 /**
2330  * The exported method. Makes the core api available via a global and
2331  * returns the udp transport API.
2332  *
2333  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2334  * @return NULL
2335  */
2336 void *
2337 libgnunet_plugin_transport_udp_done (void *cls)
2338 {
2339   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2340   struct Plugin *plugin = api->cls;
2341
2342   if (NULL == plugin)
2343   {
2344     GNUNET_free (api);
2345     return NULL;
2346   }
2347
2348   stop_broadcast (plugin);
2349
2350   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2351   {
2352     GNUNET_SCHEDULER_cancel (plugin->select_task);
2353     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2354   }
2355   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2356   {
2357     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2358     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2359   }
2360
2361   /* Closing sockets */
2362   if (plugin->sockv4 != NULL)
2363   {
2364     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2365     plugin->sockv4 = NULL;
2366   }
2367   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2368   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2369
2370   if (plugin->sockv6 != NULL)
2371   {
2372     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2373     plugin->sockv6 = NULL;
2374
2375     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2376     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2377   }
2378
2379   GNUNET_NAT_unregister (plugin->nat);
2380
2381   if (plugin->defrag_ctxs != NULL)
2382   {
2383     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2384         heap_cleanup_iterator, NULL);
2385     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2386     plugin->defrag_ctxs = NULL;
2387   }
2388   if (plugin->mst != NULL)
2389   {
2390     GNUNET_SERVER_mst_destroy(plugin->mst);
2391     plugin->mst = NULL;
2392   }
2393
2394   /* Clean up leftover messages */
2395   struct UDPMessageWrapper * udpw;
2396   udpw = plugin->ipv4_queue_head;
2397   while (udpw != NULL)
2398   {
2399     struct UDPMessageWrapper *tmp = udpw->next;
2400     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2401     call_continuation(udpw, GNUNET_SYSERR);
2402     GNUNET_free (udpw);
2403     udpw = tmp;
2404   }
2405   udpw = plugin->ipv6_queue_head;
2406   while (udpw != NULL)
2407   {
2408     struct UDPMessageWrapper *tmp = udpw->next;
2409     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2410     call_continuation(udpw, GNUNET_SYSERR);
2411     GNUNET_free (udpw);
2412     udpw = tmp;
2413   }
2414
2415   /* Clean up sessions */
2416   LOG (GNUNET_ERROR_TYPE_DEBUG,
2417        "Cleaning up sessions\n");
2418   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2419   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2420
2421   plugin->nat = NULL;
2422   GNUNET_free (plugin);
2423   GNUNET_free (api);
2424   return NULL;
2425 }
2426
2427
2428 /* end of plugin_transport_udp.c */