src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2017 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
49
50 /**
51  * Number of messages we can defragment in parallel.  We only really
52  * defragment 1 message at a time, but if messages get re-ordered, we
53  * may want to keep knowledge about the previous message to avoid
54  * discarding the current message in favor of a single fragment of a
55  * previous message.  3 should be good since we don't expect massive
56  * message reorderings with UDP.
57  */
58 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
59
60 /**
61  * We keep a defragmentation queue per sender address.  How many
62  * sender addresses do we support at the same time? Memory consumption
63  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
64  * value. (So 128 corresponds to 12 MB and should suffice for
65  * connecting to roughly 128 peers via UDP).
66  */
67 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
68
69
70 /**
71  * UDP Message-Packet header (after defragmentation).
72  */
73 struct UDPMessage
74 {
75   /**
76    * Message header.
77    */
78   struct GNUNET_MessageHeader header;
79
80   /**
81    * Always zero for now.
82    */
83   uint32_t reserved;
84
85   /**
86    * What is the identity of the sender
87    */
88   struct GNUNET_PeerIdentity sender;
89
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147
148 };
149
150
151 /**
152  * Session with another peer.
153  */
154 struct GNUNET_ATS_Session
155 {
156   /**
157    * Which peer is this session for?
158    */
159   struct GNUNET_PeerIdentity target;
160
161   /**
162    * Tokenizer for inbound messages.
163    */
164   struct GNUNET_MessageStreamTokenizer *mst;
165
166   /**
167    * Plugin this session belongs to.
168    */
169   struct Plugin *plugin;
170
171   /**
172    * Context for dealing with fragments.
173    */
174   struct UDP_FragmentationContext *frag_ctx;
175
176   /**
177    * Desired delay for next sending we send to other peer
178    */
179   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
180
181   /**
182    * Desired delay for transmissions we received from other peer.
183    * This is for full messages, the value needs to be adjusted for
184    * fragmented messages.
185    */
186   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
187
188   /**
189    * Session timeout task
190    */
191   struct GNUNET_SCHEDULER_Task *timeout_task;
192
193   /**
194    * When does this session time out?
195    */
196   struct GNUNET_TIME_Absolute timeout;
197
198   /**
199    * What time did we last transmit?
200    */
201   struct GNUNET_TIME_Absolute last_transmit_time;
202
203   /**
204    * expected delay for ACKs
205    */
206   struct GNUNET_TIME_Relative last_expected_ack_delay;
207
208   /**
209    * desired delay between UDP messages
210    */
211   struct GNUNET_TIME_Relative last_expected_msg_delay;
212
213   /**
214    * Our own address.
215    */
216   struct GNUNET_HELLO_Address *address;
217
218   /**
219    * Number of bytes waiting for transmission to this peer.
220    */
221   unsigned long long bytes_in_queue;
222
223   /**
224    * Number of messages waiting for transmission to this peer.
225    */
226   unsigned int msgs_in_queue;
227
228   /**
229    * Reference counter to indicate that this session is
230    * currently being used and must not be destroyed;
231    * setting @e in_destroy will destroy it as soon as
232    * possible.
233    */
234   unsigned int rc;
235
236   /**
237    * Network type of the address.
238    */
239   enum GNUNET_NetworkType scope;
240
241   /**
242    * Is this session about to be destroyed (sometimes we cannot
243    * destroy a session immediately as below us on the stack
244    * there might be code that still uses it; in this case,
245    * @e rc is non-zero).
246    */
247   int in_destroy;
248 };
249
250
251
252 /**
253  * Data structure to track defragmentation contexts based
254  * on the source of the UDP traffic.
255  */
256 struct DefragContext
257 {
258
259   /**
260    * Defragmentation context.
261    */
262   struct GNUNET_DEFRAGMENT_Context *defrag;
263
264   /**
265    * Reference to master plugin struct.
266    */
267   struct Plugin *plugin;
268
269   /**
270    * Node in the defrag heap.
271    */
272   struct GNUNET_CONTAINER_HeapNode *hnode;
273
274   /**
275    * Source address this receive context is for (allocated at the
276    * end of the struct).
277    */
278   const union UdpAddress *udp_addr;
279
280   /**
281    * Who's message(s) are we defragmenting here?
282    * Only initialized once we succeeded and
283    * @e have_sender is set.
284    */
285   struct GNUNET_PeerIdentity sender;
286
287   /**
288    * Length of @e udp_addr.
289    */
290   size_t udp_addr_len;
291
292   /**
293    * Network type the address belongs to.
294    */
295   enum GNUNET_NetworkType network_type;
296
297   /**
298    * Has the @e sender field been initialized yet?
299    */
300   int have_sender;
301 };
302
303
304 /**
305  * Context to send fragmented messages
306  */
307 struct UDP_FragmentationContext
308 {
309   /**
310    * Next in linked list
311    */
312   struct UDP_FragmentationContext *next;
313
314   /**
315    * Previous in linked list
316    */
317   struct UDP_FragmentationContext *prev;
318
319   /**
320    * The plugin
321    */
322   struct Plugin *plugin;
323
324   /**
325    * Handle for fragmentation.
326    */
327   struct GNUNET_FRAGMENT_Context *frag;
328
329   /**
330    * The session this fragmentation context belongs to
331    */
332   struct GNUNET_ATS_Session *session;
333
334   /**
335    * Function to call upon completion of the transmission.
336    */
337   GNUNET_TRANSPORT_TransmitContinuation cont;
338
339   /**
340    * Closure for @e cont.
341    */
342   void *cont_cls;
343
344   /**
345    * Start time.
346    */
347   struct GNUNET_TIME_Absolute start_time;
348
349   /**
350    * Transmission time for the next fragment.  Incremented by
351    * the @e flow_delay_from_other_peer for each fragment when
352    * we setup the fragments.
353    */
354   struct GNUNET_TIME_Absolute next_frag_time;
355
356   /**
357    * Desired delay for transmissions we received from other peer.
358    * Adjusted to be per fragment (UDP_MTU), even though on the
359    * wire it was for "full messages".
360    */
361   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
362
363   /**
364    * Message timeout
365    */
366   struct GNUNET_TIME_Absolute timeout;
367
368   /**
369    * Payload size of original unfragmented message
370    */
371   size_t payload_size;
372
373   /**
374    * Bytes used to send all fragments on wire including UDP overhead
375    */
376   size_t on_wire_size;
377
378 };
379
380
381 /**
382  * Function called when a message is removed from the
383  * transmission queue.
384  *
385  * @param cls closure
386  * @param udpw message wrapper finished
387  * @param result #GNUNET_OK on success (message was sent)
388  *               #GNUNET_SYSERR if the target disconnected
389  *               or we had a timeout or other trouble sending
390  */
391 typedef void
392 (*QueueContinuation) (void *cls,
393                       struct UDP_MessageWrapper *udpw,
394                       int result);
395
396
397 /**
398  * Information we track for each message in the queue.
399  */
400 struct UDP_MessageWrapper
401 {
402   /**
403    * Session this message belongs to
404    */
405   struct GNUNET_ATS_Session *session;
406
407   /**
408    * DLL of messages, previous element
409    */
410   struct UDP_MessageWrapper *prev;
411
412   /**
413    * DLL of messages, next element
414    */
415   struct UDP_MessageWrapper *next;
416
417   /**
418    * Message with @e msg_size bytes including UDP-specific overhead.
419    */
420   char *msg_buf;
421
422   /**
423    * Function to call once the message wrapper is being removed
424    * from the queue (with success or failure).
425    */
426   QueueContinuation qc;
427
428   /**
429    * Closure for @e qc.
430    */
431   void *qc_cls;
432
433   /**
434    * External continuation to call upon completion of the
435    * transmission, NULL if this queue entry is not for a
436    * message from the application.
437    */
438   GNUNET_TRANSPORT_TransmitContinuation cont;
439
440   /**
441    * Closure for @e cont.
442    */
443   void *cont_cls;
444
445   /**
446    * Fragmentation context.
447    * frag_ctx == NULL if transport <= MTU
448    * frag_ctx != NULL if transport > MTU
449    */
450   struct UDP_FragmentationContext *frag_ctx;
451
452   /**
453    * Message enqueue time.
454    */
455   struct GNUNET_TIME_Absolute start_time;
456
457   /**
458    * Desired transmission time for this message, based on the
459    * flow limiting information we got from the other peer.
460    */
461   struct GNUNET_TIME_Absolute transmission_time;
462
463   /**
464    * Message timeout.
465    */
466   struct GNUNET_TIME_Absolute timeout;
467
468   /**
469    * Size of UDP message to send, including UDP-specific overhead.
470    */
471   size_t msg_size;
472
473   /**
474    * Payload size of original message.
475    */
476   size_t payload_size;
477
478 };
479
480
481 GNUNET_NETWORK_STRUCT_BEGIN
482
483 /**
484  * UDP ACK Message-Packet header.
485  */
486 struct UDP_ACK_Message
487 {
488   /**
489    * Message header.
490    */
491   struct GNUNET_MessageHeader header;
492
493   /**
494    * Desired delay for flow control, in us (in NBO).
495    * A value of UINT32_MAX indicates that the other
496    * peer wants us to disconnect.
497    */
498   uint32_t delay GNUNET_PACKED;
499
500   /**
501    * What is the identity of the sender
502    */
503   struct GNUNET_PeerIdentity sender;
504
505 };
506
507 GNUNET_NETWORK_STRUCT_END
508
509
510 /* ************************* Monitoring *********** */
511
512
513 /**
514  * If a session monitor is attached, notify it about the new
515  * session state.
516  *
517  * @param plugin our plugin
518  * @param session session that changed state
519  * @param state new state of the session
520  */
521 static void
522 notify_session_monitor (struct Plugin *plugin,
523                         struct GNUNET_ATS_Session *session,
524                         enum GNUNET_TRANSPORT_SessionState state)
525 {
526   struct GNUNET_TRANSPORT_SessionInfo info;
527
528   if (NULL == plugin->sic)
529     return;
530   if (GNUNET_YES == session->in_destroy)
531     return; /* already destroyed, just RC>0 left-over actions */
532   memset (&info,
533           0,
534           sizeof (info));
535   info.state = state;
536   info.is_inbound = GNUNET_SYSERR; /* hard to say */
537   info.num_msg_pending = session->msgs_in_queue;
538   info.num_bytes_pending = session->bytes_in_queue;
539   /* info.receive_delay remains zero as this is not supported by UDP
540      (cannot selectively not receive from 'some' peer while continuing
541      to receive from others) */
542   info.session_timeout = session->timeout;
543   info.address = session->address;
544   plugin->sic (plugin->sic_cls,
545                session,
546                &info);
547 }
548
549
550 /**
551  * Return information about the given session to the monitor callback.
552  *
553  * @param cls the `struct Plugin` with the monitor callback (`sic`)
554  * @param peer peer we send information about
555  * @param value our `struct GNUNET_ATS_Session` to send information about
556  * @return #GNUNET_OK (continue to iterate)
557  */
558 static int
559 send_session_info_iter (void *cls,
560                         const struct GNUNET_PeerIdentity *peer,
561                         void *value)
562 {
563   struct Plugin *plugin = cls;
564   struct GNUNET_ATS_Session *session = value;
565
566   notify_session_monitor (plugin,
567                           session,
568                           GNUNET_TRANSPORT_SS_INIT);
569   notify_session_monitor (plugin,
570                           session,
571                           GNUNET_TRANSPORT_SS_UP);
572   return GNUNET_OK;
573 }
574
575
576 /**
577  * Begin monitoring sessions of a plugin.  There can only
578  * be one active monitor per plugin (i.e. if there are
579  * multiple monitors, the transport service needs to
580  * multiplex the generated events over all of them).
581  *
582  * @param cls closure of the plugin
583  * @param sic callback to invoke, NULL to disable monitor;
584  *            plugin will being by iterating over all active
585  *            sessions immediately and then enter monitor mode
586  * @param sic_cls closure for @a sic
587  */
588 static void
589 udp_plugin_setup_monitor (void *cls,
590                           GNUNET_TRANSPORT_SessionInfoCallback sic,
591                           void *sic_cls)
592 {
593   struct Plugin *plugin = cls;
594
595   plugin->sic = sic;
596   plugin->sic_cls = sic_cls;
597   if (NULL != sic)
598   {
599     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
600                                            &send_session_info_iter,
601                                            plugin);
602     /* signal end of first iteration */
603     sic (sic_cls,
604          NULL,
605          NULL);
606   }
607 }
608
609
610 /* ****************** Little Helpers ****************** */
611
612
613 /**
614  * Function to free last resources associated with a session.
615  *
616  * @param s session to free
617  */
618 static void
619 free_session (struct GNUNET_ATS_Session *s)
620 {
621   if (NULL != s->address)
622   {
623     GNUNET_HELLO_address_free (s->address);
624     s->address = NULL;
625   }
626   if (NULL != s->frag_ctx)
627   {
628     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
629                                      NULL,
630                                      NULL);
631     GNUNET_free (s->frag_ctx);
632     s->frag_ctx = NULL;
633   }
634   if (NULL != s->mst)
635   {
636     GNUNET_MST_destroy (s->mst);
637     s->mst = NULL;
638   }
639   GNUNET_free (s);
640 }
641
642
643 /**
644  * Function that is called to get the keepalive factor.
645  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
646  * calculate the interval between keepalive packets.
647  *
648  * @param cls closure with the `struct Plugin`
649  * @return keepalive factor
650  */
651 static unsigned int
652 udp_query_keepalive_factor (void *cls)
653 {
654   return 15;
655 }
656
657
658 /**
659  * Function obtain the network type for a session
660  *
661  * @param cls closure (`struct Plugin *`)
662  * @param session the session
663  * @return the network type
664  */
665 static enum GNUNET_NetworkType
666 udp_plugin_get_network (void *cls,
667                         struct GNUNET_ATS_Session *session)
668 {
669   return session->scope;
670 }
671
672
673 /**
674  * Function obtain the network type for an address.
675  *
676  * @param cls closure (`struct Plugin *`)
677  * @param address the address
678  * @return the network type
679  */
680 static enum GNUNET_NetworkType
681 udp_plugin_get_network_for_address (void *cls,
682                                     const struct GNUNET_HELLO_Address *address)
683 {
684   struct Plugin *plugin = cls;
685   size_t addrlen;
686   struct sockaddr_in a4;
687   struct sockaddr_in6 a6;
688   const struct IPv4UdpAddress *u4;
689   const struct IPv6UdpAddress *u6;
690   const void *sb;
691   size_t sbs;
692
693   addrlen = address->address_length;
694   if (addrlen == sizeof(struct IPv6UdpAddress))
695   {
696     GNUNET_assert (NULL != address->address); /* make static analysis happy */
697     u6 = address->address;
698     memset (&a6, 0, sizeof(a6));
699 #if HAVE_SOCKADDR_IN_SIN_LEN
700     a6.sin6_len = sizeof (a6);
701 #endif
702     a6.sin6_family = AF_INET6;
703     a6.sin6_port = u6->u6_port;
704     GNUNET_memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
705     sb = &a6;
706     sbs = sizeof(a6);
707   }
708   else if (addrlen == sizeof(struct IPv4UdpAddress))
709   {
710     GNUNET_assert (NULL != address->address); /* make static analysis happy */
711     u4 = address->address;
712     memset (&a4, 0, sizeof(a4));
713 #if HAVE_SOCKADDR_IN_SIN_LEN
714     a4.sin_len = sizeof (a4);
715 #endif
716     a4.sin_family = AF_INET;
717     a4.sin_port = u4->u4_port;
718     a4.sin_addr.s_addr = u4->ipv4_addr;
719     sb = &a4;
720     sbs = sizeof(a4);
721   }
722   else
723   {
724     GNUNET_break (0);
725     return GNUNET_NT_UNSPECIFIED;
726   }
727   return plugin->env->get_address_type (plugin->env->cls,
728                                         sb,
729                                         sbs);
730 }
731
732
733 /* ******************* Event loop ******************** */
734
735 /**
736  * We have been notified that our readset has something to read.  We don't
737  * know which socket needs to be read, so we have to check each one
738  * Then reschedule this function to be called again once more is available.
739  *
740  * @param cls the plugin handle
741  */
742 static void
743 udp_plugin_select_v4 (void *cls);
744
745
746 /**
747  * We have been notified that our readset has something to read.  We don't
748  * know which socket needs to be read, so we have to check each one
749  * Then reschedule this function to be called again once more is available.
750  *
751  * @param cls the plugin handle
752  */
753 static void
754 udp_plugin_select_v6 (void *cls);
755
756
757 /**
758  * (re)schedule IPv4-select tasks for this plugin.
759  *
760  * @param plugin plugin to reschedule
761  */
762 static void
763 schedule_select_v4 (struct Plugin *plugin)
764 {
765   struct GNUNET_TIME_Relative min_delay;
766   struct GNUNET_TIME_Relative delay;
767   struct UDP_MessageWrapper *udpw;
768   struct UDP_MessageWrapper *min_udpw;
769
770   if ( (GNUNET_YES == plugin->enable_ipv4) &&
771        (NULL != plugin->sockv4) )
772   {
773     /* Find a message ready to send:
774      * Flow delay from other peer is expired or not set (0) */
775     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
776     min_udpw = NULL;
777     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
778     {
779       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
780       if (delay.rel_value_us < min_delay.rel_value_us)
781       {
782         min_delay = delay;
783         min_udpw = udpw;
784       }
785     }
786     if (NULL != plugin->select_task_v4)
787       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
788     if (NULL != min_udpw)
789     {
790       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
791       {
792         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
793                     "Calculated flow delay for UDPv4 at %s for %s\n",
794                     GNUNET_STRINGS_relative_time_to_string (min_delay,
795                                                             GNUNET_YES),
796                     GNUNET_i2s (&min_udpw->session->target));
797       }
798       else
799       {
800         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801                     "Calculated flow delay for UDPv4 at %s for %s\n",
802                     GNUNET_STRINGS_relative_time_to_string (min_delay,
803                                                             GNUNET_YES),
804                     GNUNET_i2s (&min_udpw->session->target));
805       }
806     }
807     plugin->select_task_v4
808       = GNUNET_SCHEDULER_add_read_net (min_delay,
809                                        plugin->sockv4,
810                                        &udp_plugin_select_v4,
811                                        plugin);
812   }
813 }
814
815
816 /**
817  * (re)schedule IPv6-select tasks for this plugin.
818  *
819  * @param plugin plugin to reschedule
820  */
821 static void
822 schedule_select_v6 (struct Plugin *plugin)
823 {
824   struct GNUNET_TIME_Relative min_delay;
825   struct GNUNET_TIME_Relative delay;
826   struct UDP_MessageWrapper *udpw;
827   struct UDP_MessageWrapper *min_udpw;
828
829   if ( (GNUNET_YES == plugin->enable_ipv6) &&
830        (NULL != plugin->sockv6) )
831   {
832     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
833     min_udpw = NULL;
834     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
835     {
836       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
837       if (delay.rel_value_us < min_delay.rel_value_us)
838       {
839         min_delay = delay;
840         min_udpw = udpw;
841       }
842     }
843     if (NULL != plugin->select_task_v6)
844       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
845     if (NULL != min_udpw)
846     {
847       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
848       {
849         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
850                     "Calculated flow delay for UDPv6 at %s for %s\n",
851                     GNUNET_STRINGS_relative_time_to_string (min_delay,
852                                                             GNUNET_YES),
853                     GNUNET_i2s (&min_udpw->session->target));
854       }
855       else
856       {
857         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858                     "Calculated flow delay for UDPv6 at %s for %s\n",
859                     GNUNET_STRINGS_relative_time_to_string (min_delay,
860                                                             GNUNET_YES),
861                     GNUNET_i2s (&min_udpw->session->target));
862       }
863     }
864     plugin->select_task_v6
865       = GNUNET_SCHEDULER_add_read_net (min_delay,
866                                        plugin->sockv6,
867                                        &udp_plugin_select_v6,
868                                        plugin);
869   }
870 }
871
872
873 /* ******************* Address to string and back ***************** */
874
875
876 /**
877  * Function called for a quick conversion of the binary address to
878  * a numeric address.  Note that the caller must not free the
879  * address and that the next call to this function is allowed
880  * to override the address again.
881  *
882  * @param cls closure
883  * @param addr binary address (a `union UdpAddress`)
884  * @param addrlen length of the @a addr
885  * @return string representing the same address
886  */
887 const char *
888 udp_address_to_string (void *cls,
889                        const void *addr,
890                        size_t addrlen)
891 {
892   static char rbuf[INET6_ADDRSTRLEN + 10];
893   char buf[INET6_ADDRSTRLEN];
894   const void *sb;
895   struct in_addr a4;
896   struct in6_addr a6;
897   const struct IPv4UdpAddress *t4;
898   const struct IPv6UdpAddress *t6;
899   int af;
900   uint16_t port;
901   uint32_t options;
902
903   if (NULL == addr)
904   {
905     GNUNET_break_op (0);
906     return NULL;
907   }
908
909   if (addrlen == sizeof(struct IPv6UdpAddress))
910   {
911     t6 = addr;
912     af = AF_INET6;
913     options = ntohl (t6->options);
914     port = ntohs (t6->u6_port);
915     a6 = t6->ipv6_addr;
916     sb = &a6;
917   }
918   else if (addrlen == sizeof(struct IPv4UdpAddress))
919   {
920     t4 = addr;
921     af = AF_INET;
922     options = ntohl (t4->options);
923     port = ntohs (t4->u4_port);
924     a4.s_addr = t4->ipv4_addr;
925     sb = &a4;
926   }
927   else
928   {
929     GNUNET_break_op (0);
930     return NULL;
931   }
932   inet_ntop (af,
933              sb,
934              buf,
935              INET6_ADDRSTRLEN);
936   GNUNET_snprintf (rbuf,
937                    sizeof(rbuf),
938                    (af == AF_INET6)
939                    ? "%s.%u.[%s]:%u"
940                    : "%s.%u.%s:%u",
941                    PLUGIN_NAME,
942                    options,
943                    buf,
944                    port);
945   return rbuf;
946 }
947
948
949 /**
950  * Function called to convert a string address to a binary address.
951  *
952  * @param cls closure (`struct Plugin *`)
953  * @param addr string address
954  * @param addrlen length of the address
955  * @param buf location to store the buffer
956  * @param added location to store the number of bytes in the buffer.
957  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
958  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
959  */
960 static int
961 udp_string_to_address (void *cls,
962                        const char *addr,
963                        uint16_t addrlen,
964                        void **buf,
965                        size_t *added)
966 {
967   struct sockaddr_storage socket_address;
968   char *address;
969   char *plugin;
970   char *optionstr;
971   uint32_t options;
972
973   /* Format tcp.options.address:port */
974   address = NULL;
975   plugin = NULL;
976   optionstr = NULL;
977
978   if ((NULL == addr) || (0 == addrlen))
979   {
980     GNUNET_break (0);
981     return GNUNET_SYSERR;
982   }
983   if ('\0' != addr[addrlen - 1])
984   {
985     GNUNET_break (0);
986     return GNUNET_SYSERR;
987   }
988   if (strlen (addr) != addrlen - 1)
989   {
990     GNUNET_break (0);
991     return GNUNET_SYSERR;
992   }
993   plugin = GNUNET_strdup (addr);
994   optionstr = strchr (plugin, '.');
995   if (NULL == optionstr)
996   {
997     GNUNET_break (0);
998     GNUNET_free (plugin);
999     return GNUNET_SYSERR;
1000   }
1001   optionstr[0] = '\0';
1002   optionstr++;
1003   options = atol (optionstr);
1004   address = strchr (optionstr, '.');
1005   if (NULL == address)
1006   {
1007     GNUNET_break (0);
1008     GNUNET_free (plugin);
1009     return GNUNET_SYSERR;
1010   }
1011   address[0] = '\0';
1012   address++;
1013
1014   if (GNUNET_OK !=
1015       GNUNET_STRINGS_to_address_ip (address,
1016                                     strlen (address),
1017                                     &socket_address))
1018   {
1019     GNUNET_break (0);
1020     GNUNET_free (plugin);
1021     return GNUNET_SYSERR;
1022   }
1023   GNUNET_free(plugin);
1024
1025   switch (socket_address.ss_family)
1026   {
1027   case AF_INET:
1028     {
1029       struct IPv4UdpAddress *u4;
1030       const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
1031
1032       u4 = GNUNET_new (struct IPv4UdpAddress);
1033       u4->options = htonl (options);
1034       u4->ipv4_addr = in4->sin_addr.s_addr;
1035       u4->u4_port = in4->sin_port;
1036       *buf = u4;
1037       *added = sizeof (struct IPv4UdpAddress);
1038       return GNUNET_OK;
1039     }
1040   case AF_INET6:
1041     {
1042       struct IPv6UdpAddress *u6;
1043       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
1044
1045       u6 = GNUNET_new (struct IPv6UdpAddress);
1046       u6->options = htonl (options);
1047       u6->ipv6_addr = in6->sin6_addr;
1048       u6->u6_port = in6->sin6_port;
1049       *buf = u6;
1050       *added = sizeof (struct IPv6UdpAddress);
1051       return GNUNET_OK;
1052     }
1053   default:
1054     GNUNET_break (0);
1055     return GNUNET_SYSERR;
1056   }
1057 }
1058
1059
1060 /**
1061  * Append our port and forward the result.
1062  *
1063  * @param cls a `struct PrettyPrinterContext *`
1064  * @param hostname result from DNS resolver
1065  */
1066 static void
1067 append_port (void *cls,
1068              const char *hostname)
1069 {
1070   struct PrettyPrinterContext *ppc = cls;
1071   struct Plugin *plugin = ppc->plugin;
1072   char *ret;
1073
1074   if (NULL == hostname)
1075   {
1076     /* Final call, done */
1077     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
1078                                  plugin->ppc_dll_tail,
1079                                  ppc);
1080     ppc->resolver_handle = NULL;
1081     ppc->asc (ppc->asc_cls,
1082               NULL,
1083               GNUNET_OK);
1084     GNUNET_free (ppc);
1085     return;
1086   }
1087   if (GNUNET_YES == ppc->ipv6)
1088     GNUNET_asprintf (&ret,
1089                      "%s.%u.[%s]:%d",
1090                      PLUGIN_NAME,
1091                      ppc->options,
1092                      hostname,
1093                      ppc->port);
1094   else
1095     GNUNET_asprintf (&ret,
1096                      "%s.%u.%s:%d",
1097                      PLUGIN_NAME,
1098                      ppc->options,
1099                      hostname,
1100                      ppc->port);
1101   ppc->asc (ppc->asc_cls,
1102             ret,
1103             GNUNET_OK);
1104   GNUNET_free (ret);
1105 }
1106
1107
1108 /**
1109  * Convert the transports address to a nice, human-readable format.
1110  *
1111  * @param cls closure with the `struct Plugin *`
1112  * @param type name of the transport that generated the address
1113  * @param addr one of the addresses of the host, NULL for the last address
1114  *        the specific address format depends on the transport;
1115  *        a `union UdpAddress`
1116  * @param addrlen length of the address
1117  * @param numeric should (IP) addresses be displayed in numeric form?
1118  * @param timeout after how long should we give up?
1119  * @param asc function to call on each string
1120  * @param asc_cls closure for @a asc
1121  */
1122 static void
1123 udp_plugin_address_pretty_printer (void *cls,
1124                                    const char *type,
1125                                    const void *addr,
1126                                    size_t addrlen,
1127                                    int numeric,
1128                                    struct GNUNET_TIME_Relative timeout,
1129                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1130                                    void *asc_cls)
1131 {
1132   struct Plugin *plugin = cls;
1133   struct PrettyPrinterContext *ppc;
1134   const struct sockaddr *sb;
1135   size_t sbs;
1136   struct sockaddr_in a4;
1137   struct sockaddr_in6 a6;
1138   const struct IPv4UdpAddress *u4;
1139   const struct IPv6UdpAddress *u6;
1140   uint16_t port;
1141   uint32_t options;
1142
1143   if (addrlen == sizeof(struct IPv6UdpAddress))
1144   {
1145     u6 = addr;
1146     memset (&a6,
1147             0,
1148             sizeof (a6));
1149     a6.sin6_family = AF_INET6;
1150 #if HAVE_SOCKADDR_IN_SIN_LEN
1151     a6.sin6_len = sizeof (a6);
1152 #endif
1153     a6.sin6_port = u6->u6_port;
1154     a6.sin6_addr = u6->ipv6_addr;
1155     port = ntohs (u6->u6_port);
1156     options = ntohl (u6->options);
1157     sb = (const struct sockaddr *) &a6;
1158     sbs = sizeof (a6);
1159   }
1160   else if (addrlen == sizeof (struct IPv4UdpAddress))
1161   {
1162     u4 = addr;
1163     memset (&a4,
1164             0,
1165             sizeof(a4));
1166     a4.sin_family = AF_INET;
1167 #if HAVE_SOCKADDR_IN_SIN_LEN
1168     a4.sin_len = sizeof (a4);
1169 #endif
1170     a4.sin_port = u4->u4_port;
1171     a4.sin_addr.s_addr = u4->ipv4_addr;
1172     port = ntohs (u4->u4_port);
1173     options = ntohl (u4->options);
1174     sb = (const struct sockaddr *) &a4;
1175     sbs = sizeof(a4);
1176   }
1177   else
1178   {
1179     /* invalid address */
1180     GNUNET_break_op (0);
1181     asc (asc_cls,
1182          NULL,
1183          GNUNET_SYSERR);
1184     asc (asc_cls,
1185          NULL,
1186          GNUNET_OK);
1187     return;
1188   }
1189   ppc = GNUNET_new (struct PrettyPrinterContext);
1190   ppc->plugin = plugin;
1191   ppc->asc = asc;
1192   ppc->asc_cls = asc_cls;
1193   ppc->port = port;
1194   ppc->options = options;
1195   if (addrlen == sizeof (struct IPv6UdpAddress))
1196     ppc->ipv6 = GNUNET_YES;
1197   else
1198     ppc->ipv6 = GNUNET_NO;
1199   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
1200                                plugin->ppc_dll_tail,
1201                                ppc);
1202   ppc->resolver_handle
1203     = GNUNET_RESOLVER_hostname_get (sb,
1204                                     sbs,
1205                                     ! numeric,
1206                                     timeout,
1207                                     &append_port,
1208                                     ppc);
1209 }
1210
1211
1212 /**
1213  * Check if the given port is plausible (must be either our listen
1214  * port or our advertised port).  If it is neither, we return
1215  * #GNUNET_SYSERR.
1216  *
1217  * @param plugin global variables
1218  * @param in_port port number to check
1219  * @return #GNUNET_OK if port is either our open or advertised port
1220  */
1221 static int
1222 check_port (const struct Plugin *plugin,
1223             uint16_t in_port)
1224 {
1225   if ( (plugin->port == in_port) ||
1226        (plugin->aport == in_port) )
1227     return GNUNET_OK;
1228   return GNUNET_SYSERR;
1229 }
1230
1231
1232 /**
1233  * Function that will be called to check if a binary address for this
1234  * plugin is well-formed and corresponds to an address for THIS peer
1235  * (as per our configuration).  Naturally, if absolutely necessary,
1236  * plugins can be a bit conservative in their answer, but in general
1237  * plugins should make sure that the address does not redirect
1238  * traffic to a 3rd party that might try to man-in-the-middle our
1239  * traffic.
1240  *
1241  * @param cls closure, should be our handle to the Plugin
1242  * @param addr pointer to a `union UdpAddress`
1243  * @param addrlen length of @a addr
1244  * @return #GNUNET_OK if this is a plausible address for this peer
1245  *         and transport, #GNUNET_SYSERR if not
1246  */
1247 static int
1248 udp_plugin_check_address (void *cls,
1249                           const void *addr,
1250                           size_t addrlen)
1251 {
1252   struct Plugin *plugin = cls;
1253   const struct IPv4UdpAddress *v4;
1254   const struct IPv6UdpAddress *v6;
1255
1256   if (sizeof(struct IPv4UdpAddress) == addrlen)
1257   {
1258     struct sockaddr_in s4;
1259
1260     v4 = (const struct IPv4UdpAddress *) addr;
1261     if (GNUNET_OK != check_port (plugin,
1262                                  ntohs (v4->u4_port)))
1263       return GNUNET_SYSERR;
1264     memset (&s4, 0, sizeof (s4));
1265     s4.sin_family = AF_INET;
1266 #if HAVE_SOCKADDR_IN_SIN_LEN
1267     s4.sin_len = sizeof (s4);
1268 #endif
1269     s4.sin_port = v4->u4_port;
1270     s4.sin_addr.s_addr = v4->ipv4_addr;
1271
1272     if (GNUNET_OK !=
1273         GNUNET_NAT_test_address (plugin->nat,
1274                                  &s4,
1275                                  sizeof (struct sockaddr_in)))
1276       return GNUNET_SYSERR;
1277   }
1278   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1279   {
1280     struct sockaddr_in6 s6;
1281
1282     v6 = (const struct IPv6UdpAddress *) addr;
1283     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1284       return GNUNET_OK; /* plausible, if unlikely... */
1285     memset (&s6, 0, sizeof (s6));
1286     s6.sin6_family = AF_INET6;
1287 #if HAVE_SOCKADDR_IN_SIN_LEN
1288     s6.sin6_len = sizeof (s6);
1289 #endif
1290     s6.sin6_port = v6->u6_port;
1291     s6.sin6_addr = v6->ipv6_addr;
1292
1293     if (GNUNET_OK !=
1294         GNUNET_NAT_test_address (plugin->nat,
1295                                  &s6,
1296                                  sizeof(struct sockaddr_in6)))
1297       return GNUNET_SYSERR;
1298   }
1299   else
1300   {
1301     GNUNET_break_op (0);
1302     return GNUNET_SYSERR;
1303   }
1304   return GNUNET_OK;
1305 }
1306
1307
1308 /**
1309  * Our external IP address/port mapping has changed.
1310  *
1311  * @param cls closure, the `struct Plugin`
1312  * @param add_remove #GNUNET_YES to mean the new public IP address,
1313  *                   #GNUNET_NO to mean the previous (now invalid) one
1314  * @param ac address class the address belongs to
1315  * @param addr either the previous or the new public IP address
1316  * @param addrlen actual length of the @a addr
1317  */
1318 static void
1319 udp_nat_port_map_callback (void *cls,
1320                            int add_remove,
1321                            enum GNUNET_NAT_AddressClass ac,
1322                            const struct sockaddr *addr,
1323                            socklen_t addrlen)
1324 {
1325   struct Plugin *plugin = cls;
1326   struct GNUNET_HELLO_Address *address;
1327   struct IPv4UdpAddress u4;
1328   struct IPv6UdpAddress u6;
1329   void *arg;
1330   size_t args;
1331
1332   LOG (GNUNET_ERROR_TYPE_DEBUG,
1333        (GNUNET_YES == add_remove)
1334        ? "NAT notification to add address `%s'\n"
1335        : "NAT notification to remove address `%s'\n",
1336        GNUNET_a2s (addr,
1337                    addrlen));
1338   /* convert 'address' to our internal format */
1339   switch (addr->sa_family)
1340   {
1341   case AF_INET:
1342     {
1343       const struct sockaddr_in *i4;
1344
1345       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1346       i4 = (const struct sockaddr_in *) addr;
1347       if (0 == ntohs (i4->sin_port))
1348         return; /* Port = 0 means unmapped, ignore these for UDP. */
1349       memset (&u4,
1350               0,
1351               sizeof(u4));
1352       u4.options = htonl (plugin->myoptions);
1353       u4.ipv4_addr = i4->sin_addr.s_addr;
1354       u4.u4_port = i4->sin_port;
1355       arg = &u4;
1356       args = sizeof (struct IPv4UdpAddress);
1357       break;
1358     }
1359   case AF_INET6:
1360     {
1361       const struct sockaddr_in6 *i6;
1362
1363       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1364       i6 = (const struct sockaddr_in6 *) addr;
1365       if (0 == ntohs (i6->sin6_port))
1366         return; /* Port = 0 means unmapped, ignore these for UDP. */
1367       memset (&u6,
1368               0,
1369               sizeof(u6));
1370       u6.options = htonl (plugin->myoptions);
1371       u6.ipv6_addr = i6->sin6_addr;
1372       u6.u6_port = i6->sin6_port;
1373       arg = &u6;
1374       args = sizeof (struct IPv6UdpAddress);
1375       break;
1376     }
1377   default:
1378     GNUNET_break (0);
1379     return;
1380   }
1381   /* modify our published address list */
1382   /* TODO: use 'ac' here in the future... */
1383   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1384                                            PLUGIN_NAME,
1385                                            arg,
1386                                            args,
1387                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1388   plugin->env->notify_address (plugin->env->cls,
1389                                add_remove,
1390                                address);
1391   GNUNET_HELLO_address_free (address);
1392 }
1393
1394
1395 /* ********************* Finding sessions ******************* */
1396
1397
1398 /**
1399  * Closure for #session_cmp_it().
1400  */
1401 struct GNUNET_ATS_SessionCompareContext
1402 {
1403   /**
1404    * Set to session matching the address.
1405    */
1406   struct GNUNET_ATS_Session *res;
1407
1408   /**
1409    * Address we are looking for.
1410    */
1411   const struct GNUNET_HELLO_Address *address;
1412 };
1413
1414
1415 /**
1416  * Find a session with a matching address.
1417  *
1418  * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
1419  * @param key peer identity (unused)
1420  * @param value the `struct GNUNET_ATS_Session *`
1421  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1422  */
1423 static int
1424 session_cmp_it (void *cls,
1425                 const struct GNUNET_PeerIdentity *key,
1426                 void *value)
1427 {
1428   struct GNUNET_ATS_SessionCompareContext *cctx = cls;
1429   struct GNUNET_ATS_Session *s = value;
1430
1431   if (0 == GNUNET_HELLO_address_cmp (s->address,
1432                                      cctx->address))
1433   {
1434     GNUNET_assert (GNUNET_NO == s->in_destroy);
1435     cctx->res = s;
1436     return GNUNET_NO;
1437   }
1438   return GNUNET_OK;
1439 }
1440
1441
1442 /**
1443  * Locate an existing session the transport service is using to
1444  * send data to another peer.  Performs some basic sanity checks
1445  * on the address and then tries to locate a matching session.
1446  *
1447  * @param cls the plugin
1448  * @param address the address we should locate the session by
1449  * @return the session if it exists, or NULL if it is not found
1450  */
1451 static struct GNUNET_ATS_Session *
1452 udp_plugin_lookup_session (void *cls,
1453                            const struct GNUNET_HELLO_Address *address)
1454 {
1455   struct Plugin *plugin = cls;
1456   const struct IPv6UdpAddress *udp_a6;
1457   const struct IPv4UdpAddress *udp_a4;
1458   struct GNUNET_ATS_SessionCompareContext cctx;
1459
1460   if (NULL == address->address)
1461   {
1462     GNUNET_break (0);
1463     return NULL;
1464   }
1465   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1466   {
1467     if (NULL == plugin->sockv4)
1468       return NULL;
1469     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1470     if (0 == udp_a4->u4_port)
1471     {
1472       GNUNET_break (0);
1473       return NULL;
1474     }
1475   }
1476   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1477   {
1478     if (NULL == plugin->sockv6)
1479       return NULL;
1480     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1481     if (0 == udp_a6->u6_port)
1482     {
1483       GNUNET_break (0);
1484       return NULL;
1485     }
1486   }
1487   else
1488   {
1489     GNUNET_break (0);
1490     return NULL;
1491   }
1492
1493   /* check if session already exists */
1494   cctx.address = address;
1495   cctx.res = NULL;
1496   LOG (GNUNET_ERROR_TYPE_DEBUG,
1497        "Looking for existing session for peer `%s' with address `%s'\n",
1498        GNUNET_i2s (&address->peer),
1499        udp_address_to_string (plugin,
1500                               address->address,
1501                               address->address_length));
1502   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1503                                               &address->peer,
1504                                               &session_cmp_it,
1505                                               &cctx);
1506   if (NULL == cctx.res)
1507     return NULL;
1508   LOG (GNUNET_ERROR_TYPE_DEBUG,
1509        "Found existing session %p\n",
1510        cctx.res);
1511   return cctx.res;
1512 }
1513
1514
1515 /* ********************** Timeout ****************** */
1516
1517
1518 /**
1519  * Increment session timeout due to activity.
1520  *
1521  * @param s session to reschedule timeout activity for
1522  */
1523 static void
1524 reschedule_session_timeout (struct GNUNET_ATS_Session *s)
1525 {
1526   if (GNUNET_YES == s->in_destroy)
1527     return;
1528   GNUNET_assert (NULL != s->timeout_task);
1529   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1530 }
1531
1532
1533
1534 /**
1535  * Function that will be called whenever the transport service wants to
1536  * notify the plugin that a session is still active and in use and
1537  * therefore the session timeout for this session has to be updated
1538  *
1539  * @param cls closure with the `struct Plugin`
1540  * @param peer which peer was the session for
1541  * @param session which session is being updated
1542  */
1543 static void
1544 udp_plugin_update_session_timeout (void *cls,
1545                                    const struct GNUNET_PeerIdentity *peer,
1546                                    struct GNUNET_ATS_Session *session)
1547 {
1548   struct Plugin *plugin = cls;
1549
1550   if (GNUNET_YES !=
1551       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1552                                                     peer,
1553                                                     session))
1554   {
1555     GNUNET_break (0);
1556     return;
1557   }
1558   /* Reschedule session timeout */
1559   reschedule_session_timeout (session);
1560 }
1561
1562
1563 /* ************************* Sending ************************ */
1564
1565
1566 /**
1567  * Remove the given message from the transmission queue and
1568  * update all applicable statistics.
1569  *
1570  * @param plugin the UDP plugin
1571  * @param udpw message wrapper to dequeue
1572  */
1573 static void
1574 dequeue (struct Plugin *plugin,
1575          struct UDP_MessageWrapper *udpw)
1576 {
1577   struct GNUNET_ATS_Session *session = udpw->session;
1578
1579   if (plugin->bytes_in_buffer < udpw->msg_size)
1580   {
1581     GNUNET_break (0);
1582   }
1583   else
1584   {
1585     GNUNET_STATISTICS_update (plugin->env->stats,
1586                               "# UDP, total bytes in send buffers",
1587                               - (long long) udpw->msg_size,
1588                               GNUNET_NO);
1589     plugin->bytes_in_buffer -= udpw->msg_size;
1590   }
1591   GNUNET_STATISTICS_update (plugin->env->stats,
1592                             "# UDP, total messages in send buffers",
1593                             -1,
1594                             GNUNET_NO);
1595   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1596   {
1597     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1598                                  plugin->ipv4_queue_tail,
1599                                  udpw);
1600   }
1601   else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1602   {
1603     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1604                                  plugin->ipv6_queue_tail,
1605                                  udpw);
1606   }
1607   else
1608   {
1609     GNUNET_break (0);
1610     return;
1611   }
1612   GNUNET_assert (session->msgs_in_queue > 0);
1613   session->msgs_in_queue--;
1614   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1615   session->bytes_in_queue -= udpw->msg_size;
1616 }
1617
1618
1619 /**
1620  * Enqueue a message for transmission and update statistics.
1621  *
1622  * @param plugin the UDP plugin
1623  * @param udpw message wrapper to queue
1624  */
1625 static void
1626 enqueue (struct Plugin *plugin,
1627          struct UDP_MessageWrapper *udpw)
1628 {
1629   struct GNUNET_ATS_Session *session = udpw->session;
1630
1631   if (GNUNET_YES == session->in_destroy)
1632   {
1633     GNUNET_break (0);
1634     GNUNET_free (udpw);
1635     return;
1636   }
1637   if (plugin->bytes_in_buffer > INT64_MAX - udpw->msg_size)
1638   {
1639     GNUNET_break (0);
1640   }
1641   else
1642   {
1643     GNUNET_STATISTICS_update (plugin->env->stats,
1644                               "# UDP, total bytes in send buffers",
1645                               udpw->msg_size,
1646                               GNUNET_NO);
1647     plugin->bytes_in_buffer += udpw->msg_size;
1648   }
1649   GNUNET_STATISTICS_update (plugin->env->stats,
1650                             "# UDP, total messages in send buffers",
1651                             1,
1652                             GNUNET_NO);
1653   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1654   {
1655     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1656                                 plugin->ipv4_queue_tail,
1657                                 udpw);
1658   }
1659   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1660   {
1661     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1662                                  plugin->ipv6_queue_tail,
1663                                  udpw);
1664   }
1665   else
1666   {
1667     GNUNET_break (0);
1668     udpw->cont (udpw->cont_cls,
1669                 &session->target,
1670                 GNUNET_SYSERR,
1671                 udpw->msg_size,
1672                 0);
1673     GNUNET_free (udpw);
1674     return;
1675   }
1676   session->msgs_in_queue++;
1677   session->bytes_in_queue += udpw->msg_size;
1678 }
1679
1680
1681 /**
1682  * We have completed our (attempt) to transmit a message that had to
1683  * be fragmented -- either because we got an ACK saying that all
1684  * fragments were received, or because of timeout / disconnect.  Clean
1685  * up our state.
1686  *
1687  * @param frag_ctx fragmentation context to clean up
1688  * @param result #GNUNET_OK if we succeeded (got ACK),
1689  *               #GNUNET_SYSERR if the transmission failed
1690  */
1691 static void
1692 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1693                          int result)
1694 {
1695   struct Plugin *plugin = frag_ctx->plugin;
1696   struct GNUNET_ATS_Session *s = frag_ctx->session;
1697   struct UDP_MessageWrapper *udpw;
1698   struct UDP_MessageWrapper *tmp;
1699   size_t overhead;
1700   struct GNUNET_TIME_Relative delay;
1701
1702   LOG (GNUNET_ERROR_TYPE_DEBUG,
1703        "%p: Fragmented message removed with result %s\n",
1704        frag_ctx,
1705        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1706   /* Call continuation for fragmented message */
1707   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1708     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1709   else
1710     overhead = frag_ctx->on_wire_size;
1711   delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1712   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1713   {
1714     LOG (GNUNET_ERROR_TYPE_WARNING,
1715          "Fragmented message acknowledged after %s (expected at %s)\n",
1716          GNUNET_STRINGS_relative_time_to_string (delay,
1717                                                  GNUNET_YES),
1718          GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
1719   }
1720   else
1721   {
1722     LOG (GNUNET_ERROR_TYPE_DEBUG,
1723          "Fragmented message acknowledged after %s (expected at %s)\n",
1724          GNUNET_STRINGS_relative_time_to_string (delay,
1725                                                  GNUNET_YES),
1726          GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
1727   }
1728
1729   if (NULL != frag_ctx->cont)
1730     frag_ctx->cont (frag_ctx->cont_cls,
1731                     &s->target,
1732                     result,
1733                     s->frag_ctx->payload_size,
1734                     frag_ctx->on_wire_size);
1735   GNUNET_STATISTICS_update (plugin->env->stats,
1736                             "# UDP, fragmented messages active",
1737                             -1,
1738                             GNUNET_NO);
1739
1740   if (GNUNET_OK == result)
1741   {
1742     GNUNET_STATISTICS_update (plugin->env->stats,
1743                               "# UDP, fragmented msgs, messages, sent, success",
1744                               1,
1745                               GNUNET_NO);
1746     GNUNET_STATISTICS_update (plugin->env->stats,
1747                               "# UDP, fragmented msgs, bytes payload, sent, success",
1748                               s->frag_ctx->payload_size,
1749                               GNUNET_NO);
1750     GNUNET_STATISTICS_update (plugin->env->stats,
1751                               "# UDP, fragmented msgs, bytes overhead, sent, success",
1752                               overhead,
1753                               GNUNET_NO);
1754     GNUNET_STATISTICS_update (plugin->env->stats,
1755                               "# UDP, total, bytes overhead, sent",
1756                               overhead,
1757                               GNUNET_NO);
1758     GNUNET_STATISTICS_update (plugin->env->stats,
1759                               "# UDP, total, bytes payload, sent",
1760                               s->frag_ctx->payload_size,
1761                               GNUNET_NO);
1762   }
1763   else
1764   {
1765     GNUNET_STATISTICS_update (plugin->env->stats,
1766                               "# UDP, fragmented msgs, messages, sent, failure",
1767                               1,
1768                               GNUNET_NO);
1769     GNUNET_STATISTICS_update (plugin->env->stats,
1770                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1771                               s->frag_ctx->payload_size,
1772                               GNUNET_NO);
1773     GNUNET_STATISTICS_update (plugin->env->stats,
1774                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1775                               overhead,
1776                               GNUNET_NO);
1777     GNUNET_STATISTICS_update (plugin->env->stats,
1778                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1779                               overhead,
1780                               GNUNET_NO);
1781   }
1782
1783   /* Remove remaining fragments from queue, no need to transmit those
1784      any longer. */
1785   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1786   {
1787     udpw = plugin->ipv6_queue_head;
1788     while (NULL != udpw)
1789     {
1790       tmp = udpw->next;
1791       if ( (udpw->frag_ctx != NULL) &&
1792            (udpw->frag_ctx == frag_ctx) )
1793       {
1794         dequeue (plugin,
1795                  udpw);
1796         GNUNET_free (udpw);
1797       }
1798       udpw = tmp;
1799     }
1800   }
1801   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1802   {
1803     udpw = plugin->ipv4_queue_head;
1804     while (NULL != udpw)
1805     {
1806       tmp = udpw->next;
1807       if ( (NULL != udpw->frag_ctx) &&
1808            (udpw->frag_ctx == frag_ctx) )
1809       {
1810         dequeue (plugin,
1811                  udpw);
1812         GNUNET_free (udpw);
1813       }
1814       udpw = tmp;
1815     }
1816   }
1817   notify_session_monitor (s->plugin,
1818                           s,
1819                           GNUNET_TRANSPORT_SS_UPDATE);
1820   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1821                                    &s->last_expected_msg_delay,
1822                                    &s->last_expected_ack_delay);
1823   s->frag_ctx = NULL;
1824   GNUNET_free (frag_ctx);
1825 }
1826
1827
1828 /**
1829  * We are finished with a fragment in the message queue.
1830  * Notify the continuation and update statistics.
1831  *
1832  * @param cls the `struct Plugin *`
1833  * @param udpw the queue entry
1834  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1835  */
1836 static void
1837 qc_fragment_sent (void *cls,
1838                   struct UDP_MessageWrapper *udpw,
1839                   int result)
1840 {
1841   struct Plugin *plugin = cls;
1842
1843   GNUNET_assert (NULL != udpw->frag_ctx);
1844   if (GNUNET_OK == result)
1845   {
1846     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847                 "Fragment of message with %u bytes transmitted to %s\n",
1848                 (unsigned int) udpw->payload_size,
1849                 GNUNET_i2s (&udpw->session->target));
1850     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1851     GNUNET_STATISTICS_update (plugin->env->stats,
1852                               "# UDP, fragmented msgs, fragments, sent, success",
1853                               1,
1854                               GNUNET_NO);
1855     GNUNET_STATISTICS_update (plugin->env->stats,
1856                               "# UDP, fragmented msgs, fragments bytes, sent, success",
1857                               udpw->msg_size,
1858                               GNUNET_NO);
1859   }
1860   else
1861   {
1862     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1863                 "Failed to transmit fragment of message with %u bytes to %s\n",
1864                 (unsigned int) udpw->payload_size,
1865                 GNUNET_i2s (&udpw->session->target));
1866     fragmented_message_done (udpw->frag_ctx,
1867                              GNUNET_SYSERR);
1868     GNUNET_STATISTICS_update (plugin->env->stats,
1869                               "# UDP, fragmented msgs, fragments, sent, failure",
1870                               1,
1871                               GNUNET_NO);
1872     GNUNET_STATISTICS_update (plugin->env->stats,
1873                               "# UDP, fragmented msgs, fragments bytes, sent, failure",
1874                               udpw->msg_size,
1875                               GNUNET_NO);
1876   }
1877 }
1878
1879
1880 /**
1881  * Function that is called with messages created by the fragmentation
1882  * module.  In the case of the `proc` callback of the
1883  * #GNUNET_FRAGMENT_context_create() function, this function must
1884  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1885  *
1886  * @param cls closure, the `struct UDP_FragmentationContext`
1887  * @param msg the message that was created
1888  */
1889 static void
1890 enqueue_fragment (void *cls,
1891                   const struct GNUNET_MessageHeader *msg)
1892 {
1893   struct UDP_FragmentationContext *frag_ctx = cls;
1894   struct Plugin *plugin = frag_ctx->plugin;
1895   struct UDP_MessageWrapper *udpw;
1896   struct GNUNET_ATS_Session *session = frag_ctx->session;
1897   size_t msg_len = ntohs (msg->size);
1898
1899   LOG (GNUNET_ERROR_TYPE_DEBUG,
1900        "Enqueuing fragment with %u bytes\n",
1901        msg_len);
1902   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1903   udpw->session = session;
1904   udpw->msg_buf = (char *) &udpw[1];
1905   udpw->msg_size = msg_len;
1906   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1907   udpw->timeout = frag_ctx->timeout;
1908   udpw->start_time = frag_ctx->start_time;
1909   udpw->transmission_time = frag_ctx->next_frag_time;
1910   frag_ctx->next_frag_time
1911     = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1912                                 frag_ctx->flow_delay_from_other_peer);
1913   udpw->frag_ctx = frag_ctx;
1914   udpw->qc = &qc_fragment_sent;
1915   udpw->qc_cls = plugin;
1916   GNUNET_memcpy (udpw->msg_buf,
1917                  msg,
1918                  msg_len);
1919   enqueue (plugin,
1920            udpw);
1921   if (session->address->address_length == sizeof (struct IPv4UdpAddress))
1922     schedule_select_v4 (plugin);
1923   else
1924     schedule_select_v6 (plugin);
1925 }
1926
1927
1928 /**
1929  * We are finished with a message from the message queue.
1930  * Notify the continuation and update statistics.
1931  *
1932  * @param cls the `struct Plugin *`
1933  * @param udpw the queue entry
1934  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1935  */
1936 static void
1937 qc_message_sent (void *cls,
1938                  struct UDP_MessageWrapper *udpw,
1939                  int result)
1940 {
1941   struct Plugin *plugin = cls;
1942   size_t overhead;
1943   struct GNUNET_TIME_Relative delay;
1944
1945   if (udpw->msg_size >= udpw->payload_size)
1946     overhead = udpw->msg_size - udpw->payload_size;
1947   else
1948     overhead = udpw->msg_size;
1949
1950   if (NULL != udpw->cont)
1951   {
1952     delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1953     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1954     {
1955       LOG (GNUNET_ERROR_TYPE_WARNING,
1956            "Message sent via UDP with delay of %s\n",
1957            GNUNET_STRINGS_relative_time_to_string (delay,
1958                                                    GNUNET_YES));
1959     }
1960     else
1961     {
1962       LOG (GNUNET_ERROR_TYPE_DEBUG,
1963            "Message sent via UDP with delay of %s\n",
1964            GNUNET_STRINGS_relative_time_to_string (delay,
1965                                                    GNUNET_YES));
1966     }
1967     udpw->cont (udpw->cont_cls,
1968                 &udpw->session->target,
1969                 result,
1970                 udpw->payload_size,
1971                 overhead);
1972   }
1973   if (GNUNET_OK == result)
1974   {
1975     GNUNET_STATISTICS_update (plugin->env->stats,
1976                               "# UDP, unfragmented msgs, messages, sent, success",
1977                               1,
1978                               GNUNET_NO);
1979     GNUNET_STATISTICS_update (plugin->env->stats,
1980                               "# UDP, unfragmented msgs, bytes payload, sent, success",
1981                               udpw->payload_size,
1982                               GNUNET_NO);
1983     GNUNET_STATISTICS_update (plugin->env->stats,
1984                               "# UDP, unfragmented msgs, bytes overhead, sent, success",
1985                               overhead,
1986                               GNUNET_NO);
1987     GNUNET_STATISTICS_update (plugin->env->stats,
1988                               "# UDP, total, bytes overhead, sent",
1989                               overhead,
1990                               GNUNET_NO);
1991     GNUNET_STATISTICS_update (plugin->env->stats,
1992                               "# UDP, total, bytes payload, sent",
1993                               udpw->payload_size,
1994                               GNUNET_NO);
1995   }
1996   else
1997   {
1998     GNUNET_STATISTICS_update (plugin->env->stats,
1999                               "# UDP, unfragmented msgs, messages, sent, failure",
2000                               1,
2001                               GNUNET_NO);
2002     GNUNET_STATISTICS_update (plugin->env->stats,
2003                               "# UDP, unfragmented msgs, bytes payload, sent, failure",
2004                               udpw->payload_size,
2005                               GNUNET_NO);
2006     GNUNET_STATISTICS_update (plugin->env->stats,
2007                               "# UDP, unfragmented msgs, bytes overhead, sent, failure",
2008                               overhead,
2009                               GNUNET_NO);
2010   }
2011 }
2012
2013
2014 /**
2015  * Function that can be used by the transport service to transmit a
2016  * message using the plugin.  Note that in the case of a peer
2017  * disconnecting, the continuation MUST be called prior to the
2018  * disconnect notification itself.  This function will be called with
2019  * this peer's HELLO message to initiate a fresh connection to another
2020  * peer.
2021  *
2022  * @param cls closure
2023  * @param s which session must be used
2024  * @param msgbuf the message to transmit
2025  * @param msgbuf_size number of bytes in @a msgbuf
2026  * @param priority how important is the message (most plugins will
2027  *                 ignore message priority and just FIFO)
2028  * @param to how long to wait at most for the transmission (does not
2029  *                require plugins to discard the message after the timeout,
2030  *                just advisory for the desired delay; most plugins will ignore
2031  *                this as well)
2032  * @param cont continuation to call once the message has
2033  *        been transmitted (or if the transport is ready
2034  *        for the next transmission call; or if the
2035  *        peer disconnected...); can be NULL
2036  * @param cont_cls closure for @a cont
2037  * @return number of bytes used (on the physical network, with overheads);
2038  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
2039  *         and does NOT mean that the message was not transmitted (DV)
2040  */
2041 static ssize_t
2042 udp_plugin_send (void *cls,
2043                  struct GNUNET_ATS_Session *s,
2044                  const char *msgbuf,
2045                  size_t msgbuf_size,
2046                  unsigned int priority,
2047                  struct GNUNET_TIME_Relative to,
2048                  GNUNET_TRANSPORT_TransmitContinuation cont,
2049                  void *cont_cls)
2050 {
2051   struct Plugin *plugin = cls;
2052   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2053   struct UDP_FragmentationContext *frag_ctx;
2054   struct UDP_MessageWrapper *udpw;
2055   struct UDPMessage *udp;
2056   char mbuf[udpmlen] GNUNET_ALIGN;
2057   struct GNUNET_TIME_Relative latency;
2058
2059   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
2060        (NULL == plugin->sockv6) )
2061     return GNUNET_SYSERR;
2062   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
2063        (NULL == plugin->sockv4) )
2064     return GNUNET_SYSERR;
2065   if (udpmlen >= GNUNET_MAX_MESSAGE_SIZE)
2066   {
2067     GNUNET_break (0);
2068     return GNUNET_SYSERR;
2069   }
2070   if (GNUNET_YES !=
2071       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2072                                                     &s->target,
2073                                                     s))
2074   {
2075     GNUNET_break (0);
2076     return GNUNET_SYSERR;
2077   }
2078   LOG (GNUNET_ERROR_TYPE_DEBUG,
2079        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2080        udpmlen,
2081        GNUNET_i2s (&s->target),
2082        udp_address_to_string (plugin,
2083                               s->address->address,
2084                               s->address->address_length));
2085
2086   udp = (struct UDPMessage *) mbuf;
2087   udp->header.size = htons (udpmlen);
2088   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2089   udp->reserved = htonl (0);
2090   udp->sender = *plugin->env->my_identity;
2091
2092   /* We do not update the session time out here!  Otherwise this
2093    * session will not timeout since we send keep alive before session
2094    * can timeout.
2095    *
2096    * For UDP we update session timeout only on receive, this will
2097    * cover keep alives, since remote peer will reply with keep alive
2098    * responses!
2099    */
2100   if (udpmlen <= UDP_MTU)
2101   {
2102     /* unfragmented message */
2103     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2104     udpw->session = s;
2105     udpw->msg_buf = (char *) &udpw[1];
2106     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2107     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2108     udpw->start_time = GNUNET_TIME_absolute_get ();
2109     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2110     udpw->transmission_time = s->last_transmit_time;
2111     s->last_transmit_time
2112       = GNUNET_TIME_absolute_add (s->last_transmit_time,
2113                                   s->flow_delay_from_other_peer);
2114     udpw->cont = cont;
2115     udpw->cont_cls = cont_cls;
2116     udpw->frag_ctx = NULL;
2117     udpw->qc = &qc_message_sent;
2118     udpw->qc_cls = plugin;
2119     GNUNET_memcpy (udpw->msg_buf,
2120             udp,
2121             sizeof (struct UDPMessage));
2122     GNUNET_memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
2123             msgbuf,
2124             msgbuf_size);
2125     enqueue (plugin,
2126              udpw);
2127     GNUNET_STATISTICS_update (plugin->env->stats,
2128                               "# UDP, unfragmented messages queued total",
2129                               1,
2130                               GNUNET_NO);
2131     GNUNET_STATISTICS_update (plugin->env->stats,
2132                               "# UDP, unfragmented bytes payload queued total",
2133                               msgbuf_size,
2134                               GNUNET_NO);
2135     if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2136       schedule_select_v4 (plugin);
2137     else
2138       schedule_select_v6 (plugin);
2139   }
2140   else
2141   {
2142     /* fragmented message */
2143     if (NULL != s->frag_ctx)
2144       return GNUNET_SYSERR;
2145     GNUNET_memcpy (&udp[1],
2146             msgbuf,
2147             msgbuf_size);
2148     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2149     frag_ctx->plugin = plugin;
2150     frag_ctx->session = s;
2151     frag_ctx->cont = cont;
2152     frag_ctx->cont_cls = cont_cls;
2153     frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2154     frag_ctx->next_frag_time = s->last_transmit_time;
2155     frag_ctx->flow_delay_from_other_peer
2156       = GNUNET_TIME_relative_divide (s->flow_delay_from_other_peer,
2157                                      1 + (msgbuf_size /
2158                                           UDP_MTU));
2159     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
2160     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2161     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2162     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2163                                                      UDP_MTU,
2164                                                      &plugin->tracker,
2165                                                      s->last_expected_msg_delay,
2166                                                      s->last_expected_ack_delay,
2167                                                      &udp->header,
2168                                                      &enqueue_fragment,
2169                                                      frag_ctx);
2170     s->frag_ctx = frag_ctx;
2171     s->last_transmit_time = frag_ctx->next_frag_time;
2172     latency = GNUNET_TIME_absolute_get_remaining (s->last_transmit_time);
2173     if (latency.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2174       LOG (GNUNET_ERROR_TYPE_WARNING,
2175            "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
2176            GNUNET_STRINGS_relative_time_to_string (latency,
2177                                                    GNUNET_YES),
2178            GNUNET_i2s (&s->target),
2179            (unsigned int) s->msgs_in_queue);
2180     else
2181       LOG (GNUNET_ERROR_TYPE_DEBUG,
2182            "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
2183            GNUNET_STRINGS_relative_time_to_string (latency,
2184                                                    GNUNET_YES),
2185            GNUNET_i2s (&s->target),
2186            (unsigned int) s->msgs_in_queue);
2187
2188     GNUNET_STATISTICS_update (plugin->env->stats,
2189                               "# UDP, fragmented messages active",
2190                               1,
2191                               GNUNET_NO);
2192     GNUNET_STATISTICS_update (plugin->env->stats,
2193                               "# UDP, fragmented messages, total",
2194                               1,
2195                               GNUNET_NO);
2196     GNUNET_STATISTICS_update (plugin->env->stats,
2197                               "# UDP, fragmented bytes (payload)",
2198                               frag_ctx->payload_size,
2199                               GNUNET_NO);
2200   }
2201   notify_session_monitor (s->plugin,
2202                           s,
2203                           GNUNET_TRANSPORT_SS_UPDATE);
2204   return udpmlen;
2205 }
2206
2207
2208 /* ********************** Receiving ********************** */
2209
2210
2211 /**
2212  * Closure for #find_receive_context().
2213  */
2214 struct FindReceiveContext
2215 {
2216   /**
2217    * Where to store the result.
2218    */
2219   struct DefragContext *rc;
2220
2221   /**
2222    * Session associated with this context.
2223    */
2224   struct GNUNET_ATS_Session *session;
2225
2226   /**
2227    * Address to find.
2228    */
2229   const union UdpAddress *udp_addr;
2230
2231   /**
2232    * Number of bytes in @e udp_addr.
2233    */
2234   size_t udp_addr_len;
2235
2236 };
2237
2238
2239 /**
2240  * Scan the heap for a receive context with the given address.
2241  *
2242  * @param cls the `struct FindReceiveContext`
2243  * @param node internal node of the heap
2244  * @param element value stored at the node (a `struct ReceiveContext`)
2245  * @param cost cost associated with the node
2246  * @return #GNUNET_YES if we should continue to iterate,
2247  *         #GNUNET_NO if not.
2248  */
2249 static int
2250 find_receive_context (void *cls,
2251                       struct GNUNET_CONTAINER_HeapNode *node,
2252                       void *element,
2253                       GNUNET_CONTAINER_HeapCostType cost)
2254 {
2255   struct FindReceiveContext *frc = cls;
2256   struct DefragContext *e = element;
2257
2258   if ( (frc->udp_addr_len == e->udp_addr_len) &&
2259        (0 == memcmp (frc->udp_addr,
2260                      e->udp_addr,
2261                      frc->udp_addr_len)) )
2262   {
2263     frc->rc = e;
2264     return GNUNET_NO;
2265   }
2266   return GNUNET_YES;
2267 }
2268
2269
2270 /**
2271  * Functions with this signature are called whenever we need to close
2272  * a session due to a disconnect or failure to establish a connection.
2273  *
2274  * @param cls closure with the `struct Plugin`
2275  * @param s session to close down
2276  * @return #GNUNET_OK on success
2277  */
2278 static int
2279 udp_disconnect_session (void *cls,
2280                         struct GNUNET_ATS_Session *s)
2281 {
2282   struct Plugin *plugin = cls;
2283   struct UDP_MessageWrapper *udpw;
2284   struct UDP_MessageWrapper *next;
2285   struct FindReceiveContext frc;
2286
2287   GNUNET_assert (GNUNET_YES != s->in_destroy);
2288   LOG (GNUNET_ERROR_TYPE_DEBUG,
2289        "Session %p to peer `%s' at address %s ended\n",
2290        s,
2291        GNUNET_i2s (&s->target),
2292        udp_address_to_string (plugin,
2293                               s->address->address,
2294                               s->address->address_length));
2295   if (NULL != s->timeout_task)
2296   {
2297     GNUNET_SCHEDULER_cancel (s->timeout_task);
2298     s->timeout_task = NULL;
2299   }
2300   if (NULL != s->frag_ctx)
2301   {
2302     /* Remove fragmented message due to disconnect */
2303     fragmented_message_done (s->frag_ctx,
2304                              GNUNET_SYSERR);
2305   }
2306   GNUNET_assert (GNUNET_YES ==
2307                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
2308                                                        &s->target,
2309                                                        s));
2310   frc.rc = NULL;
2311   frc.udp_addr = s->address->address;
2312   frc.udp_addr_len = s->address->address_length;
2313   /* Lookup existing receive context for this address */
2314   if (NULL != plugin->defrag_ctxs)
2315   {
2316     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2317                                    &find_receive_context,
2318                                    &frc);
2319     if (NULL != frc.rc)
2320     {
2321       struct DefragContext *d_ctx = frc.rc;
2322
2323       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2324       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2325       GNUNET_free (d_ctx);
2326     }
2327   }
2328   s->in_destroy = GNUNET_YES;
2329   next = plugin->ipv4_queue_head;
2330   while (NULL != (udpw = next))
2331   {
2332     next = udpw->next;
2333     if (udpw->session == s)
2334     {
2335       dequeue (plugin,
2336                udpw);
2337       udpw->qc (udpw->qc_cls,
2338                 udpw,
2339                 GNUNET_SYSERR);
2340       GNUNET_free (udpw);
2341     }
2342   }
2343   next = plugin->ipv6_queue_head;
2344   while (NULL != (udpw = next))
2345   {
2346     next = udpw->next;
2347     if (udpw->session == s)
2348     {
2349       dequeue (plugin,
2350                udpw);
2351       udpw->qc (udpw->qc_cls,
2352                 udpw,
2353                 GNUNET_SYSERR);
2354       GNUNET_free (udpw);
2355     }
2356   }
2357   if ( (NULL != s->frag_ctx) &&
2358        (NULL != s->frag_ctx->cont) )
2359   {
2360     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2361        later, as it might be in use right now */
2362     LOG (GNUNET_ERROR_TYPE_DEBUG,
2363          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2364          GNUNET_i2s (&s->target));
2365     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2366                        &s->target,
2367                        GNUNET_SYSERR,
2368                        s->frag_ctx->payload_size,
2369                        s->frag_ctx->on_wire_size);
2370   }
2371   notify_session_monitor (s->plugin,
2372                           s,
2373                           GNUNET_TRANSPORT_SS_DONE);
2374   plugin->env->session_end (plugin->env->cls,
2375                             s->address,
2376                             s);
2377   GNUNET_STATISTICS_set (plugin->env->stats,
2378                          "# UDP sessions active",
2379                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2380                          GNUNET_NO);
2381   if (0 == s->rc)
2382     free_session (s);
2383   return GNUNET_OK;
2384 }
2385
2386
2387 /**
2388  * Handle a #GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK message.
2389  *
2390  * @param plugin the UDP plugin
2391  * @param msg the (presumed) UDP ACK message
2392  * @param udp_addr sender address
2393  * @param udp_addr_len number of bytes in @a udp_addr
2394  */
2395 static void
2396 read_process_ack (struct Plugin *plugin,
2397                   const struct GNUNET_MessageHeader *msg,
2398                   const union UdpAddress *udp_addr,
2399                   socklen_t udp_addr_len)
2400 {
2401   const struct GNUNET_MessageHeader *ack;
2402   const struct UDP_ACK_Message *udp_ack;
2403   struct GNUNET_HELLO_Address *address;
2404   struct GNUNET_ATS_Session *s;
2405   struct GNUNET_TIME_Relative flow_delay;
2406
2407   /* check message format */
2408   if (ntohs (msg->size)
2409       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2410   {
2411     GNUNET_break_op (0);
2412     return;
2413   }
2414   udp_ack = (const struct UDP_ACK_Message *) msg;
2415   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2416   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2417   {
2418     GNUNET_break_op(0);
2419     return;
2420   }
2421
2422   /* Locate session */
2423   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2424                                            PLUGIN_NAME,
2425                                            udp_addr,
2426                                            udp_addr_len,
2427                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2428   s = udp_plugin_lookup_session (plugin,
2429                                  address);
2430   if (NULL == s)
2431   {
2432     LOG (GNUNET_ERROR_TYPE_WARNING,
2433          "UDP session of address %s for ACK not found\n",
2434          udp_address_to_string (plugin,
2435                                 address->address,
2436                                 address->address_length));
2437     GNUNET_HELLO_address_free (address);
2438     return;
2439   }
2440   if (NULL == s->frag_ctx)
2441   {
2442     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2443          "Fragmentation context of address %s for ACK (%s) not found\n",
2444          udp_address_to_string (plugin,
2445                                 address->address,
2446                                 address->address_length),
2447          GNUNET_FRAGMENT_print_ack (ack));
2448     GNUNET_HELLO_address_free (address);
2449     return;
2450   }
2451   GNUNET_HELLO_address_free (address);
2452
2453   /* evaluate flow delay: how long should we wait between messages? */
2454   if (UINT32_MAX == ntohl (udp_ack->delay))
2455   {
2456     /* Other peer asked for us to terminate the session */
2457     LOG (GNUNET_ERROR_TYPE_INFO,
2458          "Asked to disconnect UDP session of %s\n",
2459          GNUNET_i2s (&udp_ack->sender));
2460     udp_disconnect_session (plugin,
2461                             s);
2462     return;
2463   }
2464   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2465   if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2466     LOG (GNUNET_ERROR_TYPE_WARNING,
2467          "We received a sending delay of %s for %s\n",
2468          GNUNET_STRINGS_relative_time_to_string (flow_delay,
2469                                                  GNUNET_YES),
2470          GNUNET_i2s (&udp_ack->sender));
2471   else
2472     LOG (GNUNET_ERROR_TYPE_DEBUG,
2473          "We received a sending delay of %s for %s\n",
2474          GNUNET_STRINGS_relative_time_to_string (flow_delay,
2475                                                  GNUNET_YES),
2476          GNUNET_i2s (&udp_ack->sender));
2477   /* Flow delay is for the reassembled packet, however, our delay
2478      is per packet, so we need to adjust: */
2479   s->flow_delay_from_other_peer = flow_delay;
2480
2481   /* Handle ACK */
2482   if (GNUNET_OK !=
2483       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2484                                    ack))
2485   {
2486     LOG (GNUNET_ERROR_TYPE_DEBUG,
2487          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2488          (unsigned int) ntohs (msg->size),
2489          GNUNET_i2s (&udp_ack->sender),
2490          udp_address_to_string (plugin,
2491                                 udp_addr,
2492                                 udp_addr_len));
2493     /* Expect more ACKs to arrive */
2494     return;
2495   }
2496
2497   /* Remove fragmented message after successful sending */
2498   LOG (GNUNET_ERROR_TYPE_DEBUG,
2499        "Message from %s at %s full ACK'ed\n",
2500        GNUNET_i2s (&udp_ack->sender),
2501        udp_address_to_string (plugin,
2502                               udp_addr,
2503                               udp_addr_len));
2504   fragmented_message_done (s->frag_ctx,
2505                            GNUNET_OK);
2506 }
2507
2508
2509 /**
2510  * Message tokenizer has broken up an incomming message. Pass it on
2511  * to the service.
2512  *
2513  * @param cls the `struct GNUNET_ATS_Session *`
2514  * @param hdr the actual message
2515  * @return #GNUNET_OK (always)
2516  */
2517 static int
2518 process_inbound_tokenized_messages (void *cls,
2519                                     const struct GNUNET_MessageHeader *hdr)
2520 {
2521   struct GNUNET_ATS_Session *session = cls;
2522   struct Plugin *plugin = session->plugin;
2523
2524   if (GNUNET_YES == session->in_destroy)
2525     return GNUNET_OK;
2526   reschedule_session_timeout (session);
2527   session->flow_delay_for_other_peer
2528     = plugin->env->receive (plugin->env->cls,
2529                             session->address,
2530                             session,
2531                             hdr);
2532   return GNUNET_OK;
2533 }
2534
2535
2536 /**
2537  * Destroy a session, plugin is being unloaded.
2538  *
2539  * @param cls the `struct Plugin`
2540  * @param key hash of public key of target peer
2541  * @param value a `struct PeerSession *` to clean up
2542  * @return #GNUNET_OK (continue to iterate)
2543  */
2544 static int
2545 disconnect_and_free_it (void *cls,
2546                         const struct GNUNET_PeerIdentity *key,
2547                         void *value)
2548 {
2549   struct Plugin *plugin = cls;
2550
2551   udp_disconnect_session (plugin,
2552                           value);
2553   return GNUNET_OK;
2554 }
2555
2556
2557 /**
2558  * Disconnect from a remote node.  Clean up session if we have one for
2559  * this peer.
2560  *
2561  * @param cls closure for this call (should be handle to Plugin)
2562  * @param target the peeridentity of the peer to disconnect
2563  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2564  */
2565 static void
2566 udp_disconnect (void *cls,
2567                 const struct GNUNET_PeerIdentity *target)
2568 {
2569   struct Plugin *plugin = cls;
2570
2571   LOG (GNUNET_ERROR_TYPE_DEBUG,
2572        "Disconnecting from peer `%s'\n",
2573        GNUNET_i2s (target));
2574   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2575                                               target,
2576                                               &disconnect_and_free_it,
2577                                               plugin);
2578 }
2579
2580
2581 /**
2582  * Session was idle, so disconnect it.
2583  *
2584  * @param cls the `struct GNUNET_ATS_Session` to time out
2585  */
2586 static void
2587 session_timeout (void *cls)
2588 {
2589   struct GNUNET_ATS_Session *s = cls;
2590   struct Plugin *plugin = s->plugin;
2591   struct GNUNET_TIME_Relative left;
2592
2593   s->timeout_task = NULL;
2594   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2595   if (left.rel_value_us > 0)
2596   {
2597     /* not actually our turn yet, but let's at least update
2598        the monitor, it may think we're about to die ... */
2599     notify_session_monitor (s->plugin,
2600                             s,
2601                             GNUNET_TRANSPORT_SS_UPDATE);
2602     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
2603                                                     &session_timeout,
2604                                                     s);
2605     return;
2606   }
2607   LOG (GNUNET_ERROR_TYPE_DEBUG,
2608        "Session %p was idle for %s, disconnecting\n",
2609        s,
2610        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2611                                                GNUNET_YES));
2612   /* call session destroy function */
2613   udp_disconnect_session (plugin,
2614                           s);
2615 }
2616
2617
2618 /**
2619  * Allocate a new session for the given endpoint address.
2620  * Note that this function does not inform the service
2621  * of the new session, this is the responsibility of the
2622  * caller (if needed).
2623  *
2624  * @param cls the `struct Plugin`
2625  * @param address address of the other peer to use
2626  * @param network_type network type the address belongs to
2627  * @return NULL on error, otherwise session handle
2628  */
2629 static struct GNUNET_ATS_Session *
2630 udp_plugin_create_session (void *cls,
2631                            const struct GNUNET_HELLO_Address *address,
2632                            enum GNUNET_NetworkType network_type)
2633 {
2634   struct Plugin *plugin = cls;
2635   struct GNUNET_ATS_Session *s;
2636
2637   s = GNUNET_new (struct GNUNET_ATS_Session);
2638   s->mst = GNUNET_MST_create (&process_inbound_tokenized_messages,
2639                               s);
2640   s->plugin = plugin;
2641   s->address = GNUNET_HELLO_address_copy (address);
2642   s->target = address->peer;
2643   s->last_transmit_time = GNUNET_TIME_absolute_get ();
2644   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2645                                                               250);
2646   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2647   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2648   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2649   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2650   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
2651                                                   &session_timeout,
2652                                                   s);
2653   s->scope = network_type;
2654
2655   LOG (GNUNET_ERROR_TYPE_DEBUG,
2656        "Creating new session %p for peer `%s' address `%s'\n",
2657        s,
2658        GNUNET_i2s (&address->peer),
2659        udp_address_to_string (plugin,
2660                               address->address,
2661                               address->address_length));
2662   GNUNET_assert (GNUNET_OK ==
2663                  GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
2664                                                     &s->target,
2665                                                     s,
2666                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2667   GNUNET_STATISTICS_set (plugin->env->stats,
2668                          "# UDP sessions active",
2669                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2670                          GNUNET_NO);
2671   notify_session_monitor (plugin,
2672                           s,
2673                           GNUNET_TRANSPORT_SS_INIT);
2674   return s;
2675 }
2676
2677
2678 /**
2679  * Creates a new outbound session the transport service will use to
2680  * send data to the peer.
2681  *
2682  * @param cls the `struct Plugin *`
2683  * @param address the address
2684  * @return the session or NULL of max connections exceeded
2685  */
2686 static struct GNUNET_ATS_Session *
2687 udp_plugin_get_session (void *cls,
2688                         const struct GNUNET_HELLO_Address *address)
2689 {
2690   struct Plugin *plugin = cls;
2691   struct GNUNET_ATS_Session *s;
2692   enum GNUNET_NetworkType network_type = GNUNET_NT_UNSPECIFIED;
2693   const struct IPv4UdpAddress *udp_v4;
2694   const struct IPv6UdpAddress *udp_v6;
2695
2696   if (NULL == address)
2697   {
2698     GNUNET_break (0);
2699     return NULL;
2700   }
2701   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
2702        (address->address_length != sizeof(struct IPv6UdpAddress)) )
2703   {
2704     GNUNET_break_op (0);
2705     return NULL;
2706   }
2707   if (NULL != (s = udp_plugin_lookup_session (cls,
2708                                               address)))
2709     return s;
2710
2711   /* need to create new session */
2712   if (sizeof (struct IPv4UdpAddress) == address->address_length)
2713   {
2714     struct sockaddr_in v4;
2715
2716     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2717     memset (&v4, '\0', sizeof (v4));
2718     v4.sin_family = AF_INET;
2719 #if HAVE_SOCKADDR_IN_SIN_LEN
2720     v4.sin_len = sizeof (struct sockaddr_in);
2721 #endif
2722     v4.sin_port = udp_v4->u4_port;
2723     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2724     network_type = plugin->env->get_address_type (plugin->env->cls,
2725                                                   (const struct sockaddr *) &v4,
2726                                                   sizeof (v4));
2727   }
2728   if (sizeof (struct IPv6UdpAddress) == address->address_length)
2729   {
2730     struct sockaddr_in6 v6;
2731
2732     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2733     memset (&v6, '\0', sizeof (v6));
2734     v6.sin6_family = AF_INET6;
2735 #if HAVE_SOCKADDR_IN_SIN_LEN
2736     v6.sin6_len = sizeof (struct sockaddr_in6);
2737 #endif
2738     v6.sin6_port = udp_v6->u6_port;
2739     v6.sin6_addr = udp_v6->ipv6_addr;
2740     network_type = plugin->env->get_address_type (plugin->env->cls,
2741                                                   (const struct sockaddr *) &v6,
2742                                                   sizeof (v6));
2743   }
2744   GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
2745   return udp_plugin_create_session (cls,
2746                                     address,
2747                                     network_type);
2748 }
2749
2750
2751 /**
2752  * We've received a UDP Message.  Process it (pass contents to main service).
2753  *
2754  * @param plugin plugin context
2755  * @param msg the message
2756  * @param udp_addr sender address
2757  * @param udp_addr_len number of bytes in @a udp_addr
2758  * @param network_type network type the address belongs to
2759  */
2760 static void
2761 process_udp_message (struct Plugin *plugin,
2762                      const struct UDPMessage *msg,
2763                      const union UdpAddress *udp_addr,
2764                      size_t udp_addr_len,
2765                      enum GNUNET_NetworkType network_type)
2766 {
2767   struct GNUNET_ATS_Session *s;
2768   struct GNUNET_HELLO_Address *address;
2769
2770   GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
2771   if (0 != ntohl (msg->reserved))
2772   {
2773     GNUNET_break_op(0);
2774     return;
2775   }
2776   if (ntohs (msg->header.size)
2777       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2778   {
2779     GNUNET_break_op(0);
2780     return;
2781   }
2782
2783   address = GNUNET_HELLO_address_allocate (&msg->sender,
2784                                            PLUGIN_NAME,
2785                                            udp_addr,
2786                                            udp_addr_len,
2787                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2788   if (NULL ==
2789       (s = udp_plugin_lookup_session (plugin,
2790                                       address)))
2791   {
2792     s = udp_plugin_create_session (plugin,
2793                                    address,
2794                                    network_type);
2795     plugin->env->session_start (plugin->env->cls,
2796                                 address,
2797                                 s,
2798                                 s->scope);
2799     notify_session_monitor (plugin,
2800                             s,
2801                             GNUNET_TRANSPORT_SS_UP);
2802   }
2803   GNUNET_free (address);
2804
2805   s->rc++;
2806   GNUNET_MST_from_buffer (s->mst,
2807                           (const char *) &msg[1],
2808                           ntohs (msg->header.size) - sizeof(struct UDPMessage),
2809                           GNUNET_YES,
2810                           GNUNET_NO);
2811   s->rc--;
2812   if ( (0 == s->rc) &&
2813        (GNUNET_YES == s->in_destroy) )
2814     free_session (s);
2815 }
2816
2817
2818 /**
2819  * Process a defragmented message.
2820  *
2821  * @param cls the `struct DefragContext *`
2822  * @param msg the message
2823  */
2824 static void
2825 fragment_msg_proc (void *cls,
2826                    const struct GNUNET_MessageHeader *msg)
2827 {
2828   struct DefragContext *dc = cls;
2829   const struct UDPMessage *um;
2830
2831   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2832   {
2833     GNUNET_break_op (0);
2834     return;
2835   }
2836   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2837   {
2838     GNUNET_break_op (0);
2839     return;
2840   }
2841   um = (const struct UDPMessage *) msg;
2842   dc->sender = um->sender;
2843   dc->have_sender = GNUNET_YES;
2844   process_udp_message (dc->plugin,
2845                        um,
2846                        dc->udp_addr,
2847                        dc->udp_addr_len,
2848                        dc->network_type);
2849 }
2850
2851
2852 /**
2853  * We finished sending an acknowledgement.  Update
2854  * statistics.
2855  *
2856  * @param cls the `struct Plugin`
2857  * @param udpw message queue entry of the ACK
2858  * @param result #GNUNET_OK if the transmission worked,
2859  *               #GNUNET_SYSERR if we failed to send the ACK
2860  */
2861 static void
2862 ack_message_sent (void *cls,
2863                   struct UDP_MessageWrapper *udpw,
2864                   int result)
2865 {
2866   struct Plugin *plugin = cls;
2867
2868   if (GNUNET_OK == result)
2869   {
2870     GNUNET_STATISTICS_update (plugin->env->stats,
2871                               "# UDP, ACK messages sent",
2872                               1,
2873                               GNUNET_NO);
2874   }
2875   else
2876   {
2877     GNUNET_STATISTICS_update (plugin->env->stats,
2878                               "# UDP, ACK transmissions failed",
2879                               1,
2880                               GNUNET_NO);
2881   }
2882 }
2883
2884
2885 /**
2886  * Transmit an acknowledgement.
2887  *
2888  * @param cls the `struct DefragContext *`
2889  * @param id message ID (unused)
2890  * @param msg ack to transmit
2891  */
2892 static void
2893 ack_proc (void *cls,
2894           uint32_t id,
2895           const struct GNUNET_MessageHeader *msg)
2896 {
2897   struct DefragContext *rc = cls;
2898   struct Plugin *plugin = rc->plugin;
2899   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2900   struct UDP_ACK_Message *udp_ack;
2901   uint32_t delay;
2902   struct UDP_MessageWrapper *udpw;
2903   struct GNUNET_ATS_Session *s;
2904   struct GNUNET_HELLO_Address *address;
2905
2906   if (GNUNET_NO == rc->have_sender)
2907   {
2908     /* tried to defragment but never succeeded, hence will not ACK */
2909     /* This can happen if we just lost msgs */
2910     GNUNET_STATISTICS_update (plugin->env->stats,
2911                               "# UDP, fragments discarded without ACK",
2912                               1,
2913                               GNUNET_NO);
2914     return;
2915   }
2916   address = GNUNET_HELLO_address_allocate (&rc->sender,
2917                                            PLUGIN_NAME,
2918                                            rc->udp_addr,
2919                                            rc->udp_addr_len,
2920                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2921   s = udp_plugin_lookup_session (plugin,
2922                                  address);
2923   GNUNET_HELLO_address_free (address);
2924   if (NULL == s)
2925   {
2926     LOG (GNUNET_ERROR_TYPE_ERROR,
2927          "Trying to transmit ACK to peer `%s' but no session found!\n",
2928          udp_address_to_string (plugin,
2929                                 rc->udp_addr,
2930                                 rc->udp_addr_len));
2931     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2932     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2933     GNUNET_free (rc);
2934     GNUNET_STATISTICS_update (plugin->env->stats,
2935                               "# UDP, ACK transmissions failed",
2936                               1,
2937                               GNUNET_NO);
2938     return;
2939   }
2940   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
2941       s->flow_delay_for_other_peer.rel_value_us)
2942     delay = UINT32_MAX;
2943   else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
2944     delay = s->flow_delay_for_other_peer.rel_value_us;
2945   else
2946     delay = UINT32_MAX - 1; /* largest value we can communicate */
2947   LOG (GNUNET_ERROR_TYPE_DEBUG,
2948        "Sending ACK to `%s' including delay of %s\n",
2949        udp_address_to_string (plugin,
2950                               rc->udp_addr,
2951                               rc->udp_addr_len),
2952        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2953                                                GNUNET_YES));
2954   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2955   udpw->msg_size = msize;
2956   udpw->payload_size = 0;
2957   udpw->session = s;
2958   udpw->start_time = GNUNET_TIME_absolute_get ();
2959   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2960   udpw->msg_buf = (char *) &udpw[1];
2961   udpw->qc = &ack_message_sent;
2962   udpw->qc_cls = plugin;
2963   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2964   udp_ack->header.size = htons ((uint16_t) msize);
2965   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2966   udp_ack->delay = htonl (delay);
2967   udp_ack->sender = *plugin->env->my_identity;
2968   GNUNET_memcpy (&udp_ack[1],
2969           msg,
2970           ntohs (msg->size));
2971   enqueue (plugin,
2972            udpw);
2973   notify_session_monitor (plugin,
2974                           s,
2975                           GNUNET_TRANSPORT_SS_UPDATE);
2976   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2977     schedule_select_v4 (plugin);
2978   else
2979     schedule_select_v6 (plugin);
2980 }
2981
2982
2983 /**
2984  * We received a fragment, process it.
2985  *
2986  * @param plugin our plugin
2987  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2988  * @param udp_addr sender address
2989  * @param udp_addr_len number of bytes in @a udp_addr
2990  * @param network_type network type the address belongs to
2991  */
2992 static void
2993 read_process_fragment (struct Plugin *plugin,
2994                        const struct GNUNET_MessageHeader *msg,
2995                        const union UdpAddress *udp_addr,
2996                        size_t udp_addr_len,
2997                        enum GNUNET_NetworkType network_type)
2998 {
2999   struct DefragContext *d_ctx;
3000   struct GNUNET_TIME_Absolute now;
3001   struct FindReceiveContext frc;
3002
3003   frc.rc = NULL;
3004   frc.udp_addr = udp_addr;
3005   frc.udp_addr_len = udp_addr_len;
3006
3007   /* Lookup existing receive context for this address */
3008   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3009                                  &find_receive_context,
3010                                  &frc);
3011   now = GNUNET_TIME_absolute_get ();
3012   d_ctx = frc.rc;
3013
3014   if (NULL == d_ctx)
3015   {
3016     /* Create a new defragmentation context */
3017     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
3018     GNUNET_memcpy (&d_ctx[1],
3019             udp_addr,
3020             udp_addr_len);
3021     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
3022     d_ctx->udp_addr_len = udp_addr_len;
3023     d_ctx->network_type = network_type;
3024     d_ctx->plugin = plugin;
3025     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
3026                                                       UDP_MTU,
3027                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
3028                                                       d_ctx,
3029                                                       &fragment_msg_proc,
3030                                                       &ack_proc);
3031     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
3032                                                  d_ctx,
3033                                                  (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
3034     LOG (GNUNET_ERROR_TYPE_DEBUG,
3035          "Created new defragmentation context for %u-byte fragment from `%s'\n",
3036          (unsigned int) ntohs (msg->size),
3037          udp_address_to_string (plugin,
3038                                 udp_addr,
3039                                 udp_addr_len));
3040   }
3041   else
3042   {
3043     LOG (GNUNET_ERROR_TYPE_DEBUG,
3044          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
3045          (unsigned int) ntohs (msg->size),
3046          udp_address_to_string (plugin,
3047                                 udp_addr,
3048                                 udp_addr_len));
3049   }
3050
3051   if (GNUNET_OK ==
3052       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
3053                                           msg))
3054   {
3055     /* keep this 'rc' from expiring */
3056     GNUNET_CONTAINER_heap_update_cost (d_ctx->hnode,
3057                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
3058   }
3059   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
3060       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
3061   {
3062     /* remove 'rc' that was inactive the longest */
3063     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
3064     GNUNET_assert (NULL != d_ctx);
3065     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3066     GNUNET_free (d_ctx);
3067     GNUNET_STATISTICS_update (plugin->env->stats,
3068                               "# UDP, Defragmentations aborted",
3069                               1,
3070                               GNUNET_NO);
3071   }
3072 }
3073
3074
3075 /**
3076  * Read and process a message from the given socket.
3077  *
3078  * @param plugin the overall plugin
3079  * @param rsock socket to read from
3080  */
3081 static void
3082 udp_select_read (struct Plugin *plugin,
3083                  struct GNUNET_NETWORK_Handle *rsock)
3084 {
3085   socklen_t fromlen;
3086   struct sockaddr_storage addr;
3087   char buf[65536] GNUNET_ALIGN;
3088   ssize_t size;
3089   const struct GNUNET_MessageHeader *msg;
3090   struct IPv4UdpAddress v4;
3091   struct IPv6UdpAddress v6;
3092   const struct sockaddr *sa;
3093   const struct sockaddr_in *sa4;
3094   const struct sockaddr_in6 *sa6;
3095   const union UdpAddress *int_addr;
3096   size_t int_addr_len;
3097   enum GNUNET_NetworkType network_type;
3098
3099   fromlen = sizeof (addr);
3100   memset (&addr,
3101           0,
3102           sizeof(addr));
3103   size = GNUNET_NETWORK_socket_recvfrom (rsock,
3104                                          buf,
3105                                          sizeof (buf),
3106                                          (struct sockaddr *) &addr,
3107                                          &fromlen);
3108   sa = (const struct sockaddr *) &addr;
3109 #if MINGW
3110   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
3111    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
3112    * on this socket has failed.
3113    * Quote from MSDN:
3114    *   WSAECONNRESET - The virtual circuit was reset by the remote side
3115    *   executing a hard or abortive close. The application should close
3116    *   the socket; it is no longer usable. On a UDP-datagram socket this
3117    *   error indicates a previous send operation resulted in an ICMP Port
3118    *   Unreachable message.
3119    */
3120   if ( (-1 == size) &&
3121        (ECONNRESET == errno) )
3122     return;
3123 #endif
3124   if (-1 == size)
3125   {
3126     LOG (GNUNET_ERROR_TYPE_DEBUG,
3127          "UDP failed to receive data: %s\n",
3128          STRERROR (errno));
3129     /* Connection failure or something. Not a protocol violation. */
3130     return;
3131   }
3132
3133   /* Check if this is a STUN packet */
3134   if (GNUNET_NO !=
3135       GNUNET_NAT_stun_handle_packet (plugin->nat,
3136                                      (const struct sockaddr *) &addr,
3137                                      fromlen,
3138                                      buf,
3139                                      size))
3140     return; /* was STUN, do not process further */
3141
3142   if (size < sizeof(struct GNUNET_MessageHeader))
3143   {
3144     LOG (GNUNET_ERROR_TYPE_WARNING,
3145          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
3146          (unsigned int ) size,
3147          GNUNET_a2s (sa,
3148                      fromlen));
3149     /* _MAY_ be a connection failure (got partial message) */
3150     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
3151     GNUNET_break_op (0);
3152     return;
3153   }
3154
3155   msg = (const struct GNUNET_MessageHeader *) buf;
3156   LOG (GNUNET_ERROR_TYPE_DEBUG,
3157        "UDP received %u-byte message from `%s' type %u\n",
3158        (unsigned int) size,
3159        GNUNET_a2s (sa,
3160                    fromlen),
3161        ntohs (msg->type));
3162   if (size != ntohs (msg->size))
3163   {
3164     LOG (GNUNET_ERROR_TYPE_WARNING,
3165          "UDP malformed message (size %u) header from %s\n",
3166          (unsigned int) size,
3167          GNUNET_a2s (sa,
3168                      fromlen));
3169     GNUNET_break_op (0);
3170     return;
3171   }
3172   GNUNET_STATISTICS_update (plugin->env->stats,
3173                             "# UDP, total bytes received",
3174                             size,
3175                             GNUNET_NO);
3176   network_type = plugin->env->get_address_type (plugin->env->cls,
3177                                                 sa,
3178                                                 fromlen);
3179   switch (sa->sa_family)
3180   {
3181   case AF_INET:
3182     sa4 = (const struct sockaddr_in *) &addr;
3183     v4.options = 0;
3184     v4.ipv4_addr = sa4->sin_addr.s_addr;
3185     v4.u4_port = sa4->sin_port;
3186     int_addr = (union UdpAddress *) &v4;
3187     int_addr_len = sizeof (v4);
3188     break;
3189   case AF_INET6:
3190     sa6 = (const struct sockaddr_in6 *) &addr;
3191     v6.options = 0;
3192     v6.ipv6_addr = sa6->sin6_addr;
3193     v6.u6_port = sa6->sin6_port;
3194     int_addr = (union UdpAddress *) &v6;
3195     int_addr_len = sizeof (v6);
3196     break;
3197   default:
3198     GNUNET_break (0);
3199     return;
3200   }
3201
3202   switch (ntohs (msg->type))
3203   {
3204   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
3205     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
3206       udp_broadcast_receive (plugin,
3207                              buf,
3208                              size,
3209                              int_addr,
3210                              int_addr_len,
3211                              network_type);
3212     return;
3213   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
3214     if (ntohs (msg->size) < sizeof(struct UDPMessage))
3215     {
3216       GNUNET_break_op(0);
3217       return;
3218     }
3219     process_udp_message (plugin,
3220                          (const struct UDPMessage *) msg,
3221                          int_addr,
3222                          int_addr_len,
3223                          network_type);
3224     return;
3225   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
3226     read_process_ack (plugin,
3227                       msg,
3228                       int_addr,
3229                       int_addr_len);
3230     return;
3231   case GNUNET_MESSAGE_TYPE_FRAGMENT:
3232     read_process_fragment (plugin,
3233                            msg,
3234                            int_addr,
3235                            int_addr_len,
3236                            network_type);
3237     return;
3238   default:
3239     GNUNET_break_op(0);
3240     return;
3241   }
3242 }
3243
3244
3245 /**
3246  * Removes messages from the transmission queue that have
3247  * timed out, and then selects a message that should be
3248  * transmitted next.
3249  *
3250  * @param plugin the UDP plugin
3251  * @param sock which socket should we process the queue for (v4 or v6)
3252  * @return message selected for transmission, or NULL for none
3253  */
3254 static struct UDP_MessageWrapper *
3255 remove_timeout_messages_and_select (struct Plugin *plugin,
3256                                     struct GNUNET_NETWORK_Handle *sock)
3257 {
3258   struct UDP_MessageWrapper *udpw;
3259   struct GNUNET_TIME_Relative remaining;
3260   struct GNUNET_ATS_Session *session;
3261   int removed;
3262
3263   removed = GNUNET_NO;
3264   udpw = (sock == plugin->sockv4)
3265     ? plugin->ipv4_queue_head
3266     : plugin->ipv6_queue_head;
3267   while (NULL != udpw)
3268   {
3269     session = udpw->session;
3270     /* Find messages with timeout */
3271     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
3272     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
3273     {
3274       /* Message timed out */
3275       removed = GNUNET_YES;
3276       dequeue (plugin,
3277                udpw);
3278       udpw->qc (udpw->qc_cls,
3279                 udpw,
3280                 GNUNET_SYSERR);
3281       GNUNET_free (udpw);
3282
3283       if (sock == plugin->sockv4)
3284       {
3285         udpw = plugin->ipv4_queue_head;
3286       }
3287       else if (sock == plugin->sockv6)
3288       {
3289         udpw = plugin->ipv6_queue_head;
3290       }
3291       else
3292       {
3293         GNUNET_break (0); /* should never happen */
3294         udpw = NULL;
3295       }
3296       GNUNET_STATISTICS_update (plugin->env->stats,
3297                                 "# messages discarded due to timeout",
3298                                 1,
3299                                 GNUNET_NO);
3300     }
3301     else
3302     {
3303       /* Message did not time out, check transmission time */
3304       remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3305       if (0 == remaining.rel_value_us)
3306       {
3307         /* this message is not delayed */
3308         LOG (GNUNET_ERROR_TYPE_DEBUG,
3309              "Message for peer `%s' (%u bytes) is not delayed \n",
3310              GNUNET_i2s (&udpw->session->target),
3311              udpw->payload_size);
3312         break; /* Found message to send, break */
3313       }
3314       else
3315       {
3316         /* Message is delayed, try next */
3317         LOG (GNUNET_ERROR_TYPE_DEBUG,
3318              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3319              GNUNET_i2s (&udpw->session->target),
3320              udpw->payload_size,
3321              GNUNET_STRINGS_relative_time_to_string (remaining,
3322                                                      GNUNET_YES));
3323         udpw = udpw->next;
3324       }
3325     }
3326   }
3327   if (GNUNET_YES == removed)
3328     notify_session_monitor (session->plugin,
3329                             session,
3330                             GNUNET_TRANSPORT_SS_UPDATE);
3331   return udpw;
3332 }
3333
3334
3335 /**
3336  * We failed to transmit a message via UDP. Generate
3337  * a descriptive error message.
3338  *
3339  * @param plugin our plugin
3340  * @param sa target address we were trying to reach
3341  * @param slen number of bytes in @a sa
3342  * @param error the errno value returned from the sendto() call
3343  */
3344 static void
3345 analyze_send_error (struct Plugin *plugin,
3346                     const struct sockaddr *sa,
3347                     socklen_t slen,
3348                     int error)
3349 {
3350   enum GNUNET_NetworkType type;
3351
3352   type = plugin->env->get_address_type (plugin->env->cls,
3353                                         sa,
3354                                         slen);
3355   if ( ( (GNUNET_NT_LAN == type) ||
3356          (GNUNET_NT_WAN == type) ) &&
3357        ( (ENETUNREACH == errno) ||
3358          (ENETDOWN == errno) ) )
3359   {
3360     if (slen == sizeof (struct sockaddr_in))
3361     {
3362       /* IPv4: "Network unreachable" or "Network down"
3363        *
3364        * This indicates we do not have connectivity
3365        */
3366       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3367            _("UDP could not transmit message to `%s': "
3368              "Network seems down, please check your network configuration\n"),
3369            GNUNET_a2s (sa,
3370                        slen));
3371     }
3372     if (slen == sizeof (struct sockaddr_in6))
3373     {
3374       /* IPv6: "Network unreachable" or "Network down"
3375        *
3376        * This indicates that this system is IPv6 enabled, but does not
3377        * have a valid global IPv6 address assigned or we do not have
3378        * connectivity
3379        */
3380       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3381            _("UDP could not transmit IPv6 message! "
3382              "Please check your network configuration and disable IPv6 if your "
3383              "connection does not have a global IPv6 address\n"));
3384     }
3385   }
3386   else
3387   {
3388     LOG (GNUNET_ERROR_TYPE_WARNING,
3389          "UDP could not transmit message to `%s': `%s'\n",
3390          GNUNET_a2s (sa,
3391                      slen),
3392          STRERROR (error));
3393   }
3394 }
3395
3396
3397 /**
3398  * It is time to try to transmit a UDP message.  Select one
3399  * and send.
3400  *
3401  * @param plugin the plugin
3402  * @param sock which socket (v4/v6) to send on
3403  */
3404 static void
3405 udp_select_send (struct Plugin *plugin,
3406                  struct GNUNET_NETWORK_Handle *sock)
3407 {
3408   ssize_t sent;
3409   socklen_t slen;
3410   const struct sockaddr *a;
3411   const struct IPv4UdpAddress *u4;
3412   struct sockaddr_in a4;
3413   const struct IPv6UdpAddress *u6;
3414   struct sockaddr_in6 a6;
3415   struct UDP_MessageWrapper *udpw;
3416
3417   /* Find message(s) to send */
3418   while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
3419                                                              sock)))
3420   {
3421     if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3422     {
3423       u4 = udpw->session->address->address;
3424       memset (&a4,
3425               0,
3426               sizeof(a4));
3427       a4.sin_family = AF_INET;
3428 #if HAVE_SOCKADDR_IN_SIN_LEN
3429       a4.sin_len = sizeof (a4);
3430 #endif
3431       a4.sin_port = u4->u4_port;
3432       a4.sin_addr.s_addr = u4->ipv4_addr;
3433       a = (const struct sockaddr *) &a4;
3434       slen = sizeof (a4);
3435     }
3436     else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3437     {
3438       u6 = udpw->session->address->address;
3439       memset (&a6,
3440               0,
3441               sizeof(a6));
3442       a6.sin6_family = AF_INET6;
3443 #if HAVE_SOCKADDR_IN_SIN_LEN
3444       a6.sin6_len = sizeof (a6);
3445 #endif
3446       a6.sin6_port = u6->u6_port;
3447       a6.sin6_addr = u6->ipv6_addr;
3448       a = (const struct sockaddr *) &a6;
3449       slen = sizeof (a6);
3450     }
3451     else
3452     {
3453       GNUNET_break (0);
3454       dequeue (plugin,
3455                udpw);
3456       udpw->qc (udpw->qc_cls,
3457                 udpw,
3458                 GNUNET_SYSERR);
3459       notify_session_monitor (plugin,
3460                               udpw->session,
3461                               GNUNET_TRANSPORT_SS_UPDATE);
3462       GNUNET_free (udpw);
3463       continue;
3464     }
3465     sent = GNUNET_NETWORK_socket_sendto (sock,
3466                                          udpw->msg_buf,
3467                                          udpw->msg_size,
3468                                          a,
3469                                          slen);
3470     udpw->session->last_transmit_time
3471       = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3472                                   udpw->session->last_transmit_time);
3473     dequeue (plugin,
3474              udpw);
3475     if (GNUNET_SYSERR == sent)
3476     {
3477       /* Failure */
3478       analyze_send_error (plugin,
3479                           a,
3480                           slen,
3481                           errno);
3482       udpw->qc (udpw->qc_cls,
3483                 udpw,
3484                 GNUNET_SYSERR);
3485       GNUNET_STATISTICS_update (plugin->env->stats,
3486                                 "# UDP, total, bytes, sent, failure",
3487                                 sent,
3488                                 GNUNET_NO);
3489       GNUNET_STATISTICS_update (plugin->env->stats,
3490                                 "# UDP, total, messages, sent, failure",
3491                                 1,
3492                                 GNUNET_NO);
3493     }
3494     else
3495     {
3496       /* Success */
3497       LOG (GNUNET_ERROR_TYPE_DEBUG,
3498            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3499            (unsigned int) (udpw->msg_size),
3500            GNUNET_i2s (&udpw->session->target),
3501            GNUNET_a2s (a,
3502                        slen),
3503            (int ) sent,
3504            (sent < 0) ? STRERROR (errno) : "ok");
3505       GNUNET_STATISTICS_update (plugin->env->stats,
3506                                 "# UDP, total, bytes, sent, success",
3507                                 sent,
3508                                 GNUNET_NO);
3509       GNUNET_STATISTICS_update (plugin->env->stats,
3510                                 "# UDP, total, messages, sent, success",
3511                                 1,
3512                                 GNUNET_NO);
3513       if (NULL != udpw->frag_ctx)
3514         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3515       udpw->qc (udpw->qc_cls,
3516                 udpw,
3517                 GNUNET_OK);
3518     }
3519     notify_session_monitor (plugin,
3520                             udpw->session,
3521                             GNUNET_TRANSPORT_SS_UPDATE);
3522     GNUNET_free (udpw);
3523   }
3524 }
3525
3526
3527 /* ***************** Event loop (part 2) *************** */
3528
3529
3530 /**
3531  * We have been notified that our readset has something to read.  We don't
3532  * know which socket needs to be read, so we have to check each one
3533  * Then reschedule this function to be called again once more is available.
3534  *
3535  * @param cls the plugin handle
3536  */
3537 static void
3538 udp_plugin_select_v4 (void *cls)
3539 {
3540   struct Plugin *plugin = cls;
3541   const struct GNUNET_SCHEDULER_TaskContext *tc;
3542
3543   plugin->select_task_v4 = NULL;
3544   if (NULL == plugin->sockv4)
3545     return;
3546   tc = GNUNET_SCHEDULER_get_task_context ();
3547   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3548       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3549                                    plugin->sockv4)))
3550     udp_select_read (plugin,
3551                      plugin->sockv4);
3552   udp_select_send (plugin,
3553                    plugin->sockv4);
3554   schedule_select_v4 (plugin);
3555 }
3556
3557
3558 /**
3559  * We have been notified that our readset has something to read.  We don't
3560  * know which socket needs to be read, so we have to check each one
3561  * Then reschedule this function to be called again once more is available.
3562  *
3563  * @param cls the plugin handle
3564  */
3565 static void
3566 udp_plugin_select_v6 (void *cls)
3567 {
3568   struct Plugin *plugin = cls;
3569   const struct GNUNET_SCHEDULER_TaskContext *tc;
3570
3571   plugin->select_task_v6 = NULL;
3572   if (NULL == plugin->sockv6)
3573     return;
3574   tc = GNUNET_SCHEDULER_get_task_context ();
3575   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3576        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3577                                     plugin->sockv6)) )
3578     udp_select_read (plugin,
3579                      plugin->sockv6);
3580
3581   udp_select_send (plugin,
3582                    plugin->sockv6);
3583   schedule_select_v6 (plugin);
3584 }
3585
3586
3587 /* ******************* Initialization *************** */
3588
3589
3590 /**
3591  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3592  *
3593  * @param plugin the plugin to initialize
3594  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3595  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3596  * @return number of sockets that were successfully bound
3597  */
3598 static unsigned int
3599 setup_sockets (struct Plugin *plugin,
3600                const struct sockaddr_in6 *bind_v6,
3601                const struct sockaddr_in *bind_v4)
3602 {
3603   int tries;
3604   unsigned int sockets_created = 0;
3605   struct sockaddr_in6 server_addrv6;
3606   struct sockaddr_in server_addrv4;
3607   const struct sockaddr *server_addr;
3608   const struct sockaddr *addrs[2];
3609   socklen_t addrlens[2];
3610   socklen_t addrlen;
3611   int eno;
3612
3613   /* Create IPv6 socket */
3614   eno = EINVAL;
3615   if (GNUNET_YES == plugin->enable_ipv6)
3616   {
3617     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3618                                                    SOCK_DGRAM,
3619                                                    0);
3620     if (NULL == plugin->sockv6)
3621     {
3622       LOG (GNUNET_ERROR_TYPE_INFO,
3623            _("Disabling IPv6 since it is not supported on this system!\n"));
3624       plugin->enable_ipv6 = GNUNET_NO;
3625     }
3626     else
3627     {
3628       memset (&server_addrv6,
3629               0,
3630               sizeof(struct sockaddr_in6));
3631 #if HAVE_SOCKADDR_IN_SIN_LEN
3632       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3633 #endif
3634       server_addrv6.sin6_family = AF_INET6;
3635       if (NULL != bind_v6)
3636         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3637       else
3638         server_addrv6.sin6_addr = in6addr_any;
3639
3640       if (0 == plugin->port) /* autodetect */
3641         server_addrv6.sin6_port
3642           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3643                                              33537)
3644                    + 32000);
3645       else
3646         server_addrv6.sin6_port = htons (plugin->port);
3647       addrlen = sizeof (struct sockaddr_in6);
3648       server_addr = (const struct sockaddr *) &server_addrv6;
3649
3650       tries = 0;
3651       while (tries < 10)
3652       {
3653         LOG(GNUNET_ERROR_TYPE_DEBUG,
3654             "Binding to IPv6 `%s'\n",
3655             GNUNET_a2s (server_addr,
3656                         addrlen));
3657         /* binding */
3658         if (GNUNET_OK ==
3659             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3660                                         server_addr,
3661                                         addrlen))
3662           break;
3663         eno = errno;
3664         if (0 != plugin->port)
3665         {
3666           tries = 10; /* fail immediately */
3667           break; /* bind failed on specific port */
3668         }
3669         /* autodetect */
3670         server_addrv6.sin6_port
3671           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3672                                              33537)
3673                    + 32000);
3674         tries++;
3675       }
3676       if (tries >= 10)
3677       {
3678         GNUNET_NETWORK_socket_close (plugin->sockv6);
3679         plugin->enable_ipv6 = GNUNET_NO;
3680         plugin->sockv6 = NULL;
3681       }
3682       else
3683       {
3684         plugin->port = ntohs (server_addrv6.sin6_port);
3685       }
3686       if (NULL != plugin->sockv6)
3687       {
3688         LOG (GNUNET_ERROR_TYPE_DEBUG,
3689              "IPv6 UDP socket created listinging at %s\n",
3690              GNUNET_a2s (server_addr,
3691                          addrlen));
3692         addrs[sockets_created] = server_addr;
3693         addrlens[sockets_created] = addrlen;
3694         sockets_created++;
3695       }
3696       else
3697       {
3698         LOG (GNUNET_ERROR_TYPE_WARNING,
3699              _("Failed to bind UDP socket to %s: %s\n"),
3700              GNUNET_a2s (server_addr,
3701                          addrlen),
3702              STRERROR (eno));
3703       }
3704     }
3705   }
3706
3707   /* Create IPv4 socket */
3708   eno = EINVAL;
3709   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3710                                                  SOCK_DGRAM,
3711                                                  0);
3712   if (NULL == plugin->sockv4)
3713   {
3714     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3715                          "socket");
3716     LOG (GNUNET_ERROR_TYPE_INFO,
3717          _("Disabling IPv4 since it is not supported on this system!\n"));
3718     plugin->enable_ipv4 = GNUNET_NO;
3719   }
3720   else
3721   {
3722     memset (&server_addrv4,
3723             0,
3724             sizeof(struct sockaddr_in));
3725 #if HAVE_SOCKADDR_IN_SIN_LEN
3726     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3727 #endif
3728     server_addrv4.sin_family = AF_INET;
3729     if (NULL != bind_v4)
3730       server_addrv4.sin_addr = bind_v4->sin_addr;
3731     else
3732       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3733
3734     if (0 == plugin->port)
3735       /* autodetect */
3736       server_addrv4.sin_port
3737         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3738                                            33537)
3739                  + 32000);
3740     else
3741       server_addrv4.sin_port = htons (plugin->port);
3742
3743     addrlen = sizeof (struct sockaddr_in);
3744     server_addr = (const struct sockaddr *) &server_addrv4;
3745
3746     tries = 0;
3747     while (tries < 10)
3748     {
3749       LOG (GNUNET_ERROR_TYPE_DEBUG,
3750            "Binding to IPv4 `%s'\n",
3751            GNUNET_a2s (server_addr,
3752                        addrlen));
3753
3754       /* binding */
3755       if (GNUNET_OK ==
3756           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3757                                       server_addr,
3758                                       addrlen))
3759         break;
3760       eno = errno;
3761       if (0 != plugin->port)
3762       {
3763         tries = 10; /* fail */
3764         break; /* bind failed on specific port */
3765       }
3766
3767       /* autodetect */
3768       server_addrv4.sin_port
3769         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3770                                            33537)
3771                  + 32000);
3772       tries++;
3773     }
3774     if (tries >= 10)
3775     {
3776       GNUNET_NETWORK_socket_close (plugin->sockv4);
3777       plugin->enable_ipv4 = GNUNET_NO;
3778       plugin->sockv4 = NULL;
3779     }
3780     else
3781     {
3782       plugin->port = ntohs (server_addrv4.sin_port);
3783     }
3784
3785     if (NULL != plugin->sockv4)
3786     {
3787       LOG (GNUNET_ERROR_TYPE_DEBUG,
3788            "IPv4 socket created on port %s\n",
3789            GNUNET_a2s (server_addr,
3790                        addrlen));
3791       addrs[sockets_created] = server_addr;
3792       addrlens[sockets_created] = addrlen;
3793       sockets_created++;
3794     }
3795     else
3796     {
3797       LOG (GNUNET_ERROR_TYPE_ERROR,
3798            _("Failed to bind UDP socket to %s: %s\n"),
3799            GNUNET_a2s (server_addr,
3800                        addrlen),
3801            STRERROR (eno));
3802     }
3803   }
3804
3805   if (0 == sockets_created)
3806   {
3807     LOG (GNUNET_ERROR_TYPE_WARNING,
3808          _("Failed to open UDP sockets\n"));
3809     return 0; /* No sockets created, return */
3810   }
3811   schedule_select_v4 (plugin);
3812   schedule_select_v6 (plugin);
3813   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3814                                      "transport-udp",
3815                                      IPPROTO_UDP,
3816                                      sockets_created,
3817                                      addrs,
3818                                      addrlens,
3819                                      &udp_nat_port_map_callback,
3820                                      NULL,
3821                                      plugin);
3822   return sockets_created;
3823 }
3824
3825
3826 /**
3827  * The exported method. Makes the core api available via a global and
3828  * returns the udp transport API.
3829  *
3830  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3831  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3832  */
3833 void *
3834 libgnunet_plugin_transport_udp_init (void *cls)
3835 {
3836   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3837   struct GNUNET_TRANSPORT_PluginFunctions *api;
3838   struct Plugin *p;
3839   unsigned long long port;
3840   unsigned long long aport;
3841   unsigned long long udp_max_bps;
3842   int enable_v6;
3843   int enable_broadcasting;
3844   int enable_broadcasting_recv;
3845   char *bind4_address;
3846   char *bind6_address;
3847   struct GNUNET_TIME_Relative interval;
3848   struct sockaddr_in server_addrv4;
3849   struct sockaddr_in6 server_addrv6;
3850   unsigned int res;
3851   int have_bind4;
3852   int have_bind6;
3853
3854   if (NULL == env->receive)
3855   {
3856     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3857      initialze the plugin or the API */
3858     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3859     api->cls = NULL;
3860     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3861     api->address_to_string = &udp_address_to_string;
3862     api->string_to_address = &udp_string_to_address;
3863     return api;
3864   }
3865
3866   /* Get port number: port == 0 : autodetect a port,
3867    * > 0 : use this port, not given : 2086 default */
3868   if (GNUNET_OK !=
3869       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3870                                              "transport-udp",
3871                                              "PORT",
3872                                              &port))
3873     port = 2086;
3874   if (port > 65535)
3875   {
3876     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3877                                "transport-udp",
3878                                "PORT",
3879                                _("must be in [0,65535]"));
3880     return NULL;
3881   }
3882   if (GNUNET_OK !=
3883       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3884                                              "transport-udp",
3885                                              "ADVERTISED_PORT",
3886                                              &aport))
3887     aport = port;
3888   if (aport > 65535)
3889   {
3890     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3891                                "transport-udp",
3892                                "ADVERTISED_PORT",
3893                                _("must be in [0,65535]"));
3894     return NULL;
3895   }
3896
3897   if (GNUNET_YES ==
3898       GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3899                                             "nat",
3900                                             "DISABLEV6"))
3901     enable_v6 = GNUNET_NO;
3902   else
3903     enable_v6 = GNUNET_YES;
3904
3905   have_bind4 = GNUNET_NO;
3906   memset (&server_addrv4,
3907           0,
3908           sizeof (server_addrv4));
3909   if (GNUNET_YES ==
3910       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3911                                              "transport-udp",
3912                                              "BINDTO",
3913                                              &bind4_address))
3914   {
3915     LOG (GNUNET_ERROR_TYPE_DEBUG,
3916          "Binding UDP plugin to specific address: `%s'\n",
3917          bind4_address);
3918     if (1 != inet_pton (AF_INET,
3919                         bind4_address,
3920                         &server_addrv4.sin_addr))
3921     {
3922       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3923                                  "transport-udp",
3924                                  "BINDTO",
3925                                  _("must be valid IPv4 address"));
3926       GNUNET_free (bind4_address);
3927       return NULL;
3928     }
3929     have_bind4 = GNUNET_YES;
3930   }
3931   GNUNET_free_non_null (bind4_address);
3932   have_bind6 = GNUNET_NO;
3933   memset (&server_addrv6,
3934           0,
3935           sizeof (server_addrv6));
3936   if (GNUNET_YES ==
3937       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3938                                              "transport-udp",
3939                                              "BINDTO6",
3940                                              &bind6_address))
3941   {
3942     LOG (GNUNET_ERROR_TYPE_DEBUG,
3943          "Binding udp plugin to specific address: `%s'\n",
3944          bind6_address);
3945     if (1 != inet_pton (AF_INET6,
3946                         bind6_address,
3947                         &server_addrv6.sin6_addr))
3948     {
3949       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3950                                  "transport-udp",
3951                                  "BINDTO6",
3952                                  _("must be valid IPv6 address"));
3953       GNUNET_free (bind6_address);
3954       return NULL;
3955     }
3956     have_bind6 = GNUNET_YES;
3957   }
3958   GNUNET_free_non_null (bind6_address);
3959
3960   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3961                                                               "transport-udp",
3962                                                               "BROADCAST");
3963   if (enable_broadcasting == GNUNET_SYSERR)
3964     enable_broadcasting = GNUNET_NO;
3965
3966   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3967                                                                    "transport-udp",
3968                                                                    "BROADCAST_RECEIVE");
3969   if (enable_broadcasting_recv == GNUNET_SYSERR)
3970     enable_broadcasting_recv = GNUNET_YES;
3971
3972   if (GNUNET_SYSERR ==
3973       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3974                                            "transport-udp",
3975                                            "BROADCAST_INTERVAL",
3976                                            &interval))
3977   {
3978     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3979                                               10);
3980   }
3981   if (GNUNET_OK !=
3982       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3983                                              "transport-udp",
3984                                              "MAX_BPS",
3985                                              &udp_max_bps))
3986   {
3987     /* 50 MB/s == infinity for practical purposes */
3988     udp_max_bps = 1024 * 1024 * 50;
3989   }
3990
3991   p = GNUNET_new (struct Plugin);
3992   p->port = port;
3993   p->aport = aport;
3994   p->broadcast_interval = interval;
3995   p->enable_ipv6 = enable_v6;
3996   p->enable_ipv4 = GNUNET_YES; /* default */
3997   p->enable_broadcasting = enable_broadcasting;
3998   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3999   p->env = env;
4000   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
4001                                                       GNUNET_NO);
4002   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4003   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
4004                                  NULL,
4005                                  NULL,
4006                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
4007                                  30);
4008   res = setup_sockets (p,
4009                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
4010                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
4011   if ( (0 == res) ||
4012        ( (NULL == p->sockv4) &&
4013          (NULL == p->sockv6) ) )
4014   {
4015     LOG (GNUNET_ERROR_TYPE_ERROR,
4016         _("Failed to create UDP network sockets\n"));
4017     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
4018     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
4019     if (NULL != p->nat)
4020       GNUNET_NAT_unregister (p->nat);
4021     GNUNET_free (p);
4022     return NULL;
4023   }
4024
4025   /* Setup broadcasting and receiving beacons */
4026   setup_broadcast (p,
4027                    &server_addrv6,
4028                    &server_addrv4);
4029
4030   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
4031   api->cls = p;
4032   api->disconnect_session = &udp_disconnect_session;
4033   api->query_keepalive_factor = &udp_query_keepalive_factor;
4034   api->disconnect_peer = &udp_disconnect;
4035   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
4036   api->address_to_string = &udp_address_to_string;
4037   api->string_to_address = &udp_string_to_address;
4038   api->check_address = &udp_plugin_check_address;
4039   api->get_session = &udp_plugin_get_session;
4040   api->send = &udp_plugin_send;
4041   api->get_network = &udp_plugin_get_network;
4042   api->get_network_for_address = &udp_plugin_get_network_for_address;
4043   api->update_session_timeout = &udp_plugin_update_session_timeout;
4044   api->setup_monitor = &udp_plugin_setup_monitor;
4045   return api;
4046 }
4047
4048
4049 /**
4050  * Function called on each entry in the defragmentation heap to
4051  * clean it up.
4052  *
4053  * @param cls NULL
4054  * @param node node in the heap (to be removed)
4055  * @param element a `struct DefragContext` to be cleaned up
4056  * @param cost unused
4057  * @return #GNUNET_YES
4058  */
4059 static int
4060 heap_cleanup_iterator (void *cls,
4061                        struct GNUNET_CONTAINER_HeapNode *node,
4062                        void *element,
4063                        GNUNET_CONTAINER_HeapCostType cost)
4064 {
4065   struct DefragContext *d_ctx = element;
4066
4067   GNUNET_CONTAINER_heap_remove_node (node);
4068   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
4069   GNUNET_free (d_ctx);
4070   return GNUNET_YES;
4071 }
4072
4073
4074 /**
4075  * The exported method. Makes the core api available via a global and
4076  * returns the udp transport API.
4077  *
4078  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
4079  * @return NULL
4080  */
4081 void *
4082 libgnunet_plugin_transport_udp_done (void *cls)
4083 {
4084   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
4085   struct Plugin *plugin = api->cls;
4086   struct PrettyPrinterContext *cur;
4087   struct UDP_MessageWrapper *udpw;
4088
4089   if (NULL == plugin)
4090   {
4091     GNUNET_free (api);
4092     return NULL;
4093   }
4094   stop_broadcast (plugin);
4095   if (NULL != plugin->select_task_v4)
4096   {
4097     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
4098     plugin->select_task_v4 = NULL;
4099   }
4100   if (NULL != plugin->select_task_v6)
4101   {
4102     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
4103     plugin->select_task_v6 = NULL;
4104   }
4105   if (NULL != plugin->sockv4)
4106   {
4107     GNUNET_break (GNUNET_OK ==
4108                   GNUNET_NETWORK_socket_close (plugin->sockv4));
4109     plugin->sockv4 = NULL;
4110   }
4111   if (NULL != plugin->sockv6)
4112   {
4113     GNUNET_break (GNUNET_OK ==
4114                   GNUNET_NETWORK_socket_close (plugin->sockv6));
4115     plugin->sockv6 = NULL;
4116   }
4117   if (NULL != plugin->nat)
4118   {
4119     GNUNET_NAT_unregister (plugin->nat);
4120     plugin->nat = NULL;
4121   }
4122   if (NULL != plugin->defrag_ctxs)
4123   {
4124     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
4125                                    &heap_cleanup_iterator,
4126                                    NULL);
4127     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
4128     plugin->defrag_ctxs = NULL;
4129   }
4130   while (NULL != (udpw = plugin->ipv4_queue_head))
4131   {
4132     dequeue (plugin,
4133              udpw);
4134     udpw->qc (udpw->qc_cls,
4135               udpw,
4136               GNUNET_SYSERR);
4137     GNUNET_free (udpw);
4138   }
4139   while (NULL != (udpw = plugin->ipv6_queue_head))
4140   {
4141     dequeue (plugin,
4142              udpw);
4143     udpw->qc (udpw->qc_cls,
4144               udpw,
4145               GNUNET_SYSERR);
4146     GNUNET_free (udpw);
4147   }
4148   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
4149                                          &disconnect_and_free_it,
4150                                          plugin);
4151   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
4152
4153   while (NULL != (cur = plugin->ppc_dll_head))
4154   {
4155     GNUNET_break (0);
4156     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
4157                                  plugin->ppc_dll_tail,
4158                                  cur);
4159     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
4160     if (NULL != cur->timeout_task)
4161     {
4162       GNUNET_SCHEDULER_cancel (cur->timeout_task);
4163       cur->timeout_task = NULL;
4164     }
4165     GNUNET_free (cur);
4166   }
4167   GNUNET_free (plugin);
4168   GNUNET_free (api);
4169   return NULL;
4170 }
4171
4172 /* end of plugin_transport_udp.c */