also config files
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2    This file is part of GNUnet
3    Copyright (C) 2010-2017 GNUnet e.V.
4
5    GNUnet is free software: you can redistribute it and/or modify it
6    under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License,
8    or (at your option) any later version.
9
10    GNUnet is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_service.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind, ...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT \
49   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
50
51 /**
52  * Number of messages we can defragment in parallel.  We only really
53  * defragment 1 message at a time, but if messages get re-ordered, we
54  * may want to keep knowledge about the previous message to avoid
55  * discarding the current message in favor of a single fragment of a
56  * previous message.  3 should be good since we don't expect massive
57  * message reorderings with UDP.
58  */
59 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
60
61 /**
62  * We keep a defragmentation queue per sender address.  How many
63  * sender addresses do we support at the same time? Memory consumption
64  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
65  * value. (So 128 corresponds to 12 MB and should suffice for
66  * connecting to roughly 128 peers via UDP).
67  */
68 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
69
70
71 /**
72  * UDP Message-Packet header (after defragmentation).
73  */
74 struct UDPMessage
75 {
76   /**
77    * Message header.
78    */
79   struct GNUNET_MessageHeader header;
80
81   /**
82    * Always zero for now.
83    */
84   uint32_t reserved;
85
86   /**
87    * What is the identity of the sender
88    */
89   struct GNUNET_PeerIdentity sender;
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147 };
148
149
150 /**
151  * Session with another peer.
152  */
153 struct GNUNET_ATS_Session
154 {
155   /**
156    * Which peer is this session for?
157    */
158   struct GNUNET_PeerIdentity target;
159
160   /**
161    * Tokenizer for inbound messages.
162    */
163   struct GNUNET_MessageStreamTokenizer *mst;
164
165   /**
166    * Plugin this session belongs to.
167    */
168   struct Plugin *plugin;
169
170   /**
171    * Context for dealing with fragments.
172    */
173   struct UDP_FragmentationContext *frag_ctx;
174
175   /**
176    * Desired delay for next sending we send to other peer
177    */
178   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
179
180   /**
181    * Desired delay for transmissions we received from other peer.
182    * This is for full messages, the value needs to be adjusted for
183    * fragmented messages.
184    */
185   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
186
187   /**
188    * Session timeout task
189    */
190   struct GNUNET_SCHEDULER_Task *timeout_task;
191
192   /**
193    * When does this session time out?
194    */
195   struct GNUNET_TIME_Absolute timeout;
196
197   /**
198    * What time did we last transmit?
199    */
200   struct GNUNET_TIME_Absolute last_transmit_time;
201
202   /**
203    * expected delay for ACKs
204    */
205   struct GNUNET_TIME_Relative last_expected_ack_delay;
206
207   /**
208    * desired delay between UDP messages
209    */
210   struct GNUNET_TIME_Relative last_expected_msg_delay;
211
212   /**
213    * Our own address.
214    */
215   struct GNUNET_HELLO_Address *address;
216
217   /**
218    * Number of bytes waiting for transmission to this peer.
219    */
220   unsigned long long bytes_in_queue;
221
222   /**
223    * Number of messages waiting for transmission to this peer.
224    */
225   unsigned int msgs_in_queue;
226
227   /**
228    * Reference counter to indicate that this session is
229    * currently being used and must not be destroyed;
230    * setting @e in_destroy will destroy it as soon as
231    * possible.
232    */
233   unsigned int rc;
234
235   /**
236    * Network type of the address.
237    */
238   enum GNUNET_NetworkType scope;
239
240   /**
241    * Is this session about to be destroyed (sometimes we cannot
242    * destroy a session immediately as below us on the stack
243    * there might be code that still uses it; in this case,
244    * @e rc is non-zero).
245    */
246   int in_destroy;
247 };
248
249
250 /**
251  * Data structure to track defragmentation contexts based
252  * on the source of the UDP traffic.
253  */
254 struct DefragContext
255 {
256   /**
257    * Defragmentation context.
258    */
259   struct GNUNET_DEFRAGMENT_Context *defrag;
260
261   /**
262    * Reference to master plugin struct.
263    */
264   struct Plugin *plugin;
265
266   /**
267    * Node in the defrag heap.
268    */
269   struct GNUNET_CONTAINER_HeapNode *hnode;
270
271   /**
272    * Source address this receive context is for (allocated at the
273    * end of the struct).
274    */
275   const union UdpAddress *udp_addr;
276
277   /**
278    * Who's message(s) are we defragmenting here?
279    * Only initialized once we succeeded and
280    * @e have_sender is set.
281    */
282   struct GNUNET_PeerIdentity sender;
283
284   /**
285    * Length of @e udp_addr.
286    */
287   size_t udp_addr_len;
288
289   /**
290    * Network type the address belongs to.
291    */
292   enum GNUNET_NetworkType network_type;
293
294   /**
295    * Has the @e sender field been initialized yet?
296    */
297   int have_sender;
298 };
299
300
301 /**
302  * Context to send fragmented messages
303  */
304 struct UDP_FragmentationContext
305 {
306   /**
307    * Next in linked list
308    */
309   struct UDP_FragmentationContext *next;
310
311   /**
312    * Previous in linked list
313    */
314   struct UDP_FragmentationContext *prev;
315
316   /**
317    * The plugin
318    */
319   struct Plugin *plugin;
320
321   /**
322    * Handle for fragmentation.
323    */
324   struct GNUNET_FRAGMENT_Context *frag;
325
326   /**
327    * The session this fragmentation context belongs to
328    */
329   struct GNUNET_ATS_Session *session;
330
331   /**
332    * Function to call upon completion of the transmission.
333    */
334   GNUNET_TRANSPORT_TransmitContinuation cont;
335
336   /**
337    * Closure for @e cont.
338    */
339   void *cont_cls;
340
341   /**
342    * Start time.
343    */
344   struct GNUNET_TIME_Absolute start_time;
345
346   /**
347    * Transmission time for the next fragment.  Incremented by
348    * the @e flow_delay_from_other_peer for each fragment when
349    * we setup the fragments.
350    */
351   struct GNUNET_TIME_Absolute next_frag_time;
352
353   /**
354    * Desired delay for transmissions we received from other peer.
355    * Adjusted to be per fragment (UDP_MTU), even though on the
356    * wire it was for "full messages".
357    */
358   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
359
360   /**
361    * Message timeout
362    */
363   struct GNUNET_TIME_Absolute timeout;
364
365   /**
366    * Payload size of original unfragmented message
367    */
368   size_t payload_size;
369
370   /**
371    * Bytes used to send all fragments on wire including UDP overhead
372    */
373   size_t on_wire_size;
374 };
375
376
377 /**
378  * Function called when a message is removed from the
379  * transmission queue.
380  *
381  * @param cls closure
382  * @param udpw message wrapper finished
383  * @param result #GNUNET_OK on success (message was sent)
384  *               #GNUNET_SYSERR if the target disconnected
385  *               or we had a timeout or other trouble sending
386  */
387 typedef void (*QueueContinuation) (void *cls,
388                                    struct UDP_MessageWrapper *udpw,
389                                    int result);
390
391
392 /**
393  * Information we track for each message in the queue.
394  */
395 struct UDP_MessageWrapper
396 {
397   /**
398    * Session this message belongs to
399    */
400   struct GNUNET_ATS_Session *session;
401
402   /**
403    * DLL of messages, previous element
404    */
405   struct UDP_MessageWrapper *prev;
406
407   /**
408    * DLL of messages, next element
409    */
410   struct UDP_MessageWrapper *next;
411
412   /**
413    * Message with @e msg_size bytes including UDP-specific overhead.
414    */
415   char *msg_buf;
416
417   /**
418    * Function to call once the message wrapper is being removed
419    * from the queue (with success or failure).
420    */
421   QueueContinuation qc;
422
423   /**
424    * Closure for @e qc.
425    */
426   void *qc_cls;
427
428   /**
429    * External continuation to call upon completion of the
430    * transmission, NULL if this queue entry is not for a
431    * message from the application.
432    */
433   GNUNET_TRANSPORT_TransmitContinuation cont;
434
435   /**
436    * Closure for @e cont.
437    */
438   void *cont_cls;
439
440   /**
441    * Fragmentation context.
442    * frag_ctx == NULL if transport <= MTU
443    * frag_ctx != NULL if transport > MTU
444    */
445   struct UDP_FragmentationContext *frag_ctx;
446
447   /**
448    * Message enqueue time.
449    */
450   struct GNUNET_TIME_Absolute start_time;
451
452   /**
453    * Desired transmission time for this message, based on the
454    * flow limiting information we got from the other peer.
455    */
456   struct GNUNET_TIME_Absolute transmission_time;
457
458   /**
459    * Message timeout.
460    */
461   struct GNUNET_TIME_Absolute timeout;
462
463   /**
464    * Size of UDP message to send, including UDP-specific overhead.
465    */
466   size_t msg_size;
467
468   /**
469    * Payload size of original message.
470    */
471   size_t payload_size;
472 };
473
474
475 GNUNET_NETWORK_STRUCT_BEGIN
476
477 /**
478  * UDP ACK Message-Packet header.
479  */
480 struct UDP_ACK_Message
481 {
482   /**
483    * Message header.
484    */
485   struct GNUNET_MessageHeader header;
486
487   /**
488    * Desired delay for flow control, in us (in NBO).
489    * A value of UINT32_MAX indicates that the other
490    * peer wants us to disconnect.
491    */
492   uint32_t delay GNUNET_PACKED;
493
494   /**
495    * What is the identity of the sender
496    */
497   struct GNUNET_PeerIdentity sender;
498 };
499
500 GNUNET_NETWORK_STRUCT_END
501
502
503 /* ************************* Monitoring *********** */
504
505
506 /**
507  * If a session monitor is attached, notify it about the new
508  * session state.
509  *
510  * @param plugin our plugin
511  * @param session session that changed state
512  * @param state new state of the session
513  */
514 static void
515 notify_session_monitor (struct Plugin *plugin,
516                         struct GNUNET_ATS_Session *session,
517                         enum GNUNET_TRANSPORT_SessionState state)
518 {
519   struct GNUNET_TRANSPORT_SessionInfo info;
520
521   if (NULL == plugin->sic)
522     return;
523   if (GNUNET_YES == session->in_destroy)
524     return; /* already destroyed, just RC>0 left-over actions */
525   memset (&info, 0, sizeof(info));
526   info.state = state;
527   info.is_inbound = GNUNET_SYSERR; /* hard to say */
528   info.num_msg_pending = session->msgs_in_queue;
529   info.num_bytes_pending = session->bytes_in_queue;
530   /* info.receive_delay remains zero as this is not supported by UDP
531      (cannot selectively not receive from 'some' peer while continuing
532      to receive from others) */
533   info.session_timeout = session->timeout;
534   info.address = session->address;
535   plugin->sic (plugin->sic_cls, session, &info);
536 }
537
538
539 /**
540  * Return information about the given session to the monitor callback.
541  *
542  * @param cls the `struct Plugin` with the monitor callback (`sic`)
543  * @param peer peer we send information about
544  * @param value our `struct GNUNET_ATS_Session` to send information about
545  * @return #GNUNET_OK (continue to iterate)
546  */
547 static int
548 send_session_info_iter (void *cls,
549                         const struct GNUNET_PeerIdentity *peer,
550                         void *value)
551 {
552   struct Plugin *plugin = cls;
553   struct GNUNET_ATS_Session *session = value;
554
555   notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_INIT);
556   notify_session_monitor (plugin, session, GNUNET_TRANSPORT_SS_UP);
557   return GNUNET_OK;
558 }
559
560
561 /**
562  * Begin monitoring sessions of a plugin.  There can only
563  * be one active monitor per plugin (i.e. if there are
564  * multiple monitors, the transport service needs to
565  * multiplex the generated events over all of them).
566  *
567  * @param cls closure of the plugin
568  * @param sic callback to invoke, NULL to disable monitor;
569  *            plugin will being by iterating over all active
570  *            sessions immediately and then enter monitor mode
571  * @param sic_cls closure for @a sic
572  */
573 static void
574 udp_plugin_setup_monitor (void *cls,
575                           GNUNET_TRANSPORT_SessionInfoCallback sic,
576                           void *sic_cls)
577 {
578   struct Plugin *plugin = cls;
579
580   plugin->sic = sic;
581   plugin->sic_cls = sic_cls;
582   if (NULL != sic)
583   {
584     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
585                                            &send_session_info_iter,
586                                            plugin);
587     /* signal end of first iteration */
588     sic (sic_cls, NULL, NULL);
589   }
590 }
591
592
593 /* ****************** Little Helpers ****************** */
594
595
596 /**
597  * Function to free last resources associated with a session.
598  *
599  * @param s session to free
600  */
601 static void
602 free_session (struct GNUNET_ATS_Session *s)
603 {
604   if (NULL != s->address)
605   {
606     GNUNET_HELLO_address_free (s->address);
607     s->address = NULL;
608   }
609   if (NULL != s->frag_ctx)
610   {
611     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL);
612     GNUNET_free (s->frag_ctx);
613     s->frag_ctx = NULL;
614   }
615   if (NULL != s->mst)
616   {
617     GNUNET_MST_destroy (s->mst);
618     s->mst = NULL;
619   }
620   GNUNET_free (s);
621 }
622
623
624 /**
625  * Function that is called to get the keepalive factor.
626  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
627  * calculate the interval between keepalive packets.
628  *
629  * @param cls closure with the `struct Plugin`
630  * @return keepalive factor
631  */
632 static unsigned int
633 udp_query_keepalive_factor (void *cls)
634 {
635   return 15;
636 }
637
638
639 /**
640  * Function obtain the network type for a session
641  *
642  * @param cls closure (`struct Plugin *`)
643  * @param session the session
644  * @return the network type
645  */
646 static enum GNUNET_NetworkType
647 udp_plugin_get_network (void *cls, struct GNUNET_ATS_Session *session)
648 {
649   return session->scope;
650 }
651
652
653 /**
654  * Function obtain the network type for an address.
655  *
656  * @param cls closure (`struct Plugin *`)
657  * @param address the address
658  * @return the network type
659  */
660 static enum GNUNET_NetworkType
661 udp_plugin_get_network_for_address (void *cls,
662                                     const struct GNUNET_HELLO_Address *address)
663 {
664   struct Plugin *plugin = cls;
665   size_t addrlen;
666   struct sockaddr_in a4;
667   struct sockaddr_in6 a6;
668   const struct IPv4UdpAddress *u4;
669   const struct IPv6UdpAddress *u6;
670   const void *sb;
671   size_t sbs;
672
673   addrlen = address->address_length;
674   if (addrlen == sizeof(struct IPv6UdpAddress))
675   {
676     GNUNET_assert (NULL != address->address);  /* make static analysis happy */
677     u6 = address->address;
678     memset (&a6, 0, sizeof(a6));
679 #if HAVE_SOCKADDR_IN_SIN_LEN
680     a6.sin6_len = sizeof(a6);
681 #endif
682     a6.sin6_family = AF_INET6;
683     a6.sin6_port = u6->u6_port;
684     GNUNET_memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
685     sb = &a6;
686     sbs = sizeof(a6);
687   }
688   else if (addrlen == sizeof(struct IPv4UdpAddress))
689   {
690     GNUNET_assert (NULL != address->address);  /* make static analysis happy */
691     u4 = address->address;
692     memset (&a4, 0, sizeof(a4));
693 #if HAVE_SOCKADDR_IN_SIN_LEN
694     a4.sin_len = sizeof(a4);
695 #endif
696     a4.sin_family = AF_INET;
697     a4.sin_port = u4->u4_port;
698     a4.sin_addr.s_addr = u4->ipv4_addr;
699     sb = &a4;
700     sbs = sizeof(a4);
701   }
702   else
703   {
704     GNUNET_break (0);
705     return GNUNET_NT_UNSPECIFIED;
706   }
707   return plugin->env->get_address_type (plugin->env->cls, sb, sbs);
708 }
709
710
711 /* ******************* Event loop ******************** */
712
713 /**
714  * We have been notified that our readset has something to read.  We don't
715  * know which socket needs to be read, so we have to check each one
716  * Then reschedule this function to be called again once more is available.
717  *
718  * @param cls the plugin handle
719  */
720 static void
721 udp_plugin_select_v4 (void *cls);
722
723
724 /**
725  * We have been notified that our readset has something to read.  We don't
726  * know which socket needs to be read, so we have to check each one
727  * Then reschedule this function to be called again once more is available.
728  *
729  * @param cls the plugin handle
730  */
731 static void
732 udp_plugin_select_v6 (void *cls);
733
734
735 /**
736  * (re)schedule IPv4-select tasks for this plugin.
737  *
738  * @param plugin plugin to reschedule
739  */
740 static void
741 schedule_select_v4 (struct Plugin *plugin)
742 {
743   struct GNUNET_TIME_Relative min_delay;
744   struct GNUNET_TIME_Relative delay;
745   struct UDP_MessageWrapper *udpw;
746   struct UDP_MessageWrapper *min_udpw;
747
748   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
749   {
750     /* Find a message ready to send:
751      * Flow delay from other peer is expired or not set (0) */
752     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
753     min_udpw = NULL;
754     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
755     {
756       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
757       if (delay.rel_value_us < min_delay.rel_value_us)
758       {
759         min_delay = delay;
760         min_udpw = udpw;
761       }
762     }
763     if (NULL != plugin->select_task_v4)
764       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
765     if (NULL != min_udpw)
766     {
767       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
768       {
769         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
770                     "Calculated flow delay for UDPv4 at %s for %s\n",
771                     GNUNET_STRINGS_relative_time_to_string (min_delay,
772                                                             GNUNET_YES),
773                     GNUNET_i2s (&min_udpw->session->target));
774       }
775       else
776       {
777         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778                     "Calculated flow delay for UDPv4 at %s for %s\n",
779                     GNUNET_STRINGS_relative_time_to_string (min_delay,
780                                                             GNUNET_YES),
781                     GNUNET_i2s (&min_udpw->session->target));
782       }
783     }
784     plugin->select_task_v4 =
785       GNUNET_SCHEDULER_add_read_net (min_delay,
786                                      plugin->sockv4,
787                                      &udp_plugin_select_v4,
788                                      plugin);
789   }
790 }
791
792
793 /**
794  * (re)schedule IPv6-select tasks for this plugin.
795  *
796  * @param plugin plugin to reschedule
797  */
798 static void
799 schedule_select_v6 (struct Plugin *plugin)
800 {
801   struct GNUNET_TIME_Relative min_delay;
802   struct GNUNET_TIME_Relative delay;
803   struct UDP_MessageWrapper *udpw;
804   struct UDP_MessageWrapper *min_udpw;
805
806   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
807   {
808     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
809     min_udpw = NULL;
810     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
811     {
812       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
813       if (delay.rel_value_us < min_delay.rel_value_us)
814       {
815         min_delay = delay;
816         min_udpw = udpw;
817       }
818     }
819     if (NULL != plugin->select_task_v6)
820       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
821     if (NULL != min_udpw)
822     {
823       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
824       {
825         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
826                     "Calculated flow delay for UDPv6 at %s for %s\n",
827                     GNUNET_STRINGS_relative_time_to_string (min_delay,
828                                                             GNUNET_YES),
829                     GNUNET_i2s (&min_udpw->session->target));
830       }
831       else
832       {
833         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834                     "Calculated flow delay for UDPv6 at %s for %s\n",
835                     GNUNET_STRINGS_relative_time_to_string (min_delay,
836                                                             GNUNET_YES),
837                     GNUNET_i2s (&min_udpw->session->target));
838       }
839     }
840     plugin->select_task_v6 =
841       GNUNET_SCHEDULER_add_read_net (min_delay,
842                                      plugin->sockv6,
843                                      &udp_plugin_select_v6,
844                                      plugin);
845   }
846 }
847
848
849 /* ******************* Address to string and back ***************** */
850
851
852 /**
853  * Function called for a quick conversion of the binary address to
854  * a numeric address.  Note that the caller must not free the
855  * address and that the next call to this function is allowed
856  * to override the address again.
857  *
858  * @param cls closure
859  * @param addr binary address (a `union UdpAddress`)
860  * @param addrlen length of the @a addr
861  * @return string representing the same address
862  */
863 const char *
864 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
865 {
866   static char rbuf[INET6_ADDRSTRLEN + 10];
867   char buf[INET6_ADDRSTRLEN];
868   const void *sb;
869   struct in_addr a4;
870   struct in6_addr a6;
871   const struct IPv4UdpAddress *t4;
872   const struct IPv6UdpAddress *t6;
873   int af;
874   uint16_t port;
875   uint32_t options;
876
877   if (NULL == addr)
878   {
879     GNUNET_break_op (0);
880     return NULL;
881   }
882
883   if (addrlen == sizeof(struct IPv6UdpAddress))
884   {
885     t6 = addr;
886     af = AF_INET6;
887     options = ntohl (t6->options);
888     port = ntohs (t6->u6_port);
889     a6 = t6->ipv6_addr;
890     sb = &a6;
891   }
892   else if (addrlen == sizeof(struct IPv4UdpAddress))
893   {
894     t4 = addr;
895     af = AF_INET;
896     options = ntohl (t4->options);
897     port = ntohs (t4->u4_port);
898     a4.s_addr = t4->ipv4_addr;
899     sb = &a4;
900   }
901   else
902   {
903     GNUNET_break_op (0);
904     return NULL;
905   }
906   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
907   GNUNET_snprintf (rbuf,
908                    sizeof(rbuf),
909                    (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u",
910                    PLUGIN_NAME,
911                    options,
912                    buf,
913                    port);
914   return rbuf;
915 }
916
917
918 /**
919  * Function called to convert a string address to a binary address.
920  *
921  * @param cls closure (`struct Plugin *`)
922  * @param addr string address
923  * @param addrlen length of the address
924  * @param buf location to store the buffer
925  * @param added location to store the number of bytes in the buffer.
926  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
927  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
928  */
929 static int
930 udp_string_to_address (void *cls,
931                        const char *addr,
932                        uint16_t addrlen,
933                        void **buf,
934                        size_t *added)
935 {
936   struct sockaddr_storage socket_address;
937   char *address;
938   char *plugin;
939   char *optionstr;
940   uint32_t options;
941
942   /* Format tcp.options.address:port */
943   address = NULL;
944   plugin = NULL;
945   optionstr = NULL;
946
947   if ((NULL == addr) || (0 == addrlen))
948   {
949     GNUNET_break (0);
950     return GNUNET_SYSERR;
951   }
952   if ('\0' != addr[addrlen - 1])
953   {
954     GNUNET_break (0);
955     return GNUNET_SYSERR;
956   }
957   if (strlen (addr) != addrlen - 1)
958   {
959     GNUNET_break (0);
960     return GNUNET_SYSERR;
961   }
962   plugin = GNUNET_strdup (addr);
963   optionstr = strchr (plugin, '.');
964   if (NULL == optionstr)
965   {
966     GNUNET_break (0);
967     GNUNET_free (plugin);
968     return GNUNET_SYSERR;
969   }
970   optionstr[0] = '\0';
971   optionstr++;
972   options = atol (optionstr);
973   address = strchr (optionstr, '.');
974   if (NULL == address)
975   {
976     GNUNET_break (0);
977     GNUNET_free (plugin);
978     return GNUNET_SYSERR;
979   }
980   address[0] = '\0';
981   address++;
982
983   if (GNUNET_OK !=
984       GNUNET_STRINGS_to_address_ip (address, strlen (address), &socket_address))
985   {
986     GNUNET_break (0);
987     GNUNET_free (plugin);
988     return GNUNET_SYSERR;
989   }
990   GNUNET_free (plugin);
991
992   switch (socket_address.ss_family)
993   {
994   case AF_INET: {
995       struct IPv4UdpAddress *u4;
996       const struct sockaddr_in *in4 =
997         (const struct sockaddr_in *) &socket_address;
998
999       u4 = GNUNET_new (struct IPv4UdpAddress);
1000       u4->options = htonl (options);
1001       u4->ipv4_addr = in4->sin_addr.s_addr;
1002       u4->u4_port = in4->sin_port;
1003       *buf = u4;
1004       *added = sizeof(struct IPv4UdpAddress);
1005       return GNUNET_OK;
1006     }
1007
1008   case AF_INET6: {
1009       struct IPv6UdpAddress *u6;
1010       const struct sockaddr_in6 *in6 =
1011         (const struct sockaddr_in6 *) &socket_address;
1012
1013       u6 = GNUNET_new (struct IPv6UdpAddress);
1014       u6->options = htonl (options);
1015       u6->ipv6_addr = in6->sin6_addr;
1016       u6->u6_port = in6->sin6_port;
1017       *buf = u6;
1018       *added = sizeof(struct IPv6UdpAddress);
1019       return GNUNET_OK;
1020     }
1021
1022   default:
1023     GNUNET_break (0);
1024     return GNUNET_SYSERR;
1025   }
1026 }
1027
1028
1029 /**
1030  * Append our port and forward the result.
1031  *
1032  * @param cls a `struct PrettyPrinterContext *`
1033  * @param hostname result from DNS resolver
1034  */
1035 static void
1036 append_port (void *cls, const char *hostname)
1037 {
1038   struct PrettyPrinterContext *ppc = cls;
1039   struct Plugin *plugin = ppc->plugin;
1040   char *ret;
1041
1042   if (NULL == hostname)
1043   {
1044     /* Final call, done */
1045     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
1046                                  plugin->ppc_dll_tail,
1047                                  ppc);
1048     ppc->resolver_handle = NULL;
1049     ppc->asc (ppc->asc_cls, NULL, GNUNET_OK);
1050     GNUNET_free (ppc);
1051     return;
1052   }
1053   if (GNUNET_YES == ppc->ipv6)
1054     GNUNET_asprintf (&ret,
1055                      "%s.%u.[%s]:%d",
1056                      PLUGIN_NAME,
1057                      ppc->options,
1058                      hostname,
1059                      ppc->port);
1060   else
1061     GNUNET_asprintf (&ret,
1062                      "%s.%u.%s:%d",
1063                      PLUGIN_NAME,
1064                      ppc->options,
1065                      hostname,
1066                      ppc->port);
1067   ppc->asc (ppc->asc_cls, ret, GNUNET_OK);
1068   GNUNET_free (ret);
1069 }
1070
1071
1072 /**
1073  * Convert the transports address to a nice, human-readable format.
1074  *
1075  * @param cls closure with the `struct Plugin *`
1076  * @param type name of the transport that generated the address
1077  * @param addr one of the addresses of the host, NULL for the last address
1078  *        the specific address format depends on the transport;
1079  *        a `union UdpAddress`
1080  * @param addrlen length of the address
1081  * @param numeric should (IP) addresses be displayed in numeric form?
1082  * @param timeout after how long should we give up?
1083  * @param asc function to call on each string
1084  * @param asc_cls closure for @a asc
1085  */
1086 static void
1087 udp_plugin_address_pretty_printer (void *cls,
1088                                    const char *type,
1089                                    const void *addr,
1090                                    size_t addrlen,
1091                                    int numeric,
1092                                    struct GNUNET_TIME_Relative timeout,
1093                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1094                                    void *asc_cls)
1095 {
1096   struct Plugin *plugin = cls;
1097   struct PrettyPrinterContext *ppc;
1098   const struct sockaddr *sb;
1099   size_t sbs;
1100   struct sockaddr_in a4;
1101   struct sockaddr_in6 a6;
1102   const struct IPv4UdpAddress *u4;
1103   const struct IPv6UdpAddress *u6;
1104   uint16_t port;
1105   uint32_t options;
1106
1107   if (addrlen == sizeof(struct IPv6UdpAddress))
1108   {
1109     u6 = addr;
1110     memset (&a6, 0, sizeof(a6));
1111     a6.sin6_family = AF_INET6;
1112 #if HAVE_SOCKADDR_IN_SIN_LEN
1113     a6.sin6_len = sizeof(a6);
1114 #endif
1115     a6.sin6_port = u6->u6_port;
1116     a6.sin6_addr = u6->ipv6_addr;
1117     port = ntohs (u6->u6_port);
1118     options = ntohl (u6->options);
1119     sb = (const struct sockaddr *) &a6;
1120     sbs = sizeof(a6);
1121   }
1122   else if (addrlen == sizeof(struct IPv4UdpAddress))
1123   {
1124     u4 = addr;
1125     memset (&a4, 0, sizeof(a4));
1126     a4.sin_family = AF_INET;
1127 #if HAVE_SOCKADDR_IN_SIN_LEN
1128     a4.sin_len = sizeof(a4);
1129 #endif
1130     a4.sin_port = u4->u4_port;
1131     a4.sin_addr.s_addr = u4->ipv4_addr;
1132     port = ntohs (u4->u4_port);
1133     options = ntohl (u4->options);
1134     sb = (const struct sockaddr *) &a4;
1135     sbs = sizeof(a4);
1136   }
1137   else
1138   {
1139     /* invalid address */
1140     GNUNET_break_op (0);
1141     asc (asc_cls, NULL, GNUNET_SYSERR);
1142     asc (asc_cls, NULL, GNUNET_OK);
1143     return;
1144   }
1145   ppc = GNUNET_new (struct PrettyPrinterContext);
1146   ppc->plugin = plugin;
1147   ppc->asc = asc;
1148   ppc->asc_cls = asc_cls;
1149   ppc->port = port;
1150   ppc->options = options;
1151   if (addrlen == sizeof(struct IPv6UdpAddress))
1152     ppc->ipv6 = GNUNET_YES;
1153   else
1154     ppc->ipv6 = GNUNET_NO;
1155   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head, plugin->ppc_dll_tail, ppc);
1156   ppc->resolver_handle = GNUNET_RESOLVER_hostname_get (sb,
1157                                                        sbs,
1158                                                        ! numeric,
1159                                                        timeout,
1160                                                        &append_port,
1161                                                        ppc);
1162 }
1163
1164
1165 /**
1166  * Check if the given port is plausible (must be either our listen
1167  * port or our advertised port).  If it is neither, we return
1168  * #GNUNET_SYSERR.
1169  *
1170  * @param plugin global variables
1171  * @param in_port port number to check
1172  * @return #GNUNET_OK if port is either our open or advertised port
1173  */
1174 static int
1175 check_port (const struct Plugin *plugin, uint16_t in_port)
1176 {
1177   if ((plugin->port == in_port) || (plugin->aport == in_port))
1178     return GNUNET_OK;
1179   return GNUNET_SYSERR;
1180 }
1181
1182
1183 /**
1184  * Function that will be called to check if a binary address for this
1185  * plugin is well-formed and corresponds to an address for THIS peer
1186  * (as per our configuration).  Naturally, if absolutely necessary,
1187  * plugins can be a bit conservative in their answer, but in general
1188  * plugins should make sure that the address does not redirect
1189  * traffic to a 3rd party that might try to man-in-the-middle our
1190  * traffic.
1191  *
1192  * @param cls closure, should be our handle to the Plugin
1193  * @param addr pointer to a `union UdpAddress`
1194  * @param addrlen length of @a addr
1195  * @return #GNUNET_OK if this is a plausible address for this peer
1196  *         and transport, #GNUNET_SYSERR if not
1197  */
1198 static int
1199 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
1200 {
1201   struct Plugin *plugin = cls;
1202   const struct IPv4UdpAddress *v4;
1203   const struct IPv6UdpAddress *v6;
1204
1205   if (sizeof(struct IPv4UdpAddress) == addrlen)
1206   {
1207     struct sockaddr_in s4;
1208
1209     v4 = (const struct IPv4UdpAddress *) addr;
1210     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1211       return GNUNET_SYSERR;
1212     memset (&s4, 0, sizeof(s4));
1213     s4.sin_family = AF_INET;
1214 #if HAVE_SOCKADDR_IN_SIN_LEN
1215     s4.sin_len = sizeof(s4);
1216 #endif
1217     s4.sin_port = v4->u4_port;
1218     s4.sin_addr.s_addr = v4->ipv4_addr;
1219
1220     if (GNUNET_OK !=
1221         GNUNET_NAT_test_address (plugin->nat, &s4, sizeof(struct sockaddr_in)))
1222       return GNUNET_SYSERR;
1223   }
1224   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1225   {
1226     struct sockaddr_in6 s6;
1227
1228     v6 = (const struct IPv6UdpAddress *) addr;
1229     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1230       return GNUNET_OK;   /* plausible, if unlikely... */
1231     memset (&s6, 0, sizeof(s6));
1232     s6.sin6_family = AF_INET6;
1233 #if HAVE_SOCKADDR_IN_SIN_LEN
1234     s6.sin6_len = sizeof(s6);
1235 #endif
1236     s6.sin6_port = v6->u6_port;
1237     s6.sin6_addr = v6->ipv6_addr;
1238
1239     if (GNUNET_OK != GNUNET_NAT_test_address (plugin->nat,
1240                                               &s6,
1241                                               sizeof(struct sockaddr_in6)))
1242       return GNUNET_SYSERR;
1243   }
1244   else
1245   {
1246     GNUNET_break_op (0);
1247     return GNUNET_SYSERR;
1248   }
1249   return GNUNET_OK;
1250 }
1251
1252
1253 /**
1254  * Our external IP address/port mapping has changed.
1255  *
1256  * @param cls closure, the `struct Plugin`
1257  * @param app_ctx[in,out] location where the app can store stuff
1258  *                  on add and retrieve it on remove
1259  * @param add_remove #GNUNET_YES to mean the new public IP address,
1260  *                   #GNUNET_NO to mean the previous (now invalid) one
1261  * @param ac address class the address belongs to
1262  * @param addr either the previous or the new public IP address
1263  * @param addrlen actual length of the @a addr
1264  */
1265 static void
1266 udp_nat_port_map_callback (void *cls,
1267                            void **app_ctx,
1268                            int add_remove,
1269                            enum GNUNET_NAT_AddressClass ac,
1270                            const struct sockaddr *addr,
1271                            socklen_t addrlen)
1272 {
1273   struct Plugin *plugin = cls;
1274   struct GNUNET_HELLO_Address *address;
1275   struct IPv4UdpAddress u4;
1276   struct IPv6UdpAddress u6;
1277   void *arg;
1278   size_t args;
1279
1280   (void) app_ctx;
1281   LOG (GNUNET_ERROR_TYPE_DEBUG,
1282        (GNUNET_YES == add_remove) ? "NAT notification to add address `%s'\n"
1283        : "NAT notification to remove address `%s'\n",
1284        GNUNET_a2s (addr, addrlen));
1285   /* convert 'address' to our internal format */
1286   switch (addr->sa_family)
1287   {
1288   case AF_INET: {
1289       const struct sockaddr_in *i4;
1290
1291       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1292       i4 = (const struct sockaddr_in *) addr;
1293       if (0 == ntohs (i4->sin_port))
1294         return; /* Port = 0 means unmapped, ignore these for UDP. */
1295       memset (&u4, 0, sizeof(u4));
1296       u4.options = htonl (plugin->myoptions);
1297       u4.ipv4_addr = i4->sin_addr.s_addr;
1298       u4.u4_port = i4->sin_port;
1299       arg = &u4;
1300       args = sizeof(struct IPv4UdpAddress);
1301       break;
1302     }
1303
1304   case AF_INET6: {
1305       const struct sockaddr_in6 *i6;
1306
1307       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1308       i6 = (const struct sockaddr_in6 *) addr;
1309       if (0 == ntohs (i6->sin6_port))
1310         return; /* Port = 0 means unmapped, ignore these for UDP. */
1311       memset (&u6, 0, sizeof(u6));
1312       u6.options = htonl (plugin->myoptions);
1313       u6.ipv6_addr = i6->sin6_addr;
1314       u6.u6_port = i6->sin6_port;
1315       arg = &u6;
1316       args = sizeof(struct IPv6UdpAddress);
1317       break;
1318     }
1319
1320   default:
1321     GNUNET_break (0);
1322     return;
1323   }
1324   /* modify our published address list */
1325   /* TODO: use 'ac' here in the future... */
1326   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1327                                            PLUGIN_NAME,
1328                                            arg,
1329                                            args,
1330                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1331   plugin->env->notify_address (plugin->env->cls, add_remove, address);
1332   GNUNET_HELLO_address_free (address);
1333 }
1334
1335
1336 /* ********************* Finding sessions ******************* */
1337
1338
1339 /**
1340  * Closure for #session_cmp_it().
1341  */
1342 struct GNUNET_ATS_SessionCompareContext
1343 {
1344   /**
1345    * Set to session matching the address.
1346    */
1347   struct GNUNET_ATS_Session *res;
1348
1349   /**
1350    * Address we are looking for.
1351    */
1352   const struct GNUNET_HELLO_Address *address;
1353 };
1354
1355
1356 /**
1357  * Find a session with a matching address.
1358  *
1359  * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
1360  * @param key peer identity (unused)
1361  * @param value the `struct GNUNET_ATS_Session *`
1362  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1363  */
1364 static int
1365 session_cmp_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
1366 {
1367   struct GNUNET_ATS_SessionCompareContext *cctx = cls;
1368   struct GNUNET_ATS_Session *s = value;
1369
1370   if (0 == GNUNET_HELLO_address_cmp (s->address, cctx->address))
1371   {
1372     GNUNET_assert (GNUNET_NO == s->in_destroy);
1373     cctx->res = s;
1374     return GNUNET_NO;
1375   }
1376   return GNUNET_OK;
1377 }
1378
1379
1380 /**
1381  * Locate an existing session the transport service is using to
1382  * send data to another peer.  Performs some basic sanity checks
1383  * on the address and then tries to locate a matching session.
1384  *
1385  * @param cls the plugin
1386  * @param address the address we should locate the session by
1387  * @return the session if it exists, or NULL if it is not found
1388  */
1389 static struct GNUNET_ATS_Session *
1390 udp_plugin_lookup_session (void *cls,
1391                            const struct GNUNET_HELLO_Address *address)
1392 {
1393   struct Plugin *plugin = cls;
1394   const struct IPv6UdpAddress *udp_a6;
1395   const struct IPv4UdpAddress *udp_a4;
1396   struct GNUNET_ATS_SessionCompareContext cctx;
1397
1398   if (NULL == address->address)
1399   {
1400     GNUNET_break (0);
1401     return NULL;
1402   }
1403   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1404   {
1405     if (NULL == plugin->sockv4)
1406       return NULL;
1407     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1408     if (0 == udp_a4->u4_port)
1409     {
1410       GNUNET_break (0);
1411       return NULL;
1412     }
1413   }
1414   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1415   {
1416     if (NULL == plugin->sockv6)
1417       return NULL;
1418     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1419     if (0 == udp_a6->u6_port)
1420     {
1421       GNUNET_break (0);
1422       return NULL;
1423     }
1424   }
1425   else
1426   {
1427     GNUNET_break (0);
1428     return NULL;
1429   }
1430
1431   /* check if session already exists */
1432   cctx.address = address;
1433   cctx.res = NULL;
1434   LOG (GNUNET_ERROR_TYPE_DEBUG,
1435        "Looking for existing session for peer `%s' with address `%s'\n",
1436        GNUNET_i2s (&address->peer),
1437        udp_address_to_string (plugin,
1438                               address->address,
1439                               address->address_length));
1440   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1441                                               &address->peer,
1442                                               &session_cmp_it,
1443                                               &cctx);
1444   if (NULL == cctx.res)
1445     return NULL;
1446   LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1447   return cctx.res;
1448 }
1449
1450
1451 /* ********************** Timeout ****************** */
1452
1453
1454 /**
1455  * Increment session timeout due to activity.
1456  *
1457  * @param s session to reschedule timeout activity for
1458  */
1459 static void
1460 reschedule_session_timeout (struct GNUNET_ATS_Session *s)
1461 {
1462   if (GNUNET_YES == s->in_destroy)
1463     return;
1464   GNUNET_assert (NULL != s->timeout_task);
1465   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1466 }
1467
1468
1469 /**
1470  * Function that will be called whenever the transport service wants to
1471  * notify the plugin that a session is still active and in use and
1472  * therefore the session timeout for this session has to be updated
1473  *
1474  * @param cls closure with the `struct Plugin`
1475  * @param peer which peer was the session for
1476  * @param session which session is being updated
1477  */
1478 static void
1479 udp_plugin_update_session_timeout (void *cls,
1480                                    const struct GNUNET_PeerIdentity *peer,
1481                                    struct GNUNET_ATS_Session *session)
1482 {
1483   struct Plugin *plugin = cls;
1484
1485   if (GNUNET_YES !=
1486       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1487                                                     peer,
1488                                                     session))
1489   {
1490     GNUNET_break (0);
1491     return;
1492   }
1493   /* Reschedule session timeout */
1494   reschedule_session_timeout (session);
1495 }
1496
1497
1498 /* ************************* Sending ************************ */
1499
1500
1501 /**
1502  * Remove the given message from the transmission queue and
1503  * update all applicable statistics.
1504  *
1505  * @param plugin the UDP plugin
1506  * @param udpw message wrapper to dequeue
1507  */
1508 static void
1509 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper *udpw)
1510 {
1511   struct GNUNET_ATS_Session *session = udpw->session;
1512
1513   if (plugin->bytes_in_buffer < udpw->msg_size)
1514   {
1515     GNUNET_break (0);
1516   }
1517   else
1518   {
1519     GNUNET_STATISTICS_update (plugin->env->stats,
1520                               "# UDP, total bytes in send buffers",
1521                               -(long long) udpw->msg_size,
1522                               GNUNET_NO);
1523     plugin->bytes_in_buffer -= udpw->msg_size;
1524   }
1525   GNUNET_STATISTICS_update (plugin->env->stats,
1526                             "# UDP, total messages in send buffers",
1527                             -1,
1528                             GNUNET_NO);
1529   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1530   {
1531     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1532                                  plugin->ipv4_queue_tail,
1533                                  udpw);
1534   }
1535   else if (sizeof(struct IPv6UdpAddress) ==
1536            udpw->session->address->address_length)
1537   {
1538     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1539                                  plugin->ipv6_queue_tail,
1540                                  udpw);
1541   }
1542   else
1543   {
1544     GNUNET_break (0);
1545     return;
1546   }
1547   GNUNET_assert (session->msgs_in_queue > 0);
1548   session->msgs_in_queue--;
1549   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1550   session->bytes_in_queue -= udpw->msg_size;
1551 }
1552
1553
1554 /**
1555  * Enqueue a message for transmission and update statistics.
1556  *
1557  * @param plugin the UDP plugin
1558  * @param udpw message wrapper to queue
1559  */
1560 static void
1561 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper *udpw)
1562 {
1563   struct GNUNET_ATS_Session *session = udpw->session;
1564
1565   if (GNUNET_YES == session->in_destroy)
1566   {
1567     GNUNET_break (0);
1568     GNUNET_free (udpw);
1569     return;
1570   }
1571   if (plugin->bytes_in_buffer > INT64_MAX - udpw->msg_size)
1572   {
1573     GNUNET_break (0);
1574   }
1575   else
1576   {
1577     GNUNET_STATISTICS_update (plugin->env->stats,
1578                               "# UDP, total bytes in send buffers",
1579                               udpw->msg_size,
1580                               GNUNET_NO);
1581     plugin->bytes_in_buffer += udpw->msg_size;
1582   }
1583   GNUNET_STATISTICS_update (plugin->env->stats,
1584                             "# UDP, total messages in send buffers",
1585                             1,
1586                             GNUNET_NO);
1587   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1588   {
1589     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1590                                  plugin->ipv4_queue_tail,
1591                                  udpw);
1592   }
1593   else if (sizeof(struct IPv6UdpAddress) ==
1594            udpw->session->address->address_length)
1595   {
1596     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1597                                  plugin->ipv6_queue_tail,
1598                                  udpw);
1599   }
1600   else
1601   {
1602     GNUNET_break (0);
1603     udpw->cont (udpw->cont_cls,
1604                 &session->target,
1605                 GNUNET_SYSERR,
1606                 udpw->msg_size,
1607                 0);
1608     GNUNET_free (udpw);
1609     return;
1610   }
1611   session->msgs_in_queue++;
1612   session->bytes_in_queue += udpw->msg_size;
1613 }
1614
1615
1616 /**
1617  * We have completed our (attempt) to transmit a message that had to
1618  * be fragmented -- either because we got an ACK saying that all
1619  * fragments were received, or because of timeout / disconnect.  Clean
1620  * up our state.
1621  *
1622  * @param frag_ctx fragmentation context to clean up
1623  * @param result #GNUNET_OK if we succeeded (got ACK),
1624  *               #GNUNET_SYSERR if the transmission failed
1625  */
1626 static void
1627 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx, int result)
1628 {
1629   struct Plugin *plugin = frag_ctx->plugin;
1630   struct GNUNET_ATS_Session *s = frag_ctx->session;
1631   struct UDP_MessageWrapper *udpw;
1632   struct UDP_MessageWrapper *tmp;
1633   size_t overhead;
1634   struct GNUNET_TIME_Relative delay;
1635
1636   LOG (GNUNET_ERROR_TYPE_DEBUG,
1637        "%p: Fragmented message removed with result %s\n",
1638        frag_ctx,
1639        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1640   /* Call continuation for fragmented message */
1641   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1642     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1643   else
1644     overhead = frag_ctx->on_wire_size;
1645   delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1646   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1647   {
1648     LOG (GNUNET_ERROR_TYPE_WARNING,
1649          "Fragmented message acknowledged after %s (expected at %s)\n",
1650          GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES),
1651          GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
1652   }
1653   else
1654   {
1655     LOG (GNUNET_ERROR_TYPE_DEBUG,
1656          "Fragmented message acknowledged after %s (expected at %s)\n",
1657          GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES),
1658          GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
1659   }
1660
1661   if (NULL != frag_ctx->cont)
1662     frag_ctx->cont (frag_ctx->cont_cls,
1663                     &s->target,
1664                     result,
1665                     s->frag_ctx->payload_size,
1666                     frag_ctx->on_wire_size);
1667   GNUNET_STATISTICS_update (plugin->env->stats,
1668                             "# UDP, fragmented messages active",
1669                             -1,
1670                             GNUNET_NO);
1671
1672   if (GNUNET_OK == result)
1673   {
1674     GNUNET_STATISTICS_update (plugin->env->stats,
1675                               "# UDP, fragmented msgs, messages, sent, success",
1676                               1,
1677                               GNUNET_NO);
1678     GNUNET_STATISTICS_update (plugin->env->stats,
1679                               "# UDP, fragmented msgs, bytes payload, sent, success",
1680                               s->frag_ctx->payload_size,
1681                               GNUNET_NO);
1682     GNUNET_STATISTICS_update (
1683       plugin->env->stats,
1684       "# UDP, fragmented msgs, bytes overhead, sent, success",
1685       overhead,
1686       GNUNET_NO);
1687     GNUNET_STATISTICS_update (plugin->env->stats,
1688                               "# UDP, total, bytes overhead, sent",
1689                               overhead,
1690                               GNUNET_NO);
1691     GNUNET_STATISTICS_update (plugin->env->stats,
1692                               "# UDP, total, bytes payload, sent",
1693                               s->frag_ctx->payload_size,
1694                               GNUNET_NO);
1695   }
1696   else
1697   {
1698     GNUNET_STATISTICS_update (plugin->env->stats,
1699                               "# UDP, fragmented msgs, messages, sent, failure",
1700                               1,
1701                               GNUNET_NO);
1702     GNUNET_STATISTICS_update (plugin->env->stats,
1703                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1704                               s->frag_ctx->payload_size,
1705                               GNUNET_NO);
1706     GNUNET_STATISTICS_update (plugin->env->stats,
1707                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1708                               overhead,
1709                               GNUNET_NO);
1710     GNUNET_STATISTICS_update (plugin->env->stats,
1711                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1712                               overhead,
1713                               GNUNET_NO);
1714   }
1715
1716   /* Remove remaining fragments from queue, no need to transmit those
1717      any longer. */
1718   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1719   {
1720     udpw = plugin->ipv6_queue_head;
1721     while (NULL != udpw)
1722     {
1723       tmp = udpw->next;
1724       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == frag_ctx))
1725       {
1726         dequeue (plugin, udpw);
1727         GNUNET_free (udpw);
1728       }
1729       udpw = tmp;
1730     }
1731   }
1732   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1733   {
1734     udpw = plugin->ipv4_queue_head;
1735     while (NULL != udpw)
1736     {
1737       tmp = udpw->next;
1738       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == frag_ctx))
1739       {
1740         dequeue (plugin, udpw);
1741         GNUNET_free (udpw);
1742       }
1743       udpw = tmp;
1744     }
1745   }
1746   notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
1747   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1748                                    &s->last_expected_msg_delay,
1749                                    &s->last_expected_ack_delay);
1750   s->frag_ctx = NULL;
1751   GNUNET_free (frag_ctx);
1752 }
1753
1754
1755 /**
1756  * We are finished with a fragment in the message queue.
1757  * Notify the continuation and update statistics.
1758  *
1759  * @param cls the `struct Plugin *`
1760  * @param udpw the queue entry
1761  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1762  */
1763 static void
1764 qc_fragment_sent (void *cls, struct UDP_MessageWrapper *udpw, int result)
1765 {
1766   struct Plugin *plugin = cls;
1767
1768   GNUNET_assert (NULL != udpw->frag_ctx);
1769   if (GNUNET_OK == result)
1770   {
1771     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772                 "Fragment of message with %u bytes transmitted to %s\n",
1773                 (unsigned int) udpw->payload_size,
1774                 GNUNET_i2s (&udpw->session->target));
1775     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1776     GNUNET_STATISTICS_update (plugin->env->stats,
1777                               "# UDP, fragmented msgs, fragments, sent, success",
1778                               1,
1779                               GNUNET_NO);
1780     GNUNET_STATISTICS_update (
1781       plugin->env->stats,
1782       "# UDP, fragmented msgs, fragments bytes, sent, success",
1783       udpw->msg_size,
1784       GNUNET_NO);
1785   }
1786   else
1787   {
1788     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789                 "Failed to transmit fragment of message with %u bytes to %s\n",
1790                 (unsigned int) udpw->payload_size,
1791                 GNUNET_i2s (&udpw->session->target));
1792     fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
1793     GNUNET_STATISTICS_update (plugin->env->stats,
1794                               "# UDP, fragmented msgs, fragments, sent, failure",
1795                               1,
1796                               GNUNET_NO);
1797     GNUNET_STATISTICS_update (
1798       plugin->env->stats,
1799       "# UDP, fragmented msgs, fragments bytes, sent, failure",
1800       udpw->msg_size,
1801       GNUNET_NO);
1802   }
1803 }
1804
1805
1806 /**
1807  * Function that is called with messages created by the fragmentation
1808  * module.  In the case of the `proc` callback of the
1809  * #GNUNET_FRAGMENT_context_create() function, this function must
1810  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1811  *
1812  * @param cls closure, the `struct UDP_FragmentationContext`
1813  * @param msg the message that was created
1814  */
1815 static void
1816 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1817 {
1818   struct UDP_FragmentationContext *frag_ctx = cls;
1819   struct Plugin *plugin = frag_ctx->plugin;
1820   struct UDP_MessageWrapper *udpw;
1821   struct GNUNET_ATS_Session *session = frag_ctx->session;
1822   size_t msg_len = ntohs (msg->size);
1823
1824   LOG (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes\n", msg_len);
1825   udpw = GNUNET_malloc (sizeof(struct UDP_MessageWrapper) + msg_len);
1826   udpw->session = session;
1827   udpw->msg_buf = (char *) &udpw[1];
1828   udpw->msg_size = msg_len;
1829   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1830   udpw->timeout = frag_ctx->timeout;
1831   udpw->start_time = frag_ctx->start_time;
1832   udpw->transmission_time = frag_ctx->next_frag_time;
1833   frag_ctx->next_frag_time =
1834     GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1835                               frag_ctx->flow_delay_from_other_peer);
1836   udpw->frag_ctx = frag_ctx;
1837   udpw->qc = &qc_fragment_sent;
1838   udpw->qc_cls = plugin;
1839   GNUNET_memcpy (udpw->msg_buf, msg, msg_len);
1840   enqueue (plugin, udpw);
1841   if (session->address->address_length == sizeof(struct IPv4UdpAddress))
1842     schedule_select_v4 (plugin);
1843   else
1844     schedule_select_v6 (plugin);
1845 }
1846
1847
1848 /**
1849  * We are finished with a message from the message queue.
1850  * Notify the continuation and update statistics.
1851  *
1852  * @param cls the `struct Plugin *`
1853  * @param udpw the queue entry
1854  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1855  */
1856 static void
1857 qc_message_sent (void *cls, struct UDP_MessageWrapper *udpw, int result)
1858 {
1859   struct Plugin *plugin = cls;
1860   size_t overhead;
1861   struct GNUNET_TIME_Relative delay;
1862
1863   if (udpw->msg_size >= udpw->payload_size)
1864     overhead = udpw->msg_size - udpw->payload_size;
1865   else
1866     overhead = udpw->msg_size;
1867
1868   if (NULL != udpw->cont)
1869   {
1870     delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1871     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1872     {
1873       LOG (GNUNET_ERROR_TYPE_WARNING,
1874            "Message sent via UDP with delay of %s\n",
1875            GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1876     }
1877     else
1878     {
1879       LOG (GNUNET_ERROR_TYPE_DEBUG,
1880            "Message sent via UDP with delay of %s\n",
1881            GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1882     }
1883     udpw->cont (udpw->cont_cls,
1884                 &udpw->session->target,
1885                 result,
1886                 udpw->payload_size,
1887                 overhead);
1888   }
1889   if (GNUNET_OK == result)
1890   {
1891     GNUNET_STATISTICS_update (plugin->env->stats,
1892                               "# UDP, unfragmented msgs, messages, sent, success",
1893                               1,
1894                               GNUNET_NO);
1895     GNUNET_STATISTICS_update (
1896       plugin->env->stats,
1897       "# UDP, unfragmented msgs, bytes payload, sent, success",
1898       udpw->payload_size,
1899       GNUNET_NO);
1900     GNUNET_STATISTICS_update (
1901       plugin->env->stats,
1902       "# UDP, unfragmented msgs, bytes overhead, sent, success",
1903       overhead,
1904       GNUNET_NO);
1905     GNUNET_STATISTICS_update (plugin->env->stats,
1906                               "# UDP, total, bytes overhead, sent",
1907                               overhead,
1908                               GNUNET_NO);
1909     GNUNET_STATISTICS_update (plugin->env->stats,
1910                               "# UDP, total, bytes payload, sent",
1911                               udpw->payload_size,
1912                               GNUNET_NO);
1913   }
1914   else
1915   {
1916     GNUNET_STATISTICS_update (plugin->env->stats,
1917                               "# UDP, unfragmented msgs, messages, sent, failure",
1918                               1,
1919                               GNUNET_NO);
1920     GNUNET_STATISTICS_update (
1921       plugin->env->stats,
1922       "# UDP, unfragmented msgs, bytes payload, sent, failure",
1923       udpw->payload_size,
1924       GNUNET_NO);
1925     GNUNET_STATISTICS_update (
1926       plugin->env->stats,
1927       "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1928       overhead,
1929       GNUNET_NO);
1930   }
1931 }
1932
1933
1934 /**
1935  * Function that can be used by the transport service to transmit a
1936  * message using the plugin.  Note that in the case of a peer
1937  * disconnecting, the continuation MUST be called prior to the
1938  * disconnect notification itself.  This function will be called with
1939  * this peer's HELLO message to initiate a fresh connection to another
1940  * peer.
1941  *
1942  * @param cls closure
1943  * @param s which session must be used
1944  * @param msgbuf the message to transmit
1945  * @param msgbuf_size number of bytes in @a msgbuf
1946  * @param priority how important is the message (most plugins will
1947  *                 ignore message priority and just FIFO)
1948  * @param to how long to wait at most for the transmission (does not
1949  *                require plugins to discard the message after the timeout,
1950  *                just advisory for the desired delay; most plugins will ignore
1951  *                this as well)
1952  * @param cont continuation to call once the message has
1953  *        been transmitted (or if the transport is ready
1954  *        for the next transmission call; or if the
1955  *        peer disconnected...); can be NULL
1956  * @param cont_cls closure for @a cont
1957  * @return number of bytes used (on the physical network, with overheads);
1958  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1959  *         and does NOT mean that the message was not transmitted (DV)
1960  */
1961 static ssize_t
1962 udp_plugin_send (void *cls,
1963                  struct GNUNET_ATS_Session *s,
1964                  const char *msgbuf,
1965                  size_t msgbuf_size,
1966                  unsigned int priority,
1967                  struct GNUNET_TIME_Relative to,
1968                  GNUNET_TRANSPORT_TransmitContinuation cont,
1969                  void *cont_cls)
1970 {
1971   struct Plugin *plugin = cls;
1972   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1973   struct UDP_FragmentationContext *frag_ctx;
1974   struct UDP_MessageWrapper *udpw;
1975   struct UDPMessage *udp;
1976   char mbuf[udpmlen] GNUNET_ALIGN;
1977   struct GNUNET_TIME_Relative latency;
1978
1979   if ((sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
1980       (NULL == plugin->sockv6))
1981     return GNUNET_SYSERR;
1982   if ((sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
1983       (NULL == plugin->sockv4))
1984     return GNUNET_SYSERR;
1985   if (udpmlen >= GNUNET_MAX_MESSAGE_SIZE)
1986   {
1987     GNUNET_break (0);
1988     return GNUNET_SYSERR;
1989   }
1990   if (GNUNET_YES !=
1991       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1992                                                     &s->target,
1993                                                     s))
1994   {
1995     GNUNET_break (0);
1996     return GNUNET_SYSERR;
1997   }
1998   LOG (GNUNET_ERROR_TYPE_DEBUG,
1999        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2000        udpmlen,
2001        GNUNET_i2s (&s->target),
2002        udp_address_to_string (plugin,
2003                               s->address->address,
2004                               s->address->address_length));
2005
2006   udp = (struct UDPMessage *) mbuf;
2007   udp->header.size = htons (udpmlen);
2008   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2009   udp->reserved = htonl (0);
2010   udp->sender = *plugin->env->my_identity;
2011
2012   /* We do not update the session time out here!  Otherwise this
2013    * session will not timeout since we send keep alive before session
2014    * can timeout.
2015    *
2016    * For UDP we update session timeout only on receive, this will
2017    * cover keep alives, since remote peer will reply with keep alive
2018    * responses!
2019    */if (udpmlen <= UDP_MTU)
2020   {
2021     /* unfragmented message */
2022     udpw = GNUNET_malloc (sizeof(struct UDP_MessageWrapper) + udpmlen);
2023     udpw->session = s;
2024     udpw->msg_buf = (char *) &udpw[1];
2025     udpw->msg_size = udpmlen;   /* message size with UDP overhead */
2026     udpw->payload_size = msgbuf_size;   /* message size without UDP overhead */
2027     udpw->start_time = GNUNET_TIME_absolute_get ();
2028     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2029     udpw->transmission_time = s->last_transmit_time;
2030     s->last_transmit_time =
2031       GNUNET_TIME_absolute_add (s->last_transmit_time,
2032                                 s->flow_delay_from_other_peer);
2033     udpw->cont = cont;
2034     udpw->cont_cls = cont_cls;
2035     udpw->frag_ctx = NULL;
2036     udpw->qc = &qc_message_sent;
2037     udpw->qc_cls = plugin;
2038     GNUNET_memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2039     GNUNET_memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
2040                    msgbuf,
2041                    msgbuf_size);
2042     enqueue (plugin, udpw);
2043     GNUNET_STATISTICS_update (plugin->env->stats,
2044                               "# UDP, unfragmented messages queued total",
2045                               1,
2046                               GNUNET_NO);
2047     GNUNET_STATISTICS_update (plugin->env->stats,
2048                               "# UDP, unfragmented bytes payload queued total",
2049                               msgbuf_size,
2050                               GNUNET_NO);
2051     if (s->address->address_length == sizeof(struct IPv4UdpAddress))
2052       schedule_select_v4 (plugin);
2053     else
2054       schedule_select_v6 (plugin);
2055   }
2056   else
2057   {
2058     /* fragmented message */
2059     if (NULL != s->frag_ctx)
2060       return GNUNET_SYSERR;
2061     GNUNET_memcpy (&udp[1], msgbuf, msgbuf_size);
2062     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2063     frag_ctx->plugin = plugin;
2064     frag_ctx->session = s;
2065     frag_ctx->cont = cont;
2066     frag_ctx->cont_cls = cont_cls;
2067     frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2068     frag_ctx->next_frag_time = s->last_transmit_time;
2069     frag_ctx->flow_delay_from_other_peer =
2070       GNUNET_TIME_relative_divide (s->flow_delay_from_other_peer,
2071                                    1 + (msgbuf_size / UDP_MTU));
2072     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
2073     frag_ctx->payload_size =
2074       msgbuf_size;   /* unfragmented message size without UDP overhead */
2075     frag_ctx->on_wire_size = 0;   /* bytes with UDP and fragmentation overhead */
2076     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2077                                                      UDP_MTU,
2078                                                      &plugin->tracker,
2079                                                      s->last_expected_msg_delay,
2080                                                      s->last_expected_ack_delay,
2081                                                      &udp->header,
2082                                                      &enqueue_fragment,
2083                                                      frag_ctx);
2084     s->frag_ctx = frag_ctx;
2085     s->last_transmit_time = frag_ctx->next_frag_time;
2086     latency = GNUNET_TIME_absolute_get_remaining (s->last_transmit_time);
2087     if (latency.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2088       LOG (GNUNET_ERROR_TYPE_WARNING,
2089            "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
2090            GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_YES),
2091            GNUNET_i2s (&s->target),
2092            (unsigned int) s->msgs_in_queue);
2093     else
2094       LOG (GNUNET_ERROR_TYPE_DEBUG,
2095            "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
2096            GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_YES),
2097            GNUNET_i2s (&s->target),
2098            (unsigned int) s->msgs_in_queue);
2099
2100     GNUNET_STATISTICS_update (plugin->env->stats,
2101                               "# UDP, fragmented messages active",
2102                               1,
2103                               GNUNET_NO);
2104     GNUNET_STATISTICS_update (plugin->env->stats,
2105                               "# UDP, fragmented messages, total",
2106                               1,
2107                               GNUNET_NO);
2108     GNUNET_STATISTICS_update (plugin->env->stats,
2109                               "# UDP, fragmented bytes (payload)",
2110                               frag_ctx->payload_size,
2111                               GNUNET_NO);
2112   }
2113   notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
2114   return udpmlen;
2115 }
2116
2117
2118 /* ********************** Receiving ********************** */
2119
2120
2121 /**
2122  * Closure for #find_receive_context().
2123  */
2124 struct FindReceiveContext
2125 {
2126   /**
2127    * Where to store the result.
2128    */
2129   struct DefragContext *rc;
2130
2131   /**
2132    * Session associated with this context.
2133    */
2134   struct GNUNET_ATS_Session *session;
2135
2136   /**
2137    * Address to find.
2138    */
2139   const union UdpAddress *udp_addr;
2140
2141   /**
2142    * Number of bytes in @e udp_addr.
2143    */
2144   size_t udp_addr_len;
2145 };
2146
2147
2148 /**
2149  * Scan the heap for a receive context with the given address.
2150  *
2151  * @param cls the `struct FindReceiveContext`
2152  * @param node internal node of the heap
2153  * @param element value stored at the node (a `struct ReceiveContext`)
2154  * @param cost cost associated with the node
2155  * @return #GNUNET_YES if we should continue to iterate,
2156  *         #GNUNET_NO if not.
2157  */
2158 static int
2159 find_receive_context (void *cls,
2160                       struct GNUNET_CONTAINER_HeapNode *node,
2161                       void *element,
2162                       GNUNET_CONTAINER_HeapCostType cost)
2163 {
2164   struct FindReceiveContext *frc = cls;
2165   struct DefragContext *e = element;
2166
2167   if ((frc->udp_addr_len == e->udp_addr_len) &&
2168       (0 == memcmp (frc->udp_addr, e->udp_addr, frc->udp_addr_len)))
2169   {
2170     frc->rc = e;
2171     return GNUNET_NO;
2172   }
2173   return GNUNET_YES;
2174 }
2175
2176
2177 /**
2178  * Functions with this signature are called whenever we need to close
2179  * a session due to a disconnect or failure to establish a connection.
2180  *
2181  * @param cls closure with the `struct Plugin`
2182  * @param s session to close down
2183  * @return #GNUNET_OK on success
2184  */
2185 static int
2186 udp_disconnect_session (void *cls, struct GNUNET_ATS_Session *s)
2187 {
2188   struct Plugin *plugin = cls;
2189   struct UDP_MessageWrapper *udpw;
2190   struct UDP_MessageWrapper *next;
2191   struct FindReceiveContext frc;
2192
2193   GNUNET_assert (GNUNET_YES != s->in_destroy);
2194   LOG (GNUNET_ERROR_TYPE_DEBUG,
2195        "Session %p to peer `%s' at address %s ended\n",
2196        s,
2197        GNUNET_i2s (&s->target),
2198        udp_address_to_string (plugin,
2199                               s->address->address,
2200                               s->address->address_length));
2201   if (NULL != s->timeout_task)
2202   {
2203     GNUNET_SCHEDULER_cancel (s->timeout_task);
2204     s->timeout_task = NULL;
2205   }
2206   if (NULL != s->frag_ctx)
2207   {
2208     /* Remove fragmented message due to disconnect */
2209     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
2210   }
2211   GNUNET_assert (
2212     GNUNET_YES ==
2213     GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, &s->target, s));
2214   frc.rc = NULL;
2215   frc.udp_addr = s->address->address;
2216   frc.udp_addr_len = s->address->address_length;
2217   /* Lookup existing receive context for this address */
2218   if (NULL != plugin->defrag_ctxs)
2219   {
2220     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2221                                    &find_receive_context,
2222                                    &frc);
2223     if (NULL != frc.rc)
2224     {
2225       struct DefragContext *d_ctx = frc.rc;
2226
2227       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2228       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2229       GNUNET_free (d_ctx);
2230     }
2231   }
2232   s->in_destroy = GNUNET_YES;
2233   next = plugin->ipv4_queue_head;
2234   while (NULL != (udpw = next))
2235   {
2236     next = udpw->next;
2237     if (udpw->session == s)
2238     {
2239       dequeue (plugin, udpw);
2240       udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
2241       GNUNET_free (udpw);
2242     }
2243   }
2244   next = plugin->ipv6_queue_head;
2245   while (NULL != (udpw = next))
2246   {
2247     next = udpw->next;
2248     if (udpw->session == s)
2249     {
2250       dequeue (plugin, udpw);
2251       udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
2252       GNUNET_free (udpw);
2253     }
2254   }
2255   if ((NULL != s->frag_ctx) && (NULL != s->frag_ctx->cont))
2256   {
2257     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2258        later, as it might be in use right now */
2259     LOG (GNUNET_ERROR_TYPE_DEBUG,
2260          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2261          GNUNET_i2s (&s->target));
2262     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2263                        &s->target,
2264                        GNUNET_SYSERR,
2265                        s->frag_ctx->payload_size,
2266                        s->frag_ctx->on_wire_size);
2267   }
2268   notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_DONE);
2269   plugin->env->session_end (plugin->env->cls, s->address, s);
2270   GNUNET_STATISTICS_set (plugin->env->stats,
2271                          "# UDP sessions active",
2272                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2273                          GNUNET_NO);
2274   if (0 == s->rc)
2275     free_session (s);
2276   return GNUNET_OK;
2277 }
2278
2279
2280 /**
2281  * Handle a #GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK message.
2282  *
2283  * @param plugin the UDP plugin
2284  * @param msg the (presumed) UDP ACK message
2285  * @param udp_addr sender address
2286  * @param udp_addr_len number of bytes in @a udp_addr
2287  */
2288 static void
2289 read_process_ack (struct Plugin *plugin,
2290                   const struct GNUNET_MessageHeader *msg,
2291                   const union UdpAddress *udp_addr,
2292                   socklen_t udp_addr_len)
2293 {
2294   const struct GNUNET_MessageHeader *ack;
2295   const struct UDP_ACK_Message *udp_ack;
2296   struct GNUNET_HELLO_Address *address;
2297   struct GNUNET_ATS_Session *s;
2298   struct GNUNET_TIME_Relative flow_delay;
2299
2300   /* check message format */
2301   if (ntohs (msg->size) <
2302       sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2303   {
2304     GNUNET_break_op (0);
2305     return;
2306   }
2307   udp_ack = (const struct UDP_ACK_Message *) msg;
2308   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2309   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2310   {
2311     GNUNET_break_op (0);
2312     return;
2313   }
2314
2315   /* Locate session */
2316   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2317                                            PLUGIN_NAME,
2318                                            udp_addr,
2319                                            udp_addr_len,
2320                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2321   s = udp_plugin_lookup_session (plugin, address);
2322   if (NULL == s)
2323   {
2324     LOG (GNUNET_ERROR_TYPE_WARNING,
2325          "UDP session of address %s for ACK not found\n",
2326          udp_address_to_string (plugin,
2327                                 address->address,
2328                                 address->address_length));
2329     GNUNET_HELLO_address_free (address);
2330     return;
2331   }
2332   if (NULL == s->frag_ctx)
2333   {
2334     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2335          "Fragmentation context of address %s for ACK (%s) not found\n",
2336          udp_address_to_string (plugin,
2337                                 address->address,
2338                                 address->address_length),
2339          GNUNET_FRAGMENT_print_ack (ack));
2340     GNUNET_HELLO_address_free (address);
2341     return;
2342   }
2343   GNUNET_HELLO_address_free (address);
2344
2345   /* evaluate flow delay: how long should we wait between messages? */
2346   if (UINT32_MAX == ntohl (udp_ack->delay))
2347   {
2348     /* Other peer asked for us to terminate the session */
2349     LOG (GNUNET_ERROR_TYPE_INFO,
2350          "Asked to disconnect UDP session of %s\n",
2351          GNUNET_i2s (&udp_ack->sender));
2352     udp_disconnect_session (plugin, s);
2353     return;
2354   }
2355   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2356   if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2357     LOG (GNUNET_ERROR_TYPE_WARNING,
2358          "We received a sending delay of %s for %s\n",
2359          GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES),
2360          GNUNET_i2s (&udp_ack->sender));
2361   else
2362     LOG (GNUNET_ERROR_TYPE_DEBUG,
2363          "We received a sending delay of %s for %s\n",
2364          GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES),
2365          GNUNET_i2s (&udp_ack->sender));
2366   /* Flow delay is for the reassembled packet, however, our delay
2367      is per packet, so we need to adjust: */
2368   s->flow_delay_from_other_peer = flow_delay;
2369
2370   /* Handle ACK */
2371   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2372   {
2373     LOG (GNUNET_ERROR_TYPE_DEBUG,
2374          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2375          (unsigned int) ntohs (msg->size),
2376          GNUNET_i2s (&udp_ack->sender),
2377          udp_address_to_string (plugin, udp_addr, udp_addr_len));
2378     /* Expect more ACKs to arrive */
2379     return;
2380   }
2381
2382   /* Remove fragmented message after successful sending */
2383   LOG (GNUNET_ERROR_TYPE_DEBUG,
2384        "Message from %s at %s full ACK'ed\n",
2385        GNUNET_i2s (&udp_ack->sender),
2386        udp_address_to_string (plugin, udp_addr, udp_addr_len));
2387   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2388 }
2389
2390
2391 /**
2392  * Message tokenizer has broken up an incomming message. Pass it on
2393  * to the service.
2394  *
2395  * @param cls the `struct GNUNET_ATS_Session *`
2396  * @param hdr the actual message
2397  * @return #GNUNET_OK (always)
2398  */
2399 static int
2400 process_inbound_tokenized_messages (void *cls,
2401                                     const struct GNUNET_MessageHeader *hdr)
2402 {
2403   struct GNUNET_ATS_Session *session = cls;
2404   struct Plugin *plugin = session->plugin;
2405
2406   if (GNUNET_YES == session->in_destroy)
2407     return GNUNET_OK;
2408   reschedule_session_timeout (session);
2409   session->flow_delay_for_other_peer =
2410     plugin->env->receive (plugin->env->cls, session->address, session, hdr);
2411   return GNUNET_OK;
2412 }
2413
2414
2415 /**
2416  * Destroy a session, plugin is being unloaded.
2417  *
2418  * @param cls the `struct Plugin`
2419  * @param key hash of public key of target peer
2420  * @param value a `struct PeerSession *` to clean up
2421  * @return #GNUNET_OK (continue to iterate)
2422  */
2423 static int
2424 disconnect_and_free_it (void *cls,
2425                         const struct GNUNET_PeerIdentity *key,
2426                         void *value)
2427 {
2428   struct Plugin *plugin = cls;
2429
2430   udp_disconnect_session (plugin, value);
2431   return GNUNET_OK;
2432 }
2433
2434
2435 /**
2436  * Disconnect from a remote node.  Clean up session if we have one for
2437  * this peer.
2438  *
2439  * @param cls closure for this call (should be handle to Plugin)
2440  * @param target the peeridentity of the peer to disconnect
2441  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2442  */
2443 static void
2444 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
2445 {
2446   struct Plugin *plugin = cls;
2447
2448   LOG (GNUNET_ERROR_TYPE_DEBUG,
2449        "Disconnecting from peer `%s'\n",
2450        GNUNET_i2s (target));
2451   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2452                                               target,
2453                                               &disconnect_and_free_it,
2454                                               plugin);
2455 }
2456
2457
2458 /**
2459  * Session was idle, so disconnect it.
2460  *
2461  * @param cls the `struct GNUNET_ATS_Session` to time out
2462  */
2463 static void
2464 session_timeout (void *cls)
2465 {
2466   struct GNUNET_ATS_Session *s = cls;
2467   struct Plugin *plugin = s->plugin;
2468   struct GNUNET_TIME_Relative left;
2469
2470   s->timeout_task = NULL;
2471   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2472   if (left.rel_value_us > 0)
2473   {
2474     /* not actually our turn yet, but let's at least update
2475        the monitor, it may think we're about to die ... */
2476     notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
2477     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, &session_timeout, s);
2478     return;
2479   }
2480   LOG (GNUNET_ERROR_TYPE_DEBUG,
2481        "Session %p was idle for %s, disconnecting\n",
2482        s,
2483        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2484                                                GNUNET_YES));
2485   /* call session destroy function */
2486   udp_disconnect_session (plugin, s);
2487 }
2488
2489
2490 /**
2491  * Allocate a new session for the given endpoint address.
2492  * Note that this function does not inform the service
2493  * of the new session, this is the responsibility of the
2494  * caller (if needed).
2495  *
2496  * @param cls the `struct Plugin`
2497  * @param address address of the other peer to use
2498  * @param network_type network type the address belongs to
2499  * @return NULL on error, otherwise session handle
2500  */
2501 static struct GNUNET_ATS_Session *
2502 udp_plugin_create_session (void *cls,
2503                            const struct GNUNET_HELLO_Address *address,
2504                            enum GNUNET_NetworkType network_type)
2505 {
2506   struct Plugin *plugin = cls;
2507   struct GNUNET_ATS_Session *s;
2508
2509   s = GNUNET_new (struct GNUNET_ATS_Session);
2510   s->mst = GNUNET_MST_create (&process_inbound_tokenized_messages, s);
2511   s->plugin = plugin;
2512   s->address = GNUNET_HELLO_address_copy (address);
2513   s->target = address->peer;
2514   s->last_transmit_time = GNUNET_TIME_absolute_get ();
2515   s->last_expected_ack_delay =
2516     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
2517   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2518   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2519   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2520   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2521   s->timeout_task =
2522     GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, &session_timeout, s);
2523   s->scope = network_type;
2524
2525   LOG (GNUNET_ERROR_TYPE_DEBUG,
2526        "Creating new session %p for peer `%s' address `%s'\n",
2527        s,
2528        GNUNET_i2s (&address->peer),
2529        udp_address_to_string (plugin,
2530                               address->address,
2531                               address->address_length));
2532   GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (
2533                    plugin->sessions,
2534                    &s->target,
2535                    s,
2536                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2537   GNUNET_STATISTICS_set (plugin->env->stats,
2538                          "# UDP sessions active",
2539                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2540                          GNUNET_NO);
2541   notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_INIT);
2542   return s;
2543 }
2544
2545
2546 /**
2547  * Creates a new outbound session the transport service will use to
2548  * send data to the peer.
2549  *
2550  * @param cls the `struct Plugin *`
2551  * @param address the address
2552  * @return the session or NULL of max connections exceeded
2553  */
2554 static struct GNUNET_ATS_Session *
2555 udp_plugin_get_session (void *cls, const struct GNUNET_HELLO_Address *address)
2556 {
2557   struct Plugin *plugin = cls;
2558   struct GNUNET_ATS_Session *s;
2559   enum GNUNET_NetworkType network_type = GNUNET_NT_UNSPECIFIED;
2560   const struct IPv4UdpAddress *udp_v4;
2561   const struct IPv6UdpAddress *udp_v6;
2562
2563   if (NULL == address)
2564   {
2565     GNUNET_break (0);
2566     return NULL;
2567   }
2568   if ((address->address_length != sizeof(struct IPv4UdpAddress)) &&
2569       (address->address_length != sizeof(struct IPv6UdpAddress)))
2570   {
2571     GNUNET_break_op (0);
2572     return NULL;
2573   }
2574   if (NULL != (s = udp_plugin_lookup_session (cls, address)))
2575     return s;
2576
2577   /* need to create new session */
2578   if (sizeof(struct IPv4UdpAddress) == address->address_length)
2579   {
2580     struct sockaddr_in v4;
2581
2582     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2583     memset (&v4, '\0', sizeof(v4));
2584     v4.sin_family = AF_INET;
2585 #if HAVE_SOCKADDR_IN_SIN_LEN
2586     v4.sin_len = sizeof(struct sockaddr_in);
2587 #endif
2588     v4.sin_port = udp_v4->u4_port;
2589     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2590     network_type = plugin->env->get_address_type (plugin->env->cls,
2591                                                   (const struct sockaddr *) &v4,
2592                                                   sizeof(v4));
2593   }
2594   if (sizeof(struct IPv6UdpAddress) == address->address_length)
2595   {
2596     struct sockaddr_in6 v6;
2597
2598     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2599     memset (&v6, '\0', sizeof(v6));
2600     v6.sin6_family = AF_INET6;
2601 #if HAVE_SOCKADDR_IN_SIN_LEN
2602     v6.sin6_len = sizeof(struct sockaddr_in6);
2603 #endif
2604     v6.sin6_port = udp_v6->u6_port;
2605     v6.sin6_addr = udp_v6->ipv6_addr;
2606     network_type = plugin->env->get_address_type (plugin->env->cls,
2607                                                   (const struct sockaddr *) &v6,
2608                                                   sizeof(v6));
2609   }
2610   GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
2611   return udp_plugin_create_session (cls, address, network_type);
2612 }
2613
2614
2615 /**
2616  * We've received a UDP Message.  Process it (pass contents to main service).
2617  *
2618  * @param plugin plugin context
2619  * @param msg the message
2620  * @param udp_addr sender address
2621  * @param udp_addr_len number of bytes in @a udp_addr
2622  * @param network_type network type the address belongs to
2623  */
2624 static void
2625 process_udp_message (struct Plugin *plugin,
2626                      const struct UDPMessage *msg,
2627                      const union UdpAddress *udp_addr,
2628                      size_t udp_addr_len,
2629                      enum GNUNET_NetworkType network_type)
2630 {
2631   struct GNUNET_ATS_Session *s;
2632   struct GNUNET_HELLO_Address *address;
2633
2634   GNUNET_break (GNUNET_NT_UNSPECIFIED != network_type);
2635   if (0 != ntohl (msg->reserved))
2636   {
2637     GNUNET_break_op (0);
2638     return;
2639   }
2640   if (ntohs (msg->header.size) <
2641       sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2642   {
2643     GNUNET_break_op (0);
2644     return;
2645   }
2646
2647   address = GNUNET_HELLO_address_allocate (&msg->sender,
2648                                            PLUGIN_NAME,
2649                                            udp_addr,
2650                                            udp_addr_len,
2651                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2652   if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
2653   {
2654     s = udp_plugin_create_session (plugin, address, network_type);
2655     plugin->env->session_start (plugin->env->cls, address, s, s->scope);
2656     notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_UP);
2657   }
2658   GNUNET_free (address);
2659
2660   s->rc++;
2661   GNUNET_MST_from_buffer (s->mst,
2662                           (const char *) &msg[1],
2663                           ntohs (msg->header.size) - sizeof(struct UDPMessage),
2664                           GNUNET_YES,
2665                           GNUNET_NO);
2666   s->rc--;
2667   if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
2668     free_session (s);
2669 }
2670
2671
2672 /**
2673  * Process a defragmented message.
2674  *
2675  * @param cls the `struct DefragContext *`
2676  * @param msg the message
2677  */
2678 static void
2679 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
2680 {
2681   struct DefragContext *dc = cls;
2682   const struct UDPMessage *um;
2683
2684   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2685   {
2686     GNUNET_break_op (0);
2687     return;
2688   }
2689   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2690   {
2691     GNUNET_break_op (0);
2692     return;
2693   }
2694   um = (const struct UDPMessage *) msg;
2695   dc->sender = um->sender;
2696   dc->have_sender = GNUNET_YES;
2697   process_udp_message (dc->plugin,
2698                        um,
2699                        dc->udp_addr,
2700                        dc->udp_addr_len,
2701                        dc->network_type);
2702 }
2703
2704
2705 /**
2706  * We finished sending an acknowledgement.  Update
2707  * statistics.
2708  *
2709  * @param cls the `struct Plugin`
2710  * @param udpw message queue entry of the ACK
2711  * @param result #GNUNET_OK if the transmission worked,
2712  *               #GNUNET_SYSERR if we failed to send the ACK
2713  */
2714 static void
2715 ack_message_sent (void *cls, struct UDP_MessageWrapper *udpw, int result)
2716 {
2717   struct Plugin *plugin = cls;
2718
2719   if (GNUNET_OK == result)
2720   {
2721     GNUNET_STATISTICS_update (plugin->env->stats,
2722                               "# UDP, ACK messages sent",
2723                               1,
2724                               GNUNET_NO);
2725   }
2726   else
2727   {
2728     GNUNET_STATISTICS_update (plugin->env->stats,
2729                               "# UDP, ACK transmissions failed",
2730                               1,
2731                               GNUNET_NO);
2732   }
2733 }
2734
2735
2736 /**
2737  * Transmit an acknowledgement.
2738  *
2739  * @param cls the `struct DefragContext *`
2740  * @param id message ID (unused)
2741  * @param msg ack to transmit
2742  */
2743 static void
2744 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
2745 {
2746   struct DefragContext *rc = cls;
2747   struct Plugin *plugin = rc->plugin;
2748   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2749   struct UDP_ACK_Message *udp_ack;
2750   uint32_t delay;
2751   struct UDP_MessageWrapper *udpw;
2752   struct GNUNET_ATS_Session *s;
2753   struct GNUNET_HELLO_Address *address;
2754
2755   if (GNUNET_NO == rc->have_sender)
2756   {
2757     /* tried to defragment but never succeeded, hence will not ACK */
2758     /* This can happen if we just lost msgs */
2759     GNUNET_STATISTICS_update (plugin->env->stats,
2760                               "# UDP, fragments discarded without ACK",
2761                               1,
2762                               GNUNET_NO);
2763     return;
2764   }
2765   address = GNUNET_HELLO_address_allocate (&rc->sender,
2766                                            PLUGIN_NAME,
2767                                            rc->udp_addr,
2768                                            rc->udp_addr_len,
2769                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2770   s = udp_plugin_lookup_session (plugin, address);
2771   GNUNET_HELLO_address_free (address);
2772   if (NULL == s)
2773   {
2774     LOG (GNUNET_ERROR_TYPE_ERROR,
2775          "Trying to transmit ACK to peer `%s' but no session found!\n",
2776          udp_address_to_string (plugin, rc->udp_addr, rc->udp_addr_len));
2777     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2778     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2779     GNUNET_free (rc);
2780     GNUNET_STATISTICS_update (plugin->env->stats,
2781                               "# UDP, ACK transmissions failed",
2782                               1,
2783                               GNUNET_NO);
2784     return;
2785   }
2786   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
2787       s->flow_delay_for_other_peer.rel_value_us)
2788     delay = UINT32_MAX;
2789   else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
2790     delay = s->flow_delay_for_other_peer.rel_value_us;
2791   else
2792     delay = UINT32_MAX - 1; /* largest value we can communicate */
2793   LOG (GNUNET_ERROR_TYPE_DEBUG,
2794        "Sending ACK to `%s' including delay of %s\n",
2795        udp_address_to_string (plugin, rc->udp_addr, rc->udp_addr_len),
2796        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2797                                                GNUNET_YES));
2798   udpw = GNUNET_malloc (sizeof(struct UDP_MessageWrapper) + msize);
2799   udpw->msg_size = msize;
2800   udpw->payload_size = 0;
2801   udpw->session = s;
2802   udpw->start_time = GNUNET_TIME_absolute_get ();
2803   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2804   udpw->msg_buf = (char *) &udpw[1];
2805   udpw->qc = &ack_message_sent;
2806   udpw->qc_cls = plugin;
2807   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2808   udp_ack->header.size = htons ((uint16_t) msize);
2809   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2810   udp_ack->delay = htonl (delay);
2811   udp_ack->sender = *plugin->env->my_identity;
2812   GNUNET_memcpy (&udp_ack[1], msg, ntohs (msg->size));
2813   enqueue (plugin, udpw);
2814   notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_UPDATE);
2815   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
2816     schedule_select_v4 (plugin);
2817   else
2818     schedule_select_v6 (plugin);
2819 }
2820
2821
2822 /**
2823  * We received a fragment, process it.
2824  *
2825  * @param plugin our plugin
2826  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2827  * @param udp_addr sender address
2828  * @param udp_addr_len number of bytes in @a udp_addr
2829  * @param network_type network type the address belongs to
2830  */
2831 static void
2832 read_process_fragment (struct Plugin *plugin,
2833                        const struct GNUNET_MessageHeader *msg,
2834                        const union UdpAddress *udp_addr,
2835                        size_t udp_addr_len,
2836                        enum GNUNET_NetworkType network_type)
2837 {
2838   struct DefragContext *d_ctx;
2839   struct GNUNET_TIME_Absolute now;
2840   struct FindReceiveContext frc;
2841
2842   frc.rc = NULL;
2843   frc.udp_addr = udp_addr;
2844   frc.udp_addr_len = udp_addr_len;
2845
2846   /* Lookup existing receive context for this address */
2847   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2848                                  &find_receive_context,
2849                                  &frc);
2850   now = GNUNET_TIME_absolute_get ();
2851   d_ctx = frc.rc;
2852
2853   if (NULL == d_ctx)
2854   {
2855     /* Create a new defragmentation context */
2856     d_ctx = GNUNET_malloc (sizeof(struct DefragContext) + udp_addr_len);
2857     GNUNET_memcpy (&d_ctx[1], udp_addr, udp_addr_len);
2858     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2859     d_ctx->udp_addr_len = udp_addr_len;
2860     d_ctx->network_type = network_type;
2861     d_ctx->plugin = plugin;
2862     d_ctx->defrag =
2863       GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2864                                         UDP_MTU,
2865                                         UDP_MAX_MESSAGES_IN_DEFRAG,
2866                                         d_ctx,
2867                                         &fragment_msg_proc,
2868                                         &ack_proc);
2869     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2870                                                  d_ctx,
2871                                                  (GNUNET_CONTAINER_HeapCostType)
2872                                                  now.abs_value_us);
2873     LOG (GNUNET_ERROR_TYPE_DEBUG,
2874          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2875          (unsigned int) ntohs (msg->size),
2876          udp_address_to_string (plugin, udp_addr, udp_addr_len));
2877   }
2878   else
2879   {
2880     LOG (GNUNET_ERROR_TYPE_DEBUG,
2881          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2882          (unsigned int) ntohs (msg->size),
2883          udp_address_to_string (plugin, udp_addr, udp_addr_len));
2884   }
2885
2886   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2887   {
2888     /* keep this 'rc' from expiring */
2889     GNUNET_CONTAINER_heap_update_cost (d_ctx->hnode,
2890                                        (GNUNET_CONTAINER_HeapCostType)
2891                                        now.abs_value_us);
2892   }
2893   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2894       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2895   {
2896     /* remove 'rc' that was inactive the longest */
2897     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2898     GNUNET_assert (NULL != d_ctx);
2899     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2900     GNUNET_free (d_ctx);
2901     GNUNET_STATISTICS_update (plugin->env->stats,
2902                               "# UDP, Defragmentations aborted",
2903                               1,
2904                               GNUNET_NO);
2905   }
2906 }
2907
2908
2909 /**
2910  * Read and process a message from the given socket.
2911  *
2912  * @param plugin the overall plugin
2913  * @param rsock socket to read from
2914  */
2915 static void
2916 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2917 {
2918   socklen_t fromlen;
2919   struct sockaddr_storage addr;
2920   char buf[65536] GNUNET_ALIGN;
2921   ssize_t size;
2922   const struct GNUNET_MessageHeader *msg;
2923   struct IPv4UdpAddress v4;
2924   struct IPv6UdpAddress v6;
2925   const struct sockaddr *sa;
2926   const struct sockaddr_in *sa4;
2927   const struct sockaddr_in6 *sa6;
2928   const union UdpAddress *int_addr;
2929   size_t int_addr_len;
2930   enum GNUNET_NetworkType network_type;
2931
2932   fromlen = sizeof(addr);
2933   memset (&addr, 0, sizeof(addr));
2934   size = GNUNET_NETWORK_socket_recvfrom (rsock,
2935                                          buf,
2936                                          sizeof(buf),
2937                                          (struct sockaddr *) &addr,
2938                                          &fromlen);
2939   sa = (const struct sockaddr *) &addr;
2940
2941   if (-1 == size)
2942   {
2943     LOG (GNUNET_ERROR_TYPE_DEBUG,
2944          "UDP failed to receive data: %s\n",
2945          strerror (errno));
2946     /* Connection failure or something. Not a protocol violation. */
2947     return;
2948   }
2949
2950   /* Check if this is a STUN packet */
2951   if (GNUNET_NO !=
2952       GNUNET_NAT_stun_handle_packet (plugin->nat,
2953                                      (const struct sockaddr *) &addr,
2954                                      fromlen,
2955                                      buf,
2956                                      size))
2957     return; /* was STUN, do not process further */
2958
2959   if (size < sizeof(struct GNUNET_MessageHeader))
2960   {
2961     LOG (GNUNET_ERROR_TYPE_WARNING,
2962          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2963          (unsigned int) size,
2964          GNUNET_a2s (sa, fromlen));
2965     /* _MAY_ be a connection failure (got partial message) */
2966     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2967     GNUNET_break_op (0);
2968     return;
2969   }
2970
2971   msg = (const struct GNUNET_MessageHeader *) buf;
2972   LOG (GNUNET_ERROR_TYPE_DEBUG,
2973        "UDP received %u-byte message from `%s' type %u\n",
2974        (unsigned int) size,
2975        GNUNET_a2s (sa, fromlen),
2976        ntohs (msg->type));
2977   if (size != ntohs (msg->size))
2978   {
2979     LOG (GNUNET_ERROR_TYPE_WARNING,
2980          "UDP malformed message (size %u) header from %s\n",
2981          (unsigned int) size,
2982          GNUNET_a2s (sa, fromlen));
2983     GNUNET_break_op (0);
2984     return;
2985   }
2986   GNUNET_STATISTICS_update (plugin->env->stats,
2987                             "# UDP, total bytes received",
2988                             size,
2989                             GNUNET_NO);
2990   network_type = plugin->env->get_address_type (plugin->env->cls, sa, fromlen);
2991   switch (sa->sa_family)
2992   {
2993   case AF_INET:
2994     sa4 = (const struct sockaddr_in *) &addr;
2995     v4.options = 0;
2996     v4.ipv4_addr = sa4->sin_addr.s_addr;
2997     v4.u4_port = sa4->sin_port;
2998     int_addr = (union UdpAddress *) &v4;
2999     int_addr_len = sizeof(v4);
3000     break;
3001
3002   case AF_INET6:
3003     sa6 = (const struct sockaddr_in6 *) &addr;
3004     v6.options = 0;
3005     v6.ipv6_addr = sa6->sin6_addr;
3006     v6.u6_port = sa6->sin6_port;
3007     int_addr = (union UdpAddress *) &v6;
3008     int_addr_len = sizeof(v6);
3009     break;
3010
3011   default:
3012     GNUNET_break (0);
3013     return;
3014   }
3015
3016   switch (ntohs (msg->type))
3017   {
3018   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
3019     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
3020       udp_broadcast_receive (plugin,
3021                              buf,
3022                              size,
3023                              int_addr,
3024                              int_addr_len,
3025                              network_type);
3026     return;
3027
3028   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
3029     if (ntohs (msg->size) < sizeof(struct UDPMessage))
3030     {
3031       GNUNET_break_op (0);
3032       return;
3033     }
3034     process_udp_message (plugin,
3035                          (const struct UDPMessage *) msg,
3036                          int_addr,
3037                          int_addr_len,
3038                          network_type);
3039     return;
3040
3041   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
3042     read_process_ack (plugin, msg, int_addr, int_addr_len);
3043     return;
3044
3045   case GNUNET_MESSAGE_TYPE_FRAGMENT:
3046     read_process_fragment (plugin, msg, int_addr, int_addr_len, network_type);
3047     return;
3048
3049   default:
3050     GNUNET_break_op (0);
3051     return;
3052   }
3053 }
3054
3055
3056 /**
3057  * Removes messages from the transmission queue that have
3058  * timed out, and then selects a message that should be
3059  * transmitted next.
3060  *
3061  * @param plugin the UDP plugin
3062  * @param sock which socket should we process the queue for (v4 or v6)
3063  * @return message selected for transmission, or NULL for none
3064  */
3065 static struct UDP_MessageWrapper *
3066 remove_timeout_messages_and_select (struct Plugin *plugin,
3067                                     struct GNUNET_NETWORK_Handle *sock)
3068 {
3069   struct UDP_MessageWrapper *udpw;
3070   struct GNUNET_TIME_Relative remaining;
3071   struct GNUNET_ATS_Session *session;
3072   int removed;
3073
3074   removed = GNUNET_NO;
3075   udpw = (sock == plugin->sockv4) ? plugin->ipv4_queue_head
3076          : plugin->ipv6_queue_head;
3077   while (NULL != udpw)
3078   {
3079     session = udpw->session;
3080     /* Find messages with timeout */
3081     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
3082     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
3083     {
3084       /* Message timed out */
3085       removed = GNUNET_YES;
3086       dequeue (plugin, udpw);
3087       udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3088       GNUNET_free (udpw);
3089
3090       if (sock == plugin->sockv4)
3091       {
3092         udpw = plugin->ipv4_queue_head;
3093       }
3094       else if (sock == plugin->sockv6)
3095       {
3096         udpw = plugin->ipv6_queue_head;
3097       }
3098       else
3099       {
3100         GNUNET_break (0);      /* should never happen */
3101         udpw = NULL;
3102       }
3103       GNUNET_STATISTICS_update (plugin->env->stats,
3104                                 "# messages discarded due to timeout",
3105                                 1,
3106                                 GNUNET_NO);
3107     }
3108     else
3109     {
3110       /* Message did not time out, check transmission time */
3111       remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3112       if (0 == remaining.rel_value_us)
3113       {
3114         /* this message is not delayed */
3115         LOG (GNUNET_ERROR_TYPE_DEBUG,
3116              "Message for peer `%s' (%u bytes) is not delayed \n",
3117              GNUNET_i2s (&udpw->session->target),
3118              udpw->payload_size);
3119         break;       /* Found message to send, break */
3120       }
3121       else
3122       {
3123         /* Message is delayed, try next */
3124         LOG (GNUNET_ERROR_TYPE_DEBUG,
3125              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3126              GNUNET_i2s (&udpw->session->target),
3127              udpw->payload_size,
3128              GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
3129         udpw = udpw->next;
3130       }
3131     }
3132   }
3133   if (GNUNET_YES == removed)
3134     notify_session_monitor (session->plugin,
3135                             session,
3136                             GNUNET_TRANSPORT_SS_UPDATE);
3137   return udpw;
3138 }
3139
3140
3141 /**
3142  * We failed to transmit a message via UDP. Generate
3143  * a descriptive error message.
3144  *
3145  * @param plugin our plugin
3146  * @param sa target address we were trying to reach
3147  * @param slen number of bytes in @a sa
3148  * @param error the errno value returned from the sendto() call
3149  */
3150 static void
3151 analyze_send_error (struct Plugin *plugin,
3152                     const struct sockaddr *sa,
3153                     socklen_t slen,
3154                     int error)
3155 {
3156   enum GNUNET_NetworkType type;
3157
3158   type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
3159   if (((GNUNET_NT_LAN == type) || (GNUNET_NT_WAN == type)) &&
3160       ((ENETUNREACH == errno) || (ENETDOWN == errno)))
3161   {
3162     if (slen == sizeof(struct sockaddr_in))
3163     {
3164       /* IPv4: "Network unreachable" or "Network down"
3165        *
3166        * This indicates we do not have connectivity
3167        */
3168       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3169            _ ("UDP could not transmit message to `%s': "
3170               "Network seems down, please check your network configuration\n"),
3171            GNUNET_a2s (sa, slen));
3172     }
3173     if (slen == sizeof(struct sockaddr_in6))
3174     {
3175       /* IPv6: "Network unreachable" or "Network down"
3176        *
3177        * This indicates that this system is IPv6 enabled, but does not
3178        * have a valid global IPv6 address assigned or we do not have
3179        * connectivity
3180        */LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3181            _ (
3182              "UDP could not transmit IPv6 message! "
3183              "Please check your network configuration and disable IPv6 if your "
3184              "connection does not have a global IPv6 address\n"));
3185     }
3186   }
3187   else
3188   {
3189     LOG (GNUNET_ERROR_TYPE_WARNING,
3190          "UDP could not transmit message to `%s': `%s'\n",
3191          GNUNET_a2s (sa, slen),
3192          strerror (error));
3193   }
3194 }
3195
3196
3197 /**
3198  * It is time to try to transmit a UDP message.  Select one
3199  * and send.
3200  *
3201  * @param plugin the plugin
3202  * @param sock which socket (v4/v6) to send on
3203  */
3204 static void
3205 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
3206 {
3207   ssize_t sent;
3208   socklen_t slen;
3209   const struct sockaddr *a;
3210   const struct IPv4UdpAddress *u4;
3211   struct sockaddr_in a4;
3212   const struct IPv6UdpAddress *u6;
3213   struct sockaddr_in6 a6;
3214   struct UDP_MessageWrapper *udpw;
3215
3216   /* Find message(s) to send */
3217   while (NULL != (udpw = remove_timeout_messages_and_select (plugin, sock)))
3218   {
3219     if (sizeof(struct IPv4UdpAddress) ==
3220         udpw->session->address->address_length)
3221     {
3222       u4 = udpw->session->address->address;
3223       memset (&a4, 0, sizeof(a4));
3224       a4.sin_family = AF_INET;
3225 #if HAVE_SOCKADDR_IN_SIN_LEN
3226       a4.sin_len = sizeof(a4);
3227 #endif
3228       a4.sin_port = u4->u4_port;
3229       a4.sin_addr.s_addr = u4->ipv4_addr;
3230       a = (const struct sockaddr *) &a4;
3231       slen = sizeof(a4);
3232     }
3233     else if (sizeof(struct IPv6UdpAddress) ==
3234              udpw->session->address->address_length)
3235     {
3236       u6 = udpw->session->address->address;
3237       memset (&a6, 0, sizeof(a6));
3238       a6.sin6_family = AF_INET6;
3239 #if HAVE_SOCKADDR_IN_SIN_LEN
3240       a6.sin6_len = sizeof(a6);
3241 #endif
3242       a6.sin6_port = u6->u6_port;
3243       a6.sin6_addr = u6->ipv6_addr;
3244       a = (const struct sockaddr *) &a6;
3245       slen = sizeof(a6);
3246     }
3247     else
3248     {
3249       GNUNET_break (0);
3250       dequeue (plugin, udpw);
3251       udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3252       notify_session_monitor (plugin,
3253                               udpw->session,
3254                               GNUNET_TRANSPORT_SS_UPDATE);
3255       GNUNET_free (udpw);
3256       continue;
3257     }
3258     sent = GNUNET_NETWORK_socket_sendto (sock,
3259                                          udpw->msg_buf,
3260                                          udpw->msg_size,
3261                                          a,
3262                                          slen);
3263     udpw->session->last_transmit_time =
3264       GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3265                                 udpw->session->last_transmit_time);
3266     dequeue (plugin, udpw);
3267     if (GNUNET_SYSERR == sent)
3268     {
3269       /* Failure */
3270       analyze_send_error (plugin, a, slen, errno);
3271       udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3272       GNUNET_STATISTICS_update (plugin->env->stats,
3273                                 "# UDP, total, bytes, sent, failure",
3274                                 sent,
3275                                 GNUNET_NO);
3276       GNUNET_STATISTICS_update (plugin->env->stats,
3277                                 "# UDP, total, messages, sent, failure",
3278                                 1,
3279                                 GNUNET_NO);
3280     }
3281     else
3282     {
3283       /* Success */
3284       LOG (GNUNET_ERROR_TYPE_DEBUG,
3285            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3286            (unsigned int) (udpw->msg_size),
3287            GNUNET_i2s (&udpw->session->target),
3288            GNUNET_a2s (a, slen),
3289            (int) sent,
3290            (sent < 0) ? strerror (errno) : "ok");
3291       GNUNET_STATISTICS_update (plugin->env->stats,
3292                                 "# UDP, total, bytes, sent, success",
3293                                 sent,
3294                                 GNUNET_NO);
3295       GNUNET_STATISTICS_update (plugin->env->stats,
3296                                 "# UDP, total, messages, sent, success",
3297                                 1,
3298                                 GNUNET_NO);
3299       if (NULL != udpw->frag_ctx)
3300         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3301       udpw->qc (udpw->qc_cls, udpw, GNUNET_OK);
3302     }
3303     notify_session_monitor (plugin, udpw->session, GNUNET_TRANSPORT_SS_UPDATE);
3304     GNUNET_free (udpw);
3305   }
3306 }
3307
3308
3309 /* ***************** Event loop (part 2) *************** */
3310
3311
3312 /**
3313  * We have been notified that our readset has something to read.  We don't
3314  * know which socket needs to be read, so we have to check each one
3315  * Then reschedule this function to be called again once more is available.
3316  *
3317  * @param cls the plugin handle
3318  */
3319 static void
3320 udp_plugin_select_v4 (void *cls)
3321 {
3322   struct Plugin *plugin = cls;
3323   const struct GNUNET_SCHEDULER_TaskContext *tc;
3324
3325   plugin->select_task_v4 = NULL;
3326   if (NULL == plugin->sockv4)
3327     return;
3328   tc = GNUNET_SCHEDULER_get_task_context ();
3329   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3330       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
3331     udp_select_read (plugin, plugin->sockv4);
3332   udp_select_send (plugin, plugin->sockv4);
3333   schedule_select_v4 (plugin);
3334 }
3335
3336
3337 /**
3338  * We have been notified that our readset has something to read.  We don't
3339  * know which socket needs to be read, so we have to check each one
3340  * Then reschedule this function to be called again once more is available.
3341  *
3342  * @param cls the plugin handle
3343  */
3344 static void
3345 udp_plugin_select_v6 (void *cls)
3346 {
3347   struct Plugin *plugin = cls;
3348   const struct GNUNET_SCHEDULER_TaskContext *tc;
3349
3350   plugin->select_task_v6 = NULL;
3351   if (NULL == plugin->sockv6)
3352     return;
3353   tc = GNUNET_SCHEDULER_get_task_context ();
3354   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3355       (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
3356     udp_select_read (plugin, plugin->sockv6);
3357
3358   udp_select_send (plugin, plugin->sockv6);
3359   schedule_select_v6 (plugin);
3360 }
3361
3362
3363 /* ******************* Initialization *************** */
3364
3365
3366 /**
3367  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3368  *
3369  * @param plugin the plugin to initialize
3370  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3371  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3372  * @return number of sockets that were successfully bound
3373  */
3374 static unsigned int
3375 setup_sockets (struct Plugin *plugin,
3376                const struct sockaddr_in6 *bind_v6,
3377                const struct sockaddr_in *bind_v4)
3378 {
3379   int tries;
3380   unsigned int sockets_created = 0;
3381   struct sockaddr_in6 server_addrv6;
3382   struct sockaddr_in server_addrv4;
3383   const struct sockaddr *server_addr;
3384   const struct sockaddr *addrs[2];
3385   socklen_t addrlens[2];
3386   socklen_t addrlen;
3387   int eno;
3388
3389   /* Create IPv6 socket */
3390   eno = EINVAL;
3391   if (GNUNET_YES == plugin->enable_ipv6)
3392   {
3393     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
3394     if (NULL == plugin->sockv6)
3395     {
3396       LOG (GNUNET_ERROR_TYPE_INFO,
3397            _ ("Disabling IPv6 since it is not supported on this system!\n"));
3398       plugin->enable_ipv6 = GNUNET_NO;
3399     }
3400     else
3401     {
3402       memset (&server_addrv6, 0, sizeof(struct sockaddr_in6));
3403 #if HAVE_SOCKADDR_IN_SIN_LEN
3404       server_addrv6.sin6_len = sizeof(struct sockaddr_in6);
3405 #endif
3406       server_addrv6.sin6_family = AF_INET6;
3407       if (NULL != bind_v6)
3408         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3409       else
3410         server_addrv6.sin6_addr = in6addr_any;
3411
3412       if (0 == plugin->port)     /* autodetect */
3413         server_addrv6.sin6_port = htons (
3414           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3415           + 32000);
3416       else
3417         server_addrv6.sin6_port = htons (plugin->port);
3418       addrlen = sizeof(struct sockaddr_in6);
3419       server_addr = (const struct sockaddr *) &server_addrv6;
3420
3421       tries = 0;
3422       while (tries < 10)
3423       {
3424         LOG (GNUNET_ERROR_TYPE_DEBUG,
3425              "Binding to IPv6 `%s'\n",
3426              GNUNET_a2s (server_addr, addrlen));
3427         /* binding */
3428         if (GNUNET_OK ==
3429             GNUNET_NETWORK_socket_bind (plugin->sockv6, server_addr, addrlen))
3430           break;
3431         eno = errno;
3432         if (0 != plugin->port)
3433         {
3434           tries = 10;         /* fail immediately */
3435           break;         /* bind failed on specific port */
3436         }
3437         /* autodetect */
3438         server_addrv6.sin6_port = htons (
3439           GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3440           + 32000);
3441         tries++;
3442       }
3443       if (tries >= 10)
3444       {
3445         GNUNET_NETWORK_socket_close (plugin->sockv6);
3446         plugin->enable_ipv6 = GNUNET_NO;
3447         plugin->sockv6 = NULL;
3448       }
3449       else
3450       {
3451         plugin->port = ntohs (server_addrv6.sin6_port);
3452       }
3453       if (NULL != plugin->sockv6)
3454       {
3455         LOG (GNUNET_ERROR_TYPE_DEBUG,
3456              "IPv6 UDP socket created listinging at %s\n",
3457              GNUNET_a2s (server_addr, addrlen));
3458         addrs[sockets_created] = server_addr;
3459         addrlens[sockets_created] = addrlen;
3460         sockets_created++;
3461       }
3462       else
3463       {
3464         LOG (GNUNET_ERROR_TYPE_WARNING,
3465              _ ("Failed to bind UDP socket to %s: %s\n"),
3466              GNUNET_a2s (server_addr, addrlen),
3467              strerror (eno));
3468       }
3469     }
3470   }
3471
3472   /* Create IPv4 socket */
3473   eno = EINVAL;
3474   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
3475   if (NULL == plugin->sockv4)
3476   {
3477     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
3478     LOG (GNUNET_ERROR_TYPE_INFO,
3479          _ ("Disabling IPv4 since it is not supported on this system!\n"));
3480     plugin->enable_ipv4 = GNUNET_NO;
3481   }
3482   else
3483   {
3484     memset (&server_addrv4, 0, sizeof(struct sockaddr_in));
3485 #if HAVE_SOCKADDR_IN_SIN_LEN
3486     server_addrv4.sin_len = sizeof(struct sockaddr_in);
3487 #endif
3488     server_addrv4.sin_family = AF_INET;
3489     if (NULL != bind_v4)
3490       server_addrv4.sin_addr = bind_v4->sin_addr;
3491     else
3492       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3493
3494     if (0 == plugin->port)
3495       /* autodetect */
3496       server_addrv4.sin_port = htons (
3497         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
3498     else
3499       server_addrv4.sin_port = htons (plugin->port);
3500
3501     addrlen = sizeof(struct sockaddr_in);
3502     server_addr = (const struct sockaddr *) &server_addrv4;
3503
3504     tries = 0;
3505     while (tries < 10)
3506     {
3507       LOG (GNUNET_ERROR_TYPE_DEBUG,
3508            "Binding to IPv4 `%s'\n",
3509            GNUNET_a2s (server_addr, addrlen));
3510
3511       /* binding */
3512       if (GNUNET_OK ==
3513           GNUNET_NETWORK_socket_bind (plugin->sockv4, server_addr, addrlen))
3514         break;
3515       eno = errno;
3516       if (0 != plugin->port)
3517       {
3518         tries = 10;       /* fail */
3519         break;       /* bind failed on specific port */
3520       }
3521
3522       /* autodetect */
3523       server_addrv4.sin_port = htons (
3524         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
3525       tries++;
3526     }
3527     if (tries >= 10)
3528     {
3529       GNUNET_NETWORK_socket_close (plugin->sockv4);
3530       plugin->enable_ipv4 = GNUNET_NO;
3531       plugin->sockv4 = NULL;
3532     }
3533     else
3534     {
3535       plugin->port = ntohs (server_addrv4.sin_port);
3536     }
3537
3538     if (NULL != plugin->sockv4)
3539     {
3540       LOG (GNUNET_ERROR_TYPE_DEBUG,
3541            "IPv4 socket created on port %s\n",
3542            GNUNET_a2s (server_addr, addrlen));
3543       addrs[sockets_created] = server_addr;
3544       addrlens[sockets_created] = addrlen;
3545       sockets_created++;
3546     }
3547     else
3548     {
3549       LOG (GNUNET_ERROR_TYPE_ERROR,
3550            _ ("Failed to bind UDP socket to %s: %s\n"),
3551            GNUNET_a2s (server_addr, addrlen),
3552            strerror (eno));
3553     }
3554   }
3555
3556   if (0 == sockets_created)
3557   {
3558     LOG (GNUNET_ERROR_TYPE_WARNING, _ ("Failed to open UDP sockets\n"));
3559     return 0;   /* No sockets created, return */
3560   }
3561   schedule_select_v4 (plugin);
3562   schedule_select_v6 (plugin);
3563   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3564                                      "transport-udp",
3565                                      IPPROTO_UDP,
3566                                      sockets_created,
3567                                      addrs,
3568                                      addrlens,
3569                                      &udp_nat_port_map_callback,
3570                                      NULL,
3571                                      plugin);
3572   return sockets_created;
3573 }
3574
3575
3576 /**
3577  * The exported method. Makes the core api available via a global and
3578  * returns the udp transport API.
3579  *
3580  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3581  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3582  */
3583 void *
3584 libgnunet_plugin_transport_udp_init (void *cls)
3585 {
3586   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3587   struct GNUNET_TRANSPORT_PluginFunctions *api;
3588   struct Plugin *p;
3589   unsigned long long port;
3590   unsigned long long aport;
3591   unsigned long long udp_max_bps;
3592   int enable_v6;
3593   int enable_broadcasting;
3594   int enable_broadcasting_recv;
3595   char *bind4_address;
3596   char *bind6_address;
3597   struct GNUNET_TIME_Relative interval;
3598   struct sockaddr_in server_addrv4;
3599   struct sockaddr_in6 server_addrv6;
3600   unsigned int res;
3601   int have_bind4;
3602   int have_bind6;
3603
3604   if (NULL == env->receive)
3605   {
3606     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3607        initialze the plugin or the API */
3608     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3609     api->cls = NULL;
3610     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3611     api->address_to_string = &udp_address_to_string;
3612     api->string_to_address = &udp_string_to_address;
3613     return api;
3614   }
3615
3616   /* Get port number: port == 0 : autodetect a port,
3617   * > 0 : use this port, not given : 2086 default */
3618   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg,
3619                                                           "transport-udp",
3620                                                           "PORT",
3621                                                           &port))
3622     port = 2086;
3623   if (port > 65535)
3624   {
3625     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3626                                "transport-udp",
3627                                "PORT",
3628                                _ ("must be in [0,65535]"));
3629     return NULL;
3630   }
3631   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg,
3632                                                           "transport-udp",
3633                                                           "ADVERTISED_PORT",
3634                                                           &aport))
3635     aport = port;
3636   if (aport > 65535)
3637   {
3638     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3639                                "transport-udp",
3640                                "ADVERTISED_PORT",
3641                                _ ("must be in [0,65535]"));
3642     return NULL;
3643   }
3644
3645   if (GNUNET_YES ==
3646       GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
3647     enable_v6 = GNUNET_NO;
3648   else
3649     enable_v6 = GNUNET_YES;
3650
3651   have_bind4 = GNUNET_NO;
3652   memset (&server_addrv4, 0, sizeof(server_addrv4));
3653   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_string (env->cfg,
3654                                                            "transport-udp",
3655                                                            "BINDTO",
3656                                                            &bind4_address))
3657   {
3658     LOG (GNUNET_ERROR_TYPE_DEBUG,
3659          "Binding UDP plugin to specific address: `%s'\n",
3660          bind4_address);
3661     if (1 != inet_pton (AF_INET, bind4_address, &server_addrv4.sin_addr))
3662     {
3663       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3664                                  "transport-udp",
3665                                  "BINDTO",
3666                                  _ ("must be valid IPv4 address"));
3667       GNUNET_free (bind4_address);
3668       return NULL;
3669     }
3670     have_bind4 = GNUNET_YES;
3671   }
3672   GNUNET_free_non_null (bind4_address);
3673   have_bind6 = GNUNET_NO;
3674   memset (&server_addrv6, 0, sizeof(server_addrv6));
3675   if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_string (env->cfg,
3676                                                            "transport-udp",
3677                                                            "BINDTO6",
3678                                                            &bind6_address))
3679   {
3680     LOG (GNUNET_ERROR_TYPE_DEBUG,
3681          "Binding udp plugin to specific address: `%s'\n",
3682          bind6_address);
3683     if (1 != inet_pton (AF_INET6, bind6_address, &server_addrv6.sin6_addr))
3684     {
3685       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3686                                  "transport-udp",
3687                                  "BINDTO6",
3688                                  _ ("must be valid IPv6 address"));
3689       GNUNET_free (bind6_address);
3690       return NULL;
3691     }
3692     have_bind6 = GNUNET_YES;
3693   }
3694   GNUNET_free_non_null (bind6_address);
3695
3696   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3697                                                               "transport-udp",
3698                                                               "BROADCAST");
3699   if (enable_broadcasting == GNUNET_SYSERR)
3700     enable_broadcasting = GNUNET_NO;
3701
3702   enable_broadcasting_recv =
3703     GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3704                                           "transport-udp",
3705                                           "BROADCAST_RECEIVE");
3706   if (enable_broadcasting_recv == GNUNET_SYSERR)
3707     enable_broadcasting_recv = GNUNET_YES;
3708
3709   if (GNUNET_SYSERR ==
3710       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3711                                            "transport-udp",
3712                                            "BROADCAST_INTERVAL",
3713                                            &interval))
3714   {
3715     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3716   }
3717   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg,
3718                                                           "transport-udp",
3719                                                           "MAX_BPS",
3720                                                           &udp_max_bps))
3721   {
3722     /* 50 MB/s == infinity for practical purposes */
3723     udp_max_bps = 1024 * 1024 * 50;
3724   }
3725
3726   p = GNUNET_new (struct Plugin);
3727   p->port = port;
3728   p->aport = aport;
3729   p->broadcast_interval = interval;
3730   p->enable_ipv6 = enable_v6;
3731   p->enable_ipv4 = GNUNET_YES; /* default */
3732   p->enable_broadcasting = enable_broadcasting;
3733   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3734   p->env = env;
3735   p->sessions = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_NO);
3736   p->defrag_ctxs =
3737     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3738   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3739                                  NULL,
3740                                  NULL,
3741                                  GNUNET_BANDWIDTH_value_init (
3742                                    (uint32_t) udp_max_bps),
3743                                  30);
3744   res = setup_sockets (p,
3745                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3746                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3747   if ((0 == res) || ((NULL == p->sockv4) && (NULL == p->sockv6)))
3748   {
3749     LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to create UDP network sockets\n"));
3750     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3751     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3752     if (NULL != p->nat)
3753       GNUNET_NAT_unregister (p->nat);
3754     GNUNET_free (p);
3755     return NULL;
3756   }
3757
3758   /* Setup broadcasting and receiving beacons */
3759   setup_broadcast (p, &server_addrv6, &server_addrv4);
3760
3761   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3762   api->cls = p;
3763   api->disconnect_session = &udp_disconnect_session;
3764   api->query_keepalive_factor = &udp_query_keepalive_factor;
3765   api->disconnect_peer = &udp_disconnect;
3766   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3767   api->address_to_string = &udp_address_to_string;
3768   api->string_to_address = &udp_string_to_address;
3769   api->check_address = &udp_plugin_check_address;
3770   api->get_session = &udp_plugin_get_session;
3771   api->send = &udp_plugin_send;
3772   api->get_network = &udp_plugin_get_network;
3773   api->get_network_for_address = &udp_plugin_get_network_for_address;
3774   api->update_session_timeout = &udp_plugin_update_session_timeout;
3775   api->setup_monitor = &udp_plugin_setup_monitor;
3776   return api;
3777 }
3778
3779
3780 /**
3781  * Function called on each entry in the defragmentation heap to
3782  * clean it up.
3783  *
3784  * @param cls NULL
3785  * @param node node in the heap (to be removed)
3786  * @param element a `struct DefragContext` to be cleaned up
3787  * @param cost unused
3788  * @return #GNUNET_YES
3789  */
3790 static int
3791 heap_cleanup_iterator (void *cls,
3792                        struct GNUNET_CONTAINER_HeapNode *node,
3793                        void *element,
3794                        GNUNET_CONTAINER_HeapCostType cost)
3795 {
3796   struct DefragContext *d_ctx = element;
3797
3798   GNUNET_CONTAINER_heap_remove_node (node);
3799   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3800   GNUNET_free (d_ctx);
3801   return GNUNET_YES;
3802 }
3803
3804
3805 /**
3806  * The exported method. Makes the core api available via a global and
3807  * returns the udp transport API.
3808  *
3809  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3810  * @return NULL
3811  */
3812 void *
3813 libgnunet_plugin_transport_udp_done (void *cls)
3814 {
3815   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3816   struct Plugin *plugin = api->cls;
3817   struct PrettyPrinterContext *cur;
3818   struct UDP_MessageWrapper *udpw;
3819
3820   if (NULL == plugin)
3821   {
3822     GNUNET_free (api);
3823     return NULL;
3824   }
3825   stop_broadcast (plugin);
3826   if (NULL != plugin->select_task_v4)
3827   {
3828     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
3829     plugin->select_task_v4 = NULL;
3830   }
3831   if (NULL != plugin->select_task_v6)
3832   {
3833     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3834     plugin->select_task_v6 = NULL;
3835   }
3836   if (NULL != plugin->sockv4)
3837   {
3838     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
3839     plugin->sockv4 = NULL;
3840   }
3841   if (NULL != plugin->sockv6)
3842   {
3843     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
3844     plugin->sockv6 = NULL;
3845   }
3846   if (NULL != plugin->nat)
3847   {
3848     GNUNET_NAT_unregister (plugin->nat);
3849     plugin->nat = NULL;
3850   }
3851   if (NULL != plugin->defrag_ctxs)
3852   {
3853     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3854                                    &heap_cleanup_iterator,
3855                                    NULL);
3856     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3857     plugin->defrag_ctxs = NULL;
3858   }
3859   while (NULL != (udpw = plugin->ipv4_queue_head))
3860   {
3861     dequeue (plugin, udpw);
3862     udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3863     GNUNET_free (udpw);
3864   }
3865   while (NULL != (udpw = plugin->ipv6_queue_head))
3866   {
3867     dequeue (plugin, udpw);
3868     udpw->qc (udpw->qc_cls, udpw, GNUNET_SYSERR);
3869     GNUNET_free (udpw);
3870   }
3871   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3872                                          &disconnect_and_free_it,
3873                                          plugin);
3874   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3875
3876   while (NULL != (cur = plugin->ppc_dll_head))
3877   {
3878     GNUNET_break (0);
3879     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3880                                  plugin->ppc_dll_tail,
3881                                  cur);
3882     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3883     if (NULL != cur->timeout_task)
3884     {
3885       GNUNET_SCHEDULER_cancel (cur->timeout_task);
3886       cur->timeout_task = NULL;
3887     }
3888     GNUNET_free (cur);
3889   }
3890   GNUNET_free (plugin);
3891   GNUNET_free (api);
3892   return NULL;
3893 }
3894
3895
3896 /* end of plugin_transport_udp.c */