-fix uninit
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   /**
97    * Address of the other peer
98    */
99   const struct sockaddr *sock_addr;
100
101   size_t addrlen;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * expected delay for ACKs
115    */
116   struct GNUNET_TIME_Relative last_expected_delay;
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121
122   unsigned int rc;
123
124   int in_destroy;
125 };
126
127
128 struct SessionCompareContext
129 {
130   struct Session *res;
131   const struct GNUNET_HELLO_Address *addr;
132 };
133
134
135 /**
136  * Closure for 'process_inbound_tokenized_messages'
137  */
138 struct SourceInformation
139 {
140   /**
141    * Sender identity.
142    */
143   struct GNUNET_PeerIdentity sender;
144
145   /**
146    * Source address.
147    */
148   const void *arg;
149
150   /**
151    * Number of bytes in source address.
152    */
153   size_t args;
154
155   struct Session *session;
156 };
157
158
159 /**
160  * Closure for 'find_receive_context'.
161  */
162 struct FindReceiveContext
163 {
164   /**
165    * Where to store the result.
166    */
167   struct DefragContext *rc;
168
169   /**
170    * Address to find.
171    */
172   const struct sockaddr *addr;
173
174   /**
175    * Number of bytes in 'addr'.
176    */
177   socklen_t addr_len;
178
179   struct Session *session;
180 };
181
182
183
184 /**
185  * Data structure to track defragmentation contexts based
186  * on the source of the UDP traffic.
187  */
188 struct DefragContext
189 {
190
191   /**
192    * Defragmentation context.
193    */
194   struct GNUNET_DEFRAGMENT_Context *defrag;
195
196   /**
197    * Source address this receive context is for (allocated at the
198    * end of the struct).
199    */
200   const struct sockaddr *src_addr;
201
202   /**
203    * Reference to master plugin struct.
204    */
205   struct Plugin *plugin;
206
207   /**
208    * Node in the defrag heap.
209    */
210   struct GNUNET_CONTAINER_HeapNode *hnode;
211
212   /**
213    * Length of 'src_addr'
214    */
215   size_t addr_len;
216 };
217
218
219
220 /**
221  * Closure for 'process_inbound_tokenized_messages'
222  */
223 struct FragmentationContext
224 {
225   struct FragmentationContext * next;
226   struct FragmentationContext * prev;
227
228   struct Plugin * plugin;
229   struct GNUNET_FRAGMENT_Context * frag;
230   struct Session * session;
231
232   struct GNUNET_TIME_Absolute timeout;
233
234
235   /**
236    * Function to call upon completion of the transmission.
237    */
238   GNUNET_TRANSPORT_TransmitContinuation cont;
239
240   /**
241    * Closure for 'cont'.
242    */
243   void *cont_cls;
244
245   size_t bytes_to_send;
246 };
247
248
249 struct UDPMessageWrapper
250 {
251   struct Session *session;
252   struct UDPMessageWrapper *prev;
253   struct UDPMessageWrapper *next;
254   char *udp;
255   size_t msg_size;
256
257   struct GNUNET_TIME_Absolute timeout;
258
259   /**
260    * Function to call upon completion of the transmission.
261    */
262   GNUNET_TRANSPORT_TransmitContinuation cont;
263
264   /**
265    * Closure for 'cont'.
266    */
267   void *cont_cls;
268
269   struct FragmentationContext *frag_ctx;
270
271 };
272
273
274 /**
275  * UDP ACK Message-Packet header (after defragmentation).
276  */
277 struct UDP_ACK_Message
278 {
279   /**
280    * Message header.
281    */
282   struct GNUNET_MessageHeader header;
283
284   /**
285    * Desired delay for flow control
286    */
287   uint32_t delay;
288
289   /**
290    * What is the identity of the sender
291    */
292   struct GNUNET_PeerIdentity sender;
293
294 };
295
296 /**
297  * We have been notified that our readset has something to read.  We don't
298  * know which socket needs to be read, so we have to check each one
299  * Then reschedule this function to be called again once more is available.
300  *
301  * @param cls the plugin handle
302  * @param tc the scheduling context (for rescheduling this function again)
303  */
304 static void
305 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
306
307 /**
308  * We have been notified that our readset has something to read.  We don't
309  * know which socket needs to be read, so we have to check each one
310  * Then reschedule this function to be called again once more is available.
311  *
312  * @param cls the plugin handle
313  * @param tc the scheduling context (for rescheduling this function again)
314  */
315 static void
316 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
317
318 /**
319  * Function called for a quick conversion of the binary address to
320  * a numeric address.  Note that the caller must not free the
321  * address and that the next call to this function is allowed
322  * to override the address again.
323  *
324  * @param cls closure
325  * @param addr binary address
326  * @param addrlen length of the address
327  * @return string representing the same address
328  */
329 const char *
330 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
331 {
332   static char rbuf[INET6_ADDRSTRLEN + 10];
333   char buf[INET6_ADDRSTRLEN];
334   const void *sb;
335   struct in_addr a4;
336   struct in6_addr a6;
337   const struct IPv4UdpAddress *t4;
338   const struct IPv6UdpAddress *t6;
339   int af;
340   uint16_t port;
341
342   if (addrlen == sizeof (struct IPv6UdpAddress))
343   {
344     t6 = addr;
345     af = AF_INET6;
346     port = ntohs (t6->u6_port);
347     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
348     sb = &a6;
349   }
350   else if (addrlen == sizeof (struct IPv4UdpAddress))
351   {
352     t4 = addr;
353     af = AF_INET;
354     port = ntohs (t4->u4_port);
355     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
356     sb = &a4;
357   }
358   else
359   {
360     GNUNET_break_op (0);
361     return NULL;
362   }
363   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
364   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
365                    buf, port);
366   return rbuf;
367 }
368
369
370 /**
371  * Function called to convert a string address to
372  * a binary address.
373  *
374  * @param cls closure ('struct Plugin*')
375  * @param addr string address
376  * @param addrlen length of the address
377  * @param buf location to store the buffer
378  * @param added location to store the number of bytes in the buffer.
379  *        If the function returns GNUNET_SYSERR, its contents are undefined.
380  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
381  */
382 int
383 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
384     void **buf, size_t *added)
385 {
386   struct sockaddr_storage socket_address;
387
388   if ((NULL == addr) || (addrlen == 0))
389   {
390     GNUNET_break (0);
391     return GNUNET_SYSERR;
392   }
393
394   if ('\0' != addr[addrlen - 1])
395   {
396     return GNUNET_SYSERR;
397   }
398
399   if (strlen (addr) != addrlen - 1)
400   {
401     return GNUNET_SYSERR;
402   }
403
404   int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
405     &socket_address);
406
407   if (ret != GNUNET_OK)
408   {
409     return GNUNET_SYSERR;
410   }
411
412   if (socket_address.ss_family == AF_INET)
413   {
414     struct IPv4UdpAddress *u4;
415     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
416     u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
417     u4->ipv4_addr = in4->sin_addr.s_addr;
418     u4->u4_port = in4->sin_port;
419     *buf = u4;
420     *added = sizeof (struct IPv4UdpAddress);
421     return GNUNET_OK;
422   }
423   else if (socket_address.ss_family == AF_INET6)
424   {
425     struct IPv6UdpAddress *u6;
426     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
427     u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
428     u6->ipv6_addr = in6->sin6_addr;
429     u6->u6_port = in6->sin6_port;
430     *buf = u6;
431     *added = sizeof (struct IPv6UdpAddress);
432     return GNUNET_OK;
433   }
434   return GNUNET_SYSERR;
435 }
436
437
438 /**
439  * Append our port and forward the result.
440  *
441  * @param cls a 'struct PrettyPrinterContext'
442  * @param hostname result from DNS resolver
443  */
444 static void
445 append_port (void *cls, const char *hostname)
446 {
447   struct PrettyPrinterContext *ppc = cls;
448   char *ret;
449
450   if (hostname == NULL)
451   {
452     ppc->asc (ppc->asc_cls, NULL);
453     GNUNET_free (ppc);
454     return;
455   }
456   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
457   ppc->asc (ppc->asc_cls, ret);
458   GNUNET_free (ret);
459 }
460
461
462 /**
463  * Convert the transports address to a nice, human-readable
464  * format.
465  *
466  * @param cls closure
467  * @param type name of the transport that generated the address
468  * @param addr one of the addresses of the host, NULL for the last address
469  *        the specific address format depends on the transport
470  * @param addrlen length of the address
471  * @param numeric should (IP) addresses be displayed in numeric form?
472  * @param timeout after how long should we give up?
473  * @param asc function to call on each string
474  * @param asc_cls closure for asc
475  */
476 static void
477 udp_plugin_address_pretty_printer (void *cls, const char *type,
478                                    const void *addr, size_t addrlen,
479                                    int numeric,
480                                    struct GNUNET_TIME_Relative timeout,
481                                    GNUNET_TRANSPORT_AddressStringCallback asc,
482                                    void *asc_cls)
483 {
484   struct PrettyPrinterContext *ppc;
485   const void *sb;
486   size_t sbs;
487   struct sockaddr_in a4;
488   struct sockaddr_in6 a6;
489   const struct IPv4UdpAddress *u4;
490   const struct IPv6UdpAddress *u6;
491   uint16_t port;
492
493   if (addrlen == sizeof (struct IPv6UdpAddress))
494   {
495     u6 = addr;
496     memset (&a6, 0, sizeof (a6));
497     a6.sin6_family = AF_INET6;
498 #if HAVE_SOCKADDR_IN_SIN_LEN
499     a6.sin6_len = sizeof (a6);
500 #endif
501     a6.sin6_port = u6->u6_port;
502     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
503     port = ntohs (u6->u6_port);
504     sb = &a6;
505     sbs = sizeof (a6);
506   }
507   else if (addrlen == sizeof (struct IPv4UdpAddress))
508   {
509     u4 = addr;
510     memset (&a4, 0, sizeof (a4));
511     a4.sin_family = AF_INET;
512 #if HAVE_SOCKADDR_IN_SIN_LEN
513     a4.sin_len = sizeof (a4);
514 #endif
515     a4.sin_port = u4->u4_port;
516     a4.sin_addr.s_addr = u4->ipv4_addr;
517     port = ntohs (u4->u4_port);
518     sb = &a4;
519     sbs = sizeof (a4);
520   }
521   else if (0 == addrlen)
522   {
523     asc (asc_cls, "<inbound connection>");
524     asc (asc_cls, NULL);
525     return;
526   }
527   else
528   {
529     /* invalid address */
530     GNUNET_break_op (0);
531     asc (asc_cls, NULL);
532     return;
533   }
534   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
535   ppc->asc = asc;
536   ppc->asc_cls = asc_cls;
537   ppc->port = port;
538   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
539 }
540
541
542 /**
543  * Check if the given port is plausible (must be either our listen
544  * port or our advertised port).  If it is neither, we return
545  * GNUNET_SYSERR.
546  *
547  * @param plugin global variables
548  * @param in_port port number to check
549  * @return GNUNET_OK if port is either open_port or adv_port
550  */
551 static int
552 check_port (struct Plugin *plugin, uint16_t in_port)
553 {
554   if ((in_port == plugin->port) || (in_port == plugin->aport))
555     return GNUNET_OK;
556   return GNUNET_SYSERR;
557 }
558
559
560
561 /**
562  * Function that will be called to check if a binary address for this
563  * plugin is well-formed and corresponds to an address for THIS peer
564  * (as per our configuration).  Naturally, if absolutely necessary,
565  * plugins can be a bit conservative in their answer, but in general
566  * plugins should make sure that the address does not redirect
567  * traffic to a 3rd party that might try to man-in-the-middle our
568  * traffic.
569  *
570  * @param cls closure, should be our handle to the Plugin
571  * @param addr pointer to the address
572  * @param addrlen length of addr
573  * @return GNUNET_OK if this is a plausible address for this peer
574  *         and transport, GNUNET_SYSERR if not
575  *
576  */
577 static int
578 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
579 {
580   struct Plugin *plugin = cls;
581   struct IPv4UdpAddress *v4;
582   struct IPv6UdpAddress *v6;
583
584   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
585       (addrlen != sizeof (struct IPv6UdpAddress)))
586   {
587     GNUNET_break_op (0);
588     return GNUNET_SYSERR;
589   }
590   if (addrlen == sizeof (struct IPv4UdpAddress))
591   {
592     v4 = (struct IPv4UdpAddress *) addr;
593     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
594       return GNUNET_SYSERR;
595     if (GNUNET_OK !=
596         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
597                                  sizeof (struct in_addr)))
598       return GNUNET_SYSERR;
599   }
600   else
601   {
602     v6 = (struct IPv6UdpAddress *) addr;
603     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
604     {
605       GNUNET_break_op (0);
606       return GNUNET_SYSERR;
607     }
608     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
609       return GNUNET_SYSERR;
610     if (GNUNET_OK !=
611         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
612                                  sizeof (struct in6_addr)))
613       return GNUNET_SYSERR;
614   }
615   return GNUNET_OK;
616 }
617
618
619 /**
620  * Task to free resources associated with a session.
621  *
622  * @param s session to free
623  */
624 static void
625 free_session (struct Session *s)
626 {
627   if (s->frag_ctx != NULL)
628   {
629     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
630     GNUNET_free (s->frag_ctx);
631     s->frag_ctx = NULL;
632   }
633   GNUNET_free (s);
634 }
635
636
637 /**
638  * Destroy a session, plugin is being unloaded.
639  *
640  * @param cls unused
641  * @param key hash of public key of target peer
642  * @param value a 'struct PeerSession*' to clean up
643  * @return GNUNET_OK (continue to iterate)
644  */
645 static int
646 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
647 {
648   struct Plugin *plugin = cls;
649   struct Session *s = value;
650   struct UDPMessageWrapper *udpw;
651   struct UDPMessageWrapper *next;
652
653   GNUNET_assert (GNUNET_YES != s->in_destroy);
654   LOG (GNUNET_ERROR_TYPE_DEBUG,
655        "Session %p to peer `%s' address ended \n",
656          s,
657          GNUNET_i2s (&s->target),
658          GNUNET_a2s (s->sock_addr, s->addrlen));
659   next = plugin->ipv4_queue_head;
660   while (NULL != (udpw = next))
661   {
662     next = udpw->next;
663     if (udpw->session == s)
664     {
665       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
666       if (udpw->cont != NULL)
667         udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
668       GNUNET_free (udpw);
669     }
670   }
671   next = plugin->ipv6_queue_head;
672   while (NULL != (udpw = next))
673   {
674     next = udpw->next;
675     if (udpw->session == s)
676     {
677       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
678
679       if (udpw->cont != NULL)
680         udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR);
681       GNUNET_free (udpw);
682     }
683     udpw = next;
684   }
685   plugin->env->session_end (plugin->env->cls, &s->target, s);
686   GNUNET_assert (GNUNET_YES ==
687                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
688                                                        &s->target.hashPubKey,
689                                                        s));
690   GNUNET_STATISTICS_set(plugin->env->stats,
691                         "# UDP sessions active",
692                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
693                         GNUNET_NO);
694   if (s->rc > 0)
695     s->in_destroy = GNUNET_YES;
696   else
697     free_session (s);
698   return GNUNET_OK;
699 }
700
701
702 /**
703  * Disconnect from a remote node.  Clean up session if we have one for this peer
704  *
705  * @param cls closure for this call (should be handle to Plugin)
706  * @param target the peeridentity of the peer to disconnect
707  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
708  */
709 static void
710 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
711 {
712   struct Plugin *plugin = cls;
713   GNUNET_assert (plugin != NULL);
714
715   GNUNET_assert (target != NULL);
716   LOG (GNUNET_ERROR_TYPE_DEBUG,
717        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
718   /* Clean up sessions */
719   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
720 }
721
722 static struct Session *
723 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
724                 const void *addr, size_t addrlen,
725                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
726 {
727   struct Session *s;
728   const struct IPv4UdpAddress *t4;
729   const struct IPv6UdpAddress *t6;
730   struct sockaddr_in *v4;
731   struct sockaddr_in6 *v6;
732   size_t len;
733
734   switch (addrlen)
735   {
736   case sizeof (struct IPv4UdpAddress):
737     if (NULL == plugin->sockv4)
738     {
739       return NULL;
740     }
741     t4 = addr;
742     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
743     len = sizeof (struct sockaddr_in);
744     v4 = (struct sockaddr_in *) &s[1];
745     v4->sin_family = AF_INET;
746 #if HAVE_SOCKADDR_IN_SIN_LEN
747     v4->sin_len = sizeof (struct sockaddr_in);
748 #endif
749     v4->sin_port = t4->u4_port;
750     v4->sin_addr.s_addr = t4->ipv4_addr;
751     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
752     break;
753   case sizeof (struct IPv6UdpAddress):
754     if (NULL == plugin->sockv6)
755     {
756       return NULL;
757     }
758     t6 = addr;
759     s =
760         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
761     len = sizeof (struct sockaddr_in6);
762     v6 = (struct sockaddr_in6 *) &s[1];
763     v6->sin6_family = AF_INET6;
764 #if HAVE_SOCKADDR_IN_SIN_LEN
765     v6->sin6_len = sizeof (struct sockaddr_in6);
766 #endif
767     v6->sin6_port = t6->u6_port;
768     v6->sin6_addr = t6->ipv6_addr;
769     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
770     break;
771   default:
772     /* Must have a valid address to send to */
773     GNUNET_break_op (0);
774     return NULL;
775   }
776
777   s->addrlen = len;
778   s->target = *target;
779   s->sock_addr = (const struct sockaddr *) &s[1];
780   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
781   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
782   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
783
784   return s;
785 }
786
787
788 static int
789 session_cmp_it (void *cls,
790                 const GNUNET_HashCode * key,
791                 void *value)
792 {
793   struct SessionCompareContext * cctx = cls;
794   const struct GNUNET_HELLO_Address *address = cctx->addr;
795   struct Session *s = value;
796
797   socklen_t s_addrlen = s->addrlen;
798
799 #if VERBOSE_UDP
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
801       udp_address_to_string (NULL, (void *) address->address, address->address_length),
802       GNUNET_a2s (s->sock_addr, s->addrlen));
803 #endif
804
805   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
806       (s_addrlen == sizeof (struct sockaddr_in)))
807   {
808     struct IPv4UdpAddress * u4 = NULL;
809     u4 = (struct IPv4UdpAddress *) address->address;
810     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
811     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
812         (u4->u4_port == s4->sin_port))
813     {
814       cctx->res = s;
815       return GNUNET_NO;
816     }
817
818   }
819   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
820       (s_addrlen == sizeof (struct sockaddr_in6)))
821   {
822     struct IPv6UdpAddress * u6 = NULL;
823     u6 = (struct IPv6UdpAddress *) address->address;
824     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
825     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
826         (u6->u6_port == s6->sin6_port))
827     {
828       cctx->res = s;
829       return GNUNET_NO;
830     }
831   }
832
833
834   return GNUNET_YES;
835 }
836
837
838 /**
839  * Creates a new outbound session the transport service will use to send data to the
840  * peer
841  *
842  * @param cls the plugin
843  * @param address the address
844  * @return the session or NULL of max connections exceeded
845  */
846 static struct Session *
847 udp_plugin_get_session (void *cls,
848                   const struct GNUNET_HELLO_Address *address)
849 {
850   struct Session * s = NULL;
851   struct Plugin * plugin = cls;
852   struct IPv6UdpAddress * udp_a6;
853   struct IPv4UdpAddress * udp_a4;
854
855   GNUNET_assert (plugin != NULL);
856   GNUNET_assert (address != NULL);
857
858
859   if ((address->address == NULL) ||
860       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
861       (address->address_length != sizeof (struct IPv6UdpAddress))))
862   {
863     GNUNET_break (0);
864     return NULL;
865   }
866
867   if (address->address_length == sizeof (struct IPv4UdpAddress))
868   {
869     if (plugin->sockv4 == NULL)
870       return NULL;
871     udp_a4 = (struct IPv4UdpAddress *) address->address;
872     if (udp_a4->u4_port == 0)
873       return NULL;
874   }
875
876   if (address->address_length == sizeof (struct IPv6UdpAddress))
877   {
878     if (plugin->sockv6 == NULL)
879       return NULL;
880     udp_a6 = (struct IPv6UdpAddress *) address->address;
881     if (udp_a6->u6_port == 0)
882       return NULL;
883   }
884
885   /* check if session already exists */
886   struct SessionCompareContext cctx;
887   cctx.addr = address;
888   cctx.res = NULL;
889 #if VERBOSE_UDP
890   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
891 #endif
892   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
893   if (cctx.res != NULL)
894   {
895 #if VERBOSE_UDP
896     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
897 #endif
898     return cctx.res;
899   }
900
901   /* otherwise create new */
902   s = create_session (plugin,
903       &address->peer,
904       address->address,
905       address->address_length,
906       NULL, NULL);
907 #if VERBOSE
908     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "Creating new session %p for peer `%s' address `%s'\n",
910               s,
911               GNUNET_i2s(&address->peer),
912               udp_address_to_string(NULL,address->address,address->address_length));
913 #endif
914   GNUNET_assert (GNUNET_OK ==
915                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
916                                                     &s->target.hashPubKey,
917                                                     s,
918                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
919
920   GNUNET_STATISTICS_set(plugin->env->stats,
921                         "# UDP sessions active",
922                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
923                         GNUNET_NO);
924
925   return s;
926 }
927
928
929 static void 
930 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
931 {
932
933   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
934     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
935   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
936     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
937 }
938
939 /**
940  * Function that is called with messages created by the fragmentation
941  * module.  In the case of the 'proc' callback of the
942  * GNUNET_FRAGMENT_context_create function, this function must
943  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
944  *
945  * @param cls closure, the 'struct FragmentationContext'
946  * @param msg the message that was created
947  */
948 static void
949 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
950 {
951   struct FragmentationContext *frag_ctx = cls;
952   struct Plugin *plugin = frag_ctx->plugin;
953   struct UDPMessageWrapper * udpw;
954   struct Session *s;
955
956   size_t msg_len = ntohs (msg->size);
957
958 #if VERBOSE_UDP
959   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
960 #endif
961
962   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
963   udpw->session = frag_ctx->session;
964   s = udpw->session;
965   udpw->udp = (char *) &udpw[1];
966
967   udpw->msg_size = msg_len;
968   udpw->cont = frag_ctx->cont;
969   udpw->cont_cls = frag_ctx->cont_cls;
970   udpw->timeout = frag_ctx->timeout;
971   udpw->frag_ctx = frag_ctx;
972   memcpy (udpw->udp, msg, msg_len);
973
974   enqueue (plugin, udpw);
975
976
977   if (s->addrlen == sizeof (struct sockaddr_in))
978   {
979     if (plugin->with_v4_ws == GNUNET_NO)
980     {
981       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
982         GNUNET_SCHEDULER_cancel(plugin->select_task);
983
984       plugin->select_task =
985           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
986                                        GNUNET_TIME_UNIT_FOREVER_REL,
987                                        plugin->rs_v4,
988                                        plugin->ws_v4,
989                                        &udp_plugin_select, plugin);
990       plugin->with_v4_ws = GNUNET_YES;
991     }
992   }
993
994   else if (s->addrlen == sizeof (struct sockaddr_in6))
995   {
996     if (plugin->with_v6_ws == GNUNET_NO)
997     {
998       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
999         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1000
1001       plugin->select_task_v6 =
1002           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1003                                        GNUNET_TIME_UNIT_FOREVER_REL,
1004                                        plugin->rs_v6,
1005                                        plugin->ws_v6,
1006                                        &udp_plugin_select_v6, plugin);
1007       plugin->with_v6_ws = GNUNET_YES;
1008     }
1009   }
1010 }
1011
1012
1013 /**
1014  * Function that can be used by the transport service to transmit
1015  * a message using the plugin.   Note that in the case of a
1016  * peer disconnecting, the continuation MUST be called
1017  * prior to the disconnect notification itself.  This function
1018  * will be called with this peer's HELLO message to initiate
1019  * a fresh connection to another peer.
1020  *
1021  * @param cls closure
1022  * @param s which session must be used
1023  * @param msgbuf the message to transmit
1024  * @param msgbuf_size number of bytes in 'msgbuf'
1025  * @param priority how important is the message (most plugins will
1026  *                 ignore message priority and just FIFO)
1027  * @param to how long to wait at most for the transmission (does not
1028  *                require plugins to discard the message after the timeout,
1029  *                just advisory for the desired delay; most plugins will ignore
1030  *                this as well)
1031  * @param cont continuation to call once the message has
1032  *        been transmitted (or if the transport is ready
1033  *        for the next transmission call; or if the
1034  *        peer disconnected...); can be NULL
1035  * @param cont_cls closure for cont
1036  * @return number of bytes used (on the physical network, with overheads);
1037  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1038  *         and does NOT mean that the message was not transmitted (DV)
1039  */
1040 static ssize_t
1041 udp_plugin_send (void *cls,
1042                   struct Session *s,
1043                   const char *msgbuf, size_t msgbuf_size,
1044                   unsigned int priority,
1045                   struct GNUNET_TIME_Relative to,
1046                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1047 {
1048   struct Plugin *plugin = cls;
1049   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1050
1051   struct UDPMessageWrapper * udpw;
1052   struct UDPMessage *udp;
1053   char mbuf[mlen];
1054   GNUNET_assert (plugin != NULL);
1055   GNUNET_assert (s != NULL);
1056
1057   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1058     return GNUNET_SYSERR;
1059
1060    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1061      return GNUNET_SYSERR;
1062
1063
1064   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1065   {
1066     GNUNET_break (0);
1067     return GNUNET_SYSERR;
1068   }
1069
1070   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1071   {
1072     GNUNET_break (0);
1073     return GNUNET_SYSERR;
1074   }
1075
1076   LOG (GNUNET_ERROR_TYPE_DEBUG,
1077        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1078          msgbuf_size,
1079          GNUNET_i2s (&s->target),
1080          GNUNET_a2s(s->sock_addr, s->addrlen));
1081
1082   /* Message */
1083   udp = (struct UDPMessage *) mbuf;
1084   udp->header.size = htons (mlen);
1085   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1086   udp->reserved = htonl (0);
1087   udp->sender = *plugin->env->my_identity;
1088
1089   if (mlen <= UDP_MTU)
1090   {
1091     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1092     udpw->session = s;
1093     udpw->udp = (char *) &udpw[1];
1094     udpw->msg_size = mlen;
1095     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1096     udpw->cont = cont;
1097     udpw->cont_cls = cont_cls;
1098     udpw->frag_ctx = NULL;
1099
1100     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1101     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1102
1103     enqueue (plugin, udpw);
1104   }
1105   else
1106   {
1107     LOG (GNUNET_ERROR_TYPE_DEBUG,
1108          "UDP has to fragment message \n");
1109     if  (s->frag_ctx != NULL)
1110       return GNUNET_SYSERR;
1111     memcpy (&udp[1], msgbuf, msgbuf_size);
1112     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1113
1114     frag_ctx->plugin = plugin;
1115     frag_ctx->session = s;
1116     frag_ctx->cont = cont;
1117     frag_ctx->cont_cls = cont_cls;
1118     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1119     frag_ctx->bytes_to_send = mlen;
1120     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1121               UDP_MTU,
1122               &plugin->tracker,
1123               s->last_expected_delay,
1124               &udp->header,
1125               &enqueue_fragment,
1126               frag_ctx);
1127
1128     s->frag_ctx = frag_ctx;
1129
1130   }
1131
1132   if (s->addrlen == sizeof (struct sockaddr_in))
1133   {
1134     if (plugin->with_v4_ws == GNUNET_NO)
1135     {
1136       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1137         GNUNET_SCHEDULER_cancel(plugin->select_task);
1138
1139       plugin->select_task =
1140           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1141                                        GNUNET_TIME_UNIT_FOREVER_REL,
1142                                        plugin->rs_v4,
1143                                        plugin->ws_v4,
1144                                        &udp_plugin_select, plugin);
1145       plugin->with_v4_ws = GNUNET_YES;
1146     }
1147   }
1148
1149   else if (s->addrlen == sizeof (struct sockaddr_in6))
1150   {
1151     if (plugin->with_v6_ws == GNUNET_NO)
1152     {
1153       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1154         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1155
1156       plugin->select_task_v6 =
1157         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1158                                      GNUNET_TIME_UNIT_FOREVER_REL,
1159                                      plugin->rs_v6,
1160                                      plugin->ws_v6,
1161                                      &udp_plugin_select_v6, plugin);
1162       plugin->with_v6_ws = GNUNET_YES;
1163     }
1164   }
1165
1166   return mlen;
1167 }
1168
1169
1170 /**
1171  * Our external IP address/port mapping has changed.
1172  *
1173  * @param cls closure, the 'struct LocalAddrList'
1174  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1175  *     the previous (now invalid) one
1176  * @param addr either the previous or the new public IP address
1177  * @param addrlen actual lenght of the address
1178  */
1179 static void
1180 udp_nat_port_map_callback (void *cls, int add_remove,
1181                            const struct sockaddr *addr, socklen_t addrlen)
1182 {
1183   struct Plugin *plugin = cls;
1184   struct IPv4UdpAddress u4;
1185   struct IPv6UdpAddress u6;
1186   void *arg;
1187   size_t args;
1188
1189   /* convert 'addr' to our internal format */
1190   switch (addr->sa_family)
1191   {
1192   case AF_INET:
1193     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1194     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1195     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1196     arg = &u4;
1197     args = sizeof (u4);
1198     break;
1199   case AF_INET6:
1200     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1201     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1202             sizeof (struct in6_addr));
1203     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1204     arg = &u6;
1205     args = sizeof (u6);
1206     break;
1207   default:
1208     GNUNET_break (0);
1209     return;
1210   }
1211   /* modify our published address list */
1212   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1213 }
1214
1215
1216
1217 /**
1218  * Message tokenizer has broken up an incomming message. Pass it on
1219  * to the service.
1220  *
1221  * @param cls the 'struct Plugin'
1222  * @param client the 'struct SourceInformation'
1223  * @param hdr the actual message
1224  */
1225 static void
1226 process_inbound_tokenized_messages (void *cls, void *client,
1227                                     const struct GNUNET_MessageHeader *hdr)
1228 {
1229   struct Plugin *plugin = cls;
1230   struct SourceInformation *si = client;
1231   struct GNUNET_ATS_Information ats[2];
1232   struct GNUNET_TIME_Relative delay;
1233
1234   GNUNET_assert (si->session != NULL);
1235   if (GNUNET_YES == si->session->in_destroy)
1236     return; 
1237   /* setup ATS */
1238   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1239   ats[0].value = htonl (1);
1240   ats[1] = si->session->ats;
1241   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1242
1243   delay = plugin->env->receive (plugin->env->cls,
1244                 &si->sender,
1245                 hdr,
1246                 (const struct GNUNET_ATS_Information *) &ats, 2,
1247                 NULL,
1248                 si->arg,
1249                 si->args);
1250   si->session->flow_delay_for_other_peer = delay;
1251 }
1252
1253
1254 /**
1255  * We've received a UDP Message.  Process it (pass contents to main service).
1256  *
1257  * @param plugin plugin context
1258  * @param msg the message
1259  * @param sender_addr sender address
1260  * @param sender_addr_len number of bytes in sender_addr
1261  */
1262 static void
1263 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1264                      const struct sockaddr *sender_addr,
1265                      socklen_t sender_addr_len)
1266 {
1267   struct SourceInformation si;
1268   struct Session * s;
1269   struct IPv4UdpAddress u4;
1270   struct IPv6UdpAddress u6;
1271   const void *arg;
1272   size_t args;
1273
1274   if (0 != ntohl (msg->reserved))
1275   {
1276     GNUNET_break_op (0);
1277     return;
1278   }
1279   if (ntohs (msg->header.size) <
1280       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1281   {
1282     GNUNET_break_op (0);
1283     return;
1284   }
1285
1286   /* convert address */
1287   switch (sender_addr->sa_family)
1288   {
1289   case AF_INET:
1290     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1291     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1292     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1293     arg = &u4;
1294     args = sizeof (u4);
1295     break;
1296   case AF_INET6:
1297     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1298     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1299     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1300     arg = &u6;
1301     args = sizeof (u6);
1302     break;
1303   default:
1304     GNUNET_break (0);
1305     return;
1306   }
1307   LOG (GNUNET_ERROR_TYPE_DEBUG,
1308        "Received message with %u bytes from peer `%s' at `%s'\n",
1309        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1310        GNUNET_a2s (sender_addr, sender_addr_len));
1311
1312   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1313   s = udp_plugin_get_session(plugin, address);
1314   GNUNET_free (address);
1315
1316   /* iterate over all embedded messages */
1317   si.session = s;
1318   si.sender = msg->sender;
1319   si.arg = arg;
1320   si.args = args;
1321   s->rc++;
1322   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1323                              ntohs (msg->header.size) -
1324                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1325   s->rc--;
1326   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1327     free_session (s);
1328 }
1329
1330
1331 /**
1332  * Scan the heap for a receive context with the given address.
1333  *
1334  * @param cls the 'struct FindReceiveContext'
1335  * @param node internal node of the heap
1336  * @param element value stored at the node (a 'struct ReceiveContext')
1337  * @param cost cost associated with the node
1338  * @return GNUNET_YES if we should continue to iterate,
1339  *         GNUNET_NO if not.
1340  */
1341 static int
1342 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1343                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1344 {
1345   struct FindReceiveContext *frc = cls;
1346   struct DefragContext *e = element;
1347
1348   if ((frc->addr_len == e->addr_len) &&
1349       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1350   {
1351     frc->rc = e;
1352     return GNUNET_NO;
1353   }
1354   return GNUNET_YES;
1355 }
1356
1357
1358 /**
1359  * Process a defragmented message.
1360  *
1361  * @param cls the 'struct ReceiveContext'
1362  * @param msg the message
1363  */
1364 static void
1365 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1366 {
1367   struct DefragContext *rc = cls;
1368
1369   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1370   {
1371     GNUNET_break (0);
1372     return;
1373   }
1374   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1375   {
1376     GNUNET_break (0);
1377     return;
1378   }
1379   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1380                        rc->src_addr, rc->addr_len);
1381 }
1382
1383 struct LookupContext
1384 {
1385   const struct sockaddr * addr;
1386   size_t addrlen;
1387
1388   struct Session *res;
1389 };
1390
1391 static int
1392 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1393 {
1394   struct LookupContext *l_ctx = cls;
1395   struct Session * s = value;
1396
1397   if ((s->addrlen == l_ctx->addrlen) &&
1398       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1399   {
1400     l_ctx->res = s;
1401     return GNUNET_NO;
1402   }
1403   return GNUNET_YES;
1404 }
1405
1406 /**
1407  * Transmit an acknowledgement.
1408  *
1409  * @param cls the 'struct ReceiveContext'
1410  * @param id message ID (unused)
1411  * @param msg ack to transmit
1412  */
1413 static void
1414 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1415 {
1416   struct DefragContext *rc = cls;
1417
1418   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1419   struct UDP_ACK_Message *udp_ack;
1420   uint32_t delay = 0;
1421   struct UDPMessageWrapper *udpw;
1422   struct Session *s;
1423
1424   struct LookupContext l_ctx;
1425   l_ctx.addr = rc->src_addr;
1426   l_ctx.addrlen = rc->addr_len;
1427   l_ctx.res = NULL;
1428   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1429       &lookup_session_by_addr_it,
1430       &l_ctx);
1431   s = l_ctx.res;
1432
1433   if (NULL == s)
1434     return;
1435
1436   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1437     delay = s->flow_delay_for_other_peer.rel_value;
1438
1439   LOG (GNUNET_ERROR_TYPE_DEBUG,
1440        "Sending ACK to `%s' including delay of %u ms\n",
1441        GNUNET_a2s (rc->src_addr,
1442                    (rc->src_addr->sa_family ==
1443                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1444                                                                      sockaddr_in6)),
1445        delay);
1446   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1447   udpw->cont = NULL;
1448   udpw->cont_cls = NULL;
1449   udpw->frag_ctx = NULL;
1450   udpw->msg_size = msize;
1451   udpw->session = s;
1452   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1453   udpw->udp = (char *)&udpw[1];
1454
1455   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1456   udp_ack->header.size = htons ((uint16_t) msize);
1457   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1458   udp_ack->delay = htonl (delay);
1459   udp_ack->sender = *rc->plugin->env->my_identity;
1460   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1461
1462   enqueue (rc->plugin, udpw);
1463 }
1464
1465
1466 static void read_process_msg (struct Plugin *plugin,
1467     const struct GNUNET_MessageHeader *msg,
1468     char *addr,
1469     socklen_t fromlen)
1470 {
1471   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1472   {
1473     GNUNET_break_op (0);
1474     return;
1475   }
1476   process_udp_message (plugin, (const struct UDPMessage *) msg,
1477                        (const struct sockaddr *) addr, fromlen);
1478   return;
1479 }
1480
1481 static void read_process_ack (struct Plugin *plugin,
1482     const struct GNUNET_MessageHeader *msg,
1483     char *addr,
1484     socklen_t fromlen)
1485 {
1486   const struct GNUNET_MessageHeader *ack;
1487   const struct UDP_ACK_Message *udp_ack;
1488   struct LookupContext l_ctx;
1489   struct Session *s = NULL;
1490   struct GNUNET_TIME_Relative flow_delay;
1491
1492   if (ntohs (msg->size) <
1493       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1494   {
1495     GNUNET_break_op (0);
1496     return;
1497   }
1498
1499   udp_ack = (const struct UDP_ACK_Message *) msg;
1500
1501   l_ctx.addr = (const struct sockaddr *) addr;
1502   l_ctx.addrlen = fromlen;
1503   l_ctx.res = NULL;
1504   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1505       &lookup_session_by_addr_it,
1506       &l_ctx);
1507   s = l_ctx.res;
1508
1509   if ((s == NULL) || (s->frag_ctx == NULL))
1510     return;
1511
1512   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1513   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1514        flow_delay.rel_value);
1515   s->flow_delay_from_other_peer =
1516       GNUNET_TIME_relative_to_absolute (flow_delay);
1517
1518   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1519   if (ntohs (ack->size) !=
1520       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1521   {
1522     GNUNET_break_op (0);
1523     return;
1524   }
1525
1526   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1527   {
1528   LOG (GNUNET_ERROR_TYPE_DEBUG,
1529        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1530        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1531        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1532     return;
1533   }
1534
1535   LOG (GNUNET_ERROR_TYPE_DEBUG,
1536        "FULL MESSAGE ACKed\n",
1537        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1538        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1539   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1540
1541   struct UDPMessageWrapper * udpw = NULL;
1542   struct UDPMessageWrapper * tmp = NULL;
1543   if (s->addrlen == sizeof (struct sockaddr_in6))
1544   {
1545     udpw = plugin->ipv6_queue_head;
1546     while (udpw!= NULL)
1547     {
1548       tmp = udpw->next;
1549       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1550       {
1551         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1552         GNUNET_free (udpw);
1553       }
1554       udpw = tmp;
1555     }
1556   }
1557   if (s->addrlen == sizeof (struct sockaddr_in))
1558   {
1559     udpw = plugin->ipv4_queue_head;
1560     while (udpw!= NULL)
1561     {
1562       tmp = udpw->next;
1563       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1564       {
1565         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1566         GNUNET_free (udpw);
1567       }
1568       udpw = tmp;
1569     }
1570   }
1571
1572   if (s->frag_ctx->cont != NULL)
1573     s->frag_ctx->cont
1574     (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1575   GNUNET_free (s->frag_ctx);
1576   s->frag_ctx = NULL;
1577   return;
1578 }
1579
1580 static void read_process_fragment (struct Plugin *plugin,
1581     const struct GNUNET_MessageHeader *msg,
1582     char *addr,
1583     socklen_t fromlen)
1584 {
1585   struct DefragContext *d_ctx;
1586   struct GNUNET_TIME_Absolute now;
1587   struct FindReceiveContext frc;
1588
1589
1590   frc.rc = NULL;
1591   frc.addr = (const struct sockaddr *) addr;
1592   frc.addr_len = fromlen;
1593
1594   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1595        (unsigned int) ntohs (msg->size),
1596        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1597   /* Lookup existing receive context for this address */
1598   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1599                                  &find_receive_context,
1600                                  &frc);
1601   now = GNUNET_TIME_absolute_get ();
1602   d_ctx = frc.rc;
1603
1604   if (d_ctx == NULL)
1605   {
1606     /* Create a new defragmentation context */
1607     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1608     memcpy (&d_ctx[1], addr, fromlen);
1609     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1610     d_ctx->addr_len = fromlen;
1611     d_ctx->plugin = plugin;
1612     d_ctx->defrag =
1613         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1614                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1615                                           &fragment_msg_proc, &ack_proc);
1616     d_ctx->hnode =
1617         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1618                                       (GNUNET_CONTAINER_HeapCostType)
1619                                       now.abs_value);
1620   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1621        (unsigned int) ntohs (msg->size),
1622        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1623   }
1624   else
1625   {
1626   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1627        (unsigned int) ntohs (msg->size),
1628        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1629   }
1630
1631   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1632   {
1633     /* keep this 'rc' from expiring */
1634     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1635                                        (GNUNET_CONTAINER_HeapCostType)
1636                                        now.abs_value);
1637   }
1638   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1639       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1640   {
1641     /* remove 'rc' that was inactive the longest */
1642     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1643     GNUNET_assert (NULL != d_ctx);
1644     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1645     GNUNET_free (d_ctx);
1646   }
1647 }
1648
1649 /**
1650  * Read and process a message from the given socket.
1651  *
1652  * @param plugin the overall plugin
1653  * @param rsock socket to read from
1654  */
1655 static void
1656 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1657 {
1658   socklen_t fromlen;
1659   char addr[32];
1660   char buf[65536] GNUNET_ALIGN;
1661   ssize_t size;
1662   const struct GNUNET_MessageHeader *msg;
1663
1664   fromlen = sizeof (addr);
1665   memset (&addr, 0, sizeof (addr));
1666   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1667                                       (struct sockaddr *) &addr, &fromlen);
1668
1669   if (size < sizeof (struct GNUNET_MessageHeader))
1670   {
1671     GNUNET_break_op (0);
1672     return;
1673   }
1674   msg = (const struct GNUNET_MessageHeader *) buf;
1675
1676   LOG (GNUNET_ERROR_TYPE_DEBUG,
1677        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1678        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1679
1680   if (size != ntohs (msg->size))
1681   {
1682     GNUNET_break_op (0);
1683     return;
1684   }
1685
1686   switch (ntohs (msg->type))
1687   {
1688   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1689     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1690     return;
1691
1692   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1693     read_process_msg (plugin, msg, addr, fromlen);
1694     return;
1695
1696   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1697     read_process_ack (plugin, msg, addr, fromlen);
1698     return;
1699
1700   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1701     read_process_fragment (plugin, msg, addr, fromlen);
1702     return;
1703
1704   default:
1705     GNUNET_break_op (0);
1706     return;
1707   }
1708 }
1709
1710
1711 static size_t
1712 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1713 {
1714   ssize_t sent;
1715   size_t slen;
1716   struct GNUNET_TIME_Absolute max;
1717   struct GNUNET_TIME_Absolute ;
1718
1719   struct UDPMessageWrapper *udpw = NULL;
1720
1721   if (sock == plugin->sockv4)
1722   {
1723     udpw = plugin->ipv4_queue_head;
1724   }
1725   else if (sock == plugin->sockv6)
1726   {
1727     udpw = plugin->ipv6_queue_head;
1728   }
1729   else
1730   {
1731     GNUNET_break (0);
1732     return 0;
1733   }
1734
1735   const struct sockaddr * sa = udpw->session->sock_addr;
1736   slen = udpw->session->addrlen;
1737
1738   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1739
1740   while (udpw != NULL)
1741   {
1742     if (max.abs_value != udpw->timeout.abs_value)
1743     {
1744       /* Message timed out */
1745
1746       if (udpw->cont != NULL)
1747         udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
1748       if (udpw->frag_ctx != NULL)
1749       {
1750         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1751             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1752         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1753         GNUNET_free (udpw->frag_ctx);
1754         udpw->session->frag_ctx = NULL;
1755       }
1756       else
1757       {
1758         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1759             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1760       }
1761
1762       if (sock == plugin->sockv4)
1763       {
1764         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1765         GNUNET_free (udpw);
1766         udpw = plugin->ipv4_queue_head;
1767       }
1768       else if (sock == plugin->sockv6)
1769       {
1770         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1771         GNUNET_free (udpw);
1772         udpw = plugin->ipv6_queue_head;
1773       }
1774     }
1775     else
1776     {
1777       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1778       if (delta.rel_value == 0)
1779       {
1780         /* this message is not delayed */
1781         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1782             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1783         break;
1784       }
1785       else
1786       {
1787         /* this message is delayed, try next */
1788         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1789             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1790             delta);
1791         udpw = udpw->next;
1792       }
1793     }
1794   }
1795
1796   if (udpw == NULL)
1797   {
1798     /* No message left */
1799     return 0;
1800   }
1801
1802   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1803
1804   if (GNUNET_SYSERR == sent)
1805   {
1806     LOG (GNUNET_ERROR_TYPE_ERROR,
1807          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1808          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1809          STRERROR (errno));
1810     if (udpw->cont != NULL)
1811       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
1812   }
1813   else
1814   {
1815     LOG (GNUNET_ERROR_TYPE_DEBUG,
1816          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1817          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1818          (sent < 0) ? STRERROR (errno) : "ok");
1819   }
1820   /* This was just a message fragment */
1821   if (udpw->frag_ctx != NULL)
1822   {
1823     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1824   }
1825   /* This was a complete message*/
1826   else
1827   {
1828     if (udpw->cont != NULL)
1829       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
1830   }
1831
1832   if (sock == plugin->sockv4)
1833     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1834   else if (sock == plugin->sockv6)
1835     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1836   GNUNET_free (udpw);
1837   udpw = NULL;
1838
1839   return sent;
1840 }
1841
1842 /**
1843  * We have been notified that our readset has something to read.  We don't
1844  * know which socket needs to be read, so we have to check each one
1845  * Then reschedule this function to be called again once more is available.
1846  *
1847  * @param cls the plugin handle
1848  * @param tc the scheduling context (for rescheduling this function again)
1849  */
1850 static void
1851 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1852 {
1853   struct Plugin *plugin = cls;
1854
1855   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1856   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1857     return;
1858   plugin->with_v4_ws = GNUNET_NO;
1859
1860   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1861   {
1862     if ((NULL != plugin->sockv4) &&
1863       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1864         udp_select_read (plugin, plugin->sockv4);
1865
1866   }
1867
1868   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1869   {
1870     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1871       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1872       {
1873         udp_select_send (plugin, plugin->sockv4);
1874       }
1875   }
1876
1877   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1878     GNUNET_SCHEDULER_cancel (plugin->select_task);
1879   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1880                                    GNUNET_TIME_UNIT_FOREVER_REL,
1881                                    plugin->rs_v4,
1882                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1883                                    &udp_plugin_select, plugin);
1884   if (plugin->ipv4_queue_head != NULL)
1885     plugin->with_v4_ws = GNUNET_YES;
1886   else
1887     plugin->with_v4_ws = GNUNET_NO;
1888 }
1889
1890
1891 /**
1892  * We have been notified that our readset has something to read.  We don't
1893  * know which socket needs to be read, so we have to check each one
1894  * Then reschedule this function to be called again once more is available.
1895  *
1896  * @param cls the plugin handle
1897  * @param tc the scheduling context (for rescheduling this function again)
1898  */
1899 static void
1900 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1901 {
1902   struct Plugin *plugin = cls;
1903
1904   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1905   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1906     return;
1907
1908   plugin->with_v6_ws = GNUNET_NO;
1909   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1910   {
1911     if ((NULL != plugin->sockv6) &&
1912       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1913         udp_select_read (plugin, plugin->sockv6);
1914   }
1915
1916   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1917   {
1918     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1919       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1920       {
1921         udp_select_send (plugin, plugin->sockv6);
1922       }
1923   }
1924   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1925     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1926   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1927                                    GNUNET_TIME_UNIT_FOREVER_REL,
1928                                    plugin->rs_v6,
1929                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1930                                    &udp_plugin_select_v6, plugin);
1931   if (plugin->ipv6_queue_head != NULL)
1932     plugin->with_v6_ws = GNUNET_YES;
1933   else
1934     plugin->with_v6_ws = GNUNET_NO;
1935 }
1936
1937
1938 static int
1939 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1940 {
1941   int tries;
1942   int sockets_created = 0;
1943   struct sockaddr *serverAddr;
1944   struct sockaddr *addrs[2];
1945   socklen_t addrlens[2];
1946   socklen_t addrlen;
1947
1948   /* Create IPv6 socket */
1949   if (plugin->enable_ipv6 == GNUNET_YES)
1950   {
1951     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
1952     if (NULL == plugin->sockv6)
1953     {
1954       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
1955       plugin->enable_ipv6 = GNUNET_NO;
1956     }
1957     else
1958     {
1959 #if HAVE_SOCKADDR_IN_SIN_LEN
1960       serverAddrv6->sin6_len = sizeof (serverAddrv6);
1961 #endif
1962       serverAddrv6->sin6_family = AF_INET6;
1963       serverAddrv6->sin6_addr = in6addr_any;
1964       serverAddrv6->sin6_port = htons (plugin->port);
1965       addrlen = sizeof (struct sockaddr_in6);
1966       serverAddr = (struct sockaddr *) serverAddrv6;
1967       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
1968            ntohs (serverAddrv6->sin6_port));
1969       tries = 0;
1970       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
1971              GNUNET_OK)
1972       {
1973         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
1974         LOG (GNUNET_ERROR_TYPE_DEBUG,
1975              "IPv6 Binding failed, trying new port %d\n",
1976              ntohs (serverAddrv6->sin6_port));
1977         tries++;
1978         if (tries > 10)
1979         {
1980           GNUNET_NETWORK_socket_close (plugin->sockv6);
1981           plugin->sockv6 = NULL;
1982           break;
1983         }
1984       }
1985       if (plugin->sockv6 != NULL)
1986       {
1987         LOG (GNUNET_ERROR_TYPE_DEBUG,
1988              "IPv6 socket created on port %d\n",
1989              ntohs (serverAddrv6->sin6_port));
1990         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
1991         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
1992         sockets_created++;
1993       }
1994     }
1995   }
1996
1997   /* Create IPv4 socket */
1998   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
1999   if (NULL == plugin->sockv4)
2000   {
2001     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2002   }
2003   else
2004   {
2005 #if HAVE_SOCKADDR_IN_SIN_LEN
2006     serverAddrv4->sin_len = sizeof (serverAddrv4);
2007 #endif
2008     serverAddrv4->sin_family = AF_INET;
2009     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2010     serverAddrv4->sin_port = htons (plugin->port);
2011     addrlen = sizeof (struct sockaddr_in);
2012     serverAddr = (struct sockaddr *) serverAddrv4;
2013
2014     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2015          ntohs (serverAddrv4->sin_port));
2016     tries = 0;
2017     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2018            GNUNET_OK)
2019     {
2020       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2021       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2022            ntohs (serverAddrv4->sin_port));
2023       tries++;
2024       if (tries > 10)
2025       {
2026         GNUNET_NETWORK_socket_close (plugin->sockv4);
2027         plugin->sockv4 = NULL;
2028         break;
2029       }
2030     }
2031     if (plugin->sockv4 != NULL)
2032     {
2033       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2034       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2035       sockets_created++;
2036     }
2037   }
2038
2039   /* Create file descriptors */
2040   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2041   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2042   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2043   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2044   if (NULL != plugin->sockv4)
2045   {
2046     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2047     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2048   }
2049
2050   if (sockets_created == 0)
2051     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2052
2053   plugin->select_task =
2054       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2055                                    GNUNET_TIME_UNIT_FOREVER_REL,
2056                                    plugin->rs_v4,
2057                                    NULL,
2058                                    &udp_plugin_select, plugin);
2059   plugin->with_v4_ws = GNUNET_NO;
2060
2061   if (plugin->enable_ipv6 == GNUNET_YES)
2062   {
2063     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2064     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2065     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2066     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2067     if (NULL != plugin->sockv6)
2068     {
2069       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2070       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2071     }
2072
2073     plugin->select_task_v6 =
2074         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2075                                      GNUNET_TIME_UNIT_FOREVER_REL,
2076                                      plugin->rs_v6,
2077                                      NULL,
2078                                      &udp_plugin_select_v6, plugin);
2079     plugin->with_v6_ws = GNUNET_NO;
2080   }
2081
2082   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2083                            GNUNET_NO, plugin->port,
2084                            sockets_created,
2085                            (const struct sockaddr **) addrs, addrlens,
2086                            &udp_nat_port_map_callback, NULL, plugin);
2087
2088   return sockets_created;
2089 }
2090
2091
2092 /**
2093  * The exported method. Makes the core api available via a global and
2094  * returns the udp transport API.
2095  *
2096  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2097  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2098  */
2099 void *
2100 libgnunet_plugin_transport_udp_init (void *cls)
2101 {
2102   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2103   struct GNUNET_TRANSPORT_PluginFunctions *api;
2104   struct Plugin *plugin;
2105
2106   unsigned long long port;
2107   unsigned long long aport;
2108   unsigned long long broadcast;
2109   unsigned long long udp_max_bps;
2110   unsigned long long enable_v6;
2111   char * bind4_address;
2112   char * bind6_address;
2113   struct GNUNET_TIME_Relative interval;
2114
2115   struct sockaddr_in serverAddrv4;
2116   struct sockaddr_in6 serverAddrv6;
2117
2118   int res;
2119
2120   if (NULL == env->receive)
2121   {
2122     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2123        initialze the plugin or the API */
2124     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2125     api->cls = NULL;
2126     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2127     api->address_to_string = &udp_address_to_string;
2128     api->string_to_address = &udp_string_to_address;
2129     return api;
2130   }
2131
2132   GNUNET_assert( NULL != env->stats);
2133
2134   /* Get port number */
2135   if (GNUNET_OK !=
2136       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2137                                              &port))
2138     port = 2086;
2139   if (GNUNET_OK !=
2140       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2141                                              "ADVERTISED_PORT", &aport))
2142     aport = port;
2143   if (port > 65535)
2144   {
2145     LOG (GNUNET_ERROR_TYPE_WARNING,
2146          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2147          65535);
2148     return NULL;
2149   }
2150
2151   /* Protocols */
2152   if ((GNUNET_YES ==
2153        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2154                                              "DISABLEV6")))
2155   {
2156     enable_v6 = GNUNET_NO;
2157   }
2158   else
2159     enable_v6 = GNUNET_YES;
2160
2161
2162   /* Addresses */
2163   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2164   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2165
2166   if (GNUNET_YES ==
2167       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2168                                              "BINDTO", &bind4_address))
2169   {
2170     LOG (GNUNET_ERROR_TYPE_DEBUG,
2171          "Binding udp plugin to specific address: `%s'\n",
2172          bind4_address);
2173     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2174     {
2175       GNUNET_free (bind4_address);
2176       return NULL;
2177     }
2178   }
2179
2180   if (GNUNET_YES ==
2181       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2182                                              "BINDTO6", &bind6_address))
2183   {
2184     LOG (GNUNET_ERROR_TYPE_DEBUG,
2185          "Binding udp plugin to specific address: `%s'\n",
2186          bind6_address);
2187     if (1 !=
2188         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2189     {
2190       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2191            bind6_address);
2192       GNUNET_free_non_null (bind4_address);
2193       GNUNET_free (bind6_address);
2194       return NULL;
2195     }
2196   }
2197
2198
2199   /* Enable neighbour discovery */
2200   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2201                                             "BROADCAST");
2202   if (broadcast == GNUNET_SYSERR)
2203     broadcast = GNUNET_NO;
2204
2205   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2206                                            "BROADCAST_INTERVAL", &interval))
2207   {
2208     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2209   }
2210
2211   /* Maximum datarate */
2212   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2213                                              "MAX_BPS", &udp_max_bps))
2214   {
2215     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2216   }
2217
2218   plugin = GNUNET_malloc (sizeof (struct Plugin));
2219   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2220
2221   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2222                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2223
2224
2225   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2226   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2227   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2228   plugin->port = port;
2229   plugin->aport = aport;
2230   plugin->broadcast_interval = interval;
2231   plugin->enable_ipv6 = enable_v6;
2232   plugin->env = env;
2233
2234   api->cls = plugin;
2235   api->send = NULL;
2236   api->disconnect = &udp_disconnect;
2237   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2238   api->address_to_string = &udp_address_to_string;
2239   api->string_to_address = &udp_string_to_address;
2240   api->check_address = &udp_plugin_check_address;
2241   api->get_session = &udp_plugin_get_session;
2242   api->send = &udp_plugin_send;
2243
2244   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2245   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2246   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2247   {
2248     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2249     GNUNET_free (plugin);
2250     GNUNET_free (api);
2251     return NULL;
2252   }
2253
2254   if (broadcast == GNUNET_YES)
2255   {
2256     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2257     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2258   }
2259
2260   GNUNET_free_non_null (bind4_address);
2261   GNUNET_free_non_null (bind6_address);
2262   return api;
2263 }
2264
2265
2266 static int
2267 heap_cleanup_iterator (void *cls,
2268                        struct GNUNET_CONTAINER_HeapNode *
2269                        node, void *element,
2270                        GNUNET_CONTAINER_HeapCostType
2271                        cost)
2272 {
2273   struct DefragContext * d_ctx = element;
2274
2275   GNUNET_CONTAINER_heap_remove_node (node);
2276   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2277   GNUNET_free (d_ctx);
2278
2279   return GNUNET_YES;
2280 }
2281
2282
2283 /**
2284  * The exported method. Makes the core api available via a global and
2285  * returns the udp transport API.
2286  *
2287  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2288  * @return NULL
2289  */
2290 void *
2291 libgnunet_plugin_transport_udp_done (void *cls)
2292 {
2293   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2294   struct Plugin *plugin = api->cls;
2295
2296   if (NULL == plugin)
2297   {
2298     GNUNET_free (api);
2299     return NULL;
2300   }
2301
2302   stop_broadcast (plugin);
2303
2304   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2305   {
2306     GNUNET_SCHEDULER_cancel (plugin->select_task);
2307     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2308   }
2309   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2310   {
2311     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2312     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2313   }
2314
2315   /* Closing sockets */
2316   if (plugin->sockv4 != NULL)
2317   {
2318     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2319     plugin->sockv4 = NULL;
2320   }
2321   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2322   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2323
2324   if (plugin->sockv6 != NULL)
2325   {
2326     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2327     plugin->sockv6 = NULL;
2328
2329     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2330     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2331   }
2332
2333   GNUNET_NAT_unregister (plugin->nat);
2334
2335   if (plugin->defrag_ctxs != NULL)
2336   {
2337     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2338         heap_cleanup_iterator, NULL);
2339     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2340     plugin->defrag_ctxs = NULL;
2341   }
2342   if (plugin->mst != NULL)
2343   {
2344     GNUNET_SERVER_mst_destroy(plugin->mst);
2345     plugin->mst = NULL;
2346   }
2347
2348   /* Clean up leftover messages */
2349   struct UDPMessageWrapper * udpw;
2350   udpw = plugin->ipv4_queue_head;
2351   while (udpw != NULL)
2352   {
2353     struct UDPMessageWrapper *tmp = udpw->next;
2354     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2355     if (udpw->cont != NULL)
2356       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
2357     GNUNET_free (udpw);
2358     udpw = tmp;
2359   }
2360   udpw = plugin->ipv6_queue_head;
2361   while (udpw != NULL)
2362   {
2363     struct UDPMessageWrapper *tmp = udpw->next;
2364     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2365     if (udpw->cont != NULL)
2366       udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR);
2367     GNUNET_free (udpw);
2368     udpw = tmp;
2369   }
2370
2371   /* Clean up sessions */
2372   LOG (GNUNET_ERROR_TYPE_DEBUG,
2373        "Cleaning up sessions\n");
2374   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2375   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2376
2377   plugin->nat = NULL;
2378   GNUNET_free (plugin);
2379   GNUNET_free (api);
2380   return NULL;
2381 }
2382
2383
2384 /* end of plugin_transport_udp.c */