7bde633f29b06287e17f17579b1a1e2514823670
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   /**
97    * Address of the other peer
98    */
99   const struct sockaddr *sock_addr;
100
101   size_t addrlen;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * expected delay for ACKs
115    */
116   struct GNUNET_TIME_Relative last_expected_delay;
117
118   struct GNUNET_ATS_Information ats;
119
120   struct FragmentationContext * frag_ctx;
121
122   unsigned int rc;
123
124   int in_destroy;
125 };
126
127
128 struct SessionCompareContext
129 {
130   struct Session *res;
131   const struct GNUNET_HELLO_Address *addr;
132 };
133
134
135 /**
136  * Closure for 'process_inbound_tokenized_messages'
137  */
138 struct SourceInformation
139 {
140   /**
141    * Sender identity.
142    */
143   struct GNUNET_PeerIdentity sender;
144
145   /**
146    * Source address.
147    */
148   const void *arg;
149
150   /**
151    * Number of bytes in source address.
152    */
153   size_t args;
154
155   struct Session *session;
156 };
157
158
159 /**
160  * Closure for 'find_receive_context'.
161  */
162 struct FindReceiveContext
163 {
164   /**
165    * Where to store the result.
166    */
167   struct DefragContext *rc;
168
169   /**
170    * Address to find.
171    */
172   const struct sockaddr *addr;
173
174   /**
175    * Number of bytes in 'addr'.
176    */
177   socklen_t addr_len;
178
179   struct Session *session;
180 };
181
182
183
184 /**
185  * Data structure to track defragmentation contexts based
186  * on the source of the UDP traffic.
187  */
188 struct DefragContext
189 {
190
191   /**
192    * Defragmentation context.
193    */
194   struct GNUNET_DEFRAGMENT_Context *defrag;
195
196   /**
197    * Source address this receive context is for (allocated at the
198    * end of the struct).
199    */
200   const struct sockaddr *src_addr;
201
202   /**
203    * Reference to master plugin struct.
204    */
205   struct Plugin *plugin;
206
207   /**
208    * Node in the defrag heap.
209    */
210   struct GNUNET_CONTAINER_HeapNode *hnode;
211
212   /**
213    * Length of 'src_addr'
214    */
215   size_t addr_len;
216 };
217
218
219
220 /**
221  * Closure for 'process_inbound_tokenized_messages'
222  */
223 struct FragmentationContext
224 {
225   struct FragmentationContext * next;
226   struct FragmentationContext * prev;
227
228   struct Plugin * plugin;
229   struct GNUNET_FRAGMENT_Context * frag;
230   struct Session * session;
231
232   struct GNUNET_TIME_Absolute timeout;
233
234
235   /**
236    * Function to call upon completion of the transmission.
237    */
238   GNUNET_TRANSPORT_TransmitContinuation cont;
239
240   /**
241    * Closure for 'cont'.
242    */
243   void *cont_cls;
244
245   size_t bytes_to_send;
246 };
247
248
249 struct UDPMessageWrapper
250 {
251   struct Session *session;
252   struct UDPMessageWrapper *prev;
253   struct UDPMessageWrapper *next;
254   char *udp;
255   size_t msg_size;
256
257   struct GNUNET_TIME_Absolute timeout;
258
259   /**
260    * Function to call upon completion of the transmission.
261    */
262   GNUNET_TRANSPORT_TransmitContinuation cont;
263
264   /**
265    * Closure for 'cont'.
266    */
267   void *cont_cls;
268
269   struct FragmentationContext *frag_ctx;
270
271 };
272
273
274 /**
275  * UDP ACK Message-Packet header (after defragmentation).
276  */
277 struct UDP_ACK_Message
278 {
279   /**
280    * Message header.
281    */
282   struct GNUNET_MessageHeader header;
283
284   /**
285    * Desired delay for flow control
286    */
287   uint32_t delay;
288
289   /**
290    * What is the identity of the sender
291    */
292   struct GNUNET_PeerIdentity sender;
293
294 };
295
296 static int cont_calls;
297
298 /**
299  * We have been notified that our readset has something to read.  We don't
300  * know which socket needs to be read, so we have to check each one
301  * Then reschedule this function to be called again once more is available.
302  *
303  * @param cls the plugin handle
304  * @param tc the scheduling context (for rescheduling this function again)
305  */
306 static void
307 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
308
309 /**
310  * We have been notified that our readset has something to read.  We don't
311  * know which socket needs to be read, so we have to check each one
312  * Then reschedule this function to be called again once more is available.
313  *
314  * @param cls the plugin handle
315  * @param tc the scheduling context (for rescheduling this function again)
316  */
317 static void
318 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
319
320 /**
321  * Function called for a quick conversion of the binary address to
322  * a numeric address.  Note that the caller must not free the
323  * address and that the next call to this function is allowed
324  * to override the address again.
325  *
326  * @param cls closure
327  * @param addr binary address
328  * @param addrlen length of the address
329  * @return string representing the same address
330  */
331 const char *
332 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
333 {
334   static char rbuf[INET6_ADDRSTRLEN + 10];
335   char buf[INET6_ADDRSTRLEN];
336   const void *sb;
337   struct in_addr a4;
338   struct in6_addr a6;
339   const struct IPv4UdpAddress *t4;
340   const struct IPv6UdpAddress *t6;
341   int af;
342   uint16_t port;
343
344   if (addrlen == sizeof (struct IPv6UdpAddress))
345   {
346     t6 = addr;
347     af = AF_INET6;
348     port = ntohs (t6->u6_port);
349     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
350     sb = &a6;
351   }
352   else if (addrlen == sizeof (struct IPv4UdpAddress))
353   {
354     t4 = addr;
355     af = AF_INET;
356     port = ntohs (t4->u4_port);
357     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
358     sb = &a4;
359   }
360   else
361   {
362     GNUNET_break_op (0);
363     return NULL;
364   }
365   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
366   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
367                    buf, port);
368   return rbuf;
369 }
370
371
372 /**
373  * Function called to convert a string address to
374  * a binary address.
375  *
376  * @param cls closure ('struct Plugin*')
377  * @param addr string address
378  * @param addrlen length of the address
379  * @param buf location to store the buffer
380  * @param added location to store the number of bytes in the buffer.
381  *        If the function returns GNUNET_SYSERR, its contents are undefined.
382  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
383  */
384 int
385 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
386     void **buf, size_t *added)
387 {
388   struct sockaddr_storage socket_address;
389
390   if ((NULL == addr) || (addrlen == 0))
391   {
392     GNUNET_break (0);
393     return GNUNET_SYSERR;
394   }
395
396   if ('\0' != addr[addrlen - 1])
397   {
398     return GNUNET_SYSERR;
399   }
400
401   if (strlen (addr) != addrlen - 1)
402   {
403     return GNUNET_SYSERR;
404   }
405
406   int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
407     &socket_address);
408
409   if (ret != GNUNET_OK)
410   {
411     return GNUNET_SYSERR;
412   }
413
414   if (socket_address.ss_family == AF_INET)
415   {
416     struct IPv4UdpAddress *u4;
417     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
418     u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
419     u4->ipv4_addr = in4->sin_addr.s_addr;
420     u4->u4_port = in4->sin_port;
421     *buf = u4;
422     *added = sizeof (struct IPv4UdpAddress);
423     return GNUNET_OK;
424   }
425   else if (socket_address.ss_family == AF_INET6)
426   {
427     struct IPv6UdpAddress *u6;
428     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
429     u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
430     u6->ipv6_addr = in6->sin6_addr;
431     u6->u6_port = in6->sin6_port;
432     *buf = u6;
433     *added = sizeof (struct IPv6UdpAddress);
434     return GNUNET_OK;
435   }
436   return GNUNET_SYSERR;
437 }
438
439
440 /**
441  * Append our port and forward the result.
442  *
443  * @param cls a 'struct PrettyPrinterContext'
444  * @param hostname result from DNS resolver
445  */
446 static void
447 append_port (void *cls, const char *hostname)
448 {
449   struct PrettyPrinterContext *ppc = cls;
450   char *ret;
451
452   if (hostname == NULL)
453   {
454     ppc->asc (ppc->asc_cls, NULL);
455     GNUNET_free (ppc);
456     return;
457   }
458   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
459   ppc->asc (ppc->asc_cls, ret);
460   GNUNET_free (ret);
461 }
462
463
464 /**
465  * Convert the transports address to a nice, human-readable
466  * format.
467  *
468  * @param cls closure
469  * @param type name of the transport that generated the address
470  * @param addr one of the addresses of the host, NULL for the last address
471  *        the specific address format depends on the transport
472  * @param addrlen length of the address
473  * @param numeric should (IP) addresses be displayed in numeric form?
474  * @param timeout after how long should we give up?
475  * @param asc function to call on each string
476  * @param asc_cls closure for asc
477  */
478 static void
479 udp_plugin_address_pretty_printer (void *cls, const char *type,
480                                    const void *addr, size_t addrlen,
481                                    int numeric,
482                                    struct GNUNET_TIME_Relative timeout,
483                                    GNUNET_TRANSPORT_AddressStringCallback asc,
484                                    void *asc_cls)
485 {
486   struct PrettyPrinterContext *ppc;
487   const void *sb;
488   size_t sbs;
489   struct sockaddr_in a4;
490   struct sockaddr_in6 a6;
491   const struct IPv4UdpAddress *u4;
492   const struct IPv6UdpAddress *u6;
493   uint16_t port;
494
495   if (addrlen == sizeof (struct IPv6UdpAddress))
496   {
497     u6 = addr;
498     memset (&a6, 0, sizeof (a6));
499     a6.sin6_family = AF_INET6;
500 #if HAVE_SOCKADDR_IN_SIN_LEN
501     a6.sin6_len = sizeof (a6);
502 #endif
503     a6.sin6_port = u6->u6_port;
504     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
505     port = ntohs (u6->u6_port);
506     sb = &a6;
507     sbs = sizeof (a6);
508   }
509   else if (addrlen == sizeof (struct IPv4UdpAddress))
510   {
511     u4 = addr;
512     memset (&a4, 0, sizeof (a4));
513     a4.sin_family = AF_INET;
514 #if HAVE_SOCKADDR_IN_SIN_LEN
515     a4.sin_len = sizeof (a4);
516 #endif
517     a4.sin_port = u4->u4_port;
518     a4.sin_addr.s_addr = u4->ipv4_addr;
519     port = ntohs (u4->u4_port);
520     sb = &a4;
521     sbs = sizeof (a4);
522   }
523   else if (0 == addrlen)
524   {
525     asc (asc_cls, "<inbound connection>");
526     asc (asc_cls, NULL);
527     return;
528   }
529   else
530   {
531     /* invalid address */
532     GNUNET_break_op (0);
533     asc (asc_cls, NULL);
534     return;
535   }
536   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
537   ppc->asc = asc;
538   ppc->asc_cls = asc_cls;
539   ppc->port = port;
540   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
541 }
542
543 static void
544 call_continuation (struct UDPMessageWrapper *udpw, int result)
545 {
546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547       "Calling continuation for %u byte message to `%s' with result %s\n",
548       udpw->msg_size, GNUNET_i2s (&udpw->session->target),
549       (GNUNET_OK == result) ? "OK" : "SYSERR");
550   if (NULL != udpw->cont)
551   {
552     udpw->cont (udpw->cont_cls, &udpw->session->target,result);
553     GNUNET_assert (cont_calls > 0);
554     cont_calls --;
555   }
556
557 }
558
559 /**
560  * Check if the given port is plausible (must be either our listen
561  * port or our advertised port).  If it is neither, we return
562  * GNUNET_SYSERR.
563  *
564  * @param plugin global variables
565  * @param in_port port number to check
566  * @return GNUNET_OK if port is either open_port or adv_port
567  */
568 static int
569 check_port (struct Plugin *plugin, uint16_t in_port)
570 {
571   if ((in_port == plugin->port) || (in_port == plugin->aport))
572     return GNUNET_OK;
573   return GNUNET_SYSERR;
574 }
575
576
577
578 /**
579  * Function that will be called to check if a binary address for this
580  * plugin is well-formed and corresponds to an address for THIS peer
581  * (as per our configuration).  Naturally, if absolutely necessary,
582  * plugins can be a bit conservative in their answer, but in general
583  * plugins should make sure that the address does not redirect
584  * traffic to a 3rd party that might try to man-in-the-middle our
585  * traffic.
586  *
587  * @param cls closure, should be our handle to the Plugin
588  * @param addr pointer to the address
589  * @param addrlen length of addr
590  * @return GNUNET_OK if this is a plausible address for this peer
591  *         and transport, GNUNET_SYSERR if not
592  *
593  */
594 static int
595 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
596 {
597   struct Plugin *plugin = cls;
598   struct IPv4UdpAddress *v4;
599   struct IPv6UdpAddress *v6;
600
601   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
602       (addrlen != sizeof (struct IPv6UdpAddress)))
603   {
604     GNUNET_break_op (0);
605     return GNUNET_SYSERR;
606   }
607   if (addrlen == sizeof (struct IPv4UdpAddress))
608   {
609     v4 = (struct IPv4UdpAddress *) addr;
610     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
611       return GNUNET_SYSERR;
612     if (GNUNET_OK !=
613         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
614                                  sizeof (struct in_addr)))
615       return GNUNET_SYSERR;
616   }
617   else
618   {
619     v6 = (struct IPv6UdpAddress *) addr;
620     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
621     {
622       GNUNET_break_op (0);
623       return GNUNET_SYSERR;
624     }
625     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
626       return GNUNET_SYSERR;
627     if (GNUNET_OK !=
628         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
629                                  sizeof (struct in6_addr)))
630       return GNUNET_SYSERR;
631   }
632   return GNUNET_OK;
633 }
634
635
636 /**
637  * Task to free resources associated with a session.
638  *
639  * @param s session to free
640  */
641 static void
642 free_session (struct Session *s)
643 {
644   if (s->frag_ctx != NULL)
645   {
646     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
647     GNUNET_free (s->frag_ctx);
648     s->frag_ctx = NULL;
649   }
650   GNUNET_free (s);
651 }
652
653
654 /**
655  * Destroy a session, plugin is being unloaded.
656  *
657  * @param cls unused
658  * @param key hash of public key of target peer
659  * @param value a 'struct PeerSession*' to clean up
660  * @return GNUNET_OK (continue to iterate)
661  */
662 static int
663 disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
664 {
665   struct Plugin *plugin = cls;
666   struct Session *s = value;
667   struct UDPMessageWrapper *udpw;
668   struct UDPMessageWrapper *next;
669
670   GNUNET_assert (GNUNET_YES != s->in_destroy);
671   LOG (GNUNET_ERROR_TYPE_DEBUG,
672        "Session %p to peer `%s' address ended \n",
673          s,
674          GNUNET_i2s (&s->target),
675          GNUNET_a2s (s->sock_addr, s->addrlen));
676   next = plugin->ipv4_queue_head;
677   while (NULL != (udpw = next))
678   {
679     next = udpw->next;
680     if (udpw->session == s)
681     {
682       GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
683       call_continuation(udpw, GNUNET_SYSERR);
684       GNUNET_free (udpw);
685     }
686   }
687   next = plugin->ipv6_queue_head;
688   while (NULL != (udpw = next))
689   {
690     next = udpw->next;
691     if (udpw->session == s)
692     {
693       GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
694       call_continuation(udpw, GNUNET_SYSERR);
695       GNUNET_free (udpw);
696     }
697     udpw = next;
698   }
699   plugin->env->session_end (plugin->env->cls, &s->target, s);
700
701   if (NULL != s->frag_ctx)
702   {
703     if (NULL != s->frag_ctx->cont)
704     {
705       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR);
706       cont_calls --;
707       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
709           GNUNET_i2s (&s->target));
710     }
711   }
712
713   GNUNET_assert (GNUNET_YES ==
714                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
715                                                        &s->target.hashPubKey,
716                                                        s));
717   GNUNET_STATISTICS_set(plugin->env->stats,
718                         "# UDP sessions active",
719                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
720                         GNUNET_NO);
721   if (s->rc > 0)
722     s->in_destroy = GNUNET_YES;
723   else
724     free_session (s);
725   return GNUNET_OK;
726 }
727
728
729 /**
730  * Disconnect from a remote node.  Clean up session if we have one for this peer
731  *
732  * @param cls closure for this call (should be handle to Plugin)
733  * @param target the peeridentity of the peer to disconnect
734  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
735  */
736 static void
737 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
738 {
739   struct Plugin *plugin = cls;
740   GNUNET_assert (plugin != NULL);
741
742   GNUNET_assert (target != NULL);
743   LOG (GNUNET_ERROR_TYPE_DEBUG,
744        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
745   /* Clean up sessions */
746   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
747 }
748
749 static struct Session *
750 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
751                 const void *addr, size_t addrlen,
752                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
753 {
754   struct Session *s;
755   const struct IPv4UdpAddress *t4;
756   const struct IPv6UdpAddress *t6;
757   struct sockaddr_in *v4;
758   struct sockaddr_in6 *v6;
759   size_t len;
760
761   switch (addrlen)
762   {
763   case sizeof (struct IPv4UdpAddress):
764     if (NULL == plugin->sockv4)
765     {
766       return NULL;
767     }
768     t4 = addr;
769     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
770     len = sizeof (struct sockaddr_in);
771     v4 = (struct sockaddr_in *) &s[1];
772     v4->sin_family = AF_INET;
773 #if HAVE_SOCKADDR_IN_SIN_LEN
774     v4->sin_len = sizeof (struct sockaddr_in);
775 #endif
776     v4->sin_port = t4->u4_port;
777     v4->sin_addr.s_addr = t4->ipv4_addr;
778     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
779     break;
780   case sizeof (struct IPv6UdpAddress):
781     if (NULL == plugin->sockv6)
782     {
783       return NULL;
784     }
785     t6 = addr;
786     s =
787         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
788     len = sizeof (struct sockaddr_in6);
789     v6 = (struct sockaddr_in6 *) &s[1];
790     v6->sin6_family = AF_INET6;
791 #if HAVE_SOCKADDR_IN_SIN_LEN
792     v6->sin6_len = sizeof (struct sockaddr_in6);
793 #endif
794     v6->sin6_port = t6->u6_port;
795     v6->sin6_addr = t6->ipv6_addr;
796     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
797     break;
798   default:
799     /* Must have a valid address to send to */
800     GNUNET_break_op (0);
801     return NULL;
802   }
803
804   s->addrlen = len;
805   s->target = *target;
806   s->sock_addr = (const struct sockaddr *) &s[1];
807   s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero();
808   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
809   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
810
811   return s;
812 }
813
814
815 static int
816 session_cmp_it (void *cls,
817                 const GNUNET_HashCode * key,
818                 void *value)
819 {
820   struct SessionCompareContext * cctx = cls;
821   const struct GNUNET_HELLO_Address *address = cctx->addr;
822   struct Session *s = value;
823
824   socklen_t s_addrlen = s->addrlen;
825
826 #if VERBOSE_UDP
827   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
828       udp_address_to_string (NULL, (void *) address->address, address->address_length),
829       GNUNET_a2s (s->sock_addr, s->addrlen));
830 #endif
831
832   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
833       (s_addrlen == sizeof (struct sockaddr_in)))
834   {
835     struct IPv4UdpAddress * u4 = NULL;
836     u4 = (struct IPv4UdpAddress *) address->address;
837     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
838     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
839         (u4->u4_port == s4->sin_port))
840     {
841       cctx->res = s;
842       return GNUNET_NO;
843     }
844
845   }
846   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
847       (s_addrlen == sizeof (struct sockaddr_in6)))
848   {
849     struct IPv6UdpAddress * u6 = NULL;
850     u6 = (struct IPv6UdpAddress *) address->address;
851     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
852     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
853         (u6->u6_port == s6->sin6_port))
854     {
855       cctx->res = s;
856       return GNUNET_NO;
857     }
858   }
859
860
861   return GNUNET_YES;
862 }
863
864
865 /**
866  * Creates a new outbound session the transport service will use to send data to the
867  * peer
868  *
869  * @param cls the plugin
870  * @param address the address
871  * @return the session or NULL of max connections exceeded
872  */
873 static struct Session *
874 udp_plugin_get_session (void *cls,
875                   const struct GNUNET_HELLO_Address *address)
876 {
877   struct Session * s = NULL;
878   struct Plugin * plugin = cls;
879   struct IPv6UdpAddress * udp_a6;
880   struct IPv4UdpAddress * udp_a4;
881
882   GNUNET_assert (plugin != NULL);
883   GNUNET_assert (address != NULL);
884
885
886   if ((address->address == NULL) ||
887       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
888       (address->address_length != sizeof (struct IPv6UdpAddress))))
889   {
890     GNUNET_break (0);
891     return NULL;
892   }
893
894   if (address->address_length == sizeof (struct IPv4UdpAddress))
895   {
896     if (plugin->sockv4 == NULL)
897       return NULL;
898     udp_a4 = (struct IPv4UdpAddress *) address->address;
899     if (udp_a4->u4_port == 0)
900       return NULL;
901   }
902
903   if (address->address_length == sizeof (struct IPv6UdpAddress))
904   {
905     if (plugin->sockv6 == NULL)
906       return NULL;
907     udp_a6 = (struct IPv6UdpAddress *) address->address;
908     if (udp_a6->u6_port == 0)
909       return NULL;
910   }
911
912   /* check if session already exists */
913   struct SessionCompareContext cctx;
914   cctx.addr = address;
915   cctx.res = NULL;
916 #if VERBOSE_UDP
917   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length));
918 #endif
919   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
920   if (cctx.res != NULL)
921   {
922 #if VERBOSE_UDP
923     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
924 #endif
925     return cctx.res;
926   }
927
928   /* otherwise create new */
929   s = create_session (plugin,
930       &address->peer,
931       address->address,
932       address->address_length,
933       NULL, NULL);
934 #if VERBOSE
935     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936               "Creating new session %p for peer `%s' address `%s'\n",
937               s,
938               GNUNET_i2s(&address->peer),
939               udp_address_to_string(NULL,address->address,address->address_length));
940 #endif
941   GNUNET_assert (GNUNET_OK ==
942                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
943                                                     &s->target.hashPubKey,
944                                                     s,
945                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
946
947   GNUNET_STATISTICS_set(plugin->env->stats,
948                         "# UDP sessions active",
949                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
950                         GNUNET_NO);
951
952   return s;
953 }
954
955
956 static void 
957 enqueue (struct Plugin *plugin, struct UDPMessageWrapper * udpw)
958 {
959
960   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
961     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
962   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
963     GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
964 }
965
966
967 /**
968  * Fragment message was transmitted via UDP, let fragmentation know
969  * to send the next fragment now.
970  *
971  * @param cls the 'struct UDPMessageWrapper' of the fragment
972  * @param target destination peer (ignored)
973  * @param result GNUNET_OK on success (ignored)
974  */
975 static void
976 send_next_fragment (void *cls,
977                     const struct GNUNET_PeerIdentity *target,
978                     int result)
979 {
980   struct UDPMessageWrapper *udpw = cls;
981
982   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
983 }
984
985
986 /**
987  * Function that is called with messages created by the fragmentation
988  * module.  In the case of the 'proc' callback of the
989  * GNUNET_FRAGMENT_context_create function, this function must
990  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
991  *
992  * @param cls closure, the 'struct FragmentationContext'
993  * @param msg the message that was created
994  */
995 static void
996 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
997 {
998   struct FragmentationContext *frag_ctx = cls;
999   struct Plugin *plugin = frag_ctx->plugin;
1000   struct UDPMessageWrapper * udpw;
1001   struct Session *s;
1002
1003   size_t msg_len = ntohs (msg->size);
1004
1005 #if VERBOSE_UDP
1006   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper));
1007 #endif
1008
1009   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
1010   udpw->session = frag_ctx->session;
1011   s = udpw->session;
1012   udpw->udp = (char *) &udpw[1];
1013
1014   udpw->msg_size = msg_len;
1015   udpw->cont = &send_next_fragment;
1016   udpw->cont_cls = udpw;
1017   udpw->timeout = frag_ctx->timeout;
1018   udpw->frag_ctx = frag_ctx;
1019   memcpy (udpw->udp, msg, msg_len);
1020
1021   enqueue (plugin, udpw);
1022
1023
1024   if (s->addrlen == sizeof (struct sockaddr_in))
1025   {
1026     if (plugin->with_v4_ws == GNUNET_NO)
1027     {
1028       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1029         GNUNET_SCHEDULER_cancel(plugin->select_task);
1030
1031       plugin->select_task =
1032           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1033                                        GNUNET_TIME_UNIT_FOREVER_REL,
1034                                        plugin->rs_v4,
1035                                        plugin->ws_v4,
1036                                        &udp_plugin_select, plugin);
1037       plugin->with_v4_ws = GNUNET_YES;
1038     }
1039   }
1040
1041   else if (s->addrlen == sizeof (struct sockaddr_in6))
1042   {
1043     if (plugin->with_v6_ws == GNUNET_NO)
1044     {
1045       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1046         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1047
1048       plugin->select_task_v6 =
1049           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1050                                        GNUNET_TIME_UNIT_FOREVER_REL,
1051                                        plugin->rs_v6,
1052                                        plugin->ws_v6,
1053                                        &udp_plugin_select_v6, plugin);
1054       plugin->with_v6_ws = GNUNET_YES;
1055     }
1056   }
1057 }
1058
1059
1060 /**
1061  * Function that can be used by the transport service to transmit
1062  * a message using the plugin.   Note that in the case of a
1063  * peer disconnecting, the continuation MUST be called
1064  * prior to the disconnect notification itself.  This function
1065  * will be called with this peer's HELLO message to initiate
1066  * a fresh connection to another peer.
1067  *
1068  * @param cls closure
1069  * @param s which session must be used
1070  * @param msgbuf the message to transmit
1071  * @param msgbuf_size number of bytes in 'msgbuf'
1072  * @param priority how important is the message (most plugins will
1073  *                 ignore message priority and just FIFO)
1074  * @param to how long to wait at most for the transmission (does not
1075  *                require plugins to discard the message after the timeout,
1076  *                just advisory for the desired delay; most plugins will ignore
1077  *                this as well)
1078  * @param cont continuation to call once the message has
1079  *        been transmitted (or if the transport is ready
1080  *        for the next transmission call; or if the
1081  *        peer disconnected...); can be NULL
1082  * @param cont_cls closure for cont
1083  * @return number of bytes used (on the physical network, with overheads);
1084  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1085  *         and does NOT mean that the message was not transmitted (DV)
1086  */
1087 static ssize_t
1088 udp_plugin_send (void *cls,
1089                   struct Session *s,
1090                   const char *msgbuf, size_t msgbuf_size,
1091                   unsigned int priority,
1092                   struct GNUNET_TIME_Relative to,
1093                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1094 {
1095   struct Plugin *plugin = cls;
1096   size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
1097
1098   struct UDPMessageWrapper * udpw;
1099   struct UDPMessage *udp;
1100   char mbuf[mlen];
1101   GNUNET_assert (plugin != NULL);
1102   GNUNET_assert (s != NULL);
1103
1104   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1105     return GNUNET_SYSERR;
1106
1107    if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1108      return GNUNET_SYSERR;
1109
1110
1111   if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1112   {
1113     GNUNET_break (0);
1114     return GNUNET_SYSERR;
1115   }
1116
1117   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1118   {
1119     GNUNET_break (0);
1120     return GNUNET_SYSERR;
1121   }
1122
1123   LOG (GNUNET_ERROR_TYPE_DEBUG,
1124        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1125          mlen,
1126          GNUNET_i2s (&s->target),
1127          GNUNET_a2s(s->sock_addr, s->addrlen));
1128
1129   /* Message */
1130   udp = (struct UDPMessage *) mbuf;
1131   udp->header.size = htons (mlen);
1132   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1133   udp->reserved = htonl (0);
1134   udp->sender = *plugin->env->my_identity;
1135
1136   if (mlen <= UDP_MTU)
1137   {
1138     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
1139     udpw->session = s;
1140     udpw->udp = (char *) &udpw[1];
1141     udpw->msg_size = mlen;
1142     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1143     udpw->cont = cont;
1144     udpw->cont_cls = cont_cls;
1145     udpw->frag_ctx = NULL;
1146     if (NULL != udpw->cont)
1147     {
1148       cont_calls ++;
1149     }
1150     memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
1151     memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1152
1153     enqueue (plugin, udpw);
1154   }
1155   else
1156   {
1157     LOG (GNUNET_ERROR_TYPE_DEBUG,
1158          "UDP has to fragment message \n");
1159     if  (s->frag_ctx != NULL)
1160       return GNUNET_SYSERR;
1161     memcpy (&udp[1], msgbuf, msgbuf_size);
1162     struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext));
1163
1164     frag_ctx->plugin = plugin;
1165     frag_ctx->session = s;
1166     frag_ctx->cont = cont;
1167     frag_ctx->cont_cls = cont_cls;
1168     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1169     frag_ctx->bytes_to_send = mlen;
1170     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1171               UDP_MTU,
1172               &plugin->tracker,
1173               s->last_expected_delay,
1174               &udp->header,
1175               &enqueue_fragment,
1176               frag_ctx);
1177
1178     if (NULL != frag_ctx->cont)
1179       cont_calls ++;
1180     s->frag_ctx = frag_ctx;
1181   }
1182
1183   if (s->addrlen == sizeof (struct sockaddr_in))
1184   {
1185     if (plugin->with_v4_ws == GNUNET_NO)
1186     {
1187       if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1188         GNUNET_SCHEDULER_cancel(plugin->select_task);
1189
1190       plugin->select_task =
1191           GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1192                                        GNUNET_TIME_UNIT_FOREVER_REL,
1193                                        plugin->rs_v4,
1194                                        plugin->ws_v4,
1195                                        &udp_plugin_select, plugin);
1196       plugin->with_v4_ws = GNUNET_YES;
1197     }
1198   }
1199
1200   else if (s->addrlen == sizeof (struct sockaddr_in6))
1201   {
1202     if (plugin->with_v6_ws == GNUNET_NO)
1203     {
1204       if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1205         GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
1206
1207       plugin->select_task_v6 =
1208         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1209                                      GNUNET_TIME_UNIT_FOREVER_REL,
1210                                      plugin->rs_v6,
1211                                      plugin->ws_v6,
1212                                      &udp_plugin_select_v6, plugin);
1213       plugin->with_v6_ws = GNUNET_YES;
1214     }
1215   }
1216
1217   return mlen;
1218 }
1219
1220
1221 /**
1222  * Our external IP address/port mapping has changed.
1223  *
1224  * @param cls closure, the 'struct LocalAddrList'
1225  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1226  *     the previous (now invalid) one
1227  * @param addr either the previous or the new public IP address
1228  * @param addrlen actual lenght of the address
1229  */
1230 static void
1231 udp_nat_port_map_callback (void *cls, int add_remove,
1232                            const struct sockaddr *addr, socklen_t addrlen)
1233 {
1234   struct Plugin *plugin = cls;
1235   struct IPv4UdpAddress u4;
1236   struct IPv6UdpAddress u6;
1237   void *arg;
1238   size_t args;
1239
1240   /* convert 'addr' to our internal format */
1241   switch (addr->sa_family)
1242   {
1243   case AF_INET:
1244     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1245     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1246     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1247     arg = &u4;
1248     args = sizeof (u4);
1249     break;
1250   case AF_INET6:
1251     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1252     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1253             sizeof (struct in6_addr));
1254     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1255     arg = &u6;
1256     args = sizeof (u6);
1257     break;
1258   default:
1259     GNUNET_break (0);
1260     return;
1261   }
1262   /* modify our published address list */
1263   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args);
1264 }
1265
1266
1267
1268 /**
1269  * Message tokenizer has broken up an incomming message. Pass it on
1270  * to the service.
1271  *
1272  * @param cls the 'struct Plugin'
1273  * @param client the 'struct SourceInformation'
1274  * @param hdr the actual message
1275  */
1276 static void
1277 process_inbound_tokenized_messages (void *cls, void *client,
1278                                     const struct GNUNET_MessageHeader *hdr)
1279 {
1280   struct Plugin *plugin = cls;
1281   struct SourceInformation *si = client;
1282   struct GNUNET_ATS_Information ats[2];
1283   struct GNUNET_TIME_Relative delay;
1284
1285   GNUNET_assert (si->session != NULL);
1286   if (GNUNET_YES == si->session->in_destroy)
1287     return; 
1288   /* setup ATS */
1289   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1290   ats[0].value = htonl (1);
1291   ats[1] = si->session->ats;
1292   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1293
1294   delay = plugin->env->receive (plugin->env->cls,
1295                 &si->sender,
1296                 hdr,
1297                 (const struct GNUNET_ATS_Information *) &ats, 2,
1298                 NULL,
1299                 si->arg,
1300                 si->args);
1301   si->session->flow_delay_for_other_peer = delay;
1302 }
1303
1304
1305 /**
1306  * We've received a UDP Message.  Process it (pass contents to main service).
1307  *
1308  * @param plugin plugin context
1309  * @param msg the message
1310  * @param sender_addr sender address
1311  * @param sender_addr_len number of bytes in sender_addr
1312  */
1313 static void
1314 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1315                      const struct sockaddr *sender_addr,
1316                      socklen_t sender_addr_len)
1317 {
1318   struct SourceInformation si;
1319   struct Session * s;
1320   struct IPv4UdpAddress u4;
1321   struct IPv6UdpAddress u6;
1322   const void *arg;
1323   size_t args;
1324
1325   if (0 != ntohl (msg->reserved))
1326   {
1327     GNUNET_break_op (0);
1328     return;
1329   }
1330   if (ntohs (msg->header.size) <
1331       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1332   {
1333     GNUNET_break_op (0);
1334     return;
1335   }
1336
1337   /* convert address */
1338   switch (sender_addr->sa_family)
1339   {
1340   case AF_INET:
1341     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1342     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1343     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1344     arg = &u4;
1345     args = sizeof (u4);
1346     break;
1347   case AF_INET6:
1348     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1349     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1350     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1351     arg = &u6;
1352     args = sizeof (u6);
1353     break;
1354   default:
1355     GNUNET_break (0);
1356     return;
1357   }
1358   LOG (GNUNET_ERROR_TYPE_DEBUG,
1359        "Received message with %u bytes from peer `%s' at `%s'\n",
1360        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1361        GNUNET_a2s (sender_addr, sender_addr_len));
1362
1363   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1364   s = udp_plugin_get_session(plugin, address);
1365   GNUNET_free (address);
1366
1367   /* iterate over all embedded messages */
1368   si.session = s;
1369   si.sender = msg->sender;
1370   si.arg = arg;
1371   si.args = args;
1372   s->rc++;
1373   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1374                              ntohs (msg->header.size) -
1375                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1376   s->rc--;
1377   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1378     free_session (s);
1379 }
1380
1381
1382 /**
1383  * Scan the heap for a receive context with the given address.
1384  *
1385  * @param cls the 'struct FindReceiveContext'
1386  * @param node internal node of the heap
1387  * @param element value stored at the node (a 'struct ReceiveContext')
1388  * @param cost cost associated with the node
1389  * @return GNUNET_YES if we should continue to iterate,
1390  *         GNUNET_NO if not.
1391  */
1392 static int
1393 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1394                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1395 {
1396   struct FindReceiveContext *frc = cls;
1397   struct DefragContext *e = element;
1398
1399   if ((frc->addr_len == e->addr_len) &&
1400       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1401   {
1402     frc->rc = e;
1403     return GNUNET_NO;
1404   }
1405   return GNUNET_YES;
1406 }
1407
1408
1409 /**
1410  * Process a defragmented message.
1411  *
1412  * @param cls the 'struct ReceiveContext'
1413  * @param msg the message
1414  */
1415 static void
1416 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1417 {
1418   struct DefragContext *rc = cls;
1419
1420   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1421   {
1422     GNUNET_break (0);
1423     return;
1424   }
1425   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1426   {
1427     GNUNET_break (0);
1428     return;
1429   }
1430   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1431                        rc->src_addr, rc->addr_len);
1432 }
1433
1434 struct LookupContext
1435 {
1436   const struct sockaddr * addr;
1437   size_t addrlen;
1438
1439   struct Session *res;
1440 };
1441
1442 static int
1443 lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value)
1444 {
1445   struct LookupContext *l_ctx = cls;
1446   struct Session * s = value;
1447
1448   if ((s->addrlen == l_ctx->addrlen) &&
1449       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1450   {
1451     l_ctx->res = s;
1452     return GNUNET_NO;
1453   }
1454   return GNUNET_YES;
1455 }
1456
1457 /**
1458  * Transmit an acknowledgement.
1459  *
1460  * @param cls the 'struct ReceiveContext'
1461  * @param id message ID (unused)
1462  * @param msg ack to transmit
1463  */
1464 static void
1465 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1466 {
1467   struct DefragContext *rc = cls;
1468
1469   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1470   struct UDP_ACK_Message *udp_ack;
1471   uint32_t delay = 0;
1472   struct UDPMessageWrapper *udpw;
1473   struct Session *s;
1474
1475   struct LookupContext l_ctx;
1476   l_ctx.addr = rc->src_addr;
1477   l_ctx.addrlen = rc->addr_len;
1478   l_ctx.res = NULL;
1479   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1480       &lookup_session_by_addr_it,
1481       &l_ctx);
1482   s = l_ctx.res;
1483
1484   if (NULL == s)
1485     return;
1486
1487   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1488     delay = s->flow_delay_for_other_peer.rel_value;
1489
1490   LOG (GNUNET_ERROR_TYPE_DEBUG,
1491        "Sending ACK to `%s' including delay of %u ms\n",
1492        GNUNET_a2s (rc->src_addr,
1493                    (rc->src_addr->sa_family ==
1494                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1495                                                                      sockaddr_in6)),
1496        delay);
1497   udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
1498   udpw->cont = NULL;
1499   udpw->cont_cls = NULL;
1500   udpw->frag_ctx = NULL;
1501   udpw->msg_size = msize;
1502   udpw->session = s;
1503   udpw->timeout = GNUNET_TIME_absolute_get_forever();
1504   udpw->udp = (char *)&udpw[1];
1505
1506   udp_ack = (struct UDP_ACK_Message *) udpw->udp;
1507   udp_ack->header.size = htons ((uint16_t) msize);
1508   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1509   udp_ack->delay = htonl (delay);
1510   udp_ack->sender = *rc->plugin->env->my_identity;
1511   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1512
1513   enqueue (rc->plugin, udpw);
1514 }
1515
1516
1517 static void read_process_msg (struct Plugin *plugin,
1518     const struct GNUNET_MessageHeader *msg,
1519     char *addr,
1520     socklen_t fromlen)
1521 {
1522   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1523   {
1524     GNUNET_break_op (0);
1525     return;
1526   }
1527   process_udp_message (plugin, (const struct UDPMessage *) msg,
1528                        (const struct sockaddr *) addr, fromlen);
1529   return;
1530 }
1531
1532 static void read_process_ack (struct Plugin *plugin,
1533     const struct GNUNET_MessageHeader *msg,
1534     char *addr,
1535     socklen_t fromlen)
1536 {
1537   const struct GNUNET_MessageHeader *ack;
1538   const struct UDP_ACK_Message *udp_ack;
1539   struct LookupContext l_ctx;
1540   struct Session *s = NULL;
1541   struct GNUNET_TIME_Relative flow_delay;
1542
1543   if (ntohs (msg->size) <
1544       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1545   {
1546     GNUNET_break_op (0);
1547     return;
1548   }
1549
1550   udp_ack = (const struct UDP_ACK_Message *) msg;
1551
1552   l_ctx.addr = (const struct sockaddr *) addr;
1553   l_ctx.addrlen = fromlen;
1554   l_ctx.res = NULL;
1555   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1556       &lookup_session_by_addr_it,
1557       &l_ctx);
1558   s = l_ctx.res;
1559
1560   if ((s == NULL) || (s->frag_ctx == NULL))
1561     return;
1562
1563   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1564   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1565        flow_delay.rel_value);
1566   s->flow_delay_from_other_peer =
1567       GNUNET_TIME_relative_to_absolute (flow_delay);
1568
1569   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1570   if (ntohs (ack->size) !=
1571       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1572   {
1573     GNUNET_break_op (0);
1574     return;
1575   }
1576
1577   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1578   {
1579   LOG (GNUNET_ERROR_TYPE_DEBUG,
1580        "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1581        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1582        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1583     return;
1584   }
1585
1586   LOG (GNUNET_ERROR_TYPE_DEBUG,
1587        "FULL MESSAGE ACKed\n",
1588        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1589        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1590   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1591
1592   struct UDPMessageWrapper * udpw = NULL;
1593   struct UDPMessageWrapper * tmp = NULL;
1594   if (s->addrlen == sizeof (struct sockaddr_in6))
1595   {
1596     udpw = plugin->ipv6_queue_head;
1597     while (udpw!= NULL)
1598     {
1599       tmp = udpw->next;
1600       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1601       {
1602         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1603         GNUNET_free (udpw);
1604       }
1605       udpw = tmp;
1606     }
1607   }
1608   if (s->addrlen == sizeof (struct sockaddr_in))
1609   {
1610     udpw = plugin->ipv4_queue_head;
1611     while (udpw!= NULL)
1612     {
1613       tmp = udpw->next;
1614       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1615       {
1616         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1617         GNUNET_free (udpw);
1618       }
1619       udpw = tmp;
1620     }
1621   }
1622
1623   if (s->frag_ctx->cont != NULL)
1624   {
1625     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1626         "Calling continuation for fragmented message to `%s' with result %s\n",
1627         GNUNET_i2s (&s->target), "OK");
1628     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
1629     cont_calls --;
1630   }
1631
1632   GNUNET_free (s->frag_ctx);
1633   s->frag_ctx = NULL;
1634   return;
1635 }
1636
1637 static void read_process_fragment (struct Plugin *plugin,
1638     const struct GNUNET_MessageHeader *msg,
1639     char *addr,
1640     socklen_t fromlen)
1641 {
1642   struct DefragContext *d_ctx;
1643   struct GNUNET_TIME_Absolute now;
1644   struct FindReceiveContext frc;
1645
1646
1647   frc.rc = NULL;
1648   frc.addr = (const struct sockaddr *) addr;
1649   frc.addr_len = fromlen;
1650
1651   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1652        (unsigned int) ntohs (msg->size),
1653        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1654   /* Lookup existing receive context for this address */
1655   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1656                                  &find_receive_context,
1657                                  &frc);
1658   now = GNUNET_TIME_absolute_get ();
1659   d_ctx = frc.rc;
1660
1661   if (d_ctx == NULL)
1662   {
1663     /* Create a new defragmentation context */
1664     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1665     memcpy (&d_ctx[1], addr, fromlen);
1666     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1667     d_ctx->addr_len = fromlen;
1668     d_ctx->plugin = plugin;
1669     d_ctx->defrag =
1670         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1671                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1672                                           &fragment_msg_proc, &ack_proc);
1673     d_ctx->hnode =
1674         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1675                                       (GNUNET_CONTAINER_HeapCostType)
1676                                       now.abs_value);
1677   LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n",
1678        (unsigned int) ntohs (msg->size),
1679        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1680   }
1681   else
1682   {
1683   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1684        (unsigned int) ntohs (msg->size),
1685        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1686   }
1687
1688   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1689   {
1690     /* keep this 'rc' from expiring */
1691     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1692                                        (GNUNET_CONTAINER_HeapCostType)
1693                                        now.abs_value);
1694   }
1695   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1696       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1697   {
1698     /* remove 'rc' that was inactive the longest */
1699     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1700     GNUNET_assert (NULL != d_ctx);
1701     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1702     GNUNET_free (d_ctx);
1703   }
1704 }
1705
1706 /**
1707  * Read and process a message from the given socket.
1708  *
1709  * @param plugin the overall plugin
1710  * @param rsock socket to read from
1711  */
1712 static void
1713 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1714 {
1715   socklen_t fromlen;
1716   char addr[32];
1717   char buf[65536] GNUNET_ALIGN;
1718   ssize_t size;
1719   const struct GNUNET_MessageHeader *msg;
1720
1721   fromlen = sizeof (addr);
1722   memset (&addr, 0, sizeof (addr));
1723   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1724                                       (struct sockaddr *) &addr, &fromlen);
1725
1726   if (size < sizeof (struct GNUNET_MessageHeader))
1727   {
1728     GNUNET_break_op (0);
1729     return;
1730   }
1731   msg = (const struct GNUNET_MessageHeader *) buf;
1732
1733   LOG (GNUNET_ERROR_TYPE_DEBUG,
1734        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1735        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1736
1737   if (size != ntohs (msg->size))
1738   {
1739     GNUNET_break_op (0);
1740     return;
1741   }
1742
1743   switch (ntohs (msg->type))
1744   {
1745   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1746     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1747     return;
1748
1749   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1750     read_process_msg (plugin, msg, addr, fromlen);
1751     return;
1752
1753   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1754     read_process_ack (plugin, msg, addr, fromlen);
1755     return;
1756
1757   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1758     read_process_fragment (plugin, msg, addr, fromlen);
1759     return;
1760
1761   default:
1762     GNUNET_break_op (0);
1763     return;
1764   }
1765 }
1766
1767
1768 static size_t
1769 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
1770 {
1771   ssize_t sent;
1772   size_t slen;
1773   struct GNUNET_TIME_Absolute max;
1774   struct GNUNET_TIME_Absolute ;
1775
1776   struct UDPMessageWrapper *udpw = NULL;
1777
1778   if (sock == plugin->sockv4)
1779   {
1780     udpw = plugin->ipv4_queue_head;
1781   }
1782   else if (sock == plugin->sockv6)
1783   {
1784     udpw = plugin->ipv6_queue_head;
1785   }
1786   else
1787   {
1788     GNUNET_break (0);
1789     return 0;
1790   }
1791
1792   const struct sockaddr * sa = udpw->session->sock_addr;
1793   slen = udpw->session->addrlen;
1794
1795   max = GNUNET_TIME_absolute_max(udpw->timeout, GNUNET_TIME_absolute_get());
1796
1797   while (udpw != NULL)
1798   {
1799     if (max.abs_value != udpw->timeout.abs_value)
1800     {
1801       /* Message timed out */
1802       call_continuation(udpw, GNUNET_SYSERR);
1803       if (udpw->frag_ctx != NULL)
1804       {
1805         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n",
1806             GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send);
1807         udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag);
1808         GNUNET_free (udpw->frag_ctx);
1809         udpw->session->frag_ctx = NULL;
1810       }
1811       else
1812       {
1813         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n",
1814             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1815       }
1816
1817       if (sock == plugin->sockv4)
1818       {
1819         GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1820         GNUNET_free (udpw);
1821         udpw = plugin->ipv4_queue_head;
1822       }
1823       else if (sock == plugin->sockv6)
1824       {
1825         GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1826         GNUNET_free (udpw);
1827         udpw = plugin->ipv6_queue_head;
1828       }
1829     }
1830     else
1831     {
1832       struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
1833       if (delta.rel_value == 0)
1834       {
1835         /* this message is not delayed */
1836         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n",
1837             GNUNET_i2s(&udpw->session->target), udpw->msg_size);
1838         break;
1839       }
1840       else
1841       {
1842         /* this message is delayed, try next */
1843         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n",
1844             GNUNET_i2s(&udpw->session->target), udpw->msg_size,
1845             delta);
1846         udpw = udpw->next;
1847       }
1848     }
1849   }
1850
1851   if (udpw == NULL)
1852   {
1853     /* No message left */
1854     return 0;
1855   }
1856
1857   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->udp, udpw->msg_size, sa, slen);
1858
1859   if (GNUNET_SYSERR == sent)
1860   {
1861     LOG (GNUNET_ERROR_TYPE_ERROR,
1862          "UDP could not transmit %u-byte message to `%s': `%s'\n",
1863          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen),
1864          STRERROR (errno));
1865     call_continuation(udpw, GNUNET_SYSERR);
1866   }
1867   else
1868   {
1869     LOG (GNUNET_ERROR_TYPE_DEBUG,
1870          "UDP transmitted %u-byte message to `%s' (%d: %s)\n",
1871          (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent,
1872          (sent < 0) ? STRERROR (errno) : "ok");
1873     call_continuation(udpw, GNUNET_OK);
1874   }
1875
1876   if (sock == plugin->sockv4)
1877     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
1878   else if (sock == plugin->sockv6)
1879     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
1880   GNUNET_free (udpw);
1881   udpw = NULL;
1882
1883   return sent;
1884 }
1885
1886 /**
1887  * We have been notified that our readset has something to read.  We don't
1888  * know which socket needs to be read, so we have to check each one
1889  * Then reschedule this function to be called again once more is available.
1890  *
1891  * @param cls the plugin handle
1892  * @param tc the scheduling context (for rescheduling this function again)
1893  */
1894 static void
1895 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1896 {
1897   struct Plugin *plugin = cls;
1898
1899   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1900   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1901     return;
1902   plugin->with_v4_ws = GNUNET_NO;
1903
1904   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1905   {
1906     if ((NULL != plugin->sockv4) &&
1907       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
1908         udp_select_read (plugin, plugin->sockv4);
1909
1910   }
1911
1912   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1913   {
1914     if ((NULL != plugin->sockv4) && (plugin->ipv4_queue_head != NULL) &&
1915       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
1916       {
1917         udp_select_send (plugin, plugin->sockv4);
1918       }
1919   }
1920
1921   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
1922     GNUNET_SCHEDULER_cancel (plugin->select_task);
1923   plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1924                                    GNUNET_TIME_UNIT_FOREVER_REL,
1925                                    plugin->rs_v4,
1926                                    (plugin->ipv4_queue_head != NULL) ? plugin->ws_v4 : NULL,
1927                                    &udp_plugin_select, plugin);
1928   if (plugin->ipv4_queue_head != NULL)
1929     plugin->with_v4_ws = GNUNET_YES;
1930   else
1931     plugin->with_v4_ws = GNUNET_NO;
1932 }
1933
1934
1935 /**
1936  * We have been notified that our readset has something to read.  We don't
1937  * know which socket needs to be read, so we have to check each one
1938  * Then reschedule this function to be called again once more is available.
1939  *
1940  * @param cls the plugin handle
1941  * @param tc the scheduling context (for rescheduling this function again)
1942  */
1943 static void
1944 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1945 {
1946   struct Plugin *plugin = cls;
1947
1948   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
1949   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1950     return;
1951
1952   plugin->with_v6_ws = GNUNET_NO;
1953   if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
1954   {
1955     if ((NULL != plugin->sockv6) &&
1956       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
1957         udp_select_read (plugin, plugin->sockv6);
1958   }
1959
1960   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1961   {
1962     if ((NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
1963       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)))
1964       {
1965         udp_select_send (plugin, plugin->sockv6);
1966       }
1967   }
1968   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
1969     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
1970   plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1971                                    GNUNET_TIME_UNIT_FOREVER_REL,
1972                                    plugin->rs_v6,
1973                                    (plugin->ipv6_queue_head != NULL) ? plugin->ws_v6 : NULL,
1974                                    &udp_plugin_select_v6, plugin);
1975   if (plugin->ipv6_queue_head != NULL)
1976     plugin->with_v6_ws = GNUNET_YES;
1977   else
1978     plugin->with_v6_ws = GNUNET_NO;
1979 }
1980
1981
1982 static int
1983 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
1984 {
1985   int tries;
1986   int sockets_created = 0;
1987   struct sockaddr *serverAddr;
1988   struct sockaddr *addrs[2];
1989   socklen_t addrlens[2];
1990   socklen_t addrlen;
1991
1992   /* Create IPv6 socket */
1993   if (plugin->enable_ipv6 == GNUNET_YES)
1994   {
1995     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
1996     if (NULL == plugin->sockv6)
1997     {
1998       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
1999       plugin->enable_ipv6 = GNUNET_NO;
2000     }
2001     else
2002     {
2003 #if HAVE_SOCKADDR_IN_SIN_LEN
2004       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2005 #endif
2006       serverAddrv6->sin6_family = AF_INET6;
2007       serverAddrv6->sin6_addr = in6addr_any;
2008       serverAddrv6->sin6_port = htons (plugin->port);
2009       addrlen = sizeof (struct sockaddr_in6);
2010       serverAddr = (struct sockaddr *) serverAddrv6;
2011       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2012            ntohs (serverAddrv6->sin6_port));
2013       tries = 0;
2014       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2015              GNUNET_OK)
2016       {
2017         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2018         LOG (GNUNET_ERROR_TYPE_DEBUG,
2019              "IPv6 Binding failed, trying new port %d\n",
2020              ntohs (serverAddrv6->sin6_port));
2021         tries++;
2022         if (tries > 10)
2023         {
2024           GNUNET_NETWORK_socket_close (plugin->sockv6);
2025           plugin->sockv6 = NULL;
2026           break;
2027         }
2028       }
2029       if (plugin->sockv6 != NULL)
2030       {
2031         LOG (GNUNET_ERROR_TYPE_DEBUG,
2032              "IPv6 socket created on port %d\n",
2033              ntohs (serverAddrv6->sin6_port));
2034         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2035         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2036         sockets_created++;
2037       }
2038     }
2039   }
2040
2041   /* Create IPv4 socket */
2042   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2043   if (NULL == plugin->sockv4)
2044   {
2045     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2046   }
2047   else
2048   {
2049 #if HAVE_SOCKADDR_IN_SIN_LEN
2050     serverAddrv4->sin_len = sizeof (serverAddrv4);
2051 #endif
2052     serverAddrv4->sin_family = AF_INET;
2053     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2054     serverAddrv4->sin_port = htons (plugin->port);
2055     addrlen = sizeof (struct sockaddr_in);
2056     serverAddr = (struct sockaddr *) serverAddrv4;
2057
2058     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2059          ntohs (serverAddrv4->sin_port));
2060     tries = 0;
2061     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2062            GNUNET_OK)
2063     {
2064       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2065       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2066            ntohs (serverAddrv4->sin_port));
2067       tries++;
2068       if (tries > 10)
2069       {
2070         GNUNET_NETWORK_socket_close (plugin->sockv4);
2071         plugin->sockv4 = NULL;
2072         break;
2073       }
2074     }
2075     if (plugin->sockv4 != NULL)
2076     {
2077       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2078       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2079       sockets_created++;
2080     }
2081   }
2082
2083   /* Create file descriptors */
2084   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2085   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2086   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2087   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2088   if (NULL != plugin->sockv4)
2089   {
2090     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2091     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2092   }
2093
2094   if (sockets_created == 0)
2095     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2096
2097   plugin->select_task =
2098       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2099                                    GNUNET_TIME_UNIT_FOREVER_REL,
2100                                    plugin->rs_v4,
2101                                    NULL,
2102                                    &udp_plugin_select, plugin);
2103   plugin->with_v4_ws = GNUNET_NO;
2104
2105   if (plugin->enable_ipv6 == GNUNET_YES)
2106   {
2107     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2108     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2109     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2110     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2111     if (NULL != plugin->sockv6)
2112     {
2113       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2114       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2115     }
2116
2117     plugin->select_task_v6 =
2118         GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
2119                                      GNUNET_TIME_UNIT_FOREVER_REL,
2120                                      plugin->rs_v6,
2121                                      NULL,
2122                                      &udp_plugin_select_v6, plugin);
2123     plugin->with_v6_ws = GNUNET_NO;
2124   }
2125
2126   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2127                            GNUNET_NO, plugin->port,
2128                            sockets_created,
2129                            (const struct sockaddr **) addrs, addrlens,
2130                            &udp_nat_port_map_callback, NULL, plugin);
2131
2132   return sockets_created;
2133 }
2134
2135
2136 /**
2137  * The exported method. Makes the core api available via a global and
2138  * returns the udp transport API.
2139  *
2140  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2141  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2142  */
2143 void *
2144 libgnunet_plugin_transport_udp_init (void *cls)
2145 {
2146   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2147   struct GNUNET_TRANSPORT_PluginFunctions *api;
2148   struct Plugin *plugin;
2149
2150   unsigned long long port;
2151   unsigned long long aport;
2152   unsigned long long broadcast;
2153   unsigned long long udp_max_bps;
2154   unsigned long long enable_v6;
2155   char * bind4_address;
2156   char * bind6_address;
2157   struct GNUNET_TIME_Relative interval;
2158
2159   struct sockaddr_in serverAddrv4;
2160   struct sockaddr_in6 serverAddrv6;
2161
2162   int res;
2163
2164   if (NULL == env->receive)
2165   {
2166     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2167        initialze the plugin or the API */
2168     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2169     api->cls = NULL;
2170     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2171     api->address_to_string = &udp_address_to_string;
2172     api->string_to_address = &udp_string_to_address;
2173     return api;
2174   }
2175
2176   GNUNET_assert( NULL != env->stats);
2177
2178   /* Get port number */
2179   if (GNUNET_OK !=
2180       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2181                                              &port))
2182     port = 2086;
2183   if (GNUNET_OK !=
2184       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2185                                              "ADVERTISED_PORT", &aport))
2186     aport = port;
2187   if (port > 65535)
2188   {
2189     LOG (GNUNET_ERROR_TYPE_WARNING,
2190          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2191          65535);
2192     return NULL;
2193   }
2194
2195   /* Protocols */
2196   if ((GNUNET_YES ==
2197        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2198                                              "DISABLEV6")))
2199   {
2200     enable_v6 = GNUNET_NO;
2201   }
2202   else
2203     enable_v6 = GNUNET_YES;
2204
2205
2206   /* Addresses */
2207   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2208   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2209
2210   if (GNUNET_YES ==
2211       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2212                                              "BINDTO", &bind4_address))
2213   {
2214     LOG (GNUNET_ERROR_TYPE_DEBUG,
2215          "Binding udp plugin to specific address: `%s'\n",
2216          bind4_address);
2217     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2218     {
2219       GNUNET_free (bind4_address);
2220       return NULL;
2221     }
2222   }
2223
2224   if (GNUNET_YES ==
2225       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2226                                              "BINDTO6", &bind6_address))
2227   {
2228     LOG (GNUNET_ERROR_TYPE_DEBUG,
2229          "Binding udp plugin to specific address: `%s'\n",
2230          bind6_address);
2231     if (1 !=
2232         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2233     {
2234       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2235            bind6_address);
2236       GNUNET_free_non_null (bind4_address);
2237       GNUNET_free (bind6_address);
2238       return NULL;
2239     }
2240   }
2241
2242
2243   /* Enable neighbour discovery */
2244   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2245                                             "BROADCAST");
2246   if (broadcast == GNUNET_SYSERR)
2247     broadcast = GNUNET_NO;
2248
2249   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp",
2250                                            "BROADCAST_INTERVAL", &interval))
2251   {
2252     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2253   }
2254
2255   /* Maximum datarate */
2256   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2257                                              "MAX_BPS", &udp_max_bps))
2258   {
2259     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2260   }
2261
2262   plugin = GNUNET_malloc (sizeof (struct Plugin));
2263   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2264
2265   GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
2266                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2267
2268
2269   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
2270   plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2271   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
2272   plugin->port = port;
2273   plugin->aport = aport;
2274   plugin->broadcast_interval = interval;
2275   plugin->enable_ipv6 = enable_v6;
2276   plugin->env = env;
2277
2278   api->cls = plugin;
2279   api->send = NULL;
2280   api->disconnect = &udp_disconnect;
2281   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2282   api->address_to_string = &udp_address_to_string;
2283   api->string_to_address = &udp_string_to_address;
2284   api->check_address = &udp_plugin_check_address;
2285   api->get_session = &udp_plugin_get_session;
2286   api->send = &udp_plugin_send;
2287
2288   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2289   res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
2290   if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
2291   {
2292     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2293     GNUNET_free (plugin);
2294     GNUNET_free (api);
2295     return NULL;
2296   }
2297
2298   if (broadcast == GNUNET_YES)
2299   {
2300     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2301     setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
2302   }
2303
2304   GNUNET_free_non_null (bind4_address);
2305   GNUNET_free_non_null (bind6_address);
2306   return api;
2307 }
2308
2309
2310 static int
2311 heap_cleanup_iterator (void *cls,
2312                        struct GNUNET_CONTAINER_HeapNode *
2313                        node, void *element,
2314                        GNUNET_CONTAINER_HeapCostType
2315                        cost)
2316 {
2317   struct DefragContext * d_ctx = element;
2318
2319   GNUNET_CONTAINER_heap_remove_node (node);
2320   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2321   GNUNET_free (d_ctx);
2322
2323   return GNUNET_YES;
2324 }
2325
2326
2327 /**
2328  * The exported method. Makes the core api available via a global and
2329  * returns the udp transport API.
2330  *
2331  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2332  * @return NULL
2333  */
2334 void *
2335 libgnunet_plugin_transport_udp_done (void *cls)
2336 {
2337   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2338   struct Plugin *plugin = api->cls;
2339
2340   if (NULL == plugin)
2341   {
2342     GNUNET_free (api);
2343     return NULL;
2344   }
2345
2346   stop_broadcast (plugin);
2347
2348   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2349   {
2350     GNUNET_SCHEDULER_cancel (plugin->select_task);
2351     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2352   }
2353   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2354   {
2355     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2356     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2357   }
2358
2359   /* Closing sockets */
2360   if (plugin->sockv4 != NULL)
2361   {
2362     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2363     plugin->sockv4 = NULL;
2364   }
2365   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2366   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2367
2368   if (plugin->sockv6 != NULL)
2369   {
2370     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2371     plugin->sockv6 = NULL;
2372
2373     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2374     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2375   }
2376
2377   GNUNET_NAT_unregister (plugin->nat);
2378
2379   if (plugin->defrag_ctxs != NULL)
2380   {
2381     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2382         heap_cleanup_iterator, NULL);
2383     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2384     plugin->defrag_ctxs = NULL;
2385   }
2386   if (plugin->mst != NULL)
2387   {
2388     GNUNET_SERVER_mst_destroy(plugin->mst);
2389     plugin->mst = NULL;
2390   }
2391
2392   /* Clean up leftover messages */
2393   struct UDPMessageWrapper * udpw;
2394   udpw = plugin->ipv4_queue_head;
2395   while (udpw != NULL)
2396   {
2397     struct UDPMessageWrapper *tmp = udpw->next;
2398     GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
2399     call_continuation(udpw, GNUNET_SYSERR);
2400     GNUNET_free (udpw);
2401     udpw = tmp;
2402   }
2403   udpw = plugin->ipv6_queue_head;
2404   while (udpw != NULL)
2405   {
2406     struct UDPMessageWrapper *tmp = udpw->next;
2407     GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
2408     call_continuation(udpw, GNUNET_SYSERR);
2409     GNUNET_free (udpw);
2410     udpw = tmp;
2411   }
2412
2413   /* Clean up sessions */
2414   LOG (GNUNET_ERROR_TYPE_DEBUG,
2415        "Cleaning up sessions\n");
2416   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2417   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2418
2419   plugin->nat = NULL;
2420   GNUNET_free (plugin);
2421   GNUNET_free (api);
2422   GNUNET_assert (0 == cont_calls);
2423   return NULL;
2424 }
2425
2426
2427 /* end of plugin_transport_udp.c */