ae710c73cf1b81806b5a6f88b8a66b76a9a4ccb6
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   /**
97    * Address of the other peer
98    */
99   const struct sockaddr *sock_addr;
100
101   size_t addrlen;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * expected delay for ACKs
115    */
116   struct GNUNET_TIME_Relative last_expected_delay;
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121
122   unsigned int rc;
123
124   int in_destroy;
125 };
126
127
128 struct SessionCompareContext
129 {
130   struct Session *res;
131   const struct GNUNET_HELLO_Address *addr;
132 };
133
134
135 /**
136  * Closure for 'process_inbound_tokenized_messages'
137  */
138 struct SourceInformation
139 {
140   /**
141    * Sender identity.
142    */
143   struct GNUNET_PeerIdentity sender;
144
145   /**
146    * Source address.
147    */
148   const void *arg;
149
150   /**
151    * Number of bytes in source address.
152    */
153   size_t args;
154
155   struct Session *session;
156 };
157
158
159 /**
160  * Closure for 'find_receive_context'.
161  */
162 struct FindReceiveContext
163 {
164   /**
165    * Where to store the result.
166    */
167   struct DefragContext *rc;
168
169   /**
170    * Address to find.
171    */
172   const struct sockaddr *addr;
173
174   /**
175    * Number of bytes in 'addr'.
176    */
177   socklen_t addr_len;
178
179   struct Session *session;
180 };
181
182
183
184 /**
185  * Data structure to track defragmentation contexts based
186  * on the source of the UDP traffic.
187  */
188 struct DefragContext
189 {
190
191   /**
192    * Defragmentation context.
193    */
194   struct GNUNET_DEFRAGMENT_Context *defrag;
195
196   /**
197    * Source address this receive context is for (allocated at the
198    * end of the struct).
199    */
200   const struct sockaddr *src_addr;
201
202   /**
203    * Reference to master plugin struct.
204    */
205   struct Plugin *plugin;
206
207   /**
208    * Node in the defrag heap.
209    */
210   struct GNUNET_CONTAINER_HeapNode *hnode;
211
212   /**
213    * Length of 'src_addr'
214    */
215   size_t addr_len;
216 };
217
218
219
220 /**
221  * Closure for 'process_inbound_tokenized_messages'
222  */
223 struct FragmentationContext
224 {
225   struct FragmentationContext * next;
226   struct FragmentationContext * prev;
227
228   struct Plugin * plugin;
229   struct GNUNET_FRAGMENT_Context * frag;
230   struct Session * session;
231
232   struct GNUNET_TIME_Absolute timeout;
233
234
235   /**
236    * Function to call upon completion of the transmission.
237    */
238   GNUNET_TRANSPORT_TransmitContinuation cont;
239
240   /**
241    * Closure for 'cont'.
242    */
243   void *cont_cls;
244
245   size_t bytes_to_send;
246 };
247
248
249 struct UDPMessageWrapper
250 {
251   struct Session *session;
252   struct UDPMessageWrapper *prev;
253   struct UDPMessageWrapper *next;
254   char *udp;
255   size_t msg_size;
256
257   struct GNUNET_TIME_Absolute timeout;
258
259   /**
260    * Function to call upon completion of the transmission.
261    */
262   GNUNET_TRANSPORT_TransmitContinuation cont;
263
264   /**
265    * Closure for 'cont'.
266    */
267   void *cont_cls;
268
269   struct FragmentationContext *frag_ctx;
270
271 };
272
273
274 /**
275  * UDP ACK Message-Packet header (after defragmentation).
276  */
277 struct UDP_ACK_Message
278 {
279   /**
280    * Message header.
281    */
282   struct GNUNET_MessageHeader header;
283
284   /**
285    * Desired delay for flow control
286    */
287   uint32_t delay;
288
289   /**
290    * What is the identity of the sender
291    */
292   struct GNUNET_PeerIdentity sender;
293
294 };
295
296
297 /**
298  * We have been notified that our readset has something to read.  We don't
299  * know which socket needs to be read, so we have to check each one
300  * Then reschedule this function to be called again once more is available.
301  *
302  * @param cls the plugin handle
303  * @param tc the scheduling context (for rescheduling this function again)
304  */
305 static void
306 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
307
308 /**
309  * We have been notified that our readset has something to read.  We don't
310  * know which socket needs to be read, so we have to check each one
311  * Then reschedule this function to be called again once more is available.
312  *
313  * @param cls the plugin handle
314  * @param tc the scheduling context (for rescheduling this function again)
315  */
316 static void
317 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
318
319 /**
320  * Function called for a quick conversion of the binary address to
321  * a numeric address.  Note that the caller must not free the
322  * address and that the next call to this function is allowed
323  * to override the address again.
324  *
325  * @param cls closure
326  * @param addr binary address
327  * @param addrlen length of the address
328  * @return string representing the same address
329  */
330 const char *
331 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
332 {
333   static char rbuf[INET6_ADDRSTRLEN + 10];
334   char buf[INET6_ADDRSTRLEN];
335   const void *sb;
336   struct in_addr a4;
337   struct in6_addr a6;
338   const struct IPv4UdpAddress *t4;
339   const struct IPv6UdpAddress *t6;
340   int af;
341   uint16_t port;
342
343   if (addrlen == sizeof (struct IPv6UdpAddress))
344   {
345     t6 = addr;
346     af = AF_INET6;
347     port = ntohs (t6->u6_port);
348     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
349     sb = &a6;
350   }
351   else if (addrlen == sizeof (struct IPv4UdpAddress))
352   {
353     t4 = addr;
354     af = AF_INET;
355     port = ntohs (t4->u4_port);
356     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
357     sb = &a4;
358   }
359   else
360   {
361     GNUNET_break_op (0);
362     return NULL;
363   }
364   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
365   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
366                    buf, port);
367   return rbuf;
368 }
369
370
371 /**
372  * Function called to convert a string address to
373  * a binary address.
374  *
375  * @param cls closure ('struct Plugin*')
376  * @param addr string address
377  * @param addrlen length of the address
378  * @param buf location to store the buffer
379  * @param added location to store the number of bytes in the buffer.
380  *        If the function returns GNUNET_SYSERR, its contents are undefined.
381  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
382  */
383 int
384 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
385     void **buf, size_t *added)
386 {
387   struct sockaddr_storage socket_address;
388
389   if ((NULL == addr) || (addrlen == 0))
390   {
391     GNUNET_break (0);
392     return GNUNET_SYSERR;
393   }
394
395   if ('\0' != addr[addrlen - 1])
396   {
397     return GNUNET_SYSERR;
398   }
399
400   if (strlen (addr) != addrlen - 1)
401   {
402     return GNUNET_SYSERR;
403   }
404
405   int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
406     &socket_address);
407
408   if (ret != GNUNET_OK)
409   {
410     return GNUNET_SYSERR;
411   }
412
413   if (socket_address.ss_family == AF_INET)
414   {
415     struct IPv4UdpAddress *u4;
416     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
417     u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
418     u4->ipv4_addr = in4->sin_addr.s_addr;
419     u4->u4_port = in4->sin_port;
420     *buf = u4;
421     *added = sizeof (struct IPv4UdpAddress);
422     return GNUNET_OK;
423   }
424   else if (socket_address.ss_family == AF_INET6)
425   {
426     struct IPv6UdpAddress *u6;
427     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
428     u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
429     u6->ipv6_addr = in6->sin6_addr;
430     u6->u6_port = in6->sin6_port;
431     *buf = u6;
432     *added = sizeof (struct IPv6UdpAddress);
433     return GNUNET_OK;
434   }
435   return GNUNET_SYSERR;
436 }
437
438
439 /**
440  * Append our port and forward the result.
441  *
442  * @param cls a 'struct PrettyPrinterContext'
443  * @param hostname result from DNS resolver
444  */
445 static void
446 append_port (void *cls, const char *hostname)
447 {
448   struct PrettyPrinterContext *ppc = cls;
449   char *ret;
450
451   if (hostname == NULL)
452   {
453     ppc->asc (ppc->asc_cls, NULL);
454     GNUNET_free (ppc);
455     return;
456   }
457   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
458   ppc->asc (ppc->asc_cls, ret);
459   GNUNET_free (ret);
460 }
461
462
463 /**
464  * Convert the transports address to a nice, human-readable
465  * format.
466  *
467  * @param cls closure
468  * @param type name of the transport that generated the address
469  * @param addr one of the addresses of the host, NULL for the last address
470  *        the specific address format depends on the transport
471  * @param addrlen length of the address
472  * @param numeric should (IP) addresses be displayed in numeric form?
473  * @param timeout after how long should we give up?
474  * @param asc function to call on each string
475  * @param asc_cls closure for asc
476  */
477 static void
478 udp_plugin_address_pretty_printer (void *cls, const char *type,
479                                    const void *addr, size_t addrlen,
480                                    int numeric,
481                                    struct GNUNET_TIME_Relative timeout,
482                                    GNUNET_TRANSPORT_AddressStringCallback asc,
483                                    void *asc_cls)
484 {
485   struct PrettyPrinterContext *ppc;
486   const void *sb;
487   size_t sbs;
488   struct sockaddr_in a4;
489   struct sockaddr_in6 a6;
490   const struct IPv4UdpAddress *u4;
491   const struct IPv6UdpAddress *u6;
492   uint16_t port;
493
494   if (addrlen == sizeof (struct IPv6UdpAddress))
495   {
496     u6 = addr;
497     memset (&a6, 0, sizeof (a6));
498     a6.sin6_family = AF_INET6;
499 #if HAVE_SOCKADDR_IN_SIN_LEN
500     a6.sin6_len = sizeof (a6);
501 #endif
502     a6.sin6_port = u6->u6_port;
503     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
504     port = ntohs (u6->u6_port);
505     sb = &a6;
506     sbs = sizeof (a6);
507   }
508   else if (addrlen == sizeof (struct IPv4UdpAddress))
509   {
510     u4 = addr;
511     memset (&a4, 0, sizeof (a4));
512     a4.sin_family = AF_INET;
513 #if HAVE_SOCKADDR_IN_SIN_LEN
514     a4.sin_len = sizeof (a4);
515 #endif
516     a4.sin_port = u4->u4_port;
517     a4.sin_addr.s_addr = u4->ipv4_addr;
518     port = ntohs (u4->u4_port);
519     sb = &a4;
520     sbs = sizeof (a4);
521   }
522   else if (0 == addrlen)
523   {
524     asc (asc_cls, "<inbound connection>");
525     asc (asc_cls, NULL);
526     return;
527   }
528   else
529   {
530     /* invalid address */
531     GNUNET_break_op (0);
532     asc (asc_cls, NULL);
533     return;
534   }
535   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
536   ppc->asc = asc;
537   ppc->asc_cls = asc_cls;
538   ppc->port = port;
539   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
540 }
541
542 static void
543 call_continuation (struct UDPMessageWrapper *udpw, int result)
544 {
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546       "Calling continuation for %u byte message to `%s' with result %s\n",
547       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
548       (GNUNET_OK == result) ? "OK" : "SYSERR");
549   if (NULL != udpw->cont)
550   {
551     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
552   }
553
554 }
555
556 /**
557  * Check if the given port is plausible (must be either our listen
558  * port or our advertised port).  If it is neither, we return
559  * GNUNET_SYSERR.
560  *
561  * @param plugin global variables
562  * @param in_port port number to check
563  * @return GNUNET_OK if port is either open_port or adv_port
564  */
565 static int
566 check_port (struct Plugin *plugin, uint16_t in_port)
567 {
568   if ((in_port == plugin->port) || (in_port == plugin->aport))
569     return GNUNET_OK;
570   return GNUNET_SYSERR;
571 }
572
573
574
575 /**
576  * Function that will be called to check if a binary address for this
577  * plugin is well-formed and corresponds to an address for THIS peer
578  * (as per our configuration).  Naturally, if absolutely necessary,
579  * plugins can be a bit conservative in their answer, but in general
580  * plugins should make sure that the address does not redirect
581  * traffic to a 3rd party that might try to man-in-the-middle our
582  * traffic.
583  *
584  * @param cls closure, should be our handle to the Plugin
585  * @param addr pointer to the address
586  * @param addrlen length of addr
587  * @return GNUNET_OK if this is a plausible address for this peer
588  *         and transport, GNUNET_SYSERR if not
589  *
590  */
591 static int
592 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
593 {
594   struct Plugin *plugin = cls;
595   struct IPv4UdpAddress *v4;
596   struct IPv6UdpAddress *v6;
597
598   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
599       (addrlen != sizeof (struct IPv6UdpAddress)))
600   {
601     GNUNET_break_op (0);
602     return GNUNET_SYSERR;
603   }
604   if (addrlen == sizeof (struct IPv4UdpAddress))
605   {
606     v4 = (struct IPv4UdpAddress *) addr;
607     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
608       return GNUNET_SYSERR;
609     if (GNUNET_OK !=
610         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
611                                  sizeof (struct in_addr)))
612       return GNUNET_SYSERR;
613   }
614   else
615   {
616     v6 = (struct IPv6UdpAddress *) addr;
617     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
618     {
619       GNUNET_break_op (0);
620       return GNUNET_SYSERR;
621     }
622     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
623       return GNUNET_SYSERR;
624     if (GNUNET_OK !=
625         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
626                                  sizeof (struct in6_addr)))
627       return GNUNET_SYSERR;
628   }
629   return GNUNET_OK;
630 }
631
632
633 /**
634  * Task to free resources associated with a session.
635  *
636  * @param s session to free
637  */
638 static void
639 free_session (struct Session *s)
640 {
641   if (s->frag_ctx != NULL)
642   {
643     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
644     GNUNET_free (s->frag_ctx);
645     s->frag_ctx = NULL;
646   }
647   GNUNET_free (s);
648 }
649
650
651 /**
652  * Destroy a session, plugin is being unloaded.
653  *
654  * @param cls unused
655  * @param key hash of public key of target peer
656  * @param value a 'struct PeerSession*' to clean up
657  * @return GNUNET_OK (continue to iterate)
658  */
659 static int
660 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
661 {
662   struct Plugin *plugin = cls;
663   struct Session *s = value;
664   struct UDPMessageWrapper *udpw;
665   struct UDPMessageWrapper *next;
666
667   GNUNET_assert (GNUNET_YES != s->in_destroy);
668   LOG (GNUNET_ERROR_TYPE_DEBUG,
669        "Session %p to peer `%s' address ended \n",
670          s,
671          GNUNET_i2s (&s->target),
672          GNUNET_a2s (s->sock_addr, s->addrlen));
673   next = plugin->ipv4_queue_head;
674   while (NULL != (udpw = next))
675   {
676     next = udpw->next;
677     if (udpw->session == s)
678     {
679       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
680       call_continuation(udpw, GNUNET_SYSERR);
681       GNUNET_free (udpw);
682     }
683   }
684   next = plugin->ipv6_queue_head;
685   while (NULL != (udpw = next))
686   {
687     next = udpw->next;
688     if (udpw->session == s)
689     {
690       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
691       call_continuation(udpw, GNUNET_SYSERR);
692       GNUNET_free (udpw);
693     }
694     udpw = next;
695   }
696   plugin->env->session_end (plugin->env->cls, &s->target, s);
697
698   if (NULL != s->frag_ctx)
699   {
700     if (NULL != s->frag_ctx->cont)
701     {
702       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
703       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
705           GNUNET_i2s (&s->target));
706     }
707   }
708
709   GNUNET_assert (GNUNET_YES ==
710                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
711                                                        &s->target.hashPubKey,
712                                                        s));
713   GNUNET_STATISTICS_set(plugin->env->stats,
714                         "# UDP sessions active",
715                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
716                         GNUNET_NO);
717   if (s->rc > 0)
718     s->in_destroy = GNUNET_YES;
719   else
720     free_session (s);
721   return GNUNET_OK;
722 }
723
724
725 /**
726  * Disconnect from a remote node.  Clean up session if we have one for this peer
727  *
728  * @param cls closure for this call (should be handle to Plugin)
729  * @param target the peeridentity of the peer to disconnect
730  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
731  */
732 static void
733 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
734 {
735   struct Plugin *plugin = cls;
736   GNUNET_assert (plugin != NULL);
737
738   GNUNET_assert (target != NULL);
739   LOG (GNUNET_ERROR_TYPE_DEBUG,
740        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
741   /* Clean up sessions */
742   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
743 }
744
745 static struct Session *
746 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
747                 const void *addr, size_t addrlen,
748                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
749 {
750   struct Session *s;
751   const struct IPv4UdpAddress *t4;
752   const struct IPv6UdpAddress *t6;
753   struct sockaddr_in *v4;
754   struct sockaddr_in6 *v6;
755   size_t len;
756
757   switch (addrlen)
758   {
759   case sizeof (struct IPv4UdpAddress):
760     if (NULL == plugin->sockv4)
761     {
762       return NULL;
763     }
764     t4 = addr;
765     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
766     len = sizeof (struct sockaddr_in);
767     v4 = (struct sockaddr_in *) &s[1];
768     v4->sin_family = AF_INET;
769 #if HAVE_SOCKADDR_IN_SIN_LEN
770     v4->sin_len = sizeof (struct sockaddr_in);
771 #endif
772     v4->sin_port = t4->u4_port;
773     v4->sin_addr.s_addr = t4->ipv4_addr;
774     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
775     break;
776   case sizeof (struct IPv6UdpAddress):
777     if (NULL == plugin->sockv6)
778     {
779       return NULL;
780     }
781     t6 = addr;
782     s =
783         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
784     len = sizeof (struct sockaddr_in6);
785     v6 = (struct sockaddr_in6 *) &s[1];
786     v6->sin6_family = AF_INET6;
787 #if HAVE_SOCKADDR_IN_SIN_LEN
788     v6->sin6_len = sizeof (struct sockaddr_in6);
789 #endif
790     v6->sin6_port = t6->u6_port;
791     v6->sin6_addr = t6->ipv6_addr;
792     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
793     break;
794   default:
795     /* Must have a valid address to send to */
796     GNUNET_break_op (0);
797     return NULL;
798   }
799
800   s->addrlen = len;
801   s->target = *target;
802   s->sock_addr = (const struct sockaddr *) &s[1];
803   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
804   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
805   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
806
807   return s;
808 }
809
810
811 static int
812 session_cmp_it (void *cls,
813                 const GNUNET_HashCode * key,
814                 void *value)
815 {
816   struct SessionCompareContext * cctx = cls;
817   const struct GNUNET_HELLO_Address *address = cctx->addr;
818   struct Session *s = value;
819
820   socklen_t s_addrlen = s->addrlen;
821
822 #if VERBOSE_UDP
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
824       udp_address_to_string (NULL, (void *) address->address, address->address_length),
825       GNUNET_a2s (s->sock_addr, s->addrlen));
826 #endif
827
828   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
829       (s_addrlen == sizeof (struct sockaddr_in)))
830   {
831     struct IPv4UdpAddress * u4 = NULL;
832     u4 = (struct IPv4UdpAddress *) address->address;
833     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
834     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
835         (u4->u4_port == s4->sin_port))
836     {
837       cctx->res = s;
838       return GNUNET_NO;
839     }
840
841   }
842   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
843       (s_addrlen == sizeof (struct sockaddr_in6)))
844   {
845     struct IPv6UdpAddress * u6 = NULL;
846     u6 = (struct IPv6UdpAddress *) address->address;
847     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
848     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
849         (u6->u6_port == s6->sin6_port))
850     {
851       cctx->res = s;
852       return GNUNET_NO;
853     }
854   }
855
856
857   return GNUNET_YES;
858 }
859
860
861 /**
862  * Creates a new outbound session the transport service will use to send data to the
863  * peer
864  *
865  * @param cls the plugin
866  * @param address the address
867  * @return the session or NULL of max connections exceeded
868  */
869 static struct Session *
870 udp_plugin_get_session (void *cls,
871                   const struct GNUNET_HELLO_Address *address)
872 {
873   struct Session * s = NULL;
874   struct Plugin * plugin = cls;
875   struct IPv6UdpAddress * udp_a6;
876   struct IPv4UdpAddress * udp_a4;
877
878   GNUNET_assert (plugin != NULL);
879   GNUNET_assert (address != NULL);
880
881
882   if ((address->address == NULL) ||
883       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
884       (address->address_length != sizeof (struct IPv6UdpAddress))))
885   {
886     GNUNET_break (0);
887     return NULL;
888   }
889
890   if (address->address_length == sizeof (struct IPv4UdpAddress))
891   {
892     if (plugin->sockv4 == NULL)
893       return NULL;
894     udp_a4 = (struct IPv4UdpAddress *) address->address;
895     if (udp_a4->u4_port == 0)
896       return NULL;
897   }
898
899   if (address->address_length == sizeof (struct IPv6UdpAddress))
900   {
901     if (plugin->sockv6 == NULL)
902       return NULL;
903     udp_a6 = (struct IPv6UdpAddress *) address->address;
904     if (udp_a6->u6_port == 0)
905       return NULL;
906   }
907
908   /* check if session already exists */
909   struct SessionCompareContext cctx;
910   cctx.addr = address;
911   cctx.res = NULL;
912 #if VERBOSE_UDP
913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
914 #endif
915   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
916   if (cctx.res != NULL)
917   {
918 #if VERBOSE_UDP
919     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
920 #endif
921     return cctx.res;
922   }
923
924   /* otherwise create new */
925   s = create_session (plugin,
926       &address->peer,
927       address->address,
928       address->address_length,
929       NULL, NULL);
930 #if VERBOSE
931     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Creating new session %p for peer `%s' address `%s'\n",
933               s,
934               GNUNET_i2s(&address->peer),
935               udp_address_to_string(NULL,address->address,address->address_length));
936 #endif
937   GNUNET_assert (GNUNET_OK ==
938                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
939                                                     &s->target.hashPubKey,
940                                                     s,
941                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
942
943   GNUNET_STATISTICS_set(plugin->env->stats,
944                         "# UDP sessions active",
945                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
946                         GNUNET_NO);
947
948   return s;
949 }
950
951
952 static void 
953 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
954 {
955
956   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
957     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
958   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
959     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
960 }
961
962
963 /**
964  * Fragment message was transmitted via UDP, let fragmentation know
965  * to send the next fragment now.
966  *
967  * @param cls the 'struct UDPMessageWrapper' of the fragment
968  * @param target destination peer (ignored)
969  * @param result GNUNET_OK on success (ignored)
970  */
971 static void
972 send_next_fragment (void *cls,
973                     const struct GNUNET_PeerIdentity *target,
974                     int result)
975 {
976   struct UDPMessageWrapper *udpw = cls;
977
978   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
979 }
980
981
982 /**
983  * Function that is called with messages created by the fragmentation
984  * module.  In the case of the 'proc' callback of the
985  * GNUNET_FRAGMENT_context_create function, this function must
986  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
987  *
988  * @param cls closure, the 'struct FragmentationContext'
989  * @param msg the message that was created
990  */
991 static void
992 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
993 {
994   struct FragmentationContext *frag_ctx = cls;
995   struct Plugin *plugin = frag_ctx->plugin;
996   struct UDPMessageWrapper * udpw;
997   struct Session *s;
998
999   size_t msg_len = ntohs (msg->size);
1000
1001 #if VERBOSE_UDP
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1003 #endif
1004
1005   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1006   udpw->session = frag_ctx->session;
1007   s = udpw->session;
1008   udpw->udp = (char *) &udpw[1];
1009
1010   udpw->msg_size = msg_len;
1011   udpw->cont = &send_next_fragment;
1012   udpw->cont_cls = udpw;
1013   udpw->timeout = frag_ctx->timeout;
1014   udpw->frag_ctx = frag_ctx;
1015   memcpy (udpw->udp, msg, msg_len);
1016
1017   enqueue (plugin, udpw);
1018
1019
1020   if (s->addrlen == sizeof (struct sockaddr_in))
1021   {
1022     if (plugin->with_v4_ws == GNUNET_NO)
1023     {
1024       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1025         GNUNET_SCHEDULER_cancel(plugin->select_task);
1026
1027       plugin->select_task =
1028           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1029                                        GNUNET_TIME_UNIT_FOREVER_REL,
1030                                        plugin->rs_v4,
1031                                        plugin->ws_v4,
1032                                        &udp_plugin_select, plugin);
1033       plugin->with_v4_ws = GNUNET_YES;
1034     }
1035   }
1036
1037   else if (s->addrlen == sizeof (struct sockaddr_in6))
1038   {
1039     if (plugin->with_v6_ws == GNUNET_NO)
1040     {
1041       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1042         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1043
1044       plugin->select_task_v6 =
1045           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1046                                        GNUNET_TIME_UNIT_FOREVER_REL,
1047                                        plugin->rs_v6,
1048                                        plugin->ws_v6,
1049                                        &udp_plugin_select_v6, plugin);
1050       plugin->with_v6_ws = GNUNET_YES;
1051     }
1052   }
1053 }
1054
1055
1056 /**
1057  * Function that can be used by the transport service to transmit
1058  * a message using the plugin.   Note that in the case of a
1059  * peer disconnecting, the continuation MUST be called
1060  * prior to the disconnect notification itself.  This function
1061  * will be called with this peer's HELLO message to initiate
1062  * a fresh connection to another peer.
1063  *
1064  * @param cls closure
1065  * @param s which session must be used
1066  * @param msgbuf the message to transmit
1067  * @param msgbuf_size number of bytes in 'msgbuf'
1068  * @param priority how important is the message (most plugins will
1069  *                 ignore message priority and just FIFO)
1070  * @param to how long to wait at most for the transmission (does not
1071  *                require plugins to discard the message after the timeout,
1072  *                just advisory for the desired delay; most plugins will ignore
1073  *                this as well)
1074  * @param cont continuation to call once the message has
1075  *        been transmitted (or if the transport is ready
1076  *        for the next transmission call; or if the
1077  *        peer disconnected...); can be NULL
1078  * @param cont_cls closure for cont
1079  * @return number of bytes used (on the physical network, with overheads);
1080  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1081  *         and does NOT mean that the message was not transmitted (DV)
1082  */
1083 static ssize_t
1084 udp_plugin_send (void *cls,
1085                   struct Session *s,
1086                   const char *msgbuf, size_t msgbuf_size,
1087                   unsigned int priority,
1088                   struct GNUNET_TIME_Relative to,
1089                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1090 {
1091   struct Plugin *plugin = cls;
1092   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1093
1094   struct UDPMessageWrapper * udpw;
1095   struct UDPMessage *udp;
1096   char mbuf[mlen];
1097   GNUNET_assert (plugin != NULL);
1098   GNUNET_assert (s != NULL);
1099
1100   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1101     return GNUNET_SYSERR;
1102
1103    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1104      return GNUNET_SYSERR;
1105
1106
1107   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1108   {
1109     GNUNET_break (0);
1110     return GNUNET_SYSERR;
1111   }
1112
1113   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1114   {
1115     GNUNET_break (0);
1116     return GNUNET_SYSERR;
1117   }
1118
1119   LOG (GNUNET_ERROR_TYPE_DEBUG,
1120        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1121          mlen,
1122          GNUNET_i2s (&s->target),
1123          GNUNET_a2s(s->sock_addr, s->addrlen));
1124
1125   /* Message */
1126   udp = (struct UDPMessage *) mbuf;
1127   udp->header.size = htons (mlen);
1128   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1129   udp->reserved = htonl (0);
1130   udp->sender = *plugin->env->my_identity;
1131
1132   if (mlen <= UDP_MTU)
1133   {
1134     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1135     udpw->session = s;
1136     udpw->udp = (char *) &udpw[1];
1137     udpw->msg_size = mlen;
1138     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1139     udpw->cont = cont;
1140     udpw->cont_cls = cont_cls;
1141     udpw->frag_ctx = NULL;
1142     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1143     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1144
1145     enqueue (plugin, udpw);
1146   }
1147   else
1148   {
1149     LOG (GNUNET_ERROR_TYPE_DEBUG,
1150          "UDP has to fragment message \n");
1151     if  (s->frag_ctx != NULL)
1152       return GNUNET_SYSERR;
1153     memcpy (&udp[1], msgbuf, msgbuf_size);
1154     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1155
1156     frag_ctx->plugin = plugin;
1157     frag_ctx->session = s;
1158     frag_ctx->cont = cont;
1159     frag_ctx->cont_cls = cont_cls;
1160     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1161     frag_ctx->bytes_to_send = mlen;
1162     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1163               UDP_MTU,
1164               &plugin->tracker,
1165               s->last_expected_delay,
1166               &udp->header,
1167               &enqueue_fragment,
1168               frag_ctx);
1169
1170     s->frag_ctx = frag_ctx;
1171   }
1172
1173   if (s->addrlen == sizeof (struct sockaddr_in))
1174   {
1175     if (plugin->with_v4_ws == GNUNET_NO)
1176     {
1177       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1178         GNUNET_SCHEDULER_cancel(plugin->select_task);
1179
1180       plugin->select_task =
1181           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1182                                        GNUNET_TIME_UNIT_FOREVER_REL,
1183                                        plugin->rs_v4,
1184                                        plugin->ws_v4,
1185                                        &udp_plugin_select, plugin);
1186       plugin->with_v4_ws = GNUNET_YES;
1187     }
1188   }
1189
1190   else if (s->addrlen == sizeof (struct sockaddr_in6))
1191   {
1192     if (plugin->with_v6_ws == GNUNET_NO)
1193     {
1194       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1195         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1196
1197       plugin->select_task_v6 =
1198         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1199                                      GNUNET_TIME_UNIT_FOREVER_REL,
1200                                      plugin->rs_v6,
1201                                      plugin->ws_v6,
1202                                      &udp_plugin_select_v6, plugin);
1203       plugin->with_v6_ws = GNUNET_YES;
1204     }
1205   }
1206
1207   return mlen;
1208 }
1209
1210
1211 /**
1212  * Our external IP address/port mapping has changed.
1213  *
1214  * @param cls closure, the 'struct LocalAddrList'
1215  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1216  *     the previous (now invalid) one
1217  * @param addr either the previous or the new public IP address
1218  * @param addrlen actual lenght of the address
1219  */
1220 static void
1221 udp_nat_port_map_callback (void *cls, int add_remove,
1222                            const struct sockaddr *addr, socklen_t addrlen)
1223 {
1224   struct Plugin *plugin = cls;
1225   struct IPv4UdpAddress u4;
1226   struct IPv6UdpAddress u6;
1227   void *arg;
1228   size_t args;
1229
1230   /* convert 'addr' to our internal format */
1231   switch (addr->sa_family)
1232   {
1233   case AF_INET:
1234     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1235     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1236     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1237     arg = &u4;
1238     args = sizeof (u4);
1239     break;
1240   case AF_INET6:
1241     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1242     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1243             sizeof (struct in6_addr));
1244     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1245     arg = &u6;
1246     args = sizeof (u6);
1247     break;
1248   default:
1249     GNUNET_break (0);
1250     return;
1251   }
1252   /* modify our published address list */
1253   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1254 }
1255
1256
1257
1258 /**
1259  * Message tokenizer has broken up an incomming message. Pass it on
1260  * to the service.
1261  *
1262  * @param cls the 'struct Plugin'
1263  * @param client the 'struct SourceInformation'
1264  * @param hdr the actual message
1265  */
1266 static void
1267 process_inbound_tokenized_messages (void *cls, void *client,
1268                                     const struct GNUNET_MessageHeader *hdr)
1269 {
1270   struct Plugin *plugin = cls;
1271   struct SourceInformation *si = client;
1272   struct GNUNET_ATS_Information ats[2];
1273   struct GNUNET_TIME_Relative delay;
1274
1275   GNUNET_assert (si->session != NULL);
1276   if (GNUNET_YES == si->session->in_destroy)
1277     return; 
1278   /* setup ATS */
1279   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1280   ats[0].value = htonl (1);
1281   ats[1] = si->session->ats;
1282   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1283
1284   delay = plugin->env->receive (plugin->env->cls,
1285                 &si->sender,
1286                 hdr,
1287                 (const struct GNUNET_ATS_Information *) &ats, 2,
1288                 NULL,
1289                 si->arg,
1290                 si->args);
1291   si->session->flow_delay_for_other_peer = delay;
1292 }
1293
1294
1295 /**
1296  * We've received a UDP Message.  Process it (pass contents to main service).
1297  *
1298  * @param plugin plugin context
1299  * @param msg the message
1300  * @param sender_addr sender address
1301  * @param sender_addr_len number of bytes in sender_addr
1302  */
1303 static void
1304 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1305                      const struct sockaddr *sender_addr,
1306                      socklen_t sender_addr_len)
1307 {
1308   struct SourceInformation si;
1309   struct Session * s;
1310   struct IPv4UdpAddress u4;
1311   struct IPv6UdpAddress u6;
1312   const void *arg;
1313   size_t args;
1314
1315   if (0 != ntohl (msg->reserved))
1316   {
1317     GNUNET_break_op (0);
1318     return;
1319   }
1320   if (ntohs (msg->header.size) <
1321       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1322   {
1323     GNUNET_break_op (0);
1324     return;
1325   }
1326
1327   /* convert address */
1328   switch (sender_addr->sa_family)
1329   {
1330   case AF_INET:
1331     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1332     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1333     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1334     arg = &u4;
1335     args = sizeof (u4);
1336     break;
1337   case AF_INET6:
1338     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1339     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1340     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1341     arg = &u6;
1342     args = sizeof (u6);
1343     break;
1344   default:
1345     GNUNET_break (0);
1346     return;
1347   }
1348   LOG (GNUNET_ERROR_TYPE_DEBUG,
1349        "Received message with %u bytes from peer `%s' at `%s'\n",
1350        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1351        GNUNET_a2s (sender_addr, sender_addr_len));
1352
1353   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1354   s = udp_plugin_get_session(plugin, address);
1355   GNUNET_free (address);
1356
1357   /* iterate over all embedded messages */
1358   si.session = s;
1359   si.sender = msg->sender;
1360   si.arg = arg;
1361   si.args = args;
1362   s->rc++;
1363   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1364                              ntohs (msg->header.size) -
1365                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1366   s->rc--;
1367   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1368     free_session (s);
1369 }
1370
1371
1372 /**
1373  * Scan the heap for a receive context with the given address.
1374  *
1375  * @param cls the 'struct FindReceiveContext'
1376  * @param node internal node of the heap
1377  * @param element value stored at the node (a 'struct ReceiveContext')
1378  * @param cost cost associated with the node
1379  * @return GNUNET_YES if we should continue to iterate,
1380  *         GNUNET_NO if not.
1381  */
1382 static int
1383 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1384                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1385 {
1386   struct FindReceiveContext *frc = cls;
1387   struct DefragContext *e = element;
1388
1389   if ((frc->addr_len == e->addr_len) &&
1390       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1391   {
1392     frc->rc = e;
1393     return GNUNET_NO;
1394   }
1395   return GNUNET_YES;
1396 }
1397
1398
1399 /**
1400  * Process a defragmented message.
1401  *
1402  * @param cls the 'struct ReceiveContext'
1403  * @param msg the message
1404  */
1405 static void
1406 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1407 {
1408   struct DefragContext *rc = cls;
1409
1410   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1411   {
1412     GNUNET_break (0);
1413     return;
1414   }
1415   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1416   {
1417     GNUNET_break (0);
1418     return;
1419   }
1420   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1421                        rc->src_addr, rc->addr_len);
1422 }
1423
1424 struct LookupContext
1425 {
1426   const struct sockaddr * addr;
1427   size_t addrlen;
1428
1429   struct Session *res;
1430 };
1431
1432 static int
1433 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1434 {
1435   struct LookupContext *l_ctx = cls;
1436   struct Session * s = value;
1437
1438   if ((s->addrlen == l_ctx->addrlen) &&
1439       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1440   {
1441     l_ctx->res = s;
1442     return GNUNET_NO;
1443   }
1444   return GNUNET_YES;
1445 }
1446
1447 /**
1448  * Transmit an acknowledgement.
1449  *
1450  * @param cls the 'struct ReceiveContext'
1451  * @param id message ID (unused)
1452  * @param msg ack to transmit
1453  */
1454 static void
1455 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1456 {
1457   struct DefragContext *rc = cls;
1458
1459   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1460   struct UDP_ACK_Message *udp_ack;
1461   uint32_t delay = 0;
1462   struct UDPMessageWrapper *udpw;
1463   struct Session *s;
1464
1465   struct LookupContext l_ctx;
1466   l_ctx.addr = rc->src_addr;
1467   l_ctx.addrlen = rc->addr_len;
1468   l_ctx.res = NULL;
1469   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1470       &lookup_session_by_addr_it,
1471       &l_ctx);
1472   s = l_ctx.res;
1473
1474   if (NULL == s)
1475     return;
1476
1477   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1478     delay = s->flow_delay_for_other_peer.rel_value;
1479
1480   LOG (GNUNET_ERROR_TYPE_DEBUG,
1481        "Sending ACK to `%s' including delay of %u ms\n",
1482        GNUNET_a2s (rc->src_addr,
1483                    (rc->src_addr->sa_family ==
1484                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1485                                                                      sockaddr_in6)),
1486        delay);
1487   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1488   udpw->cont = NULL;
1489   udpw->cont_cls = NULL;
1490   udpw->frag_ctx = NULL;
1491   udpw->msg_size = msize;
1492   udpw->session = s;
1493   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1494   udpw->udp = (char *)&udpw[1];
1495
1496   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1497   udp_ack->header.size = htons ((uint16_t) msize);
1498   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1499   udp_ack->delay = htonl (delay);
1500   udp_ack->sender = *rc->plugin->env->my_identity;
1501   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1502
1503   enqueue (rc->plugin, udpw);
1504 }
1505
1506
1507 static void read_process_msg (struct Plugin *plugin,
1508     const struct GNUNET_MessageHeader *msg,
1509     char *addr,
1510     socklen_t fromlen)
1511 {
1512   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1513   {
1514     GNUNET_break_op (0);
1515     return;
1516   }
1517   process_udp_message (plugin, (const struct UDPMessage *) msg,
1518                        (const struct sockaddr *) addr, fromlen);
1519   return;
1520 }
1521
1522 static void read_process_ack (struct Plugin *plugin,
1523     const struct GNUNET_MessageHeader *msg,
1524     char *addr,
1525     socklen_t fromlen)
1526 {
1527   const struct GNUNET_MessageHeader *ack;
1528   const struct UDP_ACK_Message *udp_ack;
1529   struct LookupContext l_ctx;
1530   struct Session *s = NULL;
1531   struct GNUNET_TIME_Relative flow_delay;
1532
1533   if (ntohs (msg->size) <
1534       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1535   {
1536     GNUNET_break_op (0);
1537     return;
1538   }
1539
1540   udp_ack = (const struct UDP_ACK_Message *) msg;
1541
1542   l_ctx.addr = (const struct sockaddr *) addr;
1543   l_ctx.addrlen = fromlen;
1544   l_ctx.res = NULL;
1545   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1546       &lookup_session_by_addr_it,
1547       &l_ctx);
1548   s = l_ctx.res;
1549
1550   if ((s == NULL) || (s->frag_ctx == NULL))
1551     return;
1552
1553   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1554   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1555        flow_delay.rel_value);
1556   s->flow_delay_from_other_peer =
1557       GNUNET_TIME_relative_to_absolute (flow_delay);
1558
1559   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1560   if (ntohs (ack->size) !=
1561       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1562   {
1563     GNUNET_break_op (0);
1564     return;
1565   }
1566
1567   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1568   {
1569   LOG (GNUNET_ERROR_TYPE_DEBUG,
1570        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1571        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1572        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1573     return;
1574   }
1575
1576   LOG (GNUNET_ERROR_TYPE_DEBUG,
1577        "FULL MESSAGE ACKed\n",
1578        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1579        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1580   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1581
1582   struct UDPMessageWrapper * udpw = NULL;
1583   struct UDPMessageWrapper * tmp = NULL;
1584   if (s->addrlen == sizeof (struct sockaddr_in6))
1585   {
1586     udpw = plugin->ipv6_queue_head;
1587     while (udpw!= NULL)
1588     {
1589       tmp = udpw->next;
1590       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1591       {
1592         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1593         GNUNET_free (udpw);
1594       }
1595       udpw = tmp;
1596     }
1597   }
1598   if (s->addrlen == sizeof (struct sockaddr_in))
1599   {
1600     udpw = plugin->ipv4_queue_head;
1601     while (udpw!= NULL)
1602     {
1603       tmp = udpw->next;
1604       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1605       {
1606         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1607         GNUNET_free (udpw);
1608       }
1609       udpw = tmp;
1610     }
1611   }
1612
1613   if (s->frag_ctx->cont != NULL)
1614   {
1615     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1616         "Calling continuation for fragmented message to `%s' with result %s\n",
1617         GNUNET_i2s (&s->target), "OK");
1618     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1619   }
1620
1621   GNUNET_free (s->frag_ctx);
1622   s->frag_ctx = NULL;
1623   return;
1624 }
1625
1626 static void read_process_fragment (struct Plugin *plugin,
1627     const struct GNUNET_MessageHeader *msg,
1628     char *addr,
1629     socklen_t fromlen)
1630 {
1631   struct DefragContext *d_ctx;
1632   struct GNUNET_TIME_Absolute now;
1633   struct FindReceiveContext frc;
1634
1635
1636   frc.rc = NULL;
1637   frc.addr = (const struct sockaddr *) addr;
1638   frc.addr_len = fromlen;
1639
1640   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1641        (unsigned int) ntohs (msg->size),
1642        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1643   /* Lookup existing receive context for this address */
1644   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1645                                  &find_receive_context,
1646                                  &frc);
1647   now = GNUNET_TIME_absolute_get ();
1648   d_ctx = frc.rc;
1649
1650   if (d_ctx == NULL)
1651   {
1652     /* Create a new defragmentation context */
1653     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1654     memcpy (&d_ctx[1], addr, fromlen);
1655     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1656     d_ctx->addr_len = fromlen;
1657     d_ctx->plugin = plugin;
1658     d_ctx->defrag =
1659         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1660                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1661                                           &fragment_msg_proc, &ack_proc);
1662     d_ctx->hnode =
1663         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1664                                       (GNUNET_CONTAINER_HeapCostType)
1665                                       now.abs_value);
1666   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1667        (unsigned int) ntohs (msg->size),
1668        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1669   }
1670   else
1671   {
1672   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1673        (unsigned int) ntohs (msg->size),
1674        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1675   }
1676
1677   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1678   {
1679     /* keep this 'rc' from expiring */
1680     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1681                                        (GNUNET_CONTAINER_HeapCostType)
1682                                        now.abs_value);
1683   }
1684   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1685       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1686   {
1687     /* remove 'rc' that was inactive the longest */
1688     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1689     GNUNET_assert (NULL != d_ctx);
1690     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1691     GNUNET_free (d_ctx);
1692   }
1693 }
1694
1695 /**
1696  * Read and process a message from the given socket.
1697  *
1698  * @param plugin the overall plugin
1699  * @param rsock socket to read from
1700  */
1701 static void
1702 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1703 {
1704   socklen_t fromlen;
1705   char addr[32];
1706   char buf[65536] GNUNET_ALIGN;
1707   ssize_t size;
1708   const struct GNUNET_MessageHeader *msg;
1709
1710   fromlen = sizeof (addr);
1711   memset (&addr, 0, sizeof (addr));
1712   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1713                                       (struct sockaddr *) &addr, &fromlen);
1714
1715   if (size < sizeof (struct GNUNET_MessageHeader))
1716   {
1717     GNUNET_break_op (0);
1718     return;
1719   }
1720   msg = (const struct GNUNET_MessageHeader *) buf;
1721
1722   LOG (GNUNET_ERROR_TYPE_DEBUG,
1723        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1724        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1725
1726   if (size != ntohs (msg->size))
1727   {
1728     GNUNET_break_op (0);
1729     return;
1730   }
1731
1732   switch (ntohs (msg->type))
1733   {
1734   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1735     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1736     return;
1737
1738   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1739     read_process_msg (plugin, msg, addr, fromlen);
1740     return;
1741
1742   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1743     read_process_ack (plugin, msg, addr, fromlen);
1744     return;
1745
1746   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1747     read_process_fragment (plugin, msg, addr, fromlen);
1748     return;
1749
1750   default:
1751     GNUNET_break_op (0);
1752     return;
1753   }
1754 }
1755
1756
1757 static size_t
1758 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1759 {
1760   ssize_t sent;
1761   size_t slen;
1762   struct GNUNET_TIME_Absolute max;
1763   struct GNUNET_TIME_Absolute ;
1764
1765   struct UDPMessageWrapper *udpw = NULL;
1766
1767   if (sock == plugin->sockv4)
1768   {
1769     udpw = plugin->ipv4_queue_head;
1770   }
1771   else if (sock == plugin->sockv6)
1772   {
1773     udpw = plugin->ipv6_queue_head;
1774   }
1775   else
1776   {
1777     GNUNET_break (0);
1778     return 0;
1779   }
1780
1781   const struct sockaddr * sa = udpw->session->sock_addr;
1782   slen = udpw->session->addrlen;
1783
1784   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1785
1786   while (udpw != NULL)
1787   {
1788     if (max.abs_value != udpw->timeout.abs_value)
1789     {
1790       /* Message timed out */
1791       call_continuation(udpw, GNUNET_SYSERR);
1792       if (udpw->frag_ctx != NULL)
1793       {
1794         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1795             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1796         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1797         GNUNET_free (udpw->frag_ctx);
1798         udpw->session->frag_ctx = NULL;
1799       }
1800       else
1801       {
1802         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1803             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1804       }
1805
1806       if (sock == plugin->sockv4)
1807       {
1808         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1809         GNUNET_free (udpw);
1810         udpw = plugin->ipv4_queue_head;
1811       }
1812       else if (sock == plugin->sockv6)
1813       {
1814         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1815         GNUNET_free (udpw);
1816         udpw = plugin->ipv6_queue_head;
1817       }
1818     }
1819     else
1820     {
1821       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1822       if (delta.rel_value == 0)
1823       {
1824         /* this message is not delayed */
1825         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1826             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1827         break;
1828       }
1829       else
1830       {
1831         /* this message is delayed, try next */
1832         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1833             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1834             delta);
1835         udpw = udpw->next;
1836       }
1837     }
1838   }
1839
1840   if (udpw == NULL)
1841   {
1842     /* No message left */
1843     return 0;
1844   }
1845
1846   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1847
1848   if (GNUNET_SYSERR == sent)
1849   {
1850     const struct GNUNET_ATS_Information type = plugin->env->get_address_type
1851         (plugin->env->cls,sa, slen);
1852
1853     if ((GNUNET_ATS_NET_WAN == type.value) &&
1854         ((ENETUNREACH == errno) || (ENETDOWN == errno)))
1855     {
1856       /* "Network unreachable" or "Network down" */
1857       /*
1858        * This indicates that this system is IPv6 enabled, but does not
1859        * have a valid global IPv6 address assigned
1860        */
1861        LOG (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1862            _("UDP could not message to `%s': `%s'\n, " \
1863            "Please check your network configuration and disable IPv6 if your\n" \
1864            "connection does not have a global IPv6 address"),
1865            GNUNET_a2s (sa, slen),
1866            STRERROR (errno));
1867     }
1868     else
1869     {
1870       LOG (GNUNET_ERROR_TYPE_ERROR,
1871          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1872          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1873          STRERROR (errno));
1874     }
1875     call_continuation(udpw, GNUNET_SYSERR);
1876   }
1877   else
1878   {
1879     LOG (GNUNET_ERROR_TYPE_DEBUG,
1880          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1881          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1882          (sent < 0) ? STRERROR (errno) : "ok");
1883     call_continuation(udpw, GNUNET_OK);
1884   }
1885
1886   if (sock == plugin->sockv4)
1887     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1888   else if (sock == plugin->sockv6)
1889     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1890   GNUNET_free (udpw);
1891   udpw = NULL;
1892
1893   return sent;
1894 }
1895
1896 /**
1897  * We have been notified that our readset has something to read.  We don't
1898  * know which socket needs to be read, so we have to check each one
1899  * Then reschedule this function to be called again once more is available.
1900  *
1901  * @param cls the plugin handle
1902  * @param tc the scheduling context (for rescheduling this function again)
1903  */
1904 static void
1905 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1906 {
1907   struct Plugin *plugin = cls;
1908
1909   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1910   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1911     return;
1912   plugin->with_v4_ws = GNUNET_NO;
1913
1914   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1915   {
1916     if ((NULL != plugin->sockv4) &&
1917       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1918         udp_select_read (plugin, plugin->sockv4);
1919
1920   }
1921
1922   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1923   {
1924     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1925       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1926       {
1927         udp_select_send (plugin, plugin->sockv4);
1928       }
1929   }
1930
1931   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1932     GNUNET_SCHEDULER_cancel (plugin->select_task);
1933   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1934                                    GNUNET_TIME_UNIT_FOREVER_REL,
1935                                    plugin->rs_v4,
1936                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1937                                    &udp_plugin_select, plugin);
1938   if (plugin->ipv4_queue_head != NULL)
1939     plugin->with_v4_ws = GNUNET_YES;
1940   else
1941     plugin->with_v4_ws = GNUNET_NO;
1942 }
1943
1944
1945 /**
1946  * We have been notified that our readset has something to read.  We don't
1947  * know which socket needs to be read, so we have to check each one
1948  * Then reschedule this function to be called again once more is available.
1949  *
1950  * @param cls the plugin handle
1951  * @param tc the scheduling context (for rescheduling this function again)
1952  */
1953 static void
1954 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1955 {
1956   struct Plugin *plugin = cls;
1957
1958   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1959   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1960     return;
1961
1962   plugin->with_v6_ws = GNUNET_NO;
1963   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1964   {
1965     if ((NULL != plugin->sockv6) &&
1966       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1967         udp_select_read (plugin, plugin->sockv6);
1968   }
1969
1970   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1971   {
1972     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1973       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1974       {
1975         udp_select_send (plugin, plugin->sockv6);
1976       }
1977   }
1978   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1979     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1980   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1981                                    GNUNET_TIME_UNIT_FOREVER_REL,
1982                                    plugin->rs_v6,
1983                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1984                                    &udp_plugin_select_v6, plugin);
1985   if (plugin->ipv6_queue_head != NULL)
1986     plugin->with_v6_ws = GNUNET_YES;
1987   else
1988     plugin->with_v6_ws = GNUNET_NO;
1989 }
1990
1991
1992 static int
1993 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1994 {
1995   int tries;
1996   int sockets_created = 0;
1997   struct sockaddr *serverAddr;
1998   struct sockaddr *addrs[2];
1999   socklen_t addrlens[2];
2000   socklen_t addrlen;
2001
2002   /* Create IPv6 socket */
2003   if (plugin->enable_ipv6 == GNUNET_YES)
2004   {
2005     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2006     if (NULL == plugin->sockv6)
2007     {
2008       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2009       plugin->enable_ipv6 = GNUNET_NO;
2010     }
2011     else
2012     {
2013 #if HAVE_SOCKADDR_IN_SIN_LEN
2014       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2015 #endif
2016       serverAddrv6->sin6_family = AF_INET6;
2017       serverAddrv6->sin6_addr = in6addr_any;
2018       serverAddrv6->sin6_port = htons (plugin->port);
2019       addrlen = sizeof (struct sockaddr_in6);
2020       serverAddr = (struct sockaddr *) serverAddrv6;
2021       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2022            ntohs (serverAddrv6->sin6_port));
2023       tries = 0;
2024       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2025              GNUNET_OK)
2026       {
2027         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2028         LOG (GNUNET_ERROR_TYPE_DEBUG,
2029              "IPv6 Binding failed, trying new port %d\n",
2030              ntohs (serverAddrv6->sin6_port));
2031         tries++;
2032         if (tries > 10)
2033         {
2034           GNUNET_NETWORK_socket_close (plugin->sockv6);
2035           plugin->sockv6 = NULL;
2036           break;
2037         }
2038       }
2039       if (plugin->sockv6 != NULL)
2040       {
2041         LOG (GNUNET_ERROR_TYPE_DEBUG,
2042              "IPv6 socket created on port %d\n",
2043              ntohs (serverAddrv6->sin6_port));
2044         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2045         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2046         sockets_created++;
2047       }
2048     }
2049   }
2050
2051   /* Create IPv4 socket */
2052   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2053   if (NULL == plugin->sockv4)
2054   {
2055     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2056   }
2057   else
2058   {
2059 #if HAVE_SOCKADDR_IN_SIN_LEN
2060     serverAddrv4->sin_len = sizeof (serverAddrv4);
2061 #endif
2062     serverAddrv4->sin_family = AF_INET;
2063     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2064     serverAddrv4->sin_port = htons (plugin->port);
2065     addrlen = sizeof (struct sockaddr_in);
2066     serverAddr = (struct sockaddr *) serverAddrv4;
2067
2068     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2069          ntohs (serverAddrv4->sin_port));
2070     tries = 0;
2071     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2072            GNUNET_OK)
2073     {
2074       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2075       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2076            ntohs (serverAddrv4->sin_port));
2077       tries++;
2078       if (tries > 10)
2079       {
2080         GNUNET_NETWORK_socket_close (plugin->sockv4);
2081         plugin->sockv4 = NULL;
2082         break;
2083       }
2084     }
2085     if (plugin->sockv4 != NULL)
2086     {
2087       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2088       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2089       sockets_created++;
2090     }
2091   }
2092
2093   /* Create file descriptors */
2094   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2095   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2096   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2097   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2098   if (NULL != plugin->sockv4)
2099   {
2100     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2101     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2102   }
2103
2104   if (sockets_created == 0)
2105     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2106
2107   plugin->select_task =
2108       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2109                                    GNUNET_TIME_UNIT_FOREVER_REL,
2110                                    plugin->rs_v4,
2111                                    NULL,
2112                                    &udp_plugin_select, plugin);
2113   plugin->with_v4_ws = GNUNET_NO;
2114
2115   if (plugin->enable_ipv6 == GNUNET_YES)
2116   {
2117     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2118     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2119     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2120     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2121     if (NULL != plugin->sockv6)
2122     {
2123       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2124       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2125     }
2126
2127     plugin->select_task_v6 =
2128         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2129                                      GNUNET_TIME_UNIT_FOREVER_REL,
2130                                      plugin->rs_v6,
2131                                      NULL,
2132                                      &udp_plugin_select_v6, plugin);
2133     plugin->with_v6_ws = GNUNET_NO;
2134   }
2135
2136   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2137                            GNUNET_NO, plugin->port,
2138                            sockets_created,
2139                            (const struct sockaddr **) addrs, addrlens,
2140                            &udp_nat_port_map_callback, NULL, plugin);
2141
2142   return sockets_created;
2143 }
2144
2145
2146 /**
2147  * The exported method. Makes the core api available via a global and
2148  * returns the udp transport API.
2149  *
2150  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2151  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2152  */
2153 void *
2154 libgnunet_plugin_transport_udp_init (void *cls)
2155 {
2156   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2157   struct GNUNET_TRANSPORT_PluginFunctions *api;
2158   struct Plugin *plugin;
2159
2160   unsigned long long port;
2161   unsigned long long aport;
2162   unsigned long long broadcast;
2163   unsigned long long udp_max_bps;
2164   unsigned long long enable_v6;
2165   char * bind4_address;
2166   char * bind6_address;
2167   struct GNUNET_TIME_Relative interval;
2168
2169   struct sockaddr_in serverAddrv4;
2170   struct sockaddr_in6 serverAddrv6;
2171
2172   int res;
2173
2174   if (NULL == env->receive)
2175   {
2176     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2177        initialze the plugin or the API */
2178     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2179     api->cls = NULL;
2180     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2181     api->address_to_string = &udp_address_to_string;
2182     api->string_to_address = &udp_string_to_address;
2183     return api;
2184   }
2185
2186   GNUNET_assert( NULL != env->stats);
2187
2188   /* Get port number */
2189   if (GNUNET_OK !=
2190       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2191                                              &port))
2192     port = 2086;
2193   if (GNUNET_OK !=
2194       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2195                                              "ADVERTISED_PORT", &aport))
2196     aport = port;
2197   if (port > 65535)
2198   {
2199     LOG (GNUNET_ERROR_TYPE_WARNING,
2200          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2201          65535);
2202     return NULL;
2203   }
2204
2205   /* Protocols */
2206   if ((GNUNET_YES ==
2207        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2208                                              "DISABLEV6")))
2209   {
2210     enable_v6 = GNUNET_NO;
2211   }
2212   else
2213     enable_v6 = GNUNET_YES;
2214
2215
2216   /* Addresses */
2217   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2218   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2219
2220   if (GNUNET_YES ==
2221       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2222                                              "BINDTO", &bind4_address))
2223   {
2224     LOG (GNUNET_ERROR_TYPE_DEBUG,
2225          "Binding udp plugin to specific address: `%s'\n",
2226          bind4_address);
2227     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2228     {
2229       GNUNET_free (bind4_address);
2230       return NULL;
2231     }
2232   }
2233
2234   if (GNUNET_YES ==
2235       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2236                                              "BINDTO6", &bind6_address))
2237   {
2238     LOG (GNUNET_ERROR_TYPE_DEBUG,
2239          "Binding udp plugin to specific address: `%s'\n",
2240          bind6_address);
2241     if (1 !=
2242         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2243     {
2244       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2245            bind6_address);
2246       GNUNET_free_non_null (bind4_address);
2247       GNUNET_free (bind6_address);
2248       return NULL;
2249     }
2250   }
2251
2252
2253   /* Enable neighbour discovery */
2254   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2255                                             "BROADCAST");
2256   if (broadcast == GNUNET_SYSERR)
2257     broadcast = GNUNET_NO;
2258
2259   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2260                                            "BROADCAST_INTERVAL", &interval))
2261   {
2262     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2263   }
2264
2265   /* Maximum datarate */
2266   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2267                                              "MAX_BPS", &udp_max_bps))
2268   {
2269     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2270   }
2271
2272   plugin = GNUNET_malloc (sizeof (struct Plugin));
2273   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2274
2275   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2276                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2277
2278
2279   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2280   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2281   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2282   plugin->port = port;
2283   plugin->aport = aport;
2284   plugin->broadcast_interval = interval;
2285   plugin->enable_ipv6 = enable_v6;
2286   plugin->env = env;
2287
2288   api->cls = plugin;
2289   api->send = NULL;
2290   api->disconnect = &udp_disconnect;
2291   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2292   api->address_to_string = &udp_address_to_string;
2293   api->string_to_address = &udp_string_to_address;
2294   api->check_address = &udp_plugin_check_address;
2295   api->get_session = &udp_plugin_get_session;
2296   api->send = &udp_plugin_send;
2297
2298   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2299   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2300   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2301   {
2302     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2303     GNUNET_free (plugin);
2304     GNUNET_free (api);
2305     return NULL;
2306   }
2307
2308   if (broadcast == GNUNET_YES)
2309   {
2310     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2311     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2312   }
2313
2314   GNUNET_free_non_null (bind4_address);
2315   GNUNET_free_non_null (bind6_address);
2316   return api;
2317 }
2318
2319
2320 static int
2321 heap_cleanup_iterator (void *cls,
2322                        struct GNUNET_CONTAINER_HeapNode *
2323                        node, void *element,
2324                        GNUNET_CONTAINER_HeapCostType
2325                        cost)
2326 {
2327   struct DefragContext * d_ctx = element;
2328
2329   GNUNET_CONTAINER_heap_remove_node (node);
2330   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2331   GNUNET_free (d_ctx);
2332
2333   return GNUNET_YES;
2334 }
2335
2336
2337 /**
2338  * The exported method. Makes the core api available via a global and
2339  * returns the udp transport API.
2340  *
2341  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2342  * @return NULL
2343  */
2344 void *
2345 libgnunet_plugin_transport_udp_done (void *cls)
2346 {
2347   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2348   struct Plugin *plugin = api->cls;
2349
2350   if (NULL == plugin)
2351   {
2352     GNUNET_free (api);
2353     return NULL;
2354   }
2355
2356   stop_broadcast (plugin);
2357
2358   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2359   {
2360     GNUNET_SCHEDULER_cancel (plugin->select_task);
2361     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2362   }
2363   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2364   {
2365     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2366     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2367   }
2368
2369   /* Closing sockets */
2370   if (plugin->sockv4 != NULL)
2371   {
2372     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2373     plugin->sockv4 = NULL;
2374   }
2375   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2376   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2377
2378   if (plugin->sockv6 != NULL)
2379   {
2380     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2381     plugin->sockv6 = NULL;
2382
2383     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2384     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2385   }
2386
2387   GNUNET_NAT_unregister (plugin->nat);
2388
2389   if (plugin->defrag_ctxs != NULL)
2390   {
2391     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2392         heap_cleanup_iterator, NULL);
2393     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2394     plugin->defrag_ctxs = NULL;
2395   }
2396   if (plugin->mst != NULL)
2397   {
2398     GNUNET_SERVER_mst_destroy(plugin->mst);
2399     plugin->mst = NULL;
2400   }
2401
2402   /* Clean up leftover messages */
2403   struct UDPMessageWrapper * udpw;
2404   udpw = plugin->ipv4_queue_head;
2405   while (udpw != NULL)
2406   {
2407     struct UDPMessageWrapper *tmp = udpw->next;
2408     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2409     call_continuation(udpw, GNUNET_SYSERR);
2410     GNUNET_free (udpw);
2411     udpw = tmp;
2412   }
2413   udpw = plugin->ipv6_queue_head;
2414   while (udpw != NULL)
2415   {
2416     struct UDPMessageWrapper *tmp = udpw->next;
2417     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2418     call_continuation(udpw, GNUNET_SYSERR);
2419     GNUNET_free (udpw);
2420     udpw = tmp;
2421   }
2422
2423   /* Clean up sessions */
2424   LOG (GNUNET_ERROR_TYPE_DEBUG,
2425        "Cleaning up sessions\n");
2426   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2427   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2428
2429   plugin->nat = NULL;
2430   GNUNET_free (plugin);
2431   GNUNET_free (api);
2432   return NULL;
2433 }
2434
2435
2436 /* end of plugin_transport_udp.c */