b14e14357c48df790014df80154277063e44f713
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   /**
97    * Address of the other peer
98    */
99   const struct sockaddr *sock_addr;
100
101   size_t addrlen;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * expected delay for ACKs
115    */
116   struct GNUNET_TIME_Relative last_expected_delay;
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121
122   unsigned int rc;
123
124   int in_destroy;
125 };
126
127
128 struct SessionCompareContext
129 {
130   struct Session *res;
131   const struct GNUNET_HELLO_Address *addr;
132 };
133
134
135 /**
136  * Closure for 'process_inbound_tokenized_messages'
137  */
138 struct SourceInformation
139 {
140   /**
141    * Sender identity.
142    */
143   struct GNUNET_PeerIdentity sender;
144
145   /**
146    * Source address.
147    */
148   const void *arg;
149
150   /**
151    * Number of bytes in source address.
152    */
153   size_t args;
154
155   struct Session *session;
156 };
157
158
159 /**
160  * Closure for 'find_receive_context'.
161  */
162 struct FindReceiveContext
163 {
164   /**
165    * Where to store the result.
166    */
167   struct DefragContext *rc;
168
169   /**
170    * Address to find.
171    */
172   const struct sockaddr *addr;
173
174   /**
175    * Number of bytes in 'addr'.
176    */
177   socklen_t addr_len;
178
179   struct Session *session;
180 };
181
182
183
184 /**
185  * Data structure to track defragmentation contexts based
186  * on the source of the UDP traffic.
187  */
188 struct DefragContext
189 {
190
191   /**
192    * Defragmentation context.
193    */
194   struct GNUNET_DEFRAGMENT_Context *defrag;
195
196   /**
197    * Source address this receive context is for (allocated at the
198    * end of the struct).
199    */
200   const struct sockaddr *src_addr;
201
202   /**
203    * Reference to master plugin struct.
204    */
205   struct Plugin *plugin;
206
207   /**
208    * Node in the defrag heap.
209    */
210   struct GNUNET_CONTAINER_HeapNode *hnode;
211
212   /**
213    * Length of 'src_addr'
214    */
215   size_t addr_len;
216 };
217
218
219
220 /**
221  * Closure for 'process_inbound_tokenized_messages'
222  */
223 struct FragmentationContext
224 {
225   struct FragmentationContext * next;
226   struct FragmentationContext * prev;
227
228   struct Plugin * plugin;
229   struct GNUNET_FRAGMENT_Context * frag;
230   struct Session * session;
231
232   struct GNUNET_TIME_Absolute timeout;
233
234
235   /**
236    * Function to call upon completion of the transmission.
237    */
238   GNUNET_TRANSPORT_TransmitContinuation cont;
239
240   /**
241    * Closure for 'cont'.
242    */
243   void *cont_cls;
244
245   size_t bytes_to_send;
246 };
247
248
249 struct UDPMessageWrapper
250 {
251   struct Session *session;
252   struct UDPMessageWrapper *prev;
253   struct UDPMessageWrapper *next;
254   char *udp;
255   size_t msg_size;
256
257   struct GNUNET_TIME_Absolute timeout;
258
259   /**
260    * Function to call upon completion of the transmission.
261    */
262   GNUNET_TRANSPORT_TransmitContinuation cont;
263
264   /**
265    * Closure for 'cont'.
266    */
267   void *cont_cls;
268
269   struct FragmentationContext *frag_ctx;
270
271 };
272
273
274 /**
275  * UDP ACK Message-Packet header (after defragmentation).
276  */
277 struct UDP_ACK_Message
278 {
279   /**
280    * Message header.
281    */
282   struct GNUNET_MessageHeader header;
283
284   /**
285    * Desired delay for flow control
286    */
287   uint32_t delay;
288
289   /**
290    * What is the identity of the sender
291    */
292   struct GNUNET_PeerIdentity sender;
293
294 };
295
296
297 /**
298  * We have been notified that our readset has something to read.  We don't
299  * know which socket needs to be read, so we have to check each one
300  * Then reschedule this function to be called again once more is available.
301  *
302  * @param cls the plugin handle
303  * @param tc the scheduling context (for rescheduling this function again)
304  */
305 static void
306 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
307
308 /**
309  * We have been notified that our readset has something to read.  We don't
310  * know which socket needs to be read, so we have to check each one
311  * Then reschedule this function to be called again once more is available.
312  *
313  * @param cls the plugin handle
314  * @param tc the scheduling context (for rescheduling this function again)
315  */
316 static void
317 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
318
319 /**
320  * Function called for a quick conversion of the binary address to
321  * a numeric address.  Note that the caller must not free the
322  * address and that the next call to this function is allowed
323  * to override the address again.
324  *
325  * @param cls closure
326  * @param addr binary address
327  * @param addrlen length of the address
328  * @return string representing the same address
329  */
330 const char *
331 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
332 {
333   static char rbuf[INET6_ADDRSTRLEN + 10];
334   char buf[INET6_ADDRSTRLEN];
335   const void *sb;
336   struct in_addr a4;
337   struct in6_addr a6;
338   const struct IPv4UdpAddress *t4;
339   const struct IPv6UdpAddress *t6;
340   int af;
341   uint16_t port;
342
343   if (addrlen == sizeof (struct IPv6UdpAddress))
344   {
345     t6 = addr;
346     af = AF_INET6;
347     port = ntohs (t6->u6_port);
348     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
349     sb = &a6;
350   }
351   else if (addrlen == sizeof (struct IPv4UdpAddress))
352   {
353     t4 = addr;
354     af = AF_INET;
355     port = ntohs (t4->u4_port);
356     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
357     sb = &a4;
358   }
359   else
360   {
361     GNUNET_break_op (0);
362     return NULL;
363   }
364   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
365   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
366                    buf, port);
367   return rbuf;
368 }
369
370
371 /**
372  * Function called to convert a string address to
373  * a binary address.
374  *
375  * @param cls closure ('struct Plugin*')
376  * @param addr string address
377  * @param addrlen length of the address
378  * @param buf location to store the buffer
379  * @param added location to store the number of bytes in the buffer.
380  *        If the function returns GNUNET_SYSERR, its contents are undefined.
381  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
382  */
383 int
384 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
385     void **buf, size_t *added)
386 {
387   struct sockaddr_storage socket_address;
388
389   if ((NULL == addr) || (addrlen == 0))
390   {
391     GNUNET_break (0);
392     return GNUNET_SYSERR;
393   }
394
395   if ('\0' != addr[addrlen - 1])
396   {
397     return GNUNET_SYSERR;
398   }
399
400   if (strlen (addr) != addrlen - 1)
401   {
402     return GNUNET_SYSERR;
403   }
404
405   int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
406     &socket_address);
407
408   if (ret != GNUNET_OK)
409   {
410     return GNUNET_SYSERR;
411   }
412
413   if (socket_address.ss_family == AF_INET)
414   {
415     struct IPv4UdpAddress *u4;
416     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
417     u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
418     u4->ipv4_addr = in4->sin_addr.s_addr;
419     u4->u4_port = in4->sin_port;
420     *buf = u4;
421     *added = sizeof (struct IPv4UdpAddress);
422     return GNUNET_OK;
423   }
424   else if (socket_address.ss_family == AF_INET6)
425   {
426     struct IPv6UdpAddress *u6;
427     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
428     u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
429     u6->ipv6_addr = in6->sin6_addr;
430     u6->u6_port = in6->sin6_port;
431     *buf = u6;
432     *added = sizeof (struct IPv6UdpAddress);
433     return GNUNET_OK;
434   }
435   return GNUNET_SYSERR;
436 }
437
438
439 /**
440  * Append our port and forward the result.
441  *
442  * @param cls a 'struct PrettyPrinterContext'
443  * @param hostname result from DNS resolver
444  */
445 static void
446 append_port (void *cls, const char *hostname)
447 {
448   struct PrettyPrinterContext *ppc = cls;
449   char *ret;
450
451   if (hostname == NULL)
452   {
453     ppc->asc (ppc->asc_cls, NULL);
454     GNUNET_free (ppc);
455     return;
456   }
457   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
458   ppc->asc (ppc->asc_cls, ret);
459   GNUNET_free (ret);
460 }
461
462
463 /**
464  * Convert the transports address to a nice, human-readable
465  * format.
466  *
467  * @param cls closure
468  * @param type name of the transport that generated the address
469  * @param addr one of the addresses of the host, NULL for the last address
470  *        the specific address format depends on the transport
471  * @param addrlen length of the address
472  * @param numeric should (IP) addresses be displayed in numeric form?
473  * @param timeout after how long should we give up?
474  * @param asc function to call on each string
475  * @param asc_cls closure for asc
476  */
477 static void
478 udp_plugin_address_pretty_printer (void *cls, const char *type,
479                                    const void *addr, size_t addrlen,
480                                    int numeric,
481                                    struct GNUNET_TIME_Relative timeout,
482                                    GNUNET_TRANSPORT_AddressStringCallback asc,
483                                    void *asc_cls)
484 {
485   struct PrettyPrinterContext *ppc;
486   const void *sb;
487   size_t sbs;
488   struct sockaddr_in a4;
489   struct sockaddr_in6 a6;
490   const struct IPv4UdpAddress *u4;
491   const struct IPv6UdpAddress *u6;
492   uint16_t port;
493
494   if (addrlen == sizeof (struct IPv6UdpAddress))
495   {
496     u6 = addr;
497     memset (&a6, 0, sizeof (a6));
498     a6.sin6_family = AF_INET6;
499 #if HAVE_SOCKADDR_IN_SIN_LEN
500     a6.sin6_len = sizeof (a6);
501 #endif
502     a6.sin6_port = u6->u6_port;
503     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
504     port = ntohs (u6->u6_port);
505     sb = &a6;
506     sbs = sizeof (a6);
507   }
508   else if (addrlen == sizeof (struct IPv4UdpAddress))
509   {
510     u4 = addr;
511     memset (&a4, 0, sizeof (a4));
512     a4.sin_family = AF_INET;
513 #if HAVE_SOCKADDR_IN_SIN_LEN
514     a4.sin_len = sizeof (a4);
515 #endif
516     a4.sin_port = u4->u4_port;
517     a4.sin_addr.s_addr = u4->ipv4_addr;
518     port = ntohs (u4->u4_port);
519     sb = &a4;
520     sbs = sizeof (a4);
521   }
522   else if (0 == addrlen)
523   {
524     asc (asc_cls, "<inbound connection>");
525     asc (asc_cls, NULL);
526     return;
527   }
528   else
529   {
530     /* invalid address */
531     GNUNET_break_op (0);
532     asc (asc_cls, NULL);
533     return;
534   }
535   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
536   ppc->asc = asc;
537   ppc->asc_cls = asc_cls;
538   ppc->port = port;
539   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
540 }
541
542 static void
543 call_continuation (struct UDPMessageWrapper *udpw, int result)
544 {
545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
546       "Calling continuation for %u byte message to `%s' with result %s\n",
547       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
548       (GNUNET_OK == result) ? "OK" : "SYSERR");
549   if (NULL != udpw->cont)
550   {
551     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
552   }
553
554 }
555
556 /**
557  * Check if the given port is plausible (must be either our listen
558  * port or our advertised port).  If it is neither, we return
559  * GNUNET_SYSERR.
560  *
561  * @param plugin global variables
562  * @param in_port port number to check
563  * @return GNUNET_OK if port is either open_port or adv_port
564  */
565 static int
566 check_port (struct Plugin *plugin, uint16_t in_port)
567 {
568   if ((in_port == plugin->port) || (in_port == plugin->aport))
569     return GNUNET_OK;
570   return GNUNET_SYSERR;
571 }
572
573
574
575 /**
576  * Function that will be called to check if a binary address for this
577  * plugin is well-formed and corresponds to an address for THIS peer
578  * (as per our configuration).  Naturally, if absolutely necessary,
579  * plugins can be a bit conservative in their answer, but in general
580  * plugins should make sure that the address does not redirect
581  * traffic to a 3rd party that might try to man-in-the-middle our
582  * traffic.
583  *
584  * @param cls closure, should be our handle to the Plugin
585  * @param addr pointer to the address
586  * @param addrlen length of addr
587  * @return GNUNET_OK if this is a plausible address for this peer
588  *         and transport, GNUNET_SYSERR if not
589  *
590  */
591 static int
592 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
593 {
594   struct Plugin *plugin = cls;
595   struct IPv4UdpAddress *v4;
596   struct IPv6UdpAddress *v6;
597
598   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
599       (addrlen != sizeof (struct IPv6UdpAddress)))
600   {
601     GNUNET_break_op (0);
602     return GNUNET_SYSERR;
603   }
604   if (addrlen == sizeof (struct IPv4UdpAddress))
605   {
606     v4 = (struct IPv4UdpAddress *) addr;
607     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
608       return GNUNET_SYSERR;
609     if (GNUNET_OK !=
610         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
611                                  sizeof (struct in_addr)))
612       return GNUNET_SYSERR;
613   }
614   else
615   {
616     v6 = (struct IPv6UdpAddress *) addr;
617     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
618     {
619       GNUNET_break_op (0);
620       return GNUNET_SYSERR;
621     }
622     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
623       return GNUNET_SYSERR;
624     if (GNUNET_OK !=
625         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
626                                  sizeof (struct in6_addr)))
627       return GNUNET_SYSERR;
628   }
629   return GNUNET_OK;
630 }
631
632
633 /**
634  * Task to free resources associated with a session.
635  *
636  * @param s session to free
637  */
638 static void
639 free_session (struct Session *s)
640 {
641   if (s->frag_ctx != NULL)
642   {
643     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
644     GNUNET_free (s->frag_ctx);
645     s->frag_ctx = NULL;
646   }
647   GNUNET_free (s);
648 }
649
650
651 /**
652  * Destroy a session, plugin is being unloaded.
653  *
654  * @param cls unused
655  * @param key hash of public key of target peer
656  * @param value a 'struct PeerSession*' to clean up
657  * @return GNUNET_OK (continue to iterate)
658  */
659 static int
660 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
661 {
662   struct Plugin *plugin = cls;
663   struct Session *s = value;
664   struct UDPMessageWrapper *udpw;
665   struct UDPMessageWrapper *next;
666
667   GNUNET_assert (GNUNET_YES != s->in_destroy);
668   LOG (GNUNET_ERROR_TYPE_DEBUG,
669        "Session %p to peer `%s' address ended \n",
670          s,
671          GNUNET_i2s (&s->target),
672          GNUNET_a2s (s->sock_addr, s->addrlen));
673   next = plugin->ipv4_queue_head;
674   while (NULL != (udpw = next))
675   {
676     next = udpw->next;
677     if (udpw->session == s)
678     {
679       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
680       call_continuation(udpw, GNUNET_SYSERR);
681       GNUNET_free (udpw);
682     }
683   }
684   next = plugin->ipv6_queue_head;
685   while (NULL != (udpw = next))
686   {
687     next = udpw->next;
688     if (udpw->session == s)
689     {
690       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
691       call_continuation(udpw, GNUNET_SYSERR);
692       GNUNET_free (udpw);
693     }
694     udpw = next;
695   }
696   plugin->env->session_end (plugin->env->cls, &s->target, s);
697
698   if (NULL != s->frag_ctx)
699   {
700     if (NULL != s->frag_ctx->cont)
701     {
702       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
703       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
705           GNUNET_i2s (&s->target));
706     }
707   }
708
709   GNUNET_assert (GNUNET_YES ==
710                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
711                                                        &s->target.hashPubKey,
712                                                        s));
713   GNUNET_STATISTICS_set(plugin->env->stats,
714                         "# UDP sessions active",
715                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
716                         GNUNET_NO);
717   if (s->rc > 0)
718     s->in_destroy = GNUNET_YES;
719   else
720     free_session (s);
721   return GNUNET_OK;
722 }
723
724
725 /**
726  * Disconnect from a remote node.  Clean up session if we have one for this peer
727  *
728  * @param cls closure for this call (should be handle to Plugin)
729  * @param target the peeridentity of the peer to disconnect
730  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
731  */
732 static void
733 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
734 {
735   struct Plugin *plugin = cls;
736   GNUNET_assert (plugin != NULL);
737
738   GNUNET_assert (target != NULL);
739   LOG (GNUNET_ERROR_TYPE_DEBUG,
740        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
741   /* Clean up sessions */
742   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
743 }
744
745 static struct Session *
746 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
747                 const void *addr, size_t addrlen,
748                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
749 {
750   struct Session *s;
751   const struct IPv4UdpAddress *t4;
752   const struct IPv6UdpAddress *t6;
753   struct sockaddr_in *v4;
754   struct sockaddr_in6 *v6;
755   size_t len;
756
757   switch (addrlen)
758   {
759   case sizeof (struct IPv4UdpAddress):
760     if (NULL == plugin->sockv4)
761     {
762       return NULL;
763     }
764     t4 = addr;
765     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
766     len = sizeof (struct sockaddr_in);
767     v4 = (struct sockaddr_in *) &s[1];
768     v4->sin_family = AF_INET;
769 #if HAVE_SOCKADDR_IN_SIN_LEN
770     v4->sin_len = sizeof (struct sockaddr_in);
771 #endif
772     v4->sin_port = t4->u4_port;
773     v4->sin_addr.s_addr = t4->ipv4_addr;
774     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
775     break;
776   case sizeof (struct IPv6UdpAddress):
777     if (NULL == plugin->sockv6)
778     {
779       return NULL;
780     }
781     t6 = addr;
782     s =
783         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
784     len = sizeof (struct sockaddr_in6);
785     v6 = (struct sockaddr_in6 *) &s[1];
786     v6->sin6_family = AF_INET6;
787 #if HAVE_SOCKADDR_IN_SIN_LEN
788     v6->sin6_len = sizeof (struct sockaddr_in6);
789 #endif
790     v6->sin6_port = t6->u6_port;
791     v6->sin6_addr = t6->ipv6_addr;
792     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
793     break;
794   default:
795     /* Must have a valid address to send to */
796     GNUNET_break_op (0);
797     return NULL;
798   }
799
800   s->addrlen = len;
801   s->target = *target;
802   s->sock_addr = (const struct sockaddr *) &s[1];
803   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
804   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
805   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
806
807   return s;
808 }
809
810
811 static int
812 session_cmp_it (void *cls,
813                 const GNUNET_HashCode * key,
814                 void *value)
815 {
816   struct SessionCompareContext * cctx = cls;
817   const struct GNUNET_HELLO_Address *address = cctx->addr;
818   struct Session *s = value;
819
820   socklen_t s_addrlen = s->addrlen;
821
822 #if VERBOSE_UDP
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
824       udp_address_to_string (NULL, (void *) address->address, address->address_length),
825       GNUNET_a2s (s->sock_addr, s->addrlen));
826 #endif
827
828   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
829       (s_addrlen == sizeof (struct sockaddr_in)))
830   {
831     struct IPv4UdpAddress * u4 = NULL;
832     u4 = (struct IPv4UdpAddress *) address->address;
833     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
834     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
835         (u4->u4_port == s4->sin_port))
836     {
837       cctx->res = s;
838       return GNUNET_NO;
839     }
840
841   }
842   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
843       (s_addrlen == sizeof (struct sockaddr_in6)))
844   {
845     struct IPv6UdpAddress * u6 = NULL;
846     u6 = (struct IPv6UdpAddress *) address->address;
847     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
848     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
849         (u6->u6_port == s6->sin6_port))
850     {
851       cctx->res = s;
852       return GNUNET_NO;
853     }
854   }
855
856
857   return GNUNET_YES;
858 }
859
860
861 /**
862  * Creates a new outbound session the transport service will use to send data to the
863  * peer
864  *
865  * @param cls the plugin
866  * @param address the address
867  * @return the session or NULL of max connections exceeded
868  */
869 static struct Session *
870 udp_plugin_get_session (void *cls,
871                   const struct GNUNET_HELLO_Address *address)
872 {
873   struct Session * s = NULL;
874   struct Plugin * plugin = cls;
875   struct IPv6UdpAddress * udp_a6;
876   struct IPv4UdpAddress * udp_a4;
877
878   GNUNET_assert (plugin != NULL);
879   GNUNET_assert (address != NULL);
880
881
882   if ((address->address == NULL) ||
883       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
884       (address->address_length != sizeof (struct IPv6UdpAddress))))
885   {
886     GNUNET_break (0);
887     return NULL;
888   }
889
890   if (address->address_length == sizeof (struct IPv4UdpAddress))
891   {
892     if (plugin->sockv4 == NULL)
893       return NULL;
894     udp_a4 = (struct IPv4UdpAddress *) address->address;
895     if (udp_a4->u4_port == 0)
896       return NULL;
897   }
898
899   if (address->address_length == sizeof (struct IPv6UdpAddress))
900   {
901     if (plugin->sockv6 == NULL)
902       return NULL;
903     udp_a6 = (struct IPv6UdpAddress *) address->address;
904     if (udp_a6->u6_port == 0)
905       return NULL;
906   }
907
908   /* check if session already exists */
909   struct SessionCompareContext cctx;
910   cctx.addr = address;
911   cctx.res = NULL;
912 #if VERBOSE_UDP
913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
914 #endif
915   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
916   if (cctx.res != NULL)
917   {
918 #if VERBOSE_UDP
919     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
920 #endif
921     return cctx.res;
922   }
923
924   /* otherwise create new */
925   s = create_session (plugin,
926       &address->peer,
927       address->address,
928       address->address_length,
929       NULL, NULL);
930 #if VERBOSE
931     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Creating new session %p for peer `%s' address `%s'\n",
933               s,
934               GNUNET_i2s(&address->peer),
935               udp_address_to_string(NULL,address->address,address->address_length));
936 #endif
937   GNUNET_assert (GNUNET_OK ==
938                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
939                                                     &s->target.hashPubKey,
940                                                     s,
941                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
942
943   GNUNET_STATISTICS_set(plugin->env->stats,
944                         "# UDP sessions active",
945                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
946                         GNUNET_NO);
947
948   return s;
949 }
950
951
952 static void 
953 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
954 {
955
956   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
957     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
958   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
959     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
960 }
961
962
963 /**
964  * Fragment message was transmitted via UDP, let fragmentation know
965  * to send the next fragment now.
966  *
967  * @param cls the 'struct UDPMessageWrapper' of the fragment
968  * @param target destination peer (ignored)
969  * @param result GNUNET_OK on success (ignored)
970  */
971 static void
972 send_next_fragment (void *cls,
973                     const struct GNUNET_PeerIdentity *target,
974                     int result)
975 {
976   struct UDPMessageWrapper *udpw = cls;
977
978   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
979 }
980
981
982 /**
983  * Function that is called with messages created by the fragmentation
984  * module.  In the case of the 'proc' callback of the
985  * GNUNET_FRAGMENT_context_create function, this function must
986  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
987  *
988  * @param cls closure, the 'struct FragmentationContext'
989  * @param msg the message that was created
990  */
991 static void
992 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
993 {
994   struct FragmentationContext *frag_ctx = cls;
995   struct Plugin *plugin = frag_ctx->plugin;
996   struct UDPMessageWrapper * udpw;
997   struct Session *s;
998
999   size_t msg_len = ntohs (msg->size);
1000
1001 #if VERBOSE_UDP
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1003 #endif
1004
1005   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1006   udpw->session = frag_ctx->session;
1007   s = udpw->session;
1008   udpw->udp = (char *) &udpw[1];
1009
1010   udpw->msg_size = msg_len;
1011   udpw->cont = &send_next_fragment;
1012   udpw->cont_cls = udpw;
1013   udpw->timeout = frag_ctx->timeout;
1014   udpw->frag_ctx = frag_ctx;
1015   memcpy (udpw->udp, msg, msg_len);
1016
1017   enqueue (plugin, udpw);
1018
1019
1020   if (s->addrlen == sizeof (struct sockaddr_in))
1021   {
1022     if (plugin->with_v4_ws == GNUNET_NO)
1023     {
1024       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1025         GNUNET_SCHEDULER_cancel(plugin->select_task);
1026
1027       plugin->select_task =
1028           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1029                                        GNUNET_TIME_UNIT_FOREVER_REL,
1030                                        plugin->rs_v4,
1031                                        plugin->ws_v4,
1032                                        &udp_plugin_select, plugin);
1033       plugin->with_v4_ws = GNUNET_YES;
1034     }
1035   }
1036
1037   else if (s->addrlen == sizeof (struct sockaddr_in6))
1038   {
1039     if (plugin->with_v6_ws == GNUNET_NO)
1040     {
1041       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1042         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1043
1044       plugin->select_task_v6 =
1045           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1046                                        GNUNET_TIME_UNIT_FOREVER_REL,
1047                                        plugin->rs_v6,
1048                                        plugin->ws_v6,
1049                                        &udp_plugin_select_v6, plugin);
1050       plugin->with_v6_ws = GNUNET_YES;
1051     }
1052   }
1053 }
1054
1055
1056 /**
1057  * Function that can be used by the transport service to transmit
1058  * a message using the plugin.   Note that in the case of a
1059  * peer disconnecting, the continuation MUST be called
1060  * prior to the disconnect notification itself.  This function
1061  * will be called with this peer's HELLO message to initiate
1062  * a fresh connection to another peer.
1063  *
1064  * @param cls closure
1065  * @param s which session must be used
1066  * @param msgbuf the message to transmit
1067  * @param msgbuf_size number of bytes in 'msgbuf'
1068  * @param priority how important is the message (most plugins will
1069  *                 ignore message priority and just FIFO)
1070  * @param to how long to wait at most for the transmission (does not
1071  *                require plugins to discard the message after the timeout,
1072  *                just advisory for the desired delay; most plugins will ignore
1073  *                this as well)
1074  * @param cont continuation to call once the message has
1075  *        been transmitted (or if the transport is ready
1076  *        for the next transmission call; or if the
1077  *        peer disconnected...); can be NULL
1078  * @param cont_cls closure for cont
1079  * @return number of bytes used (on the physical network, with overheads);
1080  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1081  *         and does NOT mean that the message was not transmitted (DV)
1082  */
1083 static ssize_t
1084 udp_plugin_send (void *cls,
1085                   struct Session *s,
1086                   const char *msgbuf, size_t msgbuf_size,
1087                   unsigned int priority,
1088                   struct GNUNET_TIME_Relative to,
1089                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1090 {
1091   struct Plugin *plugin = cls;
1092   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1093
1094   struct UDPMessageWrapper * udpw;
1095   struct UDPMessage *udp;
1096   char mbuf[mlen];
1097   GNUNET_assert (plugin != NULL);
1098   GNUNET_assert (s != NULL);
1099
1100   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1101     return GNUNET_SYSERR;
1102
1103    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1104      return GNUNET_SYSERR;
1105
1106
1107   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1108   {
1109     GNUNET_break (0);
1110     return GNUNET_SYSERR;
1111   }
1112
1113   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1114   {
1115     GNUNET_break (0);
1116     return GNUNET_SYSERR;
1117   }
1118
1119   LOG (GNUNET_ERROR_TYPE_DEBUG,
1120        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1121          mlen,
1122          GNUNET_i2s (&s->target),
1123          GNUNET_a2s(s->sock_addr, s->addrlen));
1124
1125   /* Message */
1126   udp = (struct UDPMessage *) mbuf;
1127   udp->header.size = htons (mlen);
1128   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1129   udp->reserved = htonl (0);
1130   udp->sender = *plugin->env->my_identity;
1131
1132   if (mlen <= UDP_MTU)
1133   {
1134     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1135     udpw->session = s;
1136     udpw->udp = (char *) &udpw[1];
1137     udpw->msg_size = mlen;
1138     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1139     udpw->cont = cont;
1140     udpw->cont_cls = cont_cls;
1141     udpw->frag_ctx = NULL;
1142     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1143     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1144
1145     enqueue (plugin, udpw);
1146   }
1147   else
1148   {
1149     LOG (GNUNET_ERROR_TYPE_DEBUG,
1150          "UDP has to fragment message \n");
1151     if  (s->frag_ctx != NULL)
1152       return GNUNET_SYSERR;
1153     memcpy (&udp[1], msgbuf, msgbuf_size);
1154     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1155
1156     frag_ctx->plugin = plugin;
1157     frag_ctx->session = s;
1158     frag_ctx->cont = cont;
1159     frag_ctx->cont_cls = cont_cls;
1160     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1161     frag_ctx->bytes_to_send = mlen;
1162     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1163               UDP_MTU,
1164               &plugin->tracker,
1165               s->last_expected_delay,
1166               &udp->header,
1167               &enqueue_fragment,
1168               frag_ctx);
1169
1170     s->frag_ctx = frag_ctx;
1171   }
1172
1173   if (s->addrlen == sizeof (struct sockaddr_in))
1174   {
1175     if (plugin->with_v4_ws == GNUNET_NO)
1176     {
1177       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1178         GNUNET_SCHEDULER_cancel(plugin->select_task);
1179
1180       plugin->select_task =
1181           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1182                                        GNUNET_TIME_UNIT_FOREVER_REL,
1183                                        plugin->rs_v4,
1184                                        plugin->ws_v4,
1185                                        &udp_plugin_select, plugin);
1186       plugin->with_v4_ws = GNUNET_YES;
1187     }
1188   }
1189
1190   else if (s->addrlen == sizeof (struct sockaddr_in6))
1191   {
1192     if (plugin->with_v6_ws == GNUNET_NO)
1193     {
1194       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1195         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1196
1197       plugin->select_task_v6 =
1198         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1199                                      GNUNET_TIME_UNIT_FOREVER_REL,
1200                                      plugin->rs_v6,
1201                                      plugin->ws_v6,
1202                                      &udp_plugin_select_v6, plugin);
1203       plugin->with_v6_ws = GNUNET_YES;
1204     }
1205   }
1206
1207   return mlen;
1208 }
1209
1210
1211 /**
1212  * Our external IP address/port mapping has changed.
1213  *
1214  * @param cls closure, the 'struct LocalAddrList'
1215  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1216  *     the previous (now invalid) one
1217  * @param addr either the previous or the new public IP address
1218  * @param addrlen actual lenght of the address
1219  */
1220 static void
1221 udp_nat_port_map_callback (void *cls, int add_remove,
1222                            const struct sockaddr *addr, socklen_t addrlen)
1223 {
1224   struct Plugin *plugin = cls;
1225   struct IPv4UdpAddress u4;
1226   struct IPv6UdpAddress u6;
1227   void *arg;
1228   size_t args;
1229
1230   /* convert 'addr' to our internal format */
1231   switch (addr->sa_family)
1232   {
1233   case AF_INET:
1234     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1235     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1236     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1237     arg = &u4;
1238     args = sizeof (u4);
1239     break;
1240   case AF_INET6:
1241     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1242     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1243             sizeof (struct in6_addr));
1244     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1245     arg = &u6;
1246     args = sizeof (u6);
1247     break;
1248   default:
1249     GNUNET_break (0);
1250     return;
1251   }
1252   /* modify our published address list */
1253   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1254 }
1255
1256
1257
1258 /**
1259  * Message tokenizer has broken up an incomming message. Pass it on
1260  * to the service.
1261  *
1262  * @param cls the 'struct Plugin'
1263  * @param client the 'struct SourceInformation'
1264  * @param hdr the actual message
1265  */
1266 static void
1267 process_inbound_tokenized_messages (void *cls, void *client,
1268                                     const struct GNUNET_MessageHeader *hdr)
1269 {
1270   struct Plugin *plugin = cls;
1271   struct SourceInformation *si = client;
1272   struct GNUNET_ATS_Information ats[2];
1273   struct GNUNET_TIME_Relative delay;
1274
1275   GNUNET_assert (si->session != NULL);
1276   if (GNUNET_YES == si->session->in_destroy)
1277     return; 
1278   /* setup ATS */
1279   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1280   ats[0].value = htonl (1);
1281   ats[1] = si->session->ats;
1282   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1283
1284   delay = plugin->env->receive (plugin->env->cls,
1285                 &si->sender,
1286                 hdr,
1287                 (const struct GNUNET_ATS_Information *) &ats, 2,
1288                 NULL,
1289                 si->arg,
1290                 si->args);
1291   si->session->flow_delay_for_other_peer = delay;
1292 }
1293
1294
1295 /**
1296  * We've received a UDP Message.  Process it (pass contents to main service).
1297  *
1298  * @param plugin plugin context
1299  * @param msg the message
1300  * @param sender_addr sender address
1301  * @param sender_addr_len number of bytes in sender_addr
1302  */
1303 static void
1304 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1305                      const struct sockaddr *sender_addr,
1306                      socklen_t sender_addr_len)
1307 {
1308   struct SourceInformation si;
1309   struct Session * s;
1310   struct IPv4UdpAddress u4;
1311   struct IPv6UdpAddress u6;
1312   const void *arg;
1313   size_t args;
1314
1315   if (0 != ntohl (msg->reserved))
1316   {
1317     GNUNET_break_op (0);
1318     return;
1319   }
1320   if (ntohs (msg->header.size) <
1321       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1322   {
1323     GNUNET_break_op (0);
1324     return;
1325   }
1326
1327   /* convert address */
1328   switch (sender_addr->sa_family)
1329   {
1330   case AF_INET:
1331     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1332     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1333     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1334     arg = &u4;
1335     args = sizeof (u4);
1336     break;
1337   case AF_INET6:
1338     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1339     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1340     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1341     arg = &u6;
1342     args = sizeof (u6);
1343     break;
1344   default:
1345     GNUNET_break (0);
1346     return;
1347   }
1348   LOG (GNUNET_ERROR_TYPE_DEBUG,
1349        "Received message with %u bytes from peer `%s' at `%s'\n",
1350        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1351        GNUNET_a2s (sender_addr, sender_addr_len));
1352
1353   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1354   s = udp_plugin_get_session(plugin, address);
1355   GNUNET_free (address);
1356
1357   /* iterate over all embedded messages */
1358   si.session = s;
1359   si.sender = msg->sender;
1360   si.arg = arg;
1361   si.args = args;
1362   s->rc++;
1363   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1364                              ntohs (msg->header.size) -
1365                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1366   s->rc--;
1367   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1368     free_session (s);
1369 }
1370
1371
1372 /**
1373  * Scan the heap for a receive context with the given address.
1374  *
1375  * @param cls the 'struct FindReceiveContext'
1376  * @param node internal node of the heap
1377  * @param element value stored at the node (a 'struct ReceiveContext')
1378  * @param cost cost associated with the node
1379  * @return GNUNET_YES if we should continue to iterate,
1380  *         GNUNET_NO if not.
1381  */
1382 static int
1383 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1384                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1385 {
1386   struct FindReceiveContext *frc = cls;
1387   struct DefragContext *e = element;
1388
1389   if ((frc->addr_len == e->addr_len) &&
1390       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1391   {
1392     frc->rc = e;
1393     return GNUNET_NO;
1394   }
1395   return GNUNET_YES;
1396 }
1397
1398
1399 /**
1400  * Process a defragmented message.
1401  *
1402  * @param cls the 'struct ReceiveContext'
1403  * @param msg the message
1404  */
1405 static void
1406 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1407 {
1408   struct DefragContext *rc = cls;
1409
1410   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1411   {
1412     GNUNET_break (0);
1413     return;
1414   }
1415   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1416   {
1417     GNUNET_break (0);
1418     return;
1419   }
1420   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1421                        rc->src_addr, rc->addr_len);
1422 }
1423
1424 struct LookupContext
1425 {
1426   const struct sockaddr * addr;
1427   size_t addrlen;
1428
1429   struct Session *res;
1430 };
1431
1432 static int
1433 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1434 {
1435   struct LookupContext *l_ctx = cls;
1436   struct Session * s = value;
1437
1438   if ((s->addrlen == l_ctx->addrlen) &&
1439       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1440   {
1441     l_ctx->res = s;
1442     return GNUNET_NO;
1443   }
1444   return GNUNET_YES;
1445 }
1446
1447 /**
1448  * Transmit an acknowledgement.
1449  *
1450  * @param cls the 'struct ReceiveContext'
1451  * @param id message ID (unused)
1452  * @param msg ack to transmit
1453  */
1454 static void
1455 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1456 {
1457   struct DefragContext *rc = cls;
1458
1459   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1460   struct UDP_ACK_Message *udp_ack;
1461   uint32_t delay = 0;
1462   struct UDPMessageWrapper *udpw;
1463   struct Session *s;
1464
1465   struct LookupContext l_ctx;
1466   l_ctx.addr = rc->src_addr;
1467   l_ctx.addrlen = rc->addr_len;
1468   l_ctx.res = NULL;
1469   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1470       &lookup_session_by_addr_it,
1471       &l_ctx);
1472   s = l_ctx.res;
1473
1474   if (NULL == s)
1475     return;
1476
1477   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1478     delay = s->flow_delay_for_other_peer.rel_value;
1479
1480   LOG (GNUNET_ERROR_TYPE_DEBUG,
1481        "Sending ACK to `%s' including delay of %u ms\n",
1482        GNUNET_a2s (rc->src_addr,
1483                    (rc->src_addr->sa_family ==
1484                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1485                                                                      sockaddr_in6)),
1486        delay);
1487   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1488   udpw->cont = NULL;
1489   udpw->cont_cls = NULL;
1490   udpw->frag_ctx = NULL;
1491   udpw->msg_size = msize;
1492   udpw->session = s;
1493   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1494   udpw->udp = (char *)&udpw[1];
1495
1496   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1497   udp_ack->header.size = htons ((uint16_t) msize);
1498   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1499   udp_ack->delay = htonl (delay);
1500   udp_ack->sender = *rc->plugin->env->my_identity;
1501   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1502
1503   enqueue (rc->plugin, udpw);
1504 }
1505
1506
1507 static void read_process_msg (struct Plugin *plugin,
1508     const struct GNUNET_MessageHeader *msg,
1509     char *addr,
1510     socklen_t fromlen)
1511 {
1512   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1513   {
1514     GNUNET_break_op (0);
1515     return;
1516   }
1517   process_udp_message (plugin, (const struct UDPMessage *) msg,
1518                        (const struct sockaddr *) addr, fromlen);
1519   return;
1520 }
1521
1522 static void read_process_ack (struct Plugin *plugin,
1523     const struct GNUNET_MessageHeader *msg,
1524     char *addr,
1525     socklen_t fromlen)
1526 {
1527   const struct GNUNET_MessageHeader *ack;
1528   const struct UDP_ACK_Message *udp_ack;
1529   struct LookupContext l_ctx;
1530   struct Session *s = NULL;
1531   struct GNUNET_TIME_Relative flow_delay;
1532
1533   if (ntohs (msg->size) <
1534       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1535   {
1536     GNUNET_break_op (0);
1537     return;
1538   }
1539
1540   udp_ack = (const struct UDP_ACK_Message *) msg;
1541
1542   l_ctx.addr = (const struct sockaddr *) addr;
1543   l_ctx.addrlen = fromlen;
1544   l_ctx.res = NULL;
1545   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1546       &lookup_session_by_addr_it,
1547       &l_ctx);
1548   s = l_ctx.res;
1549
1550   if ((s == NULL) || (s->frag_ctx == NULL))
1551     return;
1552
1553   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1554   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1555        flow_delay.rel_value);
1556   s->flow_delay_from_other_peer =
1557       GNUNET_TIME_relative_to_absolute (flow_delay);
1558
1559   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1560   if (ntohs (ack->size) !=
1561       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1562   {
1563     GNUNET_break_op (0);
1564     return;
1565   }
1566
1567   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1568   {
1569   LOG (GNUNET_ERROR_TYPE_DEBUG,
1570        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1571        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1572        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1573     return;
1574   }
1575
1576   LOG (GNUNET_ERROR_TYPE_DEBUG,
1577        "FULL MESSAGE ACKed\n",
1578        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1579        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1580   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1581
1582   struct UDPMessageWrapper * udpw = NULL;
1583   struct UDPMessageWrapper * tmp = NULL;
1584   if (s->addrlen == sizeof (struct sockaddr_in6))
1585   {
1586     udpw = plugin->ipv6_queue_head;
1587     while (udpw!= NULL)
1588     {
1589       tmp = udpw->next;
1590       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1591       {
1592         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1593         GNUNET_free (udpw);
1594       }
1595       udpw = tmp;
1596     }
1597   }
1598   if (s->addrlen == sizeof (struct sockaddr_in))
1599   {
1600     udpw = plugin->ipv4_queue_head;
1601     while (udpw!= NULL)
1602     {
1603       tmp = udpw->next;
1604       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1605       {
1606         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1607         GNUNET_free (udpw);
1608       }
1609       udpw = tmp;
1610     }
1611   }
1612
1613   if (s->frag_ctx->cont != NULL)
1614   {
1615     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1616         "Calling continuation for fragmented message to `%s' with result %s\n",
1617         GNUNET_i2s (&s->target), "OK");
1618     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1619   }
1620
1621   GNUNET_free (s->frag_ctx);
1622   s->frag_ctx = NULL;
1623   return;
1624 }
1625
1626 static void read_process_fragment (struct Plugin *plugin,
1627     const struct GNUNET_MessageHeader *msg,
1628     char *addr,
1629     socklen_t fromlen)
1630 {
1631   struct DefragContext *d_ctx;
1632   struct GNUNET_TIME_Absolute now;
1633   struct FindReceiveContext frc;
1634
1635
1636   frc.rc = NULL;
1637   frc.addr = (const struct sockaddr *) addr;
1638   frc.addr_len = fromlen;
1639
1640   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1641        (unsigned int) ntohs (msg->size),
1642        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1643   /* Lookup existing receive context for this address */
1644   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1645                                  &find_receive_context,
1646                                  &frc);
1647   now = GNUNET_TIME_absolute_get ();
1648   d_ctx = frc.rc;
1649
1650   if (d_ctx == NULL)
1651   {
1652     /* Create a new defragmentation context */
1653     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1654     memcpy (&d_ctx[1], addr, fromlen);
1655     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1656     d_ctx->addr_len = fromlen;
1657     d_ctx->plugin = plugin;
1658     d_ctx->defrag =
1659         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1660                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1661                                           &fragment_msg_proc, &ack_proc);
1662     d_ctx->hnode =
1663         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1664                                       (GNUNET_CONTAINER_HeapCostType)
1665                                       now.abs_value);
1666   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1667        (unsigned int) ntohs (msg->size),
1668        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1669   }
1670   else
1671   {
1672   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1673        (unsigned int) ntohs (msg->size),
1674        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1675   }
1676
1677   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1678   {
1679     /* keep this 'rc' from expiring */
1680     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1681                                        (GNUNET_CONTAINER_HeapCostType)
1682                                        now.abs_value);
1683   }
1684   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1685       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1686   {
1687     /* remove 'rc' that was inactive the longest */
1688     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1689     GNUNET_assert (NULL != d_ctx);
1690     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1691     GNUNET_free (d_ctx);
1692   }
1693 }
1694
1695 /**
1696  * Read and process a message from the given socket.
1697  *
1698  * @param plugin the overall plugin
1699  * @param rsock socket to read from
1700  */
1701 static void
1702 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1703 {
1704   socklen_t fromlen;
1705   char addr[32];
1706   char buf[65536] GNUNET_ALIGN;
1707   ssize_t size;
1708   const struct GNUNET_MessageHeader *msg;
1709
1710   fromlen = sizeof (addr);
1711   memset (&addr, 0, sizeof (addr));
1712   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1713                                       (struct sockaddr *) &addr, &fromlen);
1714
1715   if (size < sizeof (struct GNUNET_MessageHeader))
1716   {
1717     GNUNET_break_op (0);
1718     return;
1719   }
1720   msg = (const struct GNUNET_MessageHeader *) buf;
1721
1722   LOG (GNUNET_ERROR_TYPE_DEBUG,
1723        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1724        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1725
1726   if (size != ntohs (msg->size))
1727   {
1728     GNUNET_break_op (0);
1729     return;
1730   }
1731
1732   switch (ntohs (msg->type))
1733   {
1734   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1735     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1736     return;
1737
1738   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1739     read_process_msg (plugin, msg, addr, fromlen);
1740     return;
1741
1742   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1743     read_process_ack (plugin, msg, addr, fromlen);
1744     return;
1745
1746   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1747     read_process_fragment (plugin, msg, addr, fromlen);
1748     return;
1749
1750   default:
1751     GNUNET_break_op (0);
1752     return;
1753   }
1754 }
1755
1756
1757 static size_t
1758 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1759 {
1760   ssize_t sent;
1761   size_t slen;
1762   struct GNUNET_TIME_Absolute max;
1763   struct GNUNET_TIME_Absolute ;
1764
1765   struct UDPMessageWrapper *udpw = NULL;
1766
1767   if (sock == plugin->sockv4)
1768   {
1769     udpw = plugin->ipv4_queue_head;
1770   }
1771   else if (sock == plugin->sockv6)
1772   {
1773     udpw = plugin->ipv6_queue_head;
1774   }
1775   else
1776   {
1777     GNUNET_break (0);
1778     return 0;
1779   }
1780
1781   const struct sockaddr * sa = udpw->session->sock_addr;
1782   slen = udpw->session->addrlen;
1783
1784   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1785
1786   while (udpw != NULL)
1787   {
1788     if (max.abs_value != udpw->timeout.abs_value)
1789     {
1790       /* Message timed out */
1791       call_continuation(udpw, GNUNET_SYSERR);
1792       if (udpw->frag_ctx != NULL)
1793       {
1794         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1795             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1796         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1797         GNUNET_free (udpw->frag_ctx);
1798         udpw->session->frag_ctx = NULL;
1799       }
1800       else
1801       {
1802         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1803             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1804       }
1805
1806       if (sock == plugin->sockv4)
1807       {
1808         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1809         GNUNET_free (udpw);
1810         udpw = plugin->ipv4_queue_head;
1811       }
1812       else if (sock == plugin->sockv6)
1813       {
1814         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1815         GNUNET_free (udpw);
1816         udpw = plugin->ipv6_queue_head;
1817       }
1818     }
1819     else
1820     {
1821       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1822       if (delta.rel_value == 0)
1823       {
1824         /* this message is not delayed */
1825         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1826             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1827         break;
1828       }
1829       else
1830       {
1831         /* this message is delayed, try next */
1832         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1833             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1834             delta);
1835         udpw = udpw->next;
1836       }
1837     }
1838   }
1839
1840   if (udpw == NULL)
1841   {
1842     /* No message left */
1843     return 0;
1844   }
1845
1846   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1847
1848   if (GNUNET_SYSERR == sent)
1849   {
1850     LOG (GNUNET_ERROR_TYPE_ERROR,
1851          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1852          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1853          STRERROR (errno));
1854     call_continuation(udpw, GNUNET_SYSERR);
1855   }
1856   else
1857   {
1858     LOG (GNUNET_ERROR_TYPE_DEBUG,
1859          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1860          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1861          (sent < 0) ? STRERROR (errno) : "ok");
1862     call_continuation(udpw, GNUNET_OK);
1863   }
1864
1865   if (sock == plugin->sockv4)
1866     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1867   else if (sock == plugin->sockv6)
1868     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1869   GNUNET_free (udpw);
1870   udpw = NULL;
1871
1872   return sent;
1873 }
1874
1875 /**
1876  * We have been notified that our readset has something to read.  We don't
1877  * know which socket needs to be read, so we have to check each one
1878  * Then reschedule this function to be called again once more is available.
1879  *
1880  * @param cls the plugin handle
1881  * @param tc the scheduling context (for rescheduling this function again)
1882  */
1883 static void
1884 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1885 {
1886   struct Plugin *plugin = cls;
1887
1888   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1889   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1890     return;
1891   plugin->with_v4_ws = GNUNET_NO;
1892
1893   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1894   {
1895     if ((NULL != plugin->sockv4) &&
1896       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1897         udp_select_read (plugin, plugin->sockv4);
1898
1899   }
1900
1901   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1902   {
1903     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1904       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1905       {
1906         udp_select_send (plugin, plugin->sockv4);
1907       }
1908   }
1909
1910   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1911     GNUNET_SCHEDULER_cancel (plugin->select_task);
1912   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1913                                    GNUNET_TIME_UNIT_FOREVER_REL,
1914                                    plugin->rs_v4,
1915                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1916                                    &udp_plugin_select, plugin);
1917   if (plugin->ipv4_queue_head != NULL)
1918     plugin->with_v4_ws = GNUNET_YES;
1919   else
1920     plugin->with_v4_ws = GNUNET_NO;
1921 }
1922
1923
1924 /**
1925  * We have been notified that our readset has something to read.  We don't
1926  * know which socket needs to be read, so we have to check each one
1927  * Then reschedule this function to be called again once more is available.
1928  *
1929  * @param cls the plugin handle
1930  * @param tc the scheduling context (for rescheduling this function again)
1931  */
1932 static void
1933 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1934 {
1935   struct Plugin *plugin = cls;
1936
1937   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1938   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1939     return;
1940
1941   plugin->with_v6_ws = GNUNET_NO;
1942   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1943   {
1944     if ((NULL != plugin->sockv6) &&
1945       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1946         udp_select_read (plugin, plugin->sockv6);
1947   }
1948
1949   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1950   {
1951     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1952       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1953       {
1954         udp_select_send (plugin, plugin->sockv6);
1955       }
1956   }
1957   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1958     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1959   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1960                                    GNUNET_TIME_UNIT_FOREVER_REL,
1961                                    plugin->rs_v6,
1962                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1963                                    &udp_plugin_select_v6, plugin);
1964   if (plugin->ipv6_queue_head != NULL)
1965     plugin->with_v6_ws = GNUNET_YES;
1966   else
1967     plugin->with_v6_ws = GNUNET_NO;
1968 }
1969
1970
1971 static int
1972 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1973 {
1974   int tries;
1975   int sockets_created = 0;
1976   struct sockaddr *serverAddr;
1977   struct sockaddr *addrs[2];
1978   socklen_t addrlens[2];
1979   socklen_t addrlen;
1980
1981   /* Create IPv6 socket */
1982   if (plugin->enable_ipv6 == GNUNET_YES)
1983   {
1984     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
1985     if (NULL == plugin->sockv6)
1986     {
1987       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
1988       plugin->enable_ipv6 = GNUNET_NO;
1989     }
1990     else
1991     {
1992 #if HAVE_SOCKADDR_IN_SIN_LEN
1993       serverAddrv6->sin6_len = sizeof (serverAddrv6);
1994 #endif
1995       serverAddrv6->sin6_family = AF_INET6;
1996       serverAddrv6->sin6_addr = in6addr_any;
1997       serverAddrv6->sin6_port = htons (plugin->port);
1998       addrlen = sizeof (struct sockaddr_in6);
1999       serverAddr = (struct sockaddr *) serverAddrv6;
2000       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2001            ntohs (serverAddrv6->sin6_port));
2002       tries = 0;
2003       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2004              GNUNET_OK)
2005       {
2006         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2007         LOG (GNUNET_ERROR_TYPE_DEBUG,
2008              "IPv6 Binding failed, trying new port %d\n",
2009              ntohs (serverAddrv6->sin6_port));
2010         tries++;
2011         if (tries > 10)
2012         {
2013           GNUNET_NETWORK_socket_close (plugin->sockv6);
2014           plugin->sockv6 = NULL;
2015           break;
2016         }
2017       }
2018       if (plugin->sockv6 != NULL)
2019       {
2020         LOG (GNUNET_ERROR_TYPE_DEBUG,
2021              "IPv6 socket created on port %d\n",
2022              ntohs (serverAddrv6->sin6_port));
2023         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2024         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2025         sockets_created++;
2026       }
2027     }
2028   }
2029
2030   /* Create IPv4 socket */
2031   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2032   if (NULL == plugin->sockv4)
2033   {
2034     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2035   }
2036   else
2037   {
2038 #if HAVE_SOCKADDR_IN_SIN_LEN
2039     serverAddrv4->sin_len = sizeof (serverAddrv4);
2040 #endif
2041     serverAddrv4->sin_family = AF_INET;
2042     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2043     serverAddrv4->sin_port = htons (plugin->port);
2044     addrlen = sizeof (struct sockaddr_in);
2045     serverAddr = (struct sockaddr *) serverAddrv4;
2046
2047     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2048          ntohs (serverAddrv4->sin_port));
2049     tries = 0;
2050     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2051            GNUNET_OK)
2052     {
2053       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2054       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2055            ntohs (serverAddrv4->sin_port));
2056       tries++;
2057       if (tries > 10)
2058       {
2059         GNUNET_NETWORK_socket_close (plugin->sockv4);
2060         plugin->sockv4 = NULL;
2061         break;
2062       }
2063     }
2064     if (plugin->sockv4 != NULL)
2065     {
2066       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2067       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2068       sockets_created++;
2069     }
2070   }
2071
2072   /* Create file descriptors */
2073   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2074   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2075   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2076   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2077   if (NULL != plugin->sockv4)
2078   {
2079     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2080     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2081   }
2082
2083   if (sockets_created == 0)
2084     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2085
2086   plugin->select_task =
2087       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2088                                    GNUNET_TIME_UNIT_FOREVER_REL,
2089                                    plugin->rs_v4,
2090                                    NULL,
2091                                    &udp_plugin_select, plugin);
2092   plugin->with_v4_ws = GNUNET_NO;
2093
2094   if (plugin->enable_ipv6 == GNUNET_YES)
2095   {
2096     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2097     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2098     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2099     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2100     if (NULL != plugin->sockv6)
2101     {
2102       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2103       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2104     }
2105
2106     plugin->select_task_v6 =
2107         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2108                                      GNUNET_TIME_UNIT_FOREVER_REL,
2109                                      plugin->rs_v6,
2110                                      NULL,
2111                                      &udp_plugin_select_v6, plugin);
2112     plugin->with_v6_ws = GNUNET_NO;
2113   }
2114
2115   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2116                            GNUNET_NO, plugin->port,
2117                            sockets_created,
2118                            (const struct sockaddr **) addrs, addrlens,
2119                            &udp_nat_port_map_callback, NULL, plugin);
2120
2121   return sockets_created;
2122 }
2123
2124
2125 /**
2126  * The exported method. Makes the core api available via a global and
2127  * returns the udp transport API.
2128  *
2129  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2130  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2131  */
2132 void *
2133 libgnunet_plugin_transport_udp_init (void *cls)
2134 {
2135   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2136   struct GNUNET_TRANSPORT_PluginFunctions *api;
2137   struct Plugin *plugin;
2138
2139   unsigned long long port;
2140   unsigned long long aport;
2141   unsigned long long broadcast;
2142   unsigned long long udp_max_bps;
2143   unsigned long long enable_v6;
2144   char * bind4_address;
2145   char * bind6_address;
2146   struct GNUNET_TIME_Relative interval;
2147
2148   struct sockaddr_in serverAddrv4;
2149   struct sockaddr_in6 serverAddrv6;
2150
2151   int res;
2152
2153   if (NULL == env->receive)
2154   {
2155     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2156        initialze the plugin or the API */
2157     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2158     api->cls = NULL;
2159     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2160     api->address_to_string = &udp_address_to_string;
2161     api->string_to_address = &udp_string_to_address;
2162     return api;
2163   }
2164
2165   GNUNET_assert( NULL != env->stats);
2166
2167   /* Get port number */
2168   if (GNUNET_OK !=
2169       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2170                                              &port))
2171     port = 2086;
2172   if (GNUNET_OK !=
2173       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2174                                              "ADVERTISED_PORT", &aport))
2175     aport = port;
2176   if (port > 65535)
2177   {
2178     LOG (GNUNET_ERROR_TYPE_WARNING,
2179          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2180          65535);
2181     return NULL;
2182   }
2183
2184   /* Protocols */
2185   if ((GNUNET_YES ==
2186        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2187                                              "DISABLEV6")))
2188   {
2189     enable_v6 = GNUNET_NO;
2190   }
2191   else
2192     enable_v6 = GNUNET_YES;
2193
2194
2195   /* Addresses */
2196   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2197   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2198
2199   if (GNUNET_YES ==
2200       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2201                                              "BINDTO", &bind4_address))
2202   {
2203     LOG (GNUNET_ERROR_TYPE_DEBUG,
2204          "Binding udp plugin to specific address: `%s'\n",
2205          bind4_address);
2206     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2207     {
2208       GNUNET_free (bind4_address);
2209       return NULL;
2210     }
2211   }
2212
2213   if (GNUNET_YES ==
2214       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2215                                              "BINDTO6", &bind6_address))
2216   {
2217     LOG (GNUNET_ERROR_TYPE_DEBUG,
2218          "Binding udp plugin to specific address: `%s'\n",
2219          bind6_address);
2220     if (1 !=
2221         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2222     {
2223       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2224            bind6_address);
2225       GNUNET_free_non_null (bind4_address);
2226       GNUNET_free (bind6_address);
2227       return NULL;
2228     }
2229   }
2230
2231
2232   /* Enable neighbour discovery */
2233   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2234                                             "BROADCAST");
2235   if (broadcast == GNUNET_SYSERR)
2236     broadcast = GNUNET_NO;
2237
2238   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2239                                            "BROADCAST_INTERVAL", &interval))
2240   {
2241     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2242   }
2243
2244   /* Maximum datarate */
2245   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2246                                              "MAX_BPS", &udp_max_bps))
2247   {
2248     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2249   }
2250
2251   plugin = GNUNET_malloc (sizeof (struct Plugin));
2252   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2253
2254   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2255                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2256
2257
2258   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2259   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2260   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2261   plugin->port = port;
2262   plugin->aport = aport;
2263   plugin->broadcast_interval = interval;
2264   plugin->enable_ipv6 = enable_v6;
2265   plugin->env = env;
2266
2267   api->cls = plugin;
2268   api->send = NULL;
2269   api->disconnect = &udp_disconnect;
2270   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2271   api->address_to_string = &udp_address_to_string;
2272   api->string_to_address = &udp_string_to_address;
2273   api->check_address = &udp_plugin_check_address;
2274   api->get_session = &udp_plugin_get_session;
2275   api->send = &udp_plugin_send;
2276
2277   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2278   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2279   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2280   {
2281     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2282     GNUNET_free (plugin);
2283     GNUNET_free (api);
2284     return NULL;
2285   }
2286
2287   if (broadcast == GNUNET_YES)
2288   {
2289     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2290     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2291   }
2292
2293   GNUNET_free_non_null (bind4_address);
2294   GNUNET_free_non_null (bind6_address);
2295   return api;
2296 }
2297
2298
2299 static int
2300 heap_cleanup_iterator (void *cls,
2301                        struct GNUNET_CONTAINER_HeapNode *
2302                        node, void *element,
2303                        GNUNET_CONTAINER_HeapCostType
2304                        cost)
2305 {
2306   struct DefragContext * d_ctx = element;
2307
2308   GNUNET_CONTAINER_heap_remove_node (node);
2309   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2310   GNUNET_free (d_ctx);
2311
2312   return GNUNET_YES;
2313 }
2314
2315
2316 /**
2317  * The exported method. Makes the core api available via a global and
2318  * returns the udp transport API.
2319  *
2320  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2321  * @return NULL
2322  */
2323 void *
2324 libgnunet_plugin_transport_udp_done (void *cls)
2325 {
2326   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2327   struct Plugin *plugin = api->cls;
2328
2329   if (NULL == plugin)
2330   {
2331     GNUNET_free (api);
2332     return NULL;
2333   }
2334
2335   stop_broadcast (plugin);
2336
2337   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2338   {
2339     GNUNET_SCHEDULER_cancel (plugin->select_task);
2340     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2341   }
2342   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2343   {
2344     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2345     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2346   }
2347
2348   /* Closing sockets */
2349   if (plugin->sockv4 != NULL)
2350   {
2351     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2352     plugin->sockv4 = NULL;
2353   }
2354   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2355   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2356
2357   if (plugin->sockv6 != NULL)
2358   {
2359     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2360     plugin->sockv6 = NULL;
2361
2362     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2363     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2364   }
2365
2366   GNUNET_NAT_unregister (plugin->nat);
2367
2368   if (plugin->defrag_ctxs != NULL)
2369   {
2370     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2371         heap_cleanup_iterator, NULL);
2372     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2373     plugin->defrag_ctxs = NULL;
2374   }
2375   if (plugin->mst != NULL)
2376   {
2377     GNUNET_SERVER_mst_destroy(plugin->mst);
2378     plugin->mst = NULL;
2379   }
2380
2381   /* Clean up leftover messages */
2382   struct UDPMessageWrapper * udpw;
2383   udpw = plugin->ipv4_queue_head;
2384   while (udpw != NULL)
2385   {
2386     struct UDPMessageWrapper *tmp = udpw->next;
2387     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2388     call_continuation(udpw, GNUNET_SYSERR);
2389     GNUNET_free (udpw);
2390     udpw = tmp;
2391   }
2392   udpw = plugin->ipv6_queue_head;
2393   while (udpw != NULL)
2394   {
2395     struct UDPMessageWrapper *tmp = udpw->next;
2396     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2397     call_continuation(udpw, GNUNET_SYSERR);
2398     GNUNET_free (udpw);
2399     udpw = tmp;
2400   }
2401
2402   /* Clean up sessions */
2403   LOG (GNUNET_ERROR_TYPE_DEBUG,
2404        "Cleaning up sessions\n");
2405   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2406   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2407
2408   plugin->nat = NULL;
2409   GNUNET_free (plugin);
2410   GNUNET_free (api);
2411   return NULL;
2412 }
2413
2414
2415 /* end of plugin_transport_udp.c */