important check
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define PLUGIN_NAME "udp"
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66 /**
67  * Closure for 'append_port'.
68  */
69 struct PrettyPrinterContext
70 {
71   /**
72    * Function to call with the result.
73    */
74   GNUNET_TRANSPORT_AddressStringCallback asc;
75
76   /**
77    * Clsoure for 'asc'.
78    */
79   void *asc_cls;
80
81   /**
82    * Port to add after the IP address.
83    */
84   uint16_t port;
85
86   /**
87    * IPv6 address
88    */
89
90   int ipv6;
91
92   /**
93    * Options
94    */
95   uint32_t options;
96 };
97
98
99 enum UDP_MessageType
100 {
101   UNDEFINED = 0,
102   MSG_FRAGMENTED = 1,
103   MSG_FRAGMENTED_COMPLETE = 2,
104   MSG_UNFRAGMENTED = 3,
105   MSG_ACK = 4,
106   MSG_BEACON = 5
107 };
108
109 struct Session
110 {
111   /**
112    * Which peer is this session for?
113    */
114   struct GNUNET_PeerIdentity target;
115
116   struct UDP_FragmentationContext * frag_ctx;
117
118   /**
119    * Address of the other peer
120    */
121   const struct sockaddr *sock_addr;
122
123   /**
124    * Desired delay for next sending we send to other peer
125    */
126   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
127
128   /**
129    * Desired delay for next sending we received from other peer
130    */
131   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
132
133   /**
134    * Session timeout task
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * expected delay for ACKs
140    */
141   struct GNUNET_TIME_Relative last_expected_ack_delay;
142
143   /**
144    * desired delay between UDP messages
145    */
146   struct GNUNET_TIME_Relative last_expected_msg_delay;
147
148   struct GNUNET_ATS_Information ats;
149
150   size_t addrlen;
151
152
153   unsigned int rc;
154
155   int in_destroy;
156
157   int inbound;
158 };
159
160
161 struct SessionCompareContext
162 {
163   struct Session *res;
164   const struct GNUNET_HELLO_Address *addr;
165 };
166
167
168 /**
169  * Closure for 'process_inbound_tokenized_messages'
170  */
171 struct SourceInformation
172 {
173   /**
174    * Sender identity.
175    */
176   struct GNUNET_PeerIdentity sender;
177
178   /**
179    * Source address.
180    */
181   const void *arg;
182
183   struct Session *session;
184   /**
185    * Number of bytes in source address.
186    */
187   size_t args;
188
189 };
190
191
192 /**
193  * Closure for 'find_receive_context'.
194  */
195 struct FindReceiveContext
196 {
197   /**
198    * Where to store the result.
199    */
200   struct DefragContext *rc;
201
202   /**
203    * Address to find.
204    */
205   const struct sockaddr *addr;
206
207   struct Session *session;
208
209   /**
210    * Number of bytes in 'addr'.
211    */
212   socklen_t addr_len;
213
214 };
215
216
217
218 /**
219  * Data structure to track defragmentation contexts based
220  * on the source of the UDP traffic.
221  */
222 struct DefragContext
223 {
224
225   /**
226    * Defragmentation context.
227    */
228   struct GNUNET_DEFRAGMENT_Context *defrag;
229
230   /**
231    * Source address this receive context is for (allocated at the
232    * end of the struct).
233    */
234   const struct sockaddr *src_addr;
235
236   /**
237    * Reference to master plugin struct.
238    */
239   struct Plugin *plugin;
240
241   /**
242    * Node in the defrag heap.
243    */
244   struct GNUNET_CONTAINER_HeapNode *hnode;
245
246   /**
247    * Length of 'src_addr'
248    */
249   size_t addr_len;
250 };
251
252
253
254 /**
255  * Context to send fragmented messages
256  */
257 struct UDP_FragmentationContext
258 {
259   /**
260    * Next in linked list
261    */
262   struct UDP_FragmentationContext * next;
263
264   /**
265    * Previous in linked list
266    */
267   struct UDP_FragmentationContext * prev;
268
269   /**
270    * The plugin
271    */
272   struct Plugin * plugin;
273
274   /**
275    * Handle for GNUNET_FRAGMENT context
276    */
277   struct GNUNET_FRAGMENT_Context * frag;
278
279   /**
280    * The session this fragmentation context belongs to
281    */
282   struct Session * session;
283
284   /**
285    * Function to call upon completion of the transmission.
286    */
287   GNUNET_TRANSPORT_TransmitContinuation cont;
288
289   /**
290    * Closure for 'cont'.
291    */
292   void *cont_cls;
293
294   /**
295    * Message timeout
296    */
297   struct GNUNET_TIME_Absolute timeout;
298
299   /**
300    * Payload size of original unfragmented message
301    */
302   size_t payload_size;
303
304   /**
305    * Bytes used to send all fragments on wire including UDP overhead
306    */
307   size_t on_wire_size;
308
309   unsigned int fragments_used;
310
311 };
312
313
314 struct UDP_MessageWrapper
315 {
316   /**
317    * Session this message belongs to
318    */
319   struct Session *session;
320
321   /**
322    * DLL of messages
323    * previous element
324    */
325   struct UDP_MessageWrapper *prev;
326
327   /**
328    * DLL of messages
329    * previous element
330    */
331   struct UDP_MessageWrapper *next;
332
333   /**
334    * Message type
335    * According to UDP_MessageType
336    */
337   int msg_type;
338
339   /**
340    * Message with size msg_size including UDP specific overhead
341    */
342   char *msg_buf;
343
344   /**
345    * Size of UDP message to send including UDP specific overhead
346    */
347   size_t msg_size;
348
349   /**
350    * Payload size of original message
351    */
352   size_t payload_size;
353
354   /**
355    * Message timeout
356    */
357   struct GNUNET_TIME_Absolute timeout;
358
359   /**
360    * Function to call upon completion of the transmission.
361    */
362   GNUNET_TRANSPORT_TransmitContinuation cont;
363
364   /**
365    * Closure for 'cont'.
366    */
367   void *cont_cls;
368
369   /**
370    * Fragmentation context
371    * frag_ctx == NULL if transport <= MTU
372    * frag_ctx != NULL if transport > MTU
373    */
374   struct UDP_FragmentationContext *frag_ctx;
375 };
376
377
378 /**
379  * UDP ACK Message-Packet header (after defragmentation).
380  */
381 struct UDP_ACK_Message
382 {
383   /**
384    * Message header.
385    */
386   struct GNUNET_MessageHeader header;
387
388   /**
389    * Desired delay for flow control
390    */
391   uint32_t delay;
392
393   /**
394    * What is the identity of the sender
395    */
396   struct GNUNET_PeerIdentity sender;
397
398 };
399
400 /**
401  * Address options
402  */
403 static uint32_t myoptions;
404
405
406 /**
407  * Encapsulation of all of the state of the plugin.
408  */
409 struct Plugin * plugin;
410
411
412 /**
413  * We have been notified that our readset has something to read.  We don't
414  * know which socket needs to be read, so we have to check each one
415  * Then reschedule this function to be called again once more is available.
416  *
417  * @param cls the plugin handle
418  * @param tc the scheduling context (for rescheduling this function again)
419  */
420 static void
421 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
422
423
424 /**
425  * We have been notified that our readset has something to read.  We don't
426  * know which socket needs to be read, so we have to check each one
427  * Then reschedule this function to be called again once more is available.
428  *
429  * @param cls the plugin handle
430  * @param tc the scheduling context (for rescheduling this function again)
431  */
432 static void
433 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
434
435
436 /**
437  * Start session timeout
438  */
439 static void
440 start_session_timeout (struct Session *s);
441
442 /**
443  * Increment session timeout due to activity
444  */
445 static void
446 reschedule_session_timeout (struct Session *s);
447
448 /**
449  * Cancel timeout
450  */
451 static void
452 stop_session_timeout (struct Session *s);
453
454 /**
455  * (re)schedule select tasks for this plugin.
456  *
457  * @param plugin plugin to reschedule
458  */
459 static void
460 schedule_select (struct Plugin *plugin)
461 {
462   struct GNUNET_TIME_Relative min_delay;
463   struct UDP_MessageWrapper *udpw;
464
465   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
466   {
467     /* Find a message ready to send:
468      * Flow delay from other peer is expired or not set (0) */
469     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
470     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
471       min_delay = GNUNET_TIME_relative_min (min_delay,
472                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
473     
474     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
475       GNUNET_SCHEDULER_cancel(plugin->select_task);
476
477     /* Schedule with:
478      * - write active set if message is ready
479      * - timeout minimum delay */
480     plugin->select_task =
481       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
482                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
483                                    plugin->rs_v4,
484                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
485                                    &udp_plugin_select, plugin);  
486   }
487   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
488   {
489     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
490     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
491       min_delay = GNUNET_TIME_relative_min (min_delay,
492                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
493     
494     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
495       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
496     plugin->select_task_v6 =
497       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
498                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
499                                    plugin->rs_v6,
500                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
501                                    &udp_plugin_select_v6, plugin);
502   }
503 }
504
505
506 /**
507  * Function called for a quick conversion of the binary address to
508  * a numeric address.  Note that the caller must not free the
509  * address and that the next call to this function is allowed
510  * to override the address again.
511  *
512  * @param cls closure
513  * @param addr binary address
514  * @param addrlen length of the address
515  * @return string representing the same address
516  */
517 const char *
518 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
519 {
520   static char rbuf[INET6_ADDRSTRLEN + 10];
521   char buf[INET6_ADDRSTRLEN];
522   const void *sb;
523   struct in_addr a4;
524   struct in6_addr a6;
525   const struct IPv4UdpAddress *t4;
526   const struct IPv6UdpAddress *t6;
527   int af;
528   uint16_t port;
529   uint32_t options;
530
531   options = 0;
532   if (addrlen == sizeof (struct IPv6UdpAddress))
533   {
534     t6 = addr;
535     af = AF_INET6;
536     options = ntohl (t6->options);
537     port = ntohs (t6->u6_port);
538     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
539     sb = &a6;
540   }
541   else if (addrlen == sizeof (struct IPv4UdpAddress))
542   {
543     t4 = addr;
544     af = AF_INET;
545     options = ntohl (t4->options);
546     port = ntohs (t4->u4_port);
547     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
548     sb = &a4;
549   }
550   else if (addrlen == 0)
551         {
552                 GNUNET_snprintf (rbuf, sizeof (rbuf), "%s", "<inbound>");
553                 return rbuf;
554         }
555   else
556   {
557     GNUNET_break_op (0);
558     return NULL;
559   }
560   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
561
562   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u",
563                    PLUGIN_NAME, options, buf, port);
564   return rbuf;
565 }
566
567
568 /**
569  * Function called to convert a string address to
570  * a binary address.
571  *
572  * @param cls closure ('struct Plugin*')
573  * @param addr string address
574  * @param addrlen length of the address
575  * @param buf location to store the buffer
576  * @param added location to store the number of bytes in the buffer.
577  *        If the function returns GNUNET_SYSERR, its contents are undefined.
578  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
579  */
580 static int
581 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
582     void **buf, size_t *added)
583 {
584   struct sockaddr_storage socket_address;
585   char *address;
586   char *plugin;
587   char *optionstr;
588   uint32_t options;
589
590   /* Format tcp.options.address:port */
591   address = NULL;
592   plugin = NULL;
593   optionstr = NULL;
594   options = 0;
595   if ((NULL == addr) || (addrlen == 0))
596   {
597     GNUNET_break (0);
598     return GNUNET_SYSERR;
599   }
600   if ('\0' != addr[addrlen - 1])
601   {
602     GNUNET_break (0);
603     return GNUNET_SYSERR;
604   }
605   if (strlen (addr) != addrlen - 1)
606   {
607     GNUNET_break (0);
608     return GNUNET_SYSERR;
609   }
610   plugin = GNUNET_strdup (addr);
611   optionstr = strchr (plugin, '.');
612   if (NULL == optionstr)
613   {
614     GNUNET_break (0);
615     GNUNET_free (plugin);
616     return GNUNET_SYSERR;
617   }
618   optionstr[0] = '\0';
619   optionstr ++;
620   options = atol (optionstr);
621   address = strchr (optionstr, '.');
622   if (NULL == address)
623   {
624     GNUNET_break (0);
625     GNUNET_free (plugin);
626     return GNUNET_SYSERR;
627   }
628   address[0] = '\0';
629   address ++;
630
631   if (GNUNET_OK !=
632       GNUNET_STRINGS_to_address_ip (address, strlen (address),
633                                     &socket_address))
634   {
635     GNUNET_break (0);
636     GNUNET_free (plugin);
637     return GNUNET_SYSERR;
638   }
639
640   GNUNET_free (plugin);
641
642   switch (socket_address.ss_family)
643   {
644   case AF_INET:
645     {
646       struct IPv4UdpAddress *u4;
647       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
648       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
649       u4->options =  htonl (options);
650       u4->ipv4_addr = in4->sin_addr.s_addr;
651       u4->u4_port = in4->sin_port;
652       *buf = u4;
653       *added = sizeof (struct IPv4UdpAddress);
654       return GNUNET_OK;
655     }
656   case AF_INET6:
657     {
658       struct IPv6UdpAddress *u6;
659       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
660       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
661       u6->options =  htonl (options);
662       u6->ipv6_addr = in6->sin6_addr;
663       u6->u6_port = in6->sin6_port;
664       *buf = u6;
665       *added = sizeof (struct IPv6UdpAddress);
666       return GNUNET_OK;
667     }
668   default:
669     GNUNET_break (0);
670     return GNUNET_SYSERR;
671   }
672 }
673
674
675 /**
676  * Append our port and forward the result.
677  *
678  * @param cls a 'struct PrettyPrinterContext'
679  * @param hostname result from DNS resolver
680  */
681 static void
682 append_port (void *cls, const char *hostname)
683 {
684   struct PrettyPrinterContext *ppc = cls;
685   char *ret;
686
687   if (hostname == NULL)
688   {
689     ppc->asc (ppc->asc_cls, NULL);
690     GNUNET_free (ppc);
691     return;
692   }
693   if (GNUNET_YES == ppc->ipv6)
694     GNUNET_asprintf (&ret, "%s.%u.[%s]:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
695   else
696     GNUNET_asprintf (&ret, "%s.%u.%s:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
697   ppc->asc (ppc->asc_cls, ret);
698   GNUNET_free (ret);
699 }
700
701
702 /**
703  * Convert the transports address to a nice, human-readable
704  * format.
705  *
706  * @param cls closure
707  * @param type name of the transport that generated the address
708  * @param addr one of the addresses of the host, NULL for the last address
709  *        the specific address format depends on the transport
710  * @param addrlen length of the address
711  * @param numeric should (IP) addresses be displayed in numeric form?
712  * @param timeout after how long should we give up?
713  * @param asc function to call on each string
714  * @param asc_cls closure for asc
715  */
716 static void
717 udp_plugin_address_pretty_printer (void *cls, const char *type,
718                                    const void *addr, size_t addrlen,
719                                    int numeric,
720                                    struct GNUNET_TIME_Relative timeout,
721                                    GNUNET_TRANSPORT_AddressStringCallback asc,
722                                    void *asc_cls)
723 {
724   struct PrettyPrinterContext *ppc;
725   const void *sb;
726   size_t sbs;
727   struct sockaddr_in a4;
728   struct sockaddr_in6 a6;
729   const struct IPv4UdpAddress *u4;
730   const struct IPv6UdpAddress *u6;
731   uint16_t port;
732   uint32_t options;
733
734   options = 0;
735   if (addrlen == sizeof (struct IPv6UdpAddress))
736   {
737     u6 = addr;
738     memset (&a6, 0, sizeof (a6));
739     a6.sin6_family = AF_INET6;
740 #if HAVE_SOCKADDR_IN_SIN_LEN
741     a6.sin6_len = sizeof (a6);
742 #endif
743     a6.sin6_port = u6->u6_port;
744     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
745     port = ntohs (u6->u6_port);
746     options = ntohl (u6->options);
747     sb = &a6;
748     sbs = sizeof (a6);
749   }
750   else if (addrlen == sizeof (struct IPv4UdpAddress))
751   {
752     u4 = addr;
753     memset (&a4, 0, sizeof (a4));
754     a4.sin_family = AF_INET;
755 #if HAVE_SOCKADDR_IN_SIN_LEN
756     a4.sin_len = sizeof (a4);
757 #endif
758     a4.sin_port = u4->u4_port;
759     a4.sin_addr.s_addr = u4->ipv4_addr;
760     port = ntohs (u4->u4_port);
761     options = ntohl (u4->options);
762     sb = &a4;
763     sbs = sizeof (a4);
764   }
765   else if (0 == addrlen)
766   {
767     asc (asc_cls, "<inbound connection>");
768     asc (asc_cls, NULL);
769     return;
770   }
771   else
772   {
773     /* invalid address */
774     GNUNET_break_op (0);
775     asc (asc_cls, NULL);
776     return;
777   }
778   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
779   ppc->asc = asc;
780   ppc->asc_cls = asc_cls;
781   ppc->port = port;
782   ppc->options = options;
783   if (addrlen == sizeof (struct IPv6UdpAddress))
784     ppc->ipv6 = GNUNET_YES;
785   else
786     ppc->ipv6 = GNUNET_NO;
787   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
788 }
789
790
791 static void
792 call_continuation (struct UDP_MessageWrapper *udpw, int result)
793 {
794   size_t overhead;
795
796   LOG (GNUNET_ERROR_TYPE_DEBUG,
797       "Calling continuation for %u byte message to `%s' with result %s\n",
798       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
799       (GNUNET_OK == result) ? "OK" : "SYSERR");
800
801   if (udpw->msg_size >= udpw->payload_size)
802     overhead = udpw->msg_size - udpw->payload_size;
803   else
804     overhead = udpw->msg_size;
805
806   switch (result) {
807     case GNUNET_OK:
808       switch (udpw->msg_type) {
809         case MSG_UNFRAGMENTED:
810           if (NULL != udpw->cont)
811           {
812             /* Transport continuation */
813             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
814                       udpw->payload_size, udpw->msg_size);
815           }
816           GNUNET_STATISTICS_update (plugin->env->stats,
817                                     "# UDP, unfragmented msgs, messages, sent, success",
818                                     1, GNUNET_NO);
819           GNUNET_STATISTICS_update (plugin->env->stats,
820                                     "# UDP, unfragmented msgs, bytes payload, sent, success",
821                                     udpw->payload_size, GNUNET_NO);
822           GNUNET_STATISTICS_update (plugin->env->stats,
823                                     "# UDP, unfragmented msgs, bytes overhead, sent, success",
824                                     overhead, GNUNET_NO);
825           GNUNET_STATISTICS_update (plugin->env->stats,
826                                     "# UDP, total, bytes overhead, sent",
827                                     overhead, GNUNET_NO);
828           GNUNET_STATISTICS_update (plugin->env->stats,
829                                     "# UDP, total, bytes payload, sent",
830                                     udpw->payload_size, GNUNET_NO);
831           break;
832         case MSG_FRAGMENTED_COMPLETE:
833           GNUNET_assert (NULL != udpw->frag_ctx);
834           if (udpw->frag_ctx->cont != NULL)
835             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_OK,
836                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
837           GNUNET_STATISTICS_update (plugin->env->stats,
838                                     "# UDP, fragmented msgs, messages, sent, success",
839                                     1, GNUNET_NO);
840           GNUNET_STATISTICS_update (plugin->env->stats,
841                                     "# UDP, fragmented msgs, bytes payload, sent, success",
842                                     udpw->payload_size, GNUNET_NO);
843           GNUNET_STATISTICS_update (plugin->env->stats,
844                                     "# UDP, fragmented msgs, bytes overhead, sent, success",
845                                     overhead, GNUNET_NO);
846           GNUNET_STATISTICS_update (plugin->env->stats,
847                                     "# UDP, total, bytes overhead, sent",
848                                     overhead, GNUNET_NO);
849           GNUNET_STATISTICS_update (plugin->env->stats,
850                                     "# UDP, total, bytes payload, sent",
851                                     udpw->payload_size, GNUNET_NO);
852           GNUNET_STATISTICS_update (plugin->env->stats,
853                                     "# UDP, fragmented msgs, messages, pending",
854                                     -1, GNUNET_NO);
855           break;
856         case MSG_FRAGMENTED:
857           /* Fragmented message: enqueue next fragment */
858           if (NULL != udpw->cont)
859             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
860                       udpw->payload_size, udpw->msg_size);
861           GNUNET_STATISTICS_update (plugin->env->stats,
862                                     "# UDP, fragmented msgs, fragments, sent, success",
863                                     1, GNUNET_NO);
864           GNUNET_STATISTICS_update (plugin->env->stats,
865                                     "# UDP, fragmented msgs, fragments bytes, sent, success",
866                                     udpw->msg_size, GNUNET_NO);
867           break;
868         case MSG_ACK:
869           /* No continuation */
870           GNUNET_STATISTICS_update (plugin->env->stats,
871                                     "# UDP, ACK msgs, messages, sent, success",
872                                     1, GNUNET_NO);
873           GNUNET_STATISTICS_update (plugin->env->stats,
874                                     "# UDP, ACK msgs, bytes overhead, sent, success",
875                                     overhead, GNUNET_NO);
876           GNUNET_STATISTICS_update (plugin->env->stats,
877                                     "# UDP, total, bytes overhead, sent",
878                                     overhead, GNUNET_NO);
879           break;
880         case MSG_BEACON:
881           GNUNET_break (0);
882           break;
883         default:
884           LOG (GNUNET_ERROR_TYPE_ERROR,
885               "ERROR: %u\n", udpw->msg_type);
886           GNUNET_break (0);
887           break;
888       }
889       break;
890     case GNUNET_SYSERR:
891       switch (udpw->msg_type) {
892         case MSG_UNFRAGMENTED:
893           /* Unfragmented message: failed to send */
894           if (NULL != udpw->cont)
895             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
896                       udpw->payload_size, overhead);
897           GNUNET_STATISTICS_update (plugin->env->stats,
898                                   "# UDP, unfragmented msgs, messages, sent, failure",
899                                   1, GNUNET_NO);
900           GNUNET_STATISTICS_update (plugin->env->stats,
901                                     "# UDP, unfragmented msgs, bytes payload, sent, failure",
902                                     udpw->payload_size, GNUNET_NO);
903           GNUNET_STATISTICS_update (plugin->env->stats,
904                                     "# UDP, unfragmented msgs, bytes overhead, sent, failure",
905                                     overhead, GNUNET_NO);
906           break;
907         case MSG_FRAGMENTED_COMPLETE:
908           GNUNET_assert (NULL != udpw->frag_ctx);
909           if (udpw->frag_ctx->cont != NULL)
910             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_SYSERR,
911                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
912           GNUNET_STATISTICS_update (plugin->env->stats,
913                                     "# UDP, fragmented msgs, messages, sent, failure",
914                                     1, GNUNET_NO);
915           GNUNET_STATISTICS_update (plugin->env->stats,
916                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
917                                     udpw->payload_size, GNUNET_NO);
918           GNUNET_STATISTICS_update (plugin->env->stats,
919                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
920                                     overhead, GNUNET_NO);
921           GNUNET_STATISTICS_update (plugin->env->stats,
922                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
923                                     overhead, GNUNET_NO);
924           GNUNET_STATISTICS_update (plugin->env->stats,
925                                     "# UDP, fragmented msgs, messages, pending",
926                                     -1, GNUNET_NO);
927           break;
928         case MSG_FRAGMENTED:
929           GNUNET_assert (NULL != udpw->frag_ctx);
930           /* Fragmented message: failed to send */
931           GNUNET_STATISTICS_update (plugin->env->stats,
932                                     "# UDP, fragmented msgs, fragments, sent, failure",
933                                     1, GNUNET_NO);
934           GNUNET_STATISTICS_update (plugin->env->stats,
935                                     "# UDP, fragmented msgs, fragments bytes, sent, failure",
936                                     udpw->msg_size, GNUNET_NO);
937           break;
938         case MSG_ACK:
939           /* ACK message: failed to send */
940           GNUNET_STATISTICS_update (plugin->env->stats,
941                                     "# UDP, ACK msgs, messages, sent, failure",
942                                     1, GNUNET_NO);
943           break;
944         case MSG_BEACON:
945           /* Beacon message: failed to send */
946           GNUNET_break (0);
947           break;
948         default:
949           GNUNET_break (0);
950           break;
951       }
952       break;
953     default:
954       GNUNET_break (0);
955       break;
956   }
957 }
958
959
960 /**
961  * Check if the given port is plausible (must be either our listen
962  * port or our advertised port).  If it is neither, we return
963  * GNUNET_SYSERR.
964  *
965  * @param plugin global variables
966  * @param in_port port number to check
967  * @return GNUNET_OK if port is either open_port or adv_port
968  */
969 static int
970 check_port (struct Plugin *plugin, uint16_t in_port)
971 {
972   if ((in_port == plugin->port) || (in_port == plugin->aport))
973     return GNUNET_OK;
974   return GNUNET_SYSERR;
975 }
976
977
978 /**
979  * Function that will be called to check if a binary address for this
980  * plugin is well-formed and corresponds to an address for THIS peer
981  * (as per our configuration).  Naturally, if absolutely necessary,
982  * plugins can be a bit conservative in their answer, but in general
983  * plugins should make sure that the address does not redirect
984  * traffic to a 3rd party that might try to man-in-the-middle our
985  * traffic.
986  *
987  * @param cls closure, should be our handle to the Plugin
988  * @param addr pointer to the address
989  * @param addrlen length of addr
990  * @return GNUNET_OK if this is a plausible address for this peer
991  *         and transport, GNUNET_SYSERR if not
992  *
993  */
994 static int
995 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
996 {
997   struct Plugin *plugin = cls;
998   struct IPv4UdpAddress *v4;
999   struct IPv6UdpAddress *v6;
1000
1001   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
1002       (addrlen != sizeof (struct IPv6UdpAddress)))
1003   {
1004     GNUNET_break_op (0);
1005     return GNUNET_SYSERR;
1006   }
1007   if (addrlen == sizeof (struct IPv4UdpAddress))
1008   {
1009     v4 = (struct IPv4UdpAddress *) addr;
1010     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1011       return GNUNET_SYSERR;
1012     if (GNUNET_OK !=
1013         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
1014                                  sizeof (struct in_addr)))
1015       return GNUNET_SYSERR;
1016   }
1017   else
1018   {
1019     v6 = (struct IPv6UdpAddress *) addr;
1020     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1021     {
1022       GNUNET_break_op (0);
1023       return GNUNET_SYSERR;
1024     }
1025     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1026       return GNUNET_SYSERR;
1027     if (GNUNET_OK !=
1028         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
1029                                  sizeof (struct in6_addr)))
1030       return GNUNET_SYSERR;
1031   }
1032   return GNUNET_OK;
1033 }
1034
1035
1036 /**
1037  * Task to free resources associated with a session.
1038  *
1039  * @param s session to free
1040  */
1041 static void
1042 free_session (struct Session *s)
1043 {
1044   if (NULL != s->frag_ctx)
1045   {
1046     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag, NULL, NULL);
1047     GNUNET_free (s->frag_ctx);
1048     s->frag_ctx = NULL;
1049   }
1050   GNUNET_free (s);
1051 }
1052
1053
1054 static void
1055 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1056 {
1057   if (plugin->bytes_in_buffer < udpw->msg_size)
1058       GNUNET_break (0);
1059   else
1060   {
1061     GNUNET_STATISTICS_update (plugin->env->stats,
1062                               "# UDP, total, bytes in buffers",
1063                               - (long long) udpw->msg_size, GNUNET_NO);
1064     plugin->bytes_in_buffer -= udpw->msg_size;
1065   }
1066   GNUNET_STATISTICS_update (plugin->env->stats,
1067                             "# UDP, total, msgs in buffers",
1068                             -1, GNUNET_NO);
1069   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1070     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1071                                  plugin->ipv4_queue_tail, udpw);
1072   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1073     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1074                                  plugin->ipv6_queue_tail, udpw);
1075 }
1076
1077 static void
1078 fragmented_message_done (struct UDP_FragmentationContext *fc, int result)
1079 {
1080   struct UDP_MessageWrapper *udpw;
1081   struct UDP_MessageWrapper *tmp;
1082   struct UDP_MessageWrapper dummy;
1083   struct Session *s = fc->session;
1084
1085   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p : Fragmented message removed with result %s\n", fc, (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1086   
1087   /* Call continuation for fragmented message */
1088   memset (&dummy, 0, sizeof (dummy));
1089   dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
1090   dummy.msg_size = s->frag_ctx->on_wire_size;
1091   dummy.payload_size = s->frag_ctx->payload_size;
1092   dummy.frag_ctx = s->frag_ctx;
1093   dummy.cont = NULL;
1094   dummy.cont_cls = NULL;
1095   dummy.session = s;
1096
1097   call_continuation (&dummy, result);
1098
1099   /* Remove leftover fragments from queue */
1100   if (s->addrlen == sizeof (struct sockaddr_in6))
1101   {
1102     udpw = plugin->ipv6_queue_head;
1103     while (NULL != udpw)
1104     {
1105       tmp = udpw->next;
1106       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1107       {
1108         dequeue (plugin, udpw);
1109         call_continuation (udpw, GNUNET_SYSERR);
1110         GNUNET_free (udpw);
1111       }
1112       udpw = tmp;
1113     }
1114   }
1115   if (s->addrlen == sizeof (struct sockaddr_in))
1116   {
1117     udpw = plugin->ipv4_queue_head;
1118     while (udpw!= NULL)
1119     {
1120       tmp = udpw->next;
1121       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1122       {
1123         dequeue (plugin, udpw);
1124         call_continuation (udpw, GNUNET_SYSERR);
1125         GNUNET_free (udpw);
1126       }
1127       udpw = tmp;
1128     }
1129   }
1130
1131   /* Destroy fragmentation context */
1132   GNUNET_FRAGMENT_context_destroy (fc->frag,
1133                                      &s->last_expected_msg_delay,
1134                                      &s->last_expected_ack_delay);
1135   s->frag_ctx = NULL;
1136   GNUNET_free (fc );
1137 }
1138
1139 /**
1140  * Functions with this signature are called whenever we need
1141  * to close a session due to a disconnect or failure to
1142  * establish a connection.
1143  *
1144  * @param s session to close down
1145  */
1146 static void
1147 disconnect_session (struct Session *s)
1148 {
1149   struct UDP_MessageWrapper *udpw;
1150   struct UDP_MessageWrapper *next;
1151
1152   GNUNET_assert (GNUNET_YES != s->in_destroy);
1153   LOG (GNUNET_ERROR_TYPE_DEBUG,
1154        "Session %p to peer `%s' address ended \n",
1155          s,
1156          GNUNET_i2s (&s->target),
1157          GNUNET_a2s (s->sock_addr, s->addrlen));
1158   stop_session_timeout (s);
1159
1160   if (NULL != s->frag_ctx)
1161   {
1162     /* Remove fragmented message due to disconnect */
1163     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1164   }
1165
1166   next = plugin->ipv4_queue_head;
1167   while (NULL != (udpw = next))
1168   {
1169     next = udpw->next;
1170     if (udpw->session == s)
1171     {
1172       dequeue (plugin, udpw);
1173       call_continuation(udpw, GNUNET_SYSERR);
1174       GNUNET_free (udpw);
1175     }
1176   }
1177   next = plugin->ipv6_queue_head;
1178   while (NULL != (udpw = next))
1179   {
1180     next = udpw->next;
1181     if (udpw->session == s)
1182     {
1183       dequeue (plugin, udpw);
1184       call_continuation(udpw, GNUNET_SYSERR);
1185       GNUNET_free (udpw);
1186     }
1187     udpw = next;
1188   }
1189   plugin->env->session_end (plugin->env->cls, &s->target, s);
1190
1191   if (NULL != s->frag_ctx)
1192   {
1193     if (NULL != s->frag_ctx->cont)
1194     {
1195       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
1196                          s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1197       LOG (GNUNET_ERROR_TYPE_DEBUG,
1198           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1199           GNUNET_i2s (&s->target));
1200     }
1201   }
1202
1203   GNUNET_assert (GNUNET_YES ==
1204                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
1205                                                        &s->target.hashPubKey,
1206                                                        s));
1207   GNUNET_STATISTICS_set(plugin->env->stats,
1208                         "# UDP, sessions active",
1209                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1210                         GNUNET_NO);
1211   if (s->rc > 0)
1212     s->in_destroy = GNUNET_YES;
1213   else
1214     free_session (s);
1215 }
1216
1217 /**
1218  * Destroy a session, plugin is being unloaded.
1219  *
1220  * @param cls unused
1221  * @param key hash of public key of target peer
1222  * @param value a 'struct PeerSession*' to clean up
1223  * @return GNUNET_OK (continue to iterate)
1224  */
1225 static int
1226 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1227 {
1228   disconnect_session(value);
1229   return GNUNET_OK;
1230 }
1231
1232
1233 /**
1234  * Disconnect from a remote node.  Clean up session if we have one for this peer
1235  *
1236  * @param cls closure for this call (should be handle to Plugin)
1237  * @param target the peeridentity of the peer to disconnect
1238  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
1239  */
1240 static void
1241 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1242 {
1243   struct Plugin *plugin = cls;
1244   GNUNET_assert (plugin != NULL);
1245
1246   GNUNET_assert (target != NULL);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
1249   /* Clean up sessions */
1250   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
1251 }
1252
1253
1254 /**
1255  * Session was idle, so disconnect it
1256  */
1257 static void
1258 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1259 {
1260   GNUNET_assert (NULL != cls);
1261   struct Session *s = cls;
1262
1263   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265               "Session %p was idle for %llu ms, disconnecting\n",
1266               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1267   /* call session destroy function */
1268   disconnect_session (s);
1269 }
1270
1271
1272 /**
1273  * Start session timeout
1274  */
1275 static void
1276 start_session_timeout (struct Session *s)
1277 {
1278   GNUNET_assert (NULL != s);
1279   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
1280   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1281                                                    &session_timeout,
1282                                                    s);
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Timeout for session %p set to %llu ms\n",
1285               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1286 }
1287
1288
1289 /**
1290  * Increment session timeout due to activity
1291  */
1292 static void
1293 reschedule_session_timeout (struct Session *s)
1294 {
1295   GNUNET_assert (NULL != s);
1296   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1297
1298   GNUNET_SCHEDULER_cancel (s->timeout_task);
1299   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1300                                                    &session_timeout,
1301                                                    s);
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303               "Timeout rescheduled for session %p set to %llu ms\n",
1304               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1305 }
1306
1307
1308 /**
1309  * Cancel timeout
1310  */
1311 static void
1312 stop_session_timeout (struct Session *s)
1313 {
1314   GNUNET_assert (NULL != s);
1315
1316   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1317   {
1318     GNUNET_SCHEDULER_cancel (s->timeout_task);
1319     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1320     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321                 "Timeout stopped for session %p canceled\n",
1322                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1323   }
1324 }
1325
1326
1327 static struct Session *
1328 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
1329                 const void *addr, size_t addrlen,
1330                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1331 {
1332   struct Session *s;
1333   const struct IPv4UdpAddress *t4;
1334   const struct IPv6UdpAddress *t6;
1335   struct sockaddr_in *v4;
1336   struct sockaddr_in6 *v6;
1337   size_t len;
1338
1339   switch (addrlen)
1340   {
1341   case sizeof (struct IPv4UdpAddress):
1342     if (NULL == plugin->sockv4)
1343     {
1344       return NULL;
1345     }
1346     t4 = addr;
1347     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
1348     len = sizeof (struct sockaddr_in);
1349     v4 = (struct sockaddr_in *) &s[1];
1350     v4->sin_family = AF_INET;
1351 #if HAVE_SOCKADDR_IN_SIN_LEN
1352     v4->sin_len = sizeof (struct sockaddr_in);
1353 #endif
1354     v4->sin_port = t4->u4_port;
1355     v4->sin_addr.s_addr = t4->ipv4_addr;
1356     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
1357     break;
1358   case sizeof (struct IPv6UdpAddress):
1359     if (NULL == plugin->sockv6)
1360     {
1361       return NULL;
1362     }
1363     t6 = addr;
1364     s =
1365         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
1366     len = sizeof (struct sockaddr_in6);
1367     v6 = (struct sockaddr_in6 *) &s[1];
1368     v6->sin6_family = AF_INET6;
1369 #if HAVE_SOCKADDR_IN_SIN_LEN
1370     v6->sin6_len = sizeof (struct sockaddr_in6);
1371 #endif
1372     v6->sin6_port = t6->u6_port;
1373     v6->sin6_addr = t6->ipv6_addr;
1374     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
1375     break;
1376   default:
1377     /* Must have a valid address to send to */
1378     GNUNET_break_op (0);
1379     return NULL;
1380   }
1381   s->addrlen = len;
1382   s->target = *target;
1383   s->sock_addr = (const struct sockaddr *) &s[1];
1384   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
1385   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1386   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1387   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1388   s->inbound = GNUNET_NO;
1389   start_session_timeout (s);
1390   return s;
1391 }
1392
1393
1394 static int
1395 session_cmp_it (void *cls,
1396                 const struct GNUNET_HashCode * key,
1397                 void *value)
1398 {
1399   struct SessionCompareContext * cctx = cls;
1400   const struct GNUNET_HELLO_Address *address = cctx->addr;
1401   struct Session *s = value;
1402
1403   socklen_t s_addrlen = s->addrlen;
1404
1405   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
1406       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1407       GNUNET_a2s (s->sock_addr, s->addrlen));
1408   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
1409       (s_addrlen == sizeof (struct sockaddr_in)))
1410   {
1411     struct IPv4UdpAddress * u4 = NULL;
1412     u4 = (struct IPv4UdpAddress *) address->address;
1413     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
1414     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
1415         (u4->u4_port == s4->sin_port))
1416     {
1417       cctx->res = s;
1418       return GNUNET_NO;
1419     }
1420
1421   }
1422   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1423       (s_addrlen == sizeof (struct sockaddr_in6)))
1424   {
1425     struct IPv6UdpAddress * u6 = NULL;
1426     u6 = (struct IPv6UdpAddress *) address->address;
1427     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1428     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1429         (u6->u6_port == s6->sin6_port))
1430     {
1431       cctx->res = s;
1432       return GNUNET_NO;
1433     }
1434   }
1435   return GNUNET_YES;
1436 }
1437
1438 /**
1439  * Function obtain the network type for a session
1440  *
1441  * @param cls closure ('struct Plugin*')
1442  * @param session the session
1443  * @return the network type in HBO or GNUNET_SYSERR
1444  */
1445 static enum GNUNET_ATS_Network_Type
1446 udp_get_network (void *cls, void *session)
1447 {
1448         struct Session *s = (struct Session *) session;
1449
1450         return ntohl(s->ats.value);
1451 }
1452
1453 /**
1454  * Creates a new outbound session the transport service will use to send data to the
1455  * peer
1456  *
1457  * @param cls the plugin
1458  * @param address the address
1459  * @return the session or NULL of max connections exceeded
1460  */
1461 static struct Session *
1462 udp_plugin_lookup_session (void *cls,
1463                  const struct GNUNET_HELLO_Address *address)
1464 {
1465   struct Plugin * plugin = cls;
1466   struct IPv6UdpAddress * udp_a6;
1467   struct IPv4UdpAddress * udp_a4;
1468
1469   GNUNET_assert (plugin != NULL);
1470   GNUNET_assert (address != NULL);
1471
1472
1473   if ((address->address == NULL) ||
1474       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1475       (address->address_length != sizeof (struct IPv6UdpAddress))))
1476   {
1477     LOG (GNUNET_ERROR_TYPE_WARNING,
1478         _("Trying to create session for address of unexpected length %u (should be %u or %u)\n"),
1479         address->address_length,
1480         sizeof (struct IPv4UdpAddress),
1481         sizeof (struct IPv6UdpAddress));
1482     return NULL;
1483   }
1484
1485   if (address->address_length == sizeof (struct IPv4UdpAddress))
1486   {
1487     if (plugin->sockv4 == NULL)
1488       return NULL;
1489     udp_a4 = (struct IPv4UdpAddress *) address->address;
1490     if (udp_a4->u4_port == 0)
1491       return NULL;
1492   }
1493
1494   if (address->address_length == sizeof (struct IPv6UdpAddress))
1495   {
1496     if (plugin->sockv6 == NULL)
1497       return NULL;
1498     udp_a6 = (struct IPv6UdpAddress *) address->address;
1499     if (udp_a6->u6_port == 0)
1500       return NULL;
1501   }
1502
1503   /* check if session already exists */
1504   struct SessionCompareContext cctx;
1505   cctx.addr = address;
1506   cctx.res = NULL;
1507   LOG (GNUNET_ERROR_TYPE_DEBUG,
1508        "Looking for existing session for peer `%s' `%s' \n", 
1509        GNUNET_i2s (&address->peer), 
1510        udp_address_to_string(NULL, address->address, address->address_length));
1511   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1512   if (cctx.res != NULL)
1513   {
1514     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1515     return cctx.res;
1516   }
1517   return NULL;
1518 }
1519
1520 static struct Session *
1521 udp_plugin_create_session (void *cls,
1522                   const struct GNUNET_HELLO_Address *address)
1523 {
1524   struct Session * s = NULL;
1525
1526   /* otherwise create new */
1527   s = create_session (plugin,
1528       &address->peer,
1529       address->address,
1530       address->address_length,
1531       NULL, NULL);
1532   LOG (GNUNET_ERROR_TYPE_DEBUG,
1533        "Creating new session %p for peer `%s' address `%s'\n",
1534        s,
1535        GNUNET_i2s(&address->peer),
1536        udp_address_to_string(NULL,address->address,address->address_length));
1537   GNUNET_assert (GNUNET_OK ==
1538                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1539                                                     &s->target.hashPubKey,
1540                                                     s,
1541                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1542   GNUNET_STATISTICS_set(plugin->env->stats,
1543                         "# UDP, sessions active",
1544                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1545                         GNUNET_NO);
1546   return s;
1547 }
1548
1549
1550
1551 /**
1552  * Creates a new outbound session the transport service will use to send data to the
1553  * peer
1554  *
1555  * @param cls the plugin
1556  * @param address the address
1557  * @return the session or NULL of max connections exceeded
1558  */
1559 static struct Session *
1560 udp_plugin_get_session (void *cls,
1561                   const struct GNUNET_HELLO_Address *address)
1562 {
1563   struct Session * s = NULL;
1564
1565   if (NULL == address)
1566   {
1567         GNUNET_break (0);
1568         return NULL;
1569   }
1570   if ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1571                 (address->address_length != sizeof (struct IPv6UdpAddress)))
1572                 return NULL;
1573
1574   /* otherwise create new */
1575   if (NULL != (s = udp_plugin_lookup_session(cls, address)))
1576         return s;
1577   else
1578         return udp_plugin_create_session (cls, address);
1579 }
1580
1581
1582 static void 
1583 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1584 {
1585   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1586       GNUNET_break (0);
1587   else
1588   {
1589     GNUNET_STATISTICS_update (plugin->env->stats,
1590                               "# UDP, total, bytes in buffers",
1591                               udpw->msg_size, GNUNET_NO);
1592     plugin->bytes_in_buffer += udpw->msg_size;
1593   }
1594   GNUNET_STATISTICS_update (plugin->env->stats,
1595                             "# UDP, total, msgs in buffers",
1596                             1, GNUNET_NO);
1597   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1598     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1599                                  plugin->ipv4_queue_tail, udpw);
1600   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1601     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1602                                  plugin->ipv6_queue_tail, udpw);
1603 }
1604
1605
1606
1607 /**
1608  * Fragment message was transmitted via UDP, let fragmentation know
1609  * to send the next fragment now.
1610  *
1611  * @param cls the 'struct UDPMessageWrapper' of the fragment
1612  * @param target destination peer (ignored)
1613  * @param result GNUNET_OK on success (ignored)
1614  * @param payload bytes payload sent
1615  * @param physical bytes physical sent
1616  */
1617 static void
1618 send_next_fragment (void *cls,
1619                     const struct GNUNET_PeerIdentity *target,
1620                     int result, size_t payload, size_t physical)
1621 {
1622   struct UDP_MessageWrapper *udpw = cls;
1623   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1624 }
1625
1626
1627 /**
1628  * Function that is called with messages created by the fragmentation
1629  * module.  In the case of the 'proc' callback of the
1630  * GNUNET_FRAGMENT_context_create function, this function must
1631  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1632  *
1633  * @param cls closure, the 'struct FragmentationContext'
1634  * @param msg the message that was created
1635  */
1636 static void
1637 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1638 {
1639   struct UDP_FragmentationContext *frag_ctx = cls;
1640   struct Plugin *plugin = frag_ctx->plugin;
1641   struct UDP_MessageWrapper * udpw;
1642   size_t msg_len = ntohs (msg->size);
1643  
1644   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1645        "Enqueuing fragment with %u bytes\n", msg_len);
1646   frag_ctx->fragments_used ++;
1647   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1648   udpw->session = frag_ctx->session;
1649   udpw->msg_buf = (char *) &udpw[1];
1650   udpw->msg_size = msg_len;
1651   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1652   udpw->cont = &send_next_fragment;
1653   udpw->cont_cls = udpw;
1654   udpw->timeout = frag_ctx->timeout;
1655   udpw->frag_ctx = frag_ctx;
1656   udpw->msg_type = MSG_FRAGMENTED;
1657   memcpy (udpw->msg_buf, msg, msg_len);
1658   enqueue (plugin, udpw);
1659   schedule_select (plugin);
1660 }
1661
1662
1663 /**
1664  * Function that can be used by the transport service to transmit
1665  * a message using the plugin.   Note that in the case of a
1666  * peer disconnecting, the continuation MUST be called
1667  * prior to the disconnect notification itself.  This function
1668  * will be called with this peer's HELLO message to initiate
1669  * a fresh connection to another peer.
1670  *
1671  * @param cls closure
1672  * @param s which session must be used
1673  * @param msgbuf the message to transmit
1674  * @param msgbuf_size number of bytes in 'msgbuf'
1675  * @param priority how important is the message (most plugins will
1676  *                 ignore message priority and just FIFO)
1677  * @param to how long to wait at most for the transmission (does not
1678  *                require plugins to discard the message after the timeout,
1679  *                just advisory for the desired delay; most plugins will ignore
1680  *                this as well)
1681  * @param cont continuation to call once the message has
1682  *        been transmitted (or if the transport is ready
1683  *        for the next transmission call; or if the
1684  *        peer disconnected...); can be NULL
1685  * @param cont_cls closure for cont
1686  * @return number of bytes used (on the physical network, with overheads);
1687  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1688  *         and does NOT mean that the message was not transmitted (DV)
1689  */
1690 static ssize_t
1691 udp_plugin_send (void *cls,
1692                   struct Session *s,
1693                   const char *msgbuf, size_t msgbuf_size,
1694                   unsigned int priority,
1695                   struct GNUNET_TIME_Relative to,
1696                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1697 {
1698   struct Plugin *plugin = cls;
1699   size_t udpmlen = msgbuf_size + sizeof (struct UDPMessage);
1700   struct UDP_FragmentationContext * frag_ctx;
1701   struct UDP_MessageWrapper * udpw;
1702   struct UDPMessage *udp;
1703   char mbuf[udpmlen];
1704   GNUNET_assert (plugin != NULL);
1705   GNUNET_assert (s != NULL);
1706
1707   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1708     return GNUNET_SYSERR;
1709   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1710     return GNUNET_SYSERR;
1711   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1712   {
1713     GNUNET_break (0);
1714     return GNUNET_SYSERR;
1715   }
1716   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1717   {
1718     GNUNET_break (0);
1719     return GNUNET_SYSERR;
1720   }
1721   LOG (GNUNET_ERROR_TYPE_DEBUG,
1722        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1723        udpmlen,
1724        GNUNET_i2s (&s->target),
1725        GNUNET_a2s(s->sock_addr, s->addrlen));
1726
1727
1728   /* Message */
1729   udp = (struct UDPMessage *) mbuf;
1730   udp->header.size = htons (udpmlen);
1731   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1732   udp->reserved = htonl (0);
1733   udp->sender = *plugin->env->my_identity;
1734
1735   reschedule_session_timeout(s);
1736   if (udpmlen <= UDP_MTU)
1737   {
1738     /* unfragmented message */
1739     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1740     udpw->session = s;
1741     udpw->msg_buf = (char *) &udpw[1];
1742     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1743     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1744     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1745     udpw->cont = cont;
1746     udpw->cont_cls = cont_cls;
1747     udpw->frag_ctx = NULL;
1748     udpw->msg_type = MSG_UNFRAGMENTED;
1749     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
1750     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1751     enqueue (plugin, udpw);
1752
1753     GNUNET_STATISTICS_update (plugin->env->stats,
1754                               "# UDP, unfragmented msgs, messages, attempt",
1755                               1, GNUNET_NO);
1756     GNUNET_STATISTICS_update (plugin->env->stats,
1757                               "# UDP, unfragmented msgs, bytes payload, attempt",
1758                               udpw->payload_size, GNUNET_NO);
1759   }
1760   else
1761   {
1762     /* fragmented message */
1763     if  (s->frag_ctx != NULL)
1764       return GNUNET_SYSERR;
1765     memcpy (&udp[1], msgbuf, msgbuf_size);
1766     frag_ctx = GNUNET_malloc (sizeof (struct UDP_FragmentationContext));
1767     frag_ctx->plugin = plugin;
1768     frag_ctx->session = s;
1769     frag_ctx->cont = cont;
1770     frag_ctx->cont_cls = cont_cls;
1771     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1772     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1773     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1774     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1775                                                      UDP_MTU,
1776                                                      &plugin->tracker,
1777                                                      s->last_expected_msg_delay, 
1778                                                      s->last_expected_ack_delay, 
1779                                                      &udp->header,
1780                                                      &enqueue_fragment,
1781                                                      frag_ctx);    
1782     s->frag_ctx = frag_ctx;
1783     GNUNET_STATISTICS_update (plugin->env->stats,
1784                               "# UDP, fragmented msgs, messages, pending",
1785                               1, GNUNET_NO);
1786     GNUNET_STATISTICS_update (plugin->env->stats,
1787                               "# UDP, fragmented msgs, messages, attempt",
1788                               1, GNUNET_NO);
1789     GNUNET_STATISTICS_update (plugin->env->stats,
1790                               "# UDP, fragmented msgs, bytes payload, attempt",
1791                               frag_ctx->payload_size, GNUNET_NO);
1792   }
1793   schedule_select (plugin);
1794   return udpmlen;
1795 }
1796
1797
1798 /**
1799  * Our external IP address/port mapping has changed.
1800  *
1801  * @param cls closure, the 'struct LocalAddrList'
1802  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1803  *     the previous (now invalid) one
1804  * @param addr either the previous or the new public IP address
1805  * @param addrlen actual lenght of the address
1806  */
1807 static void
1808 udp_nat_port_map_callback (void *cls, int add_remove,
1809                            const struct sockaddr *addr, socklen_t addrlen)
1810 {
1811   struct Plugin *plugin = cls;
1812   struct IPv4UdpAddress u4;
1813   struct IPv6UdpAddress u6;
1814   void *arg;
1815   size_t args;
1816
1817   LOG (GNUNET_ERROR_TYPE_INFO,
1818        "NAT notification to %s address `%s'\n",
1819        (GNUNET_YES == add_remove) ? "add" : "remove",
1820        GNUNET_a2s (addr, addrlen));
1821
1822   /* convert 'addr' to our internal format */
1823   switch (addr->sa_family)
1824   {
1825   case AF_INET:
1826     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1827     memset (&u4, 0, sizeof (u4));
1828     u4.options = htonl(myoptions);
1829     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1830     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1831     arg = &u4;
1832     args = sizeof (struct IPv4UdpAddress);
1833     break;
1834   case AF_INET6:
1835     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1836     memset (&u4, 0, sizeof (u4));
1837     u6.options = htonl(myoptions);
1838     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1839             sizeof (struct in6_addr));
1840     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1841     arg = &u6;
1842     args = sizeof (struct IPv6UdpAddress);
1843     break;
1844   default:
1845     GNUNET_break (0);
1846     return;
1847   }
1848   /* modify our published address list */
1849   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1850 }
1851
1852
1853
1854 /**
1855  * Message tokenizer has broken up an incomming message. Pass it on
1856  * to the service.
1857  *
1858  * @param cls the 'struct Plugin'
1859  * @param client the 'struct SourceInformation'
1860  * @param hdr the actual message
1861  */
1862 static int
1863 process_inbound_tokenized_messages (void *cls, void *client,
1864                                     const struct GNUNET_MessageHeader *hdr)
1865 {
1866   struct Plugin *plugin = cls;
1867   struct SourceInformation *si = client;
1868   struct GNUNET_TIME_Relative delay;
1869
1870   GNUNET_assert (si->session != NULL);
1871   if (GNUNET_YES == si->session->in_destroy)
1872     return GNUNET_OK;
1873   /* setup ATS */
1874   GNUNET_break (ntohl(si->session->ats.value) != GNUNET_ATS_NET_UNSPECIFIED);
1875   delay = plugin->env->receive (plugin->env->cls,
1876                                 &si->sender,
1877                                 hdr,
1878                                 si->session,
1879                  (GNUNET_YES == si->session->inbound) ? NULL : si->arg,
1880                  (GNUNET_YES == si->session->inbound) ? 0 : si->args);
1881
1882   plugin->env->update_address_metrics (plugin->env->cls,
1883                                        &si->sender,
1884                                          (GNUNET_YES == si->session->inbound) ? NULL : si->arg,
1885                                          (GNUNET_YES == si->session->inbound) ? 0 : si->args,
1886                                        si->session,
1887                                        &si->session->ats, 1);
1888
1889   si->session->flow_delay_for_other_peer = delay;
1890   reschedule_session_timeout(si->session);
1891   return GNUNET_OK;
1892 }
1893
1894
1895 /**
1896  * We've received a UDP Message.  Process it (pass contents to main service).
1897  *
1898  * @param plugin plugin context
1899  * @param msg the message
1900  * @param sender_addr sender address
1901  * @param sender_addr_len number of bytes in sender_addr
1902  */
1903 static void
1904 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1905                      const struct sockaddr *sender_addr,
1906                      socklen_t sender_addr_len)
1907 {
1908   struct SourceInformation si;
1909   struct Session * s;
1910   struct IPv4UdpAddress u4;
1911   struct IPv6UdpAddress u6;
1912   const void *arg;
1913   size_t args;
1914
1915   if (0 != ntohl (msg->reserved))
1916   {
1917     GNUNET_break_op (0);
1918     return;
1919   }
1920   if (ntohs (msg->header.size) <
1921       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1922   {
1923     GNUNET_break_op (0);
1924     return;
1925   }
1926
1927   /* convert address */
1928   switch (sender_addr->sa_family)
1929   {
1930   case AF_INET:
1931     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1932     memset (&u4, 0, sizeof (u4));
1933     u6.options = htonl (0);
1934     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1935     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1936     arg = &u4;
1937     args = sizeof (u4);
1938     break;
1939   case AF_INET6:
1940     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1941     memset (&u6, 0, sizeof (u6));
1942     u6.options = htonl (0);
1943     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1944     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1945     arg = &u6;
1946     args = sizeof (u6);
1947     break;
1948   default:
1949     GNUNET_break (0);
1950     return;
1951   }
1952   LOG (GNUNET_ERROR_TYPE_DEBUG,
1953        "Received message with %u bytes from peer `%s' at `%s'\n",
1954        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1955        GNUNET_a2s (sender_addr, sender_addr_len));
1956
1957   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1958   if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
1959   {
1960                 s = udp_plugin_create_session(plugin, address);
1961                 s->inbound = GNUNET_YES;
1962           plugin->env->session_start (NULL, &address->peer, PLUGIN_NAME,
1963                         (GNUNET_YES == s->inbound) ? NULL : address->address,
1964                         (GNUNET_YES == s->inbound) ? 0 : address->address_length,
1965                   s, NULL, 0);
1966   }
1967   GNUNET_free (address);
1968
1969   /* iterate over all embedded messages */
1970   si.session = s;
1971   si.sender = msg->sender;
1972   si.arg = arg;
1973   si.args = args;
1974   s->rc++;
1975   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1976                              ntohs (msg->header.size) -
1977                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1978   s->rc--;
1979   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1980     free_session (s);
1981 }
1982
1983
1984 /**
1985  * Scan the heap for a receive context with the given address.
1986  *
1987  * @param cls the 'struct FindReceiveContext'
1988  * @param node internal node of the heap
1989  * @param element value stored at the node (a 'struct ReceiveContext')
1990  * @param cost cost associated with the node
1991  * @return GNUNET_YES if we should continue to iterate,
1992  *         GNUNET_NO if not.
1993  */
1994 static int
1995 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1996                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1997 {
1998   struct FindReceiveContext *frc = cls;
1999   struct DefragContext *e = element;
2000
2001   if ((frc->addr_len == e->addr_len) &&
2002       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
2003   {
2004     frc->rc = e;
2005     return GNUNET_NO;
2006   }
2007   return GNUNET_YES;
2008 }
2009
2010
2011 /**
2012  * Process a defragmented message.
2013  *
2014  * @param cls the 'struct ReceiveContext'
2015  * @param msg the message
2016  */
2017 static void
2018 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
2019 {
2020   struct DefragContext *rc = cls;
2021
2022   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2023   {
2024     GNUNET_break (0);
2025     return;
2026   }
2027   if (ntohs (msg->size) < sizeof (struct UDPMessage))
2028   {
2029     GNUNET_break (0);
2030     return;
2031   }
2032   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
2033                        rc->src_addr, rc->addr_len);
2034 }
2035
2036
2037 struct LookupContext
2038 {
2039   const struct sockaddr * addr;
2040
2041   struct Session *res;
2042
2043   size_t addrlen;
2044 };
2045
2046
2047 static int
2048 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
2049 {
2050   struct LookupContext *l_ctx = cls;
2051   struct Session * s = value;
2052
2053   if ((s->addrlen == l_ctx->addrlen) &&
2054       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
2055   {
2056     l_ctx->res = s;
2057     return GNUNET_NO;
2058   }
2059   return GNUNET_YES;
2060 }
2061
2062
2063 /**
2064  * Transmit an acknowledgement.
2065  *
2066  * @param cls the 'struct ReceiveContext'
2067  * @param id message ID (unused)
2068  * @param msg ack to transmit
2069  */
2070 static void
2071 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
2072 {
2073   struct DefragContext *rc = cls;
2074   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
2075   struct UDP_ACK_Message *udp_ack;
2076   uint32_t delay = 0;
2077   struct UDP_MessageWrapper *udpw;
2078   struct Session *s;
2079   struct LookupContext l_ctx;
2080
2081   l_ctx.addr = rc->src_addr;
2082   l_ctx.addrlen = rc->addr_len;
2083   l_ctx.res = NULL;
2084   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
2085       &lookup_session_by_addr_it,
2086       &l_ctx);
2087   s = l_ctx.res;
2088
2089   if (NULL == s)
2090     return;
2091
2092   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
2093     delay = s->flow_delay_for_other_peer.rel_value;
2094
2095   LOG (GNUNET_ERROR_TYPE_DEBUG,
2096        "Sending ACK to `%s' including delay of %u ms\n",
2097        GNUNET_a2s (rc->src_addr,
2098                    (rc->src_addr->sa_family ==
2099                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
2100                                                                      sockaddr_in6)),
2101        delay);
2102   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2103   udpw->msg_size = msize;
2104   udpw->payload_size = 0;
2105   udpw->session = s;
2106   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2107   udpw->msg_buf = (char *)&udpw[1];
2108   udpw->msg_type = MSG_ACK;
2109   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2110   udp_ack->header.size = htons ((uint16_t) msize);
2111   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2112   udp_ack->delay = htonl (delay);
2113   udp_ack->sender = *rc->plugin->env->my_identity;
2114   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2115   enqueue (rc->plugin, udpw);
2116 }
2117
2118
2119 static void 
2120 read_process_msg (struct Plugin *plugin,
2121                   const struct GNUNET_MessageHeader *msg,
2122                   const char *addr,
2123                   socklen_t fromlen)
2124 {
2125   if (ntohs (msg->size) < sizeof (struct UDPMessage))
2126   {
2127     GNUNET_break_op (0);
2128     return;
2129   }
2130   process_udp_message (plugin, (const struct UDPMessage *) msg,
2131                        (const struct sockaddr *) addr, fromlen);
2132 }
2133
2134
2135 static void 
2136 read_process_ack (struct Plugin *plugin,
2137                   const struct GNUNET_MessageHeader *msg,
2138                   char *addr,
2139                   socklen_t fromlen)
2140 {
2141   const struct GNUNET_MessageHeader *ack;
2142   const struct UDP_ACK_Message *udp_ack;
2143   struct LookupContext l_ctx;
2144   struct Session *s;
2145   struct GNUNET_TIME_Relative flow_delay;
2146
2147   if (ntohs (msg->size) <
2148       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
2149   {
2150     GNUNET_break_op (0);
2151     return;
2152   }
2153   udp_ack = (const struct UDP_ACK_Message *) msg;
2154   l_ctx.addr = (const struct sockaddr *) addr;
2155   l_ctx.addrlen = fromlen;
2156   l_ctx.res = NULL;
2157   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
2158                                          &lookup_session_by_addr_it,
2159                                          &l_ctx);
2160   s = l_ctx.res;
2161
2162   if ((NULL == s) || (NULL == s->frag_ctx))
2163   {
2164     return;
2165   }
2166
2167   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
2168   LOG (GNUNET_ERROR_TYPE_DEBUG, 
2169        "We received a sending delay of %llu\n",
2170        flow_delay.rel_value);
2171   s->flow_delay_from_other_peer =
2172       GNUNET_TIME_relative_to_absolute (flow_delay);
2173
2174   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2175   if (ntohs (ack->size) !=
2176       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
2177   {
2178     GNUNET_break_op (0);
2179     return;
2180   }
2181
2182   if (0 != memcmp (&l_ctx.res->target, &udp_ack->sender, sizeof (struct GNUNET_PeerIdentity)))
2183     GNUNET_break (0);
2184   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2185   {
2186     LOG (GNUNET_ERROR_TYPE_DEBUG,
2187          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2188          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2189          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2190     /* Expect more ACKs to arrive */
2191     return;
2192   }
2193
2194   LOG (GNUNET_ERROR_TYPE_DEBUG,
2195        "Message full ACK'ed\n",
2196        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2197        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2198
2199   /* Remove fragmented message after successful sending */
2200   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2201 }
2202
2203
2204 static void 
2205 read_process_fragment (struct Plugin *plugin,
2206                        const struct GNUNET_MessageHeader *msg,
2207                        char *addr,
2208                        socklen_t fromlen)
2209 {
2210   struct DefragContext *d_ctx;
2211   struct GNUNET_TIME_Absolute now;
2212   struct FindReceiveContext frc;
2213
2214   frc.rc = NULL;
2215   frc.addr = (const struct sockaddr *) addr;
2216   frc.addr_len = fromlen;
2217
2218   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2219        (unsigned int) ntohs (msg->size),
2220        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2221   /* Lookup existing receive context for this address */
2222   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2223                                  &find_receive_context,
2224                                  &frc);
2225   now = GNUNET_TIME_absolute_get ();
2226   d_ctx = frc.rc;
2227
2228   if (d_ctx == NULL)
2229   {
2230     /* Create a new defragmentation context */
2231     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2232     memcpy (&d_ctx[1], addr, fromlen);
2233     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2234     d_ctx->addr_len = fromlen;
2235     d_ctx->plugin = plugin;
2236     d_ctx->defrag =
2237         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
2238                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
2239                                           &fragment_msg_proc, &ack_proc);
2240     d_ctx->hnode =
2241         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2242                                       (GNUNET_CONTAINER_HeapCostType)
2243                                       now.abs_value);
2244     LOG (GNUNET_ERROR_TYPE_DEBUG, 
2245          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2246          (unsigned int) ntohs (msg->size),
2247          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2248   }
2249   else
2250   {
2251     LOG (GNUNET_ERROR_TYPE_DEBUG,
2252          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2253          (unsigned int) ntohs (msg->size),
2254          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2255   }
2256
2257   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2258   {
2259     /* keep this 'rc' from expiring */
2260     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2261                                        (GNUNET_CONTAINER_HeapCostType)
2262                                        now.abs_value);
2263   }
2264   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2265       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2266   {
2267     /* remove 'rc' that was inactive the longest */
2268     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2269     GNUNET_assert (NULL != d_ctx);
2270     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2271     GNUNET_free (d_ctx);
2272   }
2273 }
2274
2275
2276 /**
2277  * Read and process a message from the given socket.
2278  *
2279  * @param plugin the overall plugin
2280  * @param rsock socket to read from
2281  */
2282 static void
2283 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2284 {
2285   socklen_t fromlen;
2286   char addr[32];
2287   char buf[65536] GNUNET_ALIGN;
2288   ssize_t size;
2289   const struct GNUNET_MessageHeader *msg;
2290
2291   fromlen = sizeof (addr);
2292   memset (&addr, 0, sizeof (addr));
2293   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
2294                                       (struct sockaddr *) &addr, &fromlen);
2295 #if MINGW
2296   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2297    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2298    * on this socket has failed.
2299    * Quote from MSDN:
2300    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2301    *   executing a hard or abortive close. The application should close
2302    *   the socket; it is no longer usable. On a UDP-datagram socket this
2303    *   error indicates a previous send operation resulted in an ICMP Port
2304    *   Unreachable message.
2305    */
2306   if ( (-1 == size) && (ECONNRESET == errno) )
2307     return;
2308 #endif
2309   if (-1 == size)
2310   {
2311     LOG (GNUNET_ERROR_TYPE_DEBUG,
2312         "UDP failed to receive data: %s\n", STRERROR (errno));
2313     /* Connection failure or something. Not a protocol violation. */
2314     return;
2315   }
2316   if (size < sizeof (struct GNUNET_MessageHeader))
2317   {
2318     LOG (GNUNET_ERROR_TYPE_WARNING,
2319         "UDP got %u bytes, which is not enough for a GNUnet message header\n",
2320         (unsigned int) size);
2321     /* _MAY_ be a connection failure (got partial message) */
2322     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2323     GNUNET_break_op (0);
2324     return;
2325   }
2326   msg = (const struct GNUNET_MessageHeader *) buf;
2327
2328   LOG (GNUNET_ERROR_TYPE_DEBUG,
2329        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
2330        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
2331
2332   if (size != ntohs (msg->size))
2333   {
2334     GNUNET_break_op (0);
2335     return;
2336   }
2337
2338   GNUNET_STATISTICS_update (plugin->env->stats,
2339                             "# UDP, total, bytes, received",
2340                             size, GNUNET_NO);
2341
2342   switch (ntohs (msg->type))
2343   {
2344   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2345     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
2346     return;
2347
2348   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2349     read_process_msg (plugin, msg, addr, fromlen);
2350     return;
2351
2352   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2353     read_process_ack (plugin, msg, addr, fromlen);
2354     return;
2355
2356   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2357     read_process_fragment (plugin, msg, addr, fromlen);
2358     return;
2359
2360   default:
2361     GNUNET_break_op (0);
2362     return;
2363   }
2364 }
2365
2366 static struct UDP_MessageWrapper *
2367 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2368                                     struct GNUNET_NETWORK_Handle *sock)
2369 {
2370   struct UDP_MessageWrapper *udpw = NULL;
2371   struct GNUNET_TIME_Relative remaining;
2372
2373   udpw = head;
2374   while (udpw != NULL)
2375   {
2376     /* Find messages with timeout */
2377     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2378     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2379     {
2380       /* Message timed out */
2381       switch (udpw->msg_type) {
2382         case MSG_UNFRAGMENTED:
2383           GNUNET_STATISTICS_update (plugin->env->stats,
2384                                     "# UDP, total, bytes, sent, timeout",
2385                                     udpw->msg_size, GNUNET_NO);
2386           GNUNET_STATISTICS_update (plugin->env->stats,
2387                                     "# UDP, total, messages, sent, timeout",
2388                                     1, GNUNET_NO);
2389           GNUNET_STATISTICS_update (plugin->env->stats,
2390                                     "# UDP, unfragmented msgs, messages, sent, timeout",
2391                                     1, GNUNET_NO);
2392           GNUNET_STATISTICS_update (plugin->env->stats,
2393                                     "# UDP, unfragmented msgs, bytes, sent, timeout",
2394                                     udpw->payload_size, GNUNET_NO);
2395           /* Not fragmented message */
2396           LOG (GNUNET_ERROR_TYPE_DEBUG,
2397                "Message for peer `%s' with size %u timed out\n",
2398                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2399           call_continuation (udpw, GNUNET_SYSERR);
2400           /* Remove message */
2401           dequeue (plugin, udpw);
2402           GNUNET_free (udpw);
2403           break;
2404         case MSG_FRAGMENTED:
2405           /* Fragmented message */
2406           GNUNET_STATISTICS_update (plugin->env->stats,
2407                                     "# UDP, total, bytes, sent, timeout",
2408                                     udpw->frag_ctx->on_wire_size, GNUNET_NO);
2409           GNUNET_STATISTICS_update (plugin->env->stats,
2410                                     "# UDP, total, messages, sent, timeout",
2411                                     1, GNUNET_NO);
2412           call_continuation (udpw, GNUNET_SYSERR);
2413           LOG (GNUNET_ERROR_TYPE_DEBUG,
2414                "Fragment for message for peer `%s' with size %u timed out\n",
2415                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
2416
2417
2418           GNUNET_STATISTICS_update (plugin->env->stats,
2419                                     "# UDP, fragmented msgs, messages, sent, timeout",
2420                                     1, GNUNET_NO);
2421           GNUNET_STATISTICS_update (plugin->env->stats,
2422                                     "# UDP, fragmented msgs, bytes, sent, timeout",
2423                                     udpw->frag_ctx->payload_size, GNUNET_NO);
2424           /* Remove fragmented message due to timeout */
2425           fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2426           break;
2427         case MSG_ACK:
2428           GNUNET_STATISTICS_update (plugin->env->stats,
2429                                     "# UDP, total, bytes, sent, timeout",
2430                                     udpw->msg_size, GNUNET_NO);
2431           GNUNET_STATISTICS_update (plugin->env->stats,
2432                                     "# UDP, total, messages, sent, timeout",
2433                                     1, GNUNET_NO);
2434           LOG (GNUNET_ERROR_TYPE_DEBUG,
2435                "ACK Message for peer `%s' with size %u timed out\n",
2436                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2437           call_continuation (udpw, GNUNET_SYSERR);
2438           dequeue (plugin, udpw);
2439           GNUNET_free (udpw);
2440           break;
2441         default:
2442           break;
2443       }
2444       if (sock == plugin->sockv4)
2445         udpw = plugin->ipv4_queue_head;
2446       else if (sock == plugin->sockv6)
2447         udpw = plugin->ipv6_queue_head;
2448       else
2449       {
2450         GNUNET_break (0); /* should never happen */
2451         udpw = NULL;
2452       }
2453       GNUNET_STATISTICS_update (plugin->env->stats,
2454                                 "# messages dismissed due to timeout",
2455                                 1, GNUNET_NO);
2456     }
2457     else
2458     {
2459       /* Message did not time out, check flow delay */
2460       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2461       if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2462       {
2463         /* this message is not delayed */
2464         LOG (GNUNET_ERROR_TYPE_DEBUG, 
2465              "Message for peer `%s' (%u bytes) is not delayed \n",
2466              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2467         break; /* Found message to send, break */
2468       }
2469       else
2470       {
2471         /* Message is delayed, try next */
2472         LOG (GNUNET_ERROR_TYPE_DEBUG,
2473              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
2474              GNUNET_i2s(&udpw->session->target), udpw->payload_size, remaining.rel_value);
2475         udpw = udpw->next;
2476       }
2477     }
2478   }
2479   return udpw;
2480 }
2481
2482
2483 static void
2484 analyze_send_error (struct Plugin *plugin,
2485                     const struct sockaddr * sa,
2486                     socklen_t slen,
2487                     int error)
2488 {
2489   static int network_down_error;
2490   struct GNUNET_ATS_Information type;
2491
2492  type = plugin->env->get_address_type (plugin->env->cls,sa, slen);
2493  if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
2494      ((ENETUNREACH == errno) || (ENETDOWN == errno)))
2495  {
2496    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2497    {
2498      /* IPv4: "Network unreachable" or "Network down"
2499       *
2500       * This indicates we do not have connectivity
2501       */
2502      LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2503          _("UDP could not transmit message to `%s': "
2504            "Network seems down, please check your network configuration\n"),
2505          GNUNET_a2s (sa, slen));
2506    }
2507    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2508    {
2509      /* IPv6: "Network unreachable" or "Network down"
2510       *
2511       * This indicates that this system is IPv6 enabled, but does not
2512       * have a valid global IPv6 address assigned or we do not have
2513       * connectivity
2514       */
2515
2516     LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2517         _("UDP could not transmit message to `%s': "
2518           "Please check your network configuration and disable IPv6 if your "
2519           "connection does not have a global IPv6 address\n"),
2520         GNUNET_a2s (sa, slen));
2521    }
2522  }
2523  else
2524  {
2525    LOG (GNUNET_ERROR_TYPE_WARNING,
2526       "UDP could not transmit message to `%s': `%s'\n",
2527       GNUNET_a2s (sa, slen), STRERROR (error));
2528  }
2529 }
2530
2531 static size_t
2532 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2533 {
2534   const struct sockaddr * sa;
2535   ssize_t sent;
2536   socklen_t slen;
2537
2538   struct UDP_MessageWrapper *udpw = NULL;
2539
2540   /* Find message to send */
2541   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) ? plugin->ipv4_queue_head : plugin->ipv6_queue_head,
2542                                              sock);
2543   if (NULL == udpw)
2544     return 0; /* No message to send */
2545
2546   sa = udpw->session->sock_addr;
2547   slen = udpw->session->addrlen;
2548
2549   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, sa, slen);
2550
2551   if (GNUNET_SYSERR == sent)
2552   {
2553     /* Failure */
2554     analyze_send_error (plugin, sa, slen, errno);
2555     call_continuation(udpw, GNUNET_SYSERR);
2556     GNUNET_STATISTICS_update (plugin->env->stats,
2557                             "# UDP, total, bytes, sent, failure",
2558                             sent, GNUNET_NO);
2559     GNUNET_STATISTICS_update (plugin->env->stats,
2560                               "# UDP, total, messages, sent, failure",
2561                               1, GNUNET_NO);
2562   }
2563   else
2564   {
2565     /* Success */
2566     LOG (GNUNET_ERROR_TYPE_DEBUG,
2567          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2568          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
2569          (sent < 0) ? STRERROR (errno) : "ok");
2570     GNUNET_STATISTICS_update (plugin->env->stats,
2571                               "# UDP, total, bytes, sent, success",
2572                               sent, GNUNET_NO);
2573     GNUNET_STATISTICS_update (plugin->env->stats,
2574                               "# UDP, total, messages, sent, success",
2575                               1, GNUNET_NO);
2576     if (NULL != udpw->frag_ctx)
2577         udpw->frag_ctx->on_wire_size += udpw->msg_size;
2578     call_continuation (udpw, GNUNET_OK);
2579   }
2580   dequeue (plugin, udpw);
2581   GNUNET_free (udpw);
2582   udpw = NULL;
2583
2584   return sent;
2585 }
2586
2587
2588 /**
2589  * We have been notified that our readset has something to read.  We don't
2590  * know which socket needs to be read, so we have to check each one
2591  * Then reschedule this function to be called again once more is available.
2592  *
2593  * @param cls the plugin handle
2594  * @param tc the scheduling context (for rescheduling this function again)
2595  */
2596 static void
2597 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2598 {
2599   struct Plugin *plugin = cls;
2600
2601   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2602   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2603     return;
2604   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2605        (NULL != plugin->sockv4) &&
2606        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2607     udp_select_read (plugin, plugin->sockv4);
2608   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2609        (NULL != plugin->sockv4) && 
2610        (NULL != plugin->ipv4_queue_head) &&
2611        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2612     udp_select_send (plugin, plugin->sockv4);   
2613   schedule_select (plugin);
2614 }
2615
2616
2617 /**
2618  * We have been notified that our readset has something to read.  We don't
2619  * know which socket needs to be read, so we have to check each one
2620  * Then reschedule this function to be called again once more is available.
2621  *
2622  * @param cls the plugin handle
2623  * @param tc the scheduling context (for rescheduling this function again)
2624  */
2625 static void
2626 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2627 {
2628   struct Plugin *plugin = cls;
2629
2630   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2631   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2632     return;
2633   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2634        (NULL != plugin->sockv6) &&
2635        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2636     udp_select_read (plugin, plugin->sockv6);
2637   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2638        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2639        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2640     udp_select_send (plugin, plugin->sockv6);
2641   schedule_select (plugin);
2642 }
2643
2644
2645 /**
2646  *
2647  * @return number of sockets that were successfully bound
2648  */
2649 static int
2650 setup_sockets (struct Plugin *plugin, 
2651                const struct sockaddr_in6 *bind_v6,
2652                const struct sockaddr_in *bind_v4)
2653 {
2654   int tries;
2655   int sockets_created = 0;
2656   struct sockaddr_in6 serverAddrv6;
2657   struct sockaddr_in serverAddrv4;
2658   struct sockaddr *serverAddr;
2659   struct sockaddr *addrs[2];
2660   socklen_t addrlens[2];
2661   socklen_t addrlen;
2662   int eno;
2663
2664   /* Create IPv6 socket */
2665   eno = EINVAL;
2666   if (plugin->enable_ipv6 == GNUNET_YES)
2667   {
2668     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2669     if (NULL == plugin->sockv6)
2670     {
2671         GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2672       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2673       plugin->enable_ipv6 = GNUNET_NO;
2674     }
2675     else
2676     {
2677         memset (&serverAddrv6, '\0', sizeof (struct sockaddr_in6));
2678 #if HAVE_SOCKADDR_IN_SIN_LEN
2679       serverAddrv6.sin6_len = sizeof (struct sockaddr_in6);
2680 #endif
2681       serverAddrv6.sin6_family = AF_INET6;
2682       if (NULL != bind_v6)
2683         serverAddrv6.sin6_addr = bind_v6->sin6_addr;
2684       else
2685         serverAddrv6.sin6_addr = in6addr_any;
2686
2687       if (0 == plugin->port) /* autodetect */
2688                 serverAddrv6.sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2689       else
2690                 serverAddrv6.sin6_port = htons (plugin->port);
2691       addrlen = sizeof (struct sockaddr_in6);
2692       serverAddr = (struct sockaddr *) &serverAddrv6;
2693
2694       tries = 0;
2695       while (tries < 10)
2696       {
2697                 LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 `%s'\n",
2698                                  GNUNET_a2s (serverAddr, addrlen));
2699                 /* binding */
2700                 if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv6,
2701                                                              serverAddr, addrlen, 0))
2702                         break;
2703                 eno = errno;
2704                 if (0 != plugin->port)
2705                 {
2706                                 tries = 10; /* fail */
2707                                 break; /* bind failed on specific port */
2708                 }
2709                 /* autodetect */
2710                 serverAddrv6.sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2711                 tries ++;
2712       }
2713       if (tries >= 10)
2714       {
2715         GNUNET_NETWORK_socket_close (plugin->sockv6);
2716         plugin->enable_ipv6 = GNUNET_NO;
2717         plugin->sockv6 = NULL;
2718       }
2719
2720       if (plugin->sockv6 != NULL)
2721       {
2722         LOG (GNUNET_ERROR_TYPE_DEBUG,
2723              "IPv6 socket created on port %s\n",
2724              GNUNET_a2s (serverAddr, addrlen));
2725         addrs[sockets_created] = (struct sockaddr *) &serverAddrv6;
2726         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2727         sockets_created++;
2728       }
2729       else
2730       {
2731           LOG (GNUNET_ERROR_TYPE_ERROR,
2732                "Failed to bind UDP socket to %s: %s\n",
2733                GNUNET_a2s (serverAddr, addrlen),
2734                STRERROR (eno));
2735       }
2736     }
2737   }
2738
2739   /* Create IPv4 socket */
2740   eno = EINVAL;
2741   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2742   if (NULL == plugin->sockv4)
2743   {
2744     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2745     LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv4 since it is not supported on this system!\n");
2746     plugin->enable_ipv4 = GNUNET_NO;
2747   }
2748   else
2749   {
2750     memset (&serverAddrv4, '\0', sizeof (struct sockaddr_in));
2751 #if HAVE_SOCKADDR_IN_SIN_LEN
2752     serverAddrv4.sin_len = sizeof (struct sockaddr_in);
2753 #endif
2754     serverAddrv4.sin_family = AF_INET;
2755     if (NULL != bind_v4)
2756       serverAddrv4.sin_addr = bind_v4->sin_addr;
2757     else
2758       serverAddrv4.sin_addr.s_addr = INADDR_ANY;
2759     
2760     if (0 == plugin->port)
2761       /* autodetect */
2762       serverAddrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2763     else
2764       serverAddrv4.sin_port = htons (plugin->port);
2765     
2766     
2767     addrlen = sizeof (struct sockaddr_in);
2768     serverAddr = (struct sockaddr *) &serverAddrv4;
2769     
2770     tries = 0;
2771     while (tries < 10)
2772     {
2773       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 `%s'\n",
2774                         GNUNET_a2s (serverAddr, addrlen));
2775       
2776       /* binding */
2777       if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv4,
2778                                                    serverAddr, addrlen, 0))
2779                 break;
2780       eno = errno;
2781       if (0 != plugin->port)
2782       {
2783                 tries = 10; /* fail */
2784                 break; /* bind failed on specific port */
2785       }
2786       
2787       /* autodetect */
2788       serverAddrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2789       tries ++;
2790     }
2791     
2792     if (tries >= 10)
2793     {
2794       GNUNET_NETWORK_socket_close (plugin->sockv4);
2795       plugin->enable_ipv4 = GNUNET_NO;
2796       plugin->sockv4 = NULL;
2797     }
2798     
2799     if (plugin->sockv4 != NULL)
2800     {
2801       LOG (GNUNET_ERROR_TYPE_DEBUG,
2802                 "IPv4 socket created on port %s\n", GNUNET_a2s (serverAddr, addrlen));
2803       addrs[sockets_created] = (struct sockaddr *) &serverAddrv4;
2804       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2805       sockets_created++;
2806     }
2807     else
2808     {             
2809       LOG (GNUNET_ERROR_TYPE_ERROR,
2810                 "Failed to bind UDP socket to %s: %s\n",
2811                 GNUNET_a2s (serverAddr, addrlen), STRERROR (eno));
2812     }
2813   }
2814   
2815   if (0 == sockets_created)
2816   {
2817                 LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2818                 return 0; /* No sockets created, return */
2819   }
2820
2821   /* Create file descriptors */
2822   if (plugin->enable_ipv4 == GNUNET_YES)
2823   {
2824                         plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2825                         plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2826                         GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2827                         GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2828                         if (NULL != plugin->sockv4)
2829                         {
2830                                 GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2831                                 GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2832                         }
2833   }
2834
2835   if (plugin->enable_ipv6 == GNUNET_YES)
2836   {
2837     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2838     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2839     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2840     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2841     if (NULL != plugin->sockv6)
2842     {
2843       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2844       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2845     }
2846   }
2847
2848   schedule_select (plugin);
2849   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2850                            GNUNET_NO, plugin->port,
2851                            sockets_created,
2852                            (const struct sockaddr **) addrs, addrlens,
2853                            &udp_nat_port_map_callback, NULL, plugin);
2854
2855   return sockets_created;
2856 }
2857
2858
2859 /**
2860  * The exported method. Makes the core api available via a global and
2861  * returns the udp transport API.
2862  *
2863  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2864  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2865  */
2866 void *
2867 libgnunet_plugin_transport_udp_init (void *cls)
2868 {
2869   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2870   struct GNUNET_TRANSPORT_PluginFunctions *api;
2871   struct Plugin *p;
2872   unsigned long long port;
2873   unsigned long long aport;
2874   unsigned long long broadcast;
2875   unsigned long long udp_max_bps;
2876   unsigned long long enable_v6;
2877   char * bind4_address;
2878   char * bind6_address;
2879   char * fancy_interval;
2880   struct GNUNET_TIME_Relative interval;
2881   struct sockaddr_in serverAddrv4;
2882   struct sockaddr_in6 serverAddrv6;
2883   int res;
2884   int have_bind4;
2885   int have_bind6;
2886
2887   if (NULL == env->receive)
2888   {
2889     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2890        initialze the plugin or the API */
2891     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2892     api->cls = NULL;
2893     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2894     api->address_to_string = &udp_address_to_string;
2895     api->string_to_address = &udp_string_to_address;
2896     return api;
2897   }
2898
2899   GNUNET_assert (NULL != env->stats);
2900
2901   /* Get port number: port == 0 : autodetect a port,
2902    *                                                                                            > 0 : use this port,
2903    *                                                              not given : 2086 default */
2904   if (GNUNET_OK !=
2905       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2906                                              &port))
2907     port = 2086;
2908   if (GNUNET_OK !=
2909       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2910                                              "ADVERTISED_PORT", &aport))
2911     aport = port;
2912   if (port > 65535)
2913   {
2914     LOG (GNUNET_ERROR_TYPE_WARNING,
2915          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2916          65535);
2917     return NULL;
2918   }
2919
2920   /* Protocols */
2921   if ((GNUNET_YES ==
2922        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2923                                              "DISABLEV6")))
2924     enable_v6 = GNUNET_NO;
2925   else
2926     enable_v6 = GNUNET_YES;
2927
2928   /* Addresses */
2929   have_bind4 = GNUNET_NO;
2930   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2931   if (GNUNET_YES ==
2932       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2933                                              "BINDTO", &bind4_address))
2934   {
2935     LOG (GNUNET_ERROR_TYPE_DEBUG,
2936          "Binding udp plugin to specific address: `%s'\n",
2937          bind4_address);
2938     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2939     {
2940       GNUNET_free (bind4_address);
2941       return NULL;
2942     }
2943     have_bind4 = GNUNET_YES;
2944   }
2945   GNUNET_free_non_null (bind4_address);
2946   have_bind6 = GNUNET_NO;
2947   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2948   if (GNUNET_YES ==
2949       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2950                                              "BINDTO6", &bind6_address))
2951   {
2952     LOG (GNUNET_ERROR_TYPE_DEBUG,
2953          "Binding udp plugin to specific address: `%s'\n",
2954          bind6_address);
2955     if (1 !=
2956         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2957     {
2958       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2959            bind6_address);
2960       GNUNET_free (bind6_address);
2961       return NULL;
2962     }
2963     have_bind6 = GNUNET_YES;
2964   }
2965   GNUNET_free_non_null (bind6_address);
2966
2967   /* Initialize my flags */
2968   myoptions = 0;
2969
2970   /* Enable neighbour discovery */
2971   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2972                                             "BROADCAST");
2973   if (broadcast == GNUNET_SYSERR)
2974     broadcast = GNUNET_NO;
2975
2976   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2977                                            "BROADCAST_INTERVAL", &fancy_interval))
2978   {
2979     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2980   }
2981   else
2982   {
2983      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2984      {
2985        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2986      }
2987      GNUNET_free (fancy_interval);
2988   }
2989
2990   /* Maximum datarate */
2991   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2992                                              "MAX_BPS", &udp_max_bps))
2993   {
2994     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2995   }
2996
2997   p = GNUNET_malloc (sizeof (struct Plugin));
2998   p->port = port;
2999   p->aport = aport;
3000   p->broadcast_interval = interval;
3001   p->enable_ipv6 = enable_v6;
3002   p->enable_ipv4 = GNUNET_YES; /* default */
3003   p->env = env;
3004   p->sessions = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
3005   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3006   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
3007   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3008                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
3009   plugin = p;
3010
3011   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
3012   res = setup_sockets (p, (GNUNET_YES == have_bind6) ? &serverAddrv6 : NULL,
3013                                                                                 (GNUNET_YES == have_bind4) ? &serverAddrv4 : NULL);
3014   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
3015   {
3016     LOG (GNUNET_ERROR_TYPE_ERROR,
3017          _("Failed to create network sockets, plugin failed\n"));
3018     GNUNET_CONTAINER_multihashmap_destroy (p->sessions);
3019     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3020     GNUNET_SERVER_mst_destroy (p->mst);
3021     GNUNET_free (p);
3022     return NULL;
3023   }
3024   else if (broadcast == GNUNET_YES)
3025   {
3026     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
3027     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
3028   }
3029
3030   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
3031   api->cls = p;
3032   api->send = NULL;
3033   api->disconnect = &udp_disconnect;
3034   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3035   api->address_to_string = &udp_address_to_string;
3036   api->string_to_address = &udp_string_to_address;
3037   api->check_address = &udp_plugin_check_address;
3038   api->get_session = &udp_plugin_get_session;
3039   api->send = &udp_plugin_send;
3040   api->get_network = &udp_get_network;
3041
3042   return api;
3043 }
3044
3045
3046 static int
3047 heap_cleanup_iterator (void *cls,
3048                        struct GNUNET_CONTAINER_HeapNode *
3049                        node, void *element,
3050                        GNUNET_CONTAINER_HeapCostType
3051                        cost)
3052 {
3053   struct DefragContext * d_ctx = element;
3054
3055   GNUNET_CONTAINER_heap_remove_node (node);
3056   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
3057   GNUNET_free (d_ctx);
3058
3059   return GNUNET_YES;
3060 }
3061
3062
3063 /**
3064  * The exported method. Makes the core api available via a global and
3065  * returns the udp transport API.
3066  *
3067  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
3068  * @return NULL
3069  */
3070 void *
3071 libgnunet_plugin_transport_udp_done (void *cls)
3072 {
3073   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3074   struct Plugin *plugin = api->cls;
3075
3076   if (NULL == plugin)
3077   {
3078     GNUNET_free (api);
3079     return NULL;
3080   }
3081
3082   stop_broadcast (plugin);
3083   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
3084   {
3085     GNUNET_SCHEDULER_cancel (plugin->select_task);
3086     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
3087   }
3088   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
3089   {
3090     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3091     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
3092   }
3093
3094   /* Closing sockets */
3095   if (GNUNET_YES ==plugin->enable_ipv4)
3096   {
3097                 if (plugin->sockv4 != NULL)
3098                 {
3099                         GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
3100                         plugin->sockv4 = NULL;
3101                 }
3102                 GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3103                 GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3104   }
3105   if (GNUNET_YES ==plugin->enable_ipv6)
3106   {
3107                 if (plugin->sockv6 != NULL)
3108                 {
3109                         GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
3110                         plugin->sockv6 = NULL;
3111
3112                         GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3113                         GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3114                 }
3115   }
3116   if (NULL != plugin->nat)
3117         GNUNET_NAT_unregister (plugin->nat);
3118
3119   if (plugin->defrag_ctxs != NULL)
3120   {
3121     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
3122         heap_cleanup_iterator, NULL);
3123     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
3124     plugin->defrag_ctxs = NULL;
3125   }
3126   if (plugin->mst != NULL)
3127   {
3128     GNUNET_SERVER_mst_destroy(plugin->mst);
3129     plugin->mst = NULL;
3130   }
3131
3132   /* Clean up leftover messages */
3133   struct UDP_MessageWrapper * udpw;
3134   udpw = plugin->ipv4_queue_head;
3135   while (udpw != NULL)
3136   {
3137     struct UDP_MessageWrapper *tmp = udpw->next;
3138     dequeue (plugin, udpw);
3139     call_continuation(udpw, GNUNET_SYSERR);
3140     GNUNET_free (udpw);
3141
3142     udpw = tmp;
3143   }
3144   udpw = plugin->ipv6_queue_head;
3145   while (udpw != NULL)
3146   {
3147     struct UDP_MessageWrapper *tmp = udpw->next;
3148     dequeue (plugin, udpw);
3149     call_continuation(udpw, GNUNET_SYSERR);
3150     GNUNET_free (udpw);
3151
3152     udpw = tmp;
3153   }
3154
3155   /* Clean up sessions */
3156   LOG (GNUNET_ERROR_TYPE_DEBUG,
3157        "Cleaning up sessions\n");
3158   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
3159   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
3160
3161   plugin->nat = NULL;
3162   GNUNET_free (plugin);
3163   GNUNET_free (api);
3164 #if DEBUG_MALLOC
3165   struct Allocation *allocation;
3166   while (NULL != ahead)
3167   {
3168       allocation = ahead;
3169       GNUNET_CONTAINER_DLL_remove (ahead, atail, allocation);
3170       GNUNET_free (allocation);
3171   }
3172   struct Allocator *allocator;
3173   while (NULL != aehead)
3174   {
3175       allocator = aehead;
3176       GNUNET_CONTAINER_DLL_remove (aehead, aetail, allocator);
3177       GNUNET_free (allocator);
3178   }
3179 #endif
3180   return NULL;
3181 }
3182
3183
3184 /* end of plugin_transport_udp.c */