new operation queue for limiting overlay connects
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 struct Session
90 {
91   /**
92    * Which peer is this session for?
93    */
94   struct GNUNET_PeerIdentity target;
95
96   struct UDP_FragmentationContext * frag_ctx;
97
98   /**
99    * Address of the other peer
100    */
101   const struct sockaddr *sock_addr;
102
103   /**
104    * Desired delay for next sending we send to other peer
105    */
106   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
107
108   /**
109    * Desired delay for next sending we received from other peer
110    */
111   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
112
113   /**
114    * Session timeout task
115    */
116   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118   /**
119    * expected delay for ACKs
120    */
121   struct GNUNET_TIME_Relative last_expected_delay;
122
123   struct GNUNET_ATS_Information ats;
124
125   size_t addrlen;
126
127
128   unsigned int rc;
129
130   int in_destroy;
131 };
132
133
134 struct SessionCompareContext
135 {
136   struct Session *res;
137   const struct GNUNET_HELLO_Address *addr;
138 };
139
140
141 /**
142  * Closure for 'process_inbound_tokenized_messages'
143  */
144 struct SourceInformation
145 {
146   /**
147    * Sender identity.
148    */
149   struct GNUNET_PeerIdentity sender;
150
151   /**
152    * Source address.
153    */
154   const void *arg;
155
156   struct Session *session;
157   /**
158    * Number of bytes in source address.
159    */
160   size_t args;
161
162 };
163
164
165 /**
166  * Closure for 'find_receive_context'.
167  */
168 struct FindReceiveContext
169 {
170   /**
171    * Where to store the result.
172    */
173   struct DefragContext *rc;
174
175   /**
176    * Address to find.
177    */
178   const struct sockaddr *addr;
179
180   struct Session *session;
181
182   /**
183    * Number of bytes in 'addr'.
184    */
185   socklen_t addr_len;
186
187 };
188
189
190
191 /**
192  * Data structure to track defragmentation contexts based
193  * on the source of the UDP traffic.
194  */
195 struct DefragContext
196 {
197
198   /**
199    * Defragmentation context.
200    */
201   struct GNUNET_DEFRAGMENT_Context *defrag;
202
203   /**
204    * Source address this receive context is for (allocated at the
205    * end of the struct).
206    */
207   const struct sockaddr *src_addr;
208
209   /**
210    * Reference to master plugin struct.
211    */
212   struct Plugin *plugin;
213
214   /**
215    * Node in the defrag heap.
216    */
217   struct GNUNET_CONTAINER_HeapNode *hnode;
218
219   /**
220    * Length of 'src_addr'
221    */
222   size_t addr_len;
223 };
224
225
226
227 /**
228  * Context to send fragmented messages
229  */
230 struct UDP_FragmentationContext
231 {
232   /**
233    * Next in linked list
234    */
235   struct UDP_FragmentationContext * next;
236
237   /**
238    * Previous in linked list
239    */
240   struct UDP_FragmentationContext * prev;
241
242   /**
243    * The plugin
244    */
245   struct Plugin * plugin;
246
247   /**
248    * Handle for GNUNET_FRAGMENT context
249    */
250   struct GNUNET_FRAGMENT_Context * frag;
251
252   /**
253    * The session this fragmentation context belongs to
254    */
255   struct Session * session;
256
257   /**
258    * Function to call upon completion of the transmission.
259    */
260   GNUNET_TRANSPORT_TransmitContinuation cont;
261
262   /**
263    * Closure for 'cont'.
264    */
265   void *cont_cls;
266
267   /**
268    * Message timeout
269    */
270   struct GNUNET_TIME_Absolute timeout;
271
272   /**
273    * Payload size of original unfragmented message
274    */
275   size_t payload_size;
276
277   /**
278    * Bytes used to send all fragments on wire including UDP overhead
279    */
280   size_t on_wire_size;
281 };
282
283
284 struct UDP_MessageWrapper
285 {
286   /**
287    * Session this message belongs to
288    */
289   struct Session *session;
290
291   /**
292    * DLL of messages
293    * previous element
294    */
295   struct UDP_MessageWrapper *prev;
296
297   /**
298    * DLL of messages
299    * previous element
300    */
301   struct UDP_MessageWrapper *next;
302
303   /**
304    * Message with size msg_size including UDP specific overhead
305    */
306   char *msg_buf;
307
308   /**
309    * Size of UDP message to send including UDP specific overhead
310    */
311   size_t msg_size;
312
313   /**
314    * Payload size of original message
315    */
316   size_t payload_size;
317
318   /**
319    * Message timeout
320    */
321   struct GNUNET_TIME_Absolute timeout;
322
323   /**
324    * Function to call upon completion of the transmission.
325    */
326   GNUNET_TRANSPORT_TransmitContinuation cont;
327
328   /**
329    * Closure for 'cont'.
330    */
331   void *cont_cls;
332
333   /**
334    * Fragmentation context
335    * frag_ctx == NULL if transport <= MTU
336    * frag_ctx != NULL if transport > MTU
337    */
338   struct UDP_FragmentationContext *frag_ctx;
339 };
340
341
342 /**
343  * UDP ACK Message-Packet header (after defragmentation).
344  */
345 struct UDP_ACK_Message
346 {
347   /**
348    * Message header.
349    */
350   struct GNUNET_MessageHeader header;
351
352   /**
353    * Desired delay for flow control
354    */
355   uint32_t delay;
356
357   /**
358    * What is the identity of the sender
359    */
360   struct GNUNET_PeerIdentity sender;
361
362 };
363
364 /**
365  * Encapsulation of all of the state of the plugin.
366  */
367 struct Plugin * plugin;
368
369
370 /**
371  * We have been notified that our readset has something to read.  We don't
372  * know which socket needs to be read, so we have to check each one
373  * Then reschedule this function to be called again once more is available.
374  *
375  * @param cls the plugin handle
376  * @param tc the scheduling context (for rescheduling this function again)
377  */
378 static void
379 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
380
381
382 /**
383  * We have been notified that our readset has something to read.  We don't
384  * know which socket needs to be read, so we have to check each one
385  * Then reschedule this function to be called again once more is available.
386  *
387  * @param cls the plugin handle
388  * @param tc the scheduling context (for rescheduling this function again)
389  */
390 static void
391 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
392
393
394 /**
395  * Start session timeout
396  */
397 static void
398 start_session_timeout (struct Session *s);
399
400 /**
401  * Increment session timeout due to activity
402  */
403 static void
404 reschedule_session_timeout (struct Session *s);
405
406 /**
407  * Cancel timeout
408  */
409 static void
410 stop_session_timeout (struct Session *s);
411
412
413 /**
414  * (re)schedule select tasks for this plugin.
415  *
416  * @param plugin plugin to reschedule
417  */
418 static void
419 schedule_select (struct Plugin *plugin)
420 {
421   struct GNUNET_TIME_Relative min_delay;
422   struct UDP_MessageWrapper *udpw;
423
424   if (NULL != plugin->sockv4)
425   {
426     /* Find a message ready to send:
427      * Flow delay from other peer is expired or not set (0) */
428     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
429     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
430       min_delay = GNUNET_TIME_relative_min (min_delay,
431                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
432     
433     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
434       GNUNET_SCHEDULER_cancel(plugin->select_task);
435
436     /* Schedule with:
437      * - write active set if message is ready
438      * - timeout minimum delay */
439     plugin->select_task =
440       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
441                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
442                                    plugin->rs_v4,
443                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
444                                    &udp_plugin_select, plugin);  
445   }
446   if (NULL != plugin->sockv6)
447   {
448     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
449     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
450       min_delay = GNUNET_TIME_relative_min (min_delay,
451                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
452     
453     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
454       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
455     plugin->select_task_v6 =
456       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
457                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
458                                    plugin->rs_v6,
459                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
460                                    &udp_plugin_select_v6, plugin);
461   }
462 }
463
464
465 /**
466  * Function called for a quick conversion of the binary address to
467  * a numeric address.  Note that the caller must not free the
468  * address and that the next call to this function is allowed
469  * to override the address again.
470  *
471  * @param cls closure
472  * @param addr binary address
473  * @param addrlen length of the address
474  * @return string representing the same address
475  */
476 const char *
477 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
478 {
479   static char rbuf[INET6_ADDRSTRLEN + 10];
480   char buf[INET6_ADDRSTRLEN];
481   const void *sb;
482   struct in_addr a4;
483   struct in6_addr a6;
484   const struct IPv4UdpAddress *t4;
485   const struct IPv6UdpAddress *t6;
486   int af;
487   uint16_t port;
488
489   if (addrlen == sizeof (struct IPv6UdpAddress))
490   {
491     t6 = addr;
492     af = AF_INET6;
493     port = ntohs (t6->u6_port);
494     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
495     sb = &a6;
496   }
497   else if (addrlen == sizeof (struct IPv4UdpAddress))
498   {
499     t4 = addr;
500     af = AF_INET;
501     port = ntohs (t4->u4_port);
502     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
503     sb = &a4;
504   }
505   else
506   {
507     GNUNET_break_op (0);
508     return NULL;
509   }
510   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
511   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
512                    buf, port);
513   return rbuf;
514 }
515
516
517 /**
518  * Function called to convert a string address to
519  * a binary address.
520  *
521  * @param cls closure ('struct Plugin*')
522  * @param addr string address
523  * @param addrlen length of the address
524  * @param buf location to store the buffer
525  * @param added location to store the number of bytes in the buffer.
526  *        If the function returns GNUNET_SYSERR, its contents are undefined.
527  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
528  */
529 static int
530 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
531     void **buf, size_t *added)
532 {
533   struct sockaddr_storage socket_address;
534   
535   if ((NULL == addr) || (0 == addrlen))
536   {
537     GNUNET_break (0);
538     return GNUNET_SYSERR;
539   }
540
541   if ('\0' != addr[addrlen - 1])
542   {
543     return GNUNET_SYSERR;
544   }
545
546   if (strlen (addr) != addrlen - 1)
547   {
548     return GNUNET_SYSERR;
549   }
550
551   if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
552                                                  &socket_address))
553   {
554     return GNUNET_SYSERR;
555   }
556
557   switch (socket_address.ss_family)
558   {
559   case AF_INET:
560     {
561       struct IPv4UdpAddress *u4;
562       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
563       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
564       u4->ipv4_addr = in4->sin_addr.s_addr;
565       u4->u4_port = in4->sin_port;
566       *buf = u4;
567       *added = sizeof (struct IPv4UdpAddress);
568       return GNUNET_OK;
569     }
570   case AF_INET6:
571     {
572       struct IPv6UdpAddress *u6;
573       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
574       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
575       u6->ipv6_addr = in6->sin6_addr;
576       u6->u6_port = in6->sin6_port;
577       *buf = u6;
578       *added = sizeof (struct IPv6UdpAddress);
579       return GNUNET_OK;
580     }
581   default:
582     GNUNET_break (0);
583     return GNUNET_SYSERR;
584   }
585 }
586
587
588 /**
589  * Append our port and forward the result.
590  *
591  * @param cls a 'struct PrettyPrinterContext'
592  * @param hostname result from DNS resolver
593  */
594 static void
595 append_port (void *cls, const char *hostname)
596 {
597   struct PrettyPrinterContext *ppc = cls;
598   char *ret;
599
600   if (hostname == NULL)
601   {
602     ppc->asc (ppc->asc_cls, NULL);
603     GNUNET_free (ppc);
604     return;
605   }
606   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
607   ppc->asc (ppc->asc_cls, ret);
608   GNUNET_free (ret);
609 }
610
611
612 /**
613  * Convert the transports address to a nice, human-readable
614  * format.
615  *
616  * @param cls closure
617  * @param type name of the transport that generated the address
618  * @param addr one of the addresses of the host, NULL for the last address
619  *        the specific address format depends on the transport
620  * @param addrlen length of the address
621  * @param numeric should (IP) addresses be displayed in numeric form?
622  * @param timeout after how long should we give up?
623  * @param asc function to call on each string
624  * @param asc_cls closure for asc
625  */
626 static void
627 udp_plugin_address_pretty_printer (void *cls, const char *type,
628                                    const void *addr, size_t addrlen,
629                                    int numeric,
630                                    struct GNUNET_TIME_Relative timeout,
631                                    GNUNET_TRANSPORT_AddressStringCallback asc,
632                                    void *asc_cls)
633 {
634   struct PrettyPrinterContext *ppc;
635   const void *sb;
636   size_t sbs;
637   struct sockaddr_in a4;
638   struct sockaddr_in6 a6;
639   const struct IPv4UdpAddress *u4;
640   const struct IPv6UdpAddress *u6;
641   uint16_t port;
642
643   if (addrlen == sizeof (struct IPv6UdpAddress))
644   {
645     u6 = addr;
646     memset (&a6, 0, sizeof (a6));
647     a6.sin6_family = AF_INET6;
648 #if HAVE_SOCKADDR_IN_SIN_LEN
649     a6.sin6_len = sizeof (a6);
650 #endif
651     a6.sin6_port = u6->u6_port;
652     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
653     port = ntohs (u6->u6_port);
654     sb = &a6;
655     sbs = sizeof (a6);
656   }
657   else if (addrlen == sizeof (struct IPv4UdpAddress))
658   {
659     u4 = addr;
660     memset (&a4, 0, sizeof (a4));
661     a4.sin_family = AF_INET;
662 #if HAVE_SOCKADDR_IN_SIN_LEN
663     a4.sin_len = sizeof (a4);
664 #endif
665     a4.sin_port = u4->u4_port;
666     a4.sin_addr.s_addr = u4->ipv4_addr;
667     port = ntohs (u4->u4_port);
668     sb = &a4;
669     sbs = sizeof (a4);
670   }
671   else if (0 == addrlen)
672   {
673     asc (asc_cls, "<inbound connection>");
674     asc (asc_cls, NULL);
675     return;
676   }
677   else
678   {
679     /* invalid address */
680     GNUNET_break_op (0);
681     asc (asc_cls, NULL);
682     return;
683   }
684   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
685   ppc->asc = asc;
686   ppc->asc_cls = asc_cls;
687   ppc->port = port;
688   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
689 }
690
691
692 static void
693 call_continuation (struct UDP_MessageWrapper *udpw, int result)
694 {
695   LOG (GNUNET_ERROR_TYPE_DEBUG,
696       "Calling continuation for %u byte message to `%s' with result %s\n",
697       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
698       (GNUNET_OK == result) ? "OK" : "SYSERR");
699
700   if (NULL == udpw->frag_ctx)
701   {
702       /* Not fragmented message */
703       if (GNUNET_OK == result)
704       {
705         GNUNET_STATISTICS_update (plugin->env->stats,
706                                   "# unfragmented messages transmit with success via UDP",
707                                   1, GNUNET_NO);
708         if (udpw->msg_size >= udpw->payload_size)
709         {
710             GNUNET_STATISTICS_update (plugin->env->stats,
711                                   "# bytes overhead transmitted via UDP",
712                                   udpw->msg_size - udpw->payload_size, GNUNET_NO);
713         }
714         GNUNET_STATISTICS_update (plugin->env->stats,
715                               "# bytes payload transmitted via UDP",
716                               udpw->payload_size, GNUNET_NO);
717       }
718       else
719       {
720           GNUNET_STATISTICS_update (plugin->env->stats,
721                                     "# unfragmented messages transmit with failure via UDP",
722                                     1, GNUNET_NO);
723       }
724       if (NULL != udpw->cont)
725         udpw->cont (udpw->cont_cls, &udpw->session->target, result,
726                   udpw->payload_size, udpw->msg_size);
727   }
728   else
729   {
730       /* Fragmented message */
731       if (GNUNET_OK == result)
732       {
733           /* Fragmented message: only call next_fragment continuation on success */
734           if (NULL != udpw->cont)
735             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
736                       udpw->payload_size, udpw->msg_size);
737       }
738   }
739 }
740
741
742 /**
743  * Check if the given port is plausible (must be either our listen
744  * port or our advertised port).  If it is neither, we return
745  * GNUNET_SYSERR.
746  *
747  * @param plugin global variables
748  * @param in_port port number to check
749  * @return GNUNET_OK if port is either open_port or adv_port
750  */
751 static int
752 check_port (struct Plugin *plugin, uint16_t in_port)
753 {
754   if ((in_port == plugin->port) || (in_port == plugin->aport))
755     return GNUNET_OK;
756   return GNUNET_SYSERR;
757 }
758
759
760 /**
761  * Function that will be called to check if a binary address for this
762  * plugin is well-formed and corresponds to an address for THIS peer
763  * (as per our configuration).  Naturally, if absolutely necessary,
764  * plugins can be a bit conservative in their answer, but in general
765  * plugins should make sure that the address does not redirect
766  * traffic to a 3rd party that might try to man-in-the-middle our
767  * traffic.
768  *
769  * @param cls closure, should be our handle to the Plugin
770  * @param addr pointer to the address
771  * @param addrlen length of addr
772  * @return GNUNET_OK if this is a plausible address for this peer
773  *         and transport, GNUNET_SYSERR if not
774  *
775  */
776 static int
777 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
778 {
779   struct Plugin *plugin = cls;
780   struct IPv4UdpAddress *v4;
781   struct IPv6UdpAddress *v6;
782
783   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
784       (addrlen != sizeof (struct IPv6UdpAddress)))
785   {
786     GNUNET_break_op (0);
787     return GNUNET_SYSERR;
788   }
789   if (addrlen == sizeof (struct IPv4UdpAddress))
790   {
791     v4 = (struct IPv4UdpAddress *) addr;
792     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
793       return GNUNET_SYSERR;
794     if (GNUNET_OK !=
795         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
796                                  sizeof (struct in_addr)))
797       return GNUNET_SYSERR;
798   }
799   else
800   {
801     v6 = (struct IPv6UdpAddress *) addr;
802     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
803     {
804       GNUNET_break_op (0);
805       return GNUNET_SYSERR;
806     }
807     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
808       return GNUNET_SYSERR;
809     if (GNUNET_OK !=
810         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
811                                  sizeof (struct in6_addr)))
812       return GNUNET_SYSERR;
813   }
814   return GNUNET_OK;
815 }
816
817
818 /**
819  * Task to free resources associated with a session.
820  *
821  * @param s session to free
822  */
823 static void
824 free_session (struct Session *s)
825 {
826   if (s->frag_ctx != NULL)
827   {
828     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag);
829     GNUNET_free (s->frag_ctx);
830     s->frag_ctx = NULL;
831   }
832   GNUNET_free (s);
833 }
834
835
836 static void
837 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
838 {
839   GNUNET_STATISTICS_update (plugin->env->stats,
840                             "# bytes currently in UDP buffers",
841                             -udpw->msg_size, GNUNET_NO);
842   GNUNET_STATISTICS_update (plugin->env->stats,
843                             "# msgs currently in UDP buffers",
844                             -1, GNUNET_NO);
845   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
846     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
847                                  plugin->ipv4_queue_tail, udpw);
848   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
849     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
850                                  plugin->ipv6_queue_tail, udpw);
851 }
852
853 /**
854  * Functions with this signature are called whenever we need
855  * to close a session due to a disconnect or failure to
856  * establish a connection.
857  *
858  * @param s session to close down
859  */
860 static void
861 disconnect_session (struct Session *s)
862 {
863   struct UDP_MessageWrapper *udpw;
864   struct UDP_MessageWrapper *next;
865
866   GNUNET_assert (GNUNET_YES != s->in_destroy);
867   LOG (GNUNET_ERROR_TYPE_DEBUG,
868        "Session %p to peer `%s' address ended \n",
869          s,
870          GNUNET_i2s (&s->target),
871          GNUNET_a2s (s->sock_addr, s->addrlen));
872   stop_session_timeout (s);
873   next = plugin->ipv4_queue_head;
874   while (NULL != (udpw = next))
875   {
876     next = udpw->next;
877     if (udpw->session == s)
878     {
879       dequeue (plugin, udpw);
880       call_continuation(udpw, GNUNET_SYSERR);
881       GNUNET_free (udpw);
882     }
883   }
884   next = plugin->ipv6_queue_head;
885   while (NULL != (udpw = next))
886   {
887     next = udpw->next;
888     if (udpw->session == s)
889     {
890       dequeue (plugin, udpw);
891       call_continuation(udpw, GNUNET_SYSERR);
892       GNUNET_free (udpw);
893     }
894     udpw = next;
895   }
896   plugin->env->session_end (plugin->env->cls, &s->target, s);
897
898   if (NULL != s->frag_ctx)
899   {
900     if (NULL != s->frag_ctx->cont)
901     {
902       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
903                          s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
904       LOG (GNUNET_ERROR_TYPE_DEBUG,
905           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
906           GNUNET_i2s (&s->target));
907     }
908   }
909
910   GNUNET_assert (GNUNET_YES ==
911                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
912                                                        &s->target.hashPubKey,
913                                                        s));
914   GNUNET_STATISTICS_set(plugin->env->stats,
915                         "# UDP sessions active",
916                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
917                         GNUNET_NO);
918   if (s->rc > 0)
919     s->in_destroy = GNUNET_YES;
920   else
921     free_session (s);
922 }
923
924 /**
925  * Destroy a session, plugin is being unloaded.
926  *
927  * @param cls unused
928  * @param key hash of public key of target peer
929  * @param value a 'struct PeerSession*' to clean up
930  * @return GNUNET_OK (continue to iterate)
931  */
932 static int
933 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
934 {
935   disconnect_session(value);
936   return GNUNET_OK;
937 }
938
939
940 /**
941  * Disconnect from a remote node.  Clean up session if we have one for this peer
942  *
943  * @param cls closure for this call (should be handle to Plugin)
944  * @param target the peeridentity of the peer to disconnect
945  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
946  */
947 static void
948 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
949 {
950   struct Plugin *plugin = cls;
951   GNUNET_assert (plugin != NULL);
952
953   GNUNET_assert (target != NULL);
954   LOG (GNUNET_ERROR_TYPE_DEBUG,
955        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
956   /* Clean up sessions */
957   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
958 }
959
960
961 /**
962  * Session was idle, so disconnect it
963  */
964 static void
965 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
966 {
967   GNUNET_assert (NULL != cls);
968   struct Session *s = cls;
969
970   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972               "Session %p was idle for %llu ms, disconnecting\n",
973               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
974   /* call session destroy function */
975   disconnect_session (s);
976 }
977
978
979 /**
980  * Start session timeout
981  */
982 static void
983 start_session_timeout (struct Session *s)
984 {
985   GNUNET_assert (NULL != s);
986   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
987   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
988                                                    &session_timeout,
989                                                    s);
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "Timeout for session %p set to %llu ms\n",
992               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
993 }
994
995
996 /**
997  * Increment session timeout due to activity
998  */
999 static void
1000 reschedule_session_timeout (struct Session *s)
1001 {
1002   GNUNET_assert (NULL != s);
1003   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1004
1005   GNUNET_SCHEDULER_cancel (s->timeout_task);
1006   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1007                                                    &session_timeout,
1008                                                    s);
1009   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010               "Timeout rescheduled for session %p set to %llu ms\n",
1011               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1012 }
1013
1014
1015 /**
1016  * Cancel timeout
1017  */
1018 static void
1019 stop_session_timeout (struct Session *s)
1020 {
1021   GNUNET_assert (NULL != s);
1022
1023   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1024   {
1025     GNUNET_SCHEDULER_cancel (s->timeout_task);
1026     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1027     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028                 "Timeout stopped for session %p canceled\n",
1029                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1030   }
1031 }
1032
1033
1034 static struct Session *
1035 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
1036                 const void *addr, size_t addrlen,
1037                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1038 {
1039   struct Session *s;
1040   const struct IPv4UdpAddress *t4;
1041   const struct IPv6UdpAddress *t6;
1042   struct sockaddr_in *v4;
1043   struct sockaddr_in6 *v6;
1044   size_t len;
1045
1046   switch (addrlen)
1047   {
1048   case sizeof (struct IPv4UdpAddress):
1049     if (NULL == plugin->sockv4)
1050     {
1051       return NULL;
1052     }
1053     t4 = addr;
1054     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
1055     len = sizeof (struct sockaddr_in);
1056     v4 = (struct sockaddr_in *) &s[1];
1057     v4->sin_family = AF_INET;
1058 #if HAVE_SOCKADDR_IN_SIN_LEN
1059     v4->sin_len = sizeof (struct sockaddr_in);
1060 #endif
1061     v4->sin_port = t4->u4_port;
1062     v4->sin_addr.s_addr = t4->ipv4_addr;
1063     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
1064     break;
1065   case sizeof (struct IPv6UdpAddress):
1066     if (NULL == plugin->sockv6)
1067     {
1068       return NULL;
1069     }
1070     t6 = addr;
1071     s =
1072         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
1073     len = sizeof (struct sockaddr_in6);
1074     v6 = (struct sockaddr_in6 *) &s[1];
1075     v6->sin6_family = AF_INET6;
1076 #if HAVE_SOCKADDR_IN_SIN_LEN
1077     v6->sin6_len = sizeof (struct sockaddr_in6);
1078 #endif
1079     v6->sin6_port = t6->u6_port;
1080     v6->sin6_addr = t6->ipv6_addr;
1081     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
1082     break;
1083   default:
1084     /* Must have a valid address to send to */
1085     GNUNET_break_op (0);
1086     return NULL;
1087   }
1088   s->addrlen = len;
1089   s->target = *target;
1090   s->sock_addr = (const struct sockaddr *) &s[1];
1091   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
1092   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1093   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1094   start_session_timeout (s);
1095   return s;
1096 }
1097
1098
1099 static int
1100 session_cmp_it (void *cls,
1101                 const struct GNUNET_HashCode * key,
1102                 void *value)
1103 {
1104   struct SessionCompareContext * cctx = cls;
1105   const struct GNUNET_HELLO_Address *address = cctx->addr;
1106   struct Session *s = value;
1107
1108   socklen_t s_addrlen = s->addrlen;
1109
1110   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
1111       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1112       GNUNET_a2s (s->sock_addr, s->addrlen));
1113   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
1114       (s_addrlen == sizeof (struct sockaddr_in)))
1115   {
1116     struct IPv4UdpAddress * u4 = NULL;
1117     u4 = (struct IPv4UdpAddress *) address->address;
1118     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
1119     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
1120         (u4->u4_port == s4->sin_port))
1121     {
1122       cctx->res = s;
1123       return GNUNET_NO;
1124     }
1125
1126   }
1127   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1128       (s_addrlen == sizeof (struct sockaddr_in6)))
1129   {
1130     struct IPv6UdpAddress * u6 = NULL;
1131     u6 = (struct IPv6UdpAddress *) address->address;
1132     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1133     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1134         (u6->u6_port == s6->sin6_port))
1135     {
1136       cctx->res = s;
1137       return GNUNET_NO;
1138     }
1139   }
1140   return GNUNET_YES;
1141 }
1142
1143
1144 /**
1145  * Creates a new outbound session the transport service will use to send data to the
1146  * peer
1147  *
1148  * @param cls the plugin
1149  * @param address the address
1150  * @return the session or NULL of max connections exceeded
1151  */
1152 static struct Session *
1153 udp_plugin_get_session (void *cls,
1154                   const struct GNUNET_HELLO_Address *address)
1155 {
1156   struct Session * s = NULL;
1157   struct Plugin * plugin = cls;
1158   struct IPv6UdpAddress * udp_a6;
1159   struct IPv4UdpAddress * udp_a4;
1160
1161   GNUNET_assert (plugin != NULL);
1162   GNUNET_assert (address != NULL);
1163
1164
1165   if ((address->address == NULL) ||
1166       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1167       (address->address_length != sizeof (struct IPv6UdpAddress))))
1168   {
1169     GNUNET_break (0);
1170     return NULL;
1171   }
1172
1173   if (address->address_length == sizeof (struct IPv4UdpAddress))
1174   {
1175     if (plugin->sockv4 == NULL)
1176       return NULL;
1177     udp_a4 = (struct IPv4UdpAddress *) address->address;
1178     if (udp_a4->u4_port == 0)
1179       return NULL;
1180   }
1181
1182   if (address->address_length == sizeof (struct IPv6UdpAddress))
1183   {
1184     if (plugin->sockv6 == NULL)
1185       return NULL;
1186     udp_a6 = (struct IPv6UdpAddress *) address->address;
1187     if (udp_a6->u6_port == 0)
1188       return NULL;
1189   }
1190
1191   /* check if session already exists */
1192   struct SessionCompareContext cctx;
1193   cctx.addr = address;
1194   cctx.res = NULL;
1195   LOG (GNUNET_ERROR_TYPE_DEBUG,
1196        "Looking for existing session for peer `%s' `%s' \n", 
1197        GNUNET_i2s (&address->peer), 
1198        udp_address_to_string(NULL, address->address, address->address_length));
1199   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1200   if (cctx.res != NULL)
1201   {
1202     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1203     return cctx.res;
1204   }
1205
1206   /* otherwise create new */
1207   s = create_session (plugin,
1208       &address->peer,
1209       address->address,
1210       address->address_length,
1211       NULL, NULL);
1212   LOG (GNUNET_ERROR_TYPE_DEBUG,
1213        "Creating new session %p for peer `%s' address `%s'\n",
1214        s,
1215        GNUNET_i2s(&address->peer),
1216        udp_address_to_string(NULL,address->address,address->address_length));
1217   GNUNET_assert (GNUNET_OK ==
1218                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1219                                                     &s->target.hashPubKey,
1220                                                     s,
1221                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1222   GNUNET_STATISTICS_set(plugin->env->stats,
1223                         "# UDP sessions active",
1224                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1225                         GNUNET_NO);
1226   return s;
1227 }
1228
1229
1230 static void 
1231 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1232 {
1233   GNUNET_STATISTICS_update (plugin->env->stats,
1234                             "# bytes currently in UDP buffers",
1235                             udpw->msg_size, GNUNET_NO);
1236   GNUNET_STATISTICS_update (plugin->env->stats,
1237                             "# msgs currently in UDP buffers",
1238                             1, GNUNET_NO);
1239   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1240     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1241                                  plugin->ipv4_queue_tail, udpw);
1242   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1243     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1244                                  plugin->ipv6_queue_tail, udpw);
1245 }
1246
1247
1248
1249 /**
1250  * Fragment message was transmitted via UDP, let fragmentation know
1251  * to send the next fragment now.
1252  *
1253  * @param cls the 'struct UDPMessageWrapper' of the fragment
1254  * @param target destination peer (ignored)
1255  * @param result GNUNET_OK on success (ignored)
1256  * @param payload bytes payload sent
1257  * @param physical bytes physical sent
1258  */
1259 static void
1260 send_next_fragment (void *cls,
1261                     const struct GNUNET_PeerIdentity *target,
1262                     int result, size_t payload, size_t physical)
1263 {
1264   struct UDP_MessageWrapper *udpw = cls;
1265   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1266 }
1267
1268
1269 /**
1270  * Function that is called with messages created by the fragmentation
1271  * module.  In the case of the 'proc' callback of the
1272  * GNUNET_FRAGMENT_context_create function, this function must
1273  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1274  *
1275  * @param cls closure, the 'struct FragmentationContext'
1276  * @param msg the message that was created
1277  */
1278 static void
1279 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1280 {
1281   struct UDP_FragmentationContext *frag_ctx = cls;
1282   struct Plugin *plugin = frag_ctx->plugin;
1283   struct UDP_MessageWrapper * udpw;
1284   size_t msg_len = ntohs (msg->size);
1285  
1286   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1287        "Enqueuing fragment with %u bytes\n", msg_len);
1288   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1289   udpw->session = frag_ctx->session;
1290   udpw->msg_buf = (char *) &udpw[1];
1291   udpw->msg_size = msg_len;
1292   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1293   udpw->cont = &send_next_fragment;
1294   udpw->cont_cls = udpw;
1295   udpw->timeout = frag_ctx->timeout;
1296   udpw->frag_ctx = frag_ctx;
1297   memcpy (udpw->msg_buf, msg, msg_len);
1298   enqueue (plugin, udpw);
1299   schedule_select (plugin);
1300 }
1301
1302
1303 /**
1304  * Function that can be used by the transport service to transmit
1305  * a message using the plugin.   Note that in the case of a
1306  * peer disconnecting, the continuation MUST be called
1307  * prior to the disconnect notification itself.  This function
1308  * will be called with this peer's HELLO message to initiate
1309  * a fresh connection to another peer.
1310  *
1311  * @param cls closure
1312  * @param s which session must be used
1313  * @param msgbuf the message to transmit
1314  * @param msgbuf_size number of bytes in 'msgbuf'
1315  * @param priority how important is the message (most plugins will
1316  *                 ignore message priority and just FIFO)
1317  * @param to how long to wait at most for the transmission (does not
1318  *                require plugins to discard the message after the timeout,
1319  *                just advisory for the desired delay; most plugins will ignore
1320  *                this as well)
1321  * @param cont continuation to call once the message has
1322  *        been transmitted (or if the transport is ready
1323  *        for the next transmission call; or if the
1324  *        peer disconnected...); can be NULL
1325  * @param cont_cls closure for cont
1326  * @return number of bytes used (on the physical network, with overheads);
1327  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1328  *         and does NOT mean that the message was not transmitted (DV)
1329  */
1330 static ssize_t
1331 udp_plugin_send (void *cls,
1332                   struct Session *s,
1333                   const char *msgbuf, size_t msgbuf_size,
1334                   unsigned int priority,
1335                   struct GNUNET_TIME_Relative to,
1336                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1337 {
1338   struct Plugin *plugin = cls;
1339   size_t udpmlen = msgbuf_size + sizeof (struct UDPMessage);
1340   struct UDP_FragmentationContext * frag_ctx;
1341   struct UDP_MessageWrapper * udpw;
1342   struct UDPMessage *udp;
1343   char mbuf[udpmlen];
1344   GNUNET_assert (plugin != NULL);
1345   GNUNET_assert (s != NULL);
1346
1347   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1348     return GNUNET_SYSERR;
1349   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1350     return GNUNET_SYSERR;
1351   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1352   {
1353     GNUNET_break (0);
1354     return GNUNET_SYSERR;
1355   }
1356   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1357   {
1358     GNUNET_break (0);
1359     return GNUNET_SYSERR;
1360   }
1361   LOG (GNUNET_ERROR_TYPE_DEBUG,
1362        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1363        udpmlen,
1364        GNUNET_i2s (&s->target),
1365        GNUNET_a2s(s->sock_addr, s->addrlen));
1366   
1367   GNUNET_STATISTICS_update (plugin->env->stats,
1368                             "# bytes payload asked to transmit via UDP",
1369                             msgbuf_size, GNUNET_NO);
1370
1371   /* Message */
1372   udp = (struct UDPMessage *) mbuf;
1373   udp->header.size = htons (udpmlen);
1374   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1375   udp->reserved = htonl (0);
1376   udp->sender = *plugin->env->my_identity;
1377
1378   reschedule_session_timeout(s);
1379   if (udpmlen <= UDP_MTU)
1380   {
1381     /* unfragmented message */
1382     GNUNET_STATISTICS_update (plugin->env->stats,
1383                               "# unfragmented messages asked to transmit via UDP",
1384                               1, GNUNET_NO);
1385     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1386     udpw->session = s;
1387     udpw->msg_buf = (char *) &udpw[1];
1388     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1389     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1390     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1391     udpw->cont = cont;
1392     udpw->cont_cls = cont_cls;
1393     udpw->frag_ctx = NULL;
1394     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
1395     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1396     enqueue (plugin, udpw);
1397   }
1398   else
1399   {
1400     /* fragmented message */
1401     GNUNET_STATISTICS_update (plugin->env->stats,
1402                               "# fragmented messages asked to transmit via UDP",
1403                               1, GNUNET_NO);
1404     if  (s->frag_ctx != NULL)
1405       return GNUNET_SYSERR;
1406     memcpy (&udp[1], msgbuf, msgbuf_size);
1407     frag_ctx = GNUNET_malloc (sizeof (struct UDP_FragmentationContext));
1408     frag_ctx->plugin = plugin;
1409     frag_ctx->session = s;
1410     frag_ctx->cont = cont;
1411     frag_ctx->cont_cls = cont_cls;
1412     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1413     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1414     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1415     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1416               UDP_MTU,
1417               &plugin->tracker,
1418               s->last_expected_delay,
1419               &udp->header,
1420               &enqueue_fragment,
1421               frag_ctx);
1422     s->frag_ctx = frag_ctx;
1423   }
1424   schedule_select (plugin);
1425   return udpmlen;
1426 }
1427
1428
1429 /**
1430  * Our external IP address/port mapping has changed.
1431  *
1432  * @param cls closure, the 'struct LocalAddrList'
1433  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1434  *     the previous (now invalid) one
1435  * @param addr either the previous or the new public IP address
1436  * @param addrlen actual lenght of the address
1437  */
1438 static void
1439 udp_nat_port_map_callback (void *cls, int add_remove,
1440                            const struct sockaddr *addr, socklen_t addrlen)
1441 {
1442   struct Plugin *plugin = cls;
1443   struct IPv4UdpAddress u4;
1444   struct IPv6UdpAddress u6;
1445   void *arg;
1446   size_t args;
1447
1448   /* convert 'addr' to our internal format */
1449   switch (addr->sa_family)
1450   {
1451   case AF_INET:
1452     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1453     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1454     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1455     arg = &u4;
1456     args = sizeof (u4);
1457     break;
1458   case AF_INET6:
1459     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1460     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1461             sizeof (struct in6_addr));
1462     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1463     arg = &u6;
1464     args = sizeof (u6);
1465     break;
1466   default:
1467     GNUNET_break (0);
1468     return;
1469   }
1470   /* modify our published address list */
1471   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1472 }
1473
1474
1475
1476 /**
1477  * Message tokenizer has broken up an incomming message. Pass it on
1478  * to the service.
1479  *
1480  * @param cls the 'struct Plugin'
1481  * @param client the 'struct SourceInformation'
1482  * @param hdr the actual message
1483  */
1484 static int
1485 process_inbound_tokenized_messages (void *cls, void *client,
1486                                     const struct GNUNET_MessageHeader *hdr)
1487 {
1488   struct Plugin *plugin = cls;
1489   struct SourceInformation *si = client;
1490   struct GNUNET_ATS_Information ats[2];
1491   struct GNUNET_TIME_Relative delay;
1492
1493   GNUNET_assert (si->session != NULL);
1494   if (GNUNET_YES == si->session->in_destroy)
1495     return GNUNET_OK;
1496   /* setup ATS */
1497   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1498   ats[0].value = htonl (1);
1499   ats[1] = si->session->ats;
1500   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1501   delay = plugin->env->receive (plugin->env->cls,
1502                                 &si->sender,
1503                                 hdr,
1504                                 (const struct GNUNET_ATS_Information *) &ats, 2,
1505                                 si->session,
1506                                 si->arg,
1507                                 si->args);
1508   si->session->flow_delay_for_other_peer = delay;
1509   reschedule_session_timeout(si->session);
1510   return GNUNET_OK;
1511 }
1512
1513
1514 /**
1515  * We've received a UDP Message.  Process it (pass contents to main service).
1516  *
1517  * @param plugin plugin context
1518  * @param msg the message
1519  * @param sender_addr sender address
1520  * @param sender_addr_len number of bytes in sender_addr
1521  */
1522 static void
1523 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1524                      const struct sockaddr *sender_addr,
1525                      socklen_t sender_addr_len)
1526 {
1527   struct SourceInformation si;
1528   struct Session * s;
1529   struct IPv4UdpAddress u4;
1530   struct IPv6UdpAddress u6;
1531   const void *arg;
1532   size_t args;
1533
1534   if (0 != ntohl (msg->reserved))
1535   {
1536     GNUNET_break_op (0);
1537     return;
1538   }
1539   if (ntohs (msg->header.size) <
1540       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1541   {
1542     GNUNET_break_op (0);
1543     return;
1544   }
1545
1546   /* convert address */
1547   switch (sender_addr->sa_family)
1548   {
1549   case AF_INET:
1550     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1551     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1552     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1553     arg = &u4;
1554     args = sizeof (u4);
1555     break;
1556   case AF_INET6:
1557     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1558     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1559     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1560     arg = &u6;
1561     args = sizeof (u6);
1562     break;
1563   default:
1564     GNUNET_break (0);
1565     return;
1566   }
1567   LOG (GNUNET_ERROR_TYPE_DEBUG,
1568        "Received message with %u bytes from peer `%s' at `%s'\n",
1569        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1570        GNUNET_a2s (sender_addr, sender_addr_len));
1571
1572   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1573   s = udp_plugin_get_session(plugin, address);
1574   GNUNET_free (address);
1575
1576   /* iterate over all embedded messages */
1577   si.session = s;
1578   si.sender = msg->sender;
1579   si.arg = arg;
1580   si.args = args;
1581   s->rc++;
1582   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1583                              ntohs (msg->header.size) -
1584                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1585   s->rc--;
1586   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1587     free_session (s);
1588 }
1589
1590
1591 /**
1592  * Scan the heap for a receive context with the given address.
1593  *
1594  * @param cls the 'struct FindReceiveContext'
1595  * @param node internal node of the heap
1596  * @param element value stored at the node (a 'struct ReceiveContext')
1597  * @param cost cost associated with the node
1598  * @return GNUNET_YES if we should continue to iterate,
1599  *         GNUNET_NO if not.
1600  */
1601 static int
1602 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1603                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1604 {
1605   struct FindReceiveContext *frc = cls;
1606   struct DefragContext *e = element;
1607
1608   if ((frc->addr_len == e->addr_len) &&
1609       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1610   {
1611     frc->rc = e;
1612     return GNUNET_NO;
1613   }
1614   return GNUNET_YES;
1615 }
1616
1617
1618 /**
1619  * Process a defragmented message.
1620  *
1621  * @param cls the 'struct ReceiveContext'
1622  * @param msg the message
1623  */
1624 static void
1625 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1626 {
1627   struct DefragContext *rc = cls;
1628
1629   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1630   {
1631     GNUNET_break (0);
1632     return;
1633   }
1634   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1635   {
1636     GNUNET_break (0);
1637     return;
1638   }
1639   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1640                        rc->src_addr, rc->addr_len);
1641 }
1642
1643
1644 struct LookupContext
1645 {
1646   const struct sockaddr * addr;
1647
1648   struct Session *res;
1649
1650   size_t addrlen;
1651 };
1652
1653
1654 static int
1655 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1656 {
1657   struct LookupContext *l_ctx = cls;
1658   struct Session * s = value;
1659
1660   if ((s->addrlen == l_ctx->addrlen) &&
1661       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1662   {
1663     l_ctx->res = s;
1664     return GNUNET_NO;
1665   }
1666   return GNUNET_YES;
1667 }
1668
1669
1670 /**
1671  * Transmit an acknowledgement.
1672  *
1673  * @param cls the 'struct ReceiveContext'
1674  * @param id message ID (unused)
1675  * @param msg ack to transmit
1676  */
1677 static void
1678 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1679 {
1680   struct DefragContext *rc = cls;
1681   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1682   struct UDP_ACK_Message *udp_ack;
1683   uint32_t delay = 0;
1684   struct UDP_MessageWrapper *udpw;
1685   struct Session *s;
1686
1687   struct LookupContext l_ctx;
1688   l_ctx.addr = rc->src_addr;
1689   l_ctx.addrlen = rc->addr_len;
1690   l_ctx.res = NULL;
1691   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1692       &lookup_session_by_addr_it,
1693       &l_ctx);
1694   s = l_ctx.res;
1695
1696   if (NULL == s)
1697     return;
1698
1699   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1700     delay = s->flow_delay_for_other_peer.rel_value;
1701
1702   LOG (GNUNET_ERROR_TYPE_DEBUG,
1703        "Sending ACK to `%s' including delay of %u ms\n",
1704        GNUNET_a2s (rc->src_addr,
1705                    (rc->src_addr->sa_family ==
1706                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1707                                                                      sockaddr_in6)),
1708        delay);
1709   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
1710   udpw->msg_size = msize;
1711   udpw->payload_size = 0;
1712   udpw->session = s;
1713   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1714   udpw->msg_buf = (char *)&udpw[1];
1715   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
1716   udp_ack->header.size = htons ((uint16_t) msize);
1717   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1718   udp_ack->delay = htonl (delay);
1719   udp_ack->sender = *rc->plugin->env->my_identity;
1720   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1721
1722   GNUNET_STATISTICS_update (plugin->env->stats,
1723                             "# messages ACKs transmitted via UDP",
1724                             1, GNUNET_NO);
1725
1726   enqueue (rc->plugin, udpw);
1727 }
1728
1729
1730 static void 
1731 read_process_msg (struct Plugin *plugin,
1732                   const struct GNUNET_MessageHeader *msg,
1733                   const char *addr,
1734                   socklen_t fromlen)
1735 {
1736   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1737   {
1738     GNUNET_break_op (0);
1739     return;
1740   }
1741   process_udp_message (plugin, (const struct UDPMessage *) msg,
1742                        (const struct sockaddr *) addr, fromlen);
1743 }
1744
1745
1746 static void 
1747 read_process_ack (struct Plugin *plugin,
1748                   const struct GNUNET_MessageHeader *msg,
1749                   char *addr,
1750                   socklen_t fromlen)
1751 {
1752   const struct GNUNET_MessageHeader *ack;
1753   const struct UDP_ACK_Message *udp_ack;
1754   struct LookupContext l_ctx;
1755   struct Session *s;
1756   struct GNUNET_TIME_Relative flow_delay;
1757
1758   if (ntohs (msg->size) <
1759       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1760   {
1761     GNUNET_break_op (0);
1762     return;
1763   }
1764   udp_ack = (const struct UDP_ACK_Message *) msg;
1765   l_ctx.addr = (const struct sockaddr *) addr;
1766   l_ctx.addrlen = fromlen;
1767   l_ctx.res = NULL;
1768   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1769       &lookup_session_by_addr_it,
1770       &l_ctx);
1771   s = l_ctx.res;
1772
1773   if ((s == NULL) || (s->frag_ctx == NULL))
1774     return;
1775
1776   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
1777   LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
1778        flow_delay.rel_value);
1779   s->flow_delay_from_other_peer =
1780       GNUNET_TIME_relative_to_absolute (flow_delay);
1781
1782   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1783   if (ntohs (ack->size) !=
1784       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
1785   {
1786     GNUNET_break_op (0);
1787     return;
1788   }
1789
1790   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
1791   {
1792     LOG (GNUNET_ERROR_TYPE_DEBUG,
1793          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
1794          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1795          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1796     /* Expect more ACKs to arrive */
1797     return;
1798   }
1799
1800   LOG (GNUNET_ERROR_TYPE_DEBUG,
1801        "Message full ACK'ed\n",
1802        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
1803        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1804   s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
1805
1806   struct UDP_MessageWrapper * udpw;
1807   struct UDP_MessageWrapper * tmp;
1808   if (s->addrlen == sizeof (struct sockaddr_in6))
1809   {
1810     udpw = plugin->ipv6_queue_head;
1811     while (NULL != udpw)
1812     {
1813       tmp = udpw->next;
1814       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1815       {
1816         dequeue (plugin, udpw);
1817         GNUNET_free (udpw);
1818       }
1819       udpw = tmp;
1820     }
1821   }
1822   if (s->addrlen == sizeof (struct sockaddr_in))
1823   {
1824     udpw = plugin->ipv4_queue_head;
1825     while (udpw!= NULL)
1826     {
1827       tmp = udpw->next;
1828       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1829       {
1830         dequeue (plugin, udpw);
1831         GNUNET_free (udpw);
1832       }
1833       udpw = tmp;
1834     }
1835   }
1836
1837   GNUNET_STATISTICS_update (plugin->env->stats,
1838                         "# bytes payload transmitted via UDP",
1839                         s->frag_ctx->payload_size, GNUNET_NO);
1840
1841   if (s->frag_ctx->on_wire_size >= s->frag_ctx->payload_size)
1842   {
1843       GNUNET_STATISTICS_update (plugin->env->stats,
1844                             "# bytes overhead transmitted via UDP",
1845                             s->frag_ctx->on_wire_size - s->frag_ctx->payload_size, GNUNET_NO);
1846   }
1847
1848   if (s->frag_ctx->cont != NULL)
1849   {
1850     LOG (GNUNET_ERROR_TYPE_DEBUG,
1851         "Calling continuation for fragmented message to `%s' with result %s\n",
1852         GNUNET_i2s (&s->target), "OK");
1853
1854     s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK,
1855                        s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1856   }
1857
1858   GNUNET_free (s->frag_ctx);
1859   s->frag_ctx = NULL;
1860 }
1861
1862
1863 static void 
1864 read_process_fragment (struct Plugin *plugin,
1865                        const struct GNUNET_MessageHeader *msg,
1866                        char *addr,
1867                        socklen_t fromlen)
1868 {
1869   struct DefragContext *d_ctx;
1870   struct GNUNET_TIME_Absolute now;
1871   struct FindReceiveContext frc;
1872
1873   frc.rc = NULL;
1874   frc.addr = (const struct sockaddr *) addr;
1875   frc.addr_len = fromlen;
1876
1877   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
1878        (unsigned int) ntohs (msg->size),
1879        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1880   /* Lookup existing receive context for this address */
1881   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1882                                  &find_receive_context,
1883                                  &frc);
1884   now = GNUNET_TIME_absolute_get ();
1885   d_ctx = frc.rc;
1886
1887   if (d_ctx == NULL)
1888   {
1889     /* Create a new defragmentation context */
1890     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
1891     memcpy (&d_ctx[1], addr, fromlen);
1892     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
1893     d_ctx->addr_len = fromlen;
1894     d_ctx->plugin = plugin;
1895     d_ctx->defrag =
1896         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
1897                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
1898                                           &fragment_msg_proc, &ack_proc);
1899     d_ctx->hnode =
1900         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
1901                                       (GNUNET_CONTAINER_HeapCostType)
1902                                       now.abs_value);
1903     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1904          "Created new defragmentation context for %u-byte fragment from `%s'\n",
1905          (unsigned int) ntohs (msg->size),
1906          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1907   }
1908   else
1909   {
1910     LOG (GNUNET_ERROR_TYPE_DEBUG,
1911          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
1912          (unsigned int) ntohs (msg->size),
1913          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
1914   }
1915
1916   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
1917   {
1918     /* keep this 'rc' from expiring */
1919     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
1920                                        (GNUNET_CONTAINER_HeapCostType)
1921                                        now.abs_value);
1922   }
1923   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
1924       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
1925   {
1926     /* remove 'rc' that was inactive the longest */
1927     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
1928     GNUNET_assert (NULL != d_ctx);
1929     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1930     GNUNET_free (d_ctx);
1931   }
1932 }
1933
1934
1935 /**
1936  * Read and process a message from the given socket.
1937  *
1938  * @param plugin the overall plugin
1939  * @param rsock socket to read from
1940  */
1941 static void
1942 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
1943 {
1944   socklen_t fromlen;
1945   char addr[32];
1946   char buf[65536] GNUNET_ALIGN;
1947   ssize_t size;
1948   const struct GNUNET_MessageHeader *msg;
1949
1950   fromlen = sizeof (addr);
1951   memset (&addr, 0, sizeof (addr));
1952   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
1953                                       (struct sockaddr *) &addr, &fromlen);
1954 #if MINGW
1955   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
1956    * WSAECONNRESET error to indicate that previous sendto() (???)
1957    * on this socket has failed.
1958    */
1959   if ( (-1 == size) && (ECONNRESET == errno) )
1960     return;
1961 #endif
1962   if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader)))
1963   {
1964     GNUNET_break_op (0);
1965     return;
1966   }
1967   msg = (const struct GNUNET_MessageHeader *) buf;
1968
1969   LOG (GNUNET_ERROR_TYPE_DEBUG,
1970        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
1971        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
1972
1973   if (size != ntohs (msg->size))
1974   {
1975     GNUNET_break_op (0);
1976     return;
1977   }
1978
1979   GNUNET_STATISTICS_update (plugin->env->stats,
1980                             "# bytes received via UDP",
1981                             size, GNUNET_NO);
1982
1983   switch (ntohs (msg->type))
1984   {
1985   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
1986     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
1987     return;
1988
1989   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
1990     read_process_msg (plugin, msg, addr, fromlen);
1991     return;
1992
1993   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
1994     read_process_ack (plugin, msg, addr, fromlen);
1995     return;
1996
1997   case GNUNET_MESSAGE_TYPE_FRAGMENT:
1998     read_process_fragment (plugin, msg, addr, fromlen);
1999     return;
2000
2001   default:
2002     GNUNET_break_op (0);
2003     return;
2004   }
2005 }
2006
2007 static struct UDP_MessageWrapper *
2008 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2009                                     struct GNUNET_NETWORK_Handle *sock)
2010 {
2011   struct UDP_MessageWrapper *udpw = NULL;
2012   struct GNUNET_TIME_Relative remaining;
2013
2014   udpw = head;
2015   while (udpw != NULL)
2016   {
2017     /* Find messages with timeout */
2018     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2019     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2020     {
2021       /* Message timed out */
2022       call_continuation (udpw, GNUNET_SYSERR);
2023       if (NULL == udpw->frag_ctx)
2024       {
2025         /* Not fragmented message */
2026         LOG (GNUNET_ERROR_TYPE_DEBUG,
2027              "Message for peer `%s' with size %u timed out\n",
2028              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2029       }
2030       else
2031       {
2032           /* Fragmented message */
2033           LOG (GNUNET_ERROR_TYPE_DEBUG,
2034                "Fragment for message for peer `%s' with size %u timed out\n",
2035                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
2036           udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy (udpw->frag_ctx->frag);
2037           GNUNET_free (udpw->frag_ctx);
2038           udpw->session->frag_ctx = NULL;
2039       }
2040
2041       GNUNET_STATISTICS_update (plugin->env->stats,
2042                                 "# messages dismissed due to timeout",
2043                                 1, GNUNET_NO);
2044       /* Remove message */
2045       if (sock == plugin->sockv4)
2046       {
2047         dequeue (plugin, udpw);
2048         GNUNET_free (udpw);
2049         udpw = plugin->ipv4_queue_head;
2050       }
2051       if (sock == plugin->sockv6)
2052       {
2053         dequeue (plugin, udpw);
2054         GNUNET_free (udpw);
2055         udpw = plugin->ipv6_queue_head;
2056       }
2057     }
2058     else
2059     {
2060       /* Message did not time out, check flow delay */
2061       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2062       if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2063       {
2064         /* this message is not delayed */
2065         LOG (GNUNET_ERROR_TYPE_DEBUG, 
2066              "Message for peer `%s' (%u bytes) is not delayed \n",
2067              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2068         break; /* Found message to send, break */
2069       }
2070       else
2071       {
2072         /* Message is delayed, try next */
2073         LOG (GNUNET_ERROR_TYPE_DEBUG,
2074              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
2075              GNUNET_i2s(&udpw->session->target), udpw->payload_size, remaining.rel_value);
2076         udpw = udpw->next;
2077       }
2078     }
2079   }
2080   return udpw;
2081 }
2082
2083
2084 static void
2085 analyze_send_error (struct Plugin *plugin,
2086                     const struct sockaddr * sa,
2087                     socklen_t slen,
2088                     int error)
2089 {
2090   static int network_down_error;
2091   struct GNUNET_ATS_Information type;
2092
2093  type = plugin->env->get_address_type (plugin->env->cls,sa, slen);
2094  if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
2095      ((ENETUNREACH == errno) || (ENETDOWN == errno)))
2096  {
2097    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2098    {
2099      /* IPv4: "Network unreachable" or "Network down"
2100       *
2101       * This indicates we do not have connectivity
2102       */
2103      LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2104          _("UDP could not transmit message to `%s': "
2105            "Network seems down, please check your network configuration\n"),
2106          GNUNET_a2s (sa, slen));
2107    }
2108    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2109    {
2110      /* IPv6: "Network unreachable" or "Network down"
2111       *
2112       * This indicates that this system is IPv6 enabled, but does not
2113       * have a valid global IPv6 address assigned or we do not have
2114       * connectivity
2115       */
2116
2117     LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2118         _("UDP could not transmit message to `%s': "
2119           "Please check your network configuration and disable IPv6 if your "
2120           "connection does not have a global IPv6 address\n"),
2121         GNUNET_a2s (sa, slen));
2122    }
2123  }
2124  else
2125  {
2126    LOG (GNUNET_ERROR_TYPE_WARNING,
2127       "UDP could not transmit message to `%s': `%s'\n",
2128       GNUNET_a2s (sa, slen), STRERROR (error));
2129  }
2130 }
2131
2132 static size_t
2133 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2134 {
2135   const struct sockaddr * sa;
2136   ssize_t sent;
2137   socklen_t slen;
2138
2139   struct UDP_MessageWrapper *udpw = NULL;
2140
2141   /* Find message to send */
2142   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) ? plugin->ipv4_queue_head : plugin->ipv6_queue_head,
2143                                              sock);
2144   if (NULL == udpw)
2145     return 0; /* No message to send */
2146
2147   sa = udpw->session->sock_addr;
2148   slen = udpw->session->addrlen;
2149
2150   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, sa, slen);
2151
2152   if (GNUNET_SYSERR == sent)
2153   {
2154     /* Failure */
2155     analyze_send_error (plugin, sa, slen, errno);
2156     call_continuation(udpw, GNUNET_SYSERR);
2157   }
2158   else
2159   {
2160     /* Success */
2161     LOG (GNUNET_ERROR_TYPE_DEBUG,
2162          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2163          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
2164          (sent < 0) ? STRERROR (errno) : "ok");
2165     GNUNET_STATISTICS_update (plugin->env->stats,
2166                               "# bytes transmitted via UDP",
2167                               sent, GNUNET_NO);
2168     if (NULL != udpw->frag_ctx)
2169         udpw->frag_ctx->on_wire_size += udpw->msg_size;
2170     call_continuation (udpw, GNUNET_OK);
2171   }
2172   dequeue (plugin, udpw);
2173   GNUNET_free (udpw);
2174   udpw = NULL;
2175
2176   return sent;
2177 }
2178
2179
2180 /**
2181  * We have been notified that our readset has something to read.  We don't
2182  * know which socket needs to be read, so we have to check each one
2183  * Then reschedule this function to be called again once more is available.
2184  *
2185  * @param cls the plugin handle
2186  * @param tc the scheduling context (for rescheduling this function again)
2187  */
2188 static void
2189 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2190 {
2191   struct Plugin *plugin = cls;
2192
2193   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2194   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2195     return;
2196   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2197        (NULL != plugin->sockv4) &&
2198        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2199     udp_select_read (plugin, plugin->sockv4);
2200   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2201        (NULL != plugin->sockv4) && 
2202        (NULL != plugin->ipv4_queue_head) &&
2203        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2204     udp_select_send (plugin, plugin->sockv4);   
2205   schedule_select (plugin);
2206 }
2207
2208
2209 /**
2210  * We have been notified that our readset has something to read.  We don't
2211  * know which socket needs to be read, so we have to check each one
2212  * Then reschedule this function to be called again once more is available.
2213  *
2214  * @param cls the plugin handle
2215  * @param tc the scheduling context (for rescheduling this function again)
2216  */
2217 static void
2218 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2219 {
2220   struct Plugin *plugin = cls;
2221
2222   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2223   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2224     return;
2225   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2226        (NULL != plugin->sockv6) &&
2227        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2228     udp_select_read (plugin, plugin->sockv6);
2229   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2230        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2231        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2232     udp_select_send (plugin, plugin->sockv6);
2233   schedule_select (plugin);
2234 }
2235
2236
2237 static int
2238 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
2239 {
2240   int tries;
2241   int sockets_created = 0;
2242   struct sockaddr *serverAddr;
2243   struct sockaddr *addrs[2];
2244   socklen_t addrlens[2];
2245   socklen_t addrlen;
2246
2247   /* Create IPv6 socket */
2248   if (plugin->enable_ipv6 == GNUNET_YES)
2249   {
2250     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2251     if (NULL == plugin->sockv6)
2252     {
2253       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2254       plugin->enable_ipv6 = GNUNET_NO;
2255     }
2256     else
2257     {
2258 #if HAVE_SOCKADDR_IN_SIN_LEN
2259       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2260 #endif
2261       serverAddrv6->sin6_family = AF_INET6;
2262       serverAddrv6->sin6_addr = in6addr_any;
2263       serverAddrv6->sin6_port = htons (plugin->port);
2264       addrlen = sizeof (struct sockaddr_in6);
2265       serverAddr = (struct sockaddr *) serverAddrv6;
2266       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2267            ntohs (serverAddrv6->sin6_port));
2268       tries = 0;
2269       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2270              GNUNET_OK)
2271       {
2272         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2273         LOG (GNUNET_ERROR_TYPE_DEBUG,
2274              "IPv6 Binding failed, trying new port %d\n",
2275              ntohs (serverAddrv6->sin6_port));
2276         tries++;
2277         if (tries > 10)
2278         {
2279           GNUNET_NETWORK_socket_close (plugin->sockv6);
2280           plugin->sockv6 = NULL;
2281           break;
2282         }
2283       }
2284       if (plugin->sockv6 != NULL)
2285       {
2286         LOG (GNUNET_ERROR_TYPE_DEBUG,
2287              "IPv6 socket created on port %d\n",
2288              ntohs (serverAddrv6->sin6_port));
2289         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2290         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2291         sockets_created++;
2292       }
2293     }
2294   }
2295
2296   /* Create IPv4 socket */
2297   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2298   if (NULL == plugin->sockv4)
2299   {
2300     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2301   }
2302   else
2303   {
2304 #if HAVE_SOCKADDR_IN_SIN_LEN
2305     serverAddrv4->sin_len = sizeof (serverAddrv4);
2306 #endif
2307     serverAddrv4->sin_family = AF_INET;
2308     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2309     serverAddrv4->sin_port = htons (plugin->port);
2310     addrlen = sizeof (struct sockaddr_in);
2311     serverAddr = (struct sockaddr *) serverAddrv4;
2312
2313     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2314          ntohs (serverAddrv4->sin_port));
2315     tries = 0;
2316     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2317            GNUNET_OK)
2318     {
2319       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2320       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2321            ntohs (serverAddrv4->sin_port));
2322       tries++;
2323       if (tries > 10)
2324       {
2325         GNUNET_NETWORK_socket_close (plugin->sockv4);
2326         plugin->sockv4 = NULL;
2327         break;
2328       }
2329     }
2330     if (plugin->sockv4 != NULL)
2331     {
2332       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2333       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2334       sockets_created++;
2335     }
2336   }
2337
2338   /* Create file descriptors */
2339   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2340   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2341   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2342   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2343   if (NULL != plugin->sockv4)
2344   {
2345     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2346     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2347   }
2348
2349   if (0 == sockets_created)
2350     LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2351   if (plugin->enable_ipv6 == GNUNET_YES)
2352   {
2353     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2354     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2355     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2356     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2357     if (NULL != plugin->sockv6)
2358     {
2359       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2360       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2361     }
2362   }
2363   schedule_select (plugin);
2364   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2365                            GNUNET_NO, plugin->port,
2366                            sockets_created,
2367                            (const struct sockaddr **) addrs, addrlens,
2368                            &udp_nat_port_map_callback, NULL, plugin);
2369
2370   return sockets_created;
2371 }
2372
2373
2374 /**
2375  * The exported method. Makes the core api available via a global and
2376  * returns the udp transport API.
2377  *
2378  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2379  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2380  */
2381 void *
2382 libgnunet_plugin_transport_udp_init (void *cls)
2383 {
2384   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2385   struct GNUNET_TRANSPORT_PluginFunctions *api;
2386   struct Plugin *p;
2387   unsigned long long port;
2388   unsigned long long aport;
2389   unsigned long long broadcast;
2390   unsigned long long udp_max_bps;
2391   unsigned long long enable_v6;
2392   char * bind4_address;
2393   char * bind6_address;
2394   char * fancy_interval;
2395   struct GNUNET_TIME_Relative interval;
2396   struct sockaddr_in serverAddrv4;
2397   struct sockaddr_in6 serverAddrv6;
2398   int res;
2399
2400   if (NULL == env->receive)
2401   {
2402     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2403        initialze the plugin or the API */
2404     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2405     api->cls = NULL;
2406     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2407     api->address_to_string = &udp_address_to_string;
2408     api->string_to_address = &udp_string_to_address;
2409     return api;
2410   }
2411
2412   GNUNET_assert( NULL != env->stats);
2413
2414   /* Get port number */
2415   if (GNUNET_OK !=
2416       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2417                                              &port))
2418     port = 2086;
2419   if (GNUNET_OK !=
2420       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2421                                              "ADVERTISED_PORT", &aport))
2422     aport = port;
2423   if (port > 65535)
2424   {
2425     LOG (GNUNET_ERROR_TYPE_WARNING,
2426          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2427          65535);
2428     return NULL;
2429   }
2430
2431   /* Protocols */
2432   if ((GNUNET_YES ==
2433        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2434                                              "DISABLEV6")))
2435   {
2436     enable_v6 = GNUNET_NO;
2437   }
2438   else
2439     enable_v6 = GNUNET_YES;
2440
2441   /* Addresses */
2442   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2443   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2444
2445   if (GNUNET_YES ==
2446       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2447                                              "BINDTO", &bind4_address))
2448   {
2449     LOG (GNUNET_ERROR_TYPE_DEBUG,
2450          "Binding udp plugin to specific address: `%s'\n",
2451          bind4_address);
2452     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2453     {
2454       GNUNET_free (bind4_address);
2455       return NULL;
2456     }
2457   }
2458
2459   if (GNUNET_YES ==
2460       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2461                                              "BINDTO6", &bind6_address))
2462   {
2463     LOG (GNUNET_ERROR_TYPE_DEBUG,
2464          "Binding udp plugin to specific address: `%s'\n",
2465          bind6_address);
2466     if (1 !=
2467         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2468     {
2469       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2470            bind6_address);
2471       GNUNET_free_non_null (bind4_address);
2472       GNUNET_free (bind6_address);
2473       return NULL;
2474     }
2475   }
2476
2477   /* Enable neighbour discovery */
2478   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2479                                             "BROADCAST");
2480   if (broadcast == GNUNET_SYSERR)
2481     broadcast = GNUNET_NO;
2482
2483   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2484                                            "BROADCAST_INTERVAL", &fancy_interval))
2485   {
2486     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2487   }
2488   else
2489   {
2490      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2491      {
2492        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2493      }
2494      GNUNET_free (fancy_interval);
2495   }
2496
2497   /* Maximum datarate */
2498   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2499                                              "MAX_BPS", &udp_max_bps))
2500   {
2501     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2502   }
2503
2504   p = GNUNET_malloc (sizeof (struct Plugin));
2505   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2506
2507   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2508                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2509   p->sessions = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
2510   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2511   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2512   p->port = port;
2513   p->aport = aport;
2514   p->broadcast_interval = interval;
2515   p->enable_ipv6 = enable_v6;
2516   p->env = env;
2517
2518   plugin = p;
2519
2520   api->cls = p;
2521   api->send = NULL;
2522   api->disconnect = &udp_disconnect;
2523   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2524   api->address_to_string = &udp_address_to_string;
2525   api->string_to_address = &udp_string_to_address;
2526   api->check_address = &udp_plugin_check_address;
2527   api->get_session = &udp_plugin_get_session;
2528   api->send = &udp_plugin_send;
2529
2530   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2531   res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
2532   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
2533   {
2534     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2535     GNUNET_free (p);
2536     GNUNET_free (api);
2537     return NULL;
2538   }
2539
2540   if (broadcast == GNUNET_YES)
2541   {
2542     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2543     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
2544   }
2545
2546   GNUNET_free_non_null (bind4_address);
2547   GNUNET_free_non_null (bind6_address);
2548   return api;
2549 }
2550
2551
2552 static int
2553 heap_cleanup_iterator (void *cls,
2554                        struct GNUNET_CONTAINER_HeapNode *
2555                        node, void *element,
2556                        GNUNET_CONTAINER_HeapCostType
2557                        cost)
2558 {
2559   struct DefragContext * d_ctx = element;
2560
2561   GNUNET_CONTAINER_heap_remove_node (node);
2562   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2563   GNUNET_free (d_ctx);
2564
2565   return GNUNET_YES;
2566 }
2567
2568
2569 /**
2570  * The exported method. Makes the core api available via a global and
2571  * returns the udp transport API.
2572  *
2573  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2574  * @return NULL
2575  */
2576 void *
2577 libgnunet_plugin_transport_udp_done (void *cls)
2578 {
2579   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2580   struct Plugin *plugin = api->cls;
2581
2582   if (NULL == plugin)
2583   {
2584     GNUNET_free (api);
2585     return NULL;
2586   }
2587
2588   stop_broadcast (plugin);
2589   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2590   {
2591     GNUNET_SCHEDULER_cancel (plugin->select_task);
2592     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2593   }
2594   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2595   {
2596     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2597     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2598   }
2599
2600   /* Closing sockets */
2601   if (plugin->sockv4 != NULL)
2602   {
2603     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2604     plugin->sockv4 = NULL;
2605   }
2606   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2607   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2608
2609   if (plugin->sockv6 != NULL)
2610   {
2611     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2612     plugin->sockv6 = NULL;
2613
2614     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2615     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2616   }
2617
2618   GNUNET_NAT_unregister (plugin->nat);
2619
2620   if (plugin->defrag_ctxs != NULL)
2621   {
2622     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2623         heap_cleanup_iterator, NULL);
2624     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2625     plugin->defrag_ctxs = NULL;
2626   }
2627   if (plugin->mst != NULL)
2628   {
2629     GNUNET_SERVER_mst_destroy(plugin->mst);
2630     plugin->mst = NULL;
2631   }
2632
2633   /* Clean up leftover messages */
2634   struct UDP_MessageWrapper * udpw;
2635   udpw = plugin->ipv4_queue_head;
2636   while (udpw != NULL)
2637   {
2638     struct UDP_MessageWrapper *tmp = udpw->next;
2639     dequeue (plugin, udpw);
2640     call_continuation(udpw, GNUNET_SYSERR);
2641     GNUNET_free (udpw);
2642
2643     udpw = tmp;
2644   }
2645   udpw = plugin->ipv6_queue_head;
2646   while (udpw != NULL)
2647   {
2648     struct UDP_MessageWrapper *tmp = udpw->next;
2649     dequeue (plugin, udpw);
2650     call_continuation(udpw, GNUNET_SYSERR);
2651     GNUNET_free (udpw);
2652
2653     udpw = tmp;
2654   }
2655
2656   /* Clean up sessions */
2657   LOG (GNUNET_ERROR_TYPE_DEBUG,
2658        "Cleaning up sessions\n");
2659   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2660   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2661
2662   plugin->nat = NULL;
2663   GNUNET_free (plugin);
2664   GNUNET_free (api);
2665   return NULL;
2666 }
2667
2668
2669 /* end of plugin_transport_udp.c */