5085b70cabfacbdd4c89432235982f5a74e1d30b
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 enum UDP_MessageType
90 {
91   UNDEFINED = 0,
92   MSG_FRAGMENTED = 1,
93   MSG_FRAGMENTED_COMPLETE = 2,
94   MSG_UNFRAGMENTED = 3,
95   MSG_ACK = 4,
96   MSG_BEACON = 5
97 };
98
99 struct Session
100 {
101   /**
102    * Which peer is this session for?
103    */
104   struct GNUNET_PeerIdentity target;
105
106   struct UDP_FragmentationContext * frag_ctx;
107
108   /**
109    * Address of the other peer
110    */
111   const struct sockaddr *sock_addr;
112
113   /**
114    * Desired delay for next sending we send to other peer
115    */
116   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
117
118   /**
119    * Desired delay for next sending we received from other peer
120    */
121   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
122
123   /**
124    * Session timeout task
125    */
126   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
127
128   /**
129    * expected delay for ACKs
130    */
131   struct GNUNET_TIME_Relative last_expected_ack_delay;
132
133   /**
134    * desired delay between UDP messages
135    */
136   struct GNUNET_TIME_Relative last_expected_msg_delay;
137
138   struct GNUNET_ATS_Information ats;
139
140   size_t addrlen;
141
142
143   unsigned int rc;
144
145   int in_destroy;
146 };
147
148
149 struct SessionCompareContext
150 {
151   struct Session *res;
152   const struct GNUNET_HELLO_Address *addr;
153 };
154
155
156 /**
157  * Closure for 'process_inbound_tokenized_messages'
158  */
159 struct SourceInformation
160 {
161   /**
162    * Sender identity.
163    */
164   struct GNUNET_PeerIdentity sender;
165
166   /**
167    * Source address.
168    */
169   const void *arg;
170
171   struct Session *session;
172   /**
173    * Number of bytes in source address.
174    */
175   size_t args;
176
177 };
178
179
180 /**
181  * Closure for 'find_receive_context'.
182  */
183 struct FindReceiveContext
184 {
185   /**
186    * Where to store the result.
187    */
188   struct DefragContext *rc;
189
190   /**
191    * Address to find.
192    */
193   const struct sockaddr *addr;
194
195   struct Session *session;
196
197   /**
198    * Number of bytes in 'addr'.
199    */
200   socklen_t addr_len;
201
202 };
203
204
205
206 /**
207  * Data structure to track defragmentation contexts based
208  * on the source of the UDP traffic.
209  */
210 struct DefragContext
211 {
212
213   /**
214    * Defragmentation context.
215    */
216   struct GNUNET_DEFRAGMENT_Context *defrag;
217
218   /**
219    * Source address this receive context is for (allocated at the
220    * end of the struct).
221    */
222   const struct sockaddr *src_addr;
223
224   /**
225    * Reference to master plugin struct.
226    */
227   struct Plugin *plugin;
228
229   /**
230    * Node in the defrag heap.
231    */
232   struct GNUNET_CONTAINER_HeapNode *hnode;
233
234   /**
235    * Length of 'src_addr'
236    */
237   size_t addr_len;
238 };
239
240
241
242 /**
243  * Context to send fragmented messages
244  */
245 struct UDP_FragmentationContext
246 {
247   /**
248    * Next in linked list
249    */
250   struct UDP_FragmentationContext * next;
251
252   /**
253    * Previous in linked list
254    */
255   struct UDP_FragmentationContext * prev;
256
257   /**
258    * The plugin
259    */
260   struct Plugin * plugin;
261
262   /**
263    * Handle for GNUNET_FRAGMENT context
264    */
265   struct GNUNET_FRAGMENT_Context * frag;
266
267   /**
268    * The session this fragmentation context belongs to
269    */
270   struct Session * session;
271
272   /**
273    * Function to call upon completion of the transmission.
274    */
275   GNUNET_TRANSPORT_TransmitContinuation cont;
276
277   /**
278    * Closure for 'cont'.
279    */
280   void *cont_cls;
281
282   /**
283    * Message timeout
284    */
285   struct GNUNET_TIME_Absolute timeout;
286
287   /**
288    * Payload size of original unfragmented message
289    */
290   size_t payload_size;
291
292   /**
293    * Bytes used to send all fragments on wire including UDP overhead
294    */
295   size_t on_wire_size;
296
297   unsigned int fragments_used;
298
299 };
300
301
302 struct UDP_MessageWrapper
303 {
304   /**
305    * Session this message belongs to
306    */
307   struct Session *session;
308
309   /**
310    * DLL of messages
311    * previous element
312    */
313   struct UDP_MessageWrapper *prev;
314
315   /**
316    * DLL of messages
317    * previous element
318    */
319   struct UDP_MessageWrapper *next;
320
321   /**
322    * Message type
323    * According to UDP_MessageType
324    */
325   int msg_type;
326
327   /**
328    * Message with size msg_size including UDP specific overhead
329    */
330   char *msg_buf;
331
332   /**
333    * Size of UDP message to send including UDP specific overhead
334    */
335   size_t msg_size;
336
337   /**
338    * Payload size of original message
339    */
340   size_t payload_size;
341
342   /**
343    * Message timeout
344    */
345   struct GNUNET_TIME_Absolute timeout;
346
347   /**
348    * Function to call upon completion of the transmission.
349    */
350   GNUNET_TRANSPORT_TransmitContinuation cont;
351
352   /**
353    * Closure for 'cont'.
354    */
355   void *cont_cls;
356
357   /**
358    * Fragmentation context
359    * frag_ctx == NULL if transport <= MTU
360    * frag_ctx != NULL if transport > MTU
361    */
362   struct UDP_FragmentationContext *frag_ctx;
363 };
364
365
366 /**
367  * UDP ACK Message-Packet header (after defragmentation).
368  */
369 struct UDP_ACK_Message
370 {
371   /**
372    * Message header.
373    */
374   struct GNUNET_MessageHeader header;
375
376   /**
377    * Desired delay for flow control
378    */
379   uint32_t delay;
380
381   /**
382    * What is the identity of the sender
383    */
384   struct GNUNET_PeerIdentity sender;
385
386 };
387
388 /**
389  * Encapsulation of all of the state of the plugin.
390  */
391 struct Plugin * plugin;
392
393
394 /**
395  * We have been notified that our readset has something to read.  We don't
396  * know which socket needs to be read, so we have to check each one
397  * Then reschedule this function to be called again once more is available.
398  *
399  * @param cls the plugin handle
400  * @param tc the scheduling context (for rescheduling this function again)
401  */
402 static void
403 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
404
405
406 /**
407  * We have been notified that our readset has something to read.  We don't
408  * know which socket needs to be read, so we have to check each one
409  * Then reschedule this function to be called again once more is available.
410  *
411  * @param cls the plugin handle
412  * @param tc the scheduling context (for rescheduling this function again)
413  */
414 static void
415 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
416
417
418 /**
419  * Start session timeout
420  */
421 static void
422 start_session_timeout (struct Session *s);
423
424 /**
425  * Increment session timeout due to activity
426  */
427 static void
428 reschedule_session_timeout (struct Session *s);
429
430 /**
431  * Cancel timeout
432  */
433 static void
434 stop_session_timeout (struct Session *s);
435
436
437 /**
438  * (re)schedule select tasks for this plugin.
439  *
440  * @param plugin plugin to reschedule
441  */
442 static void
443 schedule_select (struct Plugin *plugin)
444 {
445   struct GNUNET_TIME_Relative min_delay;
446   struct UDP_MessageWrapper *udpw;
447
448   if (NULL != plugin->sockv4)
449   {
450     /* Find a message ready to send:
451      * Flow delay from other peer is expired or not set (0) */
452     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
453     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
454       min_delay = GNUNET_TIME_relative_min (min_delay,
455                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
456     
457     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
458       GNUNET_SCHEDULER_cancel(plugin->select_task);
459
460     /* Schedule with:
461      * - write active set if message is ready
462      * - timeout minimum delay */
463     plugin->select_task =
464       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
465                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
466                                    plugin->rs_v4,
467                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
468                                    &udp_plugin_select, plugin);  
469   }
470   if (NULL != plugin->sockv6)
471   {
472     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
473     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
474       min_delay = GNUNET_TIME_relative_min (min_delay,
475                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
476     
477     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
478       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
479     plugin->select_task_v6 =
480       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
481                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
482                                    plugin->rs_v6,
483                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
484                                    &udp_plugin_select_v6, plugin);
485   }
486 }
487
488
489 /**
490  * Function called for a quick conversion of the binary address to
491  * a numeric address.  Note that the caller must not free the
492  * address and that the next call to this function is allowed
493  * to override the address again.
494  *
495  * @param cls closure
496  * @param addr binary address
497  * @param addrlen length of the address
498  * @return string representing the same address
499  */
500 const char *
501 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
502 {
503   static char rbuf[INET6_ADDRSTRLEN + 10];
504   char buf[INET6_ADDRSTRLEN];
505   const void *sb;
506   struct in_addr a4;
507   struct in6_addr a6;
508   const struct IPv4UdpAddress *t4;
509   const struct IPv6UdpAddress *t6;
510   int af;
511   uint16_t port;
512
513   if (addrlen == sizeof (struct IPv6UdpAddress))
514   {
515     t6 = addr;
516     af = AF_INET6;
517     port = ntohs (t6->u6_port);
518     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
519     sb = &a6;
520   }
521   else if (addrlen == sizeof (struct IPv4UdpAddress))
522   {
523     t4 = addr;
524     af = AF_INET;
525     port = ntohs (t4->u4_port);
526     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
527     sb = &a4;
528   }
529   else
530   {
531     GNUNET_break_op (0);
532     return NULL;
533   }
534   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
535   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
536                    buf, port);
537   return rbuf;
538 }
539
540
541 /**
542  * Function called to convert a string address to
543  * a binary address.
544  *
545  * @param cls closure ('struct Plugin*')
546  * @param addr string address
547  * @param addrlen length of the address
548  * @param buf location to store the buffer
549  * @param added location to store the number of bytes in the buffer.
550  *        If the function returns GNUNET_SYSERR, its contents are undefined.
551  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
552  */
553 static int
554 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
555     void **buf, size_t *added)
556 {
557   struct sockaddr_storage socket_address;
558   
559   if ((NULL == addr) || (0 == addrlen))
560   {
561     GNUNET_break (0);
562     return GNUNET_SYSERR;
563   }
564
565   if ('\0' != addr[addrlen - 1])
566   {
567     return GNUNET_SYSERR;
568   }
569
570   if (strlen (addr) != addrlen - 1)
571   {
572     return GNUNET_SYSERR;
573   }
574
575   if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
576                                                  &socket_address))
577   {
578     return GNUNET_SYSERR;
579   }
580
581   switch (socket_address.ss_family)
582   {
583   case AF_INET:
584     {
585       struct IPv4UdpAddress *u4;
586       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
587       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
588       u4->ipv4_addr = in4->sin_addr.s_addr;
589       u4->u4_port = in4->sin_port;
590       *buf = u4;
591       *added = sizeof (struct IPv4UdpAddress);
592       return GNUNET_OK;
593     }
594   case AF_INET6:
595     {
596       struct IPv6UdpAddress *u6;
597       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
598       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
599       u6->ipv6_addr = in6->sin6_addr;
600       u6->u6_port = in6->sin6_port;
601       *buf = u6;
602       *added = sizeof (struct IPv6UdpAddress);
603       return GNUNET_OK;
604     }
605   default:
606     GNUNET_break (0);
607     return GNUNET_SYSERR;
608   }
609 }
610
611
612 /**
613  * Append our port and forward the result.
614  *
615  * @param cls a 'struct PrettyPrinterContext'
616  * @param hostname result from DNS resolver
617  */
618 static void
619 append_port (void *cls, const char *hostname)
620 {
621   struct PrettyPrinterContext *ppc = cls;
622   char *ret;
623
624   if (hostname == NULL)
625   {
626     ppc->asc (ppc->asc_cls, NULL);
627     GNUNET_free (ppc);
628     return;
629   }
630   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
631   ppc->asc (ppc->asc_cls, ret);
632   GNUNET_free (ret);
633 }
634
635
636 /**
637  * Convert the transports address to a nice, human-readable
638  * format.
639  *
640  * @param cls closure
641  * @param type name of the transport that generated the address
642  * @param addr one of the addresses of the host, NULL for the last address
643  *        the specific address format depends on the transport
644  * @param addrlen length of the address
645  * @param numeric should (IP) addresses be displayed in numeric form?
646  * @param timeout after how long should we give up?
647  * @param asc function to call on each string
648  * @param asc_cls closure for asc
649  */
650 static void
651 udp_plugin_address_pretty_printer (void *cls, const char *type,
652                                    const void *addr, size_t addrlen,
653                                    int numeric,
654                                    struct GNUNET_TIME_Relative timeout,
655                                    GNUNET_TRANSPORT_AddressStringCallback asc,
656                                    void *asc_cls)
657 {
658   struct PrettyPrinterContext *ppc;
659   const void *sb;
660   size_t sbs;
661   struct sockaddr_in a4;
662   struct sockaddr_in6 a6;
663   const struct IPv4UdpAddress *u4;
664   const struct IPv6UdpAddress *u6;
665   uint16_t port;
666
667   if (addrlen == sizeof (struct IPv6UdpAddress))
668   {
669     u6 = addr;
670     memset (&a6, 0, sizeof (a6));
671     a6.sin6_family = AF_INET6;
672 #if HAVE_SOCKADDR_IN_SIN_LEN
673     a6.sin6_len = sizeof (a6);
674 #endif
675     a6.sin6_port = u6->u6_port;
676     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
677     port = ntohs (u6->u6_port);
678     sb = &a6;
679     sbs = sizeof (a6);
680   }
681   else if (addrlen == sizeof (struct IPv4UdpAddress))
682   {
683     u4 = addr;
684     memset (&a4, 0, sizeof (a4));
685     a4.sin_family = AF_INET;
686 #if HAVE_SOCKADDR_IN_SIN_LEN
687     a4.sin_len = sizeof (a4);
688 #endif
689     a4.sin_port = u4->u4_port;
690     a4.sin_addr.s_addr = u4->ipv4_addr;
691     port = ntohs (u4->u4_port);
692     sb = &a4;
693     sbs = sizeof (a4);
694   }
695   else if (0 == addrlen)
696   {
697     asc (asc_cls, "<inbound connection>");
698     asc (asc_cls, NULL);
699     return;
700   }
701   else
702   {
703     /* invalid address */
704     GNUNET_break_op (0);
705     asc (asc_cls, NULL);
706     return;
707   }
708   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
709   ppc->asc = asc;
710   ppc->asc_cls = asc_cls;
711   ppc->port = port;
712   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
713 }
714
715
716 static void
717 call_continuation (struct UDP_MessageWrapper *udpw, int result)
718 {
719   size_t overhead;
720
721   LOG (GNUNET_ERROR_TYPE_DEBUG,
722       "Calling continuation for %u byte message to `%s' with result %s\n",
723       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
724       (GNUNET_OK == result) ? "OK" : "SYSERR");
725
726   if (udpw->msg_size >= udpw->payload_size)
727     overhead = udpw->msg_size - udpw->payload_size;
728   else
729     overhead = udpw->msg_size;
730
731   switch (result) {
732     case GNUNET_OK:
733       switch (udpw->msg_type) {
734         case MSG_UNFRAGMENTED:
735           if (NULL != udpw->cont)
736           {
737             /* Transport continuation */
738             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
739                       udpw->payload_size, overhead);
740           }
741           GNUNET_STATISTICS_update (plugin->env->stats,
742                                     "# UDP, unfragmented msgs, messages, sent, success",
743                                     1, GNUNET_NO);
744           GNUNET_STATISTICS_update (plugin->env->stats,
745                                     "# UDP, unfragmented msgs, bytes payload, sent, success",
746                                     udpw->payload_size, GNUNET_NO);
747           GNUNET_STATISTICS_update (plugin->env->stats,
748                                     "# UDP, unfragmented msgs, bytes overhead, sent, success",
749                                     overhead, GNUNET_NO);
750           GNUNET_STATISTICS_update (plugin->env->stats,
751                                     "# UDP, total, bytes overhead, sent",
752                                     overhead, GNUNET_NO);
753           GNUNET_STATISTICS_update (plugin->env->stats,
754                                     "# UDP, total, bytes payload, sent",
755                                     udpw->payload_size, GNUNET_NO);
756           break;
757         case MSG_FRAGMENTED_COMPLETE:
758           GNUNET_assert (NULL != udpw->frag_ctx);
759           if (udpw->frag_ctx->cont != NULL)
760             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_OK,
761                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
762           GNUNET_STATISTICS_update (plugin->env->stats,
763                                     "# UDP, fragmented msgs, messages, sent, success",
764                                     1, GNUNET_NO);
765           GNUNET_STATISTICS_update (plugin->env->stats,
766                                     "# UDP, fragmented msgs, bytes payload, sent, success",
767                                     udpw->payload_size, GNUNET_NO);
768           GNUNET_STATISTICS_update (plugin->env->stats,
769                                     "# UDP, fragmented msgs, bytes overhead, sent, success",
770                                     overhead, GNUNET_NO);
771           GNUNET_STATISTICS_update (plugin->env->stats,
772                                     "# UDP, total, bytes overhead, sent",
773                                     overhead, GNUNET_NO);
774           GNUNET_STATISTICS_update (plugin->env->stats,
775                                     "# UDP, total, bytes payload, sent",
776                                     udpw->payload_size, GNUNET_NO);
777           GNUNET_STATISTICS_update (plugin->env->stats,
778                                     "# UDP, fragmented msgs, messages, pending",
779                                     -1, GNUNET_NO);
780           break;
781         case MSG_FRAGMENTED:
782           /* Fragmented message: enqueue next fragment */
783           if (NULL != udpw->cont)
784             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
785                       udpw->payload_size, udpw->msg_size);
786           GNUNET_STATISTICS_update (plugin->env->stats,
787                                     "# UDP, fragmented msgs, fragments, sent, success",
788                                     1, GNUNET_NO);
789           GNUNET_STATISTICS_update (plugin->env->stats,
790                                     "# UDP, fragmented msgs, fragments bytes, sent, success",
791                                     udpw->msg_size, GNUNET_NO);
792           break;
793         case MSG_ACK:
794           /* No continuation */
795           GNUNET_STATISTICS_update (plugin->env->stats,
796                                     "# UDP, ACK msgs, messages, sent, success",
797                                     1, GNUNET_NO);
798           GNUNET_STATISTICS_update (plugin->env->stats,
799                                     "# UDP, ACK msgs, bytes overhead, sent, success",
800                                     overhead, GNUNET_NO);
801           GNUNET_STATISTICS_update (plugin->env->stats,
802                                     "# UDP, total, bytes overhead, sent",
803                                     overhead, GNUNET_NO);
804           break;
805         case MSG_BEACON:
806           GNUNET_break (0);
807           break;
808         default:
809           LOG (GNUNET_ERROR_TYPE_ERROR,
810               "ERROR: %u\n", udpw->msg_type);
811           GNUNET_break (0);
812           break;
813       }
814       break;
815     case GNUNET_SYSERR:
816       switch (udpw->msg_type) {
817         case MSG_UNFRAGMENTED:
818           /* Unfragmented message: failed to send */
819           if (NULL != udpw->cont)
820             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
821                       udpw->payload_size, overhead);
822           GNUNET_STATISTICS_update (plugin->env->stats,
823                                   "# UDP, unfragmented msgs, messages, sent, failure",
824                                   1, GNUNET_NO);
825           GNUNET_STATISTICS_update (plugin->env->stats,
826                                     "# UDP, unfragmented msgs, bytes payload, sent, failure",
827                                     udpw->payload_size, GNUNET_NO);
828           GNUNET_STATISTICS_update (plugin->env->stats,
829                                     "# UDP, unfragmented msgs, bytes overhead, sent, failure",
830                                     overhead, GNUNET_NO);
831           break;
832         case MSG_FRAGMENTED_COMPLETE:
833           GNUNET_assert (NULL != udpw->frag_ctx);
834           if (udpw->frag_ctx->cont != NULL)
835             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_SYSERR,
836                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
837           GNUNET_STATISTICS_update (plugin->env->stats,
838                                     "# UDP, fragmented msgs, messages, sent, failure",
839                                     1, GNUNET_NO);
840           GNUNET_STATISTICS_update (plugin->env->stats,
841                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
842                                     udpw->payload_size, GNUNET_NO);
843           GNUNET_STATISTICS_update (plugin->env->stats,
844                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
845                                     overhead, GNUNET_NO);
846           GNUNET_STATISTICS_update (plugin->env->stats,
847                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
848                                     overhead, GNUNET_NO);
849           GNUNET_STATISTICS_update (plugin->env->stats,
850                                     "# UDP, fragmented msgs, messages, pending",
851                                     -1, GNUNET_NO);
852           break;
853         case MSG_FRAGMENTED:
854           GNUNET_assert (NULL != udpw->frag_ctx);
855           /* Fragmented message: failed to send */
856           GNUNET_STATISTICS_update (plugin->env->stats,
857                                     "# UDP, fragmented msgs, fragments, sent, failure",
858                                     1, GNUNET_NO);
859           GNUNET_STATISTICS_update (plugin->env->stats,
860                                     "# UDP, fragmented msgs, fragments bytes, sent, failure",
861                                     udpw->msg_size, GNUNET_NO);
862           break;
863         case MSG_ACK:
864           /* ACK message: failed to send */
865           GNUNET_STATISTICS_update (plugin->env->stats,
866                                     "# UDP, ACK msgs, messages, sent, failure",
867                                     1, GNUNET_NO);
868           break;
869         case MSG_BEACON:
870           /* Beacon message: failed to send */
871           GNUNET_break (0);
872           break;
873         default:
874           GNUNET_break (0);
875           break;
876       }
877       break;
878     default:
879       GNUNET_break (0);
880       break;
881   }
882 }
883
884
885 /**
886  * Check if the given port is plausible (must be either our listen
887  * port or our advertised port).  If it is neither, we return
888  * GNUNET_SYSERR.
889  *
890  * @param plugin global variables
891  * @param in_port port number to check
892  * @return GNUNET_OK if port is either open_port or adv_port
893  */
894 static int
895 check_port (struct Plugin *plugin, uint16_t in_port)
896 {
897   if ((in_port == plugin->port) || (in_port == plugin->aport))
898     return GNUNET_OK;
899   return GNUNET_SYSERR;
900 }
901
902
903 /**
904  * Function that will be called to check if a binary address for this
905  * plugin is well-formed and corresponds to an address for THIS peer
906  * (as per our configuration).  Naturally, if absolutely necessary,
907  * plugins can be a bit conservative in their answer, but in general
908  * plugins should make sure that the address does not redirect
909  * traffic to a 3rd party that might try to man-in-the-middle our
910  * traffic.
911  *
912  * @param cls closure, should be our handle to the Plugin
913  * @param addr pointer to the address
914  * @param addrlen length of addr
915  * @return GNUNET_OK if this is a plausible address for this peer
916  *         and transport, GNUNET_SYSERR if not
917  *
918  */
919 static int
920 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
921 {
922   struct Plugin *plugin = cls;
923   struct IPv4UdpAddress *v4;
924   struct IPv6UdpAddress *v6;
925
926   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
927       (addrlen != sizeof (struct IPv6UdpAddress)))
928   {
929     GNUNET_break_op (0);
930     return GNUNET_SYSERR;
931   }
932   if (addrlen == sizeof (struct IPv4UdpAddress))
933   {
934     v4 = (struct IPv4UdpAddress *) addr;
935     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
936       return GNUNET_SYSERR;
937     if (GNUNET_OK !=
938         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
939                                  sizeof (struct in_addr)))
940       return GNUNET_SYSERR;
941   }
942   else
943   {
944     v6 = (struct IPv6UdpAddress *) addr;
945     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
946     {
947       GNUNET_break_op (0);
948       return GNUNET_SYSERR;
949     }
950     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
951       return GNUNET_SYSERR;
952     if (GNUNET_OK !=
953         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
954                                  sizeof (struct in6_addr)))
955       return GNUNET_SYSERR;
956   }
957   return GNUNET_OK;
958 }
959
960
961 /**
962  * Task to free resources associated with a session.
963  *
964  * @param s session to free
965  */
966 static void
967 free_session (struct Session *s)
968 {
969   if (NULL != s->frag_ctx)
970   {
971     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag, NULL, NULL);
972     GNUNET_free (s->frag_ctx);
973     s->frag_ctx = NULL;
974   }
975   GNUNET_free (s);
976 }
977
978
979 static void
980 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
981 {
982   GNUNET_STATISTICS_update (plugin->env->stats,
983                             "# UDP, total, bytes in buffers",
984                             -udpw->msg_size, GNUNET_NO);
985   GNUNET_STATISTICS_update (plugin->env->stats,
986                             "# UDP, total, msgs in buffers",
987                             -1, GNUNET_NO);
988   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
989     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
990                                  plugin->ipv4_queue_tail, udpw);
991   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
992     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
993                                  plugin->ipv6_queue_tail, udpw);
994 }
995
996 static void
997 fragmented_message_done (struct UDP_FragmentationContext *fc, int result)
998 {
999   struct UDP_MessageWrapper *udpw;
1000   struct UDP_MessageWrapper *tmp;
1001   struct UDP_MessageWrapper dummy;
1002   struct Session *s = fc->session;
1003
1004   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p : Fragmented message removed with result %s\n", fc, (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1005   
1006   /* Call continuation for fragmented message */
1007   memset (&dummy, 0, sizeof (dummy));
1008   dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
1009   dummy.msg_size = s->frag_ctx->on_wire_size;
1010   dummy.payload_size = s->frag_ctx->payload_size;
1011   dummy.frag_ctx = s->frag_ctx;
1012   dummy.cont = NULL;
1013   dummy.cont_cls = NULL;
1014   dummy.session = s;
1015
1016   call_continuation (&dummy, result);
1017
1018   /* Remove left-over fragments from queue */
1019   /* Remove leftover fragments from queue */
1020   if (s->addrlen == sizeof (struct sockaddr_in6))
1021   {
1022     udpw = plugin->ipv6_queue_head;
1023     while (NULL != udpw)
1024     {
1025       tmp = udpw->next;
1026       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1027       {
1028         dequeue (plugin, udpw);
1029         call_continuation (udpw, GNUNET_SYSERR);
1030         GNUNET_free (udpw);
1031       }
1032       udpw = tmp;
1033     }
1034   }
1035   if (s->addrlen == sizeof (struct sockaddr_in))
1036   {
1037     udpw = plugin->ipv4_queue_head;
1038     while (udpw!= NULL)
1039     {
1040       tmp = udpw->next;
1041       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1042       {
1043         dequeue (plugin, udpw);
1044         call_continuation (udpw, GNUNET_SYSERR);
1045         GNUNET_free (udpw);
1046       }
1047       udpw = tmp;
1048     }
1049   }
1050
1051   /* Destroy fragmentation context */
1052   GNUNET_FRAGMENT_context_destroy (fc->frag,
1053                                      &s->last_expected_msg_delay,
1054                                      &s->last_expected_ack_delay);
1055   s->frag_ctx = NULL;
1056   GNUNET_free (fc);
1057 }
1058
1059 /**
1060  * Functions with this signature are called whenever we need
1061  * to close a session due to a disconnect or failure to
1062  * establish a connection.
1063  *
1064  * @param s session to close down
1065  */
1066 static void
1067 disconnect_session (struct Session *s)
1068 {
1069   struct UDP_MessageWrapper *udpw;
1070   struct UDP_MessageWrapper *next;
1071
1072   GNUNET_assert (GNUNET_YES != s->in_destroy);
1073   LOG (GNUNET_ERROR_TYPE_DEBUG,
1074        "Session %p to peer `%s' address ended \n",
1075          s,
1076          GNUNET_i2s (&s->target),
1077          GNUNET_a2s (s->sock_addr, s->addrlen));
1078   stop_session_timeout (s);
1079
1080   if (NULL != s->frag_ctx)
1081   {
1082     /* Remove fragmented message due to disconnect */
1083     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1084   }
1085
1086   next = plugin->ipv4_queue_head;
1087   while (NULL != (udpw = next))
1088   {
1089     next = udpw->next;
1090     if (udpw->session == s)
1091     {
1092       dequeue (plugin, udpw);
1093       call_continuation(udpw, GNUNET_SYSERR);
1094       GNUNET_free (udpw);
1095     }
1096   }
1097   next = plugin->ipv6_queue_head;
1098   while (NULL != (udpw = next))
1099   {
1100     next = udpw->next;
1101     if (udpw->session == s)
1102     {
1103       dequeue (plugin, udpw);
1104       call_continuation(udpw, GNUNET_SYSERR);
1105       GNUNET_free (udpw);
1106     }
1107     udpw = next;
1108   }
1109   plugin->env->session_end (plugin->env->cls, &s->target, s);
1110
1111   if (NULL != s->frag_ctx)
1112   {
1113     if (NULL != s->frag_ctx->cont)
1114     {
1115       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
1116                          s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1117       LOG (GNUNET_ERROR_TYPE_DEBUG,
1118           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1119           GNUNET_i2s (&s->target));
1120     }
1121   }
1122
1123   GNUNET_assert (GNUNET_YES ==
1124                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
1125                                                        &s->target.hashPubKey,
1126                                                        s));
1127   GNUNET_STATISTICS_set(plugin->env->stats,
1128                         "# UDP, sessions active",
1129                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1130                         GNUNET_NO);
1131   if (s->rc > 0)
1132     s->in_destroy = GNUNET_YES;
1133   else
1134     free_session (s);
1135 }
1136
1137 /**
1138  * Destroy a session, plugin is being unloaded.
1139  *
1140  * @param cls unused
1141  * @param key hash of public key of target peer
1142  * @param value a 'struct PeerSession*' to clean up
1143  * @return GNUNET_OK (continue to iterate)
1144  */
1145 static int
1146 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1147 {
1148   disconnect_session(value);
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Disconnect from a remote node.  Clean up session if we have one for this peer
1155  *
1156  * @param cls closure for this call (should be handle to Plugin)
1157  * @param target the peeridentity of the peer to disconnect
1158  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
1159  */
1160 static void
1161 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1162 {
1163   struct Plugin *plugin = cls;
1164   GNUNET_assert (plugin != NULL);
1165
1166   GNUNET_assert (target != NULL);
1167   LOG (GNUNET_ERROR_TYPE_DEBUG,
1168        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
1169   /* Clean up sessions */
1170   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
1171 }
1172
1173
1174 /**
1175  * Session was idle, so disconnect it
1176  */
1177 static void
1178 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1179 {
1180   GNUNET_assert (NULL != cls);
1181   struct Session *s = cls;
1182
1183   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185               "Session %p was idle for %llu ms, disconnecting\n",
1186               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1187   /* call session destroy function */
1188   disconnect_session (s);
1189 }
1190
1191
1192 /**
1193  * Start session timeout
1194  */
1195 static void
1196 start_session_timeout (struct Session *s)
1197 {
1198   GNUNET_assert (NULL != s);
1199   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
1200   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1201                                                    &session_timeout,
1202                                                    s);
1203   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204               "Timeout for session %p set to %llu ms\n",
1205               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1206 }
1207
1208
1209 /**
1210  * Increment session timeout due to activity
1211  */
1212 static void
1213 reschedule_session_timeout (struct Session *s)
1214 {
1215   GNUNET_assert (NULL != s);
1216   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1217
1218   GNUNET_SCHEDULER_cancel (s->timeout_task);
1219   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1220                                                    &session_timeout,
1221                                                    s);
1222   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1223               "Timeout rescheduled for session %p set to %llu ms\n",
1224               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1225 }
1226
1227
1228 /**
1229  * Cancel timeout
1230  */
1231 static void
1232 stop_session_timeout (struct Session *s)
1233 {
1234   GNUNET_assert (NULL != s);
1235
1236   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1237   {
1238     GNUNET_SCHEDULER_cancel (s->timeout_task);
1239     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1240     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241                 "Timeout stopped for session %p canceled\n",
1242                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1243   }
1244 }
1245
1246
1247 static struct Session *
1248 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
1249                 const void *addr, size_t addrlen,
1250                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1251 {
1252   struct Session *s;
1253   const struct IPv4UdpAddress *t4;
1254   const struct IPv6UdpAddress *t6;
1255   struct sockaddr_in *v4;
1256   struct sockaddr_in6 *v6;
1257   size_t len;
1258
1259   switch (addrlen)
1260   {
1261   case sizeof (struct IPv4UdpAddress):
1262     if (NULL == plugin->sockv4)
1263     {
1264       return NULL;
1265     }
1266     t4 = addr;
1267     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
1268     len = sizeof (struct sockaddr_in);
1269     v4 = (struct sockaddr_in *) &s[1];
1270     v4->sin_family = AF_INET;
1271 #if HAVE_SOCKADDR_IN_SIN_LEN
1272     v4->sin_len = sizeof (struct sockaddr_in);
1273 #endif
1274     v4->sin_port = t4->u4_port;
1275     v4->sin_addr.s_addr = t4->ipv4_addr;
1276     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
1277     break;
1278   case sizeof (struct IPv6UdpAddress):
1279     if (NULL == plugin->sockv6)
1280     {
1281       return NULL;
1282     }
1283     t6 = addr;
1284     s =
1285         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
1286     len = sizeof (struct sockaddr_in6);
1287     v6 = (struct sockaddr_in6 *) &s[1];
1288     v6->sin6_family = AF_INET6;
1289 #if HAVE_SOCKADDR_IN_SIN_LEN
1290     v6->sin6_len = sizeof (struct sockaddr_in6);
1291 #endif
1292     v6->sin6_port = t6->u6_port;
1293     v6->sin6_addr = t6->ipv6_addr;
1294     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
1295     break;
1296   default:
1297     /* Must have a valid address to send to */
1298     GNUNET_break_op (0);
1299     return NULL;
1300   }
1301   s->addrlen = len;
1302   s->target = *target;
1303   s->sock_addr = (const struct sockaddr *) &s[1];
1304   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
1305   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1306   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1307   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1308   start_session_timeout (s);
1309   return s;
1310 }
1311
1312
1313 static int
1314 session_cmp_it (void *cls,
1315                 const struct GNUNET_HashCode * key,
1316                 void *value)
1317 {
1318   struct SessionCompareContext * cctx = cls;
1319   const struct GNUNET_HELLO_Address *address = cctx->addr;
1320   struct Session *s = value;
1321
1322   socklen_t s_addrlen = s->addrlen;
1323
1324   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
1325       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1326       GNUNET_a2s (s->sock_addr, s->addrlen));
1327   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
1328       (s_addrlen == sizeof (struct sockaddr_in)))
1329   {
1330     struct IPv4UdpAddress * u4 = NULL;
1331     u4 = (struct IPv4UdpAddress *) address->address;
1332     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
1333     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
1334         (u4->u4_port == s4->sin_port))
1335     {
1336       cctx->res = s;
1337       return GNUNET_NO;
1338     }
1339
1340   }
1341   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1342       (s_addrlen == sizeof (struct sockaddr_in6)))
1343   {
1344     struct IPv6UdpAddress * u6 = NULL;
1345     u6 = (struct IPv6UdpAddress *) address->address;
1346     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1347     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1348         (u6->u6_port == s6->sin6_port))
1349     {
1350       cctx->res = s;
1351       return GNUNET_NO;
1352     }
1353   }
1354   return GNUNET_YES;
1355 }
1356
1357
1358 /**
1359  * Creates a new outbound session the transport service will use to send data to the
1360  * peer
1361  *
1362  * @param cls the plugin
1363  * @param address the address
1364  * @return the session or NULL of max connections exceeded
1365  */
1366 static struct Session *
1367 udp_plugin_get_session (void *cls,
1368                   const struct GNUNET_HELLO_Address *address)
1369 {
1370   struct Session * s = NULL;
1371   struct Plugin * plugin = cls;
1372   struct IPv6UdpAddress * udp_a6;
1373   struct IPv4UdpAddress * udp_a4;
1374
1375   GNUNET_assert (plugin != NULL);
1376   GNUNET_assert (address != NULL);
1377
1378
1379   if ((address->address == NULL) ||
1380       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1381       (address->address_length != sizeof (struct IPv6UdpAddress))))
1382   {
1383     GNUNET_break (0);
1384     return NULL;
1385   }
1386
1387   if (address->address_length == sizeof (struct IPv4UdpAddress))
1388   {
1389     if (plugin->sockv4 == NULL)
1390       return NULL;
1391     udp_a4 = (struct IPv4UdpAddress *) address->address;
1392     if (udp_a4->u4_port == 0)
1393       return NULL;
1394   }
1395
1396   if (address->address_length == sizeof (struct IPv6UdpAddress))
1397   {
1398     if (plugin->sockv6 == NULL)
1399       return NULL;
1400     udp_a6 = (struct IPv6UdpAddress *) address->address;
1401     if (udp_a6->u6_port == 0)
1402       return NULL;
1403   }
1404
1405   /* check if session already exists */
1406   struct SessionCompareContext cctx;
1407   cctx.addr = address;
1408   cctx.res = NULL;
1409   LOG (GNUNET_ERROR_TYPE_DEBUG,
1410        "Looking for existing session for peer `%s' `%s' \n", 
1411        GNUNET_i2s (&address->peer), 
1412        udp_address_to_string(NULL, address->address, address->address_length));
1413   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1414   if (cctx.res != NULL)
1415   {
1416     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1417     return cctx.res;
1418   }
1419
1420   /* otherwise create new */
1421   s = create_session (plugin,
1422       &address->peer,
1423       address->address,
1424       address->address_length,
1425       NULL, NULL);
1426   LOG (GNUNET_ERROR_TYPE_DEBUG,
1427        "Creating new session %p for peer `%s' address `%s'\n",
1428        s,
1429        GNUNET_i2s(&address->peer),
1430        udp_address_to_string(NULL,address->address,address->address_length));
1431   GNUNET_assert (GNUNET_OK ==
1432                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1433                                                     &s->target.hashPubKey,
1434                                                     s,
1435                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1436   GNUNET_STATISTICS_set(plugin->env->stats,
1437                         "# UDP, sessions active",
1438                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1439                         GNUNET_NO);
1440   return s;
1441 }
1442
1443
1444 static void 
1445 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1446 {
1447   GNUNET_STATISTICS_update (plugin->env->stats,
1448                             "# UDP, total, bytes in buffers",
1449                             udpw->msg_size, GNUNET_NO);
1450   GNUNET_STATISTICS_update (plugin->env->stats,
1451                             "# UDP, total, msgs in buffers",
1452                             1, GNUNET_NO);
1453   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1454     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1455                                  plugin->ipv4_queue_tail, udpw);
1456   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1457     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1458                                  plugin->ipv6_queue_tail, udpw);
1459 }
1460
1461
1462
1463 /**
1464  * Fragment message was transmitted via UDP, let fragmentation know
1465  * to send the next fragment now.
1466  *
1467  * @param cls the 'struct UDPMessageWrapper' of the fragment
1468  * @param target destination peer (ignored)
1469  * @param result GNUNET_OK on success (ignored)
1470  * @param payload bytes payload sent
1471  * @param physical bytes physical sent
1472  */
1473 static void
1474 send_next_fragment (void *cls,
1475                     const struct GNUNET_PeerIdentity *target,
1476                     int result, size_t payload, size_t physical)
1477 {
1478   struct UDP_MessageWrapper *udpw = cls;
1479   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1480 }
1481
1482
1483 /**
1484  * Function that is called with messages created by the fragmentation
1485  * module.  In the case of the 'proc' callback of the
1486  * GNUNET_FRAGMENT_context_create function, this function must
1487  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1488  *
1489  * @param cls closure, the 'struct FragmentationContext'
1490  * @param msg the message that was created
1491  */
1492 static void
1493 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1494 {
1495   struct UDP_FragmentationContext *frag_ctx = cls;
1496   struct Plugin *plugin = frag_ctx->plugin;
1497   struct UDP_MessageWrapper * udpw;
1498   size_t msg_len = ntohs (msg->size);
1499  
1500   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1501        "Enqueuing fragment with %u bytes\n", msg_len);
1502   frag_ctx->fragments_used ++;
1503   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1504   udpw->session = frag_ctx->session;
1505   udpw->msg_buf = (char *) &udpw[1];
1506   udpw->msg_size = msg_len;
1507   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1508   udpw->cont = &send_next_fragment;
1509   udpw->cont_cls = udpw;
1510   udpw->timeout = frag_ctx->timeout;
1511   udpw->frag_ctx = frag_ctx;
1512   udpw->msg_type = MSG_FRAGMENTED;
1513   memcpy (udpw->msg_buf, msg, msg_len);
1514   enqueue (plugin, udpw);
1515   schedule_select (plugin);
1516 }
1517
1518
1519 /**
1520  * Function that can be used by the transport service to transmit
1521  * a message using the plugin.   Note that in the case of a
1522  * peer disconnecting, the continuation MUST be called
1523  * prior to the disconnect notification itself.  This function
1524  * will be called with this peer's HELLO message to initiate
1525  * a fresh connection to another peer.
1526  *
1527  * @param cls closure
1528  * @param s which session must be used
1529  * @param msgbuf the message to transmit
1530  * @param msgbuf_size number of bytes in 'msgbuf'
1531  * @param priority how important is the message (most plugins will
1532  *                 ignore message priority and just FIFO)
1533  * @param to how long to wait at most for the transmission (does not
1534  *                require plugins to discard the message after the timeout,
1535  *                just advisory for the desired delay; most plugins will ignore
1536  *                this as well)
1537  * @param cont continuation to call once the message has
1538  *        been transmitted (or if the transport is ready
1539  *        for the next transmission call; or if the
1540  *        peer disconnected...); can be NULL
1541  * @param cont_cls closure for cont
1542  * @return number of bytes used (on the physical network, with overheads);
1543  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1544  *         and does NOT mean that the message was not transmitted (DV)
1545  */
1546 static ssize_t
1547 udp_plugin_send (void *cls,
1548                   struct Session *s,
1549                   const char *msgbuf, size_t msgbuf_size,
1550                   unsigned int priority,
1551                   struct GNUNET_TIME_Relative to,
1552                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1553 {
1554   struct Plugin *plugin = cls;
1555   size_t udpmlen = msgbuf_size + sizeof (struct UDPMessage);
1556   struct UDP_FragmentationContext * frag_ctx;
1557   struct UDP_MessageWrapper * udpw;
1558   struct UDPMessage *udp;
1559   char mbuf[udpmlen];
1560   GNUNET_assert (plugin != NULL);
1561   GNUNET_assert (s != NULL);
1562
1563   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1564     return GNUNET_SYSERR;
1565   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1566     return GNUNET_SYSERR;
1567   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1568   {
1569     GNUNET_break (0);
1570     return GNUNET_SYSERR;
1571   }
1572   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1573   {
1574     GNUNET_break (0);
1575     return GNUNET_SYSERR;
1576   }
1577   LOG (GNUNET_ERROR_TYPE_DEBUG,
1578        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1579        udpmlen,
1580        GNUNET_i2s (&s->target),
1581        GNUNET_a2s(s->sock_addr, s->addrlen));
1582
1583
1584   /* Message */
1585   udp = (struct UDPMessage *) mbuf;
1586   udp->header.size = htons (udpmlen);
1587   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1588   udp->reserved = htonl (0);
1589   udp->sender = *plugin->env->my_identity;
1590
1591   reschedule_session_timeout(s);
1592   if (udpmlen <= UDP_MTU)
1593   {
1594     /* unfragmented message */
1595     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1596     udpw->session = s;
1597     udpw->msg_buf = (char *) &udpw[1];
1598     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1599     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1600     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1601     udpw->cont = cont;
1602     udpw->cont_cls = cont_cls;
1603     udpw->frag_ctx = NULL;
1604     udpw->msg_type = MSG_UNFRAGMENTED;
1605     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
1606     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1607     enqueue (plugin, udpw);
1608
1609     GNUNET_STATISTICS_update (plugin->env->stats,
1610                               "# UDP, unfragmented msgs, messages, attempt",
1611                               1, GNUNET_NO);
1612     GNUNET_STATISTICS_update (plugin->env->stats,
1613                               "# UDP, unfragmented msgs, bytes payload, attempt",
1614                               udpw->payload_size, GNUNET_NO);
1615   }
1616   else
1617   {
1618     /* fragmented message */
1619     if  (s->frag_ctx != NULL)
1620       return GNUNET_SYSERR;
1621     memcpy (&udp[1], msgbuf, msgbuf_size);
1622     frag_ctx = GNUNET_malloc (sizeof (struct UDP_FragmentationContext));
1623     frag_ctx->plugin = plugin;
1624     frag_ctx->session = s;
1625     frag_ctx->cont = cont;
1626     frag_ctx->cont_cls = cont_cls;
1627     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1628     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1629     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1630     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1631                                                      UDP_MTU,
1632                                                      &plugin->tracker,
1633                                                      s->last_expected_msg_delay, 
1634                                                      s->last_expected_ack_delay, 
1635                                                      &udp->header,
1636                                                      &enqueue_fragment,
1637                                                      frag_ctx);    
1638     s->frag_ctx = frag_ctx;
1639     GNUNET_STATISTICS_update (plugin->env->stats,
1640                               "# UDP, fragmented msgs, messages, pending",
1641                               1, GNUNET_NO);
1642     GNUNET_STATISTICS_update (plugin->env->stats,
1643                               "# UDP, fragmented msgs, messages, attempt",
1644                               1, GNUNET_NO);
1645     GNUNET_STATISTICS_update (plugin->env->stats,
1646                               "# UDP, fragmented msgs, bytes payload, attempt",
1647                               frag_ctx->payload_size, GNUNET_NO);
1648   }
1649   schedule_select (plugin);
1650   return udpmlen;
1651 }
1652
1653
1654 /**
1655  * Our external IP address/port mapping has changed.
1656  *
1657  * @param cls closure, the 'struct LocalAddrList'
1658  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1659  *     the previous (now invalid) one
1660  * @param addr either the previous or the new public IP address
1661  * @param addrlen actual lenght of the address
1662  */
1663 static void
1664 udp_nat_port_map_callback (void *cls, int add_remove,
1665                            const struct sockaddr *addr, socklen_t addrlen)
1666 {
1667   struct Plugin *plugin = cls;
1668   struct IPv4UdpAddress u4;
1669   struct IPv6UdpAddress u6;
1670   void *arg;
1671   size_t args;
1672
1673   /* convert 'addr' to our internal format */
1674   switch (addr->sa_family)
1675   {
1676   case AF_INET:
1677     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1678     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1679     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1680     arg = &u4;
1681     args = sizeof (u4);
1682     break;
1683   case AF_INET6:
1684     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1685     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1686             sizeof (struct in6_addr));
1687     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1688     arg = &u6;
1689     args = sizeof (u6);
1690     break;
1691   default:
1692     GNUNET_break (0);
1693     return;
1694   }
1695   /* modify our published address list */
1696   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1697 }
1698
1699
1700
1701 /**
1702  * Message tokenizer has broken up an incomming message. Pass it on
1703  * to the service.
1704  *
1705  * @param cls the 'struct Plugin'
1706  * @param client the 'struct SourceInformation'
1707  * @param hdr the actual message
1708  */
1709 static int
1710 process_inbound_tokenized_messages (void *cls, void *client,
1711                                     const struct GNUNET_MessageHeader *hdr)
1712 {
1713   struct Plugin *plugin = cls;
1714   struct SourceInformation *si = client;
1715   struct GNUNET_ATS_Information ats[2];
1716   struct GNUNET_TIME_Relative delay;
1717
1718   GNUNET_assert (si->session != NULL);
1719   if (GNUNET_YES == si->session->in_destroy)
1720     return GNUNET_OK;
1721   /* setup ATS */
1722   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1723   ats[0].value = htonl (1);
1724   ats[1] = si->session->ats;
1725   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1726   delay = plugin->env->receive (plugin->env->cls,
1727                                 &si->sender,
1728                                 hdr,
1729                                 (const struct GNUNET_ATS_Information *) &ats, 2,
1730                                 si->session,
1731                                 si->arg,
1732                                 si->args);
1733   si->session->flow_delay_for_other_peer = delay;
1734   reschedule_session_timeout(si->session);
1735   return GNUNET_OK;
1736 }
1737
1738
1739 /**
1740  * We've received a UDP Message.  Process it (pass contents to main service).
1741  *
1742  * @param plugin plugin context
1743  * @param msg the message
1744  * @param sender_addr sender address
1745  * @param sender_addr_len number of bytes in sender_addr
1746  */
1747 static void
1748 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1749                      const struct sockaddr *sender_addr,
1750                      socklen_t sender_addr_len)
1751 {
1752   struct SourceInformation si;
1753   struct Session * s;
1754   struct IPv4UdpAddress u4;
1755   struct IPv6UdpAddress u6;
1756   const void *arg;
1757   size_t args;
1758
1759   if (0 != ntohl (msg->reserved))
1760   {
1761     GNUNET_break_op (0);
1762     return;
1763   }
1764   if (ntohs (msg->header.size) <
1765       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1766   {
1767     GNUNET_break_op (0);
1768     return;
1769   }
1770
1771   /* convert address */
1772   switch (sender_addr->sa_family)
1773   {
1774   case AF_INET:
1775     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1776     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1777     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1778     arg = &u4;
1779     args = sizeof (u4);
1780     break;
1781   case AF_INET6:
1782     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1783     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1784     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1785     arg = &u6;
1786     args = sizeof (u6);
1787     break;
1788   default:
1789     GNUNET_break (0);
1790     return;
1791   }
1792   LOG (GNUNET_ERROR_TYPE_DEBUG,
1793        "Received message with %u bytes from peer `%s' at `%s'\n",
1794        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1795        GNUNET_a2s (sender_addr, sender_addr_len));
1796
1797   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1798   s = udp_plugin_get_session(plugin, address);
1799   GNUNET_free (address);
1800
1801   /* iterate over all embedded messages */
1802   si.session = s;
1803   si.sender = msg->sender;
1804   si.arg = arg;
1805   si.args = args;
1806   s->rc++;
1807   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1808                              ntohs (msg->header.size) -
1809                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1810   s->rc--;
1811   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1812     free_session (s);
1813 }
1814
1815
1816 /**
1817  * Scan the heap for a receive context with the given address.
1818  *
1819  * @param cls the 'struct FindReceiveContext'
1820  * @param node internal node of the heap
1821  * @param element value stored at the node (a 'struct ReceiveContext')
1822  * @param cost cost associated with the node
1823  * @return GNUNET_YES if we should continue to iterate,
1824  *         GNUNET_NO if not.
1825  */
1826 static int
1827 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1828                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1829 {
1830   struct FindReceiveContext *frc = cls;
1831   struct DefragContext *e = element;
1832
1833   if ((frc->addr_len == e->addr_len) &&
1834       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1835   {
1836     frc->rc = e;
1837     return GNUNET_NO;
1838   }
1839   return GNUNET_YES;
1840 }
1841
1842
1843 /**
1844  * Process a defragmented message.
1845  *
1846  * @param cls the 'struct ReceiveContext'
1847  * @param msg the message
1848  */
1849 static void
1850 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1851 {
1852   struct DefragContext *rc = cls;
1853
1854   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1855   {
1856     GNUNET_break (0);
1857     return;
1858   }
1859   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1860   {
1861     GNUNET_break (0);
1862     return;
1863   }
1864   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1865                        rc->src_addr, rc->addr_len);
1866 }
1867
1868
1869 struct LookupContext
1870 {
1871   const struct sockaddr * addr;
1872
1873   struct Session *res;
1874
1875   size_t addrlen;
1876 };
1877
1878
1879 static int
1880 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1881 {
1882   struct LookupContext *l_ctx = cls;
1883   struct Session * s = value;
1884
1885   if ((s->addrlen == l_ctx->addrlen) &&
1886       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1887   {
1888     l_ctx->res = s;
1889     return GNUNET_NO;
1890   }
1891   return GNUNET_YES;
1892 }
1893
1894
1895 /**
1896  * Transmit an acknowledgement.
1897  *
1898  * @param cls the 'struct ReceiveContext'
1899  * @param id message ID (unused)
1900  * @param msg ack to transmit
1901  */
1902 static void
1903 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1904 {
1905   struct DefragContext *rc = cls;
1906   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1907   struct UDP_ACK_Message *udp_ack;
1908   uint32_t delay = 0;
1909   struct UDP_MessageWrapper *udpw;
1910   struct Session *s;
1911   struct LookupContext l_ctx;
1912
1913   l_ctx.addr = rc->src_addr;
1914   l_ctx.addrlen = rc->addr_len;
1915   l_ctx.res = NULL;
1916   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1917       &lookup_session_by_addr_it,
1918       &l_ctx);
1919   s = l_ctx.res;
1920
1921   if (NULL == s)
1922     return;
1923
1924   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1925     delay = s->flow_delay_for_other_peer.rel_value;
1926
1927   LOG (GNUNET_ERROR_TYPE_DEBUG,
1928        "Sending ACK to `%s' including delay of %u ms\n",
1929        GNUNET_a2s (rc->src_addr,
1930                    (rc->src_addr->sa_family ==
1931                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1932                                                                      sockaddr_in6)),
1933        delay);
1934   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
1935   udpw->msg_size = msize;
1936   udpw->payload_size = 0;
1937   udpw->session = s;
1938   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1939   udpw->msg_buf = (char *)&udpw[1];
1940   udpw->msg_type = MSG_ACK;
1941   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
1942   udp_ack->header.size = htons ((uint16_t) msize);
1943   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1944   udp_ack->delay = htonl (delay);
1945   udp_ack->sender = *rc->plugin->env->my_identity;
1946   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1947   enqueue (rc->plugin, udpw);
1948 }
1949
1950
1951 static void 
1952 read_process_msg (struct Plugin *plugin,
1953                   const struct GNUNET_MessageHeader *msg,
1954                   const char *addr,
1955                   socklen_t fromlen)
1956 {
1957   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1958   {
1959     GNUNET_break_op (0);
1960     return;
1961   }
1962   process_udp_message (plugin, (const struct UDPMessage *) msg,
1963                        (const struct sockaddr *) addr, fromlen);
1964 }
1965
1966
1967 static void 
1968 read_process_ack (struct Plugin *plugin,
1969                   const struct GNUNET_MessageHeader *msg,
1970                   char *addr,
1971                   socklen_t fromlen)
1972 {
1973   const struct GNUNET_MessageHeader *ack;
1974   const struct UDP_ACK_Message *udp_ack;
1975   struct LookupContext l_ctx;
1976   struct Session *s;
1977   struct GNUNET_TIME_Relative flow_delay;
1978
1979   if (ntohs (msg->size) <
1980       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1981   {
1982     GNUNET_break_op (0);
1983     return;
1984   }
1985   udp_ack = (const struct UDP_ACK_Message *) msg;
1986   l_ctx.addr = (const struct sockaddr *) addr;
1987   l_ctx.addrlen = fromlen;
1988   l_ctx.res = NULL;
1989   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
1990                                          &lookup_session_by_addr_it,
1991                                          &l_ctx);
1992   s = l_ctx.res;
1993
1994   if ((NULL == s) || (NULL == s->frag_ctx))
1995   {
1996     return;
1997   }
1998
1999   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
2000   LOG (GNUNET_ERROR_TYPE_DEBUG, 
2001        "We received a sending delay of %llu\n",
2002        flow_delay.rel_value);
2003   s->flow_delay_from_other_peer =
2004       GNUNET_TIME_relative_to_absolute (flow_delay);
2005
2006   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2007   if (ntohs (ack->size) !=
2008       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
2009   {
2010     GNUNET_break_op (0);
2011     return;
2012   }
2013
2014   if (0 != memcmp (&l_ctx.res->target, &udp_ack->sender, sizeof (struct GNUNET_PeerIdentity)))
2015     GNUNET_break (0);
2016   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2017   {
2018     LOG (GNUNET_ERROR_TYPE_DEBUG,
2019          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2020          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2021          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2022     /* Expect more ACKs to arrive */
2023     return;
2024   }
2025
2026   LOG (GNUNET_ERROR_TYPE_DEBUG,
2027        "Message full ACK'ed\n",
2028        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2029        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2030
2031   /* Remove fragmented message after successful sending */
2032   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2033 }
2034
2035
2036 static void 
2037 read_process_fragment (struct Plugin *plugin,
2038                        const struct GNUNET_MessageHeader *msg,
2039                        char *addr,
2040                        socklen_t fromlen)
2041 {
2042   struct DefragContext *d_ctx;
2043   struct GNUNET_TIME_Absolute now;
2044   struct FindReceiveContext frc;
2045
2046   frc.rc = NULL;
2047   frc.addr = (const struct sockaddr *) addr;
2048   frc.addr_len = fromlen;
2049
2050   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2051        (unsigned int) ntohs (msg->size),
2052        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2053   /* Lookup existing receive context for this address */
2054   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2055                                  &find_receive_context,
2056                                  &frc);
2057   now = GNUNET_TIME_absolute_get ();
2058   d_ctx = frc.rc;
2059
2060   if (d_ctx == NULL)
2061   {
2062     /* Create a new defragmentation context */
2063     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2064     memcpy (&d_ctx[1], addr, fromlen);
2065     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2066     d_ctx->addr_len = fromlen;
2067     d_ctx->plugin = plugin;
2068     d_ctx->defrag =
2069         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
2070                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
2071                                           &fragment_msg_proc, &ack_proc);
2072     d_ctx->hnode =
2073         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2074                                       (GNUNET_CONTAINER_HeapCostType)
2075                                       now.abs_value);
2076     LOG (GNUNET_ERROR_TYPE_DEBUG, 
2077          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2078          (unsigned int) ntohs (msg->size),
2079          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2080   }
2081   else
2082   {
2083     LOG (GNUNET_ERROR_TYPE_DEBUG,
2084          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2085          (unsigned int) ntohs (msg->size),
2086          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2087   }
2088
2089   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2090   {
2091     /* keep this 'rc' from expiring */
2092     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2093                                        (GNUNET_CONTAINER_HeapCostType)
2094                                        now.abs_value);
2095   }
2096   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2097       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2098   {
2099     /* remove 'rc' that was inactive the longest */
2100     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2101     GNUNET_assert (NULL != d_ctx);
2102     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2103     GNUNET_free (d_ctx);
2104   }
2105 }
2106
2107
2108 /**
2109  * Read and process a message from the given socket.
2110  *
2111  * @param plugin the overall plugin
2112  * @param rsock socket to read from
2113  */
2114 static void
2115 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2116 {
2117   socklen_t fromlen;
2118   char addr[32];
2119   char buf[65536] GNUNET_ALIGN;
2120   ssize_t size;
2121   const struct GNUNET_MessageHeader *msg;
2122
2123   fromlen = sizeof (addr);
2124   memset (&addr, 0, sizeof (addr));
2125   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
2126                                       (struct sockaddr *) &addr, &fromlen);
2127 #if MINGW
2128   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2129    * WSAECONNRESET error to indicate that previous sendto() (???)
2130    * on this socket has failed.
2131    */
2132   if ( (-1 == size) && (ECONNRESET == errno) )
2133     return;
2134 #endif
2135   if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader)))
2136   {
2137     GNUNET_break_op (0);
2138     return;
2139   }
2140   msg = (const struct GNUNET_MessageHeader *) buf;
2141
2142   LOG (GNUNET_ERROR_TYPE_DEBUG,
2143        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
2144        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
2145
2146   if (size != ntohs (msg->size))
2147   {
2148     GNUNET_break_op (0);
2149     return;
2150   }
2151
2152   GNUNET_STATISTICS_update (plugin->env->stats,
2153                             "# UDP, total, bytes, received",
2154                             size, GNUNET_NO);
2155
2156   switch (ntohs (msg->type))
2157   {
2158   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2159     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
2160     return;
2161
2162   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2163     read_process_msg (plugin, msg, addr, fromlen);
2164     return;
2165
2166   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2167     read_process_ack (plugin, msg, addr, fromlen);
2168     return;
2169
2170   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2171     read_process_fragment (plugin, msg, addr, fromlen);
2172     return;
2173
2174   default:
2175     GNUNET_break_op (0);
2176     return;
2177   }
2178 }
2179
2180 static struct UDP_MessageWrapper *
2181 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2182                                     struct GNUNET_NETWORK_Handle *sock)
2183 {
2184   struct UDP_MessageWrapper *udpw = NULL;
2185   struct GNUNET_TIME_Relative remaining;
2186
2187   udpw = head;
2188   while (udpw != NULL)
2189   {
2190     /* Find messages with timeout */
2191     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2192     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2193     {
2194       /* Message timed out */
2195       switch (udpw->msg_type) {
2196         case MSG_UNFRAGMENTED:
2197           GNUNET_STATISTICS_update (plugin->env->stats,
2198                                     "# UDP, total, bytes, sent, timeout",
2199                                     udpw->msg_size, GNUNET_NO);
2200           GNUNET_STATISTICS_update (plugin->env->stats,
2201                                     "# UDP, total, messages, sent, timeout",
2202                                     1, GNUNET_NO);
2203           GNUNET_STATISTICS_update (plugin->env->stats,
2204                                     "# UDP, unfragmented msgs, messages, sent, timeout",
2205                                     1, GNUNET_NO);
2206           GNUNET_STATISTICS_update (plugin->env->stats,
2207                                     "# UDP, unfragmented msgs, bytes, sent, timeout",
2208                                     udpw->payload_size, GNUNET_NO);
2209           /* Not fragmented message */
2210           LOG (GNUNET_ERROR_TYPE_DEBUG,
2211                "Message for peer `%s' with size %u timed out\n",
2212                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2213           call_continuation (udpw, GNUNET_SYSERR);
2214           /* Remove message */
2215           dequeue (plugin, udpw);
2216           GNUNET_free (udpw);
2217           break;
2218         case MSG_FRAGMENTED:
2219           /* Fragmented message */
2220           GNUNET_STATISTICS_update (plugin->env->stats,
2221                                     "# UDP, total, bytes, sent, timeout",
2222                                     udpw->frag_ctx->on_wire_size, GNUNET_NO);
2223           GNUNET_STATISTICS_update (plugin->env->stats,
2224                                     "# UDP, total, messages, sent, timeout",
2225                                     1, GNUNET_NO);
2226           call_continuation (udpw, GNUNET_SYSERR);
2227           LOG (GNUNET_ERROR_TYPE_DEBUG,
2228                "Fragment for message for peer `%s' with size %u timed out\n",
2229                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
2230
2231
2232           GNUNET_STATISTICS_update (plugin->env->stats,
2233                                     "# UDP, fragmented msgs, messages, sent, timeout",
2234                                     1, GNUNET_NO);
2235           GNUNET_STATISTICS_update (plugin->env->stats,
2236                                     "# UDP, fragmented msgs, bytes, sent, timeout",
2237                                     udpw->frag_ctx->payload_size, GNUNET_NO);
2238           /* Remove fragmented message due to timeout */
2239           fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2240           break;
2241         case MSG_ACK:
2242           GNUNET_STATISTICS_update (plugin->env->stats,
2243                                     "# UDP, total, bytes, sent, timeout",
2244                                     udpw->msg_size, GNUNET_NO);
2245           GNUNET_STATISTICS_update (plugin->env->stats,
2246                                     "# UDP, total, messages, sent, timeout",
2247                                     1, GNUNET_NO);
2248           LOG (GNUNET_ERROR_TYPE_DEBUG,
2249                "ACK Message for peer `%s' with size %u timed out\n",
2250                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2251           call_continuation (udpw, GNUNET_SYSERR);
2252           dequeue (plugin, udpw);
2253           GNUNET_free (udpw);
2254           break;
2255         default:
2256           break;
2257       }
2258       if (sock == plugin->sockv4)
2259         udpw = plugin->ipv4_queue_head;
2260       if (sock == plugin->sockv6)
2261         udpw = plugin->ipv6_queue_head;
2262       else
2263       {
2264         GNUNET_break (0); /* should never happen */
2265         updw = NULL;
2266       }
2267       GNUNET_STATISTICS_update (plugin->env->stats,
2268                                 "# messages dismissed due to timeout",
2269                                 1, GNUNET_NO);
2270     }
2271     else
2272     {
2273       /* Message did not time out, check flow delay */
2274       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2275       if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2276       {
2277         /* this message is not delayed */
2278         LOG (GNUNET_ERROR_TYPE_DEBUG, 
2279              "Message for peer `%s' (%u bytes) is not delayed \n",
2280              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2281         break; /* Found message to send, break */
2282       }
2283       else
2284       {
2285         /* Message is delayed, try next */
2286         LOG (GNUNET_ERROR_TYPE_DEBUG,
2287              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
2288              GNUNET_i2s(&udpw->session->target), udpw->payload_size, remaining.rel_value);
2289         udpw = udpw->next;
2290       }
2291     }
2292   }
2293   return udpw;
2294 }
2295
2296
2297 static void
2298 analyze_send_error (struct Plugin *plugin,
2299                     const struct sockaddr * sa,
2300                     socklen_t slen,
2301                     int error)
2302 {
2303   static int network_down_error;
2304   struct GNUNET_ATS_Information type;
2305
2306  type = plugin->env->get_address_type (plugin->env->cls,sa, slen);
2307  if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
2308      ((ENETUNREACH == errno) || (ENETDOWN == errno)))
2309  {
2310    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2311    {
2312      /* IPv4: "Network unreachable" or "Network down"
2313       *
2314       * This indicates we do not have connectivity
2315       */
2316      LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2317          _("UDP could not transmit message to `%s': "
2318            "Network seems down, please check your network configuration\n"),
2319          GNUNET_a2s (sa, slen));
2320    }
2321    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2322    {
2323      /* IPv6: "Network unreachable" or "Network down"
2324       *
2325       * This indicates that this system is IPv6 enabled, but does not
2326       * have a valid global IPv6 address assigned or we do not have
2327       * connectivity
2328       */
2329
2330     LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2331         _("UDP could not transmit message to `%s': "
2332           "Please check your network configuration and disable IPv6 if your "
2333           "connection does not have a global IPv6 address\n"),
2334         GNUNET_a2s (sa, slen));
2335    }
2336  }
2337  else
2338  {
2339    LOG (GNUNET_ERROR_TYPE_WARNING,
2340       "UDP could not transmit message to `%s': `%s'\n",
2341       GNUNET_a2s (sa, slen), STRERROR (error));
2342  }
2343 }
2344
2345 static size_t
2346 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2347 {
2348   const struct sockaddr * sa;
2349   ssize_t sent;
2350   socklen_t slen;
2351
2352   struct UDP_MessageWrapper *udpw = NULL;
2353
2354   /* Find message to send */
2355   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) ? plugin->ipv4_queue_head : plugin->ipv6_queue_head,
2356                                              sock);
2357   if (NULL == udpw)
2358     return 0; /* No message to send */
2359
2360   sa = udpw->session->sock_addr;
2361   slen = udpw->session->addrlen;
2362
2363   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, sa, slen);
2364
2365   if (GNUNET_SYSERR == sent)
2366   {
2367     /* Failure */
2368     analyze_send_error (plugin, sa, slen, errno);
2369     call_continuation(udpw, GNUNET_SYSERR);
2370     GNUNET_STATISTICS_update (plugin->env->stats,
2371                             "# UDP, total, bytes, sent, failure",
2372                             sent, GNUNET_NO);
2373     GNUNET_STATISTICS_update (plugin->env->stats,
2374                               "# UDP, total, messages, sent, failure",
2375                               1, GNUNET_NO);
2376   }
2377   else
2378   {
2379     /* Success */
2380     LOG (GNUNET_ERROR_TYPE_DEBUG,
2381          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2382          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
2383          (sent < 0) ? STRERROR (errno) : "ok");
2384     GNUNET_STATISTICS_update (plugin->env->stats,
2385                               "# UDP, total, bytes, sent, success",
2386                               sent, GNUNET_NO);
2387     GNUNET_STATISTICS_update (plugin->env->stats,
2388                               "# UDP, total, messages, sent, success",
2389                               1, GNUNET_NO);
2390     if (NULL != udpw->frag_ctx)
2391         udpw->frag_ctx->on_wire_size += udpw->msg_size;
2392     call_continuation (udpw, GNUNET_OK);
2393   }
2394   dequeue (plugin, udpw);
2395   GNUNET_free (udpw);
2396   udpw = NULL;
2397
2398   return sent;
2399 }
2400
2401
2402 /**
2403  * We have been notified that our readset has something to read.  We don't
2404  * know which socket needs to be read, so we have to check each one
2405  * Then reschedule this function to be called again once more is available.
2406  *
2407  * @param cls the plugin handle
2408  * @param tc the scheduling context (for rescheduling this function again)
2409  */
2410 static void
2411 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2412 {
2413   struct Plugin *plugin = cls;
2414
2415   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2416   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2417     return;
2418   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2419        (NULL != plugin->sockv4) &&
2420        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2421     udp_select_read (plugin, plugin->sockv4);
2422   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2423        (NULL != plugin->sockv4) && 
2424        (NULL != plugin->ipv4_queue_head) &&
2425        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2426     udp_select_send (plugin, plugin->sockv4);   
2427   schedule_select (plugin);
2428 }
2429
2430
2431 /**
2432  * We have been notified that our readset has something to read.  We don't
2433  * know which socket needs to be read, so we have to check each one
2434  * Then reschedule this function to be called again once more is available.
2435  *
2436  * @param cls the plugin handle
2437  * @param tc the scheduling context (for rescheduling this function again)
2438  */
2439 static void
2440 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2441 {
2442   struct Plugin *plugin = cls;
2443
2444   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2445   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2446     return;
2447   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2448        (NULL != plugin->sockv6) &&
2449        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2450     udp_select_read (plugin, plugin->sockv6);
2451   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2452        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2453        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2454     udp_select_send (plugin, plugin->sockv6);
2455   schedule_select (plugin);
2456 }
2457
2458
2459 static int
2460 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
2461 {
2462   int tries;
2463   int sockets_created = 0;
2464   struct sockaddr *serverAddr;
2465   struct sockaddr *addrs[2];
2466   socklen_t addrlens[2];
2467   socklen_t addrlen;
2468
2469   /* Create IPv6 socket */
2470   if (plugin->enable_ipv6 == GNUNET_YES)
2471   {
2472     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2473     if (NULL == plugin->sockv6)
2474     {
2475       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2476       plugin->enable_ipv6 = GNUNET_NO;
2477     }
2478     else
2479     {
2480 #if HAVE_SOCKADDR_IN_SIN_LEN
2481       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2482 #endif
2483       serverAddrv6->sin6_family = AF_INET6;
2484       serverAddrv6->sin6_addr = in6addr_any;
2485       serverAddrv6->sin6_port = htons (plugin->port);
2486       addrlen = sizeof (struct sockaddr_in6);
2487       serverAddr = (struct sockaddr *) serverAddrv6;
2488       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2489            ntohs (serverAddrv6->sin6_port));
2490       tries = 0;
2491       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2492              GNUNET_OK)
2493       {
2494         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2495         LOG (GNUNET_ERROR_TYPE_DEBUG,
2496              "IPv6 Binding failed, trying new port %d\n",
2497              ntohs (serverAddrv6->sin6_port));
2498         tries++;
2499         if (tries > 10)
2500         {
2501           GNUNET_NETWORK_socket_close (plugin->sockv6);
2502           plugin->sockv6 = NULL;
2503           break;
2504         }
2505       }
2506       if (plugin->sockv6 != NULL)
2507       {
2508         LOG (GNUNET_ERROR_TYPE_DEBUG,
2509              "IPv6 socket created on port %d\n",
2510              ntohs (serverAddrv6->sin6_port));
2511         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2512         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2513         sockets_created++;
2514       }
2515     }
2516   }
2517
2518   /* Create IPv4 socket */
2519   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2520   if (NULL == plugin->sockv4)
2521   {
2522     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2523   }
2524   else
2525   {
2526 #if HAVE_SOCKADDR_IN_SIN_LEN
2527     serverAddrv4->sin_len = sizeof (serverAddrv4);
2528 #endif
2529     serverAddrv4->sin_family = AF_INET;
2530     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2531     serverAddrv4->sin_port = htons (plugin->port);
2532     addrlen = sizeof (struct sockaddr_in);
2533     serverAddr = (struct sockaddr *) serverAddrv4;
2534
2535     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2536          ntohs (serverAddrv4->sin_port));
2537     tries = 0;
2538     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2539            GNUNET_OK)
2540     {
2541       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2542       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2543            ntohs (serverAddrv4->sin_port));
2544       tries++;
2545       if (tries > 10)
2546       {
2547         GNUNET_NETWORK_socket_close (plugin->sockv4);
2548         plugin->sockv4 = NULL;
2549         break;
2550       }
2551     }
2552     if (plugin->sockv4 != NULL)
2553     {
2554       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2555       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2556       sockets_created++;
2557     }
2558   }
2559
2560   /* Create file descriptors */
2561   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2562   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2563   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2564   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2565   if (NULL != plugin->sockv4)
2566   {
2567     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2568     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2569   }
2570
2571   if (0 == sockets_created)
2572     LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2573   if (plugin->enable_ipv6 == GNUNET_YES)
2574   {
2575     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2576     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2577     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2578     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2579     if (NULL != plugin->sockv6)
2580     {
2581       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2582       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2583     }
2584   }
2585   schedule_select (plugin);
2586   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2587                            GNUNET_NO, plugin->port,
2588                            sockets_created,
2589                            (const struct sockaddr **) addrs, addrlens,
2590                            &udp_nat_port_map_callback, NULL, plugin);
2591
2592   return sockets_created;
2593 }
2594
2595
2596 /**
2597  * The exported method. Makes the core api available via a global and
2598  * returns the udp transport API.
2599  *
2600  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2601  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2602  */
2603 void *
2604 libgnunet_plugin_transport_udp_init (void *cls)
2605 {
2606   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2607   struct GNUNET_TRANSPORT_PluginFunctions *api;
2608   struct Plugin *p;
2609   unsigned long long port;
2610   unsigned long long aport;
2611   unsigned long long broadcast;
2612   unsigned long long udp_max_bps;
2613   unsigned long long enable_v6;
2614   char * bind4_address;
2615   char * bind6_address;
2616   char * fancy_interval;
2617   struct GNUNET_TIME_Relative interval;
2618   struct sockaddr_in serverAddrv4;
2619   struct sockaddr_in6 serverAddrv6;
2620   int res;
2621
2622   if (NULL == env->receive)
2623   {
2624     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2625        initialze the plugin or the API */
2626     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2627     api->cls = NULL;
2628     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2629     api->address_to_string = &udp_address_to_string;
2630     api->string_to_address = &udp_string_to_address;
2631     return api;
2632   }
2633
2634   GNUNET_assert( NULL != env->stats);
2635
2636   /* Get port number */
2637   if (GNUNET_OK !=
2638       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2639                                              &port))
2640     port = 2086;
2641   if (GNUNET_OK !=
2642       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2643                                              "ADVERTISED_PORT", &aport))
2644     aport = port;
2645   if (port > 65535)
2646   {
2647     LOG (GNUNET_ERROR_TYPE_WARNING,
2648          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2649          65535);
2650     return NULL;
2651   }
2652
2653   /* Protocols */
2654   if ((GNUNET_YES ==
2655        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2656                                              "DISABLEV6")))
2657   {
2658     enable_v6 = GNUNET_NO;
2659   }
2660   else
2661     enable_v6 = GNUNET_YES;
2662
2663   /* Addresses */
2664   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2665   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2666
2667   if (GNUNET_YES ==
2668       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2669                                              "BINDTO", &bind4_address))
2670   {
2671     LOG (GNUNET_ERROR_TYPE_DEBUG,
2672          "Binding udp plugin to specific address: `%s'\n",
2673          bind4_address);
2674     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2675     {
2676       GNUNET_free (bind4_address);
2677       return NULL;
2678     }
2679   }
2680
2681   if (GNUNET_YES ==
2682       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2683                                              "BINDTO6", &bind6_address))
2684   {
2685     LOG (GNUNET_ERROR_TYPE_DEBUG,
2686          "Binding udp plugin to specific address: `%s'\n",
2687          bind6_address);
2688     if (1 !=
2689         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2690     {
2691       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2692            bind6_address);
2693       GNUNET_free_non_null (bind4_address);
2694       GNUNET_free (bind6_address);
2695       return NULL;
2696     }
2697   }
2698
2699   /* Enable neighbour discovery */
2700   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2701                                             "BROADCAST");
2702   if (broadcast == GNUNET_SYSERR)
2703     broadcast = GNUNET_NO;
2704
2705   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2706                                            "BROADCAST_INTERVAL", &fancy_interval))
2707   {
2708     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2709   }
2710   else
2711   {
2712      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2713      {
2714        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2715      }
2716      GNUNET_free (fancy_interval);
2717   }
2718
2719   /* Maximum datarate */
2720   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2721                                              "MAX_BPS", &udp_max_bps))
2722   {
2723     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2724   }
2725
2726   p = GNUNET_malloc (sizeof (struct Plugin));
2727   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2728
2729   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2730                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2731   p->sessions = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
2732   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2733   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2734   p->port = port;
2735   p->aport = aport;
2736   p->broadcast_interval = interval;
2737   p->enable_ipv6 = enable_v6;
2738   p->env = env;
2739
2740   plugin = p;
2741
2742   api->cls = p;
2743   api->send = NULL;
2744   api->disconnect = &udp_disconnect;
2745   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2746   api->address_to_string = &udp_address_to_string;
2747   api->string_to_address = &udp_string_to_address;
2748   api->check_address = &udp_plugin_check_address;
2749   api->get_session = &udp_plugin_get_session;
2750   api->send = &udp_plugin_send;
2751
2752   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2753   res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
2754   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
2755   {
2756     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2757     GNUNET_free (p);
2758     GNUNET_free (api);
2759     return NULL;
2760   }
2761
2762   if (broadcast == GNUNET_YES)
2763   {
2764     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2765     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
2766   }
2767
2768   GNUNET_free_non_null (bind4_address);
2769   GNUNET_free_non_null (bind6_address);
2770   return api;
2771 }
2772
2773
2774 static int
2775 heap_cleanup_iterator (void *cls,
2776                        struct GNUNET_CONTAINER_HeapNode *
2777                        node, void *element,
2778                        GNUNET_CONTAINER_HeapCostType
2779                        cost)
2780 {
2781   struct DefragContext * d_ctx = element;
2782
2783   GNUNET_CONTAINER_heap_remove_node (node);
2784   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2785   GNUNET_free (d_ctx);
2786
2787   return GNUNET_YES;
2788 }
2789
2790
2791 /**
2792  * The exported method. Makes the core api available via a global and
2793  * returns the udp transport API.
2794  *
2795  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2796  * @return NULL
2797  */
2798 void *
2799 libgnunet_plugin_transport_udp_done (void *cls)
2800 {
2801   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2802   struct Plugin *plugin = api->cls;
2803
2804   if (NULL == plugin)
2805   {
2806     GNUNET_free (api);
2807     return NULL;
2808   }
2809
2810   stop_broadcast (plugin);
2811   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2812   {
2813     GNUNET_SCHEDULER_cancel (plugin->select_task);
2814     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2815   }
2816   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2817   {
2818     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2819     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2820   }
2821
2822   /* Closing sockets */
2823   if (plugin->sockv4 != NULL)
2824   {
2825     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2826     plugin->sockv4 = NULL;
2827   }
2828   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2829   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2830
2831   if (plugin->sockv6 != NULL)
2832   {
2833     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2834     plugin->sockv6 = NULL;
2835
2836     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2837     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2838   }
2839
2840   GNUNET_NAT_unregister (plugin->nat);
2841
2842   if (plugin->defrag_ctxs != NULL)
2843   {
2844     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2845         heap_cleanup_iterator, NULL);
2846     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2847     plugin->defrag_ctxs = NULL;
2848   }
2849   if (plugin->mst != NULL)
2850   {
2851     GNUNET_SERVER_mst_destroy(plugin->mst);
2852     plugin->mst = NULL;
2853   }
2854
2855   /* Clean up leftover messages */
2856   struct UDP_MessageWrapper * udpw;
2857   udpw = plugin->ipv4_queue_head;
2858   while (udpw != NULL)
2859   {
2860     struct UDP_MessageWrapper *tmp = udpw->next;
2861     dequeue (plugin, udpw);
2862     call_continuation(udpw, GNUNET_SYSERR);
2863     GNUNET_free (udpw);
2864
2865     udpw = tmp;
2866   }
2867   udpw = plugin->ipv6_queue_head;
2868   while (udpw != NULL)
2869   {
2870     struct UDP_MessageWrapper *tmp = udpw->next;
2871     dequeue (plugin, udpw);
2872     call_continuation(udpw, GNUNET_SYSERR);
2873     GNUNET_free (udpw);
2874
2875     udpw = tmp;
2876   }
2877
2878   /* Clean up sessions */
2879   LOG (GNUNET_ERROR_TYPE_DEBUG,
2880        "Cleaning up sessions\n");
2881   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2882   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2883
2884   plugin->nat = NULL;
2885   GNUNET_free (plugin);
2886   GNUNET_free (api);
2887   return NULL;
2888 }
2889
2890
2891 /* end of plugin_transport_udp.c */