hunting bugs
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   struct FragmentationContext * frag_ctx;
97
98   /**
99    * Address of the other peer
100    */
101   const struct sockaddr *sock_addr;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * Session timeout task
115    */
116   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118   /**
119    * expected delay for ACKs
120    */
121   struct GNUNET_TIME_Relative last_expected_delay;
122
123   struct GNUNET_ATS_Information ats;
124
125   size_t addrlen;
126
127
128   unsigned int rc;
129
130   int in_destroy;
131 };
132
133
134 struct SessionCompareContext
135 {
136   struct Session *res;
137   const struct GNUNET_HELLO_Address *addr;
138 };
139
140
141 /**
142  * Closure for 'process_inbound_tokenized_messages'
143  */
144 struct SourceInformation
145 {
146   /**
147    * Sender identity.
148    */
149   struct GNUNET_PeerIdentity sender;
150
151   /**
152    * Source address.
153    */
154   const void *arg;
155
156   struct Session *session;
157   /**
158    * Number of bytes in source address.
159    */
160   size_t args;
161
162 };
163
164
165 /**
166  * Closure for 'find_receive_context'.
167  */
168 struct FindReceiveContext
169 {
170   /**
171    * Where to store the result.
172    */
173   struct DefragContext *rc;
174
175   /**
176    * Address to find.
177    */
178   const struct sockaddr *addr;
179
180   struct Session *session;
181
182   /**
183    * Number of bytes in 'addr'.
184    */
185   socklen_t addr_len;
186
187 };
188
189
190
191 /**
192  * Data structure to track defragmentation contexts based
193  * on the source of the UDP traffic.
194  */
195 struct DefragContext
196 {
197
198   /**
199    * Defragmentation context.
200    */
201   struct GNUNET_DEFRAGMENT_Context *defrag;
202
203   /**
204    * Source address this receive context is for (allocated at the
205    * end of the struct).
206    */
207   const struct sockaddr *src_addr;
208
209   /**
210    * Reference to master plugin struct.
211    */
212   struct Plugin *plugin;
213
214   /**
215    * Node in the defrag heap.
216    */
217   struct GNUNET_CONTAINER_HeapNode *hnode;
218
219   /**
220    * Length of 'src_addr'
221    */
222   size_t addr_len;
223 };
224
225
226
227 /**
228  * Closure for 'process_inbound_tokenized_messages'
229  */
230 struct FragmentationContext
231 {
232   struct FragmentationContext * next;
233   struct FragmentationContext * prev;
234
235   struct Plugin * plugin;
236   struct GNUNET_FRAGMENT_Context * frag;
237   struct Session * session;
238
239   /**
240    * Function to call upon completion of the transmission.
241    */
242   GNUNET_TRANSPORT_TransmitContinuation cont;
243
244   /**
245    * Closure for 'cont'.
246    */
247   void *cont_cls;
248
249   struct GNUNET_TIME_Absolute timeout;
250
251   size_t bytes_to_send;
252 };
253
254
255 struct UDPMessageWrapper
256 {
257   struct Session *session;
258   struct UDPMessageWrapper *prev;
259   struct UDPMessageWrapper *next;
260   char *udp;
261
262   /**
263    * Function to call upon completion of the transmission.
264    */
265   GNUNET_TRANSPORT_TransmitContinuation cont;
266
267   /**
268    * Closure for 'cont'.
269    */
270   void *cont_cls;
271
272   struct FragmentationContext *frag_ctx;
273
274   size_t msg_size;
275
276   struct GNUNET_TIME_Absolute timeout;
277 };
278
279
280 /**
281  * UDP ACK Message-Packet header (after defragmentation).
282  */
283 struct UDP_ACK_Message
284 {
285   /**
286    * Message header.
287    */
288   struct GNUNET_MessageHeader header;
289
290   /**
291    * Desired delay for flow control
292    */
293   uint32_t delay;
294
295   /**
296    * What is the identity of the sender
297    */
298   struct GNUNET_PeerIdentity sender;
299
300 };
301
302 /**
303  * Encapsulation of all of the state of the plugin.
304  */
305 struct Plugin * plugin;
306
307
308 /**
309  * We have been notified that our readset has something to read.  We don't
310  * know which socket needs to be read, so we have to check each one
311  * Then reschedule this function to be called again once more is available.
312  *
313  * @param cls the plugin handle
314  * @param tc the scheduling context (for rescheduling this function again)
315  */
316 static void
317 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
318
319
320 /**
321  * We have been notified that our readset has something to read.  We don't
322  * know which socket needs to be read, so we have to check each one
323  * Then reschedule this function to be called again once more is available.
324  *
325  * @param cls the plugin handle
326  * @param tc the scheduling context (for rescheduling this function again)
327  */
328 static void
329 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
330
331
332 /**
333  * Start session timeout
334  */
335 static void
336 start_session_timeout (struct Session *s);
337
338 /**
339  * Increment session timeout due to activity
340  */
341 static void
342 reschedule_session_timeout (struct Session *s);
343
344 /**
345  * Cancel timeout
346  */
347 static void
348 stop_session_timeout (struct Session *s);
349
350
351 /**
352  * (re)schedule select tasks for this plugin.
353  *
354  * @param plugin plugin to reschedule
355  */
356 static void
357 schedule_select (struct Plugin *plugin)
358 {
359   struct GNUNET_TIME_Relative min_delay;
360   struct UDPMessageWrapper *udpw;
361
362   if (NULL != plugin->sockv4)
363   {
364     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
365     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
366       min_delay = GNUNET_TIME_relative_min (min_delay,
367                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
368     
369     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
370       GNUNET_SCHEDULER_cancel(plugin->select_task);
371     plugin->select_task =
372       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
373                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
374                                    plugin->rs_v4,
375                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
376                                    &udp_plugin_select, plugin);  
377   }
378   if (NULL != plugin->sockv6)
379   {
380     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
381     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
382       min_delay = GNUNET_TIME_relative_min (min_delay,
383                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
384     
385     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
386       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
387     plugin->select_task_v6 =
388       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
389                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
390                                    plugin->rs_v6,
391                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
392                                    &udp_plugin_select_v6, plugin);
393   }
394 }
395
396
397 /**
398  * Function called for a quick conversion of the binary address to
399  * a numeric address.  Note that the caller must not free the
400  * address and that the next call to this function is allowed
401  * to override the address again.
402  *
403  * @param cls closure
404  * @param addr binary address
405  * @param addrlen length of the address
406  * @return string representing the same address
407  */
408 const char *
409 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
410 {
411   static char rbuf[INET6_ADDRSTRLEN + 10];
412   char buf[INET6_ADDRSTRLEN];
413   const void *sb;
414   struct in_addr a4;
415   struct in6_addr a6;
416   const struct IPv4UdpAddress *t4;
417   const struct IPv6UdpAddress *t6;
418   int af;
419   uint16_t port;
420
421   if (addrlen == sizeof (struct IPv6UdpAddress))
422   {
423     t6 = addr;
424     af = AF_INET6;
425     port = ntohs (t6->u6_port);
426     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
427     sb = &a6;
428   }
429   else if (addrlen == sizeof (struct IPv4UdpAddress))
430   {
431     t4 = addr;
432     af = AF_INET;
433     port = ntohs (t4->u4_port);
434     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
435     sb = &a4;
436   }
437   else
438   {
439     GNUNET_break_op (0);
440     return NULL;
441   }
442   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
443   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
444                    buf, port);
445   return rbuf;
446 }
447
448
449 /**
450  * Function called to convert a string address to
451  * a binary address.
452  *
453  * @param cls closure ('struct Plugin*')
454  * @param addr string address
455  * @param addrlen length of the address
456  * @param buf location to store the buffer
457  * @param added location to store the number of bytes in the buffer.
458  *        If the function returns GNUNET_SYSERR, its contents are undefined.
459  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
460  */
461 static int
462 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
463     void **buf, size_t *added)
464 {
465   struct sockaddr_storage socket_address;
466   
467   if ((NULL == addr) || (0 == addrlen))
468   {
469     GNUNET_break (0);
470     return GNUNET_SYSERR;
471   }
472
473   if ('\0' != addr[addrlen - 1])
474   {
475     return GNUNET_SYSERR;
476   }
477
478   if (strlen (addr) != addrlen - 1)
479   {
480     return GNUNET_SYSERR;
481   }
482
483   if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
484                                                  &socket_address))
485   {
486     return GNUNET_SYSERR;
487   }
488
489   switch (socket_address.ss_family)
490   {
491   case AF_INET:
492     {
493       struct IPv4UdpAddress *u4;
494       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
495       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
496       u4->ipv4_addr = in4->sin_addr.s_addr;
497       u4->u4_port = in4->sin_port;
498       *buf = u4;
499       *added = sizeof (struct IPv4UdpAddress);
500       return GNUNET_OK;
501     }
502   case AF_INET6:
503     {
504       struct IPv6UdpAddress *u6;
505       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
506       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
507       u6->ipv6_addr = in6->sin6_addr;
508       u6->u6_port = in6->sin6_port;
509       *buf = u6;
510       *added = sizeof (struct IPv6UdpAddress);
511       return GNUNET_OK;
512     }
513   default:
514     GNUNET_break (0);
515     return GNUNET_SYSERR;
516   }
517 }
518
519
520 /**
521  * Append our port and forward the result.
522  *
523  * @param cls a 'struct PrettyPrinterContext'
524  * @param hostname result from DNS resolver
525  */
526 static void
527 append_port (void *cls, const char *hostname)
528 {
529   struct PrettyPrinterContext *ppc = cls;
530   char *ret;
531
532   if (hostname == NULL)
533   {
534     ppc->asc (ppc->asc_cls, NULL);
535     GNUNET_free (ppc);
536     return;
537   }
538   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
539   ppc->asc (ppc->asc_cls, ret);
540   GNUNET_free (ret);
541 }
542
543
544 /**
545  * Convert the transports address to a nice, human-readable
546  * format.
547  *
548  * @param cls closure
549  * @param type name of the transport that generated the address
550  * @param addr one of the addresses of the host, NULL for the last address
551  *        the specific address format depends on the transport
552  * @param addrlen length of the address
553  * @param numeric should (IP) addresses be displayed in numeric form?
554  * @param timeout after how long should we give up?
555  * @param asc function to call on each string
556  * @param asc_cls closure for asc
557  */
558 static void
559 udp_plugin_address_pretty_printer (void *cls, const char *type,
560                                    const void *addr, size_t addrlen,
561                                    int numeric,
562                                    struct GNUNET_TIME_Relative timeout,
563                                    GNUNET_TRANSPORT_AddressStringCallback asc,
564                                    void *asc_cls)
565 {
566   struct PrettyPrinterContext *ppc;
567   const void *sb;
568   size_t sbs;
569   struct sockaddr_in a4;
570   struct sockaddr_in6 a6;
571   const struct IPv4UdpAddress *u4;
572   const struct IPv6UdpAddress *u6;
573   uint16_t port;
574
575   if (addrlen == sizeof (struct IPv6UdpAddress))
576   {
577     u6 = addr;
578     memset (&a6, 0, sizeof (a6));
579     a6.sin6_family = AF_INET6;
580 #if HAVE_SOCKADDR_IN_SIN_LEN
581     a6.sin6_len = sizeof (a6);
582 #endif
583     a6.sin6_port = u6->u6_port;
584     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
585     port = ntohs (u6->u6_port);
586     sb = &a6;
587     sbs = sizeof (a6);
588   }
589   else if (addrlen == sizeof (struct IPv4UdpAddress))
590   {
591     u4 = addr;
592     memset (&a4, 0, sizeof (a4));
593     a4.sin_family = AF_INET;
594 #if HAVE_SOCKADDR_IN_SIN_LEN
595     a4.sin_len = sizeof (a4);
596 #endif
597     a4.sin_port = u4->u4_port;
598     a4.sin_addr.s_addr = u4->ipv4_addr;
599     port = ntohs (u4->u4_port);
600     sb = &a4;
601     sbs = sizeof (a4);
602   }
603   else if (0 == addrlen)
604   {
605     asc (asc_cls, "<inbound connection>");
606     asc (asc_cls, NULL);
607     return;
608   }
609   else
610   {
611     /* invalid address */
612     GNUNET_break_op (0);
613     asc (asc_cls, NULL);
614     return;
615   }
616   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
617   ppc->asc = asc;
618   ppc->asc_cls = asc_cls;
619   ppc->port = port;
620   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
621 }
622
623
624 static void
625 call_continuation (struct UDPMessageWrapper *udpw, int result)
626 {
627   LOG (GNUNET_ERROR_TYPE_DEBUG,
628       "Calling continuation for %u byte message to `%s' with result %s\n",
629       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
630       (GNUNET_OK == result) ? "OK" : "SYSERR");
631   if (NULL != udpw->cont)
632   {
633     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
634   }
635
636 }
637
638
639 /**
640  * Check if the given port is plausible (must be either our listen
641  * port or our advertised port).  If it is neither, we return
642  * GNUNET_SYSERR.
643  *
644  * @param plugin global variables
645  * @param in_port port number to check
646  * @return GNUNET_OK if port is either open_port or adv_port
647  */
648 static int
649 check_port (struct Plugin *plugin, uint16_t in_port)
650 {
651   if ((in_port == plugin->port) || (in_port == plugin->aport))
652     return GNUNET_OK;
653   return GNUNET_SYSERR;
654 }
655
656
657 /**
658  * Function that will be called to check if a binary address for this
659  * plugin is well-formed and corresponds to an address for THIS peer
660  * (as per our configuration).  Naturally, if absolutely necessary,
661  * plugins can be a bit conservative in their answer, but in general
662  * plugins should make sure that the address does not redirect
663  * traffic to a 3rd party that might try to man-in-the-middle our
664  * traffic.
665  *
666  * @param cls closure, should be our handle to the Plugin
667  * @param addr pointer to the address
668  * @param addrlen length of addr
669  * @return GNUNET_OK if this is a plausible address for this peer
670  *         and transport, GNUNET_SYSERR if not
671  *
672  */
673 static int
674 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
675 {
676   struct Plugin *plugin = cls;
677   struct IPv4UdpAddress *v4;
678   struct IPv6UdpAddress *v6;
679
680   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
681       (addrlen != sizeof (struct IPv6UdpAddress)))
682   {
683     GNUNET_break_op (0);
684     return GNUNET_SYSERR;
685   }
686   if (addrlen == sizeof (struct IPv4UdpAddress))
687   {
688     v4 = (struct IPv4UdpAddress *) addr;
689     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
690       return GNUNET_SYSERR;
691     if (GNUNET_OK !=
692         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
693                                  sizeof (struct in_addr)))
694       return GNUNET_SYSERR;
695   }
696   else
697   {
698     v6 = (struct IPv6UdpAddress *) addr;
699     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
700     {
701       GNUNET_break_op (0);
702       return GNUNET_SYSERR;
703     }
704     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
705       return GNUNET_SYSERR;
706     if (GNUNET_OK !=
707         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
708                                  sizeof (struct in6_addr)))
709       return GNUNET_SYSERR;
710   }
711   return GNUNET_OK;
712 }
713
714
715 /**
716  * Task to free resources associated with a session.
717  *
718  * @param s session to free
719  */
720 static void
721 free_session (struct Session *s)
722 {
723   if (s->frag_ctx != NULL)
724   {
725     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
726     GNUNET_free (s->frag_ctx);
727     s->frag_ctx = NULL;
728   }
729   GNUNET_free (s);
730 }
731
732
733 /**
734  * Functions with this signature are called whenever we need
735  * to close a session due to a disconnect or failure to
736  * establish a connection.
737  *
738  * @param s session to close down
739  */
740 static void
741 disconnect_session (struct Session *s)
742 {
743   struct UDPMessageWrapper *udpw;
744   struct UDPMessageWrapper *next;
745
746   GNUNET_assert (GNUNET_YES != s->in_destroy);
747   LOG (GNUNET_ERROR_TYPE_DEBUG,
748        "Session %p to peer `%s' address ended \n",
749          s,
750          GNUNET_i2s (&s->target),
751          GNUNET_a2s (s->sock_addr, s->addrlen));
752   stop_session_timeout (s);
753   next = plugin->ipv4_queue_head;
754   while (NULL != (udpw = next))
755   {
756     next = udpw->next;
757     if (udpw->session == s)
758     {
759       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
760       call_continuation(udpw, GNUNET_SYSERR);
761       GNUNET_free (udpw);
762     }
763   }
764   next = plugin->ipv6_queue_head;
765   while (NULL != (udpw = next))
766   {
767     next = udpw->next;
768     if (udpw->session == s)
769     {
770       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
771       call_continuation(udpw, GNUNET_SYSERR);
772       GNUNET_free (udpw);
773     }
774     udpw = next;
775   }
776   plugin->env->session_end (plugin->env->cls, &s->target, s);
777
778   if (NULL != s->frag_ctx)
779   {
780     if (NULL != s->frag_ctx->cont)
781     {
782       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
783       LOG (GNUNET_ERROR_TYPE_DEBUG,
784           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
785           GNUNET_i2s (&s->target));
786     }
787   }
788
789   GNUNET_assert (GNUNET_YES ==
790                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
791                                                        &s->target.hashPubKey,
792                                                        s));
793   GNUNET_STATISTICS_set(plugin->env->stats,
794                         "# UDP sessions active",
795                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
796                         GNUNET_NO);
797   if (s->rc > 0)
798     s->in_destroy = GNUNET_YES;
799   else
800     free_session (s);
801 }
802
803 /**
804  * Destroy a session, plugin is being unloaded.
805  *
806  * @param cls unused
807  * @param key hash of public key of target peer
808  * @param value a 'struct PeerSession*' to clean up
809  * @return GNUNET_OK (continue to iterate)
810  */
811 static int
812 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
813 {
814   disconnect_session(value);
815   return GNUNET_OK;
816 }
817
818
819 /**
820  * Disconnect from a remote node.  Clean up session if we have one for this peer
821  *
822  * @param cls closure for this call (should be handle to Plugin)
823  * @param target the peeridentity of the peer to disconnect
824  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
825  */
826 static void
827 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
828 {
829   struct Plugin *plugin = cls;
830   GNUNET_assert (plugin != NULL);
831
832   GNUNET_assert (target != NULL);
833   LOG (GNUNET_ERROR_TYPE_DEBUG,
834        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
835   /* Clean up sessions */
836   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
837 }
838
839
840 /**
841  * Session was idle, so disconnect it
842  */
843 static void
844 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
845 {
846   GNUNET_assert (NULL != cls);
847   struct Session *s = cls;
848
849   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
850   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851               "Session %p was idle for %llu ms, disconnecting\n",
852               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
853   /* call session destroy function */
854   disconnect_session (s);
855 }
856
857
858 /**
859  * Start session timeout
860  */
861 static void
862 start_session_timeout (struct Session *s)
863 {
864   GNUNET_assert (NULL != s);
865   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
866   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
867                                                    &session_timeout,
868                                                    s);
869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
870               "Timeout for session %p set to %llu ms\n",
871               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
872 }
873
874
875 /**
876  * Increment session timeout due to activity
877  */
878 static void
879 reschedule_session_timeout (struct Session *s)
880 {
881   GNUNET_assert (NULL != s);
882   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
883
884   GNUNET_SCHEDULER_cancel (s->timeout_task);
885   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
886                                                    &session_timeout,
887                                                    s);
888   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889               "Timeout rescheduled for session %p set to %llu ms\n",
890               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
891 }
892
893
894 /**
895  * Cancel timeout
896  */
897 static void
898 stop_session_timeout (struct Session *s)
899 {
900   GNUNET_assert (NULL != s);
901
902   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
903   {
904     GNUNET_SCHEDULER_cancel (s->timeout_task);
905     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
906     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                 "Timeout stopped for session %p canceled\n",
908                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
909   }
910 }
911
912
913 static struct Session *
914 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
915                 const void *addr, size_t addrlen,
916                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
917 {
918   struct Session *s;
919   const struct IPv4UdpAddress *t4;
920   const struct IPv6UdpAddress *t6;
921   struct sockaddr_in *v4;
922   struct sockaddr_in6 *v6;
923   size_t len;
924
925   switch (addrlen)
926   {
927   case sizeof (struct IPv4UdpAddress):
928     if (NULL == plugin->sockv4)
929     {
930       return NULL;
931     }
932     t4 = addr;
933     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
934     len = sizeof (struct sockaddr_in);
935     v4 = (struct sockaddr_in *) &s[1];
936     v4->sin_family = AF_INET;
937 #if HAVE_SOCKADDR_IN_SIN_LEN
938     v4->sin_len = sizeof (struct sockaddr_in);
939 #endif
940     v4->sin_port = t4->u4_port;
941     v4->sin_addr.s_addr = t4->ipv4_addr;
942     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
943     break;
944   case sizeof (struct IPv6UdpAddress):
945     if (NULL == plugin->sockv6)
946     {
947       return NULL;
948     }
949     t6 = addr;
950     s =
951         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
952     len = sizeof (struct sockaddr_in6);
953     v6 = (struct sockaddr_in6 *) &s[1];
954     v6->sin6_family = AF_INET6;
955 #if HAVE_SOCKADDR_IN_SIN_LEN
956     v6->sin6_len = sizeof (struct sockaddr_in6);
957 #endif
958     v6->sin6_port = t6->u6_port;
959     v6->sin6_addr = t6->ipv6_addr;
960     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
961     break;
962   default:
963     /* Must have a valid address to send to */
964     GNUNET_break_op (0);
965     return NULL;
966   }
967   s->addrlen = len;
968   s->target = *target;
969   s->sock_addr = (const struct sockaddr *) &s[1];
970   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
971   start_session_timeout (s);
972   return s;
973 }
974
975
976 static int
977 session_cmp_it (void *cls,
978                 const struct GNUNET_HashCode * key,
979                 void *value)
980 {
981   struct SessionCompareContext * cctx = cls;
982   const struct GNUNET_HELLO_Address *address = cctx->addr;
983   struct Session *s = value;
984
985   socklen_t s_addrlen = s->addrlen;
986
987   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
988       udp_address_to_string (NULL, (void *) address->address, address->address_length),
989       GNUNET_a2s (s->sock_addr, s->addrlen));
990   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
991       (s_addrlen == sizeof (struct sockaddr_in)))
992   {
993     struct IPv4UdpAddress * u4 = NULL;
994     u4 = (struct IPv4UdpAddress *) address->address;
995     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
996     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
997         (u4->u4_port == s4->sin_port))
998     {
999       cctx->res = s;
1000       return GNUNET_NO;
1001     }
1002
1003   }
1004   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1005       (s_addrlen == sizeof (struct sockaddr_in6)))
1006   {
1007     struct IPv6UdpAddress * u6 = NULL;
1008     u6 = (struct IPv6UdpAddress *) address->address;
1009     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1010     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1011         (u6->u6_port == s6->sin6_port))
1012     {
1013       cctx->res = s;
1014       return GNUNET_NO;
1015     }
1016   }
1017   return GNUNET_YES;
1018 }
1019
1020
1021 /**
1022  * Creates a new outbound session the transport service will use to send data to the
1023  * peer
1024  *
1025  * @param cls the plugin
1026  * @param address the address
1027  * @return the session or NULL of max connections exceeded
1028  */
1029 static struct Session *
1030 udp_plugin_get_session (void *cls,
1031                   const struct GNUNET_HELLO_Address *address)
1032 {
1033   struct Session * s = NULL;
1034   struct Plugin * plugin = cls;
1035   struct IPv6UdpAddress * udp_a6;
1036   struct IPv4UdpAddress * udp_a4;
1037
1038   GNUNET_assert (plugin != NULL);
1039   GNUNET_assert (address != NULL);
1040
1041
1042   if ((address->address == NULL) ||
1043       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1044       (address->address_length != sizeof (struct IPv6UdpAddress))))
1045   {
1046     GNUNET_break (0);
1047     return NULL;
1048   }
1049
1050   if (address->address_length == sizeof (struct IPv4UdpAddress))
1051   {
1052     if (plugin->sockv4 == NULL)
1053       return NULL;
1054     udp_a4 = (struct IPv4UdpAddress *) address->address;
1055     if (udp_a4->u4_port == 0)
1056       return NULL;
1057   }
1058
1059   if (address->address_length == sizeof (struct IPv6UdpAddress))
1060   {
1061     if (plugin->sockv6 == NULL)
1062       return NULL;
1063     udp_a6 = (struct IPv6UdpAddress *) address->address;
1064     if (udp_a6->u6_port == 0)
1065       return NULL;
1066   }
1067
1068   /* check if session already exists */
1069   struct SessionCompareContext cctx;
1070   cctx.addr = address;
1071   cctx.res = NULL;
1072   LOG (GNUNET_ERROR_TYPE_DEBUG,
1073        "Looking for existing session for peer `%s' `%s' \n", 
1074        GNUNET_i2s (&address->peer), 
1075        udp_address_to_string(NULL, address->address, address->address_length));
1076   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1077   if (cctx.res != NULL)
1078   {
1079     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1080     return cctx.res;
1081   }
1082
1083   /* otherwise create new */
1084   s = create_session (plugin,
1085       &address->peer,
1086       address->address,
1087       address->address_length,
1088       NULL, NULL);
1089   LOG (GNUNET_ERROR_TYPE_DEBUG,
1090        "Creating new session %p for peer `%s' address `%s'\n",
1091        s,
1092        GNUNET_i2s(&address->peer),
1093        udp_address_to_string(NULL,address->address,address->address_length));
1094   GNUNET_assert (GNUNET_OK ==
1095                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1096                                                     &s->target.hashPubKey,
1097                                                     s,
1098                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1099
1100   GNUNET_STATISTICS_set(plugin->env->stats,
1101                         "# UDP sessions active",
1102                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1103                         GNUNET_NO);
1104
1105   return s;
1106 }
1107
1108
1109 static void 
1110 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
1111 {
1112
1113   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1114     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1115   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1116     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1117 }
1118
1119
1120 /**
1121  * Fragment message was transmitted via UDP, let fragmentation know
1122  * to send the next fragment now.
1123  *
1124  * @param cls the 'struct UDPMessageWrapper' of the fragment
1125  * @param target destination peer (ignored)
1126  * @param result GNUNET_OK on success (ignored)
1127  */
1128 static void
1129 send_next_fragment (void *cls,
1130                     const struct GNUNET_PeerIdentity *target,
1131                     int result)
1132 {
1133   struct UDPMessageWrapper *udpw = cls;
1134
1135   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1136 }
1137
1138
1139 /**
1140  * Function that is called with messages created by the fragmentation
1141  * module.  In the case of the 'proc' callback of the
1142  * GNUNET_FRAGMENT_context_create function, this function must
1143  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1144  *
1145  * @param cls closure, the 'struct FragmentationContext'
1146  * @param msg the message that was created
1147  */
1148 static void
1149 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1150 {
1151   struct FragmentationContext *frag_ctx = cls;
1152   struct Plugin *plugin = frag_ctx->plugin;
1153   struct UDPMessageWrapper * udpw;
1154   size_t msg_len = ntohs (msg->size);
1155  
1156   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1157        "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1158   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1159   udpw->session = frag_ctx->session;
1160   udpw->udp = (char *) &udpw[1];
1161
1162   udpw->msg_size = msg_len;
1163   udpw->cont = &send_next_fragment;
1164   udpw->cont_cls = udpw;
1165   udpw->timeout = frag_ctx->timeout;
1166   udpw->frag_ctx = frag_ctx;
1167   memcpy (udpw->udp, msg, msg_len);
1168   enqueue (plugin, udpw);
1169   schedule_select (plugin);
1170 }
1171
1172
1173 /**
1174  * Function that can be used by the transport service to transmit
1175  * a message using the plugin.   Note that in the case of a
1176  * peer disconnecting, the continuation MUST be called
1177  * prior to the disconnect notification itself.  This function
1178  * will be called with this peer's HELLO message to initiate
1179  * a fresh connection to another peer.
1180  *
1181  * @param cls closure
1182  * @param s which session must be used
1183  * @param msgbuf the message to transmit
1184  * @param msgbuf_size number of bytes in 'msgbuf'
1185  * @param priority how important is the message (most plugins will
1186  *                 ignore message priority and just FIFO)
1187  * @param to how long to wait at most for the transmission (does not
1188  *                require plugins to discard the message after the timeout,
1189  *                just advisory for the desired delay; most plugins will ignore
1190  *                this as well)
1191  * @param cont continuation to call once the message has
1192  *        been transmitted (or if the transport is ready
1193  *        for the next transmission call; or if the
1194  *        peer disconnected...); can be NULL
1195  * @param cont_cls closure for cont
1196  * @return number of bytes used (on the physical network, with overheads);
1197  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1198  *         and does NOT mean that the message was not transmitted (DV)
1199  */
1200 static ssize_t
1201 udp_plugin_send (void *cls,
1202                   struct Session *s,
1203                   const char *msgbuf, size_t msgbuf_size,
1204                   unsigned int priority,
1205                   struct GNUNET_TIME_Relative to,
1206                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1207 {
1208   struct Plugin *plugin = cls;
1209   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1210   struct UDPMessageWrapper * udpw;
1211   struct UDPMessage *udp;
1212   char mbuf[mlen];
1213   GNUNET_assert (plugin != NULL);
1214   GNUNET_assert (s != NULL);
1215
1216   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1217     return GNUNET_SYSERR;
1218   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1219     return GNUNET_SYSERR;
1220   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1221   {
1222     GNUNET_break (0);
1223     return GNUNET_SYSERR;
1224   }
1225   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1226   {
1227     GNUNET_break (0);
1228     return GNUNET_SYSERR;
1229   }
1230   LOG (GNUNET_ERROR_TYPE_DEBUG,
1231        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1232        mlen,
1233        GNUNET_i2s (&s->target),
1234        GNUNET_a2s(s->sock_addr, s->addrlen));
1235   
1236   /* Message */
1237   udp = (struct UDPMessage *) mbuf;
1238   udp->header.size = htons (mlen);
1239   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1240   udp->reserved = htonl (0);
1241   udp->sender = *plugin->env->my_identity;
1242
1243   reschedule_session_timeout(s);
1244   if (mlen <= UDP_MTU)
1245   {
1246     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1247     udpw->session = s;
1248     udpw->udp = (char *) &udpw[1];
1249     udpw->msg_size = mlen;
1250     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1251     udpw->cont = cont;
1252     udpw->cont_cls = cont_cls;
1253     udpw->frag_ctx = NULL;
1254     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1255     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1256     enqueue (plugin, udpw);
1257   }
1258   else
1259   {
1260     LOG (GNUNET_ERROR_TYPE_DEBUG,
1261          "UDP has to fragment message\n");
1262     if  (s->frag_ctx != NULL)
1263       return GNUNET_SYSERR;
1264     memcpy (&udp[1], msgbuf, msgbuf_size);
1265     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1266
1267     frag_ctx->plugin = plugin;
1268     frag_ctx->session = s;
1269     frag_ctx->cont = cont;
1270     frag_ctx->cont_cls = cont_cls;
1271     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1272     frag_ctx->bytes_to_send = mlen;
1273     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1274               UDP_MTU,
1275               &plugin->tracker,
1276               s->last_expected_delay,
1277               &udp->header,
1278               &enqueue_fragment,
1279               frag_ctx);
1280
1281     s->frag_ctx = frag_ctx;
1282   }
1283   schedule_select (plugin);
1284   return mlen;
1285 }
1286
1287
1288 /**
1289  * Our external IP address/port mapping has changed.
1290  *
1291  * @param cls closure, the 'struct LocalAddrList'
1292  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1293  *     the previous (now invalid) one
1294  * @param addr either the previous or the new public IP address
1295  * @param addrlen actual lenght of the address
1296  */
1297 static void
1298 udp_nat_port_map_callback (void *cls, int add_remove,
1299                            const struct sockaddr *addr, socklen_t addrlen)
1300 {
1301   struct Plugin *plugin = cls;
1302   struct IPv4UdpAddress u4;
1303   struct IPv6UdpAddress u6;
1304   void *arg;
1305   size_t args;
1306
1307   /* convert 'addr' to our internal format */
1308   switch (addr->sa_family)
1309   {
1310   case AF_INET:
1311     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1312     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1313     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1314     arg = &u4;
1315     args = sizeof (u4);
1316     break;
1317   case AF_INET6:
1318     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1319     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1320             sizeof (struct in6_addr));
1321     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1322     arg = &u6;
1323     args = sizeof (u6);
1324     break;
1325   default:
1326     GNUNET_break (0);
1327     return;
1328   }
1329   /* modify our published address list */
1330   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1331 }
1332
1333
1334
1335 /**
1336  * Message tokenizer has broken up an incomming message. Pass it on
1337  * to the service.
1338  *
1339  * @param cls the 'struct Plugin'
1340  * @param client the 'struct SourceInformation'
1341  * @param hdr the actual message
1342  */
1343 static int
1344 process_inbound_tokenized_messages (void *cls, void *client,
1345                                     const struct GNUNET_MessageHeader *hdr)
1346 {
1347   struct Plugin *plugin = cls;
1348   struct SourceInformation *si = client;
1349   struct GNUNET_ATS_Information ats[2];
1350   struct GNUNET_TIME_Relative delay;
1351
1352   GNUNET_assert (si->session != NULL);
1353   if (GNUNET_YES == si->session->in_destroy)
1354     return GNUNET_OK;
1355   /* setup ATS */
1356   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1357   ats[0].value = htonl (1);
1358   ats[1] = si->session->ats;
1359   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1360   delay = plugin->env->receive (plugin->env->cls,
1361                                 &si->sender,
1362                                 hdr,
1363                                 (const struct GNUNET_ATS_Information *) &ats, 2,
1364                                 si->session,
1365                                 si->arg,
1366                                 si->args);
1367   si->session->flow_delay_for_other_peer = delay;
1368   reschedule_session_timeout(si->session);
1369   return GNUNET_OK;
1370 }
1371
1372
1373 /**
1374  * We've received a UDP Message.  Process it (pass contents to main service).
1375  *
1376  * @param plugin plugin context
1377  * @param msg the message
1378  * @param sender_addr sender address
1379  * @param sender_addr_len number of bytes in sender_addr
1380  */
1381 static void
1382 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1383                      const struct sockaddr *sender_addr,
1384                      socklen_t sender_addr_len)
1385 {
1386   struct SourceInformation si;
1387   struct Session * s;
1388   struct IPv4UdpAddress u4;
1389   struct IPv6UdpAddress u6;
1390   const void *arg;
1391   size_t args;
1392
1393   if (0 != ntohl (msg->reserved))
1394   {
1395     GNUNET_break_op (0);
1396     return;
1397   }
1398   if (ntohs (msg->header.size) <
1399       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1400   {
1401     GNUNET_break_op (0);
1402     return;
1403   }
1404
1405   /* convert address */
1406   switch (sender_addr->sa_family)
1407   {
1408   case AF_INET:
1409     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1410     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1411     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1412     arg = &u4;
1413     args = sizeof (u4);
1414     break;
1415   case AF_INET6:
1416     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1417     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1418     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1419     arg = &u6;
1420     args = sizeof (u6);
1421     break;
1422   default:
1423     GNUNET_break (0);
1424     return;
1425   }
1426   LOG (GNUNET_ERROR_TYPE_DEBUG,
1427        "Received message with %u bytes from peer `%s' at `%s'\n",
1428        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1429        GNUNET_a2s (sender_addr, sender_addr_len));
1430
1431   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1432   s = udp_plugin_get_session(plugin, address);
1433   GNUNET_free (address);
1434
1435   /* iterate over all embedded messages */
1436   si.session = s;
1437   si.sender = msg->sender;
1438   si.arg = arg;
1439   si.args = args;
1440   s->rc++;
1441   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1442                              ntohs (msg->header.size) -
1443                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1444   s->rc--;
1445   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1446     free_session (s);
1447 }
1448
1449
1450 /**
1451  * Scan the heap for a receive context with the given address.
1452  *
1453  * @param cls the 'struct FindReceiveContext'
1454  * @param node internal node of the heap
1455  * @param element value stored at the node (a 'struct ReceiveContext')
1456  * @param cost cost associated with the node
1457  * @return GNUNET_YES if we should continue to iterate,
1458  *         GNUNET_NO if not.
1459  */
1460 static int
1461 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1462                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1463 {
1464   struct FindReceiveContext *frc = cls;
1465   struct DefragContext *e = element;
1466
1467   if ((frc->addr_len == e->addr_len) &&
1468       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1469   {
1470     frc->rc = e;
1471     return GNUNET_NO;
1472   }
1473   return GNUNET_YES;
1474 }
1475
1476
1477 /**
1478  * Process a defragmented message.
1479  *
1480  * @param cls the 'struct ReceiveContext'
1481  * @param msg the message
1482  */
1483 static void
1484 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1485 {
1486   struct DefragContext *rc = cls;
1487
1488   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1489   {
1490     GNUNET_break (0);
1491     return;
1492   }
1493   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1494   {
1495     GNUNET_break (0);
1496     return;
1497   }
1498   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1499                        rc->src_addr, rc->addr_len);
1500 }
1501
1502
1503 struct LookupContext
1504 {
1505   const struct sockaddr * addr;
1506
1507   struct Session *res;
1508
1509   size_t addrlen;
1510 };
1511
1512
1513 static int
1514 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1515 {
1516   struct LookupContext *l_ctx = cls;
1517   struct Session * s = value;
1518
1519   if ((s->addrlen == l_ctx->addrlen) &&
1520       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1521   {
1522     l_ctx->res = s;
1523     return GNUNET_NO;
1524   }
1525   return GNUNET_YES;
1526 }
1527
1528
1529 /**
1530  * Transmit an acknowledgement.
1531  *
1532  * @param cls the 'struct ReceiveContext'
1533  * @param id message ID (unused)
1534  * @param msg ack to transmit
1535  */
1536 static void
1537 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1538 {
1539   struct DefragContext *rc = cls;
1540   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1541   struct UDP_ACK_Message *udp_ack;
1542   uint32_t delay = 0;
1543   struct UDPMessageWrapper *udpw;
1544   struct Session *s;
1545
1546   struct LookupContext l_ctx;
1547   l_ctx.addr = rc->src_addr;
1548   l_ctx.addrlen = rc->addr_len;
1549   l_ctx.res = NULL;
1550   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1551       &lookup_session_by_addr_it,
1552       &l_ctx);
1553   s = l_ctx.res;
1554
1555   if (NULL == s)
1556     return;
1557
1558   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1559     delay = s->flow_delay_for_other_peer.rel_value;
1560
1561   LOG (GNUNET_ERROR_TYPE_DEBUG,
1562        "Sending ACK to `%s' including delay of %u ms\n",
1563        GNUNET_a2s (rc->src_addr,
1564                    (rc->src_addr->sa_family ==
1565                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1566                                                                      sockaddr_in6)),
1567        delay);
1568   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1569   udpw->msg_size = msize;
1570   udpw->session = s;
1571   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1572   udpw->udp = (char *)&udpw[1];
1573   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1574   udp_ack->header.size = htons ((uint16_t) msize);
1575   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1576   udp_ack->delay = htonl (delay);
1577   udp_ack->sender = *rc->plugin->env->my_identity;
1578   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1579
1580   enqueue (rc->plugin, udpw);
1581 }
1582
1583
1584 static void 
1585 read_process_msg (struct Plugin *plugin,
1586                   const struct GNUNET_MessageHeader *msg,
1587                   const char *addr,
1588                   socklen_t fromlen)
1589 {
1590   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1591   {
1592     GNUNET_break_op (0);
1593     return;
1594   }
1595   process_udp_message (plugin, (const struct UDPMessage *) msg,
1596                        (const struct sockaddr *) addr, fromlen);
1597 }
1598
1599
1600 static void 
1601 read_process_ack (struct Plugin *plugin,
1602                   const struct GNUNET_MessageHeader *msg,
1603                   char *addr,
1604                   socklen_t fromlen)
1605 {
1606   const struct GNUNET_MessageHeader *ack;
1607   const struct UDP_ACK_Message *udp_ack;
1608   struct LookupContext l_ctx;
1609   struct Session *s;
1610   struct GNUNET_TIME_Relative flow_delay;
1611
1612   if (ntohs (msg->size) <
1613       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1614   {
1615     GNUNET_break_op (0);
1616     return;
1617   }
1618   udp_ack = (const struct UDP_ACK_Message *) msg;
1619   l_ctx.addr = (const struct sockaddr *) addr;
1620   l_ctx.addrlen = fromlen;
1621   l_ctx.res = NULL;
1622   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1623       &lookup_session_by_addr_it,
1624       &l_ctx);
1625   s = l_ctx.res;
1626
1627   if ((s == NULL) || (s->frag_ctx == NULL))
1628     return;
1629
1630   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1631   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1632        flow_delay.rel_value);
1633   s->flow_delay_from_other_peer =
1634       GNUNET_TIME_relative_to_absolute (flow_delay);
1635
1636   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1637   if (ntohs (ack->size) !=
1638       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1639   {
1640     GNUNET_break_op (0);
1641     return;
1642   }
1643
1644   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1645   {
1646     LOG (GNUNET_ERROR_TYPE_DEBUG,
1647          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1648          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1649          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1650     return;
1651   }
1652
1653   LOG (GNUNET_ERROR_TYPE_DEBUG,
1654        "FULL MESSAGE ACKed\n",
1655        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1656        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1657   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1658
1659   struct UDPMessageWrapper * udpw;
1660   struct UDPMessageWrapper * tmp;
1661   if (s->addrlen == sizeof (struct sockaddr_in6))
1662   {
1663     udpw = plugin->ipv6_queue_head;
1664     while (NULL != udpw)
1665     {
1666       tmp = udpw->next;
1667       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1668       {
1669         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1670         GNUNET_free (udpw);
1671       }
1672       udpw = tmp;
1673     }
1674   }
1675   if (s->addrlen == sizeof (struct sockaddr_in))
1676   {
1677     udpw = plugin->ipv4_queue_head;
1678     while (udpw!= NULL)
1679     {
1680       tmp = udpw->next;
1681       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1682       {
1683         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1684         GNUNET_free (udpw);
1685       }
1686       udpw = tmp;
1687     }
1688   }
1689
1690   if (s->frag_ctx->cont != NULL)
1691   {
1692     LOG (GNUNET_ERROR_TYPE_DEBUG,
1693         "Calling continuation for fragmented message to `%s' with result %s\n",
1694         GNUNET_i2s (&s->target), "OK");
1695     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1696   }
1697
1698   GNUNET_free (s->frag_ctx);
1699   s->frag_ctx = NULL;
1700 }
1701
1702
1703 static void 
1704 read_process_fragment (struct Plugin *plugin,
1705                        const struct GNUNET_MessageHeader *msg,
1706                        char *addr,
1707                        socklen_t fromlen)
1708 {
1709   struct DefragContext *d_ctx;
1710   struct GNUNET_TIME_Absolute now;
1711   struct FindReceiveContext frc;
1712
1713   frc.rc = NULL;
1714   frc.addr = (const struct sockaddr *) addr;
1715   frc.addr_len = fromlen;
1716
1717   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1718        (unsigned int) ntohs (msg->size),
1719        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1720   /* Lookup existing receive context for this address */
1721   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1722                                  &find_receive_context,
1723                                  &frc);
1724   now = GNUNET_TIME_absolute_get ();
1725   d_ctx = frc.rc;
1726
1727   if (d_ctx == NULL)
1728   {
1729     /* Create a new defragmentation context */
1730     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1731     memcpy (&d_ctx[1], addr, fromlen);
1732     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1733     d_ctx->addr_len = fromlen;
1734     d_ctx->plugin = plugin;
1735     d_ctx->defrag =
1736         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1737                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1738                                           &fragment_msg_proc, &ack_proc);
1739     d_ctx->hnode =
1740         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1741                                       (GNUNET_CONTAINER_HeapCostType)
1742                                       now.abs_value);
1743     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1744          "Created new defragmentation context for %u-byte fragment from `%s'\n",
1745          (unsigned int) ntohs (msg->size),
1746          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1747   }
1748   else
1749   {
1750     LOG (GNUNET_ERROR_TYPE_DEBUG,
1751          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1752          (unsigned int) ntohs (msg->size),
1753          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1754   }
1755
1756   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1757   {
1758     /* keep this 'rc' from expiring */
1759     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1760                                        (GNUNET_CONTAINER_HeapCostType)
1761                                        now.abs_value);
1762   }
1763   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1764       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1765   {
1766     /* remove 'rc' that was inactive the longest */
1767     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1768     GNUNET_assert (NULL != d_ctx);
1769     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1770     GNUNET_free (d_ctx);
1771   }
1772 }
1773
1774
1775 /**
1776  * Read and process a message from the given socket.
1777  *
1778  * @param plugin the overall plugin
1779  * @param rsock socket to read from
1780  */
1781 static void
1782 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1783 {
1784   socklen_t fromlen;
1785   char addr[32];
1786   char buf[65536] GNUNET_ALIGN;
1787   ssize_t size;
1788   const struct GNUNET_MessageHeader *msg;
1789
1790   fromlen = sizeof (addr);
1791   memset (&addr, 0, sizeof (addr));
1792   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1793                                       (struct sockaddr *) &addr, &fromlen);
1794 #if MINGW
1795   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
1796    * WSAECONNRESET error to indicate that previous sendto() (???)
1797    * on this socket has failed.
1798    */
1799   if ( (-1 == size) && (ECONNRESET == errno) )
1800     return;
1801 #endif
1802   if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader)))
1803   {
1804     GNUNET_break_op (0);
1805     return;
1806   }
1807   msg = (const struct GNUNET_MessageHeader *) buf;
1808
1809   LOG (GNUNET_ERROR_TYPE_DEBUG,
1810        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1811        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1812
1813   if (size != ntohs (msg->size))
1814   {
1815     GNUNET_break_op (0);
1816     return;
1817   }
1818
1819   switch (ntohs (msg->type))
1820   {
1821   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1822     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1823     return;
1824
1825   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1826     read_process_msg (plugin, msg, addr, fromlen);
1827     return;
1828
1829   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1830     read_process_ack (plugin, msg, addr, fromlen);
1831     return;
1832
1833   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1834     read_process_fragment (plugin, msg, addr, fromlen);
1835     return;
1836
1837   default:
1838     GNUNET_break_op (0);
1839     return;
1840   }
1841 }
1842
1843
1844 static size_t
1845 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1846 {
1847   ssize_t sent;
1848   size_t slen;
1849   struct GNUNET_TIME_Absolute max;
1850   struct UDPMessageWrapper *udpw = NULL;
1851   static int network_down_error;
1852
1853   if (sock == plugin->sockv4)
1854   {
1855     udpw = plugin->ipv4_queue_head;
1856   }
1857   else if (sock == plugin->sockv6)
1858   {
1859     udpw = plugin->ipv6_queue_head;
1860   }
1861   else
1862   {
1863     GNUNET_break (0);
1864     return 0;
1865   }
1866
1867   const struct sockaddr * sa = udpw->session->sock_addr;
1868   slen = udpw->session->addrlen;
1869
1870   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1871
1872   while (udpw != NULL)
1873   {
1874     if (max.abs_value != udpw->timeout.abs_value)
1875     {
1876       /* Message timed out */
1877       call_continuation(udpw, GNUNET_SYSERR);
1878       if (udpw->frag_ctx != NULL)
1879       {
1880         LOG (GNUNET_ERROR_TYPE_DEBUG,
1881              "Fragmented message for peer `%s' with size %u timed out\n",
1882              GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1883         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1884         GNUNET_free (udpw->frag_ctx);
1885         udpw->session->frag_ctx = NULL;
1886       }
1887       else
1888       {
1889         LOG (GNUNET_ERROR_TYPE_DEBUG, 
1890              "Message for peer `%s' with size %u timed out\n",
1891              GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1892       }
1893
1894       if (sock == plugin->sockv4)
1895       {
1896         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1897         GNUNET_free (udpw);
1898         udpw = plugin->ipv4_queue_head;
1899       }
1900       else if (sock == plugin->sockv6)
1901       {
1902         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1903         GNUNET_free (udpw);
1904         udpw = plugin->ipv6_queue_head;
1905       }
1906     }
1907     else
1908     {
1909       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1910       if (delta.rel_value == 0)
1911       {
1912         /* this message is not delayed */
1913         LOG (GNUNET_ERROR_TYPE_DEBUG, 
1914              "Message for peer `%s' (%u bytes) is not delayed \n",
1915              GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1916         break;
1917       }
1918       else
1919       {
1920         /* this message is delayed, try next */
1921         LOG (GNUNET_ERROR_TYPE_DEBUG,
1922              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1923              GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1924              delta);
1925         udpw = udpw->next;
1926       }
1927     }
1928   }
1929
1930   if (udpw == NULL)
1931   {
1932     /* No message left */
1933     return 0;
1934   }
1935
1936   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1937
1938   if (GNUNET_SYSERR == sent)
1939   {
1940     const struct GNUNET_ATS_Information type = plugin->env->get_address_type
1941         (plugin->env->cls,sa, slen);
1942
1943     if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
1944         ((ENETUNREACH == errno) || (ENETDOWN == errno)))
1945     {
1946       if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
1947       {
1948         /* IPv4: "Network unreachable" or "Network down"
1949          *
1950          * This indicates we do not have connectivity
1951          */
1952         LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1953             _("UDP could not transmit message to `%s': "
1954               "Network seems down, please check your network configuration\n"),
1955             GNUNET_a2s (sa, slen));
1956       }
1957       if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
1958       {
1959         /* IPv6: "Network unreachable" or "Network down"
1960          *
1961          * This indicates that this system is IPv6 enabled, but does not
1962          * have a valid global IPv6 address assigned or we do not have
1963          * connectivity
1964          */
1965
1966        LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1967            _("UDP could not transmit message to `%s': "
1968              "Please check your network configuration and disable IPv6 if your "
1969              "connection does not have a global IPv6 address\n"),
1970            GNUNET_a2s (sa, slen));
1971       }
1972     }
1973     else
1974     {
1975       LOG (GNUNET_ERROR_TYPE_WARNING,
1976          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1977          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1978          STRERROR (errno));
1979     }
1980     call_continuation(udpw, GNUNET_SYSERR);
1981   }
1982   else
1983   {
1984     LOG (GNUNET_ERROR_TYPE_DEBUG,
1985          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1986          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1987          (sent < 0) ? STRERROR (errno) : "ok");
1988     call_continuation(udpw, GNUNET_OK);
1989     network_down_error = GNUNET_NO;
1990   }
1991
1992   if (sock == plugin->sockv4)
1993     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1994   else if (sock == plugin->sockv6)
1995     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1996   GNUNET_free (udpw);
1997   udpw = NULL;
1998
1999   return sent;
2000 }
2001
2002
2003 /**
2004  * We have been notified that our readset has something to read.  We don't
2005  * know which socket needs to be read, so we have to check each one
2006  * Then reschedule this function to be called again once more is available.
2007  *
2008  * @param cls the plugin handle
2009  * @param tc the scheduling context (for rescheduling this function again)
2010  */
2011 static void
2012 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2013 {
2014   struct Plugin *plugin = cls;
2015
2016   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2017   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2018     return;
2019   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2020        (NULL != plugin->sockv4) &&
2021        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2022     udp_select_read (plugin, plugin->sockv4);
2023   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2024        (NULL != plugin->sockv4) && 
2025        (NULL != plugin->ipv4_queue_head) &&
2026        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2027     udp_select_send (plugin, plugin->sockv4);   
2028   schedule_select (plugin);
2029 }
2030
2031
2032 /**
2033  * We have been notified that our readset has something to read.  We don't
2034  * know which socket needs to be read, so we have to check each one
2035  * Then reschedule this function to be called again once more is available.
2036  *
2037  * @param cls the plugin handle
2038  * @param tc the scheduling context (for rescheduling this function again)
2039  */
2040 static void
2041 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2042 {
2043   struct Plugin *plugin = cls;
2044
2045   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2046   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2047     return;
2048   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2049        (NULL != plugin->sockv6) &&
2050        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2051     udp_select_read (plugin, plugin->sockv6);
2052   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2053        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2054        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2055     udp_select_send (plugin, plugin->sockv6);
2056   schedule_select (plugin);
2057 }
2058
2059
2060 static int
2061 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
2062 {
2063   int tries;
2064   int sockets_created = 0;
2065   struct sockaddr *serverAddr;
2066   struct sockaddr *addrs[2];
2067   socklen_t addrlens[2];
2068   socklen_t addrlen;
2069
2070   /* Create IPv6 socket */
2071   if (plugin->enable_ipv6 == GNUNET_YES)
2072   {
2073     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2074     if (NULL == plugin->sockv6)
2075     {
2076       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2077       plugin->enable_ipv6 = GNUNET_NO;
2078     }
2079     else
2080     {
2081 #if HAVE_SOCKADDR_IN_SIN_LEN
2082       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2083 #endif
2084       serverAddrv6->sin6_family = AF_INET6;
2085       serverAddrv6->sin6_addr = in6addr_any;
2086       serverAddrv6->sin6_port = htons (plugin->port);
2087       addrlen = sizeof (struct sockaddr_in6);
2088       serverAddr = (struct sockaddr *) serverAddrv6;
2089       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2090            ntohs (serverAddrv6->sin6_port));
2091       tries = 0;
2092       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2093              GNUNET_OK)
2094       {
2095         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2096         LOG (GNUNET_ERROR_TYPE_DEBUG,
2097              "IPv6 Binding failed, trying new port %d\n",
2098              ntohs (serverAddrv6->sin6_port));
2099         tries++;
2100         if (tries > 10)
2101         {
2102           GNUNET_NETWORK_socket_close (plugin->sockv6);
2103           plugin->sockv6 = NULL;
2104           break;
2105         }
2106       }
2107       if (plugin->sockv6 != NULL)
2108       {
2109         LOG (GNUNET_ERROR_TYPE_DEBUG,
2110              "IPv6 socket created on port %d\n",
2111              ntohs (serverAddrv6->sin6_port));
2112         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2113         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2114         sockets_created++;
2115       }
2116     }
2117   }
2118
2119   /* Create IPv4 socket */
2120   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2121   if (NULL == plugin->sockv4)
2122   {
2123     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2124   }
2125   else
2126   {
2127 #if HAVE_SOCKADDR_IN_SIN_LEN
2128     serverAddrv4->sin_len = sizeof (serverAddrv4);
2129 #endif
2130     serverAddrv4->sin_family = AF_INET;
2131     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2132     serverAddrv4->sin_port = htons (plugin->port);
2133     addrlen = sizeof (struct sockaddr_in);
2134     serverAddr = (struct sockaddr *) serverAddrv4;
2135
2136     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2137          ntohs (serverAddrv4->sin_port));
2138     tries = 0;
2139     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2140            GNUNET_OK)
2141     {
2142       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2143       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2144            ntohs (serverAddrv4->sin_port));
2145       tries++;
2146       if (tries > 10)
2147       {
2148         GNUNET_NETWORK_socket_close (plugin->sockv4);
2149         plugin->sockv4 = NULL;
2150         break;
2151       }
2152     }
2153     if (plugin->sockv4 != NULL)
2154     {
2155       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2156       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2157       sockets_created++;
2158     }
2159   }
2160
2161   /* Create file descriptors */
2162   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2163   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2164   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2165   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2166   if (NULL != plugin->sockv4)
2167   {
2168     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2169     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2170   }
2171
2172   if (0 == sockets_created)
2173     LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2174   if (plugin->enable_ipv6 == GNUNET_YES)
2175   {
2176     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2177     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2178     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2179     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2180     if (NULL != plugin->sockv6)
2181     {
2182       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2183       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2184     }
2185   }
2186   schedule_select (plugin);
2187   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2188                            GNUNET_NO, plugin->port,
2189                            sockets_created,
2190                            (const struct sockaddr **) addrs, addrlens,
2191                            &udp_nat_port_map_callback, NULL, plugin);
2192
2193   return sockets_created;
2194 }
2195
2196
2197 /**
2198  * The exported method. Makes the core api available via a global and
2199  * returns the udp transport API.
2200  *
2201  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2202  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2203  */
2204 void *
2205 libgnunet_plugin_transport_udp_init (void *cls)
2206 {
2207   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2208   struct GNUNET_TRANSPORT_PluginFunctions *api;
2209   struct Plugin *p;
2210   unsigned long long port;
2211   unsigned long long aport;
2212   unsigned long long broadcast;
2213   unsigned long long udp_max_bps;
2214   unsigned long long enable_v6;
2215   char * bind4_address;
2216   char * bind6_address;
2217   char * fancy_interval;
2218   struct GNUNET_TIME_Relative interval;
2219   struct sockaddr_in serverAddrv4;
2220   struct sockaddr_in6 serverAddrv6;
2221   int res;
2222
2223   if (NULL == env->receive)
2224   {
2225     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2226        initialze the plugin or the API */
2227     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2228     api->cls = NULL;
2229     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2230     api->address_to_string = &udp_address_to_string;
2231     api->string_to_address = &udp_string_to_address;
2232     return api;
2233   }
2234
2235   GNUNET_assert( NULL != env->stats);
2236
2237   /* Get port number */
2238   if (GNUNET_OK !=
2239       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2240                                              &port))
2241     port = 2086;
2242   if (GNUNET_OK !=
2243       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2244                                              "ADVERTISED_PORT", &aport))
2245     aport = port;
2246   if (port > 65535)
2247   {
2248     LOG (GNUNET_ERROR_TYPE_WARNING,
2249          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2250          65535);
2251     return NULL;
2252   }
2253
2254   /* Protocols */
2255   if ((GNUNET_YES ==
2256        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2257                                              "DISABLEV6")))
2258   {
2259     enable_v6 = GNUNET_NO;
2260   }
2261   else
2262     enable_v6 = GNUNET_YES;
2263
2264   /* Addresses */
2265   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2266   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2267
2268   if (GNUNET_YES ==
2269       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2270                                              "BINDTO", &bind4_address))
2271   {
2272     LOG (GNUNET_ERROR_TYPE_DEBUG,
2273          "Binding udp plugin to specific address: `%s'\n",
2274          bind4_address);
2275     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2276     {
2277       GNUNET_free (bind4_address);
2278       return NULL;
2279     }
2280   }
2281
2282   if (GNUNET_YES ==
2283       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2284                                              "BINDTO6", &bind6_address))
2285   {
2286     LOG (GNUNET_ERROR_TYPE_DEBUG,
2287          "Binding udp plugin to specific address: `%s'\n",
2288          bind6_address);
2289     if (1 !=
2290         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2291     {
2292       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2293            bind6_address);
2294       GNUNET_free_non_null (bind4_address);
2295       GNUNET_free (bind6_address);
2296       return NULL;
2297     }
2298   }
2299
2300   /* Enable neighbour discovery */
2301   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2302                                             "BROADCAST");
2303   if (broadcast == GNUNET_SYSERR)
2304     broadcast = GNUNET_NO;
2305
2306   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2307                                            "BROADCAST_INTERVAL", &fancy_interval))
2308   {
2309     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2310   }
2311   else
2312   {
2313      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2314      {
2315        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2316      }
2317      GNUNET_free (fancy_interval);
2318   }
2319
2320   /* Maximum datarate */
2321   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2322                                              "MAX_BPS", &udp_max_bps))
2323   {
2324     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2325   }
2326
2327   p = GNUNET_malloc (sizeof (struct Plugin));
2328   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2329
2330   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2331                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2332   p->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2333   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2334   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2335   p->port = port;
2336   p->aport = aport;
2337   p->broadcast_interval = interval;
2338   p->enable_ipv6 = enable_v6;
2339   p->env = env;
2340
2341   plugin = p;
2342
2343   api->cls = p;
2344   api->send = NULL;
2345   api->disconnect = &udp_disconnect;
2346   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2347   api->address_to_string = &udp_address_to_string;
2348   api->string_to_address = &udp_string_to_address;
2349   api->check_address = &udp_plugin_check_address;
2350   api->get_session = &udp_plugin_get_session;
2351   api->send = &udp_plugin_send;
2352
2353   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2354   res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
2355   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
2356   {
2357     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2358     GNUNET_free (p);
2359     GNUNET_free (api);
2360     return NULL;
2361   }
2362
2363   if (broadcast == GNUNET_YES)
2364   {
2365     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2366     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
2367   }
2368
2369   GNUNET_free_non_null (bind4_address);
2370   GNUNET_free_non_null (bind6_address);
2371   return api;
2372 }
2373
2374
2375 static int
2376 heap_cleanup_iterator (void *cls,
2377                        struct GNUNET_CONTAINER_HeapNode *
2378                        node, void *element,
2379                        GNUNET_CONTAINER_HeapCostType
2380                        cost)
2381 {
2382   struct DefragContext * d_ctx = element;
2383
2384   GNUNET_CONTAINER_heap_remove_node (node);
2385   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2386   GNUNET_free (d_ctx);
2387
2388   return GNUNET_YES;
2389 }
2390
2391
2392 /**
2393  * The exported method. Makes the core api available via a global and
2394  * returns the udp transport API.
2395  *
2396  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2397  * @return NULL
2398  */
2399 void *
2400 libgnunet_plugin_transport_udp_done (void *cls)
2401 {
2402   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2403   struct Plugin *plugin = api->cls;
2404
2405   if (NULL == plugin)
2406   {
2407     GNUNET_free (api);
2408     return NULL;
2409   }
2410
2411   stop_broadcast (plugin);
2412   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2413   {
2414     GNUNET_SCHEDULER_cancel (plugin->select_task);
2415     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2416   }
2417   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2418   {
2419     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2420     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2421   }
2422
2423   /* Closing sockets */
2424   if (plugin->sockv4 != NULL)
2425   {
2426     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2427     plugin->sockv4 = NULL;
2428   }
2429   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2430   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2431
2432   if (plugin->sockv6 != NULL)
2433   {
2434     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2435     plugin->sockv6 = NULL;
2436
2437     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2438     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2439   }
2440
2441   GNUNET_NAT_unregister (plugin->nat);
2442
2443   if (plugin->defrag_ctxs != NULL)
2444   {
2445     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2446         heap_cleanup_iterator, NULL);
2447     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2448     plugin->defrag_ctxs = NULL;
2449   }
2450   if (plugin->mst != NULL)
2451   {
2452     GNUNET_SERVER_mst_destroy(plugin->mst);
2453     plugin->mst = NULL;
2454   }
2455
2456   /* Clean up leftover messages */
2457   struct UDPMessageWrapper * udpw;
2458   udpw = plugin->ipv4_queue_head;
2459   while (udpw != NULL)
2460   {
2461     struct UDPMessageWrapper *tmp = udpw->next;
2462     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2463     call_continuation(udpw, GNUNET_SYSERR);
2464     GNUNET_free (udpw);
2465     udpw = tmp;
2466   }
2467   udpw = plugin->ipv6_queue_head;
2468   while (udpw != NULL)
2469   {
2470     struct UDPMessageWrapper *tmp = udpw->next;
2471     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2472     call_continuation(udpw, GNUNET_SYSERR);
2473     GNUNET_free (udpw);
2474     udpw = tmp;
2475   }
2476
2477   /* Clean up sessions */
2478   LOG (GNUNET_ERROR_TYPE_DEBUG,
2479        "Cleaning up sessions\n");
2480   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2481   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2482
2483   plugin->nat = NULL;
2484   GNUNET_free (plugin);
2485   GNUNET_free (api);
2486   return NULL;
2487 }
2488
2489
2490 /* end of plugin_transport_udp.c */