5a12578451affb558e218aee4d9e4a41c35831af
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   struct FragmentationContext * frag_ctx;
97
98   /**
99    * Address of the other peer
100    */
101   const struct sockaddr *sock_addr;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * Session timeout task
115    */
116   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118   /**
119    * expected delay for ACKs
120    */
121   struct GNUNET_TIME_Relative last_expected_delay;
122
123   struct GNUNET_ATS_Information ats;
124
125   size_t addrlen;
126
127
128   unsigned int rc;
129
130   int in_destroy;
131 };
132
133
134 struct SessionCompareContext
135 {
136   struct Session *res;
137   const struct GNUNET_HELLO_Address *addr;
138 };
139
140
141 /**
142  * Closure for 'process_inbound_tokenized_messages'
143  */
144 struct SourceInformation
145 {
146   /**
147    * Sender identity.
148    */
149   struct GNUNET_PeerIdentity sender;
150
151   /**
152    * Source address.
153    */
154   const void *arg;
155
156   struct Session *session;
157   /**
158    * Number of bytes in source address.
159    */
160   size_t args;
161
162 };
163
164
165 /**
166  * Closure for 'find_receive_context'.
167  */
168 struct FindReceiveContext
169 {
170   /**
171    * Where to store the result.
172    */
173   struct DefragContext *rc;
174
175   /**
176    * Address to find.
177    */
178   const struct sockaddr *addr;
179
180   struct Session *session;
181
182   /**
183    * Number of bytes in 'addr'.
184    */
185   socklen_t addr_len;
186
187 };
188
189
190
191 /**
192  * Data structure to track defragmentation contexts based
193  * on the source of the UDP traffic.
194  */
195 struct DefragContext
196 {
197
198   /**
199    * Defragmentation context.
200    */
201   struct GNUNET_DEFRAGMENT_Context *defrag;
202
203   /**
204    * Source address this receive context is for (allocated at the
205    * end of the struct).
206    */
207   const struct sockaddr *src_addr;
208
209   /**
210    * Reference to master plugin struct.
211    */
212   struct Plugin *plugin;
213
214   /**
215    * Node in the defrag heap.
216    */
217   struct GNUNET_CONTAINER_HeapNode *hnode;
218
219   /**
220    * Length of 'src_addr'
221    */
222   size_t addr_len;
223 };
224
225
226
227 /**
228  * Closure for 'process_inbound_tokenized_messages'
229  */
230 struct FragmentationContext
231 {
232   struct FragmentationContext * next;
233   struct FragmentationContext * prev;
234
235   struct Plugin * plugin;
236   struct GNUNET_FRAGMENT_Context * frag;
237   struct Session * session;
238
239   /**
240    * Function to call upon completion of the transmission.
241    */
242   GNUNET_TRANSPORT_TransmitContinuation cont;
243
244   /**
245    * Closure for 'cont'.
246    */
247   void *cont_cls;
248
249   struct GNUNET_TIME_Absolute timeout;
250
251   size_t bytes_to_send;
252 };
253
254
255 struct UDPMessageWrapper
256 {
257   struct Session *session;
258   struct UDPMessageWrapper *prev;
259   struct UDPMessageWrapper *next;
260   char *udp;
261
262   /**
263    * Function to call upon completion of the transmission.
264    */
265   GNUNET_TRANSPORT_TransmitContinuation cont;
266
267   /**
268    * Closure for 'cont'.
269    */
270   void *cont_cls;
271
272   struct FragmentationContext *frag_ctx;
273
274   size_t msg_size;
275
276   struct GNUNET_TIME_Absolute timeout;
277 };
278
279
280 /**
281  * UDP ACK Message-Packet header (after defragmentation).
282  */
283 struct UDP_ACK_Message
284 {
285   /**
286    * Message header.
287    */
288   struct GNUNET_MessageHeader header;
289
290   /**
291    * Desired delay for flow control
292    */
293   uint32_t delay;
294
295   /**
296    * What is the identity of the sender
297    */
298   struct GNUNET_PeerIdentity sender;
299
300 };
301
302 /**
303  * Encapsulation of all of the state of the plugin.
304  */
305 struct Plugin * plugin;
306
307
308 /**
309  * We have been notified that our readset has something to read.  We don't
310  * know which socket needs to be read, so we have to check each one
311  * Then reschedule this function to be called again once more is available.
312  *
313  * @param cls the plugin handle
314  * @param tc the scheduling context (for rescheduling this function again)
315  */
316 static void
317 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
318
319
320 /**
321  * We have been notified that our readset has something to read.  We don't
322  * know which socket needs to be read, so we have to check each one
323  * Then reschedule this function to be called again once more is available.
324  *
325  * @param cls the plugin handle
326  * @param tc the scheduling context (for rescheduling this function again)
327  */
328 static void
329 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
330
331
332 /**
333  * Start session timeout
334  */
335 static void
336 start_session_timeout (struct Session *s);
337
338 /**
339  * Increment session timeout due to activity
340  */
341 static void
342 reschedule_session_timeout (struct Session *s);
343
344 /**
345  * Cancel timeout
346  */
347 static void
348 stop_session_timeout (struct Session *s);
349
350
351
352 /**
353  * Function called for a quick conversion of the binary address to
354  * a numeric address.  Note that the caller must not free the
355  * address and that the next call to this function is allowed
356  * to override the address again.
357  *
358  * @param cls closure
359  * @param addr binary address
360  * @param addrlen length of the address
361  * @return string representing the same address
362  */
363 const char *
364 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
365 {
366   static char rbuf[INET6_ADDRSTRLEN + 10];
367   char buf[INET6_ADDRSTRLEN];
368   const void *sb;
369   struct in_addr a4;
370   struct in6_addr a6;
371   const struct IPv4UdpAddress *t4;
372   const struct IPv6UdpAddress *t6;
373   int af;
374   uint16_t port;
375
376   if (addrlen == sizeof (struct IPv6UdpAddress))
377   {
378     t6 = addr;
379     af = AF_INET6;
380     port = ntohs (t6->u6_port);
381     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
382     sb = &a6;
383   }
384   else if (addrlen == sizeof (struct IPv4UdpAddress))
385   {
386     t4 = addr;
387     af = AF_INET;
388     port = ntohs (t4->u4_port);
389     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
390     sb = &a4;
391   }
392   else
393   {
394     GNUNET_break_op (0);
395     return NULL;
396   }
397   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
398   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
399                    buf, port);
400   return rbuf;
401 }
402
403
404 /**
405  * Function called to convert a string address to
406  * a binary address.
407  *
408  * @param cls closure ('struct Plugin*')
409  * @param addr string address
410  * @param addrlen length of the address
411  * @param buf location to store the buffer
412  * @param added location to store the number of bytes in the buffer.
413  *        If the function returns GNUNET_SYSERR, its contents are undefined.
414  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
415  */
416 static int
417 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
418     void **buf, size_t *added)
419 {
420   struct sockaddr_storage socket_address;
421   
422   if ((NULL == addr) || (0 == addrlen))
423   {
424     GNUNET_break (0);
425     return GNUNET_SYSERR;
426   }
427
428   if ('\0' != addr[addrlen - 1])
429   {
430     return GNUNET_SYSERR;
431   }
432
433   if (strlen (addr) != addrlen - 1)
434   {
435     return GNUNET_SYSERR;
436   }
437
438   if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
439                                                  &socket_address))
440   {
441     return GNUNET_SYSERR;
442   }
443
444   switch (socket_address.ss_family)
445   {
446   case AF_INET:
447     {
448       struct IPv4UdpAddress *u4;
449       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
450       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
451       u4->ipv4_addr = in4->sin_addr.s_addr;
452       u4->u4_port = in4->sin_port;
453       *buf = u4;
454       *added = sizeof (struct IPv4UdpAddress);
455       return GNUNET_OK;
456     }
457   case AF_INET6:
458     {
459       struct IPv6UdpAddress *u6;
460       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
461       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
462       u6->ipv6_addr = in6->sin6_addr;
463       u6->u6_port = in6->sin6_port;
464       *buf = u6;
465       *added = sizeof (struct IPv6UdpAddress);
466       return GNUNET_OK;
467     }
468   default:
469     GNUNET_break (0);
470     return GNUNET_SYSERR;
471   }
472 }
473
474
475 /**
476  * Append our port and forward the result.
477  *
478  * @param cls a 'struct PrettyPrinterContext'
479  * @param hostname result from DNS resolver
480  */
481 static void
482 append_port (void *cls, const char *hostname)
483 {
484   struct PrettyPrinterContext *ppc = cls;
485   char *ret;
486
487   if (hostname == NULL)
488   {
489     ppc->asc (ppc->asc_cls, NULL);
490     GNUNET_free (ppc);
491     return;
492   }
493   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
494   ppc->asc (ppc->asc_cls, ret);
495   GNUNET_free (ret);
496 }
497
498
499 /**
500  * Convert the transports address to a nice, human-readable
501  * format.
502  *
503  * @param cls closure
504  * @param type name of the transport that generated the address
505  * @param addr one of the addresses of the host, NULL for the last address
506  *        the specific address format depends on the transport
507  * @param addrlen length of the address
508  * @param numeric should (IP) addresses be displayed in numeric form?
509  * @param timeout after how long should we give up?
510  * @param asc function to call on each string
511  * @param asc_cls closure for asc
512  */
513 static void
514 udp_plugin_address_pretty_printer (void *cls, const char *type,
515                                    const void *addr, size_t addrlen,
516                                    int numeric,
517                                    struct GNUNET_TIME_Relative timeout,
518                                    GNUNET_TRANSPORT_AddressStringCallback asc,
519                                    void *asc_cls)
520 {
521   struct PrettyPrinterContext *ppc;
522   const void *sb;
523   size_t sbs;
524   struct sockaddr_in a4;
525   struct sockaddr_in6 a6;
526   const struct IPv4UdpAddress *u4;
527   const struct IPv6UdpAddress *u6;
528   uint16_t port;
529
530   if (addrlen == sizeof (struct IPv6UdpAddress))
531   {
532     u6 = addr;
533     memset (&a6, 0, sizeof (a6));
534     a6.sin6_family = AF_INET6;
535 #if HAVE_SOCKADDR_IN_SIN_LEN
536     a6.sin6_len = sizeof (a6);
537 #endif
538     a6.sin6_port = u6->u6_port;
539     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
540     port = ntohs (u6->u6_port);
541     sb = &a6;
542     sbs = sizeof (a6);
543   }
544   else if (addrlen == sizeof (struct IPv4UdpAddress))
545   {
546     u4 = addr;
547     memset (&a4, 0, sizeof (a4));
548     a4.sin_family = AF_INET;
549 #if HAVE_SOCKADDR_IN_SIN_LEN
550     a4.sin_len = sizeof (a4);
551 #endif
552     a4.sin_port = u4->u4_port;
553     a4.sin_addr.s_addr = u4->ipv4_addr;
554     port = ntohs (u4->u4_port);
555     sb = &a4;
556     sbs = sizeof (a4);
557   }
558   else if (0 == addrlen)
559   {
560     asc (asc_cls, "<inbound connection>");
561     asc (asc_cls, NULL);
562     return;
563   }
564   else
565   {
566     /* invalid address */
567     GNUNET_break_op (0);
568     asc (asc_cls, NULL);
569     return;
570   }
571   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
572   ppc->asc = asc;
573   ppc->asc_cls = asc_cls;
574   ppc->port = port;
575   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
576 }
577
578
579 static void
580 call_continuation (struct UDPMessageWrapper *udpw, int result)
581 {
582   LOG (GNUNET_ERROR_TYPE_DEBUG,
583       "Calling continuation for %u byte message to `%s' with result %s\n",
584       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
585       (GNUNET_OK == result) ? "OK" : "SYSERR");
586   if (NULL != udpw->cont)
587   {
588     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
589   }
590
591 }
592
593
594 /**
595  * Check if the given port is plausible (must be either our listen
596  * port or our advertised port).  If it is neither, we return
597  * GNUNET_SYSERR.
598  *
599  * @param plugin global variables
600  * @param in_port port number to check
601  * @return GNUNET_OK if port is either open_port or adv_port
602  */
603 static int
604 check_port (struct Plugin *plugin, uint16_t in_port)
605 {
606   if ((in_port == plugin->port) || (in_port == plugin->aport))
607     return GNUNET_OK;
608   return GNUNET_SYSERR;
609 }
610
611
612 /**
613  * Function that will be called to check if a binary address for this
614  * plugin is well-formed and corresponds to an address for THIS peer
615  * (as per our configuration).  Naturally, if absolutely necessary,
616  * plugins can be a bit conservative in their answer, but in general
617  * plugins should make sure that the address does not redirect
618  * traffic to a 3rd party that might try to man-in-the-middle our
619  * traffic.
620  *
621  * @param cls closure, should be our handle to the Plugin
622  * @param addr pointer to the address
623  * @param addrlen length of addr
624  * @return GNUNET_OK if this is a plausible address for this peer
625  *         and transport, GNUNET_SYSERR if not
626  *
627  */
628 static int
629 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
630 {
631   struct Plugin *plugin = cls;
632   struct IPv4UdpAddress *v4;
633   struct IPv6UdpAddress *v6;
634
635   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
636       (addrlen != sizeof (struct IPv6UdpAddress)))
637   {
638     GNUNET_break_op (0);
639     return GNUNET_SYSERR;
640   }
641   if (addrlen == sizeof (struct IPv4UdpAddress))
642   {
643     v4 = (struct IPv4UdpAddress *) addr;
644     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
645       return GNUNET_SYSERR;
646     if (GNUNET_OK !=
647         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
648                                  sizeof (struct in_addr)))
649       return GNUNET_SYSERR;
650   }
651   else
652   {
653     v6 = (struct IPv6UdpAddress *) addr;
654     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
655     {
656       GNUNET_break_op (0);
657       return GNUNET_SYSERR;
658     }
659     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
660       return GNUNET_SYSERR;
661     if (GNUNET_OK !=
662         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
663                                  sizeof (struct in6_addr)))
664       return GNUNET_SYSERR;
665   }
666   return GNUNET_OK;
667 }
668
669
670 /**
671  * Task to free resources associated with a session.
672  *
673  * @param s session to free
674  */
675 static void
676 free_session (struct Session *s)
677 {
678   if (s->frag_ctx != NULL)
679   {
680     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
681     GNUNET_free (s->frag_ctx);
682     s->frag_ctx = NULL;
683   }
684   GNUNET_free (s);
685 }
686
687
688 /**
689  * Functions with this signature are called whenever we need
690  * to close a session due to a disconnect or failure to
691  * establish a connection.
692  *
693  * @param s session to close down
694  */
695 static void
696 disconnect_session (struct Session *s)
697 {
698   struct UDPMessageWrapper *udpw;
699   struct UDPMessageWrapper *next;
700
701   GNUNET_assert (GNUNET_YES != s->in_destroy);
702   LOG (GNUNET_ERROR_TYPE_DEBUG,
703        "Session %p to peer `%s' address ended \n",
704          s,
705          GNUNET_i2s (&s->target),
706          GNUNET_a2s (s->sock_addr, s->addrlen));
707   stop_session_timeout (s);
708   next = plugin->ipv4_queue_head;
709   while (NULL != (udpw = next))
710   {
711     next = udpw->next;
712     if (udpw->session == s)
713     {
714       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
715       call_continuation(udpw, GNUNET_SYSERR);
716       GNUNET_free (udpw);
717     }
718   }
719   next = plugin->ipv6_queue_head;
720   while (NULL != (udpw = next))
721   {
722     next = udpw->next;
723     if (udpw->session == s)
724     {
725       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
726       call_continuation(udpw, GNUNET_SYSERR);
727       GNUNET_free (udpw);
728     }
729     udpw = next;
730   }
731   plugin->env->session_end (plugin->env->cls, &s->target, s);
732
733   if (NULL != s->frag_ctx)
734   {
735     if (NULL != s->frag_ctx->cont)
736     {
737       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
738       LOG (GNUNET_ERROR_TYPE_DEBUG,
739           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
740           GNUNET_i2s (&s->target));
741     }
742   }
743
744   GNUNET_assert (GNUNET_YES ==
745                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
746                                                        &s->target.hashPubKey,
747                                                        s));
748   GNUNET_STATISTICS_set(plugin->env->stats,
749                         "# UDP sessions active",
750                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
751                         GNUNET_NO);
752   if (s->rc > 0)
753     s->in_destroy = GNUNET_YES;
754   else
755     free_session (s);
756 }
757
758 /**
759  * Destroy a session, plugin is being unloaded.
760  *
761  * @param cls unused
762  * @param key hash of public key of target peer
763  * @param value a 'struct PeerSession*' to clean up
764  * @return GNUNET_OK (continue to iterate)
765  */
766 static int
767 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
768 {
769   disconnect_session(value);
770   return GNUNET_OK;
771 }
772
773
774 /**
775  * Disconnect from a remote node.  Clean up session if we have one for this peer
776  *
777  * @param cls closure for this call (should be handle to Plugin)
778  * @param target the peeridentity of the peer to disconnect
779  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
780  */
781 static void
782 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
783 {
784   struct Plugin *plugin = cls;
785   GNUNET_assert (plugin != NULL);
786
787   GNUNET_assert (target != NULL);
788   LOG (GNUNET_ERROR_TYPE_DEBUG,
789        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
790   /* Clean up sessions */
791   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
792 }
793
794
795 static struct Session *
796 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
797                 const void *addr, size_t addrlen,
798                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
799 {
800   struct Session *s;
801   const struct IPv4UdpAddress *t4;
802   const struct IPv6UdpAddress *t6;
803   struct sockaddr_in *v4;
804   struct sockaddr_in6 *v6;
805   size_t len;
806
807   switch (addrlen)
808   {
809   case sizeof (struct IPv4UdpAddress):
810     if (NULL == plugin->sockv4)
811     {
812       return NULL;
813     }
814     t4 = addr;
815     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
816     len = sizeof (struct sockaddr_in);
817     v4 = (struct sockaddr_in *) &s[1];
818     v4->sin_family = AF_INET;
819 #if HAVE_SOCKADDR_IN_SIN_LEN
820     v4->sin_len = sizeof (struct sockaddr_in);
821 #endif
822     v4->sin_port = t4->u4_port;
823     v4->sin_addr.s_addr = t4->ipv4_addr;
824     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
825     break;
826   case sizeof (struct IPv6UdpAddress):
827     if (NULL == plugin->sockv6)
828     {
829       return NULL;
830     }
831     t6 = addr;
832     s =
833         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
834     len = sizeof (struct sockaddr_in6);
835     v6 = (struct sockaddr_in6 *) &s[1];
836     v6->sin6_family = AF_INET6;
837 #if HAVE_SOCKADDR_IN_SIN_LEN
838     v6->sin6_len = sizeof (struct sockaddr_in6);
839 #endif
840     v6->sin6_port = t6->u6_port;
841     v6->sin6_addr = t6->ipv6_addr;
842     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
843     break;
844   default:
845     /* Must have a valid address to send to */
846     GNUNET_break_op (0);
847     return NULL;
848   }
849   s->addrlen = len;
850   s->target = *target;
851   s->sock_addr = (const struct sockaddr *) &s[1];
852   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
853   start_session_timeout(s);
854   return s;
855 }
856
857
858 static int
859 session_cmp_it (void *cls,
860                 const struct GNUNET_HashCode * key,
861                 void *value)
862 {
863   struct SessionCompareContext * cctx = cls;
864   const struct GNUNET_HELLO_Address *address = cctx->addr;
865   struct Session *s = value;
866
867   socklen_t s_addrlen = s->addrlen;
868
869   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
870       udp_address_to_string (NULL, (void *) address->address, address->address_length),
871       GNUNET_a2s (s->sock_addr, s->addrlen));
872   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
873       (s_addrlen == sizeof (struct sockaddr_in)))
874   {
875     struct IPv4UdpAddress * u4 = NULL;
876     u4 = (struct IPv4UdpAddress *) address->address;
877     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
878     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
879         (u4->u4_port == s4->sin_port))
880     {
881       cctx->res = s;
882       return GNUNET_NO;
883     }
884
885   }
886   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
887       (s_addrlen == sizeof (struct sockaddr_in6)))
888   {
889     struct IPv6UdpAddress * u6 = NULL;
890     u6 = (struct IPv6UdpAddress *) address->address;
891     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
892     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
893         (u6->u6_port == s6->sin6_port))
894     {
895       cctx->res = s;
896       return GNUNET_NO;
897     }
898   }
899   return GNUNET_YES;
900 }
901
902
903 /**
904  * Creates a new outbound session the transport service will use to send data to the
905  * peer
906  *
907  * @param cls the plugin
908  * @param address the address
909  * @return the session or NULL of max connections exceeded
910  */
911 static struct Session *
912 udp_plugin_get_session (void *cls,
913                   const struct GNUNET_HELLO_Address *address)
914 {
915   struct Session * s = NULL;
916   struct Plugin * plugin = cls;
917   struct IPv6UdpAddress * udp_a6;
918   struct IPv4UdpAddress * udp_a4;
919
920   GNUNET_assert (plugin != NULL);
921   GNUNET_assert (address != NULL);
922
923
924   if ((address->address == NULL) ||
925       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
926       (address->address_length != sizeof (struct IPv6UdpAddress))))
927   {
928     GNUNET_break (0);
929     return NULL;
930   }
931
932   if (address->address_length == sizeof (struct IPv4UdpAddress))
933   {
934     if (plugin->sockv4 == NULL)
935       return NULL;
936     udp_a4 = (struct IPv4UdpAddress *) address->address;
937     if (udp_a4->u4_port == 0)
938       return NULL;
939   }
940
941   if (address->address_length == sizeof (struct IPv6UdpAddress))
942   {
943     if (plugin->sockv6 == NULL)
944       return NULL;
945     udp_a6 = (struct IPv6UdpAddress *) address->address;
946     if (udp_a6->u6_port == 0)
947       return NULL;
948   }
949
950   /* check if session already exists */
951   struct SessionCompareContext cctx;
952   cctx.addr = address;
953   cctx.res = NULL;
954   LOG (GNUNET_ERROR_TYPE_DEBUG,
955        "Looking for existing session for peer `%s' `%s' \n", 
956        GNUNET_i2s (&address->peer), 
957        udp_address_to_string(NULL, address->address, address->address_length));
958   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
959   if (cctx.res != NULL)
960   {
961     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
962     return cctx.res;
963   }
964
965   /* otherwise create new */
966   s = create_session (plugin,
967       &address->peer,
968       address->address,
969       address->address_length,
970       NULL, NULL);
971   LOG (GNUNET_ERROR_TYPE_DEBUG,
972        "Creating new session %p for peer `%s' address `%s'\n",
973        s,
974        GNUNET_i2s(&address->peer),
975        udp_address_to_string(NULL,address->address,address->address_length));
976   GNUNET_assert (GNUNET_OK ==
977                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
978                                                     &s->target.hashPubKey,
979                                                     s,
980                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
981
982   GNUNET_STATISTICS_set(plugin->env->stats,
983                         "# UDP sessions active",
984                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
985                         GNUNET_NO);
986
987   return s;
988 }
989
990
991 static void 
992 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
993 {
994
995   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
996     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
997   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
998     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
999 }
1000
1001
1002 /**
1003  * Fragment message was transmitted via UDP, let fragmentation know
1004  * to send the next fragment now.
1005  *
1006  * @param cls the 'struct UDPMessageWrapper' of the fragment
1007  * @param target destination peer (ignored)
1008  * @param result GNUNET_OK on success (ignored)
1009  */
1010 static void
1011 send_next_fragment (void *cls,
1012                     const struct GNUNET_PeerIdentity *target,
1013                     int result)
1014 {
1015   struct UDPMessageWrapper *udpw = cls;
1016
1017   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1018 }
1019
1020
1021 /**
1022  * Function that is called with messages created by the fragmentation
1023  * module.  In the case of the 'proc' callback of the
1024  * GNUNET_FRAGMENT_context_create function, this function must
1025  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1026  *
1027  * @param cls closure, the 'struct FragmentationContext'
1028  * @param msg the message that was created
1029  */
1030 static void
1031 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1032 {
1033   struct FragmentationContext *frag_ctx = cls;
1034   struct Plugin *plugin = frag_ctx->plugin;
1035   struct UDPMessageWrapper * udpw;
1036   struct Session *s;
1037   size_t msg_len = ntohs (msg->size);
1038
1039   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1040        "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1041   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1042   udpw->session = frag_ctx->session;
1043   s = udpw->session;
1044   udpw->udp = (char *) &udpw[1];
1045
1046   udpw->msg_size = msg_len;
1047   udpw->cont = &send_next_fragment;
1048   udpw->cont_cls = udpw;
1049   udpw->timeout = frag_ctx->timeout;
1050   udpw->frag_ctx = frag_ctx;
1051   memcpy (udpw->udp, msg, msg_len);
1052   enqueue (plugin, udpw);
1053
1054   if (s->addrlen == sizeof (struct sockaddr_in))
1055   {
1056     if (plugin->with_v4_ws == GNUNET_NO)
1057     {
1058       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1059         GNUNET_SCHEDULER_cancel(plugin->select_task);
1060
1061       plugin->select_task =
1062           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1063                                        GNUNET_TIME_UNIT_FOREVER_REL,
1064                                        plugin->rs_v4,
1065                                        plugin->ws_v4,
1066                                        &udp_plugin_select, plugin);
1067       plugin->with_v4_ws = GNUNET_YES;
1068     }
1069   }
1070   else if (s->addrlen == sizeof (struct sockaddr_in6))
1071   {
1072     if (plugin->with_v6_ws == GNUNET_NO)
1073     {
1074       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1075         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1076
1077       plugin->select_task_v6 =
1078           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1079                                        GNUNET_TIME_UNIT_FOREVER_REL,
1080                                        plugin->rs_v6,
1081                                        plugin->ws_v6,
1082                                        &udp_plugin_select_v6, plugin);
1083       plugin->with_v6_ws = GNUNET_YES;
1084     }
1085   }
1086 }
1087
1088
1089 /**
1090  * Function that can be used by the transport service to transmit
1091  * a message using the plugin.   Note that in the case of a
1092  * peer disconnecting, the continuation MUST be called
1093  * prior to the disconnect notification itself.  This function
1094  * will be called with this peer's HELLO message to initiate
1095  * a fresh connection to another peer.
1096  *
1097  * @param cls closure
1098  * @param s which session must be used
1099  * @param msgbuf the message to transmit
1100  * @param msgbuf_size number of bytes in 'msgbuf'
1101  * @param priority how important is the message (most plugins will
1102  *                 ignore message priority and just FIFO)
1103  * @param to how long to wait at most for the transmission (does not
1104  *                require plugins to discard the message after the timeout,
1105  *                just advisory for the desired delay; most plugins will ignore
1106  *                this as well)
1107  * @param cont continuation to call once the message has
1108  *        been transmitted (or if the transport is ready
1109  *        for the next transmission call; or if the
1110  *        peer disconnected...); can be NULL
1111  * @param cont_cls closure for cont
1112  * @return number of bytes used (on the physical network, with overheads);
1113  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1114  *         and does NOT mean that the message was not transmitted (DV)
1115  */
1116 static ssize_t
1117 udp_plugin_send (void *cls,
1118                   struct Session *s,
1119                   const char *msgbuf, size_t msgbuf_size,
1120                   unsigned int priority,
1121                   struct GNUNET_TIME_Relative to,
1122                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1123 {
1124   struct Plugin *plugin = cls;
1125   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1126   struct UDPMessageWrapper * udpw;
1127   struct UDPMessage *udp;
1128   char mbuf[mlen];
1129   GNUNET_assert (plugin != NULL);
1130   GNUNET_assert (s != NULL);
1131
1132   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1133     return GNUNET_SYSERR;
1134   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1135     return GNUNET_SYSERR;
1136   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1137   {
1138     GNUNET_break (0);
1139     return GNUNET_SYSERR;
1140   }
1141   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1142   {
1143     GNUNET_break (0);
1144     return GNUNET_SYSERR;
1145   }
1146   LOG (GNUNET_ERROR_TYPE_DEBUG,
1147        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1148        mlen,
1149        GNUNET_i2s (&s->target),
1150        GNUNET_a2s(s->sock_addr, s->addrlen));
1151   
1152   /* Message */
1153   udp = (struct UDPMessage *) mbuf;
1154   udp->header.size = htons (mlen);
1155   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1156   udp->reserved = htonl (0);
1157   udp->sender = *plugin->env->my_identity;
1158
1159   reschedule_session_timeout(s);
1160   if (mlen <= UDP_MTU)
1161   {
1162     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1163     udpw->session = s;
1164     udpw->udp = (char *) &udpw[1];
1165     udpw->msg_size = mlen;
1166     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1167     udpw->cont = cont;
1168     udpw->cont_cls = cont_cls;
1169     udpw->frag_ctx = NULL;
1170     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1171     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1172
1173     enqueue (plugin, udpw);
1174   }
1175   else
1176   {
1177     LOG (GNUNET_ERROR_TYPE_DEBUG,
1178          "UDP has to fragment message \n");
1179     if  (s->frag_ctx != NULL)
1180       return GNUNET_SYSERR;
1181     memcpy (&udp[1], msgbuf, msgbuf_size);
1182     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1183
1184     frag_ctx->plugin = plugin;
1185     frag_ctx->session = s;
1186     frag_ctx->cont = cont;
1187     frag_ctx->cont_cls = cont_cls;
1188     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1189     frag_ctx->bytes_to_send = mlen;
1190     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1191               UDP_MTU,
1192               &plugin->tracker,
1193               s->last_expected_delay,
1194               &udp->header,
1195               &enqueue_fragment,
1196               frag_ctx);
1197
1198     s->frag_ctx = frag_ctx;
1199   }
1200
1201   if (s->addrlen == sizeof (struct sockaddr_in))
1202   {
1203     if (plugin->with_v4_ws == GNUNET_NO)
1204     {
1205       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1206         GNUNET_SCHEDULER_cancel(plugin->select_task);
1207
1208       plugin->select_task =
1209           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1210                                        GNUNET_TIME_UNIT_FOREVER_REL,
1211                                        plugin->rs_v4,
1212                                        plugin->ws_v4,
1213                                        &udp_plugin_select, plugin);
1214       plugin->with_v4_ws = GNUNET_YES;
1215     }
1216   }
1217   else if (s->addrlen == sizeof (struct sockaddr_in6))
1218   {
1219     if (plugin->with_v6_ws == GNUNET_NO)
1220     {
1221       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1222         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1223
1224       plugin->select_task_v6 =
1225         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1226                                      GNUNET_TIME_UNIT_FOREVER_REL,
1227                                      plugin->rs_v6,
1228                                      plugin->ws_v6,
1229                                      &udp_plugin_select_v6, plugin);
1230       plugin->with_v6_ws = GNUNET_YES;
1231     }
1232   }
1233
1234   return mlen;
1235 }
1236
1237
1238 /**
1239  * Our external IP address/port mapping has changed.
1240  *
1241  * @param cls closure, the 'struct LocalAddrList'
1242  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1243  *     the previous (now invalid) one
1244  * @param addr either the previous or the new public IP address
1245  * @param addrlen actual lenght of the address
1246  */
1247 static void
1248 udp_nat_port_map_callback (void *cls, int add_remove,
1249                            const struct sockaddr *addr, socklen_t addrlen)
1250 {
1251   struct Plugin *plugin = cls;
1252   struct IPv4UdpAddress u4;
1253   struct IPv6UdpAddress u6;
1254   void *arg;
1255   size_t args;
1256
1257   /* convert 'addr' to our internal format */
1258   switch (addr->sa_family)
1259   {
1260   case AF_INET:
1261     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1262     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1263     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1264     arg = &u4;
1265     args = sizeof (u4);
1266     break;
1267   case AF_INET6:
1268     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1269     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1270             sizeof (struct in6_addr));
1271     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1272     arg = &u6;
1273     args = sizeof (u6);
1274     break;
1275   default:
1276     GNUNET_break (0);
1277     return;
1278   }
1279   /* modify our published address list */
1280   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1281 }
1282
1283
1284
1285 /**
1286  * Message tokenizer has broken up an incomming message. Pass it on
1287  * to the service.
1288  *
1289  * @param cls the 'struct Plugin'
1290  * @param client the 'struct SourceInformation'
1291  * @param hdr the actual message
1292  */
1293 static int
1294 process_inbound_tokenized_messages (void *cls, void *client,
1295                                     const struct GNUNET_MessageHeader *hdr)
1296 {
1297   struct Plugin *plugin = cls;
1298   struct SourceInformation *si = client;
1299   struct GNUNET_ATS_Information ats[2];
1300   struct GNUNET_TIME_Relative delay;
1301
1302   GNUNET_assert (si->session != NULL);
1303   if (GNUNET_YES == si->session->in_destroy)
1304     return GNUNET_OK;
1305   /* setup ATS */
1306   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1307   ats[0].value = htonl (1);
1308   ats[1] = si->session->ats;
1309   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1310   delay = plugin->env->receive (plugin->env->cls,
1311                                 &si->sender,
1312                                 hdr,
1313                                 (const struct GNUNET_ATS_Information *) &ats, 2,
1314                                 si->session,
1315                                 si->arg,
1316                                 si->args);
1317   si->session->flow_delay_for_other_peer = delay;
1318   reschedule_session_timeout(si->session);
1319   return GNUNET_OK;
1320 }
1321
1322
1323 /**
1324  * We've received a UDP Message.  Process it (pass contents to main service).
1325  *
1326  * @param plugin plugin context
1327  * @param msg the message
1328  * @param sender_addr sender address
1329  * @param sender_addr_len number of bytes in sender_addr
1330  */
1331 static void
1332 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1333                      const struct sockaddr *sender_addr,
1334                      socklen_t sender_addr_len)
1335 {
1336   struct SourceInformation si;
1337   struct Session * s;
1338   struct IPv4UdpAddress u4;
1339   struct IPv6UdpAddress u6;
1340   const void *arg;
1341   size_t args;
1342
1343   if (0 != ntohl (msg->reserved))
1344   {
1345     GNUNET_break_op (0);
1346     return;
1347   }
1348   if (ntohs (msg->header.size) <
1349       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1350   {
1351     GNUNET_break_op (0);
1352     return;
1353   }
1354
1355   /* convert address */
1356   switch (sender_addr->sa_family)
1357   {
1358   case AF_INET:
1359     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1360     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1361     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1362     arg = &u4;
1363     args = sizeof (u4);
1364     break;
1365   case AF_INET6:
1366     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1367     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1368     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1369     arg = &u6;
1370     args = sizeof (u6);
1371     break;
1372   default:
1373     GNUNET_break (0);
1374     return;
1375   }
1376   LOG (GNUNET_ERROR_TYPE_DEBUG,
1377        "Received message with %u bytes from peer `%s' at `%s'\n",
1378        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1379        GNUNET_a2s (sender_addr, sender_addr_len));
1380
1381   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1382   s = udp_plugin_get_session(plugin, address);
1383   GNUNET_free (address);
1384
1385   /* iterate over all embedded messages */
1386   si.session = s;
1387   si.sender = msg->sender;
1388   si.arg = arg;
1389   si.args = args;
1390   s->rc++;
1391   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1392                              ntohs (msg->header.size) -
1393                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1394   s->rc--;
1395   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1396     free_session (s);
1397 }
1398
1399
1400 /**
1401  * Scan the heap for a receive context with the given address.
1402  *
1403  * @param cls the 'struct FindReceiveContext'
1404  * @param node internal node of the heap
1405  * @param element value stored at the node (a 'struct ReceiveContext')
1406  * @param cost cost associated with the node
1407  * @return GNUNET_YES if we should continue to iterate,
1408  *         GNUNET_NO if not.
1409  */
1410 static int
1411 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1412                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1413 {
1414   struct FindReceiveContext *frc = cls;
1415   struct DefragContext *e = element;
1416
1417   if ((frc->addr_len == e->addr_len) &&
1418       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1419   {
1420     frc->rc = e;
1421     return GNUNET_NO;
1422   }
1423   return GNUNET_YES;
1424 }
1425
1426
1427 /**
1428  * Process a defragmented message.
1429  *
1430  * @param cls the 'struct ReceiveContext'
1431  * @param msg the message
1432  */
1433 static void
1434 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1435 {
1436   struct DefragContext *rc = cls;
1437
1438   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1439   {
1440     GNUNET_break (0);
1441     return;
1442   }
1443   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1444   {
1445     GNUNET_break (0);
1446     return;
1447   }
1448   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1449                        rc->src_addr, rc->addr_len);
1450 }
1451
1452
1453 struct LookupContext
1454 {
1455   const struct sockaddr * addr;
1456
1457   struct Session *res;
1458
1459   size_t addrlen;
1460 };
1461
1462
1463 static int
1464 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1465 {
1466   struct LookupContext *l_ctx = cls;
1467   struct Session * s = value;
1468
1469   if ((s->addrlen == l_ctx->addrlen) &&
1470       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1471   {
1472     l_ctx->res = s;
1473     return GNUNET_NO;
1474   }
1475   return GNUNET_YES;
1476 }
1477
1478
1479 /**
1480  * Transmit an acknowledgement.
1481  *
1482  * @param cls the 'struct ReceiveContext'
1483  * @param id message ID (unused)
1484  * @param msg ack to transmit
1485  */
1486 static void
1487 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1488 {
1489   struct DefragContext *rc = cls;
1490   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1491   struct UDP_ACK_Message *udp_ack;
1492   uint32_t delay = 0;
1493   struct UDPMessageWrapper *udpw;
1494   struct Session *s;
1495
1496   struct LookupContext l_ctx;
1497   l_ctx.addr = rc->src_addr;
1498   l_ctx.addrlen = rc->addr_len;
1499   l_ctx.res = NULL;
1500   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1501       &lookup_session_by_addr_it,
1502       &l_ctx);
1503   s = l_ctx.res;
1504
1505   if (NULL == s)
1506     return;
1507
1508   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1509     delay = s->flow_delay_for_other_peer.rel_value;
1510
1511   LOG (GNUNET_ERROR_TYPE_DEBUG,
1512        "Sending ACK to `%s' including delay of %u ms\n",
1513        GNUNET_a2s (rc->src_addr,
1514                    (rc->src_addr->sa_family ==
1515                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1516                                                                      sockaddr_in6)),
1517        delay);
1518   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1519   udpw->msg_size = msize;
1520   udpw->session = s;
1521   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1522   udpw->udp = (char *)&udpw[1];
1523   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1524   udp_ack->header.size = htons ((uint16_t) msize);
1525   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1526   udp_ack->delay = htonl (delay);
1527   udp_ack->sender = *rc->plugin->env->my_identity;
1528   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1529
1530   enqueue (rc->plugin, udpw);
1531 }
1532
1533
1534 static void 
1535 read_process_msg (struct Plugin *plugin,
1536                   const struct GNUNET_MessageHeader *msg,
1537                   const char *addr,
1538                   socklen_t fromlen)
1539 {
1540   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1541   {
1542     GNUNET_break_op (0);
1543     return;
1544   }
1545   process_udp_message (plugin, (const struct UDPMessage *) msg,
1546                        (const struct sockaddr *) addr, fromlen);
1547 }
1548
1549
1550 static void 
1551 read_process_ack (struct Plugin *plugin,
1552                   const struct GNUNET_MessageHeader *msg,
1553                   char *addr,
1554                   socklen_t fromlen)
1555 {
1556   const struct GNUNET_MessageHeader *ack;
1557   const struct UDP_ACK_Message *udp_ack;
1558   struct LookupContext l_ctx;
1559   struct Session *s;
1560   struct GNUNET_TIME_Relative flow_delay;
1561
1562   if (ntohs (msg->size) <
1563       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1564   {
1565     GNUNET_break_op (0);
1566     return;
1567   }
1568   udp_ack = (const struct UDP_ACK_Message *) msg;
1569   l_ctx.addr = (const struct sockaddr *) addr;
1570   l_ctx.addrlen = fromlen;
1571   l_ctx.res = NULL;
1572   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1573       &lookup_session_by_addr_it,
1574       &l_ctx);
1575   s = l_ctx.res;
1576
1577   if ((s == NULL) || (s->frag_ctx == NULL))
1578     return;
1579
1580   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1581   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1582        flow_delay.rel_value);
1583   s->flow_delay_from_other_peer =
1584       GNUNET_TIME_relative_to_absolute (flow_delay);
1585
1586   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1587   if (ntohs (ack->size) !=
1588       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1589   {
1590     GNUNET_break_op (0);
1591     return;
1592   }
1593
1594   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1595   {
1596     LOG (GNUNET_ERROR_TYPE_DEBUG,
1597          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1598          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1599          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1600     return;
1601   }
1602
1603   LOG (GNUNET_ERROR_TYPE_DEBUG,
1604        "FULL MESSAGE ACKed\n",
1605        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1606        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1607   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1608
1609   struct UDPMessageWrapper * udpw;
1610   struct UDPMessageWrapper * tmp;
1611   if (s->addrlen == sizeof (struct sockaddr_in6))
1612   {
1613     udpw = plugin->ipv6_queue_head;
1614     while (NULL != udpw)
1615     {
1616       tmp = udpw->next;
1617       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1618       {
1619         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1620         GNUNET_free (udpw);
1621       }
1622       udpw = tmp;
1623     }
1624   }
1625   if (s->addrlen == sizeof (struct sockaddr_in))
1626   {
1627     udpw = plugin->ipv4_queue_head;
1628     while (udpw!= NULL)
1629     {
1630       tmp = udpw->next;
1631       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1632       {
1633         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1634         GNUNET_free (udpw);
1635       }
1636       udpw = tmp;
1637     }
1638   }
1639
1640   if (s->frag_ctx->cont != NULL)
1641   {
1642     LOG (GNUNET_ERROR_TYPE_DEBUG,
1643         "Calling continuation for fragmented message to `%s' with result %s\n",
1644         GNUNET_i2s (&s->target), "OK");
1645     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1646   }
1647
1648   GNUNET_free (s->frag_ctx);
1649   s->frag_ctx = NULL;
1650 }
1651
1652
1653 static void 
1654 read_process_fragment (struct Plugin *plugin,
1655                        const struct GNUNET_MessageHeader *msg,
1656                        char *addr,
1657                        socklen_t fromlen)
1658 {
1659   struct DefragContext *d_ctx;
1660   struct GNUNET_TIME_Absolute now;
1661   struct FindReceiveContext frc;
1662
1663   frc.rc = NULL;
1664   frc.addr = (const struct sockaddr *) addr;
1665   frc.addr_len = fromlen;
1666
1667   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1668        (unsigned int) ntohs (msg->size),
1669        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1670   /* Lookup existing receive context for this address */
1671   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1672                                  &find_receive_context,
1673                                  &frc);
1674   now = GNUNET_TIME_absolute_get ();
1675   d_ctx = frc.rc;
1676
1677   if (d_ctx == NULL)
1678   {
1679     /* Create a new defragmentation context */
1680     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1681     memcpy (&d_ctx[1], addr, fromlen);
1682     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1683     d_ctx->addr_len = fromlen;
1684     d_ctx->plugin = plugin;
1685     d_ctx->defrag =
1686         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1687                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1688                                           &fragment_msg_proc, &ack_proc);
1689     d_ctx->hnode =
1690         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1691                                       (GNUNET_CONTAINER_HeapCostType)
1692                                       now.abs_value);
1693     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1694          "Created new defragmentation context for %u-byte fragment from `%s'\n",
1695          (unsigned int) ntohs (msg->size),
1696          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1697   }
1698   else
1699   {
1700     LOG (GNUNET_ERROR_TYPE_DEBUG,
1701          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1702          (unsigned int) ntohs (msg->size),
1703          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1704   }
1705
1706   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1707   {
1708     /* keep this 'rc' from expiring */
1709     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1710                                        (GNUNET_CONTAINER_HeapCostType)
1711                                        now.abs_value);
1712   }
1713   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1714       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1715   {
1716     /* remove 'rc' that was inactive the longest */
1717     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1718     GNUNET_assert (NULL != d_ctx);
1719     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1720     GNUNET_free (d_ctx);
1721   }
1722 }
1723
1724
1725 /**
1726  * Read and process a message from the given socket.
1727  *
1728  * @param plugin the overall plugin
1729  * @param rsock socket to read from
1730  */
1731 static void
1732 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1733 {
1734   socklen_t fromlen;
1735   char addr[32];
1736   char buf[65536] GNUNET_ALIGN;
1737   ssize_t size;
1738   const struct GNUNET_MessageHeader *msg;
1739
1740   fromlen = sizeof (addr);
1741   memset (&addr, 0, sizeof (addr));
1742   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1743                                       (struct sockaddr *) &addr, &fromlen);
1744 #if MINGW
1745   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
1746    * WSAECONNRESET error to indicate that previous sendto() (???)
1747    * on this socket has failed.
1748    */
1749   if ( (-1 == size) && (ECONNRESET == errno) )
1750     return;
1751 #endif
1752   if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader)))
1753   {
1754     GNUNET_break_op (0);
1755     return;
1756   }
1757   msg = (const struct GNUNET_MessageHeader *) buf;
1758
1759   LOG (GNUNET_ERROR_TYPE_DEBUG,
1760        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1761        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1762
1763   if (size != ntohs (msg->size))
1764   {
1765     GNUNET_break_op (0);
1766     return;
1767   }
1768
1769   switch (ntohs (msg->type))
1770   {
1771   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1772     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1773     return;
1774
1775   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1776     read_process_msg (plugin, msg, addr, fromlen);
1777     return;
1778
1779   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1780     read_process_ack (plugin, msg, addr, fromlen);
1781     return;
1782
1783   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1784     read_process_fragment (plugin, msg, addr, fromlen);
1785     return;
1786
1787   default:
1788     GNUNET_break_op (0);
1789     return;
1790   }
1791 }
1792
1793
1794 static size_t
1795 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1796 {
1797   ssize_t sent;
1798   size_t slen;
1799   struct GNUNET_TIME_Absolute max;
1800   struct UDPMessageWrapper *udpw = NULL;
1801
1802   if (sock == plugin->sockv4)
1803   {
1804     udpw = plugin->ipv4_queue_head;
1805   }
1806   else if (sock == plugin->sockv6)
1807   {
1808     udpw = plugin->ipv6_queue_head;
1809   }
1810   else
1811   {
1812     GNUNET_break (0);
1813     return 0;
1814   }
1815
1816   const struct sockaddr * sa = udpw->session->sock_addr;
1817   slen = udpw->session->addrlen;
1818
1819   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1820
1821   while (udpw != NULL)
1822   {
1823     if (max.abs_value != udpw->timeout.abs_value)
1824     {
1825       /* Message timed out */
1826       call_continuation(udpw, GNUNET_SYSERR);
1827       if (udpw->frag_ctx != NULL)
1828       {
1829         LOG (GNUNET_ERROR_TYPE_DEBUG,
1830              "Fragmented message for peer `%s' with size %u timed out\n",
1831              GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1832         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1833         GNUNET_free (udpw->frag_ctx);
1834         udpw->session->frag_ctx = NULL;
1835       }
1836       else
1837       {
1838         LOG (GNUNET_ERROR_TYPE_DEBUG, 
1839              "Message for peer `%s' with size %u timed out\n",
1840              GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1841       }
1842
1843       if (sock == plugin->sockv4)
1844       {
1845         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1846         GNUNET_free (udpw);
1847         udpw = plugin->ipv4_queue_head;
1848       }
1849       else if (sock == plugin->sockv6)
1850       {
1851         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1852         GNUNET_free (udpw);
1853         udpw = plugin->ipv6_queue_head;
1854       }
1855     }
1856     else
1857     {
1858       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1859       if (delta.rel_value == 0)
1860       {
1861         /* this message is not delayed */
1862         LOG (GNUNET_ERROR_TYPE_DEBUG, 
1863              "Message for peer `%s' (%u bytes) is not delayed \n",
1864              GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1865         break;
1866       }
1867       else
1868       {
1869         /* this message is delayed, try next */
1870         LOG (GNUNET_ERROR_TYPE_DEBUG,
1871              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1872              GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1873              delta);
1874         udpw = udpw->next;
1875       }
1876     }
1877   }
1878
1879   if (udpw == NULL)
1880   {
1881     /* No message left */
1882     return 0;
1883   }
1884
1885   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1886
1887   if (GNUNET_SYSERR == sent)
1888   {
1889     const struct GNUNET_ATS_Information type = plugin->env->get_address_type
1890         (plugin->env->cls,sa, slen);
1891
1892     if ((GNUNET_ATS_NET_WAN == type.value) &&
1893         ((ENETUNREACH == errno) || (ENETDOWN == errno)))
1894     {
1895       /* "Network unreachable" or "Network down" */
1896       /*
1897        * This indicates that this system is IPv6 enabled, but does not
1898        * have a valid global IPv6 address assigned
1899        */
1900        LOG (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1901            _("UDP could not message to `%s': `%s'. "
1902              "Please check your network configuration and disable IPv6 if your "
1903              "connection does not have a global IPv6 address\n"),
1904            GNUNET_a2s (sa, slen),
1905            STRERROR (errno));
1906     }
1907     else
1908     {
1909       LOG (GNUNET_ERROR_TYPE_ERROR,
1910          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1911          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1912          STRERROR (errno));
1913     }
1914     call_continuation(udpw, GNUNET_SYSERR);
1915   }
1916   else
1917   {
1918     LOG (GNUNET_ERROR_TYPE_DEBUG,
1919          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1920          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1921          (sent < 0) ? STRERROR (errno) : "ok");
1922     call_continuation(udpw, GNUNET_OK);
1923   }
1924
1925   if (sock == plugin->sockv4)
1926     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1927   else if (sock == plugin->sockv6)
1928     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1929   GNUNET_free (udpw);
1930   udpw = NULL;
1931
1932   return sent;
1933 }
1934
1935
1936 /**
1937  * We have been notified that our readset has something to read.  We don't
1938  * know which socket needs to be read, so we have to check each one
1939  * Then reschedule this function to be called again once more is available.
1940  *
1941  * @param cls the plugin handle
1942  * @param tc the scheduling context (for rescheduling this function again)
1943  */
1944 static void
1945 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1946 {
1947   struct Plugin *plugin = cls;
1948
1949   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1950   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1951     return;
1952   plugin->with_v4_ws = GNUNET_NO;
1953
1954   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1955   {
1956     if ((NULL != plugin->sockv4) &&
1957       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1958         udp_select_read (plugin, plugin->sockv4);
1959
1960   }
1961
1962   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1963   {
1964     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1965       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1966       {
1967         udp_select_send (plugin, plugin->sockv4);
1968       }
1969   }
1970
1971   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1972     GNUNET_SCHEDULER_cancel (plugin->select_task);
1973   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1974                                    GNUNET_TIME_UNIT_FOREVER_REL,
1975                                    plugin->rs_v4,
1976                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1977                                    &udp_plugin_select, plugin);
1978   if (plugin->ipv4_queue_head != NULL)
1979     plugin->with_v4_ws = GNUNET_YES;
1980   else
1981     plugin->with_v4_ws = GNUNET_NO;
1982 }
1983
1984
1985 /**
1986  * We have been notified that our readset has something to read.  We don't
1987  * know which socket needs to be read, so we have to check each one
1988  * Then reschedule this function to be called again once more is available.
1989  *
1990  * @param cls the plugin handle
1991  * @param tc the scheduling context (for rescheduling this function again)
1992  */
1993 static void
1994 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1995 {
1996   struct Plugin *plugin = cls;
1997
1998   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1999   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
2000     return;
2001
2002   plugin->with_v6_ws = GNUNET_NO;
2003   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
2004   {
2005     if ((NULL != plugin->sockv6) &&
2006       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
2007         udp_select_read (plugin, plugin->sockv6);
2008   }
2009
2010   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
2011   {
2012     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2013       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
2014       {
2015         udp_select_send (plugin, plugin->sockv6);
2016       }
2017   }
2018   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2019     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2020   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2021                                    GNUNET_TIME_UNIT_FOREVER_REL,
2022                                    plugin->rs_v6,
2023                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
2024                                    &udp_plugin_select_v6, plugin);
2025   if (plugin->ipv6_queue_head != NULL)
2026     plugin->with_v6_ws = GNUNET_YES;
2027   else
2028     plugin->with_v6_ws = GNUNET_NO;
2029 }
2030
2031
2032 static int
2033 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
2034 {
2035   int tries;
2036   int sockets_created = 0;
2037   struct sockaddr *serverAddr;
2038   struct sockaddr *addrs[2];
2039   socklen_t addrlens[2];
2040   socklen_t addrlen;
2041
2042   /* Create IPv6 socket */
2043   if (plugin->enable_ipv6 == GNUNET_YES)
2044   {
2045     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2046     if (NULL == plugin->sockv6)
2047     {
2048       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2049       plugin->enable_ipv6 = GNUNET_NO;
2050     }
2051     else
2052     {
2053 #if HAVE_SOCKADDR_IN_SIN_LEN
2054       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2055 #endif
2056       serverAddrv6->sin6_family = AF_INET6;
2057       serverAddrv6->sin6_addr = in6addr_any;
2058       serverAddrv6->sin6_port = htons (plugin->port);
2059       addrlen = sizeof (struct sockaddr_in6);
2060       serverAddr = (struct sockaddr *) serverAddrv6;
2061       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2062            ntohs (serverAddrv6->sin6_port));
2063       tries = 0;
2064       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2065              GNUNET_OK)
2066       {
2067         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2068         LOG (GNUNET_ERROR_TYPE_DEBUG,
2069              "IPv6 Binding failed, trying new port %d\n",
2070              ntohs (serverAddrv6->sin6_port));
2071         tries++;
2072         if (tries > 10)
2073         {
2074           GNUNET_NETWORK_socket_close (plugin->sockv6);
2075           plugin->sockv6 = NULL;
2076           break;
2077         }
2078       }
2079       if (plugin->sockv6 != NULL)
2080       {
2081         LOG (GNUNET_ERROR_TYPE_DEBUG,
2082              "IPv6 socket created on port %d\n",
2083              ntohs (serverAddrv6->sin6_port));
2084         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2085         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2086         sockets_created++;
2087       }
2088     }
2089   }
2090
2091   /* Create IPv4 socket */
2092   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2093   if (NULL == plugin->sockv4)
2094   {
2095     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2096   }
2097   else
2098   {
2099 #if HAVE_SOCKADDR_IN_SIN_LEN
2100     serverAddrv4->sin_len = sizeof (serverAddrv4);
2101 #endif
2102     serverAddrv4->sin_family = AF_INET;
2103     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2104     serverAddrv4->sin_port = htons (plugin->port);
2105     addrlen = sizeof (struct sockaddr_in);
2106     serverAddr = (struct sockaddr *) serverAddrv4;
2107
2108     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2109          ntohs (serverAddrv4->sin_port));
2110     tries = 0;
2111     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2112            GNUNET_OK)
2113     {
2114       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2115       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2116            ntohs (serverAddrv4->sin_port));
2117       tries++;
2118       if (tries > 10)
2119       {
2120         GNUNET_NETWORK_socket_close (plugin->sockv4);
2121         plugin->sockv4 = NULL;
2122         break;
2123       }
2124     }
2125     if (plugin->sockv4 != NULL)
2126     {
2127       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2128       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2129       sockets_created++;
2130     }
2131   }
2132
2133   /* Create file descriptors */
2134   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2135   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2136   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2137   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2138   if (NULL != plugin->sockv4)
2139   {
2140     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2141     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2142   }
2143
2144   if (sockets_created == 0)
2145     LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2146
2147   plugin->select_task =
2148       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2149                                    GNUNET_TIME_UNIT_FOREVER_REL,
2150                                    plugin->rs_v4,
2151                                    NULL,
2152                                    &udp_plugin_select, plugin);
2153   plugin->with_v4_ws = GNUNET_NO;
2154
2155   if (plugin->enable_ipv6 == GNUNET_YES)
2156   {
2157     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2158     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2159     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2160     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2161     if (NULL != plugin->sockv6)
2162     {
2163       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2164       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2165     }
2166
2167     plugin->select_task_v6 =
2168         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2169                                      GNUNET_TIME_UNIT_FOREVER_REL,
2170                                      plugin->rs_v6,
2171                                      NULL,
2172                                      &udp_plugin_select_v6, plugin);
2173     plugin->with_v6_ws = GNUNET_NO;
2174   }
2175
2176   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2177                            GNUNET_NO, plugin->port,
2178                            sockets_created,
2179                            (const struct sockaddr **) addrs, addrlens,
2180                            &udp_nat_port_map_callback, NULL, plugin);
2181
2182   return sockets_created;
2183 }
2184
2185 /**
2186  * Session was idle, so disconnect it
2187  */
2188 static void
2189 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2190 {
2191   GNUNET_assert (NULL != cls);
2192   struct Session *s = cls;
2193
2194   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2195   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2196               "Session %p was idle for %llu ms, disconnecting\n",
2197               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2198   /* call session destroy function */
2199   disconnect_session(s);
2200 }
2201
2202
2203 /**
2204  * Start session timeout
2205  */
2206 static void
2207 start_session_timeout (struct Session *s)
2208 {
2209   GNUNET_assert (NULL != s);
2210   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
2211   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2212                                                    &session_timeout,
2213                                                    s);
2214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2215               "Timeout for session %p set to %llu ms\n",
2216               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2217 }
2218
2219
2220 /**
2221  * Increment session timeout due to activity
2222  */
2223 static void
2224 reschedule_session_timeout (struct Session *s)
2225 {
2226   GNUNET_assert (NULL != s);
2227   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
2228
2229   GNUNET_SCHEDULER_cancel (s->timeout_task);
2230   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2231                                                    &session_timeout,
2232                                                    s);
2233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2234               "Timeout rescheduled for session %p set to %llu ms\n",
2235               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2236 }
2237
2238
2239 /**
2240  * Cancel timeout
2241  */
2242 static void
2243 stop_session_timeout (struct Session *s)
2244 {
2245   GNUNET_assert (NULL != s);
2246
2247   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
2248   {
2249     GNUNET_SCHEDULER_cancel (s->timeout_task);
2250     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2251     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2252                 "Timeout stopped for session %p canceled\n",
2253                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2254   }
2255 }
2256
2257
2258
2259 /**
2260  * The exported method. Makes the core api available via a global and
2261  * returns the udp transport API.
2262  *
2263  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2264  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2265  */
2266 void *
2267 libgnunet_plugin_transport_udp_init (void *cls)
2268 {
2269   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2270   struct GNUNET_TRANSPORT_PluginFunctions *api;
2271   struct Plugin *p;
2272   unsigned long long port;
2273   unsigned long long aport;
2274   unsigned long long broadcast;
2275   unsigned long long udp_max_bps;
2276   unsigned long long enable_v6;
2277   char * bind4_address;
2278   char * bind6_address;
2279   struct GNUNET_TIME_Relative interval;
2280   struct sockaddr_in serverAddrv4;
2281   struct sockaddr_in6 serverAddrv6;
2282   int res;
2283
2284   if (NULL == env->receive)
2285   {
2286     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2287        initialze the plugin or the API */
2288     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2289     api->cls = NULL;
2290     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2291     api->address_to_string = &udp_address_to_string;
2292     api->string_to_address = &udp_string_to_address;
2293     return api;
2294   }
2295
2296   GNUNET_assert( NULL != env->stats);
2297
2298   /* Get port number */
2299   if (GNUNET_OK !=
2300       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2301                                              &port))
2302     port = 2086;
2303   if (GNUNET_OK !=
2304       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2305                                              "ADVERTISED_PORT", &aport))
2306     aport = port;
2307   if (port > 65535)
2308   {
2309     LOG (GNUNET_ERROR_TYPE_WARNING,
2310          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2311          65535);
2312     return NULL;
2313   }
2314
2315   /* Protocols */
2316   if ((GNUNET_YES ==
2317        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2318                                              "DISABLEV6")))
2319   {
2320     enable_v6 = GNUNET_NO;
2321   }
2322   else
2323     enable_v6 = GNUNET_YES;
2324
2325   /* Addresses */
2326   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2327   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2328
2329   if (GNUNET_YES ==
2330       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2331                                              "BINDTO", &bind4_address))
2332   {
2333     LOG (GNUNET_ERROR_TYPE_DEBUG,
2334          "Binding udp plugin to specific address: `%s'\n",
2335          bind4_address);
2336     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2337     {
2338       GNUNET_free (bind4_address);
2339       return NULL;
2340     }
2341   }
2342
2343   if (GNUNET_YES ==
2344       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2345                                              "BINDTO6", &bind6_address))
2346   {
2347     LOG (GNUNET_ERROR_TYPE_DEBUG,
2348          "Binding udp plugin to specific address: `%s'\n",
2349          bind6_address);
2350     if (1 !=
2351         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2352     {
2353       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2354            bind6_address);
2355       GNUNET_free_non_null (bind4_address);
2356       GNUNET_free (bind6_address);
2357       return NULL;
2358     }
2359   }
2360
2361   /* Enable neighbour discovery */
2362   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2363                                             "BROADCAST");
2364   if (broadcast == GNUNET_SYSERR)
2365     broadcast = GNUNET_NO;
2366
2367   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2368                                            "BROADCAST_INTERVAL", &interval))
2369   {
2370     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2371   }
2372
2373   /* Maximum datarate */
2374   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2375                                              "MAX_BPS", &udp_max_bps))
2376   {
2377     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2378   }
2379
2380   p = GNUNET_malloc (sizeof (struct Plugin));
2381   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2382
2383   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2384                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2385   p->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2386   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2387   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2388   p->port = port;
2389   p->aport = aport;
2390   p->broadcast_interval = interval;
2391   p->enable_ipv6 = enable_v6;
2392   p->env = env;
2393
2394   plugin = p;
2395
2396   api->cls = p;
2397   api->send = NULL;
2398   api->disconnect = &udp_disconnect;
2399   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2400   api->address_to_string = &udp_address_to_string;
2401   api->string_to_address = &udp_string_to_address;
2402   api->check_address = &udp_plugin_check_address;
2403   api->get_session = &udp_plugin_get_session;
2404   api->send = &udp_plugin_send;
2405
2406   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2407   res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
2408   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
2409   {
2410     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2411     GNUNET_free (p);
2412     GNUNET_free (api);
2413     return NULL;
2414   }
2415
2416   if (broadcast == GNUNET_YES)
2417   {
2418     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2419     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
2420   }
2421
2422   GNUNET_free_non_null (bind4_address);
2423   GNUNET_free_non_null (bind6_address);
2424   return api;
2425 }
2426
2427
2428 static int
2429 heap_cleanup_iterator (void *cls,
2430                        struct GNUNET_CONTAINER_HeapNode *
2431                        node, void *element,
2432                        GNUNET_CONTAINER_HeapCostType
2433                        cost)
2434 {
2435   struct DefragContext * d_ctx = element;
2436
2437   GNUNET_CONTAINER_heap_remove_node (node);
2438   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2439   GNUNET_free (d_ctx);
2440
2441   return GNUNET_YES;
2442 }
2443
2444
2445 /**
2446  * The exported method. Makes the core api available via a global and
2447  * returns the udp transport API.
2448  *
2449  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2450  * @return NULL
2451  */
2452 void *
2453 libgnunet_plugin_transport_udp_done (void *cls)
2454 {
2455   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2456   struct Plugin *plugin = api->cls;
2457
2458   if (NULL == plugin)
2459   {
2460     GNUNET_free (api);
2461     return NULL;
2462   }
2463
2464   stop_broadcast (plugin);
2465
2466   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2467   {
2468     GNUNET_SCHEDULER_cancel (plugin->select_task);
2469     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2470   }
2471   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2472   {
2473     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2474     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2475   }
2476
2477   /* Closing sockets */
2478   if (plugin->sockv4 != NULL)
2479   {
2480     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2481     plugin->sockv4 = NULL;
2482   }
2483   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2484   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2485
2486   if (plugin->sockv6 != NULL)
2487   {
2488     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2489     plugin->sockv6 = NULL;
2490
2491     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2492     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2493   }
2494
2495   GNUNET_NAT_unregister (plugin->nat);
2496
2497   if (plugin->defrag_ctxs != NULL)
2498   {
2499     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2500         heap_cleanup_iterator, NULL);
2501     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2502     plugin->defrag_ctxs = NULL;
2503   }
2504   if (plugin->mst != NULL)
2505   {
2506     GNUNET_SERVER_mst_destroy(plugin->mst);
2507     plugin->mst = NULL;
2508   }
2509
2510   /* Clean up leftover messages */
2511   struct UDPMessageWrapper * udpw;
2512   udpw = plugin->ipv4_queue_head;
2513   while (udpw != NULL)
2514   {
2515     struct UDPMessageWrapper *tmp = udpw->next;
2516     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2517     call_continuation(udpw, GNUNET_SYSERR);
2518     GNUNET_free (udpw);
2519     udpw = tmp;
2520   }
2521   udpw = plugin->ipv6_queue_head;
2522   while (udpw != NULL)
2523   {
2524     struct UDPMessageWrapper *tmp = udpw->next;
2525     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2526     call_continuation(udpw, GNUNET_SYSERR);
2527     GNUNET_free (udpw);
2528     udpw = tmp;
2529   }
2530
2531   /* Clean up sessions */
2532   LOG (GNUNET_ERROR_TYPE_DEBUG,
2533        "Cleaning up sessions\n");
2534   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2535   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2536
2537   plugin->nat = NULL;
2538   GNUNET_free (plugin);
2539   GNUNET_free (api);
2540   return NULL;
2541 }
2542
2543
2544 /* end of plugin_transport_udp.c */