Integrating bluetooth in the configuration file
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define PLUGIN_NAME "udp"
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66 /**
67  * Closure for 'append_port'.
68  */
69 struct PrettyPrinterContext
70 {
71   /**
72    * Function to call with the result.
73    */
74   GNUNET_TRANSPORT_AddressStringCallback asc;
75
76   /**
77    * Clsoure for 'asc'.
78    */
79   void *asc_cls;
80
81   /**
82    * Port to add after the IP address.
83    */
84   uint16_t port;
85
86   /**
87    * IPv6 address
88    */
89
90   int ipv6;
91
92   /**
93    * Options
94    */
95   uint32_t options;
96 };
97
98
99 enum UDP_MessageType
100 {
101   UNDEFINED = 0,
102   MSG_FRAGMENTED = 1,
103   MSG_FRAGMENTED_COMPLETE = 2,
104   MSG_UNFRAGMENTED = 3,
105   MSG_ACK = 4,
106   MSG_BEACON = 5
107 };
108
109 struct Session
110 {
111   /**
112    * Which peer is this session for?
113    */
114   struct GNUNET_PeerIdentity target;
115
116   struct UDP_FragmentationContext * frag_ctx;
117
118   /**
119    * Address of the other peer
120    */
121   const struct sockaddr *sock_addr;
122
123   /**
124    * Desired delay for next sending we send to other peer
125    */
126   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
127
128   /**
129    * Desired delay for next sending we received from other peer
130    */
131   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
132
133   /**
134    * Session timeout task
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * expected delay for ACKs
140    */
141   struct GNUNET_TIME_Relative last_expected_ack_delay;
142
143   /**
144    * desired delay between UDP messages
145    */
146   struct GNUNET_TIME_Relative last_expected_msg_delay;
147
148   struct GNUNET_ATS_Information ats;
149
150   size_t addrlen;
151
152
153   unsigned int rc;
154
155   int in_destroy;
156
157   int inbound;
158 };
159
160
161 struct SessionCompareContext
162 {
163   struct Session *res;
164   const struct GNUNET_HELLO_Address *addr;
165 };
166
167
168 /**
169  * Closure for 'process_inbound_tokenized_messages'
170  */
171 struct SourceInformation
172 {
173   /**
174    * Sender identity.
175    */
176   struct GNUNET_PeerIdentity sender;
177
178   /**
179    * Source address.
180    */
181   const void *arg;
182
183   struct Session *session;
184   /**
185    * Number of bytes in source address.
186    */
187   size_t args;
188
189 };
190
191
192 /**
193  * Closure for 'find_receive_context'.
194  */
195 struct FindReceiveContext
196 {
197   /**
198    * Where to store the result.
199    */
200   struct DefragContext *rc;
201
202   /**
203    * Address to find.
204    */
205   const struct sockaddr *addr;
206
207   struct Session *session;
208
209   /**
210    * Number of bytes in 'addr'.
211    */
212   socklen_t addr_len;
213
214 };
215
216
217
218 /**
219  * Data structure to track defragmentation contexts based
220  * on the source of the UDP traffic.
221  */
222 struct DefragContext
223 {
224
225   /**
226    * Defragmentation context.
227    */
228   struct GNUNET_DEFRAGMENT_Context *defrag;
229
230   /**
231    * Source address this receive context is for (allocated at the
232    * end of the struct).
233    */
234   const struct sockaddr *src_addr;
235
236   /**
237    * Reference to master plugin struct.
238    */
239   struct Plugin *plugin;
240
241   /**
242    * Node in the defrag heap.
243    */
244   struct GNUNET_CONTAINER_HeapNode *hnode;
245
246   /**
247    * Length of 'src_addr'
248    */
249   size_t addr_len;
250 };
251
252
253
254 /**
255  * Context to send fragmented messages
256  */
257 struct UDP_FragmentationContext
258 {
259   /**
260    * Next in linked list
261    */
262   struct UDP_FragmentationContext * next;
263
264   /**
265    * Previous in linked list
266    */
267   struct UDP_FragmentationContext * prev;
268
269   /**
270    * The plugin
271    */
272   struct Plugin * plugin;
273
274   /**
275    * Handle for GNUNET_FRAGMENT context
276    */
277   struct GNUNET_FRAGMENT_Context * frag;
278
279   /**
280    * The session this fragmentation context belongs to
281    */
282   struct Session * session;
283
284   /**
285    * Function to call upon completion of the transmission.
286    */
287   GNUNET_TRANSPORT_TransmitContinuation cont;
288
289   /**
290    * Closure for 'cont'.
291    */
292   void *cont_cls;
293
294   /**
295    * Message timeout
296    */
297   struct GNUNET_TIME_Absolute timeout;
298
299   /**
300    * Payload size of original unfragmented message
301    */
302   size_t payload_size;
303
304   /**
305    * Bytes used to send all fragments on wire including UDP overhead
306    */
307   size_t on_wire_size;
308
309   unsigned int fragments_used;
310
311 };
312
313
314 struct UDP_MessageWrapper
315 {
316   /**
317    * Session this message belongs to
318    */
319   struct Session *session;
320
321   /**
322    * DLL of messages
323    * previous element
324    */
325   struct UDP_MessageWrapper *prev;
326
327   /**
328    * DLL of messages
329    * previous element
330    */
331   struct UDP_MessageWrapper *next;
332
333   /**
334    * Message type
335    * According to UDP_MessageType
336    */
337   int msg_type;
338
339   /**
340    * Message with size msg_size including UDP specific overhead
341    */
342   char *msg_buf;
343
344   /**
345    * Size of UDP message to send including UDP specific overhead
346    */
347   size_t msg_size;
348
349   /**
350    * Payload size of original message
351    */
352   size_t payload_size;
353
354   /**
355    * Message timeout
356    */
357   struct GNUNET_TIME_Absolute timeout;
358
359   /**
360    * Function to call upon completion of the transmission.
361    */
362   GNUNET_TRANSPORT_TransmitContinuation cont;
363
364   /**
365    * Closure for 'cont'.
366    */
367   void *cont_cls;
368
369   /**
370    * Fragmentation context
371    * frag_ctx == NULL if transport <= MTU
372    * frag_ctx != NULL if transport > MTU
373    */
374   struct UDP_FragmentationContext *frag_ctx;
375 };
376
377
378 /**
379  * UDP ACK Message-Packet header (after defragmentation).
380  */
381 struct UDP_ACK_Message
382 {
383   /**
384    * Message header.
385    */
386   struct GNUNET_MessageHeader header;
387
388   /**
389    * Desired delay for flow control
390    */
391   uint32_t delay;
392
393   /**
394    * What is the identity of the sender
395    */
396   struct GNUNET_PeerIdentity sender;
397
398 };
399
400 /**
401  * Address options
402  */
403 static uint32_t myoptions;
404
405
406 /**
407  * Encapsulation of all of the state of the plugin.
408  */
409 struct Plugin * plugin;
410
411
412 /**
413  * We have been notified that our readset has something to read.  We don't
414  * know which socket needs to be read, so we have to check each one
415  * Then reschedule this function to be called again once more is available.
416  *
417  * @param cls the plugin handle
418  * @param tc the scheduling context (for rescheduling this function again)
419  */
420 static void
421 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
422
423
424 /**
425  * We have been notified that our readset has something to read.  We don't
426  * know which socket needs to be read, so we have to check each one
427  * Then reschedule this function to be called again once more is available.
428  *
429  * @param cls the plugin handle
430  * @param tc the scheduling context (for rescheduling this function again)
431  */
432 static void
433 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
434
435
436 /**
437  * Start session timeout
438  */
439 static void
440 start_session_timeout (struct Session *s);
441
442 /**
443  * Increment session timeout due to activity
444  */
445 static void
446 reschedule_session_timeout (struct Session *s);
447
448 /**
449  * Cancel timeout
450  */
451 static void
452 stop_session_timeout (struct Session *s);
453
454 /**
455  * (re)schedule select tasks for this plugin.
456  *
457  * @param plugin plugin to reschedule
458  */
459 static void
460 schedule_select (struct Plugin *plugin)
461 {
462   struct GNUNET_TIME_Relative min_delay;
463   struct UDP_MessageWrapper *udpw;
464
465   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
466   {
467     /* Find a message ready to send:
468      * Flow delay from other peer is expired or not set (0) */
469     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
470     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
471       min_delay = GNUNET_TIME_relative_min (min_delay,
472                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
473     
474     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
475       GNUNET_SCHEDULER_cancel(plugin->select_task);
476
477     /* Schedule with:
478      * - write active set if message is ready
479      * - timeout minimum delay */
480     plugin->select_task =
481       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
482                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
483                                    plugin->rs_v4,
484                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
485                                    &udp_plugin_select, plugin);  
486   }
487   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
488   {
489     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
490     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
491       min_delay = GNUNET_TIME_relative_min (min_delay,
492                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
493     
494     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
495       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
496     plugin->select_task_v6 =
497       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
498                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
499                                    plugin->rs_v6,
500                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
501                                    &udp_plugin_select_v6, plugin);
502   }
503 }
504
505
506 /**
507  * Function called for a quick conversion of the binary address to
508  * a numeric address.  Note that the caller must not free the
509  * address and that the next call to this function is allowed
510  * to override the address again.
511  *
512  * @param cls closure
513  * @param addr binary address
514  * @param addrlen length of the address
515  * @return string representing the same address
516  */
517 const char *
518 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
519 {
520   static char rbuf[INET6_ADDRSTRLEN + 10];
521   char buf[INET6_ADDRSTRLEN];
522   const void *sb;
523   struct in_addr a4;
524   struct in6_addr a6;
525   const struct IPv4UdpAddress *t4;
526   const struct IPv6UdpAddress *t6;
527   int af;
528   uint16_t port;
529   uint32_t options;
530
531   options = 0;
532   if (addrlen == sizeof (struct IPv6UdpAddress))
533   {
534     t6 = addr;
535     af = AF_INET6;
536     options = ntohl (t6->options);
537     port = ntohs (t6->u6_port);
538     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
539     sb = &a6;
540   }
541   else if (addrlen == sizeof (struct IPv4UdpAddress))
542   {
543     t4 = addr;
544     af = AF_INET;
545     options = ntohl (t4->options);
546     port = ntohs (t4->u4_port);
547     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
548     sb = &a4;
549   }
550   else if (addrlen == 0)
551         {
552                 GNUNET_snprintf (rbuf, sizeof (rbuf), "%s", TRANSPORT_SESSION_INBOUND_STRING);
553                 return rbuf;
554         }
555   else
556   {
557     GNUNET_break_op (0);
558     return NULL;
559   }
560   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
561
562   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u",
563                    PLUGIN_NAME, options, buf, port);
564   return rbuf;
565 }
566
567
568 /**
569  * Function called to convert a string address to
570  * a binary address.
571  *
572  * @param cls closure ('struct Plugin*')
573  * @param addr string address
574  * @param addrlen length of the address
575  * @param buf location to store the buffer
576  * @param added location to store the number of bytes in the buffer.
577  *        If the function returns GNUNET_SYSERR, its contents are undefined.
578  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
579  */
580 static int
581 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
582     void **buf, size_t *added)
583 {
584   struct sockaddr_storage socket_address;
585   char *address;
586   char *plugin;
587   char *optionstr;
588   uint32_t options;
589
590   /* Format tcp.options.address:port */
591   address = NULL;
592   plugin = NULL;
593   optionstr = NULL;
594   options = 0;
595   if ((NULL == addr) || (addrlen == 0))
596   {
597     GNUNET_break (0);
598     return GNUNET_SYSERR;
599   }
600   if ('\0' != addr[addrlen - 1])
601   {
602     GNUNET_break (0);
603     return GNUNET_SYSERR;
604   }
605   if (strlen (addr) != addrlen - 1)
606   {
607     GNUNET_break (0);
608     return GNUNET_SYSERR;
609   }
610   plugin = GNUNET_strdup (addr);
611   optionstr = strchr (plugin, '.');
612   if (NULL == optionstr)
613   {
614     GNUNET_break (0);
615     GNUNET_free (plugin);
616     return GNUNET_SYSERR;
617   }
618   optionstr[0] = '\0';
619   optionstr ++;
620   options = atol (optionstr);
621   address = strchr (optionstr, '.');
622   if (NULL == address)
623   {
624     GNUNET_break (0);
625     GNUNET_free (plugin);
626     return GNUNET_SYSERR;
627   }
628   address[0] = '\0';
629   address ++;
630
631   if (GNUNET_OK !=
632       GNUNET_STRINGS_to_address_ip (address, strlen (address),
633                                     &socket_address))
634   {
635     GNUNET_break (0);
636     GNUNET_free (plugin);
637     return GNUNET_SYSERR;
638   }
639
640   GNUNET_free (plugin);
641
642   switch (socket_address.ss_family)
643   {
644   case AF_INET:
645     {
646       struct IPv4UdpAddress *u4;
647       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
648       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
649       u4->options =  htonl (options);
650       u4->ipv4_addr = in4->sin_addr.s_addr;
651       u4->u4_port = in4->sin_port;
652       *buf = u4;
653       *added = sizeof (struct IPv4UdpAddress);
654       return GNUNET_OK;
655     }
656   case AF_INET6:
657     {
658       struct IPv6UdpAddress *u6;
659       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
660       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
661       u6->options =  htonl (options);
662       u6->ipv6_addr = in6->sin6_addr;
663       u6->u6_port = in6->sin6_port;
664       *buf = u6;
665       *added = sizeof (struct IPv6UdpAddress);
666       return GNUNET_OK;
667     }
668   default:
669     GNUNET_break (0);
670     return GNUNET_SYSERR;
671   }
672 }
673
674
675 /**
676  * Append our port and forward the result.
677  *
678  * @param cls a 'struct PrettyPrinterContext'
679  * @param hostname result from DNS resolver
680  */
681 static void
682 append_port (void *cls, const char *hostname)
683 {
684   struct PrettyPrinterContext *ppc = cls;
685   char *ret;
686
687   if (hostname == NULL)
688   {
689     ppc->asc (ppc->asc_cls, NULL);
690     GNUNET_free (ppc);
691     return;
692   }
693   if (GNUNET_YES == ppc->ipv6)
694     GNUNET_asprintf (&ret, "%s.%u.[%s]:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
695   else
696     GNUNET_asprintf (&ret, "%s.%u.%s:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
697   ppc->asc (ppc->asc_cls, ret);
698   GNUNET_free (ret);
699 }
700
701
702 /**
703  * Convert the transports address to a nice, human-readable
704  * format.
705  *
706  * @param cls closure
707  * @param type name of the transport that generated the address
708  * @param addr one of the addresses of the host, NULL for the last address
709  *        the specific address format depends on the transport
710  * @param addrlen length of the address
711  * @param numeric should (IP) addresses be displayed in numeric form?
712  * @param timeout after how long should we give up?
713  * @param asc function to call on each string
714  * @param asc_cls closure for asc
715  */
716 static void
717 udp_plugin_address_pretty_printer (void *cls, const char *type,
718                                    const void *addr, size_t addrlen,
719                                    int numeric,
720                                    struct GNUNET_TIME_Relative timeout,
721                                    GNUNET_TRANSPORT_AddressStringCallback asc,
722                                    void *asc_cls)
723 {
724   struct PrettyPrinterContext *ppc;
725   const void *sb;
726   size_t sbs;
727   struct sockaddr_in a4;
728   struct sockaddr_in6 a6;
729   const struct IPv4UdpAddress *u4;
730   const struct IPv6UdpAddress *u6;
731   uint16_t port;
732   uint32_t options;
733
734   options = 0;
735   if (addrlen == sizeof (struct IPv6UdpAddress))
736   {
737     u6 = addr;
738     memset (&a6, 0, sizeof (a6));
739     a6.sin6_family = AF_INET6;
740 #if HAVE_SOCKADDR_IN_SIN_LEN
741     a6.sin6_len = sizeof (a6);
742 #endif
743     a6.sin6_port = u6->u6_port;
744     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
745     port = ntohs (u6->u6_port);
746     options = ntohl (u6->options);
747     sb = &a6;
748     sbs = sizeof (a6);
749   }
750   else if (addrlen == sizeof (struct IPv4UdpAddress))
751   {
752     u4 = addr;
753     memset (&a4, 0, sizeof (a4));
754     a4.sin_family = AF_INET;
755 #if HAVE_SOCKADDR_IN_SIN_LEN
756     a4.sin_len = sizeof (a4);
757 #endif
758     a4.sin_port = u4->u4_port;
759     a4.sin_addr.s_addr = u4->ipv4_addr;
760     port = ntohs (u4->u4_port);
761     options = ntohl (u4->options);
762     sb = &a4;
763     sbs = sizeof (a4);
764   }
765   else if (0 == addrlen)
766   {
767     asc (asc_cls, TRANSPORT_SESSION_INBOUND_STRING);
768     asc (asc_cls, NULL);
769     return;
770   }
771   else
772   {
773     /* invalid address */
774     GNUNET_break_op (0);
775     asc (asc_cls, NULL);
776     return;
777   }
778   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
779   ppc->asc = asc;
780   ppc->asc_cls = asc_cls;
781   ppc->port = port;
782   ppc->options = options;
783   if (addrlen == sizeof (struct IPv6UdpAddress))
784     ppc->ipv6 = GNUNET_YES;
785   else
786     ppc->ipv6 = GNUNET_NO;
787   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
788 }
789
790
791 static void
792 call_continuation (struct UDP_MessageWrapper *udpw, int result)
793 {
794   size_t overhead;
795
796   LOG (GNUNET_ERROR_TYPE_DEBUG,
797       "Calling continuation for %u byte message to `%s' with result %s\n",
798       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
799       (GNUNET_OK == result) ? "OK" : "SYSERR");
800
801   if (udpw->msg_size >= udpw->payload_size)
802     overhead = udpw->msg_size - udpw->payload_size;
803   else
804     overhead = udpw->msg_size;
805
806   switch (result) {
807     case GNUNET_OK:
808       switch (udpw->msg_type) {
809         case MSG_UNFRAGMENTED:
810           if (NULL != udpw->cont)
811           {
812             /* Transport continuation */
813             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
814                       udpw->payload_size, udpw->msg_size);
815           }
816           GNUNET_STATISTICS_update (plugin->env->stats,
817                                     "# UDP, unfragmented msgs, messages, sent, success",
818                                     1, GNUNET_NO);
819           GNUNET_STATISTICS_update (plugin->env->stats,
820                                     "# UDP, unfragmented msgs, bytes payload, sent, success",
821                                     udpw->payload_size, GNUNET_NO);
822           GNUNET_STATISTICS_update (plugin->env->stats,
823                                     "# UDP, unfragmented msgs, bytes overhead, sent, success",
824                                     overhead, GNUNET_NO);
825           GNUNET_STATISTICS_update (plugin->env->stats,
826                                     "# UDP, total, bytes overhead, sent",
827                                     overhead, GNUNET_NO);
828           GNUNET_STATISTICS_update (plugin->env->stats,
829                                     "# UDP, total, bytes payload, sent",
830                                     udpw->payload_size, GNUNET_NO);
831           break;
832         case MSG_FRAGMENTED_COMPLETE:
833           GNUNET_assert (NULL != udpw->frag_ctx);
834           if (udpw->frag_ctx->cont != NULL)
835             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_OK,
836                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
837           GNUNET_STATISTICS_update (plugin->env->stats,
838                                     "# UDP, fragmented msgs, messages, sent, success",
839                                     1, GNUNET_NO);
840           GNUNET_STATISTICS_update (plugin->env->stats,
841                                     "# UDP, fragmented msgs, bytes payload, sent, success",
842                                     udpw->payload_size, GNUNET_NO);
843           GNUNET_STATISTICS_update (plugin->env->stats,
844                                     "# UDP, fragmented msgs, bytes overhead, sent, success",
845                                     overhead, GNUNET_NO);
846           GNUNET_STATISTICS_update (plugin->env->stats,
847                                     "# UDP, total, bytes overhead, sent",
848                                     overhead, GNUNET_NO);
849           GNUNET_STATISTICS_update (plugin->env->stats,
850                                     "# UDP, total, bytes payload, sent",
851                                     udpw->payload_size, GNUNET_NO);
852           GNUNET_STATISTICS_update (plugin->env->stats,
853                                     "# UDP, fragmented msgs, messages, pending",
854                                     -1, GNUNET_NO);
855           break;
856         case MSG_FRAGMENTED:
857           /* Fragmented message: enqueue next fragment */
858           if (NULL != udpw->cont)
859             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
860                       udpw->payload_size, udpw->msg_size);
861           GNUNET_STATISTICS_update (plugin->env->stats,
862                                     "# UDP, fragmented msgs, fragments, sent, success",
863                                     1, GNUNET_NO);
864           GNUNET_STATISTICS_update (plugin->env->stats,
865                                     "# UDP, fragmented msgs, fragments bytes, sent, success",
866                                     udpw->msg_size, GNUNET_NO);
867           break;
868         case MSG_ACK:
869           /* No continuation */
870           GNUNET_STATISTICS_update (plugin->env->stats,
871                                     "# UDP, ACK msgs, messages, sent, success",
872                                     1, GNUNET_NO);
873           GNUNET_STATISTICS_update (plugin->env->stats,
874                                     "# UDP, ACK msgs, bytes overhead, sent, success",
875                                     overhead, GNUNET_NO);
876           GNUNET_STATISTICS_update (plugin->env->stats,
877                                     "# UDP, total, bytes overhead, sent",
878                                     overhead, GNUNET_NO);
879           break;
880         case MSG_BEACON:
881           GNUNET_break (0);
882           break;
883         default:
884           LOG (GNUNET_ERROR_TYPE_ERROR,
885               "ERROR: %u\n", udpw->msg_type);
886           GNUNET_break (0);
887           break;
888       }
889       break;
890     case GNUNET_SYSERR:
891       switch (udpw->msg_type) {
892         case MSG_UNFRAGMENTED:
893           /* Unfragmented message: failed to send */
894           if (NULL != udpw->cont)
895             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
896                       udpw->payload_size, overhead);
897           GNUNET_STATISTICS_update (plugin->env->stats,
898                                   "# UDP, unfragmented msgs, messages, sent, failure",
899                                   1, GNUNET_NO);
900           GNUNET_STATISTICS_update (plugin->env->stats,
901                                     "# UDP, unfragmented msgs, bytes payload, sent, failure",
902                                     udpw->payload_size, GNUNET_NO);
903           GNUNET_STATISTICS_update (plugin->env->stats,
904                                     "# UDP, unfragmented msgs, bytes overhead, sent, failure",
905                                     overhead, GNUNET_NO);
906           break;
907         case MSG_FRAGMENTED_COMPLETE:
908           GNUNET_assert (NULL != udpw->frag_ctx);
909           if (udpw->frag_ctx->cont != NULL)
910             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_SYSERR,
911                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
912           GNUNET_STATISTICS_update (plugin->env->stats,
913                                     "# UDP, fragmented msgs, messages, sent, failure",
914                                     1, GNUNET_NO);
915           GNUNET_STATISTICS_update (plugin->env->stats,
916                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
917                                     udpw->payload_size, GNUNET_NO);
918           GNUNET_STATISTICS_update (plugin->env->stats,
919                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
920                                     overhead, GNUNET_NO);
921           GNUNET_STATISTICS_update (plugin->env->stats,
922                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
923                                     overhead, GNUNET_NO);
924           GNUNET_STATISTICS_update (plugin->env->stats,
925                                     "# UDP, fragmented msgs, messages, pending",
926                                     -1, GNUNET_NO);
927           break;
928         case MSG_FRAGMENTED:
929           GNUNET_assert (NULL != udpw->frag_ctx);
930           /* Fragmented message: failed to send */
931           GNUNET_STATISTICS_update (plugin->env->stats,
932                                     "# UDP, fragmented msgs, fragments, sent, failure",
933                                     1, GNUNET_NO);
934           GNUNET_STATISTICS_update (plugin->env->stats,
935                                     "# UDP, fragmented msgs, fragments bytes, sent, failure",
936                                     udpw->msg_size, GNUNET_NO);
937           break;
938         case MSG_ACK:
939           /* ACK message: failed to send */
940           GNUNET_STATISTICS_update (plugin->env->stats,
941                                     "# UDP, ACK msgs, messages, sent, failure",
942                                     1, GNUNET_NO);
943           break;
944         case MSG_BEACON:
945           /* Beacon message: failed to send */
946           GNUNET_break (0);
947           break;
948         default:
949           GNUNET_break (0);
950           break;
951       }
952       break;
953     default:
954       GNUNET_break (0);
955       break;
956   }
957 }
958
959
960 /**
961  * Check if the given port is plausible (must be either our listen
962  * port or our advertised port).  If it is neither, we return
963  * GNUNET_SYSERR.
964  *
965  * @param plugin global variables
966  * @param in_port port number to check
967  * @return GNUNET_OK if port is either open_port or adv_port
968  */
969 static int
970 check_port (struct Plugin *plugin, uint16_t in_port)
971 {
972   if ((in_port == plugin->port) || (in_port == plugin->aport))
973     return GNUNET_OK;
974   return GNUNET_SYSERR;
975 }
976
977
978 /**
979  * Function that will be called to check if a binary address for this
980  * plugin is well-formed and corresponds to an address for THIS peer
981  * (as per our configuration).  Naturally, if absolutely necessary,
982  * plugins can be a bit conservative in their answer, but in general
983  * plugins should make sure that the address does not redirect
984  * traffic to a 3rd party that might try to man-in-the-middle our
985  * traffic.
986  *
987  * @param cls closure, should be our handle to the Plugin
988  * @param addr pointer to the address
989  * @param addrlen length of addr
990  * @return GNUNET_OK if this is a plausible address for this peer
991  *         and transport, GNUNET_SYSERR if not
992  *
993  */
994 static int
995 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
996 {
997   struct Plugin *plugin = cls;
998   struct IPv4UdpAddress *v4;
999   struct IPv6UdpAddress *v6;
1000
1001   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
1002       (addrlen != sizeof (struct IPv6UdpAddress)))
1003   {
1004     GNUNET_break_op (0);
1005     return GNUNET_SYSERR;
1006   }
1007   if (addrlen == sizeof (struct IPv4UdpAddress))
1008   {
1009     v4 = (struct IPv4UdpAddress *) addr;
1010     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1011       return GNUNET_SYSERR;
1012     if (GNUNET_OK !=
1013         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
1014                                  sizeof (struct in_addr)))
1015       return GNUNET_SYSERR;
1016   }
1017   else
1018   {
1019     v6 = (struct IPv6UdpAddress *) addr;
1020     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1021     {
1022       GNUNET_break_op (0);
1023       return GNUNET_SYSERR;
1024     }
1025     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1026       return GNUNET_SYSERR;
1027     if (GNUNET_OK !=
1028         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
1029                                  sizeof (struct in6_addr)))
1030       return GNUNET_SYSERR;
1031   }
1032   return GNUNET_OK;
1033 }
1034
1035
1036 /**
1037  * Task to free resources associated with a session.
1038  *
1039  * @param s session to free
1040  */
1041 static void
1042 free_session (struct Session *s)
1043 {
1044   if (NULL != s->frag_ctx)
1045   {
1046     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag, NULL, NULL);
1047     GNUNET_free (s->frag_ctx);
1048     s->frag_ctx = NULL;
1049   }
1050   GNUNET_free (s);
1051 }
1052
1053
1054 static void
1055 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1056 {
1057   if (plugin->bytes_in_buffer < udpw->msg_size)
1058       GNUNET_break (0);
1059   else
1060   {
1061     GNUNET_STATISTICS_update (plugin->env->stats,
1062                               "# UDP, total, bytes in buffers",
1063                               - (long long) udpw->msg_size, GNUNET_NO);
1064     plugin->bytes_in_buffer -= udpw->msg_size;
1065   }
1066   GNUNET_STATISTICS_update (plugin->env->stats,
1067                             "# UDP, total, msgs in buffers",
1068                             -1, GNUNET_NO);
1069   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1070     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1071                                  plugin->ipv4_queue_tail, udpw);
1072   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1073     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1074                                  plugin->ipv6_queue_tail, udpw);
1075 }
1076
1077 static void
1078 fragmented_message_done (struct UDP_FragmentationContext *fc, int result)
1079 {
1080   struct UDP_MessageWrapper *udpw;
1081   struct UDP_MessageWrapper *tmp;
1082   struct UDP_MessageWrapper dummy;
1083   struct Session *s = fc->session;
1084
1085   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p : Fragmented message removed with result %s\n", fc, (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1086   
1087   /* Call continuation for fragmented message */
1088   memset (&dummy, 0, sizeof (dummy));
1089   dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
1090   dummy.msg_size = s->frag_ctx->on_wire_size;
1091   dummy.payload_size = s->frag_ctx->payload_size;
1092   dummy.frag_ctx = s->frag_ctx;
1093   dummy.cont = NULL;
1094   dummy.cont_cls = NULL;
1095   dummy.session = s;
1096
1097   call_continuation (&dummy, result);
1098
1099   /* Remove leftover fragments from queue */
1100   if (s->addrlen == sizeof (struct sockaddr_in6))
1101   {
1102     udpw = plugin->ipv6_queue_head;
1103     while (NULL != udpw)
1104     {
1105       tmp = udpw->next;
1106       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1107       {
1108         dequeue (plugin, udpw);
1109         call_continuation (udpw, GNUNET_SYSERR);
1110         GNUNET_free (udpw);
1111       }
1112       udpw = tmp;
1113     }
1114   }
1115   if (s->addrlen == sizeof (struct sockaddr_in))
1116   {
1117     udpw = plugin->ipv4_queue_head;
1118     while (udpw!= NULL)
1119     {
1120       tmp = udpw->next;
1121       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1122       {
1123         dequeue (plugin, udpw);
1124         call_continuation (udpw, GNUNET_SYSERR);
1125         GNUNET_free (udpw);
1126       }
1127       udpw = tmp;
1128     }
1129   }
1130
1131   /* Destroy fragmentation context */
1132   GNUNET_FRAGMENT_context_destroy (fc->frag,
1133                                      &s->last_expected_msg_delay,
1134                                      &s->last_expected_ack_delay);
1135   s->frag_ctx = NULL;
1136   GNUNET_free (fc );
1137 }
1138
1139 /**
1140  * Functions with this signature are called whenever we need
1141  * to close a session due to a disconnect or failure to
1142  * establish a connection.
1143  *
1144  * @param s session to close down
1145  */
1146 static void
1147 disconnect_session (struct Session *s)
1148 {
1149   struct UDP_MessageWrapper *udpw;
1150   struct UDP_MessageWrapper *next;
1151
1152   GNUNET_assert (GNUNET_YES != s->in_destroy);
1153   LOG (GNUNET_ERROR_TYPE_DEBUG,
1154        "Session %p to peer `%s' address ended \n",
1155          s,
1156          GNUNET_i2s (&s->target),
1157          GNUNET_a2s (s->sock_addr, s->addrlen));
1158   stop_session_timeout (s);
1159
1160   if (NULL != s->frag_ctx)
1161   {
1162     /* Remove fragmented message due to disconnect */
1163     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1164   }
1165
1166   next = plugin->ipv4_queue_head;
1167   while (NULL != (udpw = next))
1168   {
1169     next = udpw->next;
1170     if (udpw->session == s)
1171     {
1172       dequeue (plugin, udpw);
1173       call_continuation(udpw, GNUNET_SYSERR);
1174       GNUNET_free (udpw);
1175     }
1176   }
1177   next = plugin->ipv6_queue_head;
1178   while (NULL != (udpw = next))
1179   {
1180     next = udpw->next;
1181     if (udpw->session == s)
1182     {
1183       dequeue (plugin, udpw);
1184       call_continuation(udpw, GNUNET_SYSERR);
1185       GNUNET_free (udpw);
1186     }
1187     udpw = next;
1188   }
1189   plugin->env->session_end (plugin->env->cls, &s->target, s);
1190
1191   if (NULL != s->frag_ctx)
1192   {
1193     if (NULL != s->frag_ctx->cont)
1194     {
1195       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
1196                          s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1197       LOG (GNUNET_ERROR_TYPE_DEBUG,
1198           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1199           GNUNET_i2s (&s->target));
1200     }
1201   }
1202
1203   GNUNET_assert (GNUNET_YES ==
1204                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
1205                                                        &s->target.hashPubKey,
1206                                                        s));
1207   GNUNET_STATISTICS_set(plugin->env->stats,
1208                         "# UDP, sessions active",
1209                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1210                         GNUNET_NO);
1211   if (s->rc > 0)
1212     s->in_destroy = GNUNET_YES;
1213   else
1214     free_session (s);
1215 }
1216
1217 /**
1218  * Destroy a session, plugin is being unloaded.
1219  *
1220  * @param cls unused
1221  * @param key hash of public key of target peer
1222  * @param value a 'struct PeerSession*' to clean up
1223  * @return GNUNET_OK (continue to iterate)
1224  */
1225 static int
1226 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1227 {
1228   disconnect_session(value);
1229   return GNUNET_OK;
1230 }
1231
1232
1233 /**
1234  * Disconnect from a remote node.  Clean up session if we have one for this peer
1235  *
1236  * @param cls closure for this call (should be handle to Plugin)
1237  * @param target the peeridentity of the peer to disconnect
1238  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
1239  */
1240 static void
1241 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1242 {
1243   struct Plugin *plugin = cls;
1244   GNUNET_assert (plugin != NULL);
1245
1246   GNUNET_assert (target != NULL);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
1249   /* Clean up sessions */
1250   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
1251 }
1252
1253
1254 /**
1255  * Session was idle, so disconnect it
1256  */
1257 static void
1258 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1259 {
1260   GNUNET_assert (NULL != cls);
1261   struct Session *s = cls;
1262
1263   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265               "Session %p was idle for %llu ms, disconnecting\n",
1266               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1267   /* call session destroy function */
1268   disconnect_session (s);
1269 }
1270
1271
1272 /**
1273  * Start session timeout
1274  */
1275 static void
1276 start_session_timeout (struct Session *s)
1277 {
1278   GNUNET_assert (NULL != s);
1279   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
1280   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1281                                                    &session_timeout,
1282                                                    s);
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Timeout for session %p set to %llu ms\n",
1285               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1286 }
1287
1288
1289 /**
1290  * Increment session timeout due to activity
1291  */
1292 static void
1293 reschedule_session_timeout (struct Session *s)
1294 {
1295   GNUNET_assert (NULL != s);
1296   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1297
1298   GNUNET_SCHEDULER_cancel (s->timeout_task);
1299   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1300                                                    &session_timeout,
1301                                                    s);
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303               "Timeout rescheduled for session %p set to %llu ms\n",
1304               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1305 }
1306
1307
1308 /**
1309  * Cancel timeout
1310  */
1311 static void
1312 stop_session_timeout (struct Session *s)
1313 {
1314   GNUNET_assert (NULL != s);
1315
1316   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1317   {
1318     GNUNET_SCHEDULER_cancel (s->timeout_task);
1319     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1320     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321                 "Timeout stopped for session %p canceled\n",
1322                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1323   }
1324 }
1325
1326
1327 static struct Session *
1328 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
1329                 const void *addr, size_t addrlen,
1330                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1331 {
1332   struct Session *s;
1333   const struct IPv4UdpAddress *t4;
1334   const struct IPv6UdpAddress *t6;
1335   struct sockaddr_in *v4;
1336   struct sockaddr_in6 *v6;
1337   size_t len;
1338
1339   switch (addrlen)
1340   {
1341   case sizeof (struct IPv4UdpAddress):
1342     if (NULL == plugin->sockv4)
1343     {
1344       LOG (GNUNET_ERROR_TYPE_DEBUG,
1345            "Could not create session for peer `%s' address `%s': IPv4 is not enabled\n",
1346            GNUNET_i2s(target),
1347            udp_address_to_string(NULL, addr, addrlen));
1348       return NULL;
1349     }
1350     t4 = addr;
1351     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
1352     len = sizeof (struct sockaddr_in);
1353     v4 = (struct sockaddr_in *) &s[1];
1354     v4->sin_family = AF_INET;
1355 #if HAVE_SOCKADDR_IN_SIN_LEN
1356     v4->sin_len = sizeof (struct sockaddr_in);
1357 #endif
1358     v4->sin_port = t4->u4_port;
1359     v4->sin_addr.s_addr = t4->ipv4_addr;
1360     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
1361     break;
1362   case sizeof (struct IPv6UdpAddress):
1363     if (NULL == plugin->sockv6)
1364     {
1365       LOG (GNUNET_ERROR_TYPE_INFO,
1366            "Could not create session for peer `%s' address `%s': IPv6 is not enabled\n",
1367            GNUNET_i2s(target),
1368            udp_address_to_string(NULL, addr, addrlen));
1369       return NULL;
1370     }
1371     t6 = addr;
1372     s =
1373         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
1374     len = sizeof (struct sockaddr_in6);
1375     v6 = (struct sockaddr_in6 *) &s[1];
1376     v6->sin6_family = AF_INET6;
1377 #if HAVE_SOCKADDR_IN_SIN_LEN
1378     v6->sin6_len = sizeof (struct sockaddr_in6);
1379 #endif
1380     v6->sin6_port = t6->u6_port;
1381     v6->sin6_addr = t6->ipv6_addr;
1382     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
1383     break;
1384   default:
1385     /* Must have a valid address to send to */
1386     GNUNET_break_op (0);
1387     return NULL;
1388   }
1389   s->addrlen = len;
1390   s->target = *target;
1391   s->sock_addr = (const struct sockaddr *) &s[1];
1392   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
1393   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1394   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1395   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1396   s->inbound = GNUNET_NO;
1397   start_session_timeout (s);
1398   return s;
1399 }
1400
1401
1402 static int
1403 session_cmp_it (void *cls,
1404                 const struct GNUNET_HashCode * key,
1405                 void *value)
1406 {
1407   struct SessionCompareContext * cctx = cls;
1408   const struct GNUNET_HELLO_Address *address = cctx->addr;
1409   struct Session *s = value;
1410
1411   socklen_t s_addrlen = s->addrlen;
1412
1413   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
1414       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1415       GNUNET_a2s (s->sock_addr, s->addrlen));
1416   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
1417       (s_addrlen == sizeof (struct sockaddr_in)))
1418   {
1419     struct IPv4UdpAddress * u4 = NULL;
1420     u4 = (struct IPv4UdpAddress *) address->address;
1421     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
1422     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
1423         (u4->u4_port == s4->sin_port))
1424     {
1425       cctx->res = s;
1426       return GNUNET_NO;
1427     }
1428
1429   }
1430   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1431       (s_addrlen == sizeof (struct sockaddr_in6)))
1432   {
1433     struct IPv6UdpAddress * u6 = NULL;
1434     u6 = (struct IPv6UdpAddress *) address->address;
1435     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1436     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1437         (u6->u6_port == s6->sin6_port))
1438     {
1439       cctx->res = s;
1440       return GNUNET_NO;
1441     }
1442   }
1443   return GNUNET_YES;
1444 }
1445
1446
1447 /**
1448  * Function obtain the network type for a session
1449  *
1450  * @param cls closure ('struct Plugin*')
1451  * @param session the session
1452  * @return the network type in HBO or GNUNET_SYSERR
1453  */
1454 static enum GNUNET_ATS_Network_Type
1455 udp_get_network (void *cls, 
1456                  struct Session *session)
1457 {
1458   return ntohl (session->ats.value);
1459 }
1460
1461
1462 /**
1463  * Creates a new outbound session the transport service will use to send data to the
1464  * peer
1465  *
1466  * @param cls the plugin
1467  * @param address the address
1468  * @return the session or NULL of max connections exceeded
1469  */
1470 static struct Session *
1471 udp_plugin_lookup_session (void *cls,
1472                  const struct GNUNET_HELLO_Address *address)
1473 {
1474   struct Plugin * plugin = cls;
1475   struct IPv6UdpAddress * udp_a6;
1476   struct IPv4UdpAddress * udp_a4;
1477
1478   GNUNET_assert (plugin != NULL);
1479   GNUNET_assert (address != NULL);
1480
1481
1482   if ((address->address == NULL) ||
1483       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1484       (address->address_length != sizeof (struct IPv6UdpAddress))))
1485   {
1486     LOG (GNUNET_ERROR_TYPE_WARNING,
1487         _("Trying to create session for address of unexpected length %u (should be %u or %u)\n"),
1488         address->address_length,
1489         sizeof (struct IPv4UdpAddress),
1490         sizeof (struct IPv6UdpAddress));
1491     return NULL;
1492   }
1493
1494   if (address->address_length == sizeof (struct IPv4UdpAddress))
1495   {
1496     if (plugin->sockv4 == NULL)
1497       return NULL;
1498     udp_a4 = (struct IPv4UdpAddress *) address->address;
1499     if (udp_a4->u4_port == 0)
1500       return NULL;
1501   }
1502
1503   if (address->address_length == sizeof (struct IPv6UdpAddress))
1504   {
1505     if (plugin->sockv6 == NULL)
1506       return NULL;
1507     udp_a6 = (struct IPv6UdpAddress *) address->address;
1508     if (udp_a6->u6_port == 0)
1509       return NULL;
1510   }
1511
1512   /* check if session already exists */
1513   struct SessionCompareContext cctx;
1514   cctx.addr = address;
1515   cctx.res = NULL;
1516   LOG (GNUNET_ERROR_TYPE_DEBUG,
1517        "Looking for existing session for peer `%s' `%s' \n", 
1518        GNUNET_i2s (&address->peer), 
1519        udp_address_to_string(NULL, address->address, address->address_length));
1520   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1521   if (cctx.res != NULL)
1522   {
1523     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1524     return cctx.res;
1525   }
1526   return NULL;
1527 }
1528
1529 static struct Session *
1530 udp_plugin_create_session (void *cls,
1531                   const struct GNUNET_HELLO_Address *address)
1532 {
1533   struct Session * s = NULL;
1534
1535   /* otherwise create new */
1536   s = create_session (plugin,
1537       &address->peer,
1538       address->address,
1539       address->address_length,
1540       NULL, NULL);
1541   if (NULL == s)
1542         return NULL; /* protocol not supported or address invalid */
1543   LOG (GNUNET_ERROR_TYPE_DEBUG,
1544        "Creating new session %p for peer `%s' address `%s'\n",
1545        s,
1546        GNUNET_i2s(&address->peer),
1547        udp_address_to_string(NULL,address->address,address->address_length));
1548   GNUNET_assert (GNUNET_OK ==
1549                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1550                                                     &s->target.hashPubKey,
1551                                                     s,
1552                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1553   GNUNET_STATISTICS_set(plugin->env->stats,
1554                         "# UDP, sessions active",
1555                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1556                         GNUNET_NO);
1557   return s;
1558 }
1559
1560
1561
1562 /**
1563  * Creates a new outbound session the transport service will use to send data to the
1564  * peer
1565  *
1566  * @param cls the plugin
1567  * @param address the address
1568  * @return the session or NULL of max connections exceeded
1569  */
1570 static struct Session *
1571 udp_plugin_get_session (void *cls,
1572                   const struct GNUNET_HELLO_Address *address)
1573 {
1574   struct Session * s = NULL;
1575
1576   if (NULL == address)
1577   {
1578         GNUNET_break (0);
1579         return NULL;
1580   }
1581   if ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1582                 (address->address_length != sizeof (struct IPv6UdpAddress)))
1583                 return NULL;
1584
1585   /* otherwise create new */
1586   if (NULL != (s = udp_plugin_lookup_session(cls, address)))
1587         return s;
1588   else
1589         return udp_plugin_create_session (cls, address);
1590 }
1591
1592
1593 static void 
1594 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1595 {
1596   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1597       GNUNET_break (0);
1598   else
1599   {
1600     GNUNET_STATISTICS_update (plugin->env->stats,
1601                               "# UDP, total, bytes in buffers",
1602                               udpw->msg_size, GNUNET_NO);
1603     plugin->bytes_in_buffer += udpw->msg_size;
1604   }
1605   GNUNET_STATISTICS_update (plugin->env->stats,
1606                             "# UDP, total, msgs in buffers",
1607                             1, GNUNET_NO);
1608   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1609     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1610                                  plugin->ipv4_queue_tail, udpw);
1611   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1612     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1613                                  plugin->ipv6_queue_tail, udpw);
1614 }
1615
1616
1617
1618 /**
1619  * Fragment message was transmitted via UDP, let fragmentation know
1620  * to send the next fragment now.
1621  *
1622  * @param cls the 'struct UDPMessageWrapper' of the fragment
1623  * @param target destination peer (ignored)
1624  * @param result GNUNET_OK on success (ignored)
1625  * @param payload bytes payload sent
1626  * @param physical bytes physical sent
1627  */
1628 static void
1629 send_next_fragment (void *cls,
1630                     const struct GNUNET_PeerIdentity *target,
1631                     int result, size_t payload, size_t physical)
1632 {
1633   struct UDP_MessageWrapper *udpw = cls;
1634   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1635 }
1636
1637
1638 /**
1639  * Function that is called with messages created by the fragmentation
1640  * module.  In the case of the 'proc' callback of the
1641  * GNUNET_FRAGMENT_context_create function, this function must
1642  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1643  *
1644  * @param cls closure, the 'struct FragmentationContext'
1645  * @param msg the message that was created
1646  */
1647 static void
1648 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1649 {
1650   struct UDP_FragmentationContext *frag_ctx = cls;
1651   struct Plugin *plugin = frag_ctx->plugin;
1652   struct UDP_MessageWrapper * udpw;
1653   size_t msg_len = ntohs (msg->size);
1654  
1655   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1656        "Enqueuing fragment with %u bytes\n", msg_len);
1657   frag_ctx->fragments_used ++;
1658   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1659   udpw->session = frag_ctx->session;
1660   udpw->msg_buf = (char *) &udpw[1];
1661   udpw->msg_size = msg_len;
1662   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1663   udpw->cont = &send_next_fragment;
1664   udpw->cont_cls = udpw;
1665   udpw->timeout = frag_ctx->timeout;
1666   udpw->frag_ctx = frag_ctx;
1667   udpw->msg_type = MSG_FRAGMENTED;
1668   memcpy (udpw->msg_buf, msg, msg_len);
1669   enqueue (plugin, udpw);
1670   schedule_select (plugin);
1671 }
1672
1673
1674 /**
1675  * Function that can be used by the transport service to transmit
1676  * a message using the plugin.   Note that in the case of a
1677  * peer disconnecting, the continuation MUST be called
1678  * prior to the disconnect notification itself.  This function
1679  * will be called with this peer's HELLO message to initiate
1680  * a fresh connection to another peer.
1681  *
1682  * @param cls closure
1683  * @param s which session must be used
1684  * @param msgbuf the message to transmit
1685  * @param msgbuf_size number of bytes in 'msgbuf'
1686  * @param priority how important is the message (most plugins will
1687  *                 ignore message priority and just FIFO)
1688  * @param to how long to wait at most for the transmission (does not
1689  *                require plugins to discard the message after the timeout,
1690  *                just advisory for the desired delay; most plugins will ignore
1691  *                this as well)
1692  * @param cont continuation to call once the message has
1693  *        been transmitted (or if the transport is ready
1694  *        for the next transmission call; or if the
1695  *        peer disconnected...); can be NULL
1696  * @param cont_cls closure for cont
1697  * @return number of bytes used (on the physical network, with overheads);
1698  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1699  *         and does NOT mean that the message was not transmitted (DV)
1700  */
1701 static ssize_t
1702 udp_plugin_send (void *cls,
1703                   struct Session *s,
1704                   const char *msgbuf, size_t msgbuf_size,
1705                   unsigned int priority,
1706                   struct GNUNET_TIME_Relative to,
1707                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1708 {
1709   struct Plugin *plugin = cls;
1710   size_t udpmlen = msgbuf_size + sizeof (struct UDPMessage);
1711   struct UDP_FragmentationContext * frag_ctx;
1712   struct UDP_MessageWrapper * udpw;
1713   struct UDPMessage *udp;
1714   char mbuf[udpmlen];
1715   GNUNET_assert (plugin != NULL);
1716   GNUNET_assert (s != NULL);
1717
1718   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1719     return GNUNET_SYSERR;
1720   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1721     return GNUNET_SYSERR;
1722   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1723   {
1724     GNUNET_break (0);
1725     return GNUNET_SYSERR;
1726   }
1727   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1728   {
1729     GNUNET_break (0);
1730     return GNUNET_SYSERR;
1731   }
1732   LOG (GNUNET_ERROR_TYPE_DEBUG,
1733        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1734        udpmlen,
1735        GNUNET_i2s (&s->target),
1736        GNUNET_a2s(s->sock_addr, s->addrlen));
1737
1738
1739   /* Message */
1740   udp = (struct UDPMessage *) mbuf;
1741   udp->header.size = htons (udpmlen);
1742   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1743   udp->reserved = htonl (0);
1744   udp->sender = *plugin->env->my_identity;
1745
1746   reschedule_session_timeout(s);
1747   if (udpmlen <= UDP_MTU)
1748   {
1749     /* unfragmented message */
1750     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1751     udpw->session = s;
1752     udpw->msg_buf = (char *) &udpw[1];
1753     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1754     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1755     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1756     udpw->cont = cont;
1757     udpw->cont_cls = cont_cls;
1758     udpw->frag_ctx = NULL;
1759     udpw->msg_type = MSG_UNFRAGMENTED;
1760     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
1761     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1762     enqueue (plugin, udpw);
1763
1764     GNUNET_STATISTICS_update (plugin->env->stats,
1765                               "# UDP, unfragmented msgs, messages, attempt",
1766                               1, GNUNET_NO);
1767     GNUNET_STATISTICS_update (plugin->env->stats,
1768                               "# UDP, unfragmented msgs, bytes payload, attempt",
1769                               udpw->payload_size, GNUNET_NO);
1770   }
1771   else
1772   {
1773     /* fragmented message */
1774     if  (s->frag_ctx != NULL)
1775       return GNUNET_SYSERR;
1776     memcpy (&udp[1], msgbuf, msgbuf_size);
1777     frag_ctx = GNUNET_malloc (sizeof (struct UDP_FragmentationContext));
1778     frag_ctx->plugin = plugin;
1779     frag_ctx->session = s;
1780     frag_ctx->cont = cont;
1781     frag_ctx->cont_cls = cont_cls;
1782     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1783     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1784     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1785     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1786                                                      UDP_MTU,
1787                                                      &plugin->tracker,
1788                                                      s->last_expected_msg_delay, 
1789                                                      s->last_expected_ack_delay, 
1790                                                      &udp->header,
1791                                                      &enqueue_fragment,
1792                                                      frag_ctx);    
1793     s->frag_ctx = frag_ctx;
1794     GNUNET_STATISTICS_update (plugin->env->stats,
1795                               "# UDP, fragmented msgs, messages, pending",
1796                               1, GNUNET_NO);
1797     GNUNET_STATISTICS_update (plugin->env->stats,
1798                               "# UDP, fragmented msgs, messages, attempt",
1799                               1, GNUNET_NO);
1800     GNUNET_STATISTICS_update (plugin->env->stats,
1801                               "# UDP, fragmented msgs, bytes payload, attempt",
1802                               frag_ctx->payload_size, GNUNET_NO);
1803   }
1804   schedule_select (plugin);
1805   return udpmlen;
1806 }
1807
1808
1809 /**
1810  * Our external IP address/port mapping has changed.
1811  *
1812  * @param cls closure, the 'struct LocalAddrList'
1813  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1814  *     the previous (now invalid) one
1815  * @param addr either the previous or the new public IP address
1816  * @param addrlen actual lenght of the address
1817  */
1818 static void
1819 udp_nat_port_map_callback (void *cls, int add_remove,
1820                            const struct sockaddr *addr, socklen_t addrlen)
1821 {
1822   struct Plugin *plugin = cls;
1823   struct IPv4UdpAddress u4;
1824   struct IPv6UdpAddress u6;
1825   void *arg;
1826   size_t args;
1827
1828   LOG (GNUNET_ERROR_TYPE_INFO,
1829        "NAT notification to %s address `%s'\n",
1830        (GNUNET_YES == add_remove) ? "add" : "remove",
1831        GNUNET_a2s (addr, addrlen));
1832
1833   /* convert 'addr' to our internal format */
1834   switch (addr->sa_family)
1835   {
1836   case AF_INET:
1837     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1838     memset (&u4, 0, sizeof (u4));
1839     u4.options = htonl(myoptions);
1840     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1841     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1842     arg = &u4;
1843     args = sizeof (struct IPv4UdpAddress);
1844     break;
1845   case AF_INET6:
1846     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1847     memset (&u4, 0, sizeof (u4));
1848     u6.options = htonl(myoptions);
1849     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1850             sizeof (struct in6_addr));
1851     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1852     arg = &u6;
1853     args = sizeof (struct IPv6UdpAddress);
1854     break;
1855   default:
1856     GNUNET_break (0);
1857     return;
1858   }
1859   /* modify our published address list */
1860   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1861 }
1862
1863
1864
1865 /**
1866  * Message tokenizer has broken up an incomming message. Pass it on
1867  * to the service.
1868  *
1869  * @param cls the 'struct Plugin'
1870  * @param client the 'struct SourceInformation'
1871  * @param hdr the actual message
1872  */
1873 static int
1874 process_inbound_tokenized_messages (void *cls, void *client,
1875                                     const struct GNUNET_MessageHeader *hdr)
1876 {
1877   struct Plugin *plugin = cls;
1878   struct SourceInformation *si = client;
1879   struct GNUNET_TIME_Relative delay;
1880
1881   GNUNET_assert (si->session != NULL);
1882   if (GNUNET_YES == si->session->in_destroy)
1883     return GNUNET_OK;
1884   /* setup ATS */
1885   GNUNET_break (ntohl(si->session->ats.value) != GNUNET_ATS_NET_UNSPECIFIED);
1886   delay = plugin->env->receive (plugin->env->cls,
1887                                 &si->sender,
1888                                 hdr,
1889                                 si->session,
1890                  (GNUNET_YES == si->session->inbound) ? NULL : si->arg,
1891                  (GNUNET_YES == si->session->inbound) ? 0 : si->args);
1892
1893   plugin->env->update_address_metrics (plugin->env->cls,
1894                                        &si->sender,
1895                                          (GNUNET_YES == si->session->inbound) ? NULL : si->arg,
1896                                          (GNUNET_YES == si->session->inbound) ? 0 : si->args,
1897                                        si->session,
1898                                        &si->session->ats, 1);
1899
1900   si->session->flow_delay_for_other_peer = delay;
1901   reschedule_session_timeout(si->session);
1902   return GNUNET_OK;
1903 }
1904
1905
1906 /**
1907  * We've received a UDP Message.  Process it (pass contents to main service).
1908  *
1909  * @param plugin plugin context
1910  * @param msg the message
1911  * @param sender_addr sender address
1912  * @param sender_addr_len number of bytes in sender_addr
1913  */
1914 static void
1915 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1916                      const struct sockaddr *sender_addr,
1917                      socklen_t sender_addr_len)
1918 {
1919   struct SourceInformation si;
1920   struct Session * s;
1921   struct IPv4UdpAddress u4;
1922   struct IPv6UdpAddress u6;
1923   const void *arg;
1924   size_t args;
1925
1926   if (0 != ntohl (msg->reserved))
1927   {
1928     GNUNET_break_op (0);
1929     return;
1930   }
1931   if (ntohs (msg->header.size) <
1932       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1933   {
1934     GNUNET_break_op (0);
1935     return;
1936   }
1937
1938   /* convert address */
1939   switch (sender_addr->sa_family)
1940   {
1941   case AF_INET:
1942     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1943     memset (&u4, 0, sizeof (u4));
1944     u6.options = htonl (0);
1945     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1946     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1947     arg = &u4;
1948     args = sizeof (u4);
1949     break;
1950   case AF_INET6:
1951     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1952     memset (&u6, 0, sizeof (u6));
1953     u6.options = htonl (0);
1954     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1955     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1956     arg = &u6;
1957     args = sizeof (u6);
1958     break;
1959   default:
1960     GNUNET_break (0);
1961     return;
1962   }
1963   LOG (GNUNET_ERROR_TYPE_DEBUG,
1964        "Received message with %u bytes from peer `%s' at `%s'\n",
1965        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1966        GNUNET_a2s (sender_addr, sender_addr_len));
1967
1968   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1969   if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
1970   {
1971                 s = udp_plugin_create_session(plugin, address);
1972                 s->inbound = GNUNET_YES;
1973           plugin->env->session_start (NULL, &address->peer, PLUGIN_NAME,
1974                         (GNUNET_YES == s->inbound) ? NULL : address->address,
1975                         (GNUNET_YES == s->inbound) ? 0 : address->address_length,
1976                   s, NULL, 0);
1977   }
1978   GNUNET_free (address);
1979
1980   /* iterate over all embedded messages */
1981   si.session = s;
1982   si.sender = msg->sender;
1983   si.arg = arg;
1984   si.args = args;
1985   s->rc++;
1986   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1987                              ntohs (msg->header.size) -
1988                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1989   s->rc--;
1990   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1991     free_session (s);
1992 }
1993
1994
1995 /**
1996  * Scan the heap for a receive context with the given address.
1997  *
1998  * @param cls the 'struct FindReceiveContext'
1999  * @param node internal node of the heap
2000  * @param element value stored at the node (a 'struct ReceiveContext')
2001  * @param cost cost associated with the node
2002  * @return GNUNET_YES if we should continue to iterate,
2003  *         GNUNET_NO if not.
2004  */
2005 static int
2006 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
2007                       void *element, GNUNET_CONTAINER_HeapCostType cost)
2008 {
2009   struct FindReceiveContext *frc = cls;
2010   struct DefragContext *e = element;
2011
2012   if ((frc->addr_len == e->addr_len) &&
2013       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
2014   {
2015     frc->rc = e;
2016     return GNUNET_NO;
2017   }
2018   return GNUNET_YES;
2019 }
2020
2021
2022 /**
2023  * Process a defragmented message.
2024  *
2025  * @param cls the 'struct ReceiveContext'
2026  * @param msg the message
2027  */
2028 static void
2029 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
2030 {
2031   struct DefragContext *rc = cls;
2032
2033   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2034   {
2035     GNUNET_break (0);
2036     return;
2037   }
2038   if (ntohs (msg->size) < sizeof (struct UDPMessage))
2039   {
2040     GNUNET_break (0);
2041     return;
2042   }
2043   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
2044                        rc->src_addr, rc->addr_len);
2045 }
2046
2047
2048 struct LookupContext
2049 {
2050   const struct sockaddr * addr;
2051
2052   struct Session *res;
2053
2054   size_t addrlen;
2055 };
2056
2057
2058 static int
2059 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
2060 {
2061   struct LookupContext *l_ctx = cls;
2062   struct Session * s = value;
2063
2064   if ((s->addrlen == l_ctx->addrlen) &&
2065       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
2066   {
2067     l_ctx->res = s;
2068     return GNUNET_NO;
2069   }
2070   return GNUNET_YES;
2071 }
2072
2073
2074 /**
2075  * Transmit an acknowledgement.
2076  *
2077  * @param cls the 'struct ReceiveContext'
2078  * @param id message ID (unused)
2079  * @param msg ack to transmit
2080  */
2081 static void
2082 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
2083 {
2084   struct DefragContext *rc = cls;
2085   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
2086   struct UDP_ACK_Message *udp_ack;
2087   uint32_t delay = 0;
2088   struct UDP_MessageWrapper *udpw;
2089   struct Session *s;
2090   struct LookupContext l_ctx;
2091
2092   l_ctx.addr = rc->src_addr;
2093   l_ctx.addrlen = rc->addr_len;
2094   l_ctx.res = NULL;
2095   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
2096       &lookup_session_by_addr_it,
2097       &l_ctx);
2098   s = l_ctx.res;
2099
2100   if (NULL == s)
2101     return;
2102
2103   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
2104     delay = s->flow_delay_for_other_peer.rel_value;
2105
2106   LOG (GNUNET_ERROR_TYPE_DEBUG,
2107        "Sending ACK to `%s' including delay of %u ms\n",
2108        GNUNET_a2s (rc->src_addr,
2109                    (rc->src_addr->sa_family ==
2110                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
2111                                                                      sockaddr_in6)),
2112        delay);
2113   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2114   udpw->msg_size = msize;
2115   udpw->payload_size = 0;
2116   udpw->session = s;
2117   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2118   udpw->msg_buf = (char *)&udpw[1];
2119   udpw->msg_type = MSG_ACK;
2120   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2121   udp_ack->header.size = htons ((uint16_t) msize);
2122   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2123   udp_ack->delay = htonl (delay);
2124   udp_ack->sender = *rc->plugin->env->my_identity;
2125   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2126   enqueue (rc->plugin, udpw);
2127 }
2128
2129
2130 static void 
2131 read_process_msg (struct Plugin *plugin,
2132                   const struct GNUNET_MessageHeader *msg,
2133                   const char *addr,
2134                   socklen_t fromlen)
2135 {
2136   if (ntohs (msg->size) < sizeof (struct UDPMessage))
2137   {
2138     GNUNET_break_op (0);
2139     return;
2140   }
2141   process_udp_message (plugin, (const struct UDPMessage *) msg,
2142                        (const struct sockaddr *) addr, fromlen);
2143 }
2144
2145
2146 static void 
2147 read_process_ack (struct Plugin *plugin,
2148                   const struct GNUNET_MessageHeader *msg,
2149                   char *addr,
2150                   socklen_t fromlen)
2151 {
2152   const struct GNUNET_MessageHeader *ack;
2153   const struct UDP_ACK_Message *udp_ack;
2154   struct LookupContext l_ctx;
2155   struct Session *s;
2156   struct GNUNET_TIME_Relative flow_delay;
2157
2158   if (ntohs (msg->size) <
2159       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
2160   {
2161     GNUNET_break_op (0);
2162     return;
2163   }
2164   udp_ack = (const struct UDP_ACK_Message *) msg;
2165   l_ctx.addr = (const struct sockaddr *) addr;
2166   l_ctx.addrlen = fromlen;
2167   l_ctx.res = NULL;
2168   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
2169                                          &lookup_session_by_addr_it,
2170                                          &l_ctx);
2171   s = l_ctx.res;
2172
2173   if ((NULL == s) || (NULL == s->frag_ctx))
2174   {
2175     return;
2176   }
2177
2178   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
2179   LOG (GNUNET_ERROR_TYPE_DEBUG, 
2180        "We received a sending delay of %llu\n",
2181        flow_delay.rel_value);
2182   s->flow_delay_from_other_peer =
2183       GNUNET_TIME_relative_to_absolute (flow_delay);
2184
2185   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2186   if (ntohs (ack->size) !=
2187       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
2188   {
2189     GNUNET_break_op (0);
2190     return;
2191   }
2192
2193   if (0 != memcmp (&l_ctx.res->target, &udp_ack->sender, sizeof (struct GNUNET_PeerIdentity)))
2194     GNUNET_break (0);
2195   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2196   {
2197     LOG (GNUNET_ERROR_TYPE_DEBUG,
2198          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2199          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2200          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2201     /* Expect more ACKs to arrive */
2202     return;
2203   }
2204
2205   LOG (GNUNET_ERROR_TYPE_DEBUG,
2206        "Message full ACK'ed\n",
2207        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2208        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2209
2210   /* Remove fragmented message after successful sending */
2211   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2212 }
2213
2214
2215 static void 
2216 read_process_fragment (struct Plugin *plugin,
2217                        const struct GNUNET_MessageHeader *msg,
2218                        char *addr,
2219                        socklen_t fromlen)
2220 {
2221   struct DefragContext *d_ctx;
2222   struct GNUNET_TIME_Absolute now;
2223   struct FindReceiveContext frc;
2224
2225   frc.rc = NULL;
2226   frc.addr = (const struct sockaddr *) addr;
2227   frc.addr_len = fromlen;
2228
2229   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2230        (unsigned int) ntohs (msg->size),
2231        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2232   /* Lookup existing receive context for this address */
2233   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2234                                  &find_receive_context,
2235                                  &frc);
2236   now = GNUNET_TIME_absolute_get ();
2237   d_ctx = frc.rc;
2238
2239   if (d_ctx == NULL)
2240   {
2241     /* Create a new defragmentation context */
2242     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2243     memcpy (&d_ctx[1], addr, fromlen);
2244     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2245     d_ctx->addr_len = fromlen;
2246     d_ctx->plugin = plugin;
2247     d_ctx->defrag =
2248         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
2249                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
2250                                           &fragment_msg_proc, &ack_proc);
2251     d_ctx->hnode =
2252         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2253                                       (GNUNET_CONTAINER_HeapCostType)
2254                                       now.abs_value);
2255     LOG (GNUNET_ERROR_TYPE_DEBUG, 
2256          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2257          (unsigned int) ntohs (msg->size),
2258          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2259   }
2260   else
2261   {
2262     LOG (GNUNET_ERROR_TYPE_DEBUG,
2263          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2264          (unsigned int) ntohs (msg->size),
2265          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2266   }
2267
2268   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2269   {
2270     /* keep this 'rc' from expiring */
2271     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2272                                        (GNUNET_CONTAINER_HeapCostType)
2273                                        now.abs_value);
2274   }
2275   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2276       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2277   {
2278     /* remove 'rc' that was inactive the longest */
2279     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2280     GNUNET_assert (NULL != d_ctx);
2281     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2282     GNUNET_free (d_ctx);
2283   }
2284 }
2285
2286
2287 /**
2288  * Read and process a message from the given socket.
2289  *
2290  * @param plugin the overall plugin
2291  * @param rsock socket to read from
2292  */
2293 static void
2294 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2295 {
2296   socklen_t fromlen;
2297   char addr[32];
2298   char buf[65536] GNUNET_ALIGN;
2299   ssize_t size;
2300   const struct GNUNET_MessageHeader *msg;
2301
2302   fromlen = sizeof (addr);
2303   memset (&addr, 0, sizeof (addr));
2304   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
2305                                       (struct sockaddr *) &addr, &fromlen);
2306 #if MINGW
2307   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2308    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2309    * on this socket has failed.
2310    * Quote from MSDN:
2311    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2312    *   executing a hard or abortive close. The application should close
2313    *   the socket; it is no longer usable. On a UDP-datagram socket this
2314    *   error indicates a previous send operation resulted in an ICMP Port
2315    *   Unreachable message.
2316    */
2317   if ( (-1 == size) && (ECONNRESET == errno) )
2318     return;
2319 #endif
2320   if (-1 == size)
2321   {
2322     LOG (GNUNET_ERROR_TYPE_DEBUG,
2323         "UDP failed to receive data: %s\n", STRERROR (errno));
2324     /* Connection failure or something. Not a protocol violation. */
2325     return;
2326   }
2327   if (size < sizeof (struct GNUNET_MessageHeader))
2328   {
2329     LOG (GNUNET_ERROR_TYPE_WARNING,
2330         "UDP got %u bytes, which is not enough for a GNUnet message header\n",
2331         (unsigned int) size);
2332     /* _MAY_ be a connection failure (got partial message) */
2333     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2334     GNUNET_break_op (0);
2335     return;
2336   }
2337   msg = (const struct GNUNET_MessageHeader *) buf;
2338
2339   LOG (GNUNET_ERROR_TYPE_DEBUG,
2340        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
2341        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
2342
2343   if (size != ntohs (msg->size))
2344   {
2345     GNUNET_break_op (0);
2346     return;
2347   }
2348
2349   GNUNET_STATISTICS_update (plugin->env->stats,
2350                             "# UDP, total, bytes, received",
2351                             size, GNUNET_NO);
2352
2353   switch (ntohs (msg->type))
2354   {
2355   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2356     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
2357     return;
2358
2359   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2360     read_process_msg (plugin, msg, addr, fromlen);
2361     return;
2362
2363   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2364     read_process_ack (plugin, msg, addr, fromlen);
2365     return;
2366
2367   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2368     read_process_fragment (plugin, msg, addr, fromlen);
2369     return;
2370
2371   default:
2372     GNUNET_break_op (0);
2373     return;
2374   }
2375 }
2376
2377 static struct UDP_MessageWrapper *
2378 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2379                                     struct GNUNET_NETWORK_Handle *sock)
2380 {
2381   struct UDP_MessageWrapper *udpw = NULL;
2382   struct GNUNET_TIME_Relative remaining;
2383
2384   udpw = head;
2385   while (udpw != NULL)
2386   {
2387     /* Find messages with timeout */
2388     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2389     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2390     {
2391       /* Message timed out */
2392       switch (udpw->msg_type) {
2393         case MSG_UNFRAGMENTED:
2394           GNUNET_STATISTICS_update (plugin->env->stats,
2395                                     "# UDP, total, bytes, sent, timeout",
2396                                     udpw->msg_size, GNUNET_NO);
2397           GNUNET_STATISTICS_update (plugin->env->stats,
2398                                     "# UDP, total, messages, sent, timeout",
2399                                     1, GNUNET_NO);
2400           GNUNET_STATISTICS_update (plugin->env->stats,
2401                                     "# UDP, unfragmented msgs, messages, sent, timeout",
2402                                     1, GNUNET_NO);
2403           GNUNET_STATISTICS_update (plugin->env->stats,
2404                                     "# UDP, unfragmented msgs, bytes, sent, timeout",
2405                                     udpw->payload_size, GNUNET_NO);
2406           /* Not fragmented message */
2407           LOG (GNUNET_ERROR_TYPE_DEBUG,
2408                "Message for peer `%s' with size %u timed out\n",
2409                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2410           call_continuation (udpw, GNUNET_SYSERR);
2411           /* Remove message */
2412           dequeue (plugin, udpw);
2413           GNUNET_free (udpw);
2414           break;
2415         case MSG_FRAGMENTED:
2416           /* Fragmented message */
2417           GNUNET_STATISTICS_update (plugin->env->stats,
2418                                     "# UDP, total, bytes, sent, timeout",
2419                                     udpw->frag_ctx->on_wire_size, GNUNET_NO);
2420           GNUNET_STATISTICS_update (plugin->env->stats,
2421                                     "# UDP, total, messages, sent, timeout",
2422                                     1, GNUNET_NO);
2423           call_continuation (udpw, GNUNET_SYSERR);
2424           LOG (GNUNET_ERROR_TYPE_DEBUG,
2425                "Fragment for message for peer `%s' with size %u timed out\n",
2426                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
2427
2428
2429           GNUNET_STATISTICS_update (plugin->env->stats,
2430                                     "# UDP, fragmented msgs, messages, sent, timeout",
2431                                     1, GNUNET_NO);
2432           GNUNET_STATISTICS_update (plugin->env->stats,
2433                                     "# UDP, fragmented msgs, bytes, sent, timeout",
2434                                     udpw->frag_ctx->payload_size, GNUNET_NO);
2435           /* Remove fragmented message due to timeout */
2436           fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2437           break;
2438         case MSG_ACK:
2439           GNUNET_STATISTICS_update (plugin->env->stats,
2440                                     "# UDP, total, bytes, sent, timeout",
2441                                     udpw->msg_size, GNUNET_NO);
2442           GNUNET_STATISTICS_update (plugin->env->stats,
2443                                     "# UDP, total, messages, sent, timeout",
2444                                     1, GNUNET_NO);
2445           LOG (GNUNET_ERROR_TYPE_DEBUG,
2446                "ACK Message for peer `%s' with size %u timed out\n",
2447                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2448           call_continuation (udpw, GNUNET_SYSERR);
2449           dequeue (plugin, udpw);
2450           GNUNET_free (udpw);
2451           break;
2452         default:
2453           break;
2454       }
2455       if (sock == plugin->sockv4)
2456         udpw = plugin->ipv4_queue_head;
2457       else if (sock == plugin->sockv6)
2458         udpw = plugin->ipv6_queue_head;
2459       else
2460       {
2461         GNUNET_break (0); /* should never happen */
2462         udpw = NULL;
2463       }
2464       GNUNET_STATISTICS_update (plugin->env->stats,
2465                                 "# messages dismissed due to timeout",
2466                                 1, GNUNET_NO);
2467     }
2468     else
2469     {
2470       /* Message did not time out, check flow delay */
2471       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2472       if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2473       {
2474         /* this message is not delayed */
2475         LOG (GNUNET_ERROR_TYPE_DEBUG, 
2476              "Message for peer `%s' (%u bytes) is not delayed \n",
2477              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2478         break; /* Found message to send, break */
2479       }
2480       else
2481       {
2482         /* Message is delayed, try next */
2483         LOG (GNUNET_ERROR_TYPE_DEBUG,
2484              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
2485              GNUNET_i2s(&udpw->session->target), udpw->payload_size, remaining.rel_value);
2486         udpw = udpw->next;
2487       }
2488     }
2489   }
2490   return udpw;
2491 }
2492
2493
2494 static void
2495 analyze_send_error (struct Plugin *plugin,
2496                     const struct sockaddr * sa,
2497                     socklen_t slen,
2498                     int error)
2499 {
2500   static int network_down_error;
2501   struct GNUNET_ATS_Information type;
2502
2503  type = plugin->env->get_address_type (plugin->env->cls,sa, slen);
2504  if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
2505      ((ENETUNREACH == errno) || (ENETDOWN == errno)))
2506  {
2507    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2508    {
2509      /* IPv4: "Network unreachable" or "Network down"
2510       *
2511       * This indicates we do not have connectivity
2512       */
2513      LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2514          _("UDP could not transmit message to `%s': "
2515            "Network seems down, please check your network configuration\n"),
2516          GNUNET_a2s (sa, slen));
2517    }
2518    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2519    {
2520      /* IPv6: "Network unreachable" or "Network down"
2521       *
2522       * This indicates that this system is IPv6 enabled, but does not
2523       * have a valid global IPv6 address assigned or we do not have
2524       * connectivity
2525       */
2526
2527     LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2528         _("UDP could not transmit message to `%s': "
2529           "Please check your network configuration and disable IPv6 if your "
2530           "connection does not have a global IPv6 address\n"),
2531         GNUNET_a2s (sa, slen));
2532    }
2533  }
2534  else
2535  {
2536    LOG (GNUNET_ERROR_TYPE_WARNING,
2537       "UDP could not transmit message to `%s': `%s'\n",
2538       GNUNET_a2s (sa, slen), STRERROR (error));
2539  }
2540 }
2541
2542 static size_t
2543 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2544 {
2545   const struct sockaddr * sa;
2546   ssize_t sent;
2547   socklen_t slen;
2548
2549   struct UDP_MessageWrapper *udpw = NULL;
2550
2551   /* Find message to send */
2552   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) ? plugin->ipv4_queue_head : plugin->ipv6_queue_head,
2553                                              sock);
2554   if (NULL == udpw)
2555     return 0; /* No message to send */
2556
2557   sa = udpw->session->sock_addr;
2558   slen = udpw->session->addrlen;
2559
2560   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, sa, slen);
2561
2562   if (GNUNET_SYSERR == sent)
2563   {
2564     /* Failure */
2565     analyze_send_error (plugin, sa, slen, errno);
2566     call_continuation(udpw, GNUNET_SYSERR);
2567     GNUNET_STATISTICS_update (plugin->env->stats,
2568                             "# UDP, total, bytes, sent, failure",
2569                             sent, GNUNET_NO);
2570     GNUNET_STATISTICS_update (plugin->env->stats,
2571                               "# UDP, total, messages, sent, failure",
2572                               1, GNUNET_NO);
2573   }
2574   else
2575   {
2576     /* Success */
2577     LOG (GNUNET_ERROR_TYPE_DEBUG,
2578          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2579          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
2580          (sent < 0) ? STRERROR (errno) : "ok");
2581     GNUNET_STATISTICS_update (plugin->env->stats,
2582                               "# UDP, total, bytes, sent, success",
2583                               sent, GNUNET_NO);
2584     GNUNET_STATISTICS_update (plugin->env->stats,
2585                               "# UDP, total, messages, sent, success",
2586                               1, GNUNET_NO);
2587     if (NULL != udpw->frag_ctx)
2588         udpw->frag_ctx->on_wire_size += udpw->msg_size;
2589     call_continuation (udpw, GNUNET_OK);
2590   }
2591   dequeue (plugin, udpw);
2592   GNUNET_free (udpw);
2593   udpw = NULL;
2594
2595   return sent;
2596 }
2597
2598
2599 /**
2600  * We have been notified that our readset has something to read.  We don't
2601  * know which socket needs to be read, so we have to check each one
2602  * Then reschedule this function to be called again once more is available.
2603  *
2604  * @param cls the plugin handle
2605  * @param tc the scheduling context (for rescheduling this function again)
2606  */
2607 static void
2608 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2609 {
2610   struct Plugin *plugin = cls;
2611
2612   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2613   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2614     return;
2615   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2616        (NULL != plugin->sockv4) &&
2617        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2618     udp_select_read (plugin, plugin->sockv4);
2619   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2620        (NULL != plugin->sockv4) && 
2621        (NULL != plugin->ipv4_queue_head) &&
2622        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2623     udp_select_send (plugin, plugin->sockv4);   
2624   schedule_select (plugin);
2625 }
2626
2627
2628 /**
2629  * We have been notified that our readset has something to read.  We don't
2630  * know which socket needs to be read, so we have to check each one
2631  * Then reschedule this function to be called again once more is available.
2632  *
2633  * @param cls the plugin handle
2634  * @param tc the scheduling context (for rescheduling this function again)
2635  */
2636 static void
2637 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2638 {
2639   struct Plugin *plugin = cls;
2640
2641   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2642   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2643     return;
2644   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2645        (NULL != plugin->sockv6) &&
2646        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2647     udp_select_read (plugin, plugin->sockv6);
2648   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2649        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2650        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2651     udp_select_send (plugin, plugin->sockv6);
2652   schedule_select (plugin);
2653 }
2654
2655
2656 /**
2657  *
2658  * @return number of sockets that were successfully bound
2659  */
2660 static int
2661 setup_sockets (struct Plugin *plugin, 
2662                const struct sockaddr_in6 *bind_v6,
2663                const struct sockaddr_in *bind_v4)
2664 {
2665   int tries;
2666   int sockets_created = 0;
2667   struct sockaddr_in6 serverAddrv6;
2668   struct sockaddr_in serverAddrv4;
2669   struct sockaddr *serverAddr;
2670   struct sockaddr *addrs[2];
2671   socklen_t addrlens[2];
2672   socklen_t addrlen;
2673   int eno;
2674
2675   /* Create IPv6 socket */
2676   eno = EINVAL;
2677   if (plugin->enable_ipv6 == GNUNET_YES)
2678   {
2679     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2680     if (NULL == plugin->sockv6)
2681     {
2682         GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2683       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2684       plugin->enable_ipv6 = GNUNET_NO;
2685     }
2686     else
2687     {
2688         memset (&serverAddrv6, '\0', sizeof (struct sockaddr_in6));
2689 #if HAVE_SOCKADDR_IN_SIN_LEN
2690       serverAddrv6.sin6_len = sizeof (struct sockaddr_in6);
2691 #endif
2692       serverAddrv6.sin6_family = AF_INET6;
2693       if (NULL != bind_v6)
2694         serverAddrv6.sin6_addr = bind_v6->sin6_addr;
2695       else
2696         serverAddrv6.sin6_addr = in6addr_any;
2697
2698       if (0 == plugin->port) /* autodetect */
2699                 serverAddrv6.sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2700       else
2701                 serverAddrv6.sin6_port = htons (plugin->port);
2702       addrlen = sizeof (struct sockaddr_in6);
2703       serverAddr = (struct sockaddr *) &serverAddrv6;
2704
2705       tries = 0;
2706       while (tries < 10)
2707       {
2708                 LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 `%s'\n",
2709                                  GNUNET_a2s (serverAddr, addrlen));
2710                 /* binding */
2711                 if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv6,
2712                                                              serverAddr, addrlen, 0))
2713                         break;
2714                 eno = errno;
2715                 if (0 != plugin->port)
2716                 {
2717                                 tries = 10; /* fail */
2718                                 break; /* bind failed on specific port */
2719                 }
2720                 /* autodetect */
2721                 serverAddrv6.sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2722                 tries ++;
2723       }
2724       if (tries >= 10)
2725       {
2726         GNUNET_NETWORK_socket_close (plugin->sockv6);
2727         plugin->enable_ipv6 = GNUNET_NO;
2728         plugin->sockv6 = NULL;
2729       }
2730
2731       if (plugin->sockv6 != NULL)
2732       {
2733         LOG (GNUNET_ERROR_TYPE_DEBUG,
2734              "IPv6 socket created on port %s\n",
2735              GNUNET_a2s (serverAddr, addrlen));
2736         addrs[sockets_created] = (struct sockaddr *) &serverAddrv6;
2737         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2738         sockets_created++;
2739       }
2740       else
2741       {
2742           LOG (GNUNET_ERROR_TYPE_ERROR,
2743                "Failed to bind UDP socket to %s: %s\n",
2744                GNUNET_a2s (serverAddr, addrlen),
2745                STRERROR (eno));
2746       }
2747     }
2748   }
2749
2750   /* Create IPv4 socket */
2751   eno = EINVAL;
2752   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2753   if (NULL == plugin->sockv4)
2754   {
2755     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2756     LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv4 since it is not supported on this system!\n");
2757     plugin->enable_ipv4 = GNUNET_NO;
2758   }
2759   else
2760   {
2761     memset (&serverAddrv4, '\0', sizeof (struct sockaddr_in));
2762 #if HAVE_SOCKADDR_IN_SIN_LEN
2763     serverAddrv4.sin_len = sizeof (struct sockaddr_in);
2764 #endif
2765     serverAddrv4.sin_family = AF_INET;
2766     if (NULL != bind_v4)
2767       serverAddrv4.sin_addr = bind_v4->sin_addr;
2768     else
2769       serverAddrv4.sin_addr.s_addr = INADDR_ANY;
2770     
2771     if (0 == plugin->port)
2772       /* autodetect */
2773       serverAddrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2774     else
2775       serverAddrv4.sin_port = htons (plugin->port);
2776     
2777     
2778     addrlen = sizeof (struct sockaddr_in);
2779     serverAddr = (struct sockaddr *) &serverAddrv4;
2780     
2781     tries = 0;
2782     while (tries < 10)
2783     {
2784       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 `%s'\n",
2785                         GNUNET_a2s (serverAddr, addrlen));
2786       
2787       /* binding */
2788       if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv4,
2789                                                    serverAddr, addrlen, 0))
2790                 break;
2791       eno = errno;
2792       if (0 != plugin->port)
2793       {
2794                 tries = 10; /* fail */
2795                 break; /* bind failed on specific port */
2796       }
2797       
2798       /* autodetect */
2799       serverAddrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2800       tries ++;
2801     }
2802     
2803     if (tries >= 10)
2804     {
2805       GNUNET_NETWORK_socket_close (plugin->sockv4);
2806       plugin->enable_ipv4 = GNUNET_NO;
2807       plugin->sockv4 = NULL;
2808     }
2809     
2810     if (plugin->sockv4 != NULL)
2811     {
2812       LOG (GNUNET_ERROR_TYPE_DEBUG,
2813                 "IPv4 socket created on port %s\n", GNUNET_a2s (serverAddr, addrlen));
2814       addrs[sockets_created] = (struct sockaddr *) &serverAddrv4;
2815       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2816       sockets_created++;
2817     }
2818     else
2819     {             
2820       LOG (GNUNET_ERROR_TYPE_ERROR,
2821                 "Failed to bind UDP socket to %s: %s\n",
2822                 GNUNET_a2s (serverAddr, addrlen), STRERROR (eno));
2823     }
2824   }
2825   
2826   if (0 == sockets_created)
2827   {
2828                 LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2829                 return 0; /* No sockets created, return */
2830   }
2831
2832   /* Create file descriptors */
2833   if (plugin->enable_ipv4 == GNUNET_YES)
2834   {
2835                         plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2836                         plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2837                         GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2838                         GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2839                         if (NULL != plugin->sockv4)
2840                         {
2841                                 GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2842                                 GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2843                         }
2844   }
2845
2846   if (plugin->enable_ipv6 == GNUNET_YES)
2847   {
2848     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2849     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2850     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2851     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2852     if (NULL != plugin->sockv6)
2853     {
2854       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2855       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2856     }
2857   }
2858
2859   schedule_select (plugin);
2860   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2861                            GNUNET_NO, plugin->port,
2862                            sockets_created,
2863                            (const struct sockaddr **) addrs, addrlens,
2864                            &udp_nat_port_map_callback, NULL, plugin);
2865
2866   return sockets_created;
2867 }
2868
2869
2870 /**
2871  * The exported method. Makes the core api available via a global and
2872  * returns the udp transport API.
2873  *
2874  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2875  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2876  */
2877 void *
2878 libgnunet_plugin_transport_udp_init (void *cls)
2879 {
2880   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2881   struct GNUNET_TRANSPORT_PluginFunctions *api;
2882   struct Plugin *p;
2883   unsigned long long port;
2884   unsigned long long aport;
2885   unsigned long long broadcast;
2886   unsigned long long udp_max_bps;
2887   unsigned long long enable_v6;
2888   char * bind4_address;
2889   char * bind6_address;
2890   char * fancy_interval;
2891   struct GNUNET_TIME_Relative interval;
2892   struct sockaddr_in serverAddrv4;
2893   struct sockaddr_in6 serverAddrv6;
2894   int res;
2895   int have_bind4;
2896   int have_bind6;
2897
2898   if (NULL == env->receive)
2899   {
2900     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2901        initialze the plugin or the API */
2902     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2903     api->cls = NULL;
2904     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2905     api->address_to_string = &udp_address_to_string;
2906     api->string_to_address = &udp_string_to_address;
2907     return api;
2908   }
2909
2910   GNUNET_assert (NULL != env->stats);
2911
2912   /* Get port number: port == 0 : autodetect a port,
2913    *                                                                                            > 0 : use this port,
2914    *                                                              not given : 2086 default */
2915   if (GNUNET_OK !=
2916       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2917                                              &port))
2918     port = 2086;
2919   if (GNUNET_OK !=
2920       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2921                                              "ADVERTISED_PORT", &aport))
2922     aport = port;
2923   if (port > 65535)
2924   {
2925     LOG (GNUNET_ERROR_TYPE_WARNING,
2926          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2927          65535);
2928     return NULL;
2929   }
2930
2931   /* Protocols */
2932   if ((GNUNET_YES ==
2933        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2934                                              "DISABLEV6")))
2935     enable_v6 = GNUNET_NO;
2936   else
2937     enable_v6 = GNUNET_YES;
2938
2939   /* Addresses */
2940   have_bind4 = GNUNET_NO;
2941   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2942   if (GNUNET_YES ==
2943       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2944                                              "BINDTO", &bind4_address))
2945   {
2946     LOG (GNUNET_ERROR_TYPE_DEBUG,
2947          "Binding udp plugin to specific address: `%s'\n",
2948          bind4_address);
2949     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2950     {
2951       GNUNET_free (bind4_address);
2952       return NULL;
2953     }
2954     have_bind4 = GNUNET_YES;
2955   }
2956   GNUNET_free_non_null (bind4_address);
2957   have_bind6 = GNUNET_NO;
2958   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2959   if (GNUNET_YES ==
2960       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2961                                              "BINDTO6", &bind6_address))
2962   {
2963     LOG (GNUNET_ERROR_TYPE_DEBUG,
2964          "Binding udp plugin to specific address: `%s'\n",
2965          bind6_address);
2966     if (1 !=
2967         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2968     {
2969       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2970            bind6_address);
2971       GNUNET_free (bind6_address);
2972       return NULL;
2973     }
2974     have_bind6 = GNUNET_YES;
2975   }
2976   GNUNET_free_non_null (bind6_address);
2977
2978   /* Initialize my flags */
2979   myoptions = 0;
2980
2981   /* Enable neighbour discovery */
2982   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2983                                             "BROADCAST");
2984   if (broadcast == GNUNET_SYSERR)
2985     broadcast = GNUNET_NO;
2986
2987   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2988                                            "BROADCAST_INTERVAL", &fancy_interval))
2989   {
2990     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2991   }
2992   else
2993   {
2994      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2995      {
2996        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2997      }
2998      GNUNET_free (fancy_interval);
2999   }
3000
3001   /* Maximum datarate */
3002   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3003                                              "MAX_BPS", &udp_max_bps))
3004   {
3005     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
3006   }
3007
3008   p = GNUNET_malloc (sizeof (struct Plugin));
3009   p->port = port;
3010   p->aport = aport;
3011   p->broadcast_interval = interval;
3012   p->enable_ipv6 = enable_v6;
3013   p->enable_ipv4 = GNUNET_YES; /* default */
3014   p->env = env;
3015   p->sessions = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
3016   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3017   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
3018   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3019                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
3020   plugin = p;
3021
3022   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
3023   res = setup_sockets (p, (GNUNET_YES == have_bind6) ? &serverAddrv6 : NULL,
3024                                                                                 (GNUNET_YES == have_bind4) ? &serverAddrv4 : NULL);
3025   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
3026   {
3027     LOG (GNUNET_ERROR_TYPE_ERROR,
3028          _("Failed to create network sockets, plugin failed\n"));
3029     GNUNET_CONTAINER_multihashmap_destroy (p->sessions);
3030     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3031     GNUNET_SERVER_mst_destroy (p->mst);
3032     GNUNET_free (p);
3033     return NULL;
3034   }
3035   else if (broadcast == GNUNET_YES)
3036   {
3037     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
3038     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
3039   }
3040
3041   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
3042   api->cls = p;
3043   api->send = NULL;
3044   api->disconnect = &udp_disconnect;
3045   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3046   api->address_to_string = &udp_address_to_string;
3047   api->string_to_address = &udp_string_to_address;
3048   api->check_address = &udp_plugin_check_address;
3049   api->get_session = &udp_plugin_get_session;
3050   api->send = &udp_plugin_send;
3051   api->get_network = &udp_get_network;
3052
3053   return api;
3054 }
3055
3056
3057 static int
3058 heap_cleanup_iterator (void *cls,
3059                        struct GNUNET_CONTAINER_HeapNode *
3060                        node, void *element,
3061                        GNUNET_CONTAINER_HeapCostType
3062                        cost)
3063 {
3064   struct DefragContext * d_ctx = element;
3065
3066   GNUNET_CONTAINER_heap_remove_node (node);
3067   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
3068   GNUNET_free (d_ctx);
3069
3070   return GNUNET_YES;
3071 }
3072
3073
3074 /**
3075  * The exported method. Makes the core api available via a global and
3076  * returns the udp transport API.
3077  *
3078  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
3079  * @return NULL
3080  */
3081 void *
3082 libgnunet_plugin_transport_udp_done (void *cls)
3083 {
3084   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3085   struct Plugin *plugin = api->cls;
3086
3087   if (NULL == plugin)
3088   {
3089     GNUNET_free (api);
3090     return NULL;
3091   }
3092
3093   stop_broadcast (plugin);
3094   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
3095   {
3096     GNUNET_SCHEDULER_cancel (plugin->select_task);
3097     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
3098   }
3099   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
3100   {
3101     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3102     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
3103   }
3104
3105   /* Closing sockets */
3106   if (GNUNET_YES ==plugin->enable_ipv4)
3107   {
3108                 if (plugin->sockv4 != NULL)
3109                 {
3110                         GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
3111                         plugin->sockv4 = NULL;
3112                 }
3113                 GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3114                 GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3115   }
3116   if (GNUNET_YES ==plugin->enable_ipv6)
3117   {
3118                 if (plugin->sockv6 != NULL)
3119                 {
3120                         GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
3121                         plugin->sockv6 = NULL;
3122
3123                         GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3124                         GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3125                 }
3126   }
3127   if (NULL != plugin->nat)
3128         GNUNET_NAT_unregister (plugin->nat);
3129
3130   if (plugin->defrag_ctxs != NULL)
3131   {
3132     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
3133         heap_cleanup_iterator, NULL);
3134     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
3135     plugin->defrag_ctxs = NULL;
3136   }
3137   if (plugin->mst != NULL)
3138   {
3139     GNUNET_SERVER_mst_destroy(plugin->mst);
3140     plugin->mst = NULL;
3141   }
3142
3143   /* Clean up leftover messages */
3144   struct UDP_MessageWrapper * udpw;
3145   udpw = plugin->ipv4_queue_head;
3146   while (udpw != NULL)
3147   {
3148     struct UDP_MessageWrapper *tmp = udpw->next;
3149     dequeue (plugin, udpw);
3150     call_continuation(udpw, GNUNET_SYSERR);
3151     GNUNET_free (udpw);
3152
3153     udpw = tmp;
3154   }
3155   udpw = plugin->ipv6_queue_head;
3156   while (udpw != NULL)
3157   {
3158     struct UDP_MessageWrapper *tmp = udpw->next;
3159     dequeue (plugin, udpw);
3160     call_continuation(udpw, GNUNET_SYSERR);
3161     GNUNET_free (udpw);
3162
3163     udpw = tmp;
3164   }
3165
3166   /* Clean up sessions */
3167   LOG (GNUNET_ERROR_TYPE_DEBUG,
3168        "Cleaning up sessions\n");
3169   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
3170   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
3171
3172   plugin->nat = NULL;
3173   GNUNET_free (plugin);
3174   GNUNET_free (api);
3175 #if DEBUG_MALLOC
3176   struct Allocation *allocation;
3177   while (NULL != ahead)
3178   {
3179       allocation = ahead;
3180       GNUNET_CONTAINER_DLL_remove (ahead, atail, allocation);
3181       GNUNET_free (allocation);
3182   }
3183   struct Allocator *allocator;
3184   while (NULL != aehead)
3185   {
3186       allocator = aehead;
3187       GNUNET_CONTAINER_DLL_remove (aehead, aetail, allocator);
3188       GNUNET_free (allocator);
3189   }
3190 #endif
3191   return NULL;
3192 }
3193
3194
3195 /* end of plugin_transport_udp.c */