f67bbd915600bc0319b34d6c8d4a54eb1f0f4696
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45
46 /**
47  * Number of messages we can defragment in parallel.  We only really
48  * defragment 1 message at a time, but if messages get re-ordered, we
49  * may want to keep knowledge about the previous message to avoid
50  * discarding the current message in favor of a single fragment of a
51  * previous message.  3 should be good since we don't expect massive
52  * message reorderings with UDP.
53  */
54 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
55
56 /**
57  * We keep a defragmentation queue per sender address.  How many
58  * sender addresses do we support at the same time? Memory consumption
59  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
60  * value. (So 128 corresponds to 12 MB and should suffice for
61  * connecting to roughly 128 peers via UDP).
62  */
63 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
64
65
66
67 /**
68  * Closure for 'append_port'.
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_TRANSPORT_AddressStringCallback asc;
76
77   /**
78    * Clsoure for 'asc'.
79    */
80   void *asc_cls;
81
82   /**
83    * Port to add after the IP address.
84    */
85   uint16_t port;
86 };
87
88
89 enum UDP_MessageType
90 {
91   UNDEFINED = 0,
92   MSG_FRAGMENTED = 1,
93   MSG_FRAGMENTED_COMPLETE = 2,
94   MSG_UNFRAGMENTED = 3,
95   MSG_ACK = 4,
96   MSG_BEACON = 5
97 };
98
99 struct Session
100 {
101   /**
102    * Which peer is this session for?
103    */
104   struct GNUNET_PeerIdentity target;
105
106   struct UDP_FragmentationContext * frag_ctx;
107
108   /**
109    * Address of the other peer
110    */
111   const struct sockaddr *sock_addr;
112
113   /**
114    * Desired delay for next sending we send to other peer
115    */
116   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
117
118   /**
119    * Desired delay for next sending we received from other peer
120    */
121   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
122
123   /**
124    * Session timeout task
125    */
126   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
127
128   /**
129    * expected delay for ACKs
130    */
131   struct GNUNET_TIME_Relative last_expected_ack_delay;
132
133   /**
134    * desired delay between UDP messages
135    */
136   struct GNUNET_TIME_Relative last_expected_msg_delay;
137
138   struct GNUNET_ATS_Information ats;
139
140   size_t addrlen;
141
142
143   unsigned int rc;
144
145   int in_destroy;
146 };
147
148
149 struct SessionCompareContext
150 {
151   struct Session *res;
152   const struct GNUNET_HELLO_Address *addr;
153 };
154
155
156 /**
157  * Closure for 'process_inbound_tokenized_messages'
158  */
159 struct SourceInformation
160 {
161   /**
162    * Sender identity.
163    */
164   struct GNUNET_PeerIdentity sender;
165
166   /**
167    * Source address.
168    */
169   const void *arg;
170
171   struct Session *session;
172   /**
173    * Number of bytes in source address.
174    */
175   size_t args;
176
177 };
178
179
180 /**
181  * Closure for 'find_receive_context'.
182  */
183 struct FindReceiveContext
184 {
185   /**
186    * Where to store the result.
187    */
188   struct DefragContext *rc;
189
190   /**
191    * Address to find.
192    */
193   const struct sockaddr *addr;
194
195   struct Session *session;
196
197   /**
198    * Number of bytes in 'addr'.
199    */
200   socklen_t addr_len;
201
202 };
203
204
205
206 /**
207  * Data structure to track defragmentation contexts based
208  * on the source of the UDP traffic.
209  */
210 struct DefragContext
211 {
212
213   /**
214    * Defragmentation context.
215    */
216   struct GNUNET_DEFRAGMENT_Context *defrag;
217
218   /**
219    * Source address this receive context is for (allocated at the
220    * end of the struct).
221    */
222   const struct sockaddr *src_addr;
223
224   /**
225    * Reference to master plugin struct.
226    */
227   struct Plugin *plugin;
228
229   /**
230    * Node in the defrag heap.
231    */
232   struct GNUNET_CONTAINER_HeapNode *hnode;
233
234   /**
235    * Length of 'src_addr'
236    */
237   size_t addr_len;
238 };
239
240
241
242 /**
243  * Context to send fragmented messages
244  */
245 struct UDP_FragmentationContext
246 {
247   /**
248    * Next in linked list
249    */
250   struct UDP_FragmentationContext * next;
251
252   /**
253    * Previous in linked list
254    */
255   struct UDP_FragmentationContext * prev;
256
257   /**
258    * The plugin
259    */
260   struct Plugin * plugin;
261
262   /**
263    * Handle for GNUNET_FRAGMENT context
264    */
265   struct GNUNET_FRAGMENT_Context * frag;
266
267   /**
268    * The session this fragmentation context belongs to
269    */
270   struct Session * session;
271
272   /**
273    * Function to call upon completion of the transmission.
274    */
275   GNUNET_TRANSPORT_TransmitContinuation cont;
276
277   /**
278    * Closure for 'cont'.
279    */
280   void *cont_cls;
281
282   /**
283    * Message timeout
284    */
285   struct GNUNET_TIME_Absolute timeout;
286
287   /**
288    * Payload size of original unfragmented message
289    */
290   size_t payload_size;
291
292   /**
293    * Bytes used to send all fragments on wire including UDP overhead
294    */
295   size_t on_wire_size;
296
297   unsigned int fragments_used;
298
299 };
300
301
302 struct UDP_MessageWrapper
303 {
304   /**
305    * Session this message belongs to
306    */
307   struct Session *session;
308
309   /**
310    * DLL of messages
311    * previous element
312    */
313   struct UDP_MessageWrapper *prev;
314
315   /**
316    * DLL of messages
317    * previous element
318    */
319   struct UDP_MessageWrapper *next;
320
321   /**
322    * Message type
323    * According to UDP_MessageType
324    */
325   int msg_type;
326
327   /**
328    * Message with size msg_size including UDP specific overhead
329    */
330   char *msg_buf;
331
332   /**
333    * Size of UDP message to send including UDP specific overhead
334    */
335   size_t msg_size;
336
337   /**
338    * Payload size of original message
339    */
340   size_t payload_size;
341
342   /**
343    * Message timeout
344    */
345   struct GNUNET_TIME_Absolute timeout;
346
347   /**
348    * Function to call upon completion of the transmission.
349    */
350   GNUNET_TRANSPORT_TransmitContinuation cont;
351
352   /**
353    * Closure for 'cont'.
354    */
355   void *cont_cls;
356
357   /**
358    * Fragmentation context
359    * frag_ctx == NULL if transport <= MTU
360    * frag_ctx != NULL if transport > MTU
361    */
362   struct UDP_FragmentationContext *frag_ctx;
363 };
364
365
366 /**
367  * UDP ACK Message-Packet header (after defragmentation).
368  */
369 struct UDP_ACK_Message
370 {
371   /**
372    * Message header.
373    */
374   struct GNUNET_MessageHeader header;
375
376   /**
377    * Desired delay for flow control
378    */
379   uint32_t delay;
380
381   /**
382    * What is the identity of the sender
383    */
384   struct GNUNET_PeerIdentity sender;
385
386 };
387
388 /**
389  * Encapsulation of all of the state of the plugin.
390  */
391 struct Plugin * plugin;
392
393
394 /**
395  * We have been notified that our readset has something to read.  We don't
396  * know which socket needs to be read, so we have to check each one
397  * Then reschedule this function to be called again once more is available.
398  *
399  * @param cls the plugin handle
400  * @param tc the scheduling context (for rescheduling this function again)
401  */
402 static void
403 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
404
405
406 /**
407  * We have been notified that our readset has something to read.  We don't
408  * know which socket needs to be read, so we have to check each one
409  * Then reschedule this function to be called again once more is available.
410  *
411  * @param cls the plugin handle
412  * @param tc the scheduling context (for rescheduling this function again)
413  */
414 static void
415 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
416
417
418 /**
419  * Start session timeout
420  */
421 static void
422 start_session_timeout (struct Session *s);
423
424 /**
425  * Increment session timeout due to activity
426  */
427 static void
428 reschedule_session_timeout (struct Session *s);
429
430 /**
431  * Cancel timeout
432  */
433 static void
434 stop_session_timeout (struct Session *s);
435
436
437 /**
438  * (re)schedule select tasks for this plugin.
439  *
440  * @param plugin plugin to reschedule
441  */
442 static void
443 schedule_select (struct Plugin *plugin)
444 {
445   struct GNUNET_TIME_Relative min_delay;
446   struct UDP_MessageWrapper *udpw;
447
448   if (NULL != plugin->sockv4)
449   {
450     /* Find a message ready to send:
451      * Flow delay from other peer is expired or not set (0) */
452     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
453     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
454       min_delay = GNUNET_TIME_relative_min (min_delay,
455                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
456     
457     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
458       GNUNET_SCHEDULER_cancel(plugin->select_task);
459
460     /* Schedule with:
461      * - write active set if message is ready
462      * - timeout minimum delay */
463     plugin->select_task =
464       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
465                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
466                                    plugin->rs_v4,
467                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
468                                    &udp_plugin_select, plugin);  
469   }
470   if (NULL != plugin->sockv6)
471   {
472     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
473     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
474       min_delay = GNUNET_TIME_relative_min (min_delay,
475                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
476     
477     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
478       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
479     plugin->select_task_v6 =
480       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
481                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
482                                    plugin->rs_v6,
483                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
484                                    &udp_plugin_select_v6, plugin);
485   }
486 }
487
488
489 /**
490  * Function called for a quick conversion of the binary address to
491  * a numeric address.  Note that the caller must not free the
492  * address and that the next call to this function is allowed
493  * to override the address again.
494  *
495  * @param cls closure
496  * @param addr binary address
497  * @param addrlen length of the address
498  * @return string representing the same address
499  */
500 const char *
501 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
502 {
503   static char rbuf[INET6_ADDRSTRLEN + 10];
504   char buf[INET6_ADDRSTRLEN];
505   const void *sb;
506   struct in_addr a4;
507   struct in6_addr a6;
508   const struct IPv4UdpAddress *t4;
509   const struct IPv6UdpAddress *t6;
510   int af;
511   uint16_t port;
512
513   if (addrlen == sizeof (struct IPv6UdpAddress))
514   {
515     t6 = addr;
516     af = AF_INET6;
517     port = ntohs (t6->u6_port);
518     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
519     sb = &a6;
520   }
521   else if (addrlen == sizeof (struct IPv4UdpAddress))
522   {
523     t4 = addr;
524     af = AF_INET;
525     port = ntohs (t4->u4_port);
526     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
527     sb = &a4;
528   }
529   else
530   {
531     GNUNET_break_op (0);
532     return NULL;
533   }
534   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
535   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u",
536                    buf, port);
537   return rbuf;
538 }
539
540
541 /**
542  * Function called to convert a string address to
543  * a binary address.
544  *
545  * @param cls closure ('struct Plugin*')
546  * @param addr string address
547  * @param addrlen length of the address
548  * @param buf location to store the buffer
549  * @param added location to store the number of bytes in the buffer.
550  *        If the function returns GNUNET_SYSERR, its contents are undefined.
551  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
552  */
553 static int
554 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
555     void **buf, size_t *added)
556 {
557   struct sockaddr_storage socket_address;
558   
559   if ((NULL == addr) || (0 == addrlen))
560   {
561     GNUNET_break (0);
562     return GNUNET_SYSERR;
563   }
564
565   if ('\0' != addr[addrlen - 1])
566   {
567     return GNUNET_SYSERR;
568   }
569
570   if (strlen (addr) != addrlen - 1)
571   {
572     return GNUNET_SYSERR;
573   }
574
575   if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr),
576                                                  &socket_address))
577   {
578     return GNUNET_SYSERR;
579   }
580
581   switch (socket_address.ss_family)
582   {
583   case AF_INET:
584     {
585       struct IPv4UdpAddress *u4;
586       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
587       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
588       u4->ipv4_addr = in4->sin_addr.s_addr;
589       u4->u4_port = in4->sin_port;
590       *buf = u4;
591       *added = sizeof (struct IPv4UdpAddress);
592       return GNUNET_OK;
593     }
594   case AF_INET6:
595     {
596       struct IPv6UdpAddress *u6;
597       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
598       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
599       u6->ipv6_addr = in6->sin6_addr;
600       u6->u6_port = in6->sin6_port;
601       *buf = u6;
602       *added = sizeof (struct IPv6UdpAddress);
603       return GNUNET_OK;
604     }
605   default:
606     GNUNET_break (0);
607     return GNUNET_SYSERR;
608   }
609 }
610
611
612 /**
613  * Append our port and forward the result.
614  *
615  * @param cls a 'struct PrettyPrinterContext'
616  * @param hostname result from DNS resolver
617  */
618 static void
619 append_port (void *cls, const char *hostname)
620 {
621   struct PrettyPrinterContext *ppc = cls;
622   char *ret;
623
624   if (hostname == NULL)
625   {
626     ppc->asc (ppc->asc_cls, NULL);
627     GNUNET_free (ppc);
628     return;
629   }
630   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
631   ppc->asc (ppc->asc_cls, ret);
632   GNUNET_free (ret);
633 }
634
635
636 /**
637  * Convert the transports address to a nice, human-readable
638  * format.
639  *
640  * @param cls closure
641  * @param type name of the transport that generated the address
642  * @param addr one of the addresses of the host, NULL for the last address
643  *        the specific address format depends on the transport
644  * @param addrlen length of the address
645  * @param numeric should (IP) addresses be displayed in numeric form?
646  * @param timeout after how long should we give up?
647  * @param asc function to call on each string
648  * @param asc_cls closure for asc
649  */
650 static void
651 udp_plugin_address_pretty_printer (void *cls, const char *type,
652                                    const void *addr, size_t addrlen,
653                                    int numeric,
654                                    struct GNUNET_TIME_Relative timeout,
655                                    GNUNET_TRANSPORT_AddressStringCallback asc,
656                                    void *asc_cls)
657 {
658   struct PrettyPrinterContext *ppc;
659   const void *sb;
660   size_t sbs;
661   struct sockaddr_in a4;
662   struct sockaddr_in6 a6;
663   const struct IPv4UdpAddress *u4;
664   const struct IPv6UdpAddress *u6;
665   uint16_t port;
666
667   if (addrlen == sizeof (struct IPv6UdpAddress))
668   {
669     u6 = addr;
670     memset (&a6, 0, sizeof (a6));
671     a6.sin6_family = AF_INET6;
672 #if HAVE_SOCKADDR_IN_SIN_LEN
673     a6.sin6_len = sizeof (a6);
674 #endif
675     a6.sin6_port = u6->u6_port;
676     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
677     port = ntohs (u6->u6_port);
678     sb = &a6;
679     sbs = sizeof (a6);
680   }
681   else if (addrlen == sizeof (struct IPv4UdpAddress))
682   {
683     u4 = addr;
684     memset (&a4, 0, sizeof (a4));
685     a4.sin_family = AF_INET;
686 #if HAVE_SOCKADDR_IN_SIN_LEN
687     a4.sin_len = sizeof (a4);
688 #endif
689     a4.sin_port = u4->u4_port;
690     a4.sin_addr.s_addr = u4->ipv4_addr;
691     port = ntohs (u4->u4_port);
692     sb = &a4;
693     sbs = sizeof (a4);
694   }
695   else if (0 == addrlen)
696   {
697     asc (asc_cls, "<inbound connection>");
698     asc (asc_cls, NULL);
699     return;
700   }
701   else
702   {
703     /* invalid address */
704     GNUNET_break_op (0);
705     asc (asc_cls, NULL);
706     return;
707   }
708   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
709   ppc->asc = asc;
710   ppc->asc_cls = asc_cls;
711   ppc->port = port;
712   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
713 }
714
715
716 static void
717 call_continuation (struct UDP_MessageWrapper *udpw, int result)
718 {
719   size_t overhead;
720   struct UDP_MessageWrapper dummy;
721
722   LOG (GNUNET_ERROR_TYPE_DEBUG,
723       "Calling continuation for %u byte message to `%s' with result %s\n",
724       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
725       (GNUNET_OK == result) ? "OK" : "SYSERR");
726
727   if ((udpw->msg_size - udpw->payload_size) >= 0)
728     overhead = udpw->msg_size - udpw->payload_size;
729   else
730     overhead = udpw->msg_size;
731
732   switch (result) {
733     case GNUNET_OK:
734       switch (udpw->msg_type) {
735         case MSG_UNFRAGMENTED:
736           if (NULL != udpw->cont)
737           {
738             /* Transport continuation */
739             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
740                       udpw->payload_size, overhead);
741           }
742           GNUNET_STATISTICS_update (plugin->env->stats,
743                                     "# UDP, unfragmented msgs, messages, sent, success",
744                                     1, GNUNET_NO);
745           GNUNET_STATISTICS_update (plugin->env->stats,
746                                     "# UDP, unfragmented msgs, bytes payload, sent, success",
747                                     udpw->payload_size, GNUNET_NO);
748           GNUNET_STATISTICS_update (plugin->env->stats,
749                                     "# UDP, unfragmented msgs, bytes overhead, sent, success",
750                                     overhead, GNUNET_NO);
751           GNUNET_STATISTICS_update (plugin->env->stats,
752                                     "# UDP, total, bytes overhead, sent",
753                                     overhead, GNUNET_NO);
754           GNUNET_STATISTICS_update (plugin->env->stats,
755                                     "# UDP, total, bytes payload, sent",
756                                     udpw->payload_size, GNUNET_NO);
757           break;
758         case MSG_FRAGMENTED_COMPLETE:
759           GNUNET_assert (NULL != udpw->frag_ctx);
760           if (udpw->frag_ctx->cont != NULL)
761             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_OK,
762                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
763           GNUNET_STATISTICS_update (plugin->env->stats,
764                                     "# UDP, fragmented msgs, messages, sent, success",
765                                     1, GNUNET_NO);
766           GNUNET_STATISTICS_update (plugin->env->stats,
767                                     "# UDP, fragmented msgs, bytes payload, sent, success",
768                                     udpw->payload_size, GNUNET_NO);
769           GNUNET_STATISTICS_update (plugin->env->stats,
770                                     "# UDP, fragmented msgs, bytes overhead, sent, success",
771                                     overhead, GNUNET_NO);
772           GNUNET_STATISTICS_update (plugin->env->stats,
773                                     "# UDP, total, bytes overhead, sent",
774                                     overhead, GNUNET_NO);
775           GNUNET_STATISTICS_update (plugin->env->stats,
776                                     "# UDP, total, bytes payload, sent",
777                                     udpw->payload_size, GNUNET_NO);
778           GNUNET_STATISTICS_update (plugin->env->stats,
779                                     "# UDP, fragmented msgs, messages, pending",
780                                     -1, GNUNET_NO);
781           break;
782         case MSG_FRAGMENTED:
783           /* Fragmented message: enqueue next fragment */
784           if (NULL != udpw->cont)
785             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
786                       udpw->payload_size, udpw->msg_size);
787           GNUNET_STATISTICS_update (plugin->env->stats,
788                                     "# UDP, fragmented msgs, fragments, sent, success",
789                                     1, GNUNET_NO);
790           GNUNET_STATISTICS_update (plugin->env->stats,
791                                     "# UDP, fragmented msgs, fragments bytes, sent, success",
792                                     udpw->msg_size, GNUNET_NO);
793           break;
794         case MSG_ACK:
795           /* No continuation */
796           GNUNET_STATISTICS_update (plugin->env->stats,
797                                     "# UDP, ACK msgs, messages, sent, success",
798                                     1, GNUNET_NO);
799           GNUNET_STATISTICS_update (plugin->env->stats,
800                                     "# UDP, ACK msgs, bytes overhead, sent, success",
801                                     overhead, GNUNET_NO);
802           GNUNET_STATISTICS_update (plugin->env->stats,
803                                     "# UDP, total, bytes overhead, sent",
804                                     overhead, GNUNET_NO);
805           break;
806         case MSG_BEACON:
807           GNUNET_break (0);
808           break;
809         default:
810           LOG (GNUNET_ERROR_TYPE_ERROR,
811               "ERROR: %u\n", udpw->msg_type);
812           GNUNET_break (0);
813           break;
814       }
815       break;
816     case GNUNET_SYSERR:
817       switch (udpw->msg_type) {
818         case MSG_UNFRAGMENTED:
819           /* Unfragmented message: failed to send */
820           if (NULL != udpw->cont)
821             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
822                       udpw->payload_size, overhead);
823           GNUNET_STATISTICS_update (plugin->env->stats,
824                                   "# UDP, unfragmented msgs, messages, sent, failure",
825                                   1, GNUNET_NO);
826           GNUNET_STATISTICS_update (plugin->env->stats,
827                                     "# UDP, unfragmented msgs, bytes payload, sent, failure",
828                                     udpw->payload_size, GNUNET_NO);
829           GNUNET_STATISTICS_update (plugin->env->stats,
830                                     "# UDP, unfragmented msgs, bytes overhead, sent, failure",
831                                     overhead, GNUNET_NO);
832           break;
833         case MSG_FRAGMENTED_COMPLETE:
834           GNUNET_assert (NULL != udpw->frag_ctx);
835           if (udpw->frag_ctx->cont != NULL)
836             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_SYSERR,
837                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
838           GNUNET_STATISTICS_update (plugin->env->stats,
839                                     "# UDP, fragmented msgs, messages, sent, failure",
840                                     1, GNUNET_NO);
841           GNUNET_STATISTICS_update (plugin->env->stats,
842                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
843                                     udpw->payload_size, GNUNET_NO);
844           GNUNET_STATISTICS_update (plugin->env->stats,
845                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
846                                     overhead, GNUNET_NO);
847           GNUNET_STATISTICS_update (plugin->env->stats,
848                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
849                                     overhead, GNUNET_NO);
850           GNUNET_STATISTICS_update (plugin->env->stats,
851                                     "# UDP, fragmented msgs, messages, pending",
852                                     -1, GNUNET_NO);
853           break;
854         case MSG_FRAGMENTED:
855           GNUNET_assert (NULL != udpw->frag_ctx);
856           /* Fragmented message: failed to send */
857           GNUNET_STATISTICS_update (plugin->env->stats,
858                                     "# UDP, fragmented msgs, fragments, sent, failure",
859                                     1, GNUNET_NO);
860           GNUNET_STATISTICS_update (plugin->env->stats,
861                                     "# UDP, fragmented msgs, fragments bytes, sent, failure",
862                                     udpw->msg_size, GNUNET_NO);
863
864           dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
865           dummy.msg_buf = NULL;
866           dummy.msg_size = udpw->frag_ctx->on_wire_size;
867           dummy.payload_size = udpw->frag_ctx->payload_size;
868           dummy.frag_ctx = udpw->frag_ctx;
869           dummy.session = udpw->session;
870           call_continuation (&dummy, GNUNET_SYSERR);
871
872           break;
873         case MSG_ACK:
874           /* ACK message: failed to send */
875           GNUNET_STATISTICS_update (plugin->env->stats,
876                                     "# UDP, ACK msgs, messages, sent, failure",
877                                     1, GNUNET_NO);
878           break;
879         case MSG_BEACON:
880           /* Beacon message: failed to send */
881           GNUNET_break (0);
882           break;
883         default:
884           GNUNET_break (0);
885           break;
886       }
887       break;
888     default:
889       GNUNET_break (0);
890       break;
891   }
892 }
893
894
895 /**
896  * Check if the given port is plausible (must be either our listen
897  * port or our advertised port).  If it is neither, we return
898  * GNUNET_SYSERR.
899  *
900  * @param plugin global variables
901  * @param in_port port number to check
902  * @return GNUNET_OK if port is either open_port or adv_port
903  */
904 static int
905 check_port (struct Plugin *plugin, uint16_t in_port)
906 {
907   if ((in_port == plugin->port) || (in_port == plugin->aport))
908     return GNUNET_OK;
909   return GNUNET_SYSERR;
910 }
911
912
913 /**
914  * Function that will be called to check if a binary address for this
915  * plugin is well-formed and corresponds to an address for THIS peer
916  * (as per our configuration).  Naturally, if absolutely necessary,
917  * plugins can be a bit conservative in their answer, but in general
918  * plugins should make sure that the address does not redirect
919  * traffic to a 3rd party that might try to man-in-the-middle our
920  * traffic.
921  *
922  * @param cls closure, should be our handle to the Plugin
923  * @param addr pointer to the address
924  * @param addrlen length of addr
925  * @return GNUNET_OK if this is a plausible address for this peer
926  *         and transport, GNUNET_SYSERR if not
927  *
928  */
929 static int
930 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
931 {
932   struct Plugin *plugin = cls;
933   struct IPv4UdpAddress *v4;
934   struct IPv6UdpAddress *v6;
935
936   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
937       (addrlen != sizeof (struct IPv6UdpAddress)))
938   {
939     GNUNET_break_op (0);
940     return GNUNET_SYSERR;
941   }
942   if (addrlen == sizeof (struct IPv4UdpAddress))
943   {
944     v4 = (struct IPv4UdpAddress *) addr;
945     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
946       return GNUNET_SYSERR;
947     if (GNUNET_OK !=
948         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
949                                  sizeof (struct in_addr)))
950       return GNUNET_SYSERR;
951   }
952   else
953   {
954     v6 = (struct IPv6UdpAddress *) addr;
955     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
956     {
957       GNUNET_break_op (0);
958       return GNUNET_SYSERR;
959     }
960     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
961       return GNUNET_SYSERR;
962     if (GNUNET_OK !=
963         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
964                                  sizeof (struct in6_addr)))
965       return GNUNET_SYSERR;
966   }
967   return GNUNET_OK;
968 }
969
970
971 /**
972  * Task to free resources associated with a session.
973  *
974  * @param s session to free
975  */
976 static void
977 free_session (struct Session *s)
978 {
979   if (NULL != s->frag_ctx)
980   {
981     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag, NULL, NULL);
982     GNUNET_free (s->frag_ctx);
983     s->frag_ctx = NULL;
984   }
985   GNUNET_free (s);
986 }
987
988
989 static void
990 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
991 {
992   GNUNET_STATISTICS_update (plugin->env->stats,
993                             "# UDP, total, bytes in buffers",
994                             -udpw->msg_size, GNUNET_NO);
995   GNUNET_STATISTICS_update (plugin->env->stats,
996                             "# UDP, total, msgs in buffers",
997                             -1, GNUNET_NO);
998   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
999     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1000                                  plugin->ipv4_queue_tail, udpw);
1001   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1002     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1003                                  plugin->ipv6_queue_tail, udpw);
1004 }
1005
1006 static void
1007 fragmented_message_done (struct UDP_FragmentationContext *fc, int result)
1008 {
1009   struct UDP_MessageWrapper *udpw;
1010   struct UDP_MessageWrapper *tmp;
1011   struct UDP_MessageWrapper dummy;
1012   struct Session *s = fc->session;
1013   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p : Fragmented message removed with result %s\n", fc, (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1014
1015   /* Call continuation for fragmented message */
1016   dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
1017   dummy.msg_buf = NULL;
1018   dummy.msg_size = s->frag_ctx->on_wire_size;
1019   dummy.payload_size = s->frag_ctx->payload_size;
1020   dummy.frag_ctx = s->frag_ctx;
1021   dummy.session = s;
1022
1023   call_continuation (&dummy, result);
1024
1025   /* Remove left-over fragments from queue */
1026   /* Remove leftover fragments from queue */
1027   if (s->addrlen == sizeof (struct sockaddr_in6))
1028   {
1029     udpw = plugin->ipv6_queue_head;
1030     while (NULL != udpw)
1031     {
1032       tmp = udpw->next;
1033       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1034       {
1035         dequeue (plugin, udpw);
1036         call_continuation (udpw, GNUNET_SYSERR);
1037         GNUNET_free (udpw);
1038       }
1039       udpw = tmp;
1040     }
1041   }
1042   if (s->addrlen == sizeof (struct sockaddr_in))
1043   {
1044     udpw = plugin->ipv4_queue_head;
1045     while (udpw!= NULL)
1046     {
1047       tmp = udpw->next;
1048       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1049       {
1050         dequeue (plugin, udpw);
1051         call_continuation (udpw, GNUNET_SYSERR);
1052         GNUNET_free (udpw);
1053       }
1054       udpw = tmp;
1055     }
1056   }
1057
1058   /* Destroy fragmentation context */
1059   GNUNET_FRAGMENT_context_destroy (fc->frag,
1060                                      &s->last_expected_msg_delay,
1061                                      &s->last_expected_ack_delay);
1062   s->frag_ctx = NULL;
1063   GNUNET_free (fc);
1064 }
1065
1066 /**
1067  * Functions with this signature are called whenever we need
1068  * to close a session due to a disconnect or failure to
1069  * establish a connection.
1070  *
1071  * @param s session to close down
1072  */
1073 static void
1074 disconnect_session (struct Session *s)
1075 {
1076   struct UDP_MessageWrapper *udpw;
1077   struct UDP_MessageWrapper *next;
1078
1079   GNUNET_assert (GNUNET_YES != s->in_destroy);
1080   LOG (GNUNET_ERROR_TYPE_DEBUG,
1081        "Session %p to peer `%s' address ended \n",
1082          s,
1083          GNUNET_i2s (&s->target),
1084          GNUNET_a2s (s->sock_addr, s->addrlen));
1085   stop_session_timeout (s);
1086
1087   if (NULL != s->frag_ctx)
1088   {
1089     /* Remove fragmented message due to disconnect */
1090     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1091   }
1092
1093   next = plugin->ipv4_queue_head;
1094   while (NULL != (udpw = next))
1095   {
1096     next = udpw->next;
1097     if (udpw->session == s)
1098     {
1099       dequeue (plugin, udpw);
1100       call_continuation(udpw, GNUNET_SYSERR);
1101       GNUNET_free (udpw);
1102     }
1103   }
1104   next = plugin->ipv6_queue_head;
1105   while (NULL != (udpw = next))
1106   {
1107     next = udpw->next;
1108     if (udpw->session == s)
1109     {
1110       dequeue (plugin, udpw);
1111       call_continuation(udpw, GNUNET_SYSERR);
1112       GNUNET_free (udpw);
1113     }
1114     udpw = next;
1115   }
1116   plugin->env->session_end (plugin->env->cls, &s->target, s);
1117
1118   if (NULL != s->frag_ctx)
1119   {
1120     if (NULL != s->frag_ctx->cont)
1121     {
1122       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
1123                          s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1124       LOG (GNUNET_ERROR_TYPE_DEBUG,
1125           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1126           GNUNET_i2s (&s->target));
1127     }
1128   }
1129
1130   GNUNET_assert (GNUNET_YES ==
1131                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
1132                                                        &s->target.hashPubKey,
1133                                                        s));
1134   GNUNET_STATISTICS_set(plugin->env->stats,
1135                         "# UDP, sessions active",
1136                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1137                         GNUNET_NO);
1138   if (s->rc > 0)
1139     s->in_destroy = GNUNET_YES;
1140   else
1141     free_session (s);
1142 }
1143
1144 /**
1145  * Destroy a session, plugin is being unloaded.
1146  *
1147  * @param cls unused
1148  * @param key hash of public key of target peer
1149  * @param value a 'struct PeerSession*' to clean up
1150  * @return GNUNET_OK (continue to iterate)
1151  */
1152 static int
1153 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1154 {
1155   disconnect_session(value);
1156   return GNUNET_OK;
1157 }
1158
1159
1160 /**
1161  * Disconnect from a remote node.  Clean up session if we have one for this peer
1162  *
1163  * @param cls closure for this call (should be handle to Plugin)
1164  * @param target the peeridentity of the peer to disconnect
1165  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
1166  */
1167 static void
1168 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1169 {
1170   struct Plugin *plugin = cls;
1171   GNUNET_assert (plugin != NULL);
1172
1173   GNUNET_assert (target != NULL);
1174   LOG (GNUNET_ERROR_TYPE_DEBUG,
1175        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
1176   /* Clean up sessions */
1177   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
1178 }
1179
1180
1181 /**
1182  * Session was idle, so disconnect it
1183  */
1184 static void
1185 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1186 {
1187   GNUNET_assert (NULL != cls);
1188   struct Session *s = cls;
1189
1190   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1191   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192               "Session %p was idle for %llu ms, disconnecting\n",
1193               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1194   /* call session destroy function */
1195   disconnect_session (s);
1196 }
1197
1198
1199 /**
1200  * Start session timeout
1201  */
1202 static void
1203 start_session_timeout (struct Session *s)
1204 {
1205   GNUNET_assert (NULL != s);
1206   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
1207   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1208                                                    &session_timeout,
1209                                                    s);
1210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211               "Timeout for session %p set to %llu ms\n",
1212               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1213 }
1214
1215
1216 /**
1217  * Increment session timeout due to activity
1218  */
1219 static void
1220 reschedule_session_timeout (struct Session *s)
1221 {
1222   GNUNET_assert (NULL != s);
1223   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1224
1225   GNUNET_SCHEDULER_cancel (s->timeout_task);
1226   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1227                                                    &session_timeout,
1228                                                    s);
1229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230               "Timeout rescheduled for session %p set to %llu ms\n",
1231               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1232 }
1233
1234
1235 /**
1236  * Cancel timeout
1237  */
1238 static void
1239 stop_session_timeout (struct Session *s)
1240 {
1241   GNUNET_assert (NULL != s);
1242
1243   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1244   {
1245     GNUNET_SCHEDULER_cancel (s->timeout_task);
1246     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1247     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1248                 "Timeout stopped for session %p canceled\n",
1249                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1250   }
1251 }
1252
1253
1254 static struct Session *
1255 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
1256                 const void *addr, size_t addrlen,
1257                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1258 {
1259   struct Session *s;
1260   const struct IPv4UdpAddress *t4;
1261   const struct IPv6UdpAddress *t6;
1262   struct sockaddr_in *v4;
1263   struct sockaddr_in6 *v6;
1264   size_t len;
1265
1266   switch (addrlen)
1267   {
1268   case sizeof (struct IPv4UdpAddress):
1269     if (NULL == plugin->sockv4)
1270     {
1271       return NULL;
1272     }
1273     t4 = addr;
1274     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
1275     len = sizeof (struct sockaddr_in);
1276     v4 = (struct sockaddr_in *) &s[1];
1277     v4->sin_family = AF_INET;
1278 #if HAVE_SOCKADDR_IN_SIN_LEN
1279     v4->sin_len = sizeof (struct sockaddr_in);
1280 #endif
1281     v4->sin_port = t4->u4_port;
1282     v4->sin_addr.s_addr = t4->ipv4_addr;
1283     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
1284     break;
1285   case sizeof (struct IPv6UdpAddress):
1286     if (NULL == plugin->sockv6)
1287     {
1288       return NULL;
1289     }
1290     t6 = addr;
1291     s =
1292         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
1293     len = sizeof (struct sockaddr_in6);
1294     v6 = (struct sockaddr_in6 *) &s[1];
1295     v6->sin6_family = AF_INET6;
1296 #if HAVE_SOCKADDR_IN_SIN_LEN
1297     v6->sin6_len = sizeof (struct sockaddr_in6);
1298 #endif
1299     v6->sin6_port = t6->u6_port;
1300     v6->sin6_addr = t6->ipv6_addr;
1301     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
1302     break;
1303   default:
1304     /* Must have a valid address to send to */
1305     GNUNET_break_op (0);
1306     return NULL;
1307   }
1308   s->addrlen = len;
1309   s->target = *target;
1310   s->sock_addr = (const struct sockaddr *) &s[1];
1311   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
1312   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1313   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1314   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1315   start_session_timeout (s);
1316   return s;
1317 }
1318
1319
1320 static int
1321 session_cmp_it (void *cls,
1322                 const struct GNUNET_HashCode * key,
1323                 void *value)
1324 {
1325   struct SessionCompareContext * cctx = cls;
1326   const struct GNUNET_HELLO_Address *address = cctx->addr;
1327   struct Session *s = value;
1328
1329   socklen_t s_addrlen = s->addrlen;
1330
1331   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
1332       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1333       GNUNET_a2s (s->sock_addr, s->addrlen));
1334   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
1335       (s_addrlen == sizeof (struct sockaddr_in)))
1336   {
1337     struct IPv4UdpAddress * u4 = NULL;
1338     u4 = (struct IPv4UdpAddress *) address->address;
1339     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
1340     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
1341         (u4->u4_port == s4->sin_port))
1342     {
1343       cctx->res = s;
1344       return GNUNET_NO;
1345     }
1346
1347   }
1348   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1349       (s_addrlen == sizeof (struct sockaddr_in6)))
1350   {
1351     struct IPv6UdpAddress * u6 = NULL;
1352     u6 = (struct IPv6UdpAddress *) address->address;
1353     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1354     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1355         (u6->u6_port == s6->sin6_port))
1356     {
1357       cctx->res = s;
1358       return GNUNET_NO;
1359     }
1360   }
1361   return GNUNET_YES;
1362 }
1363
1364
1365 /**
1366  * Creates a new outbound session the transport service will use to send data to the
1367  * peer
1368  *
1369  * @param cls the plugin
1370  * @param address the address
1371  * @return the session or NULL of max connections exceeded
1372  */
1373 static struct Session *
1374 udp_plugin_get_session (void *cls,
1375                   const struct GNUNET_HELLO_Address *address)
1376 {
1377   struct Session * s = NULL;
1378   struct Plugin * plugin = cls;
1379   struct IPv6UdpAddress * udp_a6;
1380   struct IPv4UdpAddress * udp_a4;
1381
1382   GNUNET_assert (plugin != NULL);
1383   GNUNET_assert (address != NULL);
1384
1385
1386   if ((address->address == NULL) ||
1387       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1388       (address->address_length != sizeof (struct IPv6UdpAddress))))
1389   {
1390     GNUNET_break (0);
1391     return NULL;
1392   }
1393
1394   if (address->address_length == sizeof (struct IPv4UdpAddress))
1395   {
1396     if (plugin->sockv4 == NULL)
1397       return NULL;
1398     udp_a4 = (struct IPv4UdpAddress *) address->address;
1399     if (udp_a4->u4_port == 0)
1400       return NULL;
1401   }
1402
1403   if (address->address_length == sizeof (struct IPv6UdpAddress))
1404   {
1405     if (plugin->sockv6 == NULL)
1406       return NULL;
1407     udp_a6 = (struct IPv6UdpAddress *) address->address;
1408     if (udp_a6->u6_port == 0)
1409       return NULL;
1410   }
1411
1412   /* check if session already exists */
1413   struct SessionCompareContext cctx;
1414   cctx.addr = address;
1415   cctx.res = NULL;
1416   LOG (GNUNET_ERROR_TYPE_DEBUG,
1417        "Looking for existing session for peer `%s' `%s' \n", 
1418        GNUNET_i2s (&address->peer), 
1419        udp_address_to_string(NULL, address->address, address->address_length));
1420   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1421   if (cctx.res != NULL)
1422   {
1423     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1424     return cctx.res;
1425   }
1426
1427   /* otherwise create new */
1428   s = create_session (plugin,
1429       &address->peer,
1430       address->address,
1431       address->address_length,
1432       NULL, NULL);
1433   LOG (GNUNET_ERROR_TYPE_DEBUG,
1434        "Creating new session %p for peer `%s' address `%s'\n",
1435        s,
1436        GNUNET_i2s(&address->peer),
1437        udp_address_to_string(NULL,address->address,address->address_length));
1438   GNUNET_assert (GNUNET_OK ==
1439                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1440                                                     &s->target.hashPubKey,
1441                                                     s,
1442                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1443   GNUNET_STATISTICS_set(plugin->env->stats,
1444                         "# UDP, sessions active",
1445                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1446                         GNUNET_NO);
1447   return s;
1448 }
1449
1450
1451 static void 
1452 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1453 {
1454   GNUNET_STATISTICS_update (plugin->env->stats,
1455                             "# UDP, total, bytes in buffers",
1456                             udpw->msg_size, GNUNET_NO);
1457   GNUNET_STATISTICS_update (plugin->env->stats,
1458                             "# UDP, total, msgs in buffers",
1459                             1, GNUNET_NO);
1460   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1461     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1462                                  plugin->ipv4_queue_tail, udpw);
1463   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1464     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1465                                  plugin->ipv6_queue_tail, udpw);
1466 }
1467
1468
1469
1470 /**
1471  * Fragment message was transmitted via UDP, let fragmentation know
1472  * to send the next fragment now.
1473  *
1474  * @param cls the 'struct UDPMessageWrapper' of the fragment
1475  * @param target destination peer (ignored)
1476  * @param result GNUNET_OK on success (ignored)
1477  * @param payload bytes payload sent
1478  * @param physical bytes physical sent
1479  */
1480 static void
1481 send_next_fragment (void *cls,
1482                     const struct GNUNET_PeerIdentity *target,
1483                     int result, size_t payload, size_t physical)
1484 {
1485   struct UDP_MessageWrapper *udpw = cls;
1486   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1487 }
1488
1489
1490 /**
1491  * Function that is called with messages created by the fragmentation
1492  * module.  In the case of the 'proc' callback of the
1493  * GNUNET_FRAGMENT_context_create function, this function must
1494  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1495  *
1496  * @param cls closure, the 'struct FragmentationContext'
1497  * @param msg the message that was created
1498  */
1499 static void
1500 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1501 {
1502   struct UDP_FragmentationContext *frag_ctx = cls;
1503   struct Plugin *plugin = frag_ctx->plugin;
1504   struct UDP_MessageWrapper * udpw;
1505   size_t msg_len = ntohs (msg->size);
1506  
1507   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1508        "Enqueuing fragment with %u bytes\n", msg_len);
1509   frag_ctx->fragments_used ++;
1510   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1511   udpw->session = frag_ctx->session;
1512   udpw->msg_buf = (char *) &udpw[1];
1513   udpw->msg_size = msg_len;
1514   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1515   udpw->cont = &send_next_fragment;
1516   udpw->cont_cls = udpw;
1517   udpw->timeout = frag_ctx->timeout;
1518   udpw->frag_ctx = frag_ctx;
1519   udpw->msg_type = MSG_FRAGMENTED;
1520   memcpy (udpw->msg_buf, msg, msg_len);
1521   enqueue (plugin, udpw);
1522   schedule_select (plugin);
1523 }
1524
1525
1526 /**
1527  * Function that can be used by the transport service to transmit
1528  * a message using the plugin.   Note that in the case of a
1529  * peer disconnecting, the continuation MUST be called
1530  * prior to the disconnect notification itself.  This function
1531  * will be called with this peer's HELLO message to initiate
1532  * a fresh connection to another peer.
1533  *
1534  * @param cls closure
1535  * @param s which session must be used
1536  * @param msgbuf the message to transmit
1537  * @param msgbuf_size number of bytes in 'msgbuf'
1538  * @param priority how important is the message (most plugins will
1539  *                 ignore message priority and just FIFO)
1540  * @param to how long to wait at most for the transmission (does not
1541  *                require plugins to discard the message after the timeout,
1542  *                just advisory for the desired delay; most plugins will ignore
1543  *                this as well)
1544  * @param cont continuation to call once the message has
1545  *        been transmitted (or if the transport is ready
1546  *        for the next transmission call; or if the
1547  *        peer disconnected...); can be NULL
1548  * @param cont_cls closure for cont
1549  * @return number of bytes used (on the physical network, with overheads);
1550  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1551  *         and does NOT mean that the message was not transmitted (DV)
1552  */
1553 static ssize_t
1554 udp_plugin_send (void *cls,
1555                   struct Session *s,
1556                   const char *msgbuf, size_t msgbuf_size,
1557                   unsigned int priority,
1558                   struct GNUNET_TIME_Relative to,
1559                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1560 {
1561   struct Plugin *plugin = cls;
1562   size_t udpmlen = msgbuf_size + sizeof (struct UDPMessage);
1563   struct UDP_FragmentationContext * frag_ctx;
1564   struct UDP_MessageWrapper * udpw;
1565   struct UDPMessage *udp;
1566   char mbuf[udpmlen];
1567   GNUNET_assert (plugin != NULL);
1568   GNUNET_assert (s != NULL);
1569
1570   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1571     return GNUNET_SYSERR;
1572   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1573     return GNUNET_SYSERR;
1574   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1575   {
1576     GNUNET_break (0);
1577     return GNUNET_SYSERR;
1578   }
1579   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1580   {
1581     GNUNET_break (0);
1582     return GNUNET_SYSERR;
1583   }
1584   LOG (GNUNET_ERROR_TYPE_DEBUG,
1585        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1586        udpmlen,
1587        GNUNET_i2s (&s->target),
1588        GNUNET_a2s(s->sock_addr, s->addrlen));
1589
1590
1591   /* Message */
1592   udp = (struct UDPMessage *) mbuf;
1593   udp->header.size = htons (udpmlen);
1594   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1595   udp->reserved = htonl (0);
1596   udp->sender = *plugin->env->my_identity;
1597
1598   reschedule_session_timeout(s);
1599   if (udpmlen <= UDP_MTU)
1600   {
1601     /* unfragmented message */
1602     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1603     udpw->session = s;
1604     udpw->msg_buf = (char *) &udpw[1];
1605     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1606     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1607     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1608     udpw->cont = cont;
1609     udpw->cont_cls = cont_cls;
1610     udpw->frag_ctx = NULL;
1611     udpw->msg_type = MSG_UNFRAGMENTED;
1612     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
1613     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1614     enqueue (plugin, udpw);
1615
1616     GNUNET_STATISTICS_update (plugin->env->stats,
1617                               "# UDP, unfragmented msgs, messages, attempt",
1618                               1, GNUNET_NO);
1619     GNUNET_STATISTICS_update (plugin->env->stats,
1620                               "# UDP, unfragmented msgs, bytes payload, attempt",
1621                               udpw->payload_size, GNUNET_NO);
1622   }
1623   else
1624   {
1625     /* fragmented message */
1626     if  (s->frag_ctx != NULL)
1627       return GNUNET_SYSERR;
1628     memcpy (&udp[1], msgbuf, msgbuf_size);
1629     frag_ctx = GNUNET_malloc (sizeof (struct UDP_FragmentationContext));
1630     frag_ctx->plugin = plugin;
1631     frag_ctx->session = s;
1632     frag_ctx->cont = cont;
1633     frag_ctx->cont_cls = cont_cls;
1634     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1635     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1636     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1637     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1638                                                      UDP_MTU,
1639                                                      &plugin->tracker,
1640                                                      s->last_expected_msg_delay, 
1641                                                      s->last_expected_ack_delay, 
1642                                                      &udp->header,
1643                                                      &enqueue_fragment,
1644                                                      frag_ctx);    
1645     s->frag_ctx = frag_ctx;
1646     GNUNET_STATISTICS_update (plugin->env->stats,
1647                               "# UDP, fragmented msgs, messages, pending",
1648                               1, GNUNET_NO);
1649     GNUNET_STATISTICS_update (plugin->env->stats,
1650                               "# UDP, fragmented msgs, messages, attempt",
1651                               1, GNUNET_NO);
1652     GNUNET_STATISTICS_update (plugin->env->stats,
1653                               "# UDP, fragmented msgs, bytes payload, attempt",
1654                               frag_ctx->payload_size, GNUNET_NO);
1655   }
1656   schedule_select (plugin);
1657   return udpmlen;
1658 }
1659
1660
1661 /**
1662  * Our external IP address/port mapping has changed.
1663  *
1664  * @param cls closure, the 'struct LocalAddrList'
1665  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1666  *     the previous (now invalid) one
1667  * @param addr either the previous or the new public IP address
1668  * @param addrlen actual lenght of the address
1669  */
1670 static void
1671 udp_nat_port_map_callback (void *cls, int add_remove,
1672                            const struct sockaddr *addr, socklen_t addrlen)
1673 {
1674   struct Plugin *plugin = cls;
1675   struct IPv4UdpAddress u4;
1676   struct IPv6UdpAddress u6;
1677   void *arg;
1678   size_t args;
1679
1680   /* convert 'addr' to our internal format */
1681   switch (addr->sa_family)
1682   {
1683   case AF_INET:
1684     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1685     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1686     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1687     arg = &u4;
1688     args = sizeof (u4);
1689     break;
1690   case AF_INET6:
1691     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1692     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1693             sizeof (struct in6_addr));
1694     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1695     arg = &u6;
1696     args = sizeof (u6);
1697     break;
1698   default:
1699     GNUNET_break (0);
1700     return;
1701   }
1702   /* modify our published address list */
1703   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1704 }
1705
1706
1707
1708 /**
1709  * Message tokenizer has broken up an incomming message. Pass it on
1710  * to the service.
1711  *
1712  * @param cls the 'struct Plugin'
1713  * @param client the 'struct SourceInformation'
1714  * @param hdr the actual message
1715  */
1716 static int
1717 process_inbound_tokenized_messages (void *cls, void *client,
1718                                     const struct GNUNET_MessageHeader *hdr)
1719 {
1720   struct Plugin *plugin = cls;
1721   struct SourceInformation *si = client;
1722   struct GNUNET_ATS_Information ats[2];
1723   struct GNUNET_TIME_Relative delay;
1724
1725   GNUNET_assert (si->session != NULL);
1726   if (GNUNET_YES == si->session->in_destroy)
1727     return GNUNET_OK;
1728   /* setup ATS */
1729   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
1730   ats[0].value = htonl (1);
1731   ats[1] = si->session->ats;
1732   GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
1733   delay = plugin->env->receive (plugin->env->cls,
1734                                 &si->sender,
1735                                 hdr,
1736                                 (const struct GNUNET_ATS_Information *) &ats, 2,
1737                                 si->session,
1738                                 si->arg,
1739                                 si->args);
1740   si->session->flow_delay_for_other_peer = delay;
1741   reschedule_session_timeout(si->session);
1742   return GNUNET_OK;
1743 }
1744
1745
1746 /**
1747  * We've received a UDP Message.  Process it (pass contents to main service).
1748  *
1749  * @param plugin plugin context
1750  * @param msg the message
1751  * @param sender_addr sender address
1752  * @param sender_addr_len number of bytes in sender_addr
1753  */
1754 static void
1755 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1756                      const struct sockaddr *sender_addr,
1757                      socklen_t sender_addr_len)
1758 {
1759   struct SourceInformation si;
1760   struct Session * s;
1761   struct IPv4UdpAddress u4;
1762   struct IPv6UdpAddress u6;
1763   const void *arg;
1764   size_t args;
1765
1766   if (0 != ntohl (msg->reserved))
1767   {
1768     GNUNET_break_op (0);
1769     return;
1770   }
1771   if (ntohs (msg->header.size) <
1772       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1773   {
1774     GNUNET_break_op (0);
1775     return;
1776   }
1777
1778   /* convert address */
1779   switch (sender_addr->sa_family)
1780   {
1781   case AF_INET:
1782     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1783     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1784     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1785     arg = &u4;
1786     args = sizeof (u4);
1787     break;
1788   case AF_INET6:
1789     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1790     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1791     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1792     arg = &u6;
1793     args = sizeof (u6);
1794     break;
1795   default:
1796     GNUNET_break (0);
1797     return;
1798   }
1799   LOG (GNUNET_ERROR_TYPE_DEBUG,
1800        "Received message with %u bytes from peer `%s' at `%s'\n",
1801        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1802        GNUNET_a2s (sender_addr, sender_addr_len));
1803
1804   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1805   s = udp_plugin_get_session(plugin, address);
1806   GNUNET_free (address);
1807
1808   /* iterate over all embedded messages */
1809   si.session = s;
1810   si.sender = msg->sender;
1811   si.arg = arg;
1812   si.args = args;
1813   s->rc++;
1814   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1815                              ntohs (msg->header.size) -
1816                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1817   s->rc--;
1818   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1819     free_session (s);
1820 }
1821
1822
1823 /**
1824  * Scan the heap for a receive context with the given address.
1825  *
1826  * @param cls the 'struct FindReceiveContext'
1827  * @param node internal node of the heap
1828  * @param element value stored at the node (a 'struct ReceiveContext')
1829  * @param cost cost associated with the node
1830  * @return GNUNET_YES if we should continue to iterate,
1831  *         GNUNET_NO if not.
1832  */
1833 static int
1834 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1835                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1836 {
1837   struct FindReceiveContext *frc = cls;
1838   struct DefragContext *e = element;
1839
1840   if ((frc->addr_len == e->addr_len) &&
1841       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1842   {
1843     frc->rc = e;
1844     return GNUNET_NO;
1845   }
1846   return GNUNET_YES;
1847 }
1848
1849
1850 /**
1851  * Process a defragmented message.
1852  *
1853  * @param cls the 'struct ReceiveContext'
1854  * @param msg the message
1855  */
1856 static void
1857 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
1858 {
1859   struct DefragContext *rc = cls;
1860
1861   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
1862   {
1863     GNUNET_break (0);
1864     return;
1865   }
1866   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1867   {
1868     GNUNET_break (0);
1869     return;
1870   }
1871   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
1872                        rc->src_addr, rc->addr_len);
1873 }
1874
1875
1876 struct LookupContext
1877 {
1878   const struct sockaddr * addr;
1879
1880   struct Session *res;
1881
1882   size_t addrlen;
1883 };
1884
1885
1886 static int
1887 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1888 {
1889   struct LookupContext *l_ctx = cls;
1890   struct Session * s = value;
1891
1892   if ((s->addrlen == l_ctx->addrlen) &&
1893       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
1894   {
1895     l_ctx->res = s;
1896     return GNUNET_NO;
1897   }
1898   return GNUNET_YES;
1899 }
1900
1901
1902 /**
1903  * Transmit an acknowledgement.
1904  *
1905  * @param cls the 'struct ReceiveContext'
1906  * @param id message ID (unused)
1907  * @param msg ack to transmit
1908  */
1909 static void
1910 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
1911 {
1912   struct DefragContext *rc = cls;
1913   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
1914   struct UDP_ACK_Message *udp_ack;
1915   uint32_t delay = 0;
1916   struct UDP_MessageWrapper *udpw;
1917   struct Session *s;
1918   struct LookupContext l_ctx;
1919
1920   l_ctx.addr = rc->src_addr;
1921   l_ctx.addrlen = rc->addr_len;
1922   l_ctx.res = NULL;
1923   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
1924       &lookup_session_by_addr_it,
1925       &l_ctx);
1926   s = l_ctx.res;
1927
1928   if (NULL == s)
1929     return;
1930
1931   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
1932     delay = s->flow_delay_for_other_peer.rel_value;
1933
1934   LOG (GNUNET_ERROR_TYPE_DEBUG,
1935        "Sending ACK to `%s' including delay of %u ms\n",
1936        GNUNET_a2s (rc->src_addr,
1937                    (rc->src_addr->sa_family ==
1938                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
1939                                                                      sockaddr_in6)),
1940        delay);
1941   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
1942   udpw->msg_size = msize;
1943   udpw->payload_size = 0;
1944   udpw->session = s;
1945   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1946   udpw->msg_buf = (char *)&udpw[1];
1947   udpw->msg_type = MSG_ACK;
1948   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
1949   udp_ack->header.size = htons ((uint16_t) msize);
1950   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
1951   udp_ack->delay = htonl (delay);
1952   udp_ack->sender = *rc->plugin->env->my_identity;
1953   memcpy (&udp_ack[1], msg, ntohs (msg->size));
1954   enqueue (rc->plugin, udpw);
1955 }
1956
1957
1958 static void 
1959 read_process_msg (struct Plugin *plugin,
1960                   const struct GNUNET_MessageHeader *msg,
1961                   const char *addr,
1962                   socklen_t fromlen)
1963 {
1964   if (ntohs (msg->size) < sizeof (struct UDPMessage))
1965   {
1966     GNUNET_break_op (0);
1967     return;
1968   }
1969   process_udp_message (plugin, (const struct UDPMessage *) msg,
1970                        (const struct sockaddr *) addr, fromlen);
1971 }
1972
1973
1974 static void 
1975 read_process_ack (struct Plugin *plugin,
1976                   const struct GNUNET_MessageHeader *msg,
1977                   char *addr,
1978                   socklen_t fromlen)
1979 {
1980   struct UDP_MessageWrapper dummy;
1981   struct UDP_MessageWrapper *udpw;
1982   struct UDP_MessageWrapper *tmp;
1983   const struct GNUNET_MessageHeader *ack;
1984   const struct UDP_ACK_Message *udp_ack;
1985   struct LookupContext l_ctx;
1986   struct Session *s;
1987   struct GNUNET_TIME_Relative flow_delay;
1988
1989   if (ntohs (msg->size) <
1990       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
1991   {
1992     GNUNET_break_op (0);
1993     return;
1994   }
1995   udp_ack = (const struct UDP_ACK_Message *) msg;
1996   l_ctx.addr = (const struct sockaddr *) addr;
1997   l_ctx.addrlen = fromlen;
1998   l_ctx.res = NULL;
1999   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
2000                                          &lookup_session_by_addr_it,
2001                                          &l_ctx);
2002   s = l_ctx.res;
2003
2004   if ((NULL == s) || (NULL == s->frag_ctx))
2005   {
2006     return;
2007   }
2008
2009   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
2010   LOG (GNUNET_ERROR_TYPE_DEBUG, 
2011        "We received a sending delay of %llu\n",
2012        flow_delay.rel_value);
2013   s->flow_delay_from_other_peer =
2014       GNUNET_TIME_relative_to_absolute (flow_delay);
2015
2016   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2017   if (ntohs (ack->size) !=
2018       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
2019   {
2020     GNUNET_break_op (0);
2021     return;
2022   }
2023
2024   if (0 != memcmp (&l_ctx.res->target, &udp_ack->sender, sizeof (struct GNUNET_PeerIdentity)))
2025     GNUNET_break (0);
2026   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2027   {
2028     LOG (GNUNET_ERROR_TYPE_DEBUG,
2029          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2030          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2031          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2032     /* Expect more ACKs to arrive */
2033     return;
2034   }
2035
2036   LOG (GNUNET_ERROR_TYPE_DEBUG,
2037        "Message full ACK'ed\n",
2038        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2039        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2040
2041   /* Remove fragmented message after successful sending */
2042   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2043 }
2044
2045
2046 static void 
2047 read_process_fragment (struct Plugin *plugin,
2048                        const struct GNUNET_MessageHeader *msg,
2049                        char *addr,
2050                        socklen_t fromlen)
2051 {
2052   struct DefragContext *d_ctx;
2053   struct GNUNET_TIME_Absolute now;
2054   struct FindReceiveContext frc;
2055
2056   frc.rc = NULL;
2057   frc.addr = (const struct sockaddr *) addr;
2058   frc.addr_len = fromlen;
2059
2060   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2061        (unsigned int) ntohs (msg->size),
2062        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2063   /* Lookup existing receive context for this address */
2064   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2065                                  &find_receive_context,
2066                                  &frc);
2067   now = GNUNET_TIME_absolute_get ();
2068   d_ctx = frc.rc;
2069
2070   if (d_ctx == NULL)
2071   {
2072     /* Create a new defragmentation context */
2073     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2074     memcpy (&d_ctx[1], addr, fromlen);
2075     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2076     d_ctx->addr_len = fromlen;
2077     d_ctx->plugin = plugin;
2078     d_ctx->defrag =
2079         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
2080                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
2081                                           &fragment_msg_proc, &ack_proc);
2082     d_ctx->hnode =
2083         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2084                                       (GNUNET_CONTAINER_HeapCostType)
2085                                       now.abs_value);
2086     LOG (GNUNET_ERROR_TYPE_DEBUG, 
2087          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2088          (unsigned int) ntohs (msg->size),
2089          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2090   }
2091   else
2092   {
2093     LOG (GNUNET_ERROR_TYPE_DEBUG,
2094          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2095          (unsigned int) ntohs (msg->size),
2096          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2097   }
2098
2099   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2100   {
2101     /* keep this 'rc' from expiring */
2102     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2103                                        (GNUNET_CONTAINER_HeapCostType)
2104                                        now.abs_value);
2105   }
2106   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2107       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2108   {
2109     /* remove 'rc' that was inactive the longest */
2110     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2111     GNUNET_assert (NULL != d_ctx);
2112     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2113     GNUNET_free (d_ctx);
2114   }
2115 }
2116
2117
2118 /**
2119  * Read and process a message from the given socket.
2120  *
2121  * @param plugin the overall plugin
2122  * @param rsock socket to read from
2123  */
2124 static void
2125 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2126 {
2127   socklen_t fromlen;
2128   char addr[32];
2129   char buf[65536] GNUNET_ALIGN;
2130   ssize_t size;
2131   const struct GNUNET_MessageHeader *msg;
2132
2133   fromlen = sizeof (addr);
2134   memset (&addr, 0, sizeof (addr));
2135   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
2136                                       (struct sockaddr *) &addr, &fromlen);
2137 #if MINGW
2138   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2139    * WSAECONNRESET error to indicate that previous sendto() (???)
2140    * on this socket has failed.
2141    */
2142   if ( (-1 == size) && (ECONNRESET == errno) )
2143     return;
2144 #endif
2145   if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader)))
2146   {
2147     GNUNET_break_op (0);
2148     return;
2149   }
2150   msg = (const struct GNUNET_MessageHeader *) buf;
2151
2152   LOG (GNUNET_ERROR_TYPE_DEBUG,
2153        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
2154        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
2155
2156   if (size != ntohs (msg->size))
2157   {
2158     GNUNET_break_op (0);
2159     return;
2160   }
2161
2162   GNUNET_STATISTICS_update (plugin->env->stats,
2163                             "# UDP, total, bytes, received",
2164                             size, GNUNET_NO);
2165
2166   switch (ntohs (msg->type))
2167   {
2168   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2169     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
2170     return;
2171
2172   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2173     read_process_msg (plugin, msg, addr, fromlen);
2174     return;
2175
2176   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2177     read_process_ack (plugin, msg, addr, fromlen);
2178     return;
2179
2180   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2181     read_process_fragment (plugin, msg, addr, fromlen);
2182     return;
2183
2184   default:
2185     GNUNET_break_op (0);
2186     return;
2187   }
2188 }
2189
2190 static struct UDP_MessageWrapper *
2191 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2192                                     struct GNUNET_NETWORK_Handle *sock)
2193 {
2194   struct UDP_MessageWrapper *udpw = NULL;
2195   struct GNUNET_TIME_Relative remaining;
2196
2197   udpw = head;
2198   while (udpw != NULL)
2199   {
2200     /* Find messages with timeout */
2201     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2202     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2203     {
2204       /* Message timed out */
2205       call_continuation (udpw, GNUNET_SYSERR);
2206       switch (udpw->msg_type) {
2207         case MSG_UNFRAGMENTED:
2208           /* Not fragmented message */
2209           LOG (GNUNET_ERROR_TYPE_DEBUG,
2210                "Message for peer `%s' with size %u timed out\n",
2211                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2212           /* Remove message */
2213           dequeue (plugin, udpw);
2214           GNUNET_free (udpw);
2215           break;
2216         case MSG_FRAGMENTED:
2217           /* Fragmented message */
2218           call_continuation (udpw, GNUNET_SYSERR);
2219           LOG (GNUNET_ERROR_TYPE_DEBUG,
2220                "Fragment for message for peer `%s' with size %u timed out\n",
2221                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
2222
2223           /* Remove fragmented message due to timeout */
2224           fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2225           break;
2226         case MSG_ACK:
2227           LOG (GNUNET_ERROR_TYPE_DEBUG,
2228                "ACK Message for peer `%s' with size %u timed out\n",
2229                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2230           dequeue (plugin, udpw);
2231           GNUNET_free (udpw);
2232           break;
2233         default:
2234           break;
2235       }
2236       if (sock == plugin->sockv4)
2237         udpw = plugin->ipv4_queue_head;
2238       if (sock == plugin->sockv6)
2239         udpw = plugin->ipv6_queue_head;
2240       GNUNET_STATISTICS_update (plugin->env->stats,
2241                                 "# messages dismissed due to timeout",
2242                                 1, GNUNET_NO);
2243     }
2244     else
2245     {
2246       /* Message did not time out, check flow delay */
2247       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2248       if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2249       {
2250         /* this message is not delayed */
2251         LOG (GNUNET_ERROR_TYPE_DEBUG, 
2252              "Message for peer `%s' (%u bytes) is not delayed \n",
2253              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2254         break; /* Found message to send, break */
2255       }
2256       else
2257       {
2258         /* Message is delayed, try next */
2259         LOG (GNUNET_ERROR_TYPE_DEBUG,
2260              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
2261              GNUNET_i2s(&udpw->session->target), udpw->payload_size, remaining.rel_value);
2262         udpw = udpw->next;
2263       }
2264     }
2265   }
2266   return udpw;
2267 }
2268
2269
2270 static void
2271 analyze_send_error (struct Plugin *plugin,
2272                     const struct sockaddr * sa,
2273                     socklen_t slen,
2274                     int error)
2275 {
2276   static int network_down_error;
2277   struct GNUNET_ATS_Information type;
2278
2279  type = plugin->env->get_address_type (plugin->env->cls,sa, slen);
2280  if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
2281      ((ENETUNREACH == errno) || (ENETDOWN == errno)))
2282  {
2283    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2284    {
2285      /* IPv4: "Network unreachable" or "Network down"
2286       *
2287       * This indicates we do not have connectivity
2288       */
2289      LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2290          _("UDP could not transmit message to `%s': "
2291            "Network seems down, please check your network configuration\n"),
2292          GNUNET_a2s (sa, slen));
2293    }
2294    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2295    {
2296      /* IPv6: "Network unreachable" or "Network down"
2297       *
2298       * This indicates that this system is IPv6 enabled, but does not
2299       * have a valid global IPv6 address assigned or we do not have
2300       * connectivity
2301       */
2302
2303     LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2304         _("UDP could not transmit message to `%s': "
2305           "Please check your network configuration and disable IPv6 if your "
2306           "connection does not have a global IPv6 address\n"),
2307         GNUNET_a2s (sa, slen));
2308    }
2309  }
2310  else
2311  {
2312    LOG (GNUNET_ERROR_TYPE_WARNING,
2313       "UDP could not transmit message to `%s': `%s'\n",
2314       GNUNET_a2s (sa, slen), STRERROR (error));
2315  }
2316 }
2317
2318 static size_t
2319 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2320 {
2321   const struct sockaddr * sa;
2322   ssize_t sent;
2323   socklen_t slen;
2324
2325   struct UDP_MessageWrapper *udpw = NULL;
2326
2327   /* Find message to send */
2328   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) ? plugin->ipv4_queue_head : plugin->ipv6_queue_head,
2329                                              sock);
2330   if (NULL == udpw)
2331     return 0; /* No message to send */
2332
2333   sa = udpw->session->sock_addr;
2334   slen = udpw->session->addrlen;
2335
2336   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, sa, slen);
2337
2338   if (GNUNET_SYSERR == sent)
2339   {
2340     /* Failure */
2341     analyze_send_error (plugin, sa, slen, errno);
2342     call_continuation(udpw, GNUNET_SYSERR);
2343     GNUNET_STATISTICS_update (plugin->env->stats,
2344                             "# UDP, total, bytes, sent, failure",
2345                             sent, GNUNET_NO);
2346     GNUNET_STATISTICS_update (plugin->env->stats,
2347                               "# UDP, total, messages, sent, failure",
2348                               1, GNUNET_NO);
2349   }
2350   else
2351   {
2352     /* Success */
2353     LOG (GNUNET_ERROR_TYPE_DEBUG,
2354          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2355          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
2356          (sent < 0) ? STRERROR (errno) : "ok");
2357     GNUNET_STATISTICS_update (plugin->env->stats,
2358                               "# UDP, total, bytes, sent, success",
2359                               sent, GNUNET_NO);
2360     GNUNET_STATISTICS_update (plugin->env->stats,
2361                               "# UDP, total, messages, sent, success",
2362                               1, GNUNET_NO);
2363     if (NULL != udpw->frag_ctx)
2364         udpw->frag_ctx->on_wire_size += udpw->msg_size;
2365     call_continuation (udpw, GNUNET_OK);
2366   }
2367   dequeue (plugin, udpw);
2368   GNUNET_free (udpw);
2369   udpw = NULL;
2370
2371   return sent;
2372 }
2373
2374
2375 /**
2376  * We have been notified that our readset has something to read.  We don't
2377  * know which socket needs to be read, so we have to check each one
2378  * Then reschedule this function to be called again once more is available.
2379  *
2380  * @param cls the plugin handle
2381  * @param tc the scheduling context (for rescheduling this function again)
2382  */
2383 static void
2384 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2385 {
2386   struct Plugin *plugin = cls;
2387
2388   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2389   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2390     return;
2391   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2392        (NULL != plugin->sockv4) &&
2393        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2394     udp_select_read (plugin, plugin->sockv4);
2395   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2396        (NULL != plugin->sockv4) && 
2397        (NULL != plugin->ipv4_queue_head) &&
2398        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2399     udp_select_send (plugin, plugin->sockv4);   
2400   schedule_select (plugin);
2401 }
2402
2403
2404 /**
2405  * We have been notified that our readset has something to read.  We don't
2406  * know which socket needs to be read, so we have to check each one
2407  * Then reschedule this function to be called again once more is available.
2408  *
2409  * @param cls the plugin handle
2410  * @param tc the scheduling context (for rescheduling this function again)
2411  */
2412 static void
2413 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2414 {
2415   struct Plugin *plugin = cls;
2416
2417   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2418   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2419     return;
2420   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2421        (NULL != plugin->sockv6) &&
2422        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2423     udp_select_read (plugin, plugin->sockv6);
2424   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2425        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2426        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2427     udp_select_send (plugin, plugin->sockv6);
2428   schedule_select (plugin);
2429 }
2430
2431
2432 static int
2433 setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct sockaddr_in *serverAddrv4)
2434 {
2435   int tries;
2436   int sockets_created = 0;
2437   struct sockaddr *serverAddr;
2438   struct sockaddr *addrs[2];
2439   socklen_t addrlens[2];
2440   socklen_t addrlen;
2441
2442   /* Create IPv6 socket */
2443   if (plugin->enable_ipv6 == GNUNET_YES)
2444   {
2445     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2446     if (NULL == plugin->sockv6)
2447     {
2448       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2449       plugin->enable_ipv6 = GNUNET_NO;
2450     }
2451     else
2452     {
2453 #if HAVE_SOCKADDR_IN_SIN_LEN
2454       serverAddrv6->sin6_len = sizeof (serverAddrv6);
2455 #endif
2456       serverAddrv6->sin6_family = AF_INET6;
2457       serverAddrv6->sin6_addr = in6addr_any;
2458       serverAddrv6->sin6_port = htons (plugin->port);
2459       addrlen = sizeof (struct sockaddr_in6);
2460       serverAddr = (struct sockaddr *) serverAddrv6;
2461       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 port %d\n",
2462            ntohs (serverAddrv6->sin6_port));
2463       tries = 0;
2464       while (GNUNET_NETWORK_socket_bind (plugin->sockv6, serverAddr, addrlen) !=
2465              GNUNET_OK)
2466       {
2467         serverAddrv6->sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);        /* Find a good, non-root port */
2468         LOG (GNUNET_ERROR_TYPE_DEBUG,
2469              "IPv6 Binding failed, trying new port %d\n",
2470              ntohs (serverAddrv6->sin6_port));
2471         tries++;
2472         if (tries > 10)
2473         {
2474           GNUNET_NETWORK_socket_close (plugin->sockv6);
2475           plugin->sockv6 = NULL;
2476           break;
2477         }
2478       }
2479       if (plugin->sockv6 != NULL)
2480       {
2481         LOG (GNUNET_ERROR_TYPE_DEBUG,
2482              "IPv6 socket created on port %d\n",
2483              ntohs (serverAddrv6->sin6_port));
2484         addrs[sockets_created] = (struct sockaddr *) serverAddrv6;
2485         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2486         sockets_created++;
2487       }
2488     }
2489   }
2490
2491   /* Create IPv4 socket */
2492   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2493   if (NULL == plugin->sockv4)
2494   {
2495     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2496   }
2497   else
2498   {
2499 #if HAVE_SOCKADDR_IN_SIN_LEN
2500     serverAddrv4->sin_len = sizeof (serverAddrv4);
2501 #endif
2502     serverAddrv4->sin_family = AF_INET;
2503     serverAddrv4->sin_addr.s_addr = INADDR_ANY;
2504     serverAddrv4->sin_port = htons (plugin->port);
2505     addrlen = sizeof (struct sockaddr_in);
2506     serverAddr = (struct sockaddr *) serverAddrv4;
2507
2508     LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 port %d\n",
2509          ntohs (serverAddrv4->sin_port));
2510     tries = 0;
2511     while (GNUNET_NETWORK_socket_bind (plugin->sockv4, serverAddr, addrlen) !=
2512            GNUNET_OK)
2513     {
2514       serverAddrv4->sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);   /* Find a good, non-root port */
2515       LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 Binding failed, trying new port %d\n",
2516            ntohs (serverAddrv4->sin_port));
2517       tries++;
2518       if (tries > 10)
2519       {
2520         GNUNET_NETWORK_socket_close (plugin->sockv4);
2521         plugin->sockv4 = NULL;
2522         break;
2523       }
2524     }
2525     if (plugin->sockv4 != NULL)
2526     {
2527       addrs[sockets_created] = (struct sockaddr *) serverAddrv4;
2528       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2529       sockets_created++;
2530     }
2531   }
2532
2533   /* Create file descriptors */
2534   plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2535   plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2536   GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2537   GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2538   if (NULL != plugin->sockv4)
2539   {
2540     GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2541     GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2542   }
2543
2544   if (0 == sockets_created)
2545     LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2546   if (plugin->enable_ipv6 == GNUNET_YES)
2547   {
2548     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2549     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2550     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2551     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2552     if (NULL != plugin->sockv6)
2553     {
2554       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2555       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2556     }
2557   }
2558   schedule_select (plugin);
2559   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2560                            GNUNET_NO, plugin->port,
2561                            sockets_created,
2562                            (const struct sockaddr **) addrs, addrlens,
2563                            &udp_nat_port_map_callback, NULL, plugin);
2564
2565   return sockets_created;
2566 }
2567
2568
2569 /**
2570  * The exported method. Makes the core api available via a global and
2571  * returns the udp transport API.
2572  *
2573  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2574  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2575  */
2576 void *
2577 libgnunet_plugin_transport_udp_init (void *cls)
2578 {
2579   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2580   struct GNUNET_TRANSPORT_PluginFunctions *api;
2581   struct Plugin *p;
2582   unsigned long long port;
2583   unsigned long long aport;
2584   unsigned long long broadcast;
2585   unsigned long long udp_max_bps;
2586   unsigned long long enable_v6;
2587   char * bind4_address;
2588   char * bind6_address;
2589   char * fancy_interval;
2590   struct GNUNET_TIME_Relative interval;
2591   struct sockaddr_in serverAddrv4;
2592   struct sockaddr_in6 serverAddrv6;
2593   int res;
2594
2595   if (NULL == env->receive)
2596   {
2597     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2598        initialze the plugin or the API */
2599     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2600     api->cls = NULL;
2601     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2602     api->address_to_string = &udp_address_to_string;
2603     api->string_to_address = &udp_string_to_address;
2604     return api;
2605   }
2606
2607   GNUNET_assert( NULL != env->stats);
2608
2609   /* Get port number */
2610   if (GNUNET_OK !=
2611       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2612                                              &port))
2613     port = 2086;
2614   if (GNUNET_OK !=
2615       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2616                                              "ADVERTISED_PORT", &aport))
2617     aport = port;
2618   if (port > 65535)
2619   {
2620     LOG (GNUNET_ERROR_TYPE_WARNING,
2621          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2622          65535);
2623     return NULL;
2624   }
2625
2626   /* Protocols */
2627   if ((GNUNET_YES ==
2628        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2629                                              "DISABLEV6")))
2630   {
2631     enable_v6 = GNUNET_NO;
2632   }
2633   else
2634     enable_v6 = GNUNET_YES;
2635
2636   /* Addresses */
2637   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2638   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2639
2640   if (GNUNET_YES ==
2641       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2642                                              "BINDTO", &bind4_address))
2643   {
2644     LOG (GNUNET_ERROR_TYPE_DEBUG,
2645          "Binding udp plugin to specific address: `%s'\n",
2646          bind4_address);
2647     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2648     {
2649       GNUNET_free (bind4_address);
2650       return NULL;
2651     }
2652   }
2653
2654   if (GNUNET_YES ==
2655       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2656                                              "BINDTO6", &bind6_address))
2657   {
2658     LOG (GNUNET_ERROR_TYPE_DEBUG,
2659          "Binding udp plugin to specific address: `%s'\n",
2660          bind6_address);
2661     if (1 !=
2662         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2663     {
2664       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2665            bind6_address);
2666       GNUNET_free_non_null (bind4_address);
2667       GNUNET_free (bind6_address);
2668       return NULL;
2669     }
2670   }
2671
2672   /* Enable neighbour discovery */
2673   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2674                                             "BROADCAST");
2675   if (broadcast == GNUNET_SYSERR)
2676     broadcast = GNUNET_NO;
2677
2678   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2679                                            "BROADCAST_INTERVAL", &fancy_interval))
2680   {
2681     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2682   }
2683   else
2684   {
2685      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2686      {
2687        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2688      }
2689      GNUNET_free (fancy_interval);
2690   }
2691
2692   /* Maximum datarate */
2693   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2694                                              "MAX_BPS", &udp_max_bps))
2695   {
2696     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2697   }
2698
2699   p = GNUNET_malloc (sizeof (struct Plugin));
2700   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2701
2702   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2703                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
2704   p->sessions = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
2705   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2706   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2707   p->port = port;
2708   p->aport = aport;
2709   p->broadcast_interval = interval;
2710   p->enable_ipv6 = enable_v6;
2711   p->env = env;
2712
2713   plugin = p;
2714
2715   api->cls = p;
2716   api->send = NULL;
2717   api->disconnect = &udp_disconnect;
2718   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2719   api->address_to_string = &udp_address_to_string;
2720   api->string_to_address = &udp_string_to_address;
2721   api->check_address = &udp_plugin_check_address;
2722   api->get_session = &udp_plugin_get_session;
2723   api->send = &udp_plugin_send;
2724
2725   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
2726   res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
2727   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
2728   {
2729     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
2730     GNUNET_free (p);
2731     GNUNET_free (api);
2732     return NULL;
2733   }
2734
2735   if (broadcast == GNUNET_YES)
2736   {
2737     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
2738     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
2739   }
2740
2741   GNUNET_free_non_null (bind4_address);
2742   GNUNET_free_non_null (bind6_address);
2743   return api;
2744 }
2745
2746
2747 static int
2748 heap_cleanup_iterator (void *cls,
2749                        struct GNUNET_CONTAINER_HeapNode *
2750                        node, void *element,
2751                        GNUNET_CONTAINER_HeapCostType
2752                        cost)
2753 {
2754   struct DefragContext * d_ctx = element;
2755
2756   GNUNET_CONTAINER_heap_remove_node (node);
2757   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
2758   GNUNET_free (d_ctx);
2759
2760   return GNUNET_YES;
2761 }
2762
2763
2764 /**
2765  * The exported method. Makes the core api available via a global and
2766  * returns the udp transport API.
2767  *
2768  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2769  * @return NULL
2770  */
2771 void *
2772 libgnunet_plugin_transport_udp_done (void *cls)
2773 {
2774   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2775   struct Plugin *plugin = api->cls;
2776
2777   if (NULL == plugin)
2778   {
2779     GNUNET_free (api);
2780     return NULL;
2781   }
2782
2783   stop_broadcast (plugin);
2784   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
2785   {
2786     GNUNET_SCHEDULER_cancel (plugin->select_task);
2787     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2788   }
2789   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
2790   {
2791     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
2792     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2793   }
2794
2795   /* Closing sockets */
2796   if (plugin->sockv4 != NULL)
2797   {
2798     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
2799     plugin->sockv4 = NULL;
2800   }
2801   GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
2802   GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
2803
2804   if (plugin->sockv6 != NULL)
2805   {
2806     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
2807     plugin->sockv6 = NULL;
2808
2809     GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
2810     GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
2811   }
2812
2813   GNUNET_NAT_unregister (plugin->nat);
2814
2815   if (plugin->defrag_ctxs != NULL)
2816   {
2817     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
2818         heap_cleanup_iterator, NULL);
2819     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
2820     plugin->defrag_ctxs = NULL;
2821   }
2822   if (plugin->mst != NULL)
2823   {
2824     GNUNET_SERVER_mst_destroy(plugin->mst);
2825     plugin->mst = NULL;
2826   }
2827
2828   /* Clean up leftover messages */
2829   struct UDP_MessageWrapper * udpw;
2830   udpw = plugin->ipv4_queue_head;
2831   while (udpw != NULL)
2832   {
2833     struct UDP_MessageWrapper *tmp = udpw->next;
2834     dequeue (plugin, udpw);
2835     call_continuation(udpw, GNUNET_SYSERR);
2836     GNUNET_free (udpw);
2837
2838     udpw = tmp;
2839   }
2840   udpw = plugin->ipv6_queue_head;
2841   while (udpw != NULL)
2842   {
2843     struct UDP_MessageWrapper *tmp = udpw->next;
2844     dequeue (plugin, udpw);
2845     call_continuation(udpw, GNUNET_SYSERR);
2846     GNUNET_free (udpw);
2847
2848     udpw = tmp;
2849   }
2850
2851   /* Clean up sessions */
2852   LOG (GNUNET_ERROR_TYPE_DEBUG,
2853        "Cleaning up sessions\n");
2854   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
2855   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
2856
2857   plugin->nat = NULL;
2858   GNUNET_free (plugin);
2859   GNUNET_free (api);
2860   return NULL;
2861 }
2862
2863
2864 /* end of plugin_transport_udp.c */