-fixig #2377
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   struct FragmentationContext * frag_ctx;
97
98   /**
99    * Address of the other peer
100    */
101   const struct sockaddr *sock_addr;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * Session timeout task
115    */
116   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118   /**
119    * expected delay for ACKs
120    */
121   struct GNUNET_TIME_Relative last_expected_delay;
122
123   struct GNUNET_ATS_Information ats;
124
125   size_t addrlen;
126
127
128   unsigned int rc;
129
130   int in_destroy;
131 };
132
133
134 struct SessionCompareContext
135 {
136   struct Session *res;
137   const struct GNUNET_HELLO_Address *addr;
138 };
139
140
141 /**
142  * Closure for 'process_inbound_tokenized_messages'
143  */
144 struct SourceInformation
145 {
146   /**
147    * Sender identity.
148    */
149   struct GNUNET_PeerIdentity sender;
150
151   /**
152    * Source address.
153    */
154   const void *arg;
155
156   struct Session *session;
157   /**
158    * Number of bytes in source address.
159    */
160   size_t args;
161
162 };
163
164
165 /**
166  * Closure for 'find_receive_context'.
167  */
168 struct FindReceiveContext
169 {
170   /**
171    * Where to store the result.
172    */
173   struct DefragContext *rc;
174
175   /**
176    * Address to find.
177    */
178   const struct sockaddr *addr;
179
180   struct Session *session;
181
182   /**
183    * Number of bytes in 'addr'.
184    */
185   socklen_t addr_len;
186
187 };
188
189
190
191 /**
192  * Data structure to track defragmentation contexts based
193  * on the source of the UDP traffic.
194  */
195 struct DefragContext
196 {
197
198   /**
199    * Defragmentation context.
200    */
201   struct GNUNET_DEFRAGMENT_Context *defrag;
202
203   /**
204    * Source address this receive context is for (allocated at the
205    * end of the struct).
206    */
207   const struct sockaddr *src_addr;
208
209   /**
210    * Reference to master plugin struct.
211    */
212   struct Plugin *plugin;
213
214   /**
215    * Node in the defrag heap.
216    */
217   struct GNUNET_CONTAINER_HeapNode *hnode;
218
219   /**
220    * Length of 'src_addr'
221    */
222   size_t addr_len;
223 };
224
225
226
227 /**
228  * Closure for 'process_inbound_tokenized_messages'
229  */
230 struct FragmentationContext
231 {
232   struct FragmentationContext * next;
233   struct FragmentationContext * prev;
234
235   struct Plugin * plugin;
236   struct GNUNET_FRAGMENT_Context * frag;
237   struct Session * session;
238
239   /**
240    * Function to call upon completion of the transmission.
241    */
242   GNUNET_TRANSPORT_TransmitContinuation cont;
243
244   /**
245    * Closure for 'cont'.
246    */
247   void *cont_cls;
248
249   struct GNUNET_TIME_Absolute timeout;
250
251   size_t bytes_to_send;
252 };
253
254
255 struct UDPMessageWrapper
256 {
257   struct Session *session;
258   struct UDPMessageWrapper *prev;
259   struct UDPMessageWrapper *next;
260   char *udp;
261
262   /**
263    * Function to call upon completion of the transmission.
264    */
265   GNUNET_TRANSPORT_TransmitContinuation cont;
266
267   /**
268    * Closure for 'cont'.
269    */
270   void *cont_cls;
271
272   struct FragmentationContext *frag_ctx;
273
274   size_t msg_size;
275
276   struct GNUNET_TIME_Absolute timeout;
277 };
278
279
280 /**
281  * UDP ACK Message-Packet header (after defragmentation).
282  */
283 struct UDP_ACK_Message
284 {
285   /**
286    * Message header.
287    */
288   struct GNUNET_MessageHeader header;
289
290   /**
291    * Desired delay for flow control
292    */
293   uint32_t delay;
294
295   /**
296    * What is the identity of the sender
297    */
298   struct GNUNET_PeerIdentity sender;
299
300 };
301
302 /**
303  * Encapsulation of all of the state of the plugin.
304  */
305 struct Plugin * plugin;
306
307
308 /**
309  * We have been notified that our readset has something to read.  We don't
310  * know which socket needs to be read, so we have to check each one
311  * Then reschedule this function to be called again once more is available.
312  *
313  * @param cls the plugin handle
314  * @param tc the scheduling context (for rescheduling this function again)
315  */
316 static void
317 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
318
319
320 /**
321  * We have been notified that our readset has something to read.  We don't
322  * know which socket needs to be read, so we have to check each one
323  * Then reschedule this function to be called again once more is available.
324  *
325  * @param cls the plugin handle
326  * @param tc the scheduling context (for rescheduling this function again)
327  */
328 static void
329 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
330
331
332 /**
333  * Start session timeout
334  */
335 static void
336 start_session_timeout (struct Session *s);
337
338 /**
339  * Increment session timeout due to activity
340  */
341 static void
342 reschedule_session_timeout (struct Session *s);
343
344 /**
345  * Cancel timeout
346  */
347 static void
348 stop_session_timeout (struct Session *s);
349
350
351
352 /**
353  * Function called for a quick conversion of the binary address to
354  * a numeric address.  Note that the caller must not free the
355  * address and that the next call to this function is allowed
356  * to override the address again.
357  *
358  * @param cls closure
359  * @param addr binary address
360  * @param addrlen length of the address
361  * @return string representing the same address
362  */
363 const char *
364 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
365 {
366   static char rbuf[INET6_ADDRSTRLEN + 10];
367   char buf[INET6_ADDRSTRLEN];
368   const void *sb;
369   struct in_addr a4;
370   struct in6_addr a6;
371   const struct IPv4UdpAddress *t4;
372   const struct IPv6UdpAddress *t6;
373   int af;
374   uint16_t port;
375
376   if (addrlen == sizeof (struct IPv6UdpAddress))
377   {
378     t6 = addr;
379     af = AF_INET6;
380     port = ntohs (t6->u6_port);
381     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
382     sb = &a6;
383   }
384   else if (addrlen == sizeof (struct IPv4UdpAddress))
385   {
386     t4 = addr;
387     af = AF_INET;
388     port = ntohs (t4->u4_port);
389     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
390     sb = &a4;
391   }
392   else
393   {
394     GNUNET_break_op (0);
395     return NULL;
396   }
397   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
398   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
399                    buf, port);
400   return rbuf;
401 }
402
403
404 /**
405  * Function called to convert a string address to
406  * a binary address.
407  *
408  * @param cls closure ('struct Plugin*')
409  * @param addr string address
410  * @param addrlen length of the address
411  * @param buf location to store the buffer
412  * @param added location to store the number of bytes in the buffer.
413  *        If the function returns GNUNET_SYSERR, its contents are undefined.
414  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
415  */
416 static int
417 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
418     void **buf, size_t *added)
419 {
420   struct sockaddr_storage socket_address;
421   
422   if ((NULL == addr) || (0 == addrlen))
423   {
424     GNUNET_break (0);
425     return GNUNET_SYSERR;
426   }
427
428   if ('\0' != addr[addrlen - 1])
429   {
430     return GNUNET_SYSERR;
431   }
432
433   if (strlen (addr) != addrlen - 1)
434   {
435     return GNUNET_SYSERR;
436   }
437
438   if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
439                                                  &socket_address))
440   {
441     return GNUNET_SYSERR;
442   }
443
444   switch (socket_address.ss_family)
445   {
446   case AF_INET:
447     {
448       struct IPv4UdpAddress *u4;
449       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
450       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
451       u4->ipv4_addr = in4->sin_addr.s_addr;
452       u4->u4_port = in4->sin_port;
453       *buf = u4;
454       *added = sizeof (struct IPv4UdpAddress);
455       return GNUNET_OK;
456     }
457   case AF_INET6:
458     {
459       struct IPv6UdpAddress *u6;
460       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
461       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
462       u6->ipv6_addr = in6->sin6_addr;
463       u6->u6_port = in6->sin6_port;
464       *buf = u6;
465       *added = sizeof (struct IPv6UdpAddress);
466       return GNUNET_OK;
467     }
468   default:
469     GNUNET_break (0);
470     return GNUNET_SYSERR;
471   }
472 }
473
474
475 /**
476  * Append our port and forward the result.
477  *
478  * @param cls a 'struct PrettyPrinterContext'
479  * @param hostname result from DNS resolver
480  */
481 static void
482 append_port (void *cls, const char *hostname)
483 {
484   struct PrettyPrinterContext *ppc = cls;
485   char *ret;
486
487   if (hostname == NULL)
488   {
489     ppc->asc (ppc->asc_cls, NULL);
490     GNUNET_free (ppc);
491     return;
492   }
493   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
494   ppc->asc (ppc->asc_cls, ret);
495   GNUNET_free (ret);
496 }
497
498
499 /**
500  * Convert the transports address to a nice, human-readable
501  * format.
502  *
503  * @param cls closure
504  * @param type name of the transport that generated the address
505  * @param addr one of the addresses of the host, NULL for the last address
506  *        the specific address format depends on the transport
507  * @param addrlen length of the address
508  * @param numeric should (IP) addresses be displayed in numeric form?
509  * @param timeout after how long should we give up?
510  * @param asc function to call on each string
511  * @param asc_cls closure for asc
512  */
513 static void
514 udp_plugin_address_pretty_printer (void *cls, const char *type,
515                                    const void *addr, size_t addrlen,
516                                    int numeric,
517                                    struct GNUNET_TIME_Relative timeout,
518                                    GNUNET_TRANSPORT_AddressStringCallback asc,
519                                    void *asc_cls)
520 {
521   struct PrettyPrinterContext *ppc;
522   const void *sb;
523   size_t sbs;
524   struct sockaddr_in a4;
525   struct sockaddr_in6 a6;
526   const struct IPv4UdpAddress *u4;
527   const struct IPv6UdpAddress *u6;
528   uint16_t port;
529
530   if (addrlen == sizeof (struct IPv6UdpAddress))
531   {
532     u6 = addr;
533     memset (&a6, 0, sizeof (a6));
534     a6.sin6_family = AF_INET6;
535 #if HAVE_SOCKADDR_IN_SIN_LEN
536     a6.sin6_len = sizeof (a6);
537 #endif
538     a6.sin6_port = u6->u6_port;
539     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
540     port = ntohs (u6->u6_port);
541     sb = &a6;
542     sbs = sizeof (a6);
543   }
544   else if (addrlen == sizeof (struct IPv4UdpAddress))
545   {
546     u4 = addr;
547     memset (&a4, 0, sizeof (a4));
548     a4.sin_family = AF_INET;
549 #if HAVE_SOCKADDR_IN_SIN_LEN
550     a4.sin_len = sizeof (a4);
551 #endif
552     a4.sin_port = u4->u4_port;
553     a4.sin_addr.s_addr = u4->ipv4_addr;
554     port = ntohs (u4->u4_port);
555     sb = &a4;
556     sbs = sizeof (a4);
557   }
558   else if (0 == addrlen)
559   {
560     asc (asc_cls, "<inbound connection>");
561     asc (asc_cls, NULL);
562     return;
563   }
564   else
565   {
566     /* invalid address */
567     GNUNET_break_op (0);
568     asc (asc_cls, NULL);
569     return;
570   }
571   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
572   ppc->asc = asc;
573   ppc->asc_cls = asc_cls;
574   ppc->port = port;
575   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
576 }
577
578
579 static void
580 call_continuation (struct UDPMessageWrapper *udpw, int result)
581 {
582   LOG (GNUNET_ERROR_TYPE_DEBUG,
583       "Calling continuation for %u byte message to `%s' with result %s\n",
584       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
585       (GNUNET_OK == result) ? "OK" : "SYSERR");
586   if (NULL != udpw->cont)
587   {
588     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
589   }
590
591 }
592
593
594 /**
595  * Check if the given port is plausible (must be either our listen
596  * port or our advertised port).  If it is neither, we return
597  * GNUNET_SYSERR.
598  *
599  * @param plugin global variables
600  * @param in_port port number to check
601  * @return GNUNET_OK if port is either open_port or adv_port
602  */
603 static int
604 check_port (struct Plugin *plugin, uint16_t in_port)
605 {
606   if ((in_port == plugin->port) || (in_port == plugin->aport))
607     return GNUNET_OK;
608   return GNUNET_SYSERR;
609 }
610
611
612 /**
613  * Function that will be called to check if a binary address for this
614  * plugin is well-formed and corresponds to an address for THIS peer
615  * (as per our configuration).  Naturally, if absolutely necessary,
616  * plugins can be a bit conservative in their answer, but in general
617  * plugins should make sure that the address does not redirect
618  * traffic to a 3rd party that might try to man-in-the-middle our
619  * traffic.
620  *
621  * @param cls closure, should be our handle to the Plugin
622  * @param addr pointer to the address
623  * @param addrlen length of addr
624  * @return GNUNET_OK if this is a plausible address for this peer
625  *         and transport, GNUNET_SYSERR if not
626  *
627  */
628 static int
629 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
630 {
631   struct Plugin *plugin = cls;
632   struct IPv4UdpAddress *v4;
633   struct IPv6UdpAddress *v6;
634
635   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
636       (addrlen != sizeof (struct IPv6UdpAddress)))
637   {
638     GNUNET_break_op (0);
639     return GNUNET_SYSERR;
640   }
641   if (addrlen == sizeof (struct IPv4UdpAddress))
642   {
643     v4 = (struct IPv4UdpAddress *) addr;
644     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
645       return GNUNET_SYSERR;
646     if (GNUNET_OK !=
647         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
648                                  sizeof (struct in_addr)))
649       return GNUNET_SYSERR;
650   }
651   else
652   {
653     v6 = (struct IPv6UdpAddress *) addr;
654     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
655     {
656       GNUNET_break_op (0);
657       return GNUNET_SYSERR;
658     }
659     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
660       return GNUNET_SYSERR;
661     if (GNUNET_OK !=
662         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
663                                  sizeof (struct in6_addr)))
664       return GNUNET_SYSERR;
665   }
666   return GNUNET_OK;
667 }
668
669
670 /**
671  * Task to free resources associated with a session.
672  *
673  * @param s session to free
674  */
675 static void
676 free_session (struct Session *s)
677 {
678   if (s->frag_ctx != NULL)
679   {
680     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
681     GNUNET_free (s->frag_ctx);
682     s->frag_ctx = NULL;
683   }
684   GNUNET_free (s);
685 }
686
687
688 /**
689  * Functions with this signature are called whenever we need
690  * to close a session due to a disconnect or failure to
691  * establish a connection.
692  *
693  * @param session session to close down
694  */
695 static void
696 disconnect_session (struct Session *s)
697 {
698   struct UDPMessageWrapper *udpw;
699   struct UDPMessageWrapper *next;
700
701   GNUNET_assert (GNUNET_YES != s->in_destroy);
702   LOG (GNUNET_ERROR_TYPE_DEBUG,
703        "Session %p to peer `%s' address ended \n",
704          s,
705          GNUNET_i2s (&s->target),
706          GNUNET_a2s (s->sock_addr, s->addrlen));
707   stop_session_timeout(s);
708   next = plugin->ipv4_queue_head;
709   while (NULL != (udpw = next))
710   {
711     next = udpw->next;
712     if (udpw->session == s)
713     {
714       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
715       call_continuation(udpw, GNUNET_SYSERR);
716       GNUNET_free (udpw);
717     }
718   }
719   next = plugin->ipv6_queue_head;
720   while (NULL != (udpw = next))
721   {
722     next = udpw->next;
723     if (udpw->session == s)
724     {
725       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
726       call_continuation(udpw, GNUNET_SYSERR);
727       GNUNET_free (udpw);
728     }
729     udpw = next;
730   }
731   plugin->env->session_end (plugin->env->cls, &s->target, s);
732
733   if (NULL != s->frag_ctx)
734   {
735     if (NULL != s->frag_ctx->cont)
736     {
737       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
738       LOG (GNUNET_ERROR_TYPE_DEBUG,
739           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
740           GNUNET_i2s (&s->target));
741     }
742   }
743
744   GNUNET_assert (GNUNET_YES ==
745                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
746                                                        &s->target.hashPubKey,
747                                                        s));
748   GNUNET_STATISTICS_set(plugin->env->stats,
749                         "# UDP sessions active",
750                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
751                         GNUNET_NO);
752   if (s->rc > 0)
753     s->in_destroy = GNUNET_YES;
754   else
755     free_session (s);
756 }
757
758 /**
759  * Destroy a session, plugin is being unloaded.
760  *
761  * @param cls unused
762  * @param key hash of public key of target peer
763  * @param value a 'struct PeerSession*' to clean up
764  * @return GNUNET_OK (continue to iterate)
765  */
766 static int
767 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
768 {
769   disconnect_session(value);
770   return GNUNET_OK;
771 }
772
773
774 /**
775  * Disconnect from a remote node.  Clean up session if we have one for this peer
776  *
777  * @param cls closure for this call (should be handle to Plugin)
778  * @param target the peeridentity of the peer to disconnect
779  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
780  */
781 static void
782 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
783 {
784   struct Plugin *plugin = cls;
785   GNUNET_assert (plugin != NULL);
786
787   GNUNET_assert (target != NULL);
788   LOG (GNUNET_ERROR_TYPE_DEBUG,
789        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
790   /* Clean up sessions */
791   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
792 }
793
794
795 static struct Session *
796 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
797                 const void *addr, size_t addrlen,
798                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
799 {
800   struct Session *s;
801   const struct IPv4UdpAddress *t4;
802   const struct IPv6UdpAddress *t6;
803   struct sockaddr_in *v4;
804   struct sockaddr_in6 *v6;
805   size_t len;
806
807   switch (addrlen)
808   {
809   case sizeof (struct IPv4UdpAddress):
810     if (NULL == plugin->sockv4)
811     {
812       return NULL;
813     }
814     t4 = addr;
815     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
816     len = sizeof (struct sockaddr_in);
817     v4 = (struct sockaddr_in *) &s[1];
818     v4->sin_family = AF_INET;
819 #if HAVE_SOCKADDR_IN_SIN_LEN
820     v4->sin_len = sizeof (struct sockaddr_in);
821 #endif
822     v4->sin_port = t4->u4_port;
823     v4->sin_addr.s_addr = t4->ipv4_addr;
824     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
825     break;
826   case sizeof (struct IPv6UdpAddress):
827     if (NULL == plugin->sockv6)
828     {
829       return NULL;
830     }
831     t6 = addr;
832     s =
833         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
834     len = sizeof (struct sockaddr_in6);
835     v6 = (struct sockaddr_in6 *) &s[1];
836     v6->sin6_family = AF_INET6;
837 #if HAVE_SOCKADDR_IN_SIN_LEN
838     v6->sin6_len = sizeof (struct sockaddr_in6);
839 #endif
840     v6->sin6_port = t6->u6_port;
841     v6->sin6_addr = t6->ipv6_addr;
842     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
843     break;
844   default:
845     /* Must have a valid address to send to */
846     GNUNET_break_op (0);
847     return NULL;
848   }
849
850   s->addrlen = len;
851   s->target = *target;
852   s->sock_addr = (const struct sockaddr *) &s[1];
853   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
854   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
855   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
856
857   start_session_timeout(s);
858
859   return s;
860 }
861
862
863 static int
864 session_cmp_it (void *cls,
865                 const GNUNET_HashCode * key,
866                 void *value)
867 {
868   struct SessionCompareContext * cctx = cls;
869   const struct GNUNET_HELLO_Address *address = cctx->addr;
870   struct Session *s = value;
871
872   socklen_t s_addrlen = s->addrlen;
873
874   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
875       udp_address_to_string (NULL, (void *) address->address, address->address_length),
876       GNUNET_a2s (s->sock_addr, s->addrlen));
877   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
878       (s_addrlen == sizeof (struct sockaddr_in)))
879   {
880     struct IPv4UdpAddress * u4 = NULL;
881     u4 = (struct IPv4UdpAddress *) address->address;
882     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
883     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
884         (u4->u4_port == s4->sin_port))
885     {
886       cctx->res = s;
887       return GNUNET_NO;
888     }
889
890   }
891   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
892       (s_addrlen == sizeof (struct sockaddr_in6)))
893   {
894     struct IPv6UdpAddress * u6 = NULL;
895     u6 = (struct IPv6UdpAddress *) address->address;
896     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
897     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
898         (u6->u6_port == s6->sin6_port))
899     {
900       cctx->res = s;
901       return GNUNET_NO;
902     }
903   }
904   return GNUNET_YES;
905 }
906
907
908 /**
909  * Creates a new outbound session the transport service will use to send data to the
910  * peer
911  *
912  * @param cls the plugin
913  * @param address the address
914  * @return the session or NULL of max connections exceeded
915  */
916 static struct Session *
917 udp_plugin_get_session (void *cls,
918                   const struct GNUNET_HELLO_Address *address)
919 {
920   struct Session * s = NULL;
921   struct Plugin * plugin = cls;
922   struct IPv6UdpAddress * udp_a6;
923   struct IPv4UdpAddress * udp_a4;
924
925   GNUNET_assert (plugin != NULL);
926   GNUNET_assert (address != NULL);
927
928
929   if ((address->address == NULL) ||
930       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
931       (address->address_length != sizeof (struct IPv6UdpAddress))))
932   {
933     GNUNET_break (0);
934     return NULL;
935   }
936
937   if (address->address_length == sizeof (struct IPv4UdpAddress))
938   {
939     if (plugin->sockv4 == NULL)
940       return NULL;
941     udp_a4 = (struct IPv4UdpAddress *) address->address;
942     if (udp_a4->u4_port == 0)
943       return NULL;
944   }
945
946   if (address->address_length == sizeof (struct IPv6UdpAddress))
947   {
948     if (plugin->sockv6 == NULL)
949       return NULL;
950     udp_a6 = (struct IPv6UdpAddress *) address->address;
951     if (udp_a6->u6_port == 0)
952       return NULL;
953   }
954
955   /* check if session already exists */
956   struct SessionCompareContext cctx;
957   cctx.addr = address;
958   cctx.res = NULL;
959   LOG (GNUNET_ERROR_TYPE_DEBUG,
960        "Looking for existing session for peer `%s' `%s' \n", 
961        GNUNET_i2s (&address->peer), 
962        udp_address_to_string(NULL, address->address, address->address_length));
963   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
964   if (cctx.res != NULL)
965   {
966     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
967     return cctx.res;
968   }
969
970   /* otherwise create new */
971   s = create_session (plugin,
972       &address->peer,
973       address->address,
974       address->address_length,
975       NULL, NULL);
976   LOG (GNUNET_ERROR_TYPE_DEBUG,
977        "Creating new session %p for peer `%s' address `%s'\n",
978        s,
979        GNUNET_i2s(&address->peer),
980        udp_address_to_string(NULL,address->address,address->address_length));
981   GNUNET_assert (GNUNET_OK ==
982                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
983                                                     &s->target.hashPubKey,
984                                                     s,
985                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
986
987   GNUNET_STATISTICS_set(plugin->env->stats,
988                         "# UDP sessions active",
989                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
990                         GNUNET_NO);
991
992   return s;
993 }
994
995
996 static void 
997 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
998 {
999
1000   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1001     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1002   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1003     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1004 }
1005
1006
1007 /**
1008  * Fragment message was transmitted via UDP, let fragmentation know
1009  * to send the next fragment now.
1010  *
1011  * @param cls the 'struct UDPMessageWrapper' of the fragment
1012  * @param target destination peer (ignored)
1013  * @param result GNUNET_OK on success (ignored)
1014  */
1015 static void
1016 send_next_fragment (void *cls,
1017                     const struct GNUNET_PeerIdentity *target,
1018                     int result)
1019 {
1020   struct UDPMessageWrapper *udpw = cls;
1021
1022   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1023 }
1024
1025
1026 /**
1027  * Function that is called with messages created by the fragmentation
1028  * module.  In the case of the 'proc' callback of the
1029  * GNUNET_FRAGMENT_context_create function, this function must
1030  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1031  *
1032  * @param cls closure, the 'struct FragmentationContext'
1033  * @param msg the message that was created
1034  */
1035 static void
1036 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1037 {
1038   struct FragmentationContext *frag_ctx = cls;
1039   struct Plugin *plugin = frag_ctx->plugin;
1040   struct UDPMessageWrapper * udpw;
1041   struct Session *s;
1042   size_t msg_len = ntohs (msg->size);
1043
1044   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1045        "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1046   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1047   udpw->session = frag_ctx->session;
1048   s = udpw->session;
1049   udpw->udp = (char *) &udpw[1];
1050
1051   udpw->msg_size = msg_len;
1052   udpw->cont = &send_next_fragment;
1053   udpw->cont_cls = udpw;
1054   udpw->timeout = frag_ctx->timeout;
1055   udpw->frag_ctx = frag_ctx;
1056   memcpy (udpw->udp, msg, msg_len);
1057   enqueue (plugin, udpw);
1058
1059   if (s->addrlen == sizeof (struct sockaddr_in))
1060   {
1061     if (plugin->with_v4_ws == GNUNET_NO)
1062     {
1063       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1064         GNUNET_SCHEDULER_cancel(plugin->select_task);
1065
1066       plugin->select_task =
1067           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1068                                        GNUNET_TIME_UNIT_FOREVER_REL,
1069                                        plugin->rs_v4,
1070                                        plugin->ws_v4,
1071                                        &udp_plugin_select, plugin);
1072       plugin->with_v4_ws = GNUNET_YES;
1073     }
1074   }
1075   else if (s->addrlen == sizeof (struct sockaddr_in6))
1076   {
1077     if (plugin->with_v6_ws == GNUNET_NO)
1078     {
1079       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1080         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1081
1082       plugin->select_task_v6 =
1083           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1084                                        GNUNET_TIME_UNIT_FOREVER_REL,
1085                                        plugin->rs_v6,
1086                                        plugin->ws_v6,
1087                                        &udp_plugin_select_v6, plugin);
1088       plugin->with_v6_ws = GNUNET_YES;
1089     }
1090   }
1091 }
1092
1093
1094 /**
1095  * Function that can be used by the transport service to transmit
1096  * a message using the plugin.   Note that in the case of a
1097  * peer disconnecting, the continuation MUST be called
1098  * prior to the disconnect notification itself.  This function
1099  * will be called with this peer's HELLO message to initiate
1100  * a fresh connection to another peer.
1101  *
1102  * @param cls closure
1103  * @param s which session must be used
1104  * @param msgbuf the message to transmit
1105  * @param msgbuf_size number of bytes in 'msgbuf'
1106  * @param priority how important is the message (most plugins will
1107  *                 ignore message priority and just FIFO)
1108  * @param to how long to wait at most for the transmission (does not
1109  *                require plugins to discard the message after the timeout,
1110  *                just advisory for the desired delay; most plugins will ignore
1111  *                this as well)
1112  * @param cont continuation to call once the message has
1113  *        been transmitted (or if the transport is ready
1114  *        for the next transmission call; or if the
1115  *        peer disconnected...); can be NULL
1116  * @param cont_cls closure for cont
1117  * @return number of bytes used (on the physical network, with overheads);
1118  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1119  *         and does NOT mean that the message was not transmitted (DV)
1120  */
1121 static ssize_t
1122 udp_plugin_send (void *cls,
1123                   struct Session *s,
1124                   const char *msgbuf, size_t msgbuf_size,
1125                   unsigned int priority,
1126                   struct GNUNET_TIME_Relative to,
1127                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1128 {
1129   struct Plugin *plugin = cls;
1130   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1131   struct UDPMessageWrapper * udpw;
1132   struct UDPMessage *udp;
1133   char mbuf[mlen];
1134   GNUNET_assert (plugin != NULL);
1135   GNUNET_assert (s != NULL);
1136
1137   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1138     return GNUNET_SYSERR;
1139   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1140     return GNUNET_SYSERR;
1141   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1142   {
1143     GNUNET_break (0);
1144     return GNUNET_SYSERR;
1145   }
1146   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1147   {
1148     GNUNET_break (0);
1149     return GNUNET_SYSERR;
1150   }
1151   LOG (GNUNET_ERROR_TYPE_DEBUG,
1152        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1153        mlen,
1154        GNUNET_i2s (&s->target),
1155        GNUNET_a2s(s->sock_addr, s->addrlen));
1156   
1157   /* Message */
1158   udp = (struct UDPMessage *) mbuf;
1159   udp->header.size = htons (mlen);
1160   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1161   udp->reserved = htonl (0);
1162   udp->sender = *plugin->env->my_identity;
1163
1164   reschedule_session_timeout(s);
1165   if (mlen <= UDP_MTU)
1166   {
1167     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1168     udpw->session = s;
1169     udpw->udp = (char *) &udpw[1];
1170     udpw->msg_size = mlen;
1171     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1172     udpw->cont = cont;
1173     udpw->cont_cls = cont_cls;
1174     udpw->frag_ctx = NULL;
1175     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1176     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1177
1178     enqueue (plugin, udpw);
1179   }
1180   else
1181   {
1182     LOG (GNUNET_ERROR_TYPE_DEBUG,
1183          "UDP has to fragment message \n");
1184     if  (s->frag_ctx != NULL)
1185       return GNUNET_SYSERR;
1186     memcpy (&udp[1], msgbuf, msgbuf_size);
1187     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1188
1189     frag_ctx->plugin = plugin;
1190     frag_ctx->session = s;
1191     frag_ctx->cont = cont;
1192     frag_ctx->cont_cls = cont_cls;
1193     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1194     frag_ctx->bytes_to_send = mlen;
1195     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1196               UDP_MTU,
1197               &plugin->tracker,
1198               s->last_expected_delay,
1199               &udp->header,
1200               &enqueue_fragment,
1201               frag_ctx);
1202
1203     s->frag_ctx = frag_ctx;
1204   }
1205
1206   if (s->addrlen == sizeof (struct sockaddr_in))
1207   {
1208     if (plugin->with_v4_ws == GNUNET_NO)
1209     {
1210       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1211         GNUNET_SCHEDULER_cancel(plugin->select_task);
1212
1213       plugin->select_task =
1214           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1215                                        GNUNET_TIME_UNIT_FOREVER_REL,
1216                                        plugin->rs_v4,
1217                                        plugin->ws_v4,
1218                                        &udp_plugin_select, plugin);
1219       plugin->with_v4_ws = GNUNET_YES;
1220     }
1221   }
1222   else if (s->addrlen == sizeof (struct sockaddr_in6))
1223   {
1224     if (plugin->with_v6_ws == GNUNET_NO)
1225     {
1226       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1227         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1228
1229       plugin->select_task_v6 =
1230         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1231                                      GNUNET_TIME_UNIT_FOREVER_REL,
1232                                      plugin->rs_v6,
1233                                      plugin->ws_v6,
1234                                      &udp_plugin_select_v6, plugin);
1235       plugin->with_v6_ws = GNUNET_YES;
1236     }
1237   }
1238
1239   return mlen;
1240 }
1241
1242
1243 /**
1244  * Our external IP address/port mapping has changed.
1245  *
1246  * @param cls closure, the 'struct LocalAddrList'
1247  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1248  *     the previous (now invalid) one
1249  * @param addr either the previous or the new public IP address
1250  * @param addrlen actual lenght of the address
1251  */
1252 static void
1253 udp_nat_port_map_callback (void *cls, int add_remove,
1254                            const struct sockaddr *addr, socklen_t addrlen)
1255 {
1256   struct Plugin *plugin = cls;
1257   struct IPv4UdpAddress u4;
1258   struct IPv6UdpAddress u6;
1259   void *arg;
1260   size_t args;
1261
1262   /* convert 'addr' to our internal format */
1263   switch (addr->sa_family)
1264   {
1265   case AF_INET:
1266     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1267     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1268     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1269     arg = &u4;
1270     args = sizeof (u4);
1271     break;
1272   case AF_INET6:
1273     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1274     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1275             sizeof (struct in6_addr));
1276     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1277     arg = &u6;
1278     args = sizeof (u6);
1279     break;
1280   default:
1281     GNUNET_break (0);
1282     return;
1283   }
1284   /* modify our published address list */
1285   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1286 }
1287
1288
1289
1290 /**
1291  * Message tokenizer has broken up an incomming message. Pass it on
1292  * to the service.
1293  *
1294  * @param cls the 'struct Plugin'
1295  * @param client the 'struct SourceInformation'
1296  * @param hdr the actual message
1297  */
1298 static int
1299 process_inbound_tokenized_messages (void *cls, void *client,
1300                                     const struct GNUNET_MessageHeader *hdr)
1301 {
1302   struct Plugin *plugin = cls;
1303   struct SourceInformation *si = client;
1304   struct GNUNET_ATS_Information ats[2];
1305   struct GNUNET_TIME_Relative delay;
1306
1307   GNUNET_assert (si->session != NULL);
1308   if (GNUNET_YES == si->session->in_destroy)
1309     return GNUNET_OK;
1310   /* setup ATS */
1311   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1312   ats[0].value = htonl (1);
1313   ats[1] = si->session->ats;
1314   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1315   delay = plugin->env->receive (plugin->env->cls,
1316                                 &si->sender,
1317                                 hdr,
1318                                 (const struct GNUNET_ATS_Information *) &ats, 2,
1319                                 NULL,
1320                                 si->arg,
1321                                 si->args);
1322   si->session->flow_delay_for_other_peer = delay;
1323   reschedule_session_timeout(si->session);
1324   return GNUNET_OK;
1325 }
1326
1327
1328 /**
1329  * We've received a UDP Message.  Process it (pass contents to main service).
1330  *
1331  * @param plugin plugin context
1332  * @param msg the message
1333  * @param sender_addr sender address
1334  * @param sender_addr_len number of bytes in sender_addr
1335  */
1336 static void
1337 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1338                      const struct sockaddr *sender_addr,
1339                      socklen_t sender_addr_len)
1340 {
1341   struct SourceInformation si;
1342   struct Session * s;
1343   struct IPv4UdpAddress u4;
1344   struct IPv6UdpAddress u6;
1345   const void *arg;
1346   size_t args;
1347
1348   if (0 != ntohl (msg->reserved))
1349   {
1350     GNUNET_break_op (0);
1351     return;
1352   }
1353   if (ntohs (msg->header.size) <
1354       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1355   {
1356     GNUNET_break_op (0);
1357     return;
1358   }
1359
1360   /* convert address */
1361   switch (sender_addr->sa_family)
1362   {
1363   case AF_INET:
1364     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1365     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1366     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1367     arg = &u4;
1368     args = sizeof (u4);
1369     break;
1370   case AF_INET6:
1371     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1372     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1373     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1374     arg = &u6;
1375     args = sizeof (u6);
1376     break;
1377   default:
1378     GNUNET_break (0);
1379     return;
1380   }
1381   LOG (GNUNET_ERROR_TYPE_DEBUG,
1382        "Received message with %u bytes from peer `%s' at `%s'\n",
1383        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1384        GNUNET_a2s (sender_addr, sender_addr_len));
1385
1386   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1387   s = udp_plugin_get_session(plugin, address);
1388   GNUNET_free (address);
1389
1390   /* iterate over all embedded messages */
1391   si.session = s;
1392   si.sender = msg->sender;
1393   si.arg = arg;
1394   si.args = args;
1395   s->rc++;
1396   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1397                              ntohs (msg->header.size) -
1398                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1399   s->rc--;
1400   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1401     free_session (s);
1402 }
1403
1404
1405 /**
1406  * Scan the heap for a receive context with the given address.
1407  *
1408  * @param cls the 'struct FindReceiveContext'
1409  * @param node internal node of the heap
1410  * @param element value stored at the node (a 'struct ReceiveContext')
1411  * @param cost cost associated with the node
1412  * @return GNUNET_YES if we should continue to iterate,
1413  *         GNUNET_NO if not.
1414  */
1415 static int
1416 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1417                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1418 {
1419   struct FindReceiveContext *frc = cls;
1420   struct DefragContext *e = element;
1421
1422   if ((frc->addr_len == e->addr_len) &&
1423       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1424   {
1425     frc->rc = e;
1426     return GNUNET_NO;
1427   }
1428   return GNUNET_YES;
1429 }
1430
1431
1432 /**
1433  * Process a defragmented message.
1434  *
1435  * @param cls the 'struct ReceiveContext'
1436  * @param msg the message
1437  */
1438 static void
1439 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1440 {
1441   struct DefragContext *rc = cls;
1442
1443   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1444   {
1445     GNUNET_break (0);
1446     return;
1447   }
1448   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1449   {
1450     GNUNET_break (0);
1451     return;
1452   }
1453   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1454                        rc->src_addr, rc->addr_len);
1455 }
1456
1457
1458 struct LookupContext
1459 {
1460   const struct sockaddr * addr;
1461
1462   struct Session *res;
1463
1464   size_t addrlen;
1465 };
1466
1467
1468 static int
1469 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1470 {
1471   struct LookupContext *l_ctx = cls;
1472   struct Session * s = value;
1473
1474   if ((s->addrlen == l_ctx->addrlen) &&
1475       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1476   {
1477     l_ctx->res = s;
1478     return GNUNET_NO;
1479   }
1480   return GNUNET_YES;
1481 }
1482
1483
1484 /**
1485  * Transmit an acknowledgement.
1486  *
1487  * @param cls the 'struct ReceiveContext'
1488  * @param id message ID (unused)
1489  * @param msg ack to transmit
1490  */
1491 static void
1492 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1493 {
1494   struct DefragContext *rc = cls;
1495   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1496   struct UDP_ACK_Message *udp_ack;
1497   uint32_t delay = 0;
1498   struct UDPMessageWrapper *udpw;
1499   struct Session *s;
1500
1501   struct LookupContext l_ctx;
1502   l_ctx.addr = rc->src_addr;
1503   l_ctx.addrlen = rc->addr_len;
1504   l_ctx.res = NULL;
1505   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1506       &lookup_session_by_addr_it,
1507       &l_ctx);
1508   s = l_ctx.res;
1509
1510   if (NULL == s)
1511     return;
1512
1513   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1514     delay = s->flow_delay_for_other_peer.rel_value;
1515
1516   LOG (GNUNET_ERROR_TYPE_DEBUG,
1517        "Sending ACK to `%s' including delay of %u ms\n",
1518        GNUNET_a2s (rc->src_addr,
1519                    (rc->src_addr->sa_family ==
1520                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1521                                                                      sockaddr_in6)),
1522        delay);
1523   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1524   udpw->cont = NULL;
1525   udpw->cont_cls = NULL;
1526   udpw->frag_ctx = NULL;
1527   udpw->msg_size = msize;
1528   udpw->session = s;
1529   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1530   udpw->udp = (char *)&udpw[1];
1531
1532   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1533   udp_ack->header.size = htons ((uint16_t) msize);
1534   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1535   udp_ack->delay = htonl (delay);
1536   udp_ack->sender = *rc->plugin->env->my_identity;
1537   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1538
1539   enqueue (rc->plugin, udpw);
1540 }
1541
1542
1543 static void 
1544 read_process_msg (struct Plugin *plugin,
1545                   const struct GNUNET_MessageHeader *msg,
1546                   const char *addr,
1547                   socklen_t fromlen)
1548 {
1549   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1550   {
1551     GNUNET_break_op (0);
1552     return;
1553   }
1554   process_udp_message (plugin, (const struct UDPMessage *) msg,
1555                        (const struct sockaddr *) addr, fromlen);
1556 }
1557
1558
1559 static void 
1560 read_process_ack (struct Plugin *plugin,
1561                   const struct GNUNET_MessageHeader *msg,
1562                   char *addr,
1563                   socklen_t fromlen)
1564 {
1565   const struct GNUNET_MessageHeader *ack;
1566   const struct UDP_ACK_Message *udp_ack;
1567   struct LookupContext l_ctx;
1568   struct Session *s;
1569   struct GNUNET_TIME_Relative flow_delay;
1570
1571   if (ntohs (msg->size) <
1572       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1573   {
1574     GNUNET_break_op (0);
1575     return;
1576   }
1577   udp_ack = (const struct UDP_ACK_Message *) msg;
1578   l_ctx.addr = (const struct sockaddr *) addr;
1579   l_ctx.addrlen = fromlen;
1580   l_ctx.res = NULL;
1581   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1582       &lookup_session_by_addr_it,
1583       &l_ctx);
1584   s = l_ctx.res;
1585
1586   if ((s == NULL) || (s->frag_ctx == NULL))
1587     return;
1588
1589   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1590   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1591        flow_delay.rel_value);
1592   s->flow_delay_from_other_peer =
1593       GNUNET_TIME_relative_to_absolute (flow_delay);
1594
1595   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1596   if (ntohs (ack->size) !=
1597       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1598   {
1599     GNUNET_break_op (0);
1600     return;
1601   }
1602
1603   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1604   {
1605     LOG (GNUNET_ERROR_TYPE_DEBUG,
1606          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1607          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1608          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1609     return;
1610   }
1611
1612   LOG (GNUNET_ERROR_TYPE_DEBUG,
1613        "FULL MESSAGE ACKed\n",
1614        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1615        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1616   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1617
1618   struct UDPMessageWrapper * udpw;
1619   struct UDPMessageWrapper * tmp;
1620   if (s->addrlen == sizeof (struct sockaddr_in6))
1621   {
1622     udpw = plugin->ipv6_queue_head;
1623     while (NULL != udpw)
1624     {
1625       tmp = udpw->next;
1626       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1627       {
1628         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1629         GNUNET_free (udpw);
1630       }
1631       udpw = tmp;
1632     }
1633   }
1634   if (s->addrlen == sizeof (struct sockaddr_in))
1635   {
1636     udpw = plugin->ipv4_queue_head;
1637     while (udpw!= NULL)
1638     {
1639       tmp = udpw->next;
1640       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1641       {
1642         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1643         GNUNET_free (udpw);
1644       }
1645       udpw = tmp;
1646     }
1647   }
1648
1649   if (s->frag_ctx->cont != NULL)
1650   {
1651     LOG (GNUNET_ERROR_TYPE_DEBUG,
1652         "Calling continuation for fragmented message to `%s' with result %s\n",
1653         GNUNET_i2s (&s->target), "OK");
1654     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1655   }
1656
1657   GNUNET_free (s->frag_ctx);
1658   s->frag_ctx = NULL;
1659 }
1660
1661
1662 static void 
1663 read_process_fragment (struct Plugin *plugin,
1664                        const struct GNUNET_MessageHeader *msg,
1665                        char *addr,
1666                        socklen_t fromlen)
1667 {
1668   struct DefragContext *d_ctx;
1669   struct GNUNET_TIME_Absolute now;
1670   struct FindReceiveContext frc;
1671
1672   frc.rc = NULL;
1673   frc.addr = (const struct sockaddr *) addr;
1674   frc.addr_len = fromlen;
1675
1676   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1677        (unsigned int) ntohs (msg->size),
1678        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1679   /* Lookup existing receive context for this address */
1680   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1681                                  &find_receive_context,
1682                                  &frc);
1683   now = GNUNET_TIME_absolute_get ();
1684   d_ctx = frc.rc;
1685
1686   if (d_ctx == NULL)
1687   {
1688     /* Create a new defragmentation context */
1689     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1690     memcpy (&d_ctx[1], addr, fromlen);
1691     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1692     d_ctx->addr_len = fromlen;
1693     d_ctx->plugin = plugin;
1694     d_ctx->defrag =
1695         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1696                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1697                                           &fragment_msg_proc, &ack_proc);
1698     d_ctx->hnode =
1699         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1700                                       (GNUNET_CONTAINER_HeapCostType)
1701                                       now.abs_value);
1702     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1703          "Created new defragmentation context for %u-byte fragment from `%s'\n",
1704          (unsigned int) ntohs (msg->size),
1705          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1706   }
1707   else
1708   {
1709     LOG (GNUNET_ERROR_TYPE_DEBUG,
1710          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1711          (unsigned int) ntohs (msg->size),
1712          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1713   }
1714
1715   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1716   {
1717     /* keep this 'rc' from expiring */
1718     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1719                                        (GNUNET_CONTAINER_HeapCostType)
1720                                        now.abs_value);
1721   }
1722   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1723       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1724   {
1725     /* remove 'rc' that was inactive the longest */
1726     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1727     GNUNET_assert (NULL != d_ctx);
1728     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1729     GNUNET_free (d_ctx);
1730   }
1731 }
1732
1733
1734 /**
1735  * Read and process a message from the given socket.
1736  *
1737  * @param plugin the overall plugin
1738  * @param rsock socket to read from
1739  */
1740 static void
1741 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1742 {
1743   socklen_t fromlen;
1744   char addr[32];
1745   char buf[65536] GNUNET_ALIGN;
1746   ssize_t size;
1747   const struct GNUNET_MessageHeader *msg;
1748
1749   fromlen = sizeof (addr);
1750   memset (&addr, 0, sizeof (addr));
1751   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1752                                       (struct sockaddr *) &addr, &fromlen);
1753
1754   if (size < sizeof (struct GNUNET_MessageHeader))
1755   {
1756     GNUNET_break_op (0);
1757     return;
1758   }
1759   msg = (const struct GNUNET_MessageHeader *) buf;
1760
1761   LOG (GNUNET_ERROR_TYPE_DEBUG,
1762        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1763        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1764
1765   if (size != ntohs (msg->size))
1766   {
1767     GNUNET_break_op (0);
1768     return;
1769   }
1770
1771   switch (ntohs (msg->type))
1772   {
1773   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1774     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1775     return;
1776
1777   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1778     read_process_msg (plugin, msg, addr, fromlen);
1779     return;
1780
1781   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1782     read_process_ack (plugin, msg, addr, fromlen);
1783     return;
1784
1785   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1786     read_process_fragment (plugin, msg, addr, fromlen);
1787     return;
1788
1789   default:
1790     GNUNET_break_op (0);
1791     return;
1792   }
1793 }
1794
1795
1796 static size_t
1797 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1798 {
1799   ssize_t sent;
1800   size_t slen;
1801   struct GNUNET_TIME_Absolute max;
1802   struct UDPMessageWrapper *udpw = NULL;
1803
1804   if (sock == plugin->sockv4)
1805   {
1806     udpw = plugin->ipv4_queue_head;
1807   }
1808   else if (sock == plugin->sockv6)
1809   {
1810     udpw = plugin->ipv6_queue_head;
1811   }
1812   else
1813   {
1814     GNUNET_break (0);
1815     return 0;
1816   }
1817
1818   const struct sockaddr * sa = udpw->session->sock_addr;
1819   slen = udpw->session->addrlen;
1820
1821   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1822
1823   while (udpw != NULL)
1824   {
1825     if (max.abs_value != udpw->timeout.abs_value)
1826     {
1827       /* Message timed out */
1828       call_continuation(udpw, GNUNET_SYSERR);
1829       if (udpw->frag_ctx != NULL)
1830       {
1831         LOG (GNUNET_ERROR_TYPE_DEBUG,
1832              "Fragmented message for peer `%s' with size %u timed out\n",
1833              GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1834         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1835         GNUNET_free (udpw->frag_ctx);
1836         udpw->session->frag_ctx = NULL;
1837       }
1838       else
1839       {
1840         LOG (GNUNET_ERROR_TYPE_DEBUG, 
1841              "Message for peer `%s' with size %u timed out\n",
1842              GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1843       }
1844
1845       if (sock == plugin->sockv4)
1846       {
1847         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1848         GNUNET_free (udpw);
1849         udpw = plugin->ipv4_queue_head;
1850       }
1851       else if (sock == plugin->sockv6)
1852       {
1853         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1854         GNUNET_free (udpw);
1855         udpw = plugin->ipv6_queue_head;
1856       }
1857     }
1858     else
1859     {
1860       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1861       if (delta.rel_value == 0)
1862       {
1863         /* this message is not delayed */
1864         LOG (GNUNET_ERROR_TYPE_DEBUG, 
1865              "Message for peer `%s' (%u bytes) is not delayed \n",
1866              GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1867         break;
1868       }
1869       else
1870       {
1871         /* this message is delayed, try next */
1872         LOG (GNUNET_ERROR_TYPE_DEBUG,
1873              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1874              GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1875              delta);
1876         udpw = udpw->next;
1877       }
1878     }
1879   }
1880
1881   if (udpw == NULL)
1882   {
1883     /* No message left */
1884     return 0;
1885   }
1886
1887   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1888
1889   if (GNUNET_SYSERR == sent)
1890   {
1891     const struct GNUNET_ATS_Information type = plugin->env->get_address_type
1892         (plugin->env->cls,sa, slen);
1893
1894     if ((GNUNET_ATS_NET_WAN == type.value) &&
1895         ((ENETUNREACH == errno) || (ENETDOWN == errno)))
1896     {
1897       /* "Network unreachable" or "Network down" */
1898       /*
1899        * This indicates that this system is IPv6 enabled, but does not
1900        * have a valid global IPv6 address assigned
1901        */
1902        LOG (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1903            _("UDP could not message to `%s': `%s'. "
1904              "Please check your network configuration and disable IPv6 if your "
1905              "connection does not have a global IPv6 address\n"),
1906            GNUNET_a2s (sa, slen),
1907            STRERROR (errno));
1908     }
1909     else
1910     {
1911       LOG (GNUNET_ERROR_TYPE_ERROR,
1912          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1913          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1914          STRERROR (errno));
1915     }
1916     call_continuation(udpw, GNUNET_SYSERR);
1917   }
1918   else
1919   {
1920     LOG (GNUNET_ERROR_TYPE_DEBUG,
1921          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1922          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1923          (sent < 0) ? STRERROR (errno) : "ok");
1924     call_continuation(udpw, GNUNET_OK);
1925   }
1926
1927   if (sock == plugin->sockv4)
1928     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1929   else if (sock == plugin->sockv6)
1930     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1931   GNUNET_free (udpw);
1932   udpw = NULL;
1933
1934   return sent;
1935 }
1936
1937
1938 /**
1939  * We have been notified that our readset has something to read.  We don't
1940  * know which socket needs to be read, so we have to check each one
1941  * Then reschedule this function to be called again once more is available.
1942  *
1943  * @param cls the plugin handle
1944  * @param tc the scheduling context (for rescheduling this function again)
1945  */
1946 static void
1947 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1948 {
1949   struct Plugin *plugin = cls;
1950
1951   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1952   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1953     return;
1954   plugin->with_v4_ws = GNUNET_NO;
1955
1956   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1957   {
1958     if ((NULL != plugin->sockv4) &&
1959       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1960         udp_select_read (plugin, plugin->sockv4);
1961
1962   }
1963
1964   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1965   {
1966     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1967       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1968       {
1969         udp_select_send (plugin, plugin->sockv4);
1970       }
1971   }
1972
1973   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1974     GNUNET_SCHEDULER_cancel (plugin->select_task);
1975   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1976                                    GNUNET_TIME_UNIT_FOREVER_REL,
1977                                    plugin->rs_v4,
1978                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1979                                    &udp_plugin_select, plugin);
1980   if (plugin->ipv4_queue_head != NULL)
1981     plugin->with_v4_ws = GNUNET_YES;
1982   else
1983     plugin->with_v4_ws = GNUNET_NO;
1984 }
1985
1986
1987 /**
1988  * We have been notified that our readset has something to read.  We don't
1989  * know which socket needs to be read, so we have to check each one
1990  * Then reschedule this function to be called again once more is available.
1991  *
1992  * @param cls the plugin handle
1993  * @param tc the scheduling context (for rescheduling this function again)
1994  */
1995 static void
1996 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1997 {
1998   struct Plugin *plugin = cls;
1999
2000   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2001   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
2002     return;
2003
2004   plugin->with_v6_ws = GNUNET_NO;
2005   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
2006   {
2007     if ((NULL != plugin->sockv6) &&
2008       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
2009         udp_select_read (plugin, plugin->sockv6);
2010   }
2011
2012   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
2013   {
2014     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2015       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
2016       {
2017         udp_select_send (plugin, plugin->sockv6);
2018       }
2019   }
2020   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2021     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2022   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2023                                    GNUNET_TIME_UNIT_FOREVER_REL,
2024                                    plugin->rs_v6,
2025                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
2026                                    &udp_plugin_select_v6, plugin);
2027   if (plugin->ipv6_queue_head != NULL)
2028     plugin->with_v6_ws = GNUNET_YES;
2029   else
2030     plugin->with_v6_ws = GNUNET_NO;
2031 }
2032
2033
2034 static int
2035 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
2036 {
2037   int tries;
2038   int sockets_created = 0;
2039   struct sockaddr *serverAddr;
2040   struct sockaddr *addrs[2];
2041   socklen_t addrlens[2];
2042   socklen_t addrlen;
2043
2044   /* Create IPv6 socket */
2045   if (plugin->enable_ipv6 == GNUNET_YES)
2046   {
2047     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2048     if (NULL == plugin->sockv6)
2049     {
2050       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2051       plugin->enable_ipv6 = GNUNET_NO;
2052     }
2053     else
2054     {
2055 #if HAVE_SOCKADDR_IN_SIN_LEN
2056       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2057 #endif
2058       serverAddrv6->sin6_family = AF_INET6;
2059       serverAddrv6->sin6_addr = in6addr_any;
2060       serverAddrv6->sin6_port = htons (plugin->port);
2061       addrlen = sizeof (struct sockaddr_in6);
2062       serverAddr = (struct sockaddr *) serverAddrv6;
2063       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2064            ntohs (serverAddrv6->sin6_port));
2065       tries = 0;
2066       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2067              GNUNET_OK)
2068       {
2069         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2070         LOG (GNUNET_ERROR_TYPE_DEBUG,
2071              "IPv6 Binding failed, trying new port %d\n",
2072              ntohs (serverAddrv6->sin6_port));
2073         tries++;
2074         if (tries > 10)
2075         {
2076           GNUNET_NETWORK_socket_close (plugin->sockv6);
2077           plugin->sockv6 = NULL;
2078           break;
2079         }
2080       }
2081       if (plugin->sockv6 != NULL)
2082       {
2083         LOG (GNUNET_ERROR_TYPE_DEBUG,
2084              "IPv6 socket created on port %d\n",
2085              ntohs (serverAddrv6->sin6_port));
2086         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2087         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2088         sockets_created++;
2089       }
2090     }
2091   }
2092
2093   /* Create IPv4 socket */
2094   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2095   if (NULL == plugin->sockv4)
2096   {
2097     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2098   }
2099   else
2100   {
2101 #if HAVE_SOCKADDR_IN_SIN_LEN
2102     serverAddrv4->sin_len = sizeof (serverAddrv4);
2103 #endif
2104     serverAddrv4->sin_family = AF_INET;
2105     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2106     serverAddrv4->sin_port = htons (plugin->port);
2107     addrlen = sizeof (struct sockaddr_in);
2108     serverAddr = (struct sockaddr *) serverAddrv4;
2109
2110     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2111          ntohs (serverAddrv4->sin_port));
2112     tries = 0;
2113     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2114            GNUNET_OK)
2115     {
2116       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2117       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2118            ntohs (serverAddrv4->sin_port));
2119       tries++;
2120       if (tries > 10)
2121       {
2122         GNUNET_NETWORK_socket_close (plugin->sockv4);
2123         plugin->sockv4 = NULL;
2124         break;
2125       }
2126     }
2127     if (plugin->sockv4 != NULL)
2128     {
2129       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2130       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2131       sockets_created++;
2132     }
2133   }
2134
2135   /* Create file descriptors */
2136   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2137   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2138   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2139   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2140   if (NULL != plugin->sockv4)
2141   {
2142     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2143     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2144   }
2145
2146   if (sockets_created == 0)
2147     LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2148
2149   plugin->select_task =
2150       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2151                                    GNUNET_TIME_UNIT_FOREVER_REL,
2152                                    plugin->rs_v4,
2153                                    NULL,
2154                                    &udp_plugin_select, plugin);
2155   plugin->with_v4_ws = GNUNET_NO;
2156
2157   if (plugin->enable_ipv6 == GNUNET_YES)
2158   {
2159     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2160     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2161     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2162     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2163     if (NULL != plugin->sockv6)
2164     {
2165       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2166       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2167     }
2168
2169     plugin->select_task_v6 =
2170         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2171                                      GNUNET_TIME_UNIT_FOREVER_REL,
2172                                      plugin->rs_v6,
2173                                      NULL,
2174                                      &udp_plugin_select_v6, plugin);
2175     plugin->with_v6_ws = GNUNET_NO;
2176   }
2177
2178   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2179                            GNUNET_NO, plugin->port,
2180                            sockets_created,
2181                            (const struct sockaddr **) addrs, addrlens,
2182                            &udp_nat_port_map_callback, NULL, plugin);
2183
2184   return sockets_created;
2185 }
2186
2187 /**
2188  * Session was idle, so disconnect it
2189  */
2190 static void
2191 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2192 {
2193   GNUNET_assert (NULL != cls);
2194   struct Session *s = cls;
2195
2196   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2197
2198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
2199       s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2200
2201   /* call session destroy function */
2202   disconnect_session(s);
2203
2204 }
2205
2206 /**
2207  * Start session timeout
2208  */
2209 static void
2210 start_session_timeout (struct Session *s)
2211 {
2212   GNUNET_assert (NULL != s);
2213   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
2214
2215   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2216                                                    &session_timeout,
2217                                                    s);
2218
2219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
2220       s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2221 }
2222
2223 /**
2224  * Increment session timeout due to activity
2225  */
2226 static void
2227 reschedule_session_timeout (struct Session *s)
2228 {
2229   GNUNET_assert (NULL != s);
2230   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
2231
2232   GNUNET_SCHEDULER_cancel (s->timeout_task);
2233   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2234                                                    &session_timeout,
2235                                                    s);
2236
2237   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
2238       s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2239 }
2240
2241 /**
2242  * Cancel timeout
2243  */
2244 static void
2245 stop_session_timeout (struct Session *s)
2246 {
2247   GNUNET_assert (NULL != s);
2248
2249   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
2250   {
2251     GNUNET_SCHEDULER_cancel (s->timeout_task);
2252     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2253
2254     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
2255       s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2256   }
2257   else
2258   {
2259     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
2260       s);
2261   }
2262 }
2263
2264 /**
2265  * The exported method. Makes the core api available via a global and
2266  * returns the udp transport API.
2267  *
2268  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2269  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2270  */
2271 void *
2272 libgnunet_plugin_transport_udp_init (void *cls)
2273 {
2274   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2275   struct GNUNET_TRANSPORT_PluginFunctions *api;
2276   struct Plugin *p;
2277   unsigned long long port;
2278   unsigned long long aport;
2279   unsigned long long broadcast;
2280   unsigned long long udp_max_bps;
2281   unsigned long long enable_v6;
2282   char * bind4_address;
2283   char * bind6_address;
2284   struct GNUNET_TIME_Relative interval;
2285   struct sockaddr_in serverAddrv4;
2286   struct sockaddr_in6 serverAddrv6;
2287   int res;
2288
2289   if (NULL == env->receive)
2290   {
2291     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2292        initialze the plugin or the API */
2293     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2294     api->cls = NULL;
2295     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2296     api->address_to_string = &udp_address_to_string;
2297     api->string_to_address = &udp_string_to_address;
2298     return api;
2299   }
2300
2301   GNUNET_assert( NULL != env->stats);
2302
2303   /* Get port number */
2304   if (GNUNET_OK !=
2305       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2306                                              &port))
2307     port = 2086;
2308   if (GNUNET_OK !=
2309       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2310                                              "ADVERTISED_PORT", &aport))
2311     aport = port;
2312   if (port > 65535)
2313   {
2314     LOG (GNUNET_ERROR_TYPE_WARNING,
2315          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2316          65535);
2317     return NULL;
2318   }
2319
2320   /* Protocols */
2321   if ((GNUNET_YES ==
2322        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2323                                              "DISABLEV6")))
2324   {
2325     enable_v6 = GNUNET_NO;
2326   }
2327   else
2328     enable_v6 = GNUNET_YES;
2329
2330   /* Addresses */
2331   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2332   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2333
2334   if (GNUNET_YES ==
2335       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2336                                              "BINDTO", &bind4_address))
2337   {
2338     LOG (GNUNET_ERROR_TYPE_DEBUG,
2339          "Binding udp plugin to specific address: `%s'\n",
2340          bind4_address);
2341     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2342     {
2343       GNUNET_free (bind4_address);
2344       return NULL;
2345     }
2346   }
2347
2348   if (GNUNET_YES ==
2349       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2350                                              "BINDTO6", &bind6_address))
2351   {
2352     LOG (GNUNET_ERROR_TYPE_DEBUG,
2353          "Binding udp plugin to specific address: `%s'\n",
2354          bind6_address);
2355     if (1 !=
2356         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2357     {
2358       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2359            bind6_address);
2360       GNUNET_free_non_null (bind4_address);
2361       GNUNET_free (bind6_address);
2362       return NULL;
2363     }
2364   }
2365
2366   /* Enable neighbour discovery */
2367   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2368                                             "BROADCAST");
2369   if (broadcast == GNUNET_SYSERR)
2370     broadcast = GNUNET_NO;
2371
2372   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2373                                            "BROADCAST_INTERVAL", &interval))
2374   {
2375     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2376   }
2377
2378   /* Maximum datarate */
2379   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2380                                              "MAX_BPS", &udp_max_bps))
2381   {
2382     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2383   }
2384
2385   p = GNUNET_malloc (sizeof (struct Plugin));
2386   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2387
2388   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2389                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2390   p->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2391   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2392   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2393   p->port = port;
2394   p->aport = aport;
2395   p->broadcast_interval = interval;
2396   p->enable_ipv6 = enable_v6;
2397   p->env = env;
2398
2399   plugin = p;
2400
2401   api->cls = p;
2402   api->send = NULL;
2403   api->disconnect = &udp_disconnect;
2404   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2405   api->address_to_string = &udp_address_to_string;
2406   api->string_to_address = &udp_string_to_address;
2407   api->check_address = &udp_plugin_check_address;
2408   api->get_session = &udp_plugin_get_session;
2409   api->send = &udp_plugin_send;
2410
2411   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2412   res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
2413   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
2414   {
2415     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2416     GNUNET_free (p);
2417     GNUNET_free (api);
2418     return NULL;
2419   }
2420
2421   if (broadcast == GNUNET_YES)
2422   {
2423     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2424     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
2425   }
2426
2427   GNUNET_free_non_null (bind4_address);
2428   GNUNET_free_non_null (bind6_address);
2429   return api;
2430 }
2431
2432
2433 static int
2434 heap_cleanup_iterator (void *cls,
2435                        struct GNUNET_CONTAINER_HeapNode *
2436                        node, void *element,
2437                        GNUNET_CONTAINER_HeapCostType
2438                        cost)
2439 {
2440   struct DefragContext * d_ctx = element;
2441
2442   GNUNET_CONTAINER_heap_remove_node (node);
2443   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2444   GNUNET_free (d_ctx);
2445
2446   return GNUNET_YES;
2447 }
2448
2449
2450 /**
2451  * The exported method. Makes the core api available via a global and
2452  * returns the udp transport API.
2453  *
2454  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2455  * @return NULL
2456  */
2457 void *
2458 libgnunet_plugin_transport_udp_done (void *cls)
2459 {
2460   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2461   struct Plugin *plugin = api->cls;
2462
2463   if (NULL == plugin)
2464   {
2465     GNUNET_free (api);
2466     return NULL;
2467   }
2468
2469   stop_broadcast (plugin);
2470
2471   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2472   {
2473     GNUNET_SCHEDULER_cancel (plugin->select_task);
2474     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2475   }
2476   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2477   {
2478     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2479     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2480   }
2481
2482   /* Closing sockets */
2483   if (plugin->sockv4 != NULL)
2484   {
2485     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2486     plugin->sockv4 = NULL;
2487   }
2488   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2489   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2490
2491   if (plugin->sockv6 != NULL)
2492   {
2493     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2494     plugin->sockv6 = NULL;
2495
2496     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2497     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2498   }
2499
2500   GNUNET_NAT_unregister (plugin->nat);
2501
2502   if (plugin->defrag_ctxs != NULL)
2503   {
2504     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2505         heap_cleanup_iterator, NULL);
2506     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2507     plugin->defrag_ctxs = NULL;
2508   }
2509   if (plugin->mst != NULL)
2510   {
2511     GNUNET_SERVER_mst_destroy(plugin->mst);
2512     plugin->mst = NULL;
2513   }
2514
2515   /* Clean up leftover messages */
2516   struct UDPMessageWrapper * udpw;
2517   udpw = plugin->ipv4_queue_head;
2518   while (udpw != NULL)
2519   {
2520     struct UDPMessageWrapper *tmp = udpw->next;
2521     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2522     call_continuation(udpw, GNUNET_SYSERR);
2523     GNUNET_free (udpw);
2524     udpw = tmp;
2525   }
2526   udpw = plugin->ipv6_queue_head;
2527   while (udpw != NULL)
2528   {
2529     struct UDPMessageWrapper *tmp = udpw->next;
2530     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2531     call_continuation(udpw, GNUNET_SYSERR);
2532     GNUNET_free (udpw);
2533     udpw = tmp;
2534   }
2535
2536   /* Clean up sessions */
2537   LOG (GNUNET_ERROR_TYPE_DEBUG,
2538        "Cleaning up sessions\n");
2539   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2540   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2541
2542   plugin->nat = NULL;
2543   GNUNET_free (plugin);
2544   GNUNET_free (api);
2545   return NULL;
2546 }
2547
2548
2549 /* end of plugin_transport_udp.c */