-use UINT32_MAX to mean disconnect, for real
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
49
50 /**
51  * Number of messages we can defragment in parallel.  We only really
52  * defragment 1 message at a time, but if messages get re-ordered, we
53  * may want to keep knowledge about the previous message to avoid
54  * discarding the current message in favor of a single fragment of a
55  * previous message.  3 should be good since we don't expect massive
56  * message reorderings with UDP.
57  */
58 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
59
60 /**
61  * We keep a defragmentation queue per sender address.  How many
62  * sender addresses do we support at the same time? Memory consumption
63  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
64  * value. (So 128 corresponds to 12 MB and should suffice for
65  * connecting to roughly 128 peers via UDP).
66  */
67 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
68
69
70 /**
71  * UDP Message-Packet header (after defragmentation).
72  */
73 struct UDPMessage
74 {
75   /**
76    * Message header.
77    */
78   struct GNUNET_MessageHeader header;
79
80   /**
81    * Always zero for now.
82    */
83   uint32_t reserved;
84
85   /**
86    * What is the identity of the sender
87    */
88   struct GNUNET_PeerIdentity sender;
89
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147
148 };
149
150
151 /**
152  * Session with another peer.
153  */
154 struct GNUNET_ATS_Session
155 {
156   /**
157    * Which peer is this session for?
158    */
159   struct GNUNET_PeerIdentity target;
160
161   /**
162    * Plugin this session belongs to.
163    */
164   struct Plugin *plugin;
165
166   /**
167    * Context for dealing with fragments.
168    */
169   struct UDP_FragmentationContext *frag_ctx;
170
171   /**
172    * Desired delay for next sending we send to other peer
173    */
174   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
175
176   /**
177    * Desired delay for transmissions we received from other peer.
178    * Adjusted to be per fragment (UDP_MTU), even though on the
179    * wire it was for "full messages".
180    */
181   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
182
183   /**
184    * Session timeout task
185    */
186   struct GNUNET_SCHEDULER_Task *timeout_task;
187
188   /**
189    * When does this session time out?
190    */
191   struct GNUNET_TIME_Absolute timeout;
192
193   /**
194    * What time did we last transmit?
195    */
196   struct GNUNET_TIME_Absolute last_transmit_time;
197
198   /**
199    * expected delay for ACKs
200    */
201   struct GNUNET_TIME_Relative last_expected_ack_delay;
202
203   /**
204    * desired delay between UDP messages
205    */
206   struct GNUNET_TIME_Relative last_expected_msg_delay;
207
208   /**
209    * Our own address.
210    */
211   struct GNUNET_HELLO_Address *address;
212
213   /**
214    * Number of bytes waiting for transmission to this peer.
215    */
216   unsigned long long bytes_in_queue;
217
218   /**
219    * Number of messages waiting for transmission to this peer.
220    */
221   unsigned int msgs_in_queue;
222
223   /**
224    * Reference counter to indicate that this session is
225    * currently being used and must not be destroyed;
226    * setting @e in_destroy will destroy it as soon as
227    * possible.
228    */
229   unsigned int rc;
230
231   /**
232    * Network type of the address.
233    */
234   enum GNUNET_ATS_Network_Type scope;
235
236   /**
237    * Is this session about to be destroyed (sometimes we cannot
238    * destroy a session immediately as below us on the stack
239    * there might be code that still uses it; in this case,
240    * @e rc is non-zero).
241    */
242   int in_destroy;
243 };
244
245
246
247 /**
248  * Data structure to track defragmentation contexts based
249  * on the source of the UDP traffic.
250  */
251 struct DefragContext
252 {
253
254   /**
255    * Defragmentation context.
256    */
257   struct GNUNET_DEFRAGMENT_Context *defrag;
258
259   /**
260    * Reference to master plugin struct.
261    */
262   struct Plugin *plugin;
263
264   /**
265    * Node in the defrag heap.
266    */
267   struct GNUNET_CONTAINER_HeapNode *hnode;
268
269   /**
270    * Source address this receive context is for (allocated at the
271    * end of the struct).
272    */
273   const union UdpAddress *udp_addr;
274
275   /**
276    * Who's message(s) are we defragmenting here?
277    * Only initialized once we succeeded and
278    * @e have_sender is set.
279    */
280   struct GNUNET_PeerIdentity sender;
281
282   /**
283    * Length of @e udp_addr.
284    */
285   size_t udp_addr_len;
286
287   /**
288    * Network type the address belongs to.
289    */
290   enum GNUNET_ATS_Network_Type network_type;
291
292   /**
293    * Has the @e sender field been initialized yet?
294    */
295   int have_sender;
296 };
297
298
299 /**
300  * Context to send fragmented messages
301  */
302 struct UDP_FragmentationContext
303 {
304   /**
305    * Next in linked list
306    */
307   struct UDP_FragmentationContext *next;
308
309   /**
310    * Previous in linked list
311    */
312   struct UDP_FragmentationContext *prev;
313
314   /**
315    * The plugin
316    */
317   struct Plugin *plugin;
318
319   /**
320    * Handle for fragmentation.
321    */
322   struct GNUNET_FRAGMENT_Context *frag;
323
324   /**
325    * The session this fragmentation context belongs to
326    */
327   struct GNUNET_ATS_Session *session;
328
329   /**
330    * Function to call upon completion of the transmission.
331    */
332   GNUNET_TRANSPORT_TransmitContinuation cont;
333
334   /**
335    * Closure for @e cont.
336    */
337   void *cont_cls;
338
339   /**
340    * Start time.
341    */
342   struct GNUNET_TIME_Absolute start_time;
343
344   /**
345    * Transmission time for the next fragment.  Incremented by
346    * the "flow_delay_from_other_peer" for each fragment when
347    * we setup the fragments.
348    */
349   struct GNUNET_TIME_Absolute next_frag_time;
350
351   /**
352    * Message timeout
353    */
354   struct GNUNET_TIME_Absolute timeout;
355
356   /**
357    * Payload size of original unfragmented message
358    */
359   size_t payload_size;
360
361   /**
362    * Bytes used to send all fragments on wire including UDP overhead
363    */
364   size_t on_wire_size;
365
366 };
367
368
369 /**
370  * Function called when a message is removed from the
371  * transmission queue.
372  *
373  * @param cls closure
374  * @param udpw message wrapper finished
375  * @param result #GNUNET_OK on success (message was sent)
376  *               #GNUNET_SYSERR if the target disconnected
377  *               or we had a timeout or other trouble sending
378  */
379 typedef void
380 (*QueueContinuation) (void *cls,
381                       struct UDP_MessageWrapper *udpw,
382                       int result);
383
384
385 /**
386  * Information we track for each message in the queue.
387  */
388 struct UDP_MessageWrapper
389 {
390   /**
391    * Session this message belongs to
392    */
393   struct GNUNET_ATS_Session *session;
394
395   /**
396    * DLL of messages, previous element
397    */
398   struct UDP_MessageWrapper *prev;
399
400   /**
401    * DLL of messages, next element
402    */
403   struct UDP_MessageWrapper *next;
404
405   /**
406    * Message with @e msg_size bytes including UDP-specific overhead.
407    */
408   char *msg_buf;
409
410   /**
411    * Function to call once the message wrapper is being removed
412    * from the queue (with success or failure).
413    */
414   QueueContinuation qc;
415
416   /**
417    * Closure for @e qc.
418    */
419   void *qc_cls;
420
421   /**
422    * External continuation to call upon completion of the
423    * transmission, NULL if this queue entry is not for a
424    * message from the application.
425    */
426   GNUNET_TRANSPORT_TransmitContinuation cont;
427
428   /**
429    * Closure for @e cont.
430    */
431   void *cont_cls;
432
433   /**
434    * Fragmentation context.
435    * frag_ctx == NULL if transport <= MTU
436    * frag_ctx != NULL if transport > MTU
437    */
438   struct UDP_FragmentationContext *frag_ctx;
439
440   /**
441    * Message enqueue time.
442    */
443   struct GNUNET_TIME_Absolute start_time;
444
445   /**
446    * Desired transmission time for this message, based on the
447    * flow limiting information we got from the other peer.
448    */
449   struct GNUNET_TIME_Absolute transmission_time;
450
451   /**
452    * Message timeout.
453    */
454   struct GNUNET_TIME_Absolute timeout;
455
456   /**
457    * Size of UDP message to send, including UDP-specific overhead.
458    */
459   size_t msg_size;
460
461   /**
462    * Payload size of original message.
463    */
464   size_t payload_size;
465
466 };
467
468
469 GNUNET_NETWORK_STRUCT_BEGIN
470
471 /**
472  * UDP ACK Message-Packet header.
473  */
474 struct UDP_ACK_Message
475 {
476   /**
477    * Message header.
478    */
479   struct GNUNET_MessageHeader header;
480
481   /**
482    * Desired delay for flow control, in us (in NBO).
483    * A value of UINT32_MAX indicates that the other
484    * peer wants us to disconnect.
485    */
486   uint32_t delay GNUNET_PACKED;
487
488   /**
489    * What is the identity of the sender
490    */
491   struct GNUNET_PeerIdentity sender;
492
493 };
494
495 GNUNET_NETWORK_STRUCT_END
496
497
498 /* ************************* Monitoring *********** */
499
500
501 /**
502  * If a session monitor is attached, notify it about the new
503  * session state.
504  *
505  * @param plugin our plugin
506  * @param session session that changed state
507  * @param state new state of the session
508  */
509 static void
510 notify_session_monitor (struct Plugin *plugin,
511                         struct GNUNET_ATS_Session *session,
512                         enum GNUNET_TRANSPORT_SessionState state)
513 {
514   struct GNUNET_TRANSPORT_SessionInfo info;
515
516   if (NULL == plugin->sic)
517     return;
518   if (GNUNET_YES == session->in_destroy)
519     return; /* already destroyed, just RC>0 left-over actions */
520   memset (&info,
521           0,
522           sizeof (info));
523   info.state = state;
524   info.is_inbound = GNUNET_SYSERR; /* hard to say */
525   info.num_msg_pending = session->msgs_in_queue;
526   info.num_bytes_pending = session->bytes_in_queue;
527   /* info.receive_delay remains zero as this is not supported by UDP
528      (cannot selectively not receive from 'some' peer while continuing
529      to receive from others) */
530   info.session_timeout = session->timeout;
531   info.address = session->address;
532   plugin->sic (plugin->sic_cls,
533                session,
534                &info);
535 }
536
537
538 /**
539  * Return information about the given session to the monitor callback.
540  *
541  * @param cls the `struct Plugin` with the monitor callback (`sic`)
542  * @param peer peer we send information about
543  * @param value our `struct GNUNET_ATS_Session` to send information about
544  * @return #GNUNET_OK (continue to iterate)
545  */
546 static int
547 send_session_info_iter (void *cls,
548                         const struct GNUNET_PeerIdentity *peer,
549                         void *value)
550 {
551   struct Plugin *plugin = cls;
552   struct GNUNET_ATS_Session *session = value;
553
554   notify_session_monitor (plugin,
555                           session,
556                           GNUNET_TRANSPORT_SS_INIT);
557   notify_session_monitor (plugin,
558                           session,
559                           GNUNET_TRANSPORT_SS_UP);
560   return GNUNET_OK;
561 }
562
563
564 /**
565  * Begin monitoring sessions of a plugin.  There can only
566  * be one active monitor per plugin (i.e. if there are
567  * multiple monitors, the transport service needs to
568  * multiplex the generated events over all of them).
569  *
570  * @param cls closure of the plugin
571  * @param sic callback to invoke, NULL to disable monitor;
572  *            plugin will being by iterating over all active
573  *            sessions immediately and then enter monitor mode
574  * @param sic_cls closure for @a sic
575  */
576 static void
577 udp_plugin_setup_monitor (void *cls,
578                           GNUNET_TRANSPORT_SessionInfoCallback sic,
579                           void *sic_cls)
580 {
581   struct Plugin *plugin = cls;
582
583   plugin->sic = sic;
584   plugin->sic_cls = sic_cls;
585   if (NULL != sic)
586   {
587     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
588                                            &send_session_info_iter,
589                                            plugin);
590     /* signal end of first iteration */
591     sic (sic_cls,
592          NULL,
593          NULL);
594   }
595 }
596
597
598 /* ****************** Little Helpers ****************** */
599
600
601 /**
602  * Function to free last resources associated with a session.
603  *
604  * @param s session to free
605  */
606 static void
607 free_session (struct GNUNET_ATS_Session *s)
608 {
609   if (NULL != s->address)
610   {
611     GNUNET_HELLO_address_free (s->address);
612     s->address = NULL;
613   }
614   if (NULL != s->frag_ctx)
615   {
616     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
617                                      NULL,
618                                      NULL);
619     GNUNET_free (s->frag_ctx);
620     s->frag_ctx = NULL;
621   }
622   GNUNET_free (s);
623 }
624
625
626 /**
627  * Function that is called to get the keepalive factor.
628  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
629  * calculate the interval between keepalive packets.
630  *
631  * @param cls closure with the `struct Plugin`
632  * @return keepalive factor
633  */
634 static unsigned int
635 udp_query_keepalive_factor (void *cls)
636 {
637   return 15;
638 }
639
640
641 /**
642  * Function obtain the network type for a session
643  *
644  * @param cls closure (`struct Plugin *`)
645  * @param session the session
646  * @return the network type
647  */
648 static enum GNUNET_ATS_Network_Type
649 udp_plugin_get_network (void *cls,
650                         struct GNUNET_ATS_Session *session)
651 {
652   return session->scope;
653 }
654
655
656 /**
657  * Function obtain the network type for an address.
658  *
659  * @param cls closure (`struct Plugin *`)
660  * @param address the address
661  * @return the network type
662  */
663 static enum GNUNET_ATS_Network_Type
664 udp_plugin_get_network_for_address (void *cls,
665                                     const struct GNUNET_HELLO_Address *address)
666 {
667   struct Plugin *plugin = cls;
668   size_t addrlen;
669   struct sockaddr_in a4;
670   struct sockaddr_in6 a6;
671   const struct IPv4UdpAddress *u4;
672   const struct IPv6UdpAddress *u6;
673   const void *sb;
674   size_t sbs;
675
676   addrlen = address->address_length;
677   if (addrlen == sizeof(struct IPv6UdpAddress))
678   {
679     GNUNET_assert (NULL != address->address); /* make static analysis happy */
680     u6 = address->address;
681     memset (&a6, 0, sizeof(a6));
682 #if HAVE_SOCKADDR_IN_SIN_LEN
683     a6.sin6_len = sizeof (a6);
684 #endif
685     a6.sin6_family = AF_INET6;
686     a6.sin6_port = u6->u6_port;
687     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
688     sb = &a6;
689     sbs = sizeof(a6);
690   }
691   else if (addrlen == sizeof(struct IPv4UdpAddress))
692   {
693     GNUNET_assert (NULL != address->address); /* make static analysis happy */
694     u4 = address->address;
695     memset (&a4, 0, sizeof(a4));
696 #if HAVE_SOCKADDR_IN_SIN_LEN
697     a4.sin_len = sizeof (a4);
698 #endif
699     a4.sin_family = AF_INET;
700     a4.sin_port = u4->u4_port;
701     a4.sin_addr.s_addr = u4->ipv4_addr;
702     sb = &a4;
703     sbs = sizeof(a4);
704   }
705   else
706   {
707     GNUNET_break (0);
708     return GNUNET_ATS_NET_UNSPECIFIED;
709   }
710   return plugin->env->get_address_type (plugin->env->cls,
711                                         sb,
712                                         sbs);
713 }
714
715
716 /* ******************* Event loop ******************** */
717
718 /**
719  * We have been notified that our readset has something to read.  We don't
720  * know which socket needs to be read, so we have to check each one
721  * Then reschedule this function to be called again once more is available.
722  *
723  * @param cls the plugin handle
724  * @param tc the scheduling context (for rescheduling this function again)
725  */
726 static void
727 udp_plugin_select_v4 (void *cls,
728                       const struct GNUNET_SCHEDULER_TaskContext *tc);
729
730
731 /**
732  * We have been notified that our readset has something to read.  We don't
733  * know which socket needs to be read, so we have to check each one
734  * Then reschedule this function to be called again once more is available.
735  *
736  * @param cls the plugin handle
737  * @param tc the scheduling context (for rescheduling this function again)
738  */
739 static void
740 udp_plugin_select_v6 (void *cls,
741                       const struct GNUNET_SCHEDULER_TaskContext *tc);
742
743
744 /**
745  * (re)schedule IPv4-select tasks for this plugin.
746  *
747  * @param plugin plugin to reschedule
748  */
749 static void
750 schedule_select_v4 (struct Plugin *plugin)
751 {
752   struct GNUNET_TIME_Relative min_delay;
753   struct GNUNET_TIME_Relative delay;
754   struct UDP_MessageWrapper *udpw;
755   struct UDP_MessageWrapper *min_udpw;
756
757   if ( (GNUNET_YES == plugin->enable_ipv4) &&
758        (NULL != plugin->sockv4) )
759   {
760     /* Find a message ready to send:
761      * Flow delay from other peer is expired or not set (0) */
762     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
763     min_udpw = NULL;
764     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
765     {
766       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
767       if (delay.rel_value_us < min_delay.rel_value_us)
768       {
769         min_delay = delay;
770         min_udpw = udpw;
771       }
772     }
773     if (NULL != plugin->select_task_v4)
774       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
775     if (NULL != min_udpw)
776     {
777       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
778       {
779         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
780                     "Calculated flow delay for UDPv4 at %s for %s\n",
781                     GNUNET_STRINGS_relative_time_to_string (min_delay,
782                                                             GNUNET_YES),
783                     GNUNET_i2s (&udpw->session->target));
784       }
785       else
786       {
787         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788                     "Calculated flow delay for UDPv4 at %s for %s\n",
789                     GNUNET_STRINGS_relative_time_to_string (min_delay,
790                                                             GNUNET_YES),
791                     GNUNET_i2s (&udpw->session->target));
792       }
793     }
794     plugin->select_task_v4
795       = GNUNET_SCHEDULER_add_read_net (min_delay,
796                                        plugin->sockv4,
797                                        &udp_plugin_select_v4,
798                                        plugin);
799   }
800 }
801
802
803 /**
804  * (re)schedule IPv6-select tasks for this plugin.
805  *
806  * @param plugin plugin to reschedule
807  */
808 static void
809 schedule_select_v6 (struct Plugin *plugin)
810 {
811   struct GNUNET_TIME_Relative min_delay;
812   struct GNUNET_TIME_Relative delay;
813   struct UDP_MessageWrapper *udpw;
814   struct UDP_MessageWrapper *min_udpw;
815
816   if ( (GNUNET_YES == plugin->enable_ipv6) &&
817        (NULL != plugin->sockv6) )
818   {
819     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
820     min_udpw = NULL;
821     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
822     {
823       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
824       if (delay.rel_value_us < min_delay.rel_value_us)
825       {
826         min_delay = delay;
827         min_udpw = udpw;
828       }
829     }
830     if (NULL != plugin->select_task_v6)
831       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
832     if (NULL != min_udpw)
833     {
834       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
835       {
836         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
837                     "Calculated flow delay for UDPv6 at %s for %s\n",
838                     GNUNET_STRINGS_relative_time_to_string (min_delay,
839                                                             GNUNET_YES),
840                     GNUNET_i2s (&udpw->session->target));
841       }
842       else
843       {
844         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845                     "Calculated flow delay for UDPv6 at %s for %s\n",
846                     GNUNET_STRINGS_relative_time_to_string (min_delay,
847                                                             GNUNET_YES),
848                     GNUNET_i2s (&udpw->session->target));
849       }
850     }
851     plugin->select_task_v6
852       = GNUNET_SCHEDULER_add_read_net (min_delay,
853                                        plugin->sockv6,
854                                        &udp_plugin_select_v6,
855                                        plugin);
856   }
857 }
858
859
860 /* ******************* Address to string and back ***************** */
861
862
863 /**
864  * Function called for a quick conversion of the binary address to
865  * a numeric address.  Note that the caller must not free the
866  * address and that the next call to this function is allowed
867  * to override the address again.
868  *
869  * @param cls closure
870  * @param addr binary address (a `union UdpAddress`)
871  * @param addrlen length of the @a addr
872  * @return string representing the same address
873  */
874 const char *
875 udp_address_to_string (void *cls,
876                        const void *addr,
877                        size_t addrlen)
878 {
879   static char rbuf[INET6_ADDRSTRLEN + 10];
880   char buf[INET6_ADDRSTRLEN];
881   const void *sb;
882   struct in_addr a4;
883   struct in6_addr a6;
884   const struct IPv4UdpAddress *t4;
885   const struct IPv6UdpAddress *t6;
886   int af;
887   uint16_t port;
888   uint32_t options;
889
890   if (NULL == addr)
891   {
892     GNUNET_break_op (0);
893     return NULL;
894   }
895
896   if (addrlen == sizeof(struct IPv6UdpAddress))
897   {
898     t6 = addr;
899     af = AF_INET6;
900     options = ntohl (t6->options);
901     port = ntohs (t6->u6_port);
902     a6 = t6->ipv6_addr;
903     sb = &a6;
904   }
905   else if (addrlen == sizeof(struct IPv4UdpAddress))
906   {
907     t4 = addr;
908     af = AF_INET;
909     options = ntohl (t4->options);
910     port = ntohs (t4->u4_port);
911     a4.s_addr = t4->ipv4_addr;
912     sb = &a4;
913   }
914   else
915   {
916     GNUNET_break_op (0);
917     return NULL;
918   }
919   inet_ntop (af,
920              sb,
921              buf,
922              INET6_ADDRSTRLEN);
923   GNUNET_snprintf (rbuf,
924                    sizeof(rbuf),
925                    (af == AF_INET6)
926                    ? "%s.%u.[%s]:%u"
927                    : "%s.%u.%s:%u",
928                    PLUGIN_NAME,
929                    options,
930                    buf,
931                    port);
932   return rbuf;
933 }
934
935
936 /**
937  * Function called to convert a string address to a binary address.
938  *
939  * @param cls closure (`struct Plugin *`)
940  * @param addr string address
941  * @param addrlen length of the address
942  * @param buf location to store the buffer
943  * @param added location to store the number of bytes in the buffer.
944  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
945  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
946  */
947 static int
948 udp_string_to_address (void *cls,
949                        const char *addr,
950                        uint16_t addrlen,
951                        void **buf,
952                        size_t *added)
953 {
954   struct sockaddr_storage socket_address;
955   char *address;
956   char *plugin;
957   char *optionstr;
958   uint32_t options;
959
960   /* Format tcp.options.address:port */
961   address = NULL;
962   plugin = NULL;
963   optionstr = NULL;
964
965   if ((NULL == addr) || (0 == addrlen))
966   {
967     GNUNET_break (0);
968     return GNUNET_SYSERR;
969   }
970   if ('\0' != addr[addrlen - 1])
971   {
972     GNUNET_break (0);
973     return GNUNET_SYSERR;
974   }
975   if (strlen (addr) != addrlen - 1)
976   {
977     GNUNET_break (0);
978     return GNUNET_SYSERR;
979   }
980   plugin = GNUNET_strdup (addr);
981   optionstr = strchr (plugin, '.');
982   if (NULL == optionstr)
983   {
984     GNUNET_break (0);
985     GNUNET_free (plugin);
986     return GNUNET_SYSERR;
987   }
988   optionstr[0] = '\0';
989   optionstr++;
990   options = atol (optionstr);
991   address = strchr (optionstr, '.');
992   if (NULL == address)
993   {
994     GNUNET_break (0);
995     GNUNET_free (plugin);
996     return GNUNET_SYSERR;
997   }
998   address[0] = '\0';
999   address++;
1000
1001   if (GNUNET_OK !=
1002       GNUNET_STRINGS_to_address_ip (address,
1003                                     strlen (address),
1004                                     &socket_address))
1005   {
1006     GNUNET_break (0);
1007     GNUNET_free (plugin);
1008     return GNUNET_SYSERR;
1009   }
1010   GNUNET_free(plugin);
1011
1012   switch (socket_address.ss_family)
1013   {
1014   case AF_INET:
1015     {
1016       struct IPv4UdpAddress *u4;
1017       const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
1018
1019       u4 = GNUNET_new (struct IPv4UdpAddress);
1020       u4->options = htonl (options);
1021       u4->ipv4_addr = in4->sin_addr.s_addr;
1022       u4->u4_port = in4->sin_port;
1023       *buf = u4;
1024       *added = sizeof (struct IPv4UdpAddress);
1025       return GNUNET_OK;
1026     }
1027   case AF_INET6:
1028     {
1029       struct IPv6UdpAddress *u6;
1030       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
1031
1032       u6 = GNUNET_new (struct IPv6UdpAddress);
1033       u6->options = htonl (options);
1034       u6->ipv6_addr = in6->sin6_addr;
1035       u6->u6_port = in6->sin6_port;
1036       *buf = u6;
1037       *added = sizeof (struct IPv6UdpAddress);
1038       return GNUNET_OK;
1039     }
1040   default:
1041     GNUNET_break (0);
1042     return GNUNET_SYSERR;
1043   }
1044 }
1045
1046
1047 /**
1048  * Append our port and forward the result.
1049  *
1050  * @param cls a `struct PrettyPrinterContext *`
1051  * @param hostname result from DNS resolver
1052  */
1053 static void
1054 append_port (void *cls,
1055              const char *hostname)
1056 {
1057   struct PrettyPrinterContext *ppc = cls;
1058   struct Plugin *plugin = ppc->plugin;
1059   char *ret;
1060
1061   if (NULL == hostname)
1062   {
1063     /* Final call, done */
1064     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
1065                                  plugin->ppc_dll_tail,
1066                                  ppc);
1067     ppc->resolver_handle = NULL;
1068     ppc->asc (ppc->asc_cls,
1069               NULL,
1070               GNUNET_OK);
1071     GNUNET_free (ppc);
1072     return;
1073   }
1074   if (GNUNET_YES == ppc->ipv6)
1075     GNUNET_asprintf (&ret,
1076                      "%s.%u.[%s]:%d",
1077                      PLUGIN_NAME,
1078                      ppc->options,
1079                      hostname,
1080                      ppc->port);
1081   else
1082     GNUNET_asprintf (&ret,
1083                      "%s.%u.%s:%d",
1084                      PLUGIN_NAME,
1085                      ppc->options,
1086                      hostname,
1087                      ppc->port);
1088   ppc->asc (ppc->asc_cls,
1089             ret,
1090             GNUNET_OK);
1091   GNUNET_free (ret);
1092 }
1093
1094
1095 /**
1096  * Convert the transports address to a nice, human-readable format.
1097  *
1098  * @param cls closure with the `struct Plugin *`
1099  * @param type name of the transport that generated the address
1100  * @param addr one of the addresses of the host, NULL for the last address
1101  *        the specific address format depends on the transport;
1102  *        a `union UdpAddress`
1103  * @param addrlen length of the address
1104  * @param numeric should (IP) addresses be displayed in numeric form?
1105  * @param timeout after how long should we give up?
1106  * @param asc function to call on each string
1107  * @param asc_cls closure for @a asc
1108  */
1109 static void
1110 udp_plugin_address_pretty_printer (void *cls,
1111                                    const char *type,
1112                                    const void *addr,
1113                                    size_t addrlen,
1114                                    int numeric,
1115                                    struct GNUNET_TIME_Relative timeout,
1116                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1117                                    void *asc_cls)
1118 {
1119   struct Plugin *plugin = cls;
1120   struct PrettyPrinterContext *ppc;
1121   const struct sockaddr *sb;
1122   size_t sbs;
1123   struct sockaddr_in a4;
1124   struct sockaddr_in6 a6;
1125   const struct IPv4UdpAddress *u4;
1126   const struct IPv6UdpAddress *u6;
1127   uint16_t port;
1128   uint32_t options;
1129
1130   if (addrlen == sizeof(struct IPv6UdpAddress))
1131   {
1132     u6 = addr;
1133     memset (&a6,
1134             0,
1135             sizeof (a6));
1136     a6.sin6_family = AF_INET6;
1137 #if HAVE_SOCKADDR_IN_SIN_LEN
1138     a6.sin6_len = sizeof (a6);
1139 #endif
1140     a6.sin6_port = u6->u6_port;
1141     a6.sin6_addr = u6->ipv6_addr;
1142     port = ntohs (u6->u6_port);
1143     options = ntohl (u6->options);
1144     sb = (const struct sockaddr *) &a6;
1145     sbs = sizeof (a6);
1146   }
1147   else if (addrlen == sizeof (struct IPv4UdpAddress))
1148   {
1149     u4 = addr;
1150     memset (&a4,
1151             0,
1152             sizeof(a4));
1153     a4.sin_family = AF_INET;
1154 #if HAVE_SOCKADDR_IN_SIN_LEN
1155     a4.sin_len = sizeof (a4);
1156 #endif
1157     a4.sin_port = u4->u4_port;
1158     a4.sin_addr.s_addr = u4->ipv4_addr;
1159     port = ntohs (u4->u4_port);
1160     options = ntohl (u4->options);
1161     sb = (const struct sockaddr *) &a4;
1162     sbs = sizeof(a4);
1163   }
1164   else
1165   {
1166     /* invalid address */
1167     GNUNET_break_op (0);
1168     asc (asc_cls,
1169          NULL,
1170          GNUNET_SYSERR);
1171     asc (asc_cls,
1172          NULL,
1173          GNUNET_OK);
1174     return;
1175   }
1176   ppc = GNUNET_new (struct PrettyPrinterContext);
1177   ppc->plugin = plugin;
1178   ppc->asc = asc;
1179   ppc->asc_cls = asc_cls;
1180   ppc->port = port;
1181   ppc->options = options;
1182   if (addrlen == sizeof (struct IPv6UdpAddress))
1183     ppc->ipv6 = GNUNET_YES;
1184   else
1185     ppc->ipv6 = GNUNET_NO;
1186   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
1187                                plugin->ppc_dll_tail,
1188                                ppc);
1189   ppc->resolver_handle
1190     = GNUNET_RESOLVER_hostname_get (sb,
1191                                     sbs,
1192                                     ! numeric,
1193                                     timeout,
1194                                     &append_port,
1195                                     ppc);
1196 }
1197
1198
1199 /**
1200  * Check if the given port is plausible (must be either our listen
1201  * port or our advertised port).  If it is neither, we return
1202  * #GNUNET_SYSERR.
1203  *
1204  * @param plugin global variables
1205  * @param in_port port number to check
1206  * @return #GNUNET_OK if port is either our open or advertised port
1207  */
1208 static int
1209 check_port (const struct Plugin *plugin,
1210             uint16_t in_port)
1211 {
1212   if ( (plugin->port == in_port) ||
1213        (plugin->aport == in_port) )
1214     return GNUNET_OK;
1215   return GNUNET_SYSERR;
1216 }
1217
1218
1219 /**
1220  * Function that will be called to check if a binary address for this
1221  * plugin is well-formed and corresponds to an address for THIS peer
1222  * (as per our configuration).  Naturally, if absolutely necessary,
1223  * plugins can be a bit conservative in their answer, but in general
1224  * plugins should make sure that the address does not redirect
1225  * traffic to a 3rd party that might try to man-in-the-middle our
1226  * traffic.
1227  *
1228  * @param cls closure, should be our handle to the Plugin
1229  * @param addr pointer to a `union UdpAddress`
1230  * @param addrlen length of @a addr
1231  * @return #GNUNET_OK if this is a plausible address for this peer
1232  *         and transport, #GNUNET_SYSERR if not
1233  */
1234 static int
1235 udp_plugin_check_address (void *cls,
1236                           const void *addr,
1237                           size_t addrlen)
1238 {
1239   struct Plugin *plugin = cls;
1240   const struct IPv4UdpAddress *v4;
1241   const struct IPv6UdpAddress *v6;
1242
1243   if (sizeof(struct IPv4UdpAddress) == addrlen)
1244   {
1245     v4 = (const struct IPv4UdpAddress *) addr;
1246     if (GNUNET_OK != check_port (plugin,
1247                                  ntohs (v4->u4_port)))
1248       return GNUNET_SYSERR;
1249     if (GNUNET_OK !=
1250         GNUNET_NAT_test_address (plugin->nat,
1251                                  &v4->ipv4_addr,
1252                                  sizeof (struct in_addr)))
1253       return GNUNET_SYSERR;
1254   }
1255   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1256   {
1257     v6 = (const struct IPv6UdpAddress *) addr;
1258     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1259     {
1260       GNUNET_break_op (0);
1261       return GNUNET_SYSERR;
1262     }
1263     if (GNUNET_OK != check_port (plugin,
1264                                  ntohs (v6->u6_port)))
1265       return GNUNET_SYSERR;
1266     if (GNUNET_OK !=
1267         GNUNET_NAT_test_address (plugin->nat,
1268                                  &v6->ipv6_addr,
1269                                  sizeof (struct in6_addr)))
1270       return GNUNET_SYSERR;
1271   }
1272   else
1273   {
1274     GNUNET_break_op (0);
1275     return GNUNET_SYSERR;
1276   }
1277   return GNUNET_OK;
1278 }
1279
1280
1281 /**
1282  * Our external IP address/port mapping has changed.
1283  *
1284  * @param cls closure, the `struct Plugin`
1285  * @param add_remove #GNUNET_YES to mean the new public IP address,
1286  *                   #GNUNET_NO to mean the previous (now invalid) one
1287  * @param addr either the previous or the new public IP address
1288  * @param addrlen actual length of the @a addr
1289  */
1290 static void
1291 udp_nat_port_map_callback (void *cls,
1292                            int add_remove,
1293                            const struct sockaddr *addr,
1294                            socklen_t addrlen)
1295 {
1296   struct Plugin *plugin = cls;
1297   struct GNUNET_HELLO_Address *address;
1298   struct IPv4UdpAddress u4;
1299   struct IPv6UdpAddress u6;
1300   void *arg;
1301   size_t args;
1302
1303   LOG (GNUNET_ERROR_TYPE_DEBUG,
1304        (GNUNET_YES == add_remove)
1305        ? "NAT notification to add address `%s'\n"
1306        : "NAT notification to remove address `%s'\n",
1307        GNUNET_a2s (addr,
1308                    addrlen));
1309   /* convert 'address' to our internal format */
1310   switch (addr->sa_family)
1311   {
1312   case AF_INET:
1313     {
1314       const struct sockaddr_in *i4;
1315
1316       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1317       i4 = (const struct sockaddr_in *) addr;
1318       if (0 == ntohs (i4->sin_port))
1319       {
1320         GNUNET_break (0);
1321         return;
1322       }
1323       memset (&u4,
1324               0,
1325               sizeof(u4));
1326       u4.options = htonl (plugin->myoptions);
1327       u4.ipv4_addr = i4->sin_addr.s_addr;
1328       u4.u4_port = i4->sin_port;
1329       arg = &u4;
1330       args = sizeof (struct IPv4UdpAddress);
1331       break;
1332     }
1333   case AF_INET6:
1334     {
1335       const struct sockaddr_in6 *i6;
1336
1337       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1338       i6 = (const struct sockaddr_in6 *) addr;
1339       if (0 == ntohs (i6->sin6_port))
1340       {
1341         GNUNET_break (0);
1342         return;
1343       }
1344       memset (&u6,
1345               0,
1346               sizeof(u6));
1347       u6.options = htonl (plugin->myoptions);
1348       u6.ipv6_addr = i6->sin6_addr;
1349       u6.u6_port = i6->sin6_port;
1350       arg = &u6;
1351       args = sizeof (struct IPv6UdpAddress);
1352       break;
1353     }
1354   default:
1355     GNUNET_break (0);
1356     return;
1357   }
1358   /* modify our published address list */
1359   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1360                                            PLUGIN_NAME,
1361                                            arg,
1362                                            args,
1363                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1364   plugin->env->notify_address (plugin->env->cls,
1365                                add_remove,
1366                                address);
1367   GNUNET_HELLO_address_free (address);
1368 }
1369
1370
1371 /* ********************* Finding sessions ******************* */
1372
1373
1374 /**
1375  * Closure for #session_cmp_it().
1376  */
1377 struct GNUNET_ATS_SessionCompareContext
1378 {
1379   /**
1380    * Set to session matching the address.
1381    */
1382   struct GNUNET_ATS_Session *res;
1383
1384   /**
1385    * Address we are looking for.
1386    */
1387   const struct GNUNET_HELLO_Address *address;
1388 };
1389
1390
1391 /**
1392  * Find a session with a matching address.
1393  *
1394  * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
1395  * @param key peer identity (unused)
1396  * @param value the `struct GNUNET_ATS_Session *`
1397  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1398  */
1399 static int
1400 session_cmp_it (void *cls,
1401                 const struct GNUNET_PeerIdentity *key,
1402                 void *value)
1403 {
1404   struct GNUNET_ATS_SessionCompareContext *cctx = cls;
1405   struct GNUNET_ATS_Session *s = value;
1406
1407   if (0 == GNUNET_HELLO_address_cmp (s->address,
1408                                      cctx->address))
1409   {
1410     GNUNET_assert (GNUNET_NO == s->in_destroy);
1411     cctx->res = s;
1412     return GNUNET_NO;
1413   }
1414   return GNUNET_OK;
1415 }
1416
1417
1418 /**
1419  * Locate an existing session the transport service is using to
1420  * send data to another peer.  Performs some basic sanity checks
1421  * on the address and then tries to locate a matching session.
1422  *
1423  * @param cls the plugin
1424  * @param address the address we should locate the session by
1425  * @return the session if it exists, or NULL if it is not found
1426  */
1427 static struct GNUNET_ATS_Session *
1428 udp_plugin_lookup_session (void *cls,
1429                            const struct GNUNET_HELLO_Address *address)
1430 {
1431   struct Plugin *plugin = cls;
1432   const struct IPv6UdpAddress *udp_a6;
1433   const struct IPv4UdpAddress *udp_a4;
1434   struct GNUNET_ATS_SessionCompareContext cctx;
1435
1436   if (NULL == address->address)
1437   {
1438     GNUNET_break (0);
1439     return NULL;
1440   }
1441   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1442   {
1443     if (NULL == plugin->sockv4)
1444       return NULL;
1445     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1446     if (0 == udp_a4->u4_port)
1447     {
1448       GNUNET_break (0);
1449       return NULL;
1450     }
1451   }
1452   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1453   {
1454     if (NULL == plugin->sockv6)
1455       return NULL;
1456     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1457     if (0 == udp_a6->u6_port)
1458     {
1459       GNUNET_break (0);
1460       return NULL;
1461     }
1462   }
1463   else
1464   {
1465     GNUNET_break (0);
1466     return NULL;
1467   }
1468
1469   /* check if session already exists */
1470   cctx.address = address;
1471   cctx.res = NULL;
1472   LOG (GNUNET_ERROR_TYPE_DEBUG,
1473        "Looking for existing session for peer `%s' with address `%s'\n",
1474        GNUNET_i2s (&address->peer),
1475        udp_address_to_string (plugin,
1476                               address->address,
1477                               address->address_length));
1478   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1479                                               &address->peer,
1480                                               &session_cmp_it,
1481                                               &cctx);
1482   if (NULL == cctx.res)
1483     return NULL;
1484   LOG (GNUNET_ERROR_TYPE_DEBUG,
1485        "Found existing session %p\n",
1486        cctx.res);
1487   return cctx.res;
1488 }
1489
1490
1491 /* ********************** Timeout ****************** */
1492
1493
1494 /**
1495  * Increment session timeout due to activity.
1496  *
1497  * @param s session to reschedule timeout activity for
1498  */
1499 static void
1500 reschedule_session_timeout (struct GNUNET_ATS_Session *s)
1501 {
1502   if (GNUNET_YES == s->in_destroy)
1503     return;
1504   GNUNET_assert (NULL != s->timeout_task);
1505   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1506 }
1507
1508
1509
1510 /**
1511  * Function that will be called whenever the transport service wants to
1512  * notify the plugin that a session is still active and in use and
1513  * therefore the session timeout for this session has to be updated
1514  *
1515  * @param cls closure with the `struct Plugin`
1516  * @param peer which peer was the session for
1517  * @param session which session is being updated
1518  */
1519 static void
1520 udp_plugin_update_session_timeout (void *cls,
1521                                    const struct GNUNET_PeerIdentity *peer,
1522                                    struct GNUNET_ATS_Session *session)
1523 {
1524   struct Plugin *plugin = cls;
1525
1526   if (GNUNET_YES !=
1527       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1528                                                     peer,
1529                                                     session))
1530   {
1531     GNUNET_break (0);
1532     return;
1533   }
1534   /* Reschedule session timeout */
1535   reschedule_session_timeout (session);
1536 }
1537
1538
1539 /* ************************* Sending ************************ */
1540
1541
1542 /**
1543  * Remove the given message from the transmission queue and
1544  * update all applicable statistics.
1545  *
1546  * @param plugin the UDP plugin
1547  * @param udpw message wrapper to dequeue
1548  */
1549 static void
1550 dequeue (struct Plugin *plugin,
1551          struct UDP_MessageWrapper *udpw)
1552 {
1553   struct GNUNET_ATS_Session *session = udpw->session;
1554
1555   if (plugin->bytes_in_buffer < udpw->msg_size)
1556   {
1557     GNUNET_break (0);
1558   }
1559   else
1560   {
1561     GNUNET_STATISTICS_update (plugin->env->stats,
1562                               "# UDP, total bytes in send buffers",
1563                               - (long long) udpw->msg_size,
1564                               GNUNET_NO);
1565     plugin->bytes_in_buffer -= udpw->msg_size;
1566   }
1567   GNUNET_STATISTICS_update (plugin->env->stats,
1568                             "# UDP, total messages in send buffers",
1569                             -1,
1570                             GNUNET_NO);
1571   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1572   {
1573     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1574                                  plugin->ipv4_queue_tail,
1575                                  udpw);
1576   }
1577   else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1578   {
1579     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1580                                  plugin->ipv6_queue_tail,
1581                                  udpw);
1582   }
1583   else
1584   {
1585     GNUNET_break (0);
1586     return;
1587   }
1588   GNUNET_assert (session->msgs_in_queue > 0);
1589   session->msgs_in_queue--;
1590   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1591   session->bytes_in_queue -= udpw->msg_size;
1592 }
1593
1594
1595 /**
1596  * Enqueue a message for transmission and update statistics.
1597  *
1598  * @param plugin the UDP plugin
1599  * @param udpw message wrapper to queue
1600  */
1601 static void
1602 enqueue (struct Plugin *plugin,
1603          struct UDP_MessageWrapper *udpw)
1604 {
1605   struct GNUNET_ATS_Session *session = udpw->session;
1606
1607   if (GNUNET_YES == session->in_destroy)
1608   {
1609     GNUNET_break (0);
1610     return;
1611   }
1612   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1613   {
1614     GNUNET_break (0);
1615   }
1616   else
1617   {
1618     GNUNET_STATISTICS_update (plugin->env->stats,
1619                               "# UDP, total bytes in send buffers",
1620                               udpw->msg_size,
1621                               GNUNET_NO);
1622     plugin->bytes_in_buffer += udpw->msg_size;
1623   }
1624   GNUNET_STATISTICS_update (plugin->env->stats,
1625                             "# UDP, total messages in send buffers",
1626                             1,
1627                             GNUNET_NO);
1628   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1629   {
1630     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1631                                 plugin->ipv4_queue_tail,
1632                                 udpw);
1633   }
1634   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1635   {
1636     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1637                                  plugin->ipv6_queue_tail,
1638                                  udpw);
1639   }
1640   else
1641   {
1642     GNUNET_break (0);
1643     udpw->cont (udpw->cont_cls,
1644                 &session->target,
1645                 GNUNET_SYSERR,
1646                 udpw->msg_size,
1647                 0);
1648     GNUNET_free (udpw);
1649     return;
1650   }
1651   session->msgs_in_queue++;
1652   session->bytes_in_queue += udpw->msg_size;
1653 }
1654
1655
1656 /**
1657  * We have completed our (attempt) to transmit a message that had to
1658  * be fragmented -- either because we got an ACK saying that all
1659  * fragments were received, or because of timeout / disconnect.  Clean
1660  * up our state.
1661  *
1662  * @param frag_ctx fragmentation context to clean up
1663  * @param result #GNUNET_OK if we succeeded (got ACK),
1664  *               #GNUNET_SYSERR if the transmission failed
1665  */
1666 static void
1667 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1668                          int result)
1669 {
1670   struct Plugin *plugin = frag_ctx->plugin;
1671   struct GNUNET_ATS_Session *s = frag_ctx->session;
1672   struct UDP_MessageWrapper *udpw;
1673   struct UDP_MessageWrapper *tmp;
1674   size_t overhead;
1675   struct GNUNET_TIME_Relative delay;
1676
1677   LOG (GNUNET_ERROR_TYPE_DEBUG,
1678        "%p: Fragmented message removed with result %s\n",
1679        frag_ctx,
1680        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1681   /* Call continuation for fragmented message */
1682   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1683     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1684   else
1685     overhead = frag_ctx->on_wire_size;
1686   delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1687   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1688   {
1689     LOG (GNUNET_ERROR_TYPE_WARNING,
1690          "Fragmented message acknowledged after %s\n",
1691          GNUNET_STRINGS_relative_time_to_string (delay,
1692                                                  GNUNET_YES));
1693   }
1694   else
1695   {
1696     LOG (GNUNET_ERROR_TYPE_DEBUG,
1697          "Fragmented message acknowledged after %s\n",
1698          GNUNET_STRINGS_relative_time_to_string (delay,
1699                                                  GNUNET_YES));
1700   }
1701
1702   if (NULL != frag_ctx->cont)
1703     frag_ctx->cont (frag_ctx->cont_cls,
1704                     &s->target,
1705                     result,
1706                     s->frag_ctx->payload_size,
1707                     frag_ctx->on_wire_size);
1708   GNUNET_STATISTICS_update (plugin->env->stats,
1709                             "# UDP, fragmented messages active",
1710                             -1,
1711                             GNUNET_NO);
1712
1713   if (GNUNET_OK == result)
1714   {
1715     GNUNET_STATISTICS_update (plugin->env->stats,
1716                               "# UDP, fragmented msgs, messages, sent, success",
1717                               1,
1718                               GNUNET_NO);
1719     GNUNET_STATISTICS_update (plugin->env->stats,
1720                               "# UDP, fragmented msgs, bytes payload, sent, success",
1721                               s->frag_ctx->payload_size,
1722                               GNUNET_NO);
1723     GNUNET_STATISTICS_update (plugin->env->stats,
1724                               "# UDP, fragmented msgs, bytes overhead, sent, success",
1725                               overhead,
1726                               GNUNET_NO);
1727     GNUNET_STATISTICS_update (plugin->env->stats,
1728                               "# UDP, total, bytes overhead, sent",
1729                               overhead,
1730                               GNUNET_NO);
1731     GNUNET_STATISTICS_update (plugin->env->stats,
1732                               "# UDP, total, bytes payload, sent",
1733                               s->frag_ctx->payload_size,
1734                               GNUNET_NO);
1735   }
1736   else
1737   {
1738     GNUNET_STATISTICS_update (plugin->env->stats,
1739                               "# UDP, fragmented msgs, messages, sent, failure",
1740                               1,
1741                               GNUNET_NO);
1742     GNUNET_STATISTICS_update (plugin->env->stats,
1743                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1744                               s->frag_ctx->payload_size,
1745                               GNUNET_NO);
1746     GNUNET_STATISTICS_update (plugin->env->stats,
1747                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1748                               overhead,
1749                               GNUNET_NO);
1750     GNUNET_STATISTICS_update (plugin->env->stats,
1751                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1752                               overhead,
1753                               GNUNET_NO);
1754   }
1755
1756   /* Remove remaining fragments from queue, no need to transmit those
1757      any longer. */
1758   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1759   {
1760     udpw = plugin->ipv6_queue_head;
1761     while (NULL != udpw)
1762     {
1763       tmp = udpw->next;
1764       if ( (udpw->frag_ctx != NULL) &&
1765            (udpw->frag_ctx == frag_ctx) )
1766       {
1767         dequeue (plugin,
1768                  udpw);
1769         GNUNET_free (udpw);
1770       }
1771       udpw = tmp;
1772     }
1773   }
1774   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1775   {
1776     udpw = plugin->ipv4_queue_head;
1777     while (NULL != udpw)
1778     {
1779       tmp = udpw->next;
1780       if ( (NULL != udpw->frag_ctx) &&
1781            (udpw->frag_ctx == frag_ctx) )
1782       {
1783         dequeue (plugin,
1784                  udpw);
1785         GNUNET_free (udpw);
1786       }
1787       udpw = tmp;
1788     }
1789   }
1790   notify_session_monitor (s->plugin,
1791                           s,
1792                           GNUNET_TRANSPORT_SS_UPDATE);
1793   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1794                                    &s->last_expected_msg_delay,
1795                                    &s->last_expected_ack_delay);
1796   s->frag_ctx = NULL;
1797   GNUNET_free (frag_ctx);
1798 }
1799
1800
1801 /**
1802  * We are finished with a fragment in the message queue.
1803  * Notify the continuation and update statistics.
1804  *
1805  * @param cls the `struct Plugin *`
1806  * @param udpw the queue entry
1807  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1808  */
1809 static void
1810 qc_fragment_sent (void *cls,
1811                   struct UDP_MessageWrapper *udpw,
1812                   int result)
1813 {
1814   struct Plugin *plugin = cls;
1815
1816   GNUNET_assert (NULL != udpw->frag_ctx);
1817   if (GNUNET_OK == result)
1818   {
1819     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1820     GNUNET_STATISTICS_update (plugin->env->stats,
1821                               "# UDP, fragmented msgs, fragments, sent, success",
1822                               1,
1823                               GNUNET_NO);
1824     GNUNET_STATISTICS_update (plugin->env->stats,
1825                               "# UDP, fragmented msgs, fragments bytes, sent, success",
1826                               udpw->msg_size,
1827                               GNUNET_NO);
1828   }
1829   else
1830   {
1831     fragmented_message_done (udpw->frag_ctx,
1832                              GNUNET_SYSERR);
1833     GNUNET_STATISTICS_update (plugin->env->stats,
1834                               "# UDP, fragmented msgs, fragments, sent, failure",
1835                               1,
1836                               GNUNET_NO);
1837     GNUNET_STATISTICS_update (plugin->env->stats,
1838                               "# UDP, fragmented msgs, fragments bytes, sent, failure",
1839                               udpw->msg_size,
1840                               GNUNET_NO);
1841   }
1842 }
1843
1844
1845 /**
1846  * Function that is called with messages created by the fragmentation
1847  * module.  In the case of the `proc` callback of the
1848  * #GNUNET_FRAGMENT_context_create() function, this function must
1849  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1850  *
1851  * @param cls closure, the `struct UDP_FragmentationContext`
1852  * @param msg the message that was created
1853  */
1854 static void
1855 enqueue_fragment (void *cls,
1856                   const struct GNUNET_MessageHeader *msg)
1857 {
1858   struct UDP_FragmentationContext *frag_ctx = cls;
1859   struct Plugin *plugin = frag_ctx->plugin;
1860   struct UDP_MessageWrapper *udpw;
1861   struct GNUNET_ATS_Session *session = frag_ctx->session;
1862   size_t msg_len = ntohs (msg->size);
1863
1864   LOG (GNUNET_ERROR_TYPE_DEBUG,
1865        "Enqueuing fragment with %u bytes\n",
1866        msg_len);
1867   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1868   udpw->session = session;
1869   udpw->msg_buf = (char *) &udpw[1];
1870   udpw->msg_size = msg_len;
1871   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1872   udpw->timeout = frag_ctx->timeout;
1873   udpw->start_time = frag_ctx->start_time;
1874   udpw->transmission_time = frag_ctx->next_frag_time;
1875   frag_ctx->next_frag_time
1876     = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1877                                 session->flow_delay_from_other_peer);
1878   udpw->frag_ctx = frag_ctx;
1879   udpw->qc = &qc_fragment_sent;
1880   udpw->qc_cls = plugin;
1881   memcpy (udpw->msg_buf,
1882           msg,
1883           msg_len);
1884   enqueue (plugin,
1885            udpw);
1886 }
1887
1888
1889 /**
1890  * We are finished with a message from the message queue.
1891  * Notify the continuation and update statistics.
1892  *
1893  * @param cls the `struct Plugin *`
1894  * @param udpw the queue entry
1895  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1896  */
1897 static void
1898 qc_message_sent (void *cls,
1899                  struct UDP_MessageWrapper *udpw,
1900                  int result)
1901 {
1902   struct Plugin *plugin = cls;
1903   size_t overhead;
1904   struct GNUNET_TIME_Relative delay;
1905
1906   if (udpw->msg_size >= udpw->payload_size)
1907     overhead = udpw->msg_size - udpw->payload_size;
1908   else
1909     overhead = udpw->msg_size;
1910
1911   if (NULL != udpw->cont)
1912   {
1913     delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1914     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1915     {
1916       LOG (GNUNET_ERROR_TYPE_WARNING,
1917            "Message sent via UDP with delay of %s\n",
1918            GNUNET_STRINGS_relative_time_to_string (delay,
1919                                                    GNUNET_YES));
1920     }
1921     else
1922     {
1923       LOG (GNUNET_ERROR_TYPE_DEBUG,
1924            "Message sent via UDP with delay of %s\n",
1925            GNUNET_STRINGS_relative_time_to_string (delay,
1926                                                    GNUNET_YES));
1927     }
1928     udpw->cont (udpw->cont_cls,
1929                 &udpw->session->target,
1930                 result,
1931                 udpw->payload_size,
1932                 overhead);
1933   }
1934   if (GNUNET_OK == result)
1935   {
1936     GNUNET_STATISTICS_update (plugin->env->stats,
1937                               "# UDP, unfragmented msgs, messages, sent, success",
1938                               1,
1939                               GNUNET_NO);
1940     GNUNET_STATISTICS_update (plugin->env->stats,
1941                               "# UDP, unfragmented msgs, bytes payload, sent, success",
1942                               udpw->payload_size,
1943                               GNUNET_NO);
1944     GNUNET_STATISTICS_update (plugin->env->stats,
1945                               "# UDP, unfragmented msgs, bytes overhead, sent, success",
1946                               overhead,
1947                               GNUNET_NO);
1948     GNUNET_STATISTICS_update (plugin->env->stats,
1949                               "# UDP, total, bytes overhead, sent",
1950                               overhead,
1951                               GNUNET_NO);
1952     GNUNET_STATISTICS_update (plugin->env->stats,
1953                               "# UDP, total, bytes payload, sent",
1954                               udpw->payload_size,
1955                               GNUNET_NO);
1956   }
1957   else
1958   {
1959     GNUNET_STATISTICS_update (plugin->env->stats,
1960                               "# UDP, unfragmented msgs, messages, sent, failure",
1961                               1,
1962                               GNUNET_NO);
1963     GNUNET_STATISTICS_update (plugin->env->stats,
1964                               "# UDP, unfragmented msgs, bytes payload, sent, failure",
1965                               udpw->payload_size,
1966                               GNUNET_NO);
1967     GNUNET_STATISTICS_update (plugin->env->stats,
1968                               "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1969                               overhead,
1970                               GNUNET_NO);
1971   }
1972 }
1973
1974
1975 /**
1976  * Function that can be used by the transport service to transmit a
1977  * message using the plugin.  Note that in the case of a peer
1978  * disconnecting, the continuation MUST be called prior to the
1979  * disconnect notification itself.  This function will be called with
1980  * this peer's HELLO message to initiate a fresh connection to another
1981  * peer.
1982  *
1983  * @param cls closure
1984  * @param s which session must be used
1985  * @param msgbuf the message to transmit
1986  * @param msgbuf_size number of bytes in @a msgbuf
1987  * @param priority how important is the message (most plugins will
1988  *                 ignore message priority and just FIFO)
1989  * @param to how long to wait at most for the transmission (does not
1990  *                require plugins to discard the message after the timeout,
1991  *                just advisory for the desired delay; most plugins will ignore
1992  *                this as well)
1993  * @param cont continuation to call once the message has
1994  *        been transmitted (or if the transport is ready
1995  *        for the next transmission call; or if the
1996  *        peer disconnected...); can be NULL
1997  * @param cont_cls closure for @a cont
1998  * @return number of bytes used (on the physical network, with overheads);
1999  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
2000  *         and does NOT mean that the message was not transmitted (DV)
2001  */
2002 static ssize_t
2003 udp_plugin_send (void *cls,
2004                  struct GNUNET_ATS_Session *s,
2005                  const char *msgbuf,
2006                  size_t msgbuf_size,
2007                  unsigned int priority,
2008                  struct GNUNET_TIME_Relative to,
2009                  GNUNET_TRANSPORT_TransmitContinuation cont,
2010                  void *cont_cls)
2011 {
2012   struct Plugin *plugin = cls;
2013   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2014   struct UDP_FragmentationContext *frag_ctx;
2015   struct UDP_MessageWrapper *udpw;
2016   struct UDPMessage *udp;
2017   char mbuf[udpmlen] GNUNET_ALIGN;
2018
2019   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
2020        (NULL == plugin->sockv6) )
2021     return GNUNET_SYSERR;
2022   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
2023        (NULL == plugin->sockv4) )
2024     return GNUNET_SYSERR;
2025   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2026   {
2027     GNUNET_break (0);
2028     return GNUNET_SYSERR;
2029   }
2030   if (GNUNET_YES !=
2031       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2032                                                     &s->target,
2033                                                     s))
2034   {
2035     GNUNET_break (0);
2036     return GNUNET_SYSERR;
2037   }
2038   LOG (GNUNET_ERROR_TYPE_DEBUG,
2039        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2040        udpmlen,
2041        GNUNET_i2s (&s->target),
2042        udp_address_to_string (plugin,
2043                               s->address->address,
2044                               s->address->address_length));
2045
2046   udp = (struct UDPMessage *) mbuf;
2047   udp->header.size = htons (udpmlen);
2048   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2049   udp->reserved = htonl (0);
2050   udp->sender = *plugin->env->my_identity;
2051
2052   /* We do not update the session time out here!  Otherwise this
2053    * session will not timeout since we send keep alive before session
2054    * can timeout.
2055    *
2056    * For UDP we update session timeout only on receive, this will
2057    * cover keep alives, since remote peer will reply with keep alive
2058    * responses!
2059    */
2060   if (udpmlen <= UDP_MTU)
2061   {
2062     /* unfragmented message */
2063     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2064     udpw->session = s;
2065     udpw->msg_buf = (char *) &udpw[1];
2066     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2067     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2068     udpw->start_time = GNUNET_TIME_absolute_get ();
2069     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2070     udpw->transmission_time = s->last_transmit_time;
2071     s->last_transmit_time
2072       = GNUNET_TIME_absolute_add (s->last_transmit_time,
2073                                   s->flow_delay_from_other_peer);
2074     udpw->cont = cont;
2075     udpw->cont_cls = cont_cls;
2076     udpw->frag_ctx = NULL;
2077     udpw->qc = &qc_message_sent;
2078     udpw->qc_cls = plugin;
2079     memcpy (udpw->msg_buf,
2080             udp,
2081             sizeof (struct UDPMessage));
2082     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
2083             msgbuf,
2084             msgbuf_size);
2085     enqueue (plugin,
2086              udpw);
2087     GNUNET_STATISTICS_update (plugin->env->stats,
2088                               "# UDP, unfragmented messages queued total",
2089                               1,
2090                               GNUNET_NO);
2091     GNUNET_STATISTICS_update (plugin->env->stats,
2092                               "# UDP, unfragmented bytes payload queued total",
2093                               msgbuf_size,
2094                               GNUNET_NO);
2095   }
2096   else
2097   {
2098     /* fragmented message */
2099     if (NULL != s->frag_ctx)
2100       return GNUNET_SYSERR;
2101     memcpy (&udp[1],
2102             msgbuf,
2103             msgbuf_size);
2104     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2105     frag_ctx->plugin = plugin;
2106     frag_ctx->session = s;
2107     frag_ctx->cont = cont;
2108     frag_ctx->cont_cls = cont_cls;
2109     frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2110     frag_ctx->next_frag_time = s->last_transmit_time;
2111     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
2112     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2113     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2114     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2115                                                      UDP_MTU,
2116                                                      &plugin->tracker,
2117                                                      s->last_expected_msg_delay,
2118                                                      s->last_expected_ack_delay,
2119                                                      &udp->header,
2120                                                      &enqueue_fragment,
2121                                                      frag_ctx);
2122     s->frag_ctx = frag_ctx;
2123     s->last_transmit_time = frag_ctx->next_frag_time;
2124     GNUNET_STATISTICS_update (plugin->env->stats,
2125                               "# UDP, fragmented messages active",
2126                               1,
2127                               GNUNET_NO);
2128     GNUNET_STATISTICS_update (plugin->env->stats,
2129                               "# UDP, fragmented messages, total",
2130                               1,
2131                               GNUNET_NO);
2132     GNUNET_STATISTICS_update (plugin->env->stats,
2133                               "# UDP, fragmented bytes (payload)",
2134                               frag_ctx->payload_size,
2135                               GNUNET_NO);
2136   }
2137   notify_session_monitor (s->plugin,
2138                           s,
2139                           GNUNET_TRANSPORT_SS_UPDATE);
2140   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2141     schedule_select_v4 (plugin);
2142   else
2143     schedule_select_v6 (plugin);
2144   return udpmlen;
2145 }
2146
2147
2148 /* ********************** Receiving ********************** */
2149
2150
2151 /**
2152  * Closure for #find_receive_context().
2153  */
2154 struct FindReceiveContext
2155 {
2156   /**
2157    * Where to store the result.
2158    */
2159   struct DefragContext *rc;
2160
2161   /**
2162    * Session associated with this context.
2163    */
2164   struct GNUNET_ATS_Session *session;
2165
2166   /**
2167    * Address to find.
2168    */
2169   const union UdpAddress *udp_addr;
2170
2171   /**
2172    * Number of bytes in @e udp_addr.
2173    */
2174   size_t udp_addr_len;
2175
2176 };
2177
2178
2179 /**
2180  * Scan the heap for a receive context with the given address.
2181  *
2182  * @param cls the `struct FindReceiveContext`
2183  * @param node internal node of the heap
2184  * @param element value stored at the node (a `struct ReceiveContext`)
2185  * @param cost cost associated with the node
2186  * @return #GNUNET_YES if we should continue to iterate,
2187  *         #GNUNET_NO if not.
2188  */
2189 static int
2190 find_receive_context (void *cls,
2191                       struct GNUNET_CONTAINER_HeapNode *node,
2192                       void *element,
2193                       GNUNET_CONTAINER_HeapCostType cost)
2194 {
2195   struct FindReceiveContext *frc = cls;
2196   struct DefragContext *e = element;
2197
2198   if ( (frc->udp_addr_len == e->udp_addr_len) &&
2199        (0 == memcmp (frc->udp_addr,
2200                      e->udp_addr,
2201                      frc->udp_addr_len)) )
2202   {
2203     frc->rc = e;
2204     return GNUNET_NO;
2205   }
2206   return GNUNET_YES;
2207 }
2208
2209
2210 /**
2211  * Functions with this signature are called whenever we need to close
2212  * a session due to a disconnect or failure to establish a connection.
2213  *
2214  * @param cls closure with the `struct Plugin`
2215  * @param s session to close down
2216  * @return #GNUNET_OK on success
2217  */
2218 static int
2219 udp_disconnect_session (void *cls,
2220                         struct GNUNET_ATS_Session *s)
2221 {
2222   struct Plugin *plugin = cls;
2223   struct UDP_MessageWrapper *udpw;
2224   struct UDP_MessageWrapper *next;
2225   struct FindReceiveContext frc;
2226
2227   GNUNET_assert (GNUNET_YES != s->in_destroy);
2228   LOG (GNUNET_ERROR_TYPE_DEBUG,
2229        "Session %p to peer `%s' at address %s ended\n",
2230        s,
2231        GNUNET_i2s (&s->target),
2232        udp_address_to_string (plugin,
2233                               s->address->address,
2234                               s->address->address_length));
2235   if (NULL != s->timeout_task)
2236   {
2237     GNUNET_SCHEDULER_cancel (s->timeout_task);
2238     s->timeout_task = NULL;
2239   }
2240   if (NULL != s->frag_ctx)
2241   {
2242     /* Remove fragmented message due to disconnect */
2243     fragmented_message_done (s->frag_ctx,
2244                              GNUNET_SYSERR);
2245   }
2246   GNUNET_assert (GNUNET_YES ==
2247                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
2248                                                        &s->target,
2249                                                        s));
2250   frc.rc = NULL;
2251   frc.udp_addr = s->address->address;
2252   frc.udp_addr_len = s->address->address_length;
2253   /* Lookup existing receive context for this address */
2254   if (NULL != plugin->defrag_ctxs)
2255   {
2256     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2257                                    &find_receive_context,
2258                                    &frc);
2259     if (NULL != frc.rc)
2260     {
2261       struct DefragContext *d_ctx = frc.rc;
2262
2263       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2264       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2265       GNUNET_free (d_ctx);
2266     }
2267   }
2268   s->in_destroy = GNUNET_YES;
2269   next = plugin->ipv4_queue_head;
2270   while (NULL != (udpw = next))
2271   {
2272     next = udpw->next;
2273     if (udpw->session == s)
2274     {
2275       dequeue (plugin,
2276                udpw);
2277       udpw->qc (udpw->qc_cls,
2278                 udpw,
2279                 GNUNET_SYSERR);
2280       GNUNET_free (udpw);
2281     }
2282   }
2283   next = plugin->ipv6_queue_head;
2284   while (NULL != (udpw = next))
2285   {
2286     next = udpw->next;
2287     if (udpw->session == s)
2288     {
2289       dequeue (plugin,
2290                udpw);
2291       udpw->qc (udpw->qc_cls,
2292                 udpw,
2293                 GNUNET_SYSERR);
2294       GNUNET_free (udpw);
2295     }
2296   }
2297   if ( (NULL != s->frag_ctx) &&
2298        (NULL != s->frag_ctx->cont) )
2299   {
2300     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2301        later, as it might be in use right now */
2302     LOG (GNUNET_ERROR_TYPE_DEBUG,
2303          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2304          GNUNET_i2s (&s->target));
2305     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2306                        &s->target,
2307                        GNUNET_SYSERR,
2308                        s->frag_ctx->payload_size,
2309                        s->frag_ctx->on_wire_size);
2310   }
2311   notify_session_monitor (s->plugin,
2312                           s,
2313                           GNUNET_TRANSPORT_SS_DONE);
2314   plugin->env->session_end (plugin->env->cls,
2315                             s->address,
2316                             s);
2317   GNUNET_STATISTICS_set (plugin->env->stats,
2318                          "# UDP sessions active",
2319                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2320                          GNUNET_NO);
2321   if (0 == s->rc)
2322     free_session (s);
2323   return GNUNET_OK;
2324 }
2325
2326
2327 /**
2328  * Handle an ACK message.
2329  *
2330  * @param plugin the UDP plugin
2331  * @param msg the (presumed) UDP ACK message
2332  * @param udp_addr sender address
2333  * @param udp_addr_len number of bytes in @a udp_addr
2334  */
2335 static void
2336 read_process_ack (struct Plugin *plugin,
2337                   const struct GNUNET_MessageHeader *msg,
2338                   const union UdpAddress *udp_addr,
2339                   socklen_t udp_addr_len)
2340 {
2341   const struct GNUNET_MessageHeader *ack;
2342   const struct UDP_ACK_Message *udp_ack;
2343   struct GNUNET_HELLO_Address *address;
2344   struct GNUNET_ATS_Session *s;
2345   struct GNUNET_TIME_Relative flow_delay;
2346
2347   if (ntohs (msg->size)
2348       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2349   {
2350     GNUNET_break_op (0);
2351     return;
2352   }
2353   udp_ack = (const struct UDP_ACK_Message *) msg;
2354   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2355   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2356   {
2357     GNUNET_break_op(0);
2358     return;
2359   }
2360   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2361                                            PLUGIN_NAME,
2362                                            udp_addr,
2363                                            udp_addr_len,
2364                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2365   s = udp_plugin_lookup_session (plugin,
2366                                  address);
2367   if (NULL == s)
2368   {
2369     LOG (GNUNET_ERROR_TYPE_WARNING,
2370          "UDP session of address %s for ACK not found\n",
2371          udp_address_to_string (plugin,
2372                                 address->address,
2373                                 address->address_length));
2374     GNUNET_HELLO_address_free (address);
2375     return;
2376   }
2377   if (NULL == s->frag_ctx)
2378   {
2379     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2380          "Fragmentation context of address %s for ACK (%s) not found\n",
2381          udp_address_to_string (plugin,
2382                                 address->address,
2383                                 address->address_length),
2384          GNUNET_FRAGMENT_print_ack (ack));
2385     GNUNET_HELLO_address_free (address);
2386     return;
2387   }
2388   GNUNET_HELLO_address_free (address);
2389
2390   if (UINT32_MAX == ntohl (udp_ack->delay))
2391   {
2392     /* Other peer asked for us to terminate the session */
2393     udp_disconnect_session (plugin,
2394                             s);
2395     return;
2396   }
2397   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2398   if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2399     LOG (GNUNET_ERROR_TYPE_WARNING,
2400          "We received a sending delay of %s for %s\n",
2401          GNUNET_STRINGS_relative_time_to_string (flow_delay,
2402                                                  GNUNET_YES),
2403          GNUNET_i2s (&udp_ack->sender));
2404   else
2405     LOG (GNUNET_ERROR_TYPE_DEBUG,
2406          "We received a sending delay of %s for %s\n",
2407          GNUNET_STRINGS_relative_time_to_string (flow_delay,
2408                                                  GNUNET_YES),
2409          GNUNET_i2s (&udp_ack->sender));
2410   /* Flow delay is for the reassembled packet, however, our delay
2411      is per packet, so we need to adjust: */
2412   flow_delay = GNUNET_TIME_relative_divide (flow_delay,
2413                                             1 + (s->frag_ctx->payload_size /
2414                                                  UDP_MTU));
2415   s->flow_delay_from_other_peer = flow_delay;
2416
2417
2418   if (GNUNET_OK !=
2419       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2420                                    ack))
2421   {
2422     LOG (GNUNET_ERROR_TYPE_DEBUG,
2423          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2424          (unsigned int) ntohs (msg->size),
2425          GNUNET_i2s (&udp_ack->sender),
2426          udp_address_to_string (plugin,
2427                                 udp_addr,
2428                                 udp_addr_len));
2429     /* Expect more ACKs to arrive */
2430     return;
2431   }
2432
2433   LOG (GNUNET_ERROR_TYPE_DEBUG,
2434        "Message from %s at %s full ACK'ed\n",
2435        GNUNET_i2s (&udp_ack->sender),
2436        udp_address_to_string (plugin,
2437                               udp_addr,
2438                               udp_addr_len));
2439
2440   /* Remove fragmented message after successful sending */
2441   fragmented_message_done (s->frag_ctx,
2442                            GNUNET_OK);
2443 }
2444
2445
2446 /**
2447  * Message tokenizer has broken up an incomming message. Pass it on
2448  * to the service.
2449  *
2450  * @param cls the `struct Plugin *`
2451  * @param client the `struct GNUNET_ATS_Session *`
2452  * @param hdr the actual message
2453  * @return #GNUNET_OK (always)
2454  */
2455 static int
2456 process_inbound_tokenized_messages (void *cls,
2457                                     void *client,
2458                                     const struct GNUNET_MessageHeader *hdr)
2459 {
2460   struct Plugin *plugin = cls;
2461   struct GNUNET_ATS_Session *session = client;
2462
2463   if (GNUNET_YES == session->in_destroy)
2464     return GNUNET_OK;
2465   reschedule_session_timeout (session);
2466   session->flow_delay_for_other_peer
2467     = plugin->env->receive (plugin->env->cls,
2468                             session->address,
2469                             session,
2470                             hdr);
2471   return GNUNET_OK;
2472 }
2473
2474
2475 /**
2476  * Destroy a session, plugin is being unloaded.
2477  *
2478  * @param cls the `struct Plugin`
2479  * @param key hash of public key of target peer
2480  * @param value a `struct PeerSession *` to clean up
2481  * @return #GNUNET_OK (continue to iterate)
2482  */
2483 static int
2484 disconnect_and_free_it (void *cls,
2485                         const struct GNUNET_PeerIdentity *key,
2486                         void *value)
2487 {
2488   struct Plugin *plugin = cls;
2489
2490   udp_disconnect_session (plugin,
2491                           value);
2492   return GNUNET_OK;
2493 }
2494
2495
2496 /**
2497  * Disconnect from a remote node.  Clean up session if we have one for
2498  * this peer.
2499  *
2500  * @param cls closure for this call (should be handle to Plugin)
2501  * @param target the peeridentity of the peer to disconnect
2502  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2503  */
2504 static void
2505 udp_disconnect (void *cls,
2506                 const struct GNUNET_PeerIdentity *target)
2507 {
2508   struct Plugin *plugin = cls;
2509
2510   LOG (GNUNET_ERROR_TYPE_DEBUG,
2511        "Disconnecting from peer `%s'\n",
2512        GNUNET_i2s (target));
2513   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2514                                               target,
2515                                               &disconnect_and_free_it,
2516                                               plugin);
2517 }
2518
2519
2520 /**
2521  * Session was idle, so disconnect it.
2522  *
2523  * @param cls the `struct GNUNET_ATS_Session` to time out
2524  * @param tc scheduler context
2525  */
2526 static void
2527 session_timeout (void *cls,
2528                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2529 {
2530   struct GNUNET_ATS_Session *s = cls;
2531   struct Plugin *plugin = s->plugin;
2532   struct GNUNET_TIME_Relative left;
2533
2534   s->timeout_task = NULL;
2535   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2536   if (left.rel_value_us > 0)
2537   {
2538     /* not actually our turn yet, but let's at least update
2539        the monitor, it may think we're about to die ... */
2540     notify_session_monitor (s->plugin,
2541                             s,
2542                             GNUNET_TRANSPORT_SS_UPDATE);
2543     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
2544                                                     &session_timeout,
2545                                                     s);
2546     return;
2547   }
2548   LOG (GNUNET_ERROR_TYPE_DEBUG,
2549        "Session %p was idle for %s, disconnecting\n",
2550        s,
2551        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2552                                                GNUNET_YES));
2553   /* call session destroy function */
2554   udp_disconnect_session (plugin,
2555                           s);
2556 }
2557
2558
2559 /**
2560  * Allocate a new session for the given endpoint address.
2561  * Note that this function does not inform the service
2562  * of the new session, this is the responsibility of the
2563  * caller (if needed).
2564  *
2565  * @param cls the `struct Plugin`
2566  * @param address address of the other peer to use
2567  * @param network_type network type the address belongs to
2568  * @return NULL on error, otherwise session handle
2569  */
2570 static struct GNUNET_ATS_Session *
2571 udp_plugin_create_session (void *cls,
2572                            const struct GNUNET_HELLO_Address *address,
2573                            enum GNUNET_ATS_Network_Type network_type)
2574 {
2575   struct Plugin *plugin = cls;
2576   struct GNUNET_ATS_Session *s;
2577
2578   s = GNUNET_new (struct GNUNET_ATS_Session);
2579   s->plugin = plugin;
2580   s->address = GNUNET_HELLO_address_copy (address);
2581   s->target = address->peer;
2582   s->last_transmit_time = GNUNET_TIME_absolute_get ();
2583   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2584                                                               250);
2585   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2586   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2587   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2588   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2589   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
2590                                                   &session_timeout,
2591                                                   s);
2592   s->scope = network_type;
2593
2594   LOG (GNUNET_ERROR_TYPE_DEBUG,
2595        "Creating new session %p for peer `%s' address `%s'\n",
2596        s,
2597        GNUNET_i2s (&address->peer),
2598        udp_address_to_string (plugin,
2599                               address->address,
2600                               address->address_length));
2601   GNUNET_assert (GNUNET_OK ==
2602                  GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
2603                                                     &s->target,
2604                                                     s,
2605                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2606   GNUNET_STATISTICS_set (plugin->env->stats,
2607                          "# UDP sessions active",
2608                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2609                          GNUNET_NO);
2610   notify_session_monitor (plugin,
2611                           s,
2612                           GNUNET_TRANSPORT_SS_INIT);
2613   return s;
2614 }
2615
2616
2617 /**
2618  * Creates a new outbound session the transport service will use to
2619  * send data to the peer.
2620  *
2621  * @param cls the `struct Plugin *`
2622  * @param address the address
2623  * @return the session or NULL of max connections exceeded
2624  */
2625 static struct GNUNET_ATS_Session *
2626 udp_plugin_get_session (void *cls,
2627                         const struct GNUNET_HELLO_Address *address)
2628 {
2629   struct Plugin *plugin = cls;
2630   struct GNUNET_ATS_Session *s;
2631   enum GNUNET_ATS_Network_Type network_type = GNUNET_ATS_NET_UNSPECIFIED;
2632   const struct IPv4UdpAddress *udp_v4;
2633   const struct IPv6UdpAddress *udp_v6;
2634
2635   if (NULL == address)
2636   {
2637     GNUNET_break (0);
2638     return NULL;
2639   }
2640   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
2641        (address->address_length != sizeof(struct IPv6UdpAddress)) )
2642   {
2643     GNUNET_break_op (0);
2644     return NULL;
2645   }
2646   if (NULL != (s = udp_plugin_lookup_session (cls,
2647                                               address)))
2648     return s;
2649
2650   /* need to create new session */
2651   if (sizeof (struct IPv4UdpAddress) == address->address_length)
2652   {
2653     struct sockaddr_in v4;
2654
2655     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2656     memset (&v4, '\0', sizeof (v4));
2657     v4.sin_family = AF_INET;
2658 #if HAVE_SOCKADDR_IN_SIN_LEN
2659     v4.sin_len = sizeof (struct sockaddr_in);
2660 #endif
2661     v4.sin_port = udp_v4->u4_port;
2662     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2663     network_type = plugin->env->get_address_type (plugin->env->cls,
2664                                                   (const struct sockaddr *) &v4,
2665                                                   sizeof (v4));
2666   }
2667   if (sizeof (struct IPv6UdpAddress) == address->address_length)
2668   {
2669     struct sockaddr_in6 v6;
2670
2671     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2672     memset (&v6, '\0', sizeof (v6));
2673     v6.sin6_family = AF_INET6;
2674 #if HAVE_SOCKADDR_IN_SIN_LEN
2675     v6.sin6_len = sizeof (struct sockaddr_in6);
2676 #endif
2677     v6.sin6_port = udp_v6->u6_port;
2678     v6.sin6_addr = udp_v6->ipv6_addr;
2679     network_type = plugin->env->get_address_type (plugin->env->cls,
2680                                                   (const struct sockaddr *) &v6,
2681                                                   sizeof (v6));
2682   }
2683   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2684   return udp_plugin_create_session (cls,
2685                                     address,
2686                                     network_type);
2687 }
2688
2689
2690 /**
2691  * We've received a UDP Message.  Process it (pass contents to main service).
2692  *
2693  * @param plugin plugin context
2694  * @param msg the message
2695  * @param udp_addr sender address
2696  * @param udp_addr_len number of bytes in @a udp_addr
2697  * @param network_type network type the address belongs to
2698  */
2699 static void
2700 process_udp_message (struct Plugin *plugin,
2701                      const struct UDPMessage *msg,
2702                      const union UdpAddress *udp_addr,
2703                      size_t udp_addr_len,
2704                      enum GNUNET_ATS_Network_Type network_type)
2705 {
2706   struct GNUNET_ATS_Session *s;
2707   struct GNUNET_HELLO_Address *address;
2708
2709   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2710   if (0 != ntohl (msg->reserved))
2711   {
2712     GNUNET_break_op(0);
2713     return;
2714   }
2715   if (ntohs (msg->header.size)
2716       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2717   {
2718     GNUNET_break_op(0);
2719     return;
2720   }
2721
2722   address = GNUNET_HELLO_address_allocate (&msg->sender,
2723                                            PLUGIN_NAME,
2724                                            udp_addr,
2725                                            udp_addr_len,
2726                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2727   if (NULL ==
2728       (s = udp_plugin_lookup_session (plugin,
2729                                       address)))
2730   {
2731     s = udp_plugin_create_session (plugin,
2732                                    address,
2733                                    network_type);
2734     plugin->env->session_start (plugin->env->cls,
2735                                 address,
2736                                 s,
2737                                 s->scope);
2738     notify_session_monitor (plugin,
2739                             s,
2740                             GNUNET_TRANSPORT_SS_UP);
2741   }
2742   GNUNET_free (address);
2743
2744   s->rc++;
2745   GNUNET_SERVER_mst_receive (plugin->mst,
2746                              s,
2747                              (const char *) &msg[1],
2748                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2749                              GNUNET_YES,
2750                              GNUNET_NO);
2751   s->rc--;
2752   if ( (0 == s->rc) &&
2753        (GNUNET_YES == s->in_destroy) )
2754     free_session (s);
2755 }
2756
2757
2758 /**
2759  * Process a defragmented message.
2760  *
2761  * @param cls the `struct DefragContext *`
2762  * @param msg the message
2763  */
2764 static void
2765 fragment_msg_proc (void *cls,
2766                    const struct GNUNET_MessageHeader *msg)
2767 {
2768   struct DefragContext *dc = cls;
2769   const struct UDPMessage *um;
2770
2771   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2772   {
2773     GNUNET_break_op (0);
2774     return;
2775   }
2776   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2777   {
2778     GNUNET_break_op (0);
2779     return;
2780   }
2781   um = (const struct UDPMessage *) msg;
2782   dc->sender = um->sender;
2783   dc->have_sender = GNUNET_YES;
2784   process_udp_message (dc->plugin,
2785                        um,
2786                        dc->udp_addr,
2787                        dc->udp_addr_len,
2788                        dc->network_type);
2789 }
2790
2791
2792 /**
2793  * We finished sending an acknowledgement.  Update
2794  * statistics.
2795  *
2796  * @param cls the `struct Plugin`
2797  * @param udpw message queue entry of the ACK
2798  * @param result #GNUNET_OK if the transmission worked,
2799  *               #GNUNET_SYSERR if we failed to send the ACK
2800  */
2801 static void
2802 ack_message_sent (void *cls,
2803                   struct UDP_MessageWrapper *udpw,
2804                   int result)
2805 {
2806   struct Plugin *plugin = cls;
2807
2808   if (GNUNET_OK == result)
2809   {
2810     GNUNET_STATISTICS_update (plugin->env->stats,
2811                               "# UDP, ACK messages sent",
2812                               1,
2813                               GNUNET_NO);
2814   }
2815   else
2816   {
2817     GNUNET_STATISTICS_update (plugin->env->stats,
2818                               "# UDP, ACK transmissions failed",
2819                               1,
2820                               GNUNET_NO);
2821   }
2822 }
2823
2824
2825 /**
2826  * Transmit an acknowledgement.
2827  *
2828  * @param cls the `struct DefragContext *`
2829  * @param id message ID (unused)
2830  * @param msg ack to transmit
2831  */
2832 static void
2833 ack_proc (void *cls,
2834           uint32_t id,
2835           const struct GNUNET_MessageHeader *msg)
2836 {
2837   struct DefragContext *rc = cls;
2838   struct Plugin *plugin = rc->plugin;
2839   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2840   struct UDP_ACK_Message *udp_ack;
2841   uint32_t delay;
2842   struct UDP_MessageWrapper *udpw;
2843   struct GNUNET_ATS_Session *s;
2844   struct GNUNET_HELLO_Address *address;
2845
2846   if (GNUNET_NO == rc->have_sender)
2847   {
2848     /* tried to defragment but never succeeded, hence will not ACK */
2849     /* This can happen if we just lost msgs */
2850     GNUNET_STATISTICS_update (plugin->env->stats,
2851                               "# UDP, fragments discarded without ACK",
2852                               1,
2853                               GNUNET_NO);
2854     return;
2855   }
2856   address = GNUNET_HELLO_address_allocate (&rc->sender,
2857                                            PLUGIN_NAME,
2858                                            rc->udp_addr,
2859                                            rc->udp_addr_len,
2860                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2861   s = udp_plugin_lookup_session (plugin,
2862                                  address);
2863   GNUNET_HELLO_address_free (address);
2864   if (NULL == s)
2865   {
2866     LOG (GNUNET_ERROR_TYPE_ERROR,
2867          "Trying to transmit ACK to peer `%s' but no session found!\n",
2868          udp_address_to_string (plugin,
2869                                 rc->udp_addr,
2870                                 rc->udp_addr_len));
2871     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2872     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2873     GNUNET_free (rc);
2874     GNUNET_STATISTICS_update (plugin->env->stats,
2875                               "# UDP, ACK transmissions failed",
2876                               1,
2877                               GNUNET_NO);
2878     return;
2879   }
2880   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
2881       s->flow_delay_for_other_peer.rel_value_us)
2882     delay = UINT32_MAX;
2883   else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
2884     delay = s->flow_delay_for_other_peer.rel_value_us;
2885   else
2886     delay = UINT32_MAX - 1; /* largest value we can communicate */
2887   LOG (GNUNET_ERROR_TYPE_DEBUG,
2888        "Sending ACK to `%s' including delay of %s\n",
2889        udp_address_to_string (plugin,
2890                               rc->udp_addr,
2891                               rc->udp_addr_len),
2892        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2893                                                GNUNET_YES));
2894   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2895   udpw->msg_size = msize;
2896   udpw->payload_size = 0;
2897   udpw->session = s;
2898   udpw->start_time = GNUNET_TIME_absolute_get ();
2899   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2900   udpw->msg_buf = (char *) &udpw[1];
2901   udpw->qc = &ack_message_sent;
2902   udpw->qc_cls = plugin;
2903   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2904   udp_ack->header.size = htons ((uint16_t) msize);
2905   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2906   udp_ack->delay = htonl (delay);
2907   udp_ack->sender = *plugin->env->my_identity;
2908   memcpy (&udp_ack[1],
2909           msg,
2910           ntohs (msg->size));
2911   enqueue (plugin,
2912            udpw);
2913   notify_session_monitor (plugin,
2914                           s,
2915                           GNUNET_TRANSPORT_SS_UPDATE);
2916   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2917     schedule_select_v4 (plugin);
2918   else
2919     schedule_select_v6 (plugin);
2920 }
2921
2922
2923 /**
2924  * We received a fragment, process it.
2925  *
2926  * @param plugin our plugin
2927  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2928  * @param udp_addr sender address
2929  * @param udp_addr_len number of bytes in @a udp_addr
2930  * @param network_type network type the address belongs to
2931  */
2932 static void
2933 read_process_fragment (struct Plugin *plugin,
2934                        const struct GNUNET_MessageHeader *msg,
2935                        const union UdpAddress *udp_addr,
2936                        size_t udp_addr_len,
2937                        enum GNUNET_ATS_Network_Type network_type)
2938 {
2939   struct DefragContext *d_ctx;
2940   struct GNUNET_TIME_Absolute now;
2941   struct FindReceiveContext frc;
2942
2943   frc.rc = NULL;
2944   frc.udp_addr = udp_addr;
2945   frc.udp_addr_len = udp_addr_len;
2946
2947   /* Lookup existing receive context for this address */
2948   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2949                                  &find_receive_context,
2950                                  &frc);
2951   now = GNUNET_TIME_absolute_get ();
2952   d_ctx = frc.rc;
2953
2954   if (NULL == d_ctx)
2955   {
2956     /* Create a new defragmentation context */
2957     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2958     memcpy (&d_ctx[1],
2959             udp_addr,
2960             udp_addr_len);
2961     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2962     d_ctx->udp_addr_len = udp_addr_len;
2963     d_ctx->network_type = network_type;
2964     d_ctx->plugin = plugin;
2965     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2966                                                       UDP_MTU,
2967                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2968                                                       d_ctx,
2969                                                       &fragment_msg_proc,
2970                                                       &ack_proc);
2971     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2972                                                  d_ctx,
2973                                                  (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2974     LOG (GNUNET_ERROR_TYPE_DEBUG,
2975          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2976          (unsigned int) ntohs (msg->size),
2977          udp_address_to_string (plugin,
2978                                 udp_addr,
2979                                 udp_addr_len));
2980   }
2981   else
2982   {
2983     LOG (GNUNET_ERROR_TYPE_DEBUG,
2984          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2985          (unsigned int) ntohs (msg->size),
2986          udp_address_to_string (plugin,
2987                                 udp_addr,
2988                                 udp_addr_len));
2989   }
2990
2991   if (GNUNET_OK ==
2992       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
2993                                           msg))
2994   {
2995     /* keep this 'rc' from expiring */
2996     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2997                                        d_ctx->hnode,
2998                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2999   }
3000   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
3001       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
3002   {
3003     /* remove 'rc' that was inactive the longest */
3004     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
3005     GNUNET_assert (NULL != d_ctx);
3006     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3007     GNUNET_free (d_ctx);
3008     GNUNET_STATISTICS_update (plugin->env->stats,
3009                               "# UDP, Defragmentations aborted",
3010                               1,
3011                               GNUNET_NO);
3012   }
3013 }
3014
3015
3016 /**
3017  * Read and process a message from the given socket.
3018  *
3019  * @param plugin the overall plugin
3020  * @param rsock socket to read from
3021  */
3022 static void
3023 udp_select_read (struct Plugin *plugin,
3024                  struct GNUNET_NETWORK_Handle *rsock)
3025 {
3026   socklen_t fromlen;
3027   struct sockaddr_storage addr;
3028   char buf[65536] GNUNET_ALIGN;
3029   ssize_t size;
3030   const struct GNUNET_MessageHeader *msg;
3031   struct IPv4UdpAddress v4;
3032   struct IPv6UdpAddress v6;
3033   const struct sockaddr *sa;
3034   const struct sockaddr_in *sa4;
3035   const struct sockaddr_in6 *sa6;
3036   const union UdpAddress *int_addr;
3037   size_t int_addr_len;
3038   enum GNUNET_ATS_Network_Type network_type;
3039
3040   fromlen = sizeof (addr);
3041   memset (&addr,
3042           0,
3043           sizeof(addr));
3044   size = GNUNET_NETWORK_socket_recvfrom (rsock,
3045                                          buf,
3046                                          sizeof(buf),
3047                                          (struct sockaddr *) &addr,
3048                                          &fromlen);
3049   sa = (const struct sockaddr *) &addr;
3050 #if MINGW
3051   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
3052    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
3053    * on this socket has failed.
3054    * Quote from MSDN:
3055    *   WSAECONNRESET - The virtual circuit was reset by the remote side
3056    *   executing a hard or abortive close. The application should close
3057    *   the socket; it is no longer usable. On a UDP-datagram socket this
3058    *   error indicates a previous send operation resulted in an ICMP Port
3059    *   Unreachable message.
3060    */
3061   if ( (-1 == size) &&
3062        (ECONNRESET == errno) )
3063     return;
3064 #endif
3065   if (-1 == size)
3066   {
3067     LOG (GNUNET_ERROR_TYPE_DEBUG,
3068          "UDP failed to receive data: %s\n",
3069          STRERROR (errno));
3070     /* Connection failure or something. Not a protocol violation. */
3071     return;
3072   }
3073
3074   /* Check if this is a STUN packet */
3075   if (GNUNET_NAT_is_valid_stun_packet (plugin->nat,
3076                                        (uint8_t *)buf,
3077                                        size))
3078     return; /* was STUN, do not process further */
3079
3080   if (size < sizeof(struct GNUNET_MessageHeader))
3081   {
3082     LOG (GNUNET_ERROR_TYPE_WARNING,
3083          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
3084          (unsigned int ) size,
3085          GNUNET_a2s (sa,
3086                      fromlen));
3087     /* _MAY_ be a connection failure (got partial message) */
3088     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
3089     GNUNET_break_op (0);
3090     return;
3091   }
3092
3093   msg = (const struct GNUNET_MessageHeader *) buf;
3094   LOG (GNUNET_ERROR_TYPE_DEBUG,
3095        "UDP received %u-byte message from `%s' type %u\n",
3096        (unsigned int) size,
3097        GNUNET_a2s (sa,
3098                    fromlen),
3099        ntohs (msg->type));
3100   if (size != ntohs (msg->size))
3101   {
3102     LOG (GNUNET_ERROR_TYPE_WARNING,
3103          "UDP malformed message header from %s\n",
3104          (unsigned int) size,
3105          GNUNET_a2s (sa,
3106                      fromlen));
3107     GNUNET_break_op (0);
3108     return;
3109   }
3110   GNUNET_STATISTICS_update (plugin->env->stats,
3111                             "# UDP, total bytes received",
3112                             size,
3113                             GNUNET_NO);
3114   network_type = plugin->env->get_address_type (plugin->env->cls,
3115                                                 sa,
3116                                                 fromlen);
3117   switch (sa->sa_family)
3118   {
3119   case AF_INET:
3120     sa4 = (const struct sockaddr_in *) &addr;
3121     v4.options = 0;
3122     v4.ipv4_addr = sa4->sin_addr.s_addr;
3123     v4.u4_port = sa4->sin_port;
3124     int_addr = (union UdpAddress *) &v4;
3125     int_addr_len = sizeof (v4);
3126     break;
3127   case AF_INET6:
3128     sa6 = (const struct sockaddr_in6 *) &addr;
3129     v6.options = 0;
3130     v6.ipv6_addr = sa6->sin6_addr;
3131     v6.u6_port = sa6->sin6_port;
3132     int_addr = (union UdpAddress *) &v6;
3133     int_addr_len = sizeof (v6);
3134     break;
3135   default:
3136     GNUNET_break (0);
3137     return;
3138   }
3139
3140   switch (ntohs (msg->type))
3141   {
3142   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
3143     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
3144       udp_broadcast_receive (plugin,
3145                              buf,
3146                              size,
3147                              int_addr,
3148                              int_addr_len,
3149                              network_type);
3150     return;
3151   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
3152     if (ntohs (msg->size) < sizeof(struct UDPMessage))
3153     {
3154       GNUNET_break_op(0);
3155       return;
3156     }
3157     process_udp_message (plugin,
3158                          (const struct UDPMessage *) msg,
3159                          int_addr,
3160                          int_addr_len,
3161                          network_type);
3162     return;
3163   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
3164     read_process_ack (plugin,
3165                       msg,
3166                       int_addr,
3167                       int_addr_len);
3168     return;
3169   case GNUNET_MESSAGE_TYPE_FRAGMENT:
3170     read_process_fragment (plugin,
3171                            msg,
3172                            int_addr,
3173                            int_addr_len,
3174                            network_type);
3175     return;
3176   default:
3177     GNUNET_break_op(0);
3178     return;
3179   }
3180 }
3181
3182
3183 /**
3184  * Removes messages from the transmission queue that have
3185  * timed out, and then selects a message that should be
3186  * transmitted next.
3187  *
3188  * @param plugin the UDP plugin
3189  * @param sock which socket should we process the queue for (v4 or v6)
3190  * @return message selected for transmission, or NULL for none
3191  */
3192 static struct UDP_MessageWrapper *
3193 remove_timeout_messages_and_select (struct Plugin *plugin,
3194                                     struct GNUNET_NETWORK_Handle *sock)
3195 {
3196   struct UDP_MessageWrapper *udpw;
3197   struct GNUNET_TIME_Relative remaining;
3198   struct GNUNET_ATS_Session *session;
3199   int removed;
3200
3201   removed = GNUNET_NO;
3202   udpw = (sock == plugin->sockv4)
3203     ? plugin->ipv4_queue_head
3204     : plugin->ipv6_queue_head;
3205   while (NULL != udpw)
3206   {
3207     session = udpw->session;
3208     /* Find messages with timeout */
3209     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
3210     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
3211     {
3212       /* Message timed out */
3213       removed = GNUNET_YES;
3214       dequeue (plugin,
3215                udpw);
3216       udpw->qc (udpw->qc_cls,
3217                 udpw,
3218                 GNUNET_SYSERR);
3219       GNUNET_free (udpw);
3220
3221       if (sock == plugin->sockv4)
3222       {
3223         udpw = plugin->ipv4_queue_head;
3224       }
3225       else if (sock == plugin->sockv6)
3226       {
3227         udpw = plugin->ipv6_queue_head;
3228       }
3229       else
3230       {
3231         GNUNET_break (0); /* should never happen */
3232         udpw = NULL;
3233       }
3234       GNUNET_STATISTICS_update (plugin->env->stats,
3235                                 "# messages discarded due to timeout",
3236                                 1,
3237                                 GNUNET_NO);
3238     }
3239     else
3240     {
3241       /* Message did not time out, check transmission time */
3242       remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3243       if (0 == remaining.rel_value_us)
3244       {
3245         /* this message is not delayed */
3246         LOG (GNUNET_ERROR_TYPE_DEBUG,
3247              "Message for peer `%s' (%u bytes) is not delayed \n",
3248              GNUNET_i2s (&udpw->session->target),
3249              udpw->payload_size);
3250         break; /* Found message to send, break */
3251       }
3252       else
3253       {
3254         /* Message is delayed, try next */
3255         LOG (GNUNET_ERROR_TYPE_DEBUG,
3256              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3257              GNUNET_i2s (&udpw->session->target),
3258              udpw->payload_size,
3259              GNUNET_STRINGS_relative_time_to_string (remaining,
3260                                                      GNUNET_YES));
3261         udpw = udpw->next;
3262       }
3263     }
3264   }
3265   if (GNUNET_YES == removed)
3266     notify_session_monitor (session->plugin,
3267                             session,
3268                             GNUNET_TRANSPORT_SS_UPDATE);
3269   return udpw;
3270 }
3271
3272
3273 /**
3274  * We failed to transmit a message via UDP. Generate
3275  * a descriptive error message.
3276  *
3277  * @param plugin our plugin
3278  * @param sa target address we were trying to reach
3279  * @param slen number of bytes in @a sa
3280  * @param error the errno value returned from the sendto() call
3281  */
3282 static void
3283 analyze_send_error (struct Plugin *plugin,
3284                     const struct sockaddr *sa,
3285                     socklen_t slen,
3286                     int error)
3287 {
3288   enum GNUNET_ATS_Network_Type type;
3289
3290   type = plugin->env->get_address_type (plugin->env->cls,
3291                                         sa,
3292                                         slen);
3293   if ( ( (GNUNET_ATS_NET_LAN == type) ||
3294          (GNUNET_ATS_NET_WAN == type) ) &&
3295        ( (ENETUNREACH == errno) ||
3296          (ENETDOWN == errno) ) )
3297   {
3298     if (slen == sizeof (struct sockaddr_in))
3299     {
3300       /* IPv4: "Network unreachable" or "Network down"
3301        *
3302        * This indicates we do not have connectivity
3303        */
3304       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3305            _("UDP could not transmit message to `%s': "
3306              "Network seems down, please check your network configuration\n"),
3307            GNUNET_a2s (sa,
3308                        slen));
3309     }
3310     if (slen == sizeof (struct sockaddr_in6))
3311     {
3312       /* IPv6: "Network unreachable" or "Network down"
3313        *
3314        * This indicates that this system is IPv6 enabled, but does not
3315        * have a valid global IPv6 address assigned or we do not have
3316        * connectivity
3317        */
3318       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3319            _("UDP could not transmit IPv6 message! "
3320              "Please check your network configuration and disable IPv6 if your "
3321              "connection does not have a global IPv6 address\n"));
3322     }
3323   }
3324   else
3325   {
3326     LOG (GNUNET_ERROR_TYPE_WARNING,
3327          "UDP could not transmit message to `%s': `%s'\n",
3328          GNUNET_a2s (sa,
3329                      slen),
3330          STRERROR (error));
3331   }
3332 }
3333
3334
3335 /**
3336  * It is time to try to transmit a UDP message.  Select one
3337  * and send.
3338  *
3339  * @param plugin the plugin
3340  * @param sock which socket (v4/v6) to send on
3341  */
3342 static void
3343 udp_select_send (struct Plugin *plugin,
3344                  struct GNUNET_NETWORK_Handle *sock)
3345 {
3346   ssize_t sent;
3347   socklen_t slen;
3348   const struct sockaddr *a;
3349   const struct IPv4UdpAddress *u4;
3350   struct sockaddr_in a4;
3351   const struct IPv6UdpAddress *u6;
3352   struct sockaddr_in6 a6;
3353   struct UDP_MessageWrapper *udpw;
3354
3355   /* Find message(s) to send */
3356   while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
3357                                                              sock)))
3358   {
3359     if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3360     {
3361       u4 = udpw->session->address->address;
3362       memset (&a4,
3363               0,
3364               sizeof(a4));
3365       a4.sin_family = AF_INET;
3366 #if HAVE_SOCKADDR_IN_SIN_LEN
3367       a4.sin_len = sizeof (a4);
3368 #endif
3369       a4.sin_port = u4->u4_port;
3370       a4.sin_addr.s_addr = u4->ipv4_addr;
3371       a = (const struct sockaddr *) &a4;
3372       slen = sizeof (a4);
3373     }
3374     else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3375     {
3376       u6 = udpw->session->address->address;
3377       memset (&a6,
3378               0,
3379               sizeof(a6));
3380       a6.sin6_family = AF_INET6;
3381 #if HAVE_SOCKADDR_IN_SIN_LEN
3382       a6.sin6_len = sizeof (a6);
3383 #endif
3384       a6.sin6_port = u6->u6_port;
3385       a6.sin6_addr = u6->ipv6_addr;
3386       a = (const struct sockaddr *) &a6;
3387       slen = sizeof (a6);
3388     }
3389     else
3390     {
3391       GNUNET_break (0);
3392       dequeue (plugin,
3393                udpw);
3394       udpw->qc (udpw->qc_cls,
3395                 udpw,
3396                 GNUNET_SYSERR);
3397       notify_session_monitor (plugin,
3398                               udpw->session,
3399                               GNUNET_TRANSPORT_SS_UPDATE);
3400       GNUNET_free (udpw);
3401       continue;
3402     }
3403     sent = GNUNET_NETWORK_socket_sendto (sock,
3404                                          udpw->msg_buf,
3405                                          udpw->msg_size,
3406                                          a,
3407                                          slen);
3408     udpw->session->last_transmit_time
3409       = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3410                                   udpw->session->last_transmit_time);
3411     dequeue (plugin,
3412              udpw);
3413     if (GNUNET_SYSERR == sent)
3414     {
3415       /* Failure */
3416       analyze_send_error (plugin,
3417                           a,
3418                           slen,
3419                           errno);
3420       udpw->qc (udpw->qc_cls,
3421                 udpw,
3422                 GNUNET_SYSERR);
3423       GNUNET_STATISTICS_update (plugin->env->stats,
3424                                 "# UDP, total, bytes, sent, failure",
3425                                 sent,
3426                                 GNUNET_NO);
3427       GNUNET_STATISTICS_update (plugin->env->stats,
3428                                 "# UDP, total, messages, sent, failure",
3429                                 1,
3430                                 GNUNET_NO);
3431     }
3432     else
3433     {
3434       /* Success */
3435       LOG (GNUNET_ERROR_TYPE_DEBUG,
3436            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3437            (unsigned int) (udpw->msg_size),
3438            GNUNET_i2s (&udpw->session->target),
3439            GNUNET_a2s (a,
3440                        slen),
3441            (int ) sent,
3442            (sent < 0) ? STRERROR (errno) : "ok");
3443       GNUNET_STATISTICS_update (plugin->env->stats,
3444                                 "# UDP, total, bytes, sent, success",
3445                                 sent,
3446                                 GNUNET_NO);
3447       GNUNET_STATISTICS_update (plugin->env->stats,
3448                                 "# UDP, total, messages, sent, success",
3449                                 1,
3450                                 GNUNET_NO);
3451       if (NULL != udpw->frag_ctx)
3452         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3453       udpw->qc (udpw->qc_cls,
3454                 udpw,
3455                 GNUNET_OK);
3456     }
3457     notify_session_monitor (plugin,
3458                             udpw->session,
3459                             GNUNET_TRANSPORT_SS_UPDATE);
3460     GNUNET_free (udpw);
3461   }
3462 }
3463
3464
3465 /* ***************** Event loop (part 2) *************** */
3466
3467
3468 /**
3469  * We have been notified that our readset has something to read.  We don't
3470  * know which socket needs to be read, so we have to check each one
3471  * Then reschedule this function to be called again once more is available.
3472  *
3473  * @param cls the plugin handle
3474  * @param tc the scheduling context
3475  */
3476 static void
3477 udp_plugin_select_v4 (void *cls,
3478                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3479 {
3480   struct Plugin *plugin = cls;
3481
3482   plugin->select_task_v4 = NULL;
3483   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3484     return;
3485   if (NULL == plugin->sockv4)
3486     return;
3487   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3488       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3489                                    plugin->sockv4)))
3490     udp_select_read (plugin,
3491                      plugin->sockv4);
3492   udp_select_send (plugin,
3493                    plugin->sockv4);
3494   schedule_select_v4 (plugin);
3495 }
3496
3497
3498 /**
3499  * We have been notified that our readset has something to read.  We don't
3500  * know which socket needs to be read, so we have to check each one
3501  * Then reschedule this function to be called again once more is available.
3502  *
3503  * @param cls the plugin handle
3504  * @param tc the scheduling context
3505  */
3506 static void
3507 udp_plugin_select_v6 (void *cls,
3508                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3509 {
3510   struct Plugin *plugin = cls;
3511
3512   plugin->select_task_v6 = NULL;
3513   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3514     return;
3515   if (NULL == plugin->sockv6)
3516     return;
3517   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3518        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3519                                     plugin->sockv6)) )
3520     udp_select_read (plugin,
3521                      plugin->sockv6);
3522
3523   udp_select_send (plugin,
3524                    plugin->sockv6);
3525   schedule_select_v6 (plugin);
3526 }
3527
3528
3529 /* ******************* Initialization *************** */
3530
3531
3532 /**
3533  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3534  *
3535  * @param plugin the plugin to initialize
3536  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3537  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3538  * @return number of sockets that were successfully bound
3539  */
3540 static int
3541 setup_sockets (struct Plugin *plugin,
3542                const struct sockaddr_in6 *bind_v6,
3543                const struct sockaddr_in *bind_v4)
3544 {
3545   int tries;
3546   int sockets_created = 0;
3547   struct sockaddr_in6 server_addrv6;
3548   struct sockaddr_in server_addrv4;
3549   const struct sockaddr *server_addr;
3550   const struct sockaddr *addrs[2];
3551   socklen_t addrlens[2];
3552   socklen_t addrlen;
3553   int eno;
3554
3555   /* Create IPv6 socket */
3556   eno = EINVAL;
3557   if (GNUNET_YES == plugin->enable_ipv6)
3558   {
3559     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3560                                                    SOCK_DGRAM,
3561                                                    0);
3562     if (NULL == plugin->sockv6)
3563     {
3564       LOG (GNUNET_ERROR_TYPE_INFO,
3565            _("Disabling IPv6 since it is not supported on this system!\n"));
3566       plugin->enable_ipv6 = GNUNET_NO;
3567     }
3568     else
3569     {
3570       memset (&server_addrv6,
3571               0,
3572               sizeof(struct sockaddr_in6));
3573 #if HAVE_SOCKADDR_IN_SIN_LEN
3574       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3575 #endif
3576       server_addrv6.sin6_family = AF_INET6;
3577       if (NULL != bind_v6)
3578         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3579       else
3580         server_addrv6.sin6_addr = in6addr_any;
3581
3582       if (0 == plugin->port) /* autodetect */
3583         server_addrv6.sin6_port
3584           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3585                                              33537)
3586                    + 32000);
3587       else
3588         server_addrv6.sin6_port = htons (plugin->port);
3589       addrlen = sizeof (struct sockaddr_in6);
3590       server_addr = (const struct sockaddr *) &server_addrv6;
3591
3592       tries = 0;
3593       while (tries < 10)
3594       {
3595         LOG(GNUNET_ERROR_TYPE_DEBUG,
3596             "Binding to IPv6 `%s'\n",
3597             GNUNET_a2s (server_addr,
3598                         addrlen));
3599         /* binding */
3600         if (GNUNET_OK ==
3601             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3602                                         server_addr,
3603                                         addrlen))
3604           break;
3605         eno = errno;
3606         if (0 != plugin->port)
3607         {
3608           tries = 10; /* fail immediately */
3609           break; /* bind failed on specific port */
3610         }
3611         /* autodetect */
3612         server_addrv6.sin6_port
3613           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3614                                              33537)
3615                    + 32000);
3616         tries++;
3617       }
3618       if (tries >= 10)
3619       {
3620         GNUNET_NETWORK_socket_close (plugin->sockv6);
3621         plugin->enable_ipv6 = GNUNET_NO;
3622         plugin->sockv6 = NULL;
3623       }
3624       else
3625       {
3626         plugin->port = ntohs (server_addrv6.sin6_port);
3627       }
3628       if (NULL != plugin->sockv6)
3629       {
3630         LOG (GNUNET_ERROR_TYPE_DEBUG,
3631              "IPv6 UDP socket created listinging at %s\n",
3632              GNUNET_a2s (server_addr,
3633                          addrlen));
3634         addrs[sockets_created] = server_addr;
3635         addrlens[sockets_created] = addrlen;
3636         sockets_created++;
3637       }
3638       else
3639       {
3640         LOG (GNUNET_ERROR_TYPE_WARNING,
3641              _("Failed to bind UDP socket to %s: %s\n"),
3642              GNUNET_a2s (server_addr,
3643                          addrlen),
3644              STRERROR (eno));
3645       }
3646     }
3647   }
3648
3649   /* Create IPv4 socket */
3650   eno = EINVAL;
3651   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3652                                                  SOCK_DGRAM,
3653                                                  0);
3654   if (NULL == plugin->sockv4)
3655   {
3656     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3657                          "socket");
3658     LOG (GNUNET_ERROR_TYPE_INFO,
3659          _("Disabling IPv4 since it is not supported on this system!\n"));
3660     plugin->enable_ipv4 = GNUNET_NO;
3661   }
3662   else
3663   {
3664     memset (&server_addrv4,
3665             0,
3666             sizeof(struct sockaddr_in));
3667 #if HAVE_SOCKADDR_IN_SIN_LEN
3668     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3669 #endif
3670     server_addrv4.sin_family = AF_INET;
3671     if (NULL != bind_v4)
3672       server_addrv4.sin_addr = bind_v4->sin_addr;
3673     else
3674       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3675
3676     if (0 == plugin->port)
3677       /* autodetect */
3678       server_addrv4.sin_port
3679         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3680                                            33537)
3681                  + 32000);
3682     else
3683       server_addrv4.sin_port = htons (plugin->port);
3684
3685     addrlen = sizeof (struct sockaddr_in);
3686     server_addr = (const struct sockaddr *) &server_addrv4;
3687
3688     tries = 0;
3689     while (tries < 10)
3690     {
3691       LOG (GNUNET_ERROR_TYPE_DEBUG,
3692            "Binding to IPv4 `%s'\n",
3693            GNUNET_a2s (server_addr,
3694                        addrlen));
3695
3696       /* binding */
3697       if (GNUNET_OK ==
3698           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3699                                       server_addr,
3700                                       addrlen))
3701         break;
3702       eno = errno;
3703       if (0 != plugin->port)
3704       {
3705         tries = 10; /* fail */
3706         break; /* bind failed on specific port */
3707       }
3708
3709       /* autodetect */
3710       server_addrv4.sin_port
3711         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3712                                            33537)
3713                  + 32000);
3714       tries++;
3715     }
3716     if (tries >= 10)
3717     {
3718       GNUNET_NETWORK_socket_close (plugin->sockv4);
3719       plugin->enable_ipv4 = GNUNET_NO;
3720       plugin->sockv4 = NULL;
3721     }
3722     else
3723     {
3724       plugin->port = ntohs (server_addrv4.sin_port);
3725     }
3726
3727     if (NULL != plugin->sockv4)
3728     {
3729       LOG (GNUNET_ERROR_TYPE_DEBUG,
3730            "IPv4 socket created on port %s\n",
3731            GNUNET_a2s (server_addr,
3732                        addrlen));
3733       addrs[sockets_created] = server_addr;
3734       addrlens[sockets_created] = addrlen;
3735       sockets_created++;
3736     }
3737     else
3738     {
3739       LOG (GNUNET_ERROR_TYPE_ERROR,
3740            _("Failed to bind UDP socket to %s: %s\n"),
3741            GNUNET_a2s (server_addr,
3742                        addrlen),
3743            STRERROR (eno));
3744     }
3745   }
3746
3747   if (0 == sockets_created)
3748   {
3749     LOG (GNUNET_ERROR_TYPE_WARNING,
3750          _("Failed to open UDP sockets\n"));
3751     return 0; /* No sockets created, return */
3752   }
3753   schedule_select_v4 (plugin);
3754   schedule_select_v6 (plugin);
3755   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3756                                      GNUNET_NO,
3757                                      plugin->port,
3758                                      sockets_created,
3759                                      addrs,
3760                                      addrlens,
3761                                      &udp_nat_port_map_callback,
3762                                      NULL,
3763                                      plugin,
3764                                      plugin->sockv4);
3765   return sockets_created;
3766 }
3767
3768
3769 /**
3770  * The exported method. Makes the core api available via a global and
3771  * returns the udp transport API.
3772  *
3773  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3774  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3775  */
3776 void *
3777 libgnunet_plugin_transport_udp_init (void *cls)
3778 {
3779   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3780   struct GNUNET_TRANSPORT_PluginFunctions *api;
3781   struct Plugin *p;
3782   unsigned long long port;
3783   unsigned long long aport;
3784   unsigned long long udp_max_bps;
3785   unsigned long long enable_v6;
3786   unsigned long long enable_broadcasting;
3787   unsigned long long enable_broadcasting_recv;
3788   char *bind4_address;
3789   char *bind6_address;
3790   struct GNUNET_TIME_Relative interval;
3791   struct sockaddr_in server_addrv4;
3792   struct sockaddr_in6 server_addrv6;
3793   int res;
3794   int have_bind4;
3795   int have_bind6;
3796
3797   if (NULL == env->receive)
3798   {
3799     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3800      initialze the plugin or the API */
3801     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3802     api->cls = NULL;
3803     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3804     api->address_to_string = &udp_address_to_string;
3805     api->string_to_address = &udp_string_to_address;
3806     return api;
3807   }
3808
3809   /* Get port number: port == 0 : autodetect a port,
3810    * > 0 : use this port, not given : 2086 default */
3811   if (GNUNET_OK !=
3812       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3813                                              "transport-udp",
3814                                              "PORT",
3815                                              &port))
3816     port = 2086;
3817   if (port > 65535)
3818   {
3819     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3820                                "transport-udp",
3821                                "PORT",
3822                                _("must be in [0,65535]"));
3823     return NULL;
3824   }
3825   if (GNUNET_OK !=
3826       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3827                                              "transport-udp",
3828                                              "ADVERTISED_PORT",
3829                                              &aport))
3830     aport = port;
3831   if (aport > 65535)
3832   {
3833     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3834                                "transport-udp",
3835                                "ADVERTISED_PORT",
3836                                _("must be in [0,65535]"));
3837     return NULL;
3838   }
3839
3840   if (GNUNET_YES ==
3841       GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3842                                             "nat",
3843                                             "DISABLEV6"))
3844     enable_v6 = GNUNET_NO;
3845   else
3846     enable_v6 = GNUNET_YES;
3847
3848   have_bind4 = GNUNET_NO;
3849   memset (&server_addrv4,
3850           0,
3851           sizeof (server_addrv4));
3852   if (GNUNET_YES ==
3853       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3854                                              "transport-udp",
3855                                              "BINDTO",
3856                                              &bind4_address))
3857   {
3858     LOG (GNUNET_ERROR_TYPE_DEBUG,
3859          "Binding UDP plugin to specific address: `%s'\n",
3860          bind4_address);
3861     if (1 != inet_pton (AF_INET,
3862                         bind4_address,
3863                         &server_addrv4.sin_addr))
3864     {
3865       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3866                                  "transport-udp",
3867                                  "BINDTO",
3868                                  _("must be valid IPv4 address"));
3869       GNUNET_free (bind4_address);
3870       return NULL;
3871     }
3872     have_bind4 = GNUNET_YES;
3873   }
3874   GNUNET_free_non_null (bind4_address);
3875   have_bind6 = GNUNET_NO;
3876   memset (&server_addrv6,
3877           0,
3878           sizeof (server_addrv6));
3879   if (GNUNET_YES ==
3880       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3881                                              "transport-udp",
3882                                              "BINDTO6",
3883                                              &bind6_address))
3884   {
3885     LOG (GNUNET_ERROR_TYPE_DEBUG,
3886          "Binding udp plugin to specific address: `%s'\n",
3887          bind6_address);
3888     if (1 != inet_pton (AF_INET6,
3889                         bind6_address,
3890                         &server_addrv6.sin6_addr))
3891     {
3892       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3893                                  "transport-udp",
3894                                  "BINDTO6",
3895                                  _("must be valid IPv6 address"));
3896       GNUNET_free (bind6_address);
3897       return NULL;
3898     }
3899     have_bind6 = GNUNET_YES;
3900   }
3901   GNUNET_free_non_null (bind6_address);
3902
3903   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3904                                                               "transport-udp",
3905                                                               "BROADCAST");
3906   if (enable_broadcasting == GNUNET_SYSERR)
3907     enable_broadcasting = GNUNET_NO;
3908
3909   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3910                                                                    "transport-udp",
3911                                                                    "BROADCAST_RECEIVE");
3912   if (enable_broadcasting_recv == GNUNET_SYSERR)
3913     enable_broadcasting_recv = GNUNET_YES;
3914
3915   if (GNUNET_SYSERR ==
3916       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3917                                            "transport-udp",
3918                                            "BROADCAST_INTERVAL",
3919                                            &interval))
3920   {
3921     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3922                                               10);
3923   }
3924   if (GNUNET_OK !=
3925       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3926                                              "transport-udp",
3927                                              "MAX_BPS",
3928                                              &udp_max_bps))
3929   {
3930     /* 50 MB/s == infinity for practical purposes */
3931     udp_max_bps = 1024 * 1024 * 50;
3932   }
3933
3934   p = GNUNET_new (struct Plugin);
3935   p->port = port;
3936   p->aport = aport;
3937   p->broadcast_interval = interval;
3938   p->enable_ipv6 = enable_v6;
3939   p->enable_ipv4 = GNUNET_YES; /* default */
3940   p->enable_broadcasting = enable_broadcasting;
3941   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3942   p->env = env;
3943   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
3944                                                       GNUNET_NO);
3945   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3946   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3947                                      p);
3948   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3949                                  NULL,
3950                                  NULL,
3951                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3952                                  30);
3953   res = setup_sockets (p,
3954                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3955                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3956   if ( (0 == res) ||
3957        ( (NULL == p->sockv4) &&
3958          (NULL == p->sockv6) ) )
3959   {
3960     LOG (GNUNET_ERROR_TYPE_ERROR,
3961         _("Failed to create UDP network sockets\n"));
3962     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3963     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3964     GNUNET_SERVER_mst_destroy (p->mst);
3965     GNUNET_free (p);
3966     return NULL;
3967   }
3968
3969   /* Setup broadcasting and receiving beacons */
3970   setup_broadcast (p,
3971                    &server_addrv6,
3972                    &server_addrv4);
3973
3974   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3975   api->cls = p;
3976   api->disconnect_session = &udp_disconnect_session;
3977   api->query_keepalive_factor = &udp_query_keepalive_factor;
3978   api->disconnect_peer = &udp_disconnect;
3979   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3980   api->address_to_string = &udp_address_to_string;
3981   api->string_to_address = &udp_string_to_address;
3982   api->check_address = &udp_plugin_check_address;
3983   api->get_session = &udp_plugin_get_session;
3984   api->send = &udp_plugin_send;
3985   api->get_network = &udp_plugin_get_network;
3986   api->get_network_for_address = &udp_plugin_get_network_for_address;
3987   api->update_session_timeout = &udp_plugin_update_session_timeout;
3988   api->setup_monitor = &udp_plugin_setup_monitor;
3989   return api;
3990 }
3991
3992
3993 /**
3994  * Function called on each entry in the defragmentation heap to
3995  * clean it up.
3996  *
3997  * @param cls NULL
3998  * @param node node in the heap (to be removed)
3999  * @param element a `struct DefragContext` to be cleaned up
4000  * @param cost unused
4001  * @return #GNUNET_YES
4002  */
4003 static int
4004 heap_cleanup_iterator (void *cls,
4005                        struct GNUNET_CONTAINER_HeapNode *node,
4006                        void *element,
4007                        GNUNET_CONTAINER_HeapCostType cost)
4008 {
4009   struct DefragContext *d_ctx = element;
4010
4011   GNUNET_CONTAINER_heap_remove_node (node);
4012   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
4013   GNUNET_free (d_ctx);
4014   return GNUNET_YES;
4015 }
4016
4017
4018 /**
4019  * The exported method. Makes the core api available via a global and
4020  * returns the udp transport API.
4021  *
4022  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
4023  * @return NULL
4024  */
4025 void *
4026 libgnunet_plugin_transport_udp_done (void *cls)
4027 {
4028   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
4029   struct Plugin *plugin = api->cls;
4030   struct PrettyPrinterContext *cur;
4031   struct UDP_MessageWrapper *udpw;
4032
4033   if (NULL == plugin)
4034   {
4035     GNUNET_free (api);
4036     return NULL;
4037   }
4038   stop_broadcast (plugin);
4039   if (NULL != plugin->select_task_v4)
4040   {
4041     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
4042     plugin->select_task_v4 = NULL;
4043   }
4044   if (NULL != plugin->select_task_v6)
4045   {
4046     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
4047     plugin->select_task_v6 = NULL;
4048   }
4049   if (NULL != plugin->sockv4)
4050   {
4051     GNUNET_break (GNUNET_OK ==
4052                   GNUNET_NETWORK_socket_close (plugin->sockv4));
4053     plugin->sockv4 = NULL;
4054   }
4055   if (NULL != plugin->sockv6)
4056   {
4057     GNUNET_break (GNUNET_OK ==
4058                   GNUNET_NETWORK_socket_close (plugin->sockv6));
4059     plugin->sockv6 = NULL;
4060   }
4061   if (NULL != plugin->nat)
4062   {
4063     GNUNET_NAT_unregister (plugin->nat);
4064     plugin->nat = NULL;
4065   }
4066   if (NULL != plugin->defrag_ctxs)
4067   {
4068     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
4069                                    &heap_cleanup_iterator,
4070                                    NULL);
4071     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
4072     plugin->defrag_ctxs = NULL;
4073   }
4074   if (NULL != plugin->mst)
4075   {
4076     GNUNET_SERVER_mst_destroy (plugin->mst);
4077     plugin->mst = NULL;
4078   }
4079   while (NULL != (udpw = plugin->ipv4_queue_head))
4080   {
4081     dequeue (plugin,
4082              udpw);
4083     udpw->qc (udpw->qc_cls,
4084               udpw,
4085               GNUNET_SYSERR);
4086     GNUNET_free (udpw);
4087   }
4088   while (NULL != (udpw = plugin->ipv6_queue_head))
4089   {
4090     dequeue (plugin,
4091              udpw);
4092     udpw->qc (udpw->qc_cls,
4093               udpw,
4094               GNUNET_SYSERR);
4095     GNUNET_free (udpw);
4096   }
4097   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
4098                                          &disconnect_and_free_it,
4099                                          plugin);
4100   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
4101
4102   while (NULL != (cur = plugin->ppc_dll_head))
4103   {
4104     GNUNET_break (0);
4105     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
4106                                  plugin->ppc_dll_tail,
4107                                  cur);
4108     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
4109     if (NULL != cur->timeout_task)
4110     {
4111       GNUNET_SCHEDULER_cancel (cur->timeout_task);
4112       cur->timeout_task = NULL;
4113     }
4114     GNUNET_free (cur);
4115   }
4116   GNUNET_free (plugin);
4117   GNUNET_free (api);
4118   return NULL;
4119 }
4120
4121 /* end of plugin_transport_udp.c */