fixed:
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2      This file is part of GNUnet
3      (C) 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define PLUGIN_NAME "udp"
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66 /**
67  * Closure for 'append_port'.
68  */
69 struct PrettyPrinterContext
70 {
71   /**
72    * Function to call with the result.
73    */
74   GNUNET_TRANSPORT_AddressStringCallback asc;
75
76   /**
77    * Clsoure for 'asc'.
78    */
79   void *asc_cls;
80
81   /**
82    * Port to add after the IP address.
83    */
84   uint16_t port;
85
86   /**
87    * IPv6 address
88    */
89
90   int ipv6;
91
92   /**
93    * Options
94    */
95   uint32_t options;
96 };
97
98
99 enum UDP_MessageType
100 {
101   UNDEFINED = 0,
102   MSG_FRAGMENTED = 1,
103   MSG_FRAGMENTED_COMPLETE = 2,
104   MSG_UNFRAGMENTED = 3,
105   MSG_ACK = 4,
106   MSG_BEACON = 5
107 };
108
109 struct Session
110 {
111   /**
112    * Which peer is this session for?
113    */
114   struct GNUNET_PeerIdentity target;
115
116   struct UDP_FragmentationContext * frag_ctx;
117
118   /**
119    * Address of the other peer
120    */
121   const struct sockaddr *sock_addr;
122
123   /**
124    * Desired delay for next sending we send to other peer
125    */
126   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
127
128   /**
129    * Desired delay for next sending we received from other peer
130    */
131   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
132
133   /**
134    * Session timeout task
135    */
136   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138   /**
139    * expected delay for ACKs
140    */
141   struct GNUNET_TIME_Relative last_expected_ack_delay;
142
143   /**
144    * desired delay between UDP messages
145    */
146   struct GNUNET_TIME_Relative last_expected_msg_delay;
147
148   struct GNUNET_ATS_Information ats;
149
150   size_t addrlen;
151
152
153   unsigned int rc;
154
155   int in_destroy;
156
157   int inbound;
158 };
159
160
161 struct SessionCompareContext
162 {
163   struct Session *res;
164   const struct GNUNET_HELLO_Address *addr;
165 };
166
167
168 /**
169  * Closure for 'process_inbound_tokenized_messages'
170  */
171 struct SourceInformation
172 {
173   /**
174    * Sender identity.
175    */
176   struct GNUNET_PeerIdentity sender;
177
178   /**
179    * Source address.
180    */
181   const void *arg;
182
183   struct Session *session;
184   /**
185    * Number of bytes in source address.
186    */
187   size_t args;
188
189 };
190
191
192 /**
193  * Closure for 'find_receive_context'.
194  */
195 struct FindReceiveContext
196 {
197   /**
198    * Where to store the result.
199    */
200   struct DefragContext *rc;
201
202   /**
203    * Address to find.
204    */
205   const struct sockaddr *addr;
206
207   struct Session *session;
208
209   /**
210    * Number of bytes in 'addr'.
211    */
212   socklen_t addr_len;
213
214 };
215
216
217
218 /**
219  * Data structure to track defragmentation contexts based
220  * on the source of the UDP traffic.
221  */
222 struct DefragContext
223 {
224
225   /**
226    * Defragmentation context.
227    */
228   struct GNUNET_DEFRAGMENT_Context *defrag;
229
230   /**
231    * Source address this receive context is for (allocated at the
232    * end of the struct).
233    */
234   const struct sockaddr *src_addr;
235
236   /**
237    * Reference to master plugin struct.
238    */
239   struct Plugin *plugin;
240
241   /**
242    * Node in the defrag heap.
243    */
244   struct GNUNET_CONTAINER_HeapNode *hnode;
245
246   /**
247    * Length of 'src_addr'
248    */
249   size_t addr_len;
250 };
251
252
253
254 /**
255  * Context to send fragmented messages
256  */
257 struct UDP_FragmentationContext
258 {
259   /**
260    * Next in linked list
261    */
262   struct UDP_FragmentationContext * next;
263
264   /**
265    * Previous in linked list
266    */
267   struct UDP_FragmentationContext * prev;
268
269   /**
270    * The plugin
271    */
272   struct Plugin * plugin;
273
274   /**
275    * Handle for GNUNET_FRAGMENT context
276    */
277   struct GNUNET_FRAGMENT_Context * frag;
278
279   /**
280    * The session this fragmentation context belongs to
281    */
282   struct Session * session;
283
284   /**
285    * Function to call upon completion of the transmission.
286    */
287   GNUNET_TRANSPORT_TransmitContinuation cont;
288
289   /**
290    * Closure for 'cont'.
291    */
292   void *cont_cls;
293
294   /**
295    * Message timeout
296    */
297   struct GNUNET_TIME_Absolute timeout;
298
299   /**
300    * Payload size of original unfragmented message
301    */
302   size_t payload_size;
303
304   /**
305    * Bytes used to send all fragments on wire including UDP overhead
306    */
307   size_t on_wire_size;
308
309   unsigned int fragments_used;
310
311 };
312
313
314 struct UDP_MessageWrapper
315 {
316   /**
317    * Session this message belongs to
318    */
319   struct Session *session;
320
321   /**
322    * DLL of messages
323    * previous element
324    */
325   struct UDP_MessageWrapper *prev;
326
327   /**
328    * DLL of messages
329    * previous element
330    */
331   struct UDP_MessageWrapper *next;
332
333   /**
334    * Message type
335    * According to UDP_MessageType
336    */
337   int msg_type;
338
339   /**
340    * Message with size msg_size including UDP specific overhead
341    */
342   char *msg_buf;
343
344   /**
345    * Size of UDP message to send including UDP specific overhead
346    */
347   size_t msg_size;
348
349   /**
350    * Payload size of original message
351    */
352   size_t payload_size;
353
354   /**
355    * Message timeout
356    */
357   struct GNUNET_TIME_Absolute timeout;
358
359   /**
360    * Function to call upon completion of the transmission.
361    */
362   GNUNET_TRANSPORT_TransmitContinuation cont;
363
364   /**
365    * Closure for 'cont'.
366    */
367   void *cont_cls;
368
369   /**
370    * Fragmentation context
371    * frag_ctx == NULL if transport <= MTU
372    * frag_ctx != NULL if transport > MTU
373    */
374   struct UDP_FragmentationContext *frag_ctx;
375 };
376
377
378 /**
379  * UDP ACK Message-Packet header (after defragmentation).
380  */
381 struct UDP_ACK_Message
382 {
383   /**
384    * Message header.
385    */
386   struct GNUNET_MessageHeader header;
387
388   /**
389    * Desired delay for flow control
390    */
391   uint32_t delay;
392
393   /**
394    * What is the identity of the sender
395    */
396   struct GNUNET_PeerIdentity sender;
397
398 };
399
400 /**
401  * Address options
402  */
403 static uint32_t myoptions;
404
405
406 /**
407  * Encapsulation of all of the state of the plugin.
408  */
409 struct Plugin * plugin;
410
411
412 /**
413  * We have been notified that our readset has something to read.  We don't
414  * know which socket needs to be read, so we have to check each one
415  * Then reschedule this function to be called again once more is available.
416  *
417  * @param cls the plugin handle
418  * @param tc the scheduling context (for rescheduling this function again)
419  */
420 static void
421 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
422
423
424 /**
425  * We have been notified that our readset has something to read.  We don't
426  * know which socket needs to be read, so we have to check each one
427  * Then reschedule this function to be called again once more is available.
428  *
429  * @param cls the plugin handle
430  * @param tc the scheduling context (for rescheduling this function again)
431  */
432 static void
433 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
434
435
436 /**
437  * Start session timeout
438  */
439 static void
440 start_session_timeout (struct Session *s);
441
442 /**
443  * Increment session timeout due to activity
444  */
445 static void
446 reschedule_session_timeout (struct Session *s);
447
448 /**
449  * Cancel timeout
450  */
451 static void
452 stop_session_timeout (struct Session *s);
453
454 /**
455  * (re)schedule select tasks for this plugin.
456  *
457  * @param plugin plugin to reschedule
458  */
459 static void
460 schedule_select (struct Plugin *plugin)
461 {
462   struct GNUNET_TIME_Relative min_delay;
463   struct UDP_MessageWrapper *udpw;
464
465   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
466   {
467     /* Find a message ready to send:
468      * Flow delay from other peer is expired or not set (0) */
469     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
470     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
471       min_delay = GNUNET_TIME_relative_min (min_delay,
472                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
473     
474     if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
475       GNUNET_SCHEDULER_cancel(plugin->select_task);
476
477     /* Schedule with:
478      * - write active set if message is ready
479      * - timeout minimum delay */
480     plugin->select_task =
481       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
482                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
483                                    plugin->rs_v4,
484                                    (0 == min_delay.rel_value) ? plugin->ws_v4 : NULL,
485                                    &udp_plugin_select, plugin);  
486   }
487   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
488   {
489     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
490     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
491       min_delay = GNUNET_TIME_relative_min (min_delay,
492                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
493     
494     if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task_v6)
495       GNUNET_SCHEDULER_cancel(plugin->select_task_v6);
496     plugin->select_task_v6 =
497       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
498                                    (0 == min_delay.rel_value) ? GNUNET_TIME_UNIT_FOREVER_REL : min_delay,
499                                    plugin->rs_v6,
500                                    (0 == min_delay.rel_value) ? plugin->ws_v6 : NULL,
501                                    &udp_plugin_select_v6, plugin);
502   }
503 }
504
505
506 /**
507  * Function called for a quick conversion of the binary address to
508  * a numeric address.  Note that the caller must not free the
509  * address and that the next call to this function is allowed
510  * to override the address again.
511  *
512  * @param cls closure
513  * @param addr binary address
514  * @param addrlen length of the address
515  * @return string representing the same address
516  */
517 const char *
518 udp_address_to_string (void *cls, const void *addr, size_t addrlen)
519 {
520   static char rbuf[INET6_ADDRSTRLEN + 10];
521   char buf[INET6_ADDRSTRLEN];
522   const void *sb;
523   struct in_addr a4;
524   struct in6_addr a6;
525   const struct IPv4UdpAddress *t4;
526   const struct IPv6UdpAddress *t6;
527   int af;
528   uint16_t port;
529   uint32_t options;
530
531   options = 0;
532   if (addrlen == sizeof (struct IPv6UdpAddress))
533   {
534     t6 = addr;
535     af = AF_INET6;
536     options = ntohl (t6->options);
537     port = ntohs (t6->u6_port);
538     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
539     sb = &a6;
540   }
541   else if (addrlen == sizeof (struct IPv4UdpAddress))
542   {
543     t4 = addr;
544     af = AF_INET;
545     options = ntohl (t4->options);
546     port = ntohs (t4->u4_port);
547     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
548     sb = &a4;
549   }
550   else if (addrlen == 0)
551         {
552                 GNUNET_snprintf (rbuf, sizeof (rbuf), "%s", "<inbound>");
553                 return rbuf;
554         }
555   else
556   {
557     GNUNET_break_op (0);
558     return NULL;
559   }
560   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
561
562   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u",
563                    PLUGIN_NAME, options, buf, port);
564   return rbuf;
565 }
566
567
568 /**
569  * Function called to convert a string address to
570  * a binary address.
571  *
572  * @param cls closure ('struct Plugin*')
573  * @param addr string address
574  * @param addrlen length of the address
575  * @param buf location to store the buffer
576  * @param added location to store the number of bytes in the buffer.
577  *        If the function returns GNUNET_SYSERR, its contents are undefined.
578  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
579  */
580 static int
581 udp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
582     void **buf, size_t *added)
583 {
584   struct sockaddr_storage socket_address;
585   char *address;
586   char *plugin;
587   char *optionstr;
588   uint32_t options;
589
590   /* Format tcp.options.address:port */
591   address = NULL;
592   plugin = NULL;
593   optionstr = NULL;
594   options = 0;
595   if ((NULL == addr) || (addrlen == 0))
596   {
597     GNUNET_break (0);
598     return GNUNET_SYSERR;
599   }
600   if ('\0' != addr[addrlen - 1])
601   {
602     GNUNET_break (0);
603     return GNUNET_SYSERR;
604   }
605   if (strlen (addr) != addrlen - 1)
606   {
607     GNUNET_break (0);
608     return GNUNET_SYSERR;
609   }
610   plugin = GNUNET_strdup (addr);
611   optionstr = strchr (plugin, '.');
612   if (NULL == optionstr)
613   {
614     GNUNET_break (0);
615     GNUNET_free (plugin);
616     return GNUNET_SYSERR;
617   }
618   optionstr[0] = '\0';
619   optionstr ++;
620   options = atol (optionstr);
621   address = strchr (optionstr, '.');
622   if (NULL == address)
623   {
624     GNUNET_break (0);
625     GNUNET_free (plugin);
626     return GNUNET_SYSERR;
627   }
628   address[0] = '\0';
629   address ++;
630
631   if (GNUNET_OK !=
632       GNUNET_STRINGS_to_address_ip (address, strlen (address),
633                                     &socket_address))
634   {
635     GNUNET_break (0);
636     GNUNET_free (plugin);
637     return GNUNET_SYSERR;
638   }
639
640   GNUNET_free (plugin);
641
642   switch (socket_address.ss_family)
643   {
644   case AF_INET:
645     {
646       struct IPv4UdpAddress *u4;
647       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
648       u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress));
649       u4->options =  htonl (options);
650       u4->ipv4_addr = in4->sin_addr.s_addr;
651       u4->u4_port = in4->sin_port;
652       *buf = u4;
653       *added = sizeof (struct IPv4UdpAddress);
654       return GNUNET_OK;
655     }
656   case AF_INET6:
657     {
658       struct IPv6UdpAddress *u6;
659       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
660       u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress));
661       u6->options =  htonl (options);
662       u6->ipv6_addr = in6->sin6_addr;
663       u6->u6_port = in6->sin6_port;
664       *buf = u6;
665       *added = sizeof (struct IPv6UdpAddress);
666       return GNUNET_OK;
667     }
668   default:
669     GNUNET_break (0);
670     return GNUNET_SYSERR;
671   }
672 }
673
674
675 /**
676  * Append our port and forward the result.
677  *
678  * @param cls a 'struct PrettyPrinterContext'
679  * @param hostname result from DNS resolver
680  */
681 static void
682 append_port (void *cls, const char *hostname)
683 {
684   struct PrettyPrinterContext *ppc = cls;
685   char *ret;
686
687   if (hostname == NULL)
688   {
689     ppc->asc (ppc->asc_cls, NULL);
690     GNUNET_free (ppc);
691     return;
692   }
693   if (GNUNET_YES == ppc->ipv6)
694     GNUNET_asprintf (&ret, "%s.%u.[%s]:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
695   else
696     GNUNET_asprintf (&ret, "%s.%u.%s:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
697   ppc->asc (ppc->asc_cls, ret);
698   GNUNET_free (ret);
699 }
700
701
702 /**
703  * Convert the transports address to a nice, human-readable
704  * format.
705  *
706  * @param cls closure
707  * @param type name of the transport that generated the address
708  * @param addr one of the addresses of the host, NULL for the last address
709  *        the specific address format depends on the transport
710  * @param addrlen length of the address
711  * @param numeric should (IP) addresses be displayed in numeric form?
712  * @param timeout after how long should we give up?
713  * @param asc function to call on each string
714  * @param asc_cls closure for asc
715  */
716 static void
717 udp_plugin_address_pretty_printer (void *cls, const char *type,
718                                    const void *addr, size_t addrlen,
719                                    int numeric,
720                                    struct GNUNET_TIME_Relative timeout,
721                                    GNUNET_TRANSPORT_AddressStringCallback asc,
722                                    void *asc_cls)
723 {
724   struct PrettyPrinterContext *ppc;
725   const void *sb;
726   size_t sbs;
727   struct sockaddr_in a4;
728   struct sockaddr_in6 a6;
729   const struct IPv4UdpAddress *u4;
730   const struct IPv6UdpAddress *u6;
731   uint16_t port;
732   uint32_t options;
733
734   options = 0;
735   if (addrlen == sizeof (struct IPv6UdpAddress))
736   {
737     u6 = addr;
738     memset (&a6, 0, sizeof (a6));
739     a6.sin6_family = AF_INET6;
740 #if HAVE_SOCKADDR_IN_SIN_LEN
741     a6.sin6_len = sizeof (a6);
742 #endif
743     a6.sin6_port = u6->u6_port;
744     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof (struct in6_addr));
745     port = ntohs (u6->u6_port);
746     options = ntohl (u6->options);
747     sb = &a6;
748     sbs = sizeof (a6);
749   }
750   else if (addrlen == sizeof (struct IPv4UdpAddress))
751   {
752     u4 = addr;
753     memset (&a4, 0, sizeof (a4));
754     a4.sin_family = AF_INET;
755 #if HAVE_SOCKADDR_IN_SIN_LEN
756     a4.sin_len = sizeof (a4);
757 #endif
758     a4.sin_port = u4->u4_port;
759     a4.sin_addr.s_addr = u4->ipv4_addr;
760     port = ntohs (u4->u4_port);
761     options = ntohl (u4->options);
762     sb = &a4;
763     sbs = sizeof (a4);
764   }
765   else if (0 == addrlen)
766   {
767     asc (asc_cls, "<inbound connection>");
768     asc (asc_cls, NULL);
769     return;
770   }
771   else
772   {
773     /* invalid address */
774     GNUNET_break_op (0);
775     asc (asc_cls, NULL);
776     return;
777   }
778   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
779   ppc->asc = asc;
780   ppc->asc_cls = asc_cls;
781   ppc->port = port;
782   ppc->options = options;
783   if (addrlen == sizeof (struct IPv6UdpAddress))
784     ppc->ipv6 = GNUNET_YES;
785   else
786     ppc->ipv6 = GNUNET_NO;
787   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
788 }
789
790
791 static void
792 call_continuation (struct UDP_MessageWrapper *udpw, int result)
793 {
794   size_t overhead;
795
796   LOG (GNUNET_ERROR_TYPE_DEBUG,
797       "Calling continuation for %u byte message to `%s' with result %s\n",
798       udpw->payload_size, GNUNET_i2s (&udpw->session->target),
799       (GNUNET_OK == result) ? "OK" : "SYSERR");
800
801   if (udpw->msg_size >= udpw->payload_size)
802     overhead = udpw->msg_size - udpw->payload_size;
803   else
804     overhead = udpw->msg_size;
805
806   switch (result) {
807     case GNUNET_OK:
808       switch (udpw->msg_type) {
809         case MSG_UNFRAGMENTED:
810           if (NULL != udpw->cont)
811           {
812             /* Transport continuation */
813             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
814                       udpw->payload_size, udpw->msg_size);
815           }
816           GNUNET_STATISTICS_update (plugin->env->stats,
817                                     "# UDP, unfragmented msgs, messages, sent, success",
818                                     1, GNUNET_NO);
819           GNUNET_STATISTICS_update (plugin->env->stats,
820                                     "# UDP, unfragmented msgs, bytes payload, sent, success",
821                                     udpw->payload_size, GNUNET_NO);
822           GNUNET_STATISTICS_update (plugin->env->stats,
823                                     "# UDP, unfragmented msgs, bytes overhead, sent, success",
824                                     overhead, GNUNET_NO);
825           GNUNET_STATISTICS_update (plugin->env->stats,
826                                     "# UDP, total, bytes overhead, sent",
827                                     overhead, GNUNET_NO);
828           GNUNET_STATISTICS_update (plugin->env->stats,
829                                     "# UDP, total, bytes payload, sent",
830                                     udpw->payload_size, GNUNET_NO);
831           break;
832         case MSG_FRAGMENTED_COMPLETE:
833           GNUNET_assert (NULL != udpw->frag_ctx);
834           if (udpw->frag_ctx->cont != NULL)
835             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_OK,
836                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
837           GNUNET_STATISTICS_update (plugin->env->stats,
838                                     "# UDP, fragmented msgs, messages, sent, success",
839                                     1, GNUNET_NO);
840           GNUNET_STATISTICS_update (plugin->env->stats,
841                                     "# UDP, fragmented msgs, bytes payload, sent, success",
842                                     udpw->payload_size, GNUNET_NO);
843           GNUNET_STATISTICS_update (plugin->env->stats,
844                                     "# UDP, fragmented msgs, bytes overhead, sent, success",
845                                     overhead, GNUNET_NO);
846           GNUNET_STATISTICS_update (plugin->env->stats,
847                                     "# UDP, total, bytes overhead, sent",
848                                     overhead, GNUNET_NO);
849           GNUNET_STATISTICS_update (plugin->env->stats,
850                                     "# UDP, total, bytes payload, sent",
851                                     udpw->payload_size, GNUNET_NO);
852           GNUNET_STATISTICS_update (plugin->env->stats,
853                                     "# UDP, fragmented msgs, messages, pending",
854                                     -1, GNUNET_NO);
855           break;
856         case MSG_FRAGMENTED:
857           /* Fragmented message: enqueue next fragment */
858           if (NULL != udpw->cont)
859             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
860                       udpw->payload_size, udpw->msg_size);
861           GNUNET_STATISTICS_update (plugin->env->stats,
862                                     "# UDP, fragmented msgs, fragments, sent, success",
863                                     1, GNUNET_NO);
864           GNUNET_STATISTICS_update (plugin->env->stats,
865                                     "# UDP, fragmented msgs, fragments bytes, sent, success",
866                                     udpw->msg_size, GNUNET_NO);
867           break;
868         case MSG_ACK:
869           /* No continuation */
870           GNUNET_STATISTICS_update (plugin->env->stats,
871                                     "# UDP, ACK msgs, messages, sent, success",
872                                     1, GNUNET_NO);
873           GNUNET_STATISTICS_update (plugin->env->stats,
874                                     "# UDP, ACK msgs, bytes overhead, sent, success",
875                                     overhead, GNUNET_NO);
876           GNUNET_STATISTICS_update (plugin->env->stats,
877                                     "# UDP, total, bytes overhead, sent",
878                                     overhead, GNUNET_NO);
879           break;
880         case MSG_BEACON:
881           GNUNET_break (0);
882           break;
883         default:
884           LOG (GNUNET_ERROR_TYPE_ERROR,
885               "ERROR: %u\n", udpw->msg_type);
886           GNUNET_break (0);
887           break;
888       }
889       break;
890     case GNUNET_SYSERR:
891       switch (udpw->msg_type) {
892         case MSG_UNFRAGMENTED:
893           /* Unfragmented message: failed to send */
894           if (NULL != udpw->cont)
895             udpw->cont (udpw->cont_cls, &udpw->session->target, result,
896                       udpw->payload_size, overhead);
897           GNUNET_STATISTICS_update (plugin->env->stats,
898                                   "# UDP, unfragmented msgs, messages, sent, failure",
899                                   1, GNUNET_NO);
900           GNUNET_STATISTICS_update (plugin->env->stats,
901                                     "# UDP, unfragmented msgs, bytes payload, sent, failure",
902                                     udpw->payload_size, GNUNET_NO);
903           GNUNET_STATISTICS_update (plugin->env->stats,
904                                     "# UDP, unfragmented msgs, bytes overhead, sent, failure",
905                                     overhead, GNUNET_NO);
906           break;
907         case MSG_FRAGMENTED_COMPLETE:
908           GNUNET_assert (NULL != udpw->frag_ctx);
909           if (udpw->frag_ctx->cont != NULL)
910             udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, GNUNET_SYSERR,
911                                udpw->frag_ctx->payload_size, udpw->frag_ctx->on_wire_size);
912           GNUNET_STATISTICS_update (plugin->env->stats,
913                                     "# UDP, fragmented msgs, messages, sent, failure",
914                                     1, GNUNET_NO);
915           GNUNET_STATISTICS_update (plugin->env->stats,
916                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
917                                     udpw->payload_size, GNUNET_NO);
918           GNUNET_STATISTICS_update (plugin->env->stats,
919                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
920                                     overhead, GNUNET_NO);
921           GNUNET_STATISTICS_update (plugin->env->stats,
922                                     "# UDP, fragmented msgs, bytes payload, sent, failure",
923                                     overhead, GNUNET_NO);
924           GNUNET_STATISTICS_update (plugin->env->stats,
925                                     "# UDP, fragmented msgs, messages, pending",
926                                     -1, GNUNET_NO);
927           break;
928         case MSG_FRAGMENTED:
929           GNUNET_assert (NULL != udpw->frag_ctx);
930           /* Fragmented message: failed to send */
931           GNUNET_STATISTICS_update (plugin->env->stats,
932                                     "# UDP, fragmented msgs, fragments, sent, failure",
933                                     1, GNUNET_NO);
934           GNUNET_STATISTICS_update (plugin->env->stats,
935                                     "# UDP, fragmented msgs, fragments bytes, sent, failure",
936                                     udpw->msg_size, GNUNET_NO);
937           break;
938         case MSG_ACK:
939           /* ACK message: failed to send */
940           GNUNET_STATISTICS_update (plugin->env->stats,
941                                     "# UDP, ACK msgs, messages, sent, failure",
942                                     1, GNUNET_NO);
943           break;
944         case MSG_BEACON:
945           /* Beacon message: failed to send */
946           GNUNET_break (0);
947           break;
948         default:
949           GNUNET_break (0);
950           break;
951       }
952       break;
953     default:
954       GNUNET_break (0);
955       break;
956   }
957 }
958
959
960 /**
961  * Check if the given port is plausible (must be either our listen
962  * port or our advertised port).  If it is neither, we return
963  * GNUNET_SYSERR.
964  *
965  * @param plugin global variables
966  * @param in_port port number to check
967  * @return GNUNET_OK if port is either open_port or adv_port
968  */
969 static int
970 check_port (struct Plugin *plugin, uint16_t in_port)
971 {
972   if ((in_port == plugin->port) || (in_port == plugin->aport))
973     return GNUNET_OK;
974   return GNUNET_SYSERR;
975 }
976
977
978 /**
979  * Function that will be called to check if a binary address for this
980  * plugin is well-formed and corresponds to an address for THIS peer
981  * (as per our configuration).  Naturally, if absolutely necessary,
982  * plugins can be a bit conservative in their answer, but in general
983  * plugins should make sure that the address does not redirect
984  * traffic to a 3rd party that might try to man-in-the-middle our
985  * traffic.
986  *
987  * @param cls closure, should be our handle to the Plugin
988  * @param addr pointer to the address
989  * @param addrlen length of addr
990  * @return GNUNET_OK if this is a plausible address for this peer
991  *         and transport, GNUNET_SYSERR if not
992  *
993  */
994 static int
995 udp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
996 {
997   struct Plugin *plugin = cls;
998   struct IPv4UdpAddress *v4;
999   struct IPv6UdpAddress *v6;
1000
1001   if ((addrlen != sizeof (struct IPv4UdpAddress)) &&
1002       (addrlen != sizeof (struct IPv6UdpAddress)))
1003   {
1004     GNUNET_break_op (0);
1005     return GNUNET_SYSERR;
1006   }
1007   if (addrlen == sizeof (struct IPv4UdpAddress))
1008   {
1009     v4 = (struct IPv4UdpAddress *) addr;
1010     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1011       return GNUNET_SYSERR;
1012     if (GNUNET_OK !=
1013         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
1014                                  sizeof (struct in_addr)))
1015       return GNUNET_SYSERR;
1016   }
1017   else
1018   {
1019     v6 = (struct IPv6UdpAddress *) addr;
1020     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1021     {
1022       GNUNET_break_op (0);
1023       return GNUNET_SYSERR;
1024     }
1025     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1026       return GNUNET_SYSERR;
1027     if (GNUNET_OK !=
1028         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
1029                                  sizeof (struct in6_addr)))
1030       return GNUNET_SYSERR;
1031   }
1032   return GNUNET_OK;
1033 }
1034
1035
1036 /**
1037  * Task to free resources associated with a session.
1038  *
1039  * @param s session to free
1040  */
1041 static void
1042 free_session (struct Session *s)
1043 {
1044   if (NULL != s->frag_ctx)
1045   {
1046     GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag, NULL, NULL);
1047     GNUNET_free (s->frag_ctx);
1048     s->frag_ctx = NULL;
1049   }
1050   GNUNET_free (s);
1051 }
1052
1053
1054 static void
1055 dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1056 {
1057   if (plugin->bytes_in_buffer < udpw->msg_size)
1058       GNUNET_break (0);
1059   else
1060   {
1061     GNUNET_STATISTICS_update (plugin->env->stats,
1062                               "# UDP, total, bytes in buffers",
1063                               - (long long) udpw->msg_size, GNUNET_NO);
1064     plugin->bytes_in_buffer -= udpw->msg_size;
1065   }
1066   GNUNET_STATISTICS_update (plugin->env->stats,
1067                             "# UDP, total, msgs in buffers",
1068                             -1, GNUNET_NO);
1069   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1070     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1071                                  plugin->ipv4_queue_tail, udpw);
1072   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1073     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1074                                  plugin->ipv6_queue_tail, udpw);
1075 }
1076
1077 static void
1078 fragmented_message_done (struct UDP_FragmentationContext *fc, int result)
1079 {
1080   struct UDP_MessageWrapper *udpw;
1081   struct UDP_MessageWrapper *tmp;
1082   struct UDP_MessageWrapper dummy;
1083   struct Session *s = fc->session;
1084
1085   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p : Fragmented message removed with result %s\n", fc, (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1086   
1087   /* Call continuation for fragmented message */
1088   memset (&dummy, 0, sizeof (dummy));
1089   dummy.msg_type = MSG_FRAGMENTED_COMPLETE;
1090   dummy.msg_size = s->frag_ctx->on_wire_size;
1091   dummy.payload_size = s->frag_ctx->payload_size;
1092   dummy.frag_ctx = s->frag_ctx;
1093   dummy.cont = NULL;
1094   dummy.cont_cls = NULL;
1095   dummy.session = s;
1096
1097   call_continuation (&dummy, result);
1098
1099   /* Remove leftover fragments from queue */
1100   if (s->addrlen == sizeof (struct sockaddr_in6))
1101   {
1102     udpw = plugin->ipv6_queue_head;
1103     while (NULL != udpw)
1104     {
1105       tmp = udpw->next;
1106       if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
1107       {
1108         dequeue (plugin, udpw);
1109         call_continuation (udpw, GNUNET_SYSERR);
1110         GNUNET_free (udpw);
1111       }
1112       udpw = tmp;
1113     }
1114   }
1115   if (s->addrlen == sizeof (struct sockaddr_in))
1116   {
1117     udpw = plugin->ipv4_queue_head;
1118     while (udpw!= NULL)
1119     {
1120       tmp = udpw->next;
1121       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1122       {
1123         dequeue (plugin, udpw);
1124         call_continuation (udpw, GNUNET_SYSERR);
1125         GNUNET_free (udpw);
1126       }
1127       udpw = tmp;
1128     }
1129   }
1130
1131   /* Destroy fragmentation context */
1132   GNUNET_FRAGMENT_context_destroy (fc->frag,
1133                                      &s->last_expected_msg_delay,
1134                                      &s->last_expected_ack_delay);
1135   s->frag_ctx = NULL;
1136   GNUNET_free (fc );
1137 }
1138
1139 /**
1140  * Functions with this signature are called whenever we need
1141  * to close a session due to a disconnect or failure to
1142  * establish a connection.
1143  *
1144  * @param s session to close down
1145  */
1146 static void
1147 disconnect_session (struct Session *s)
1148 {
1149   struct UDP_MessageWrapper *udpw;
1150   struct UDP_MessageWrapper *next;
1151
1152   GNUNET_assert (GNUNET_YES != s->in_destroy);
1153   LOG (GNUNET_ERROR_TYPE_DEBUG,
1154        "Session %p to peer `%s' address ended \n",
1155          s,
1156          GNUNET_i2s (&s->target),
1157          GNUNET_a2s (s->sock_addr, s->addrlen));
1158   stop_session_timeout (s);
1159
1160   if (NULL != s->frag_ctx)
1161   {
1162     /* Remove fragmented message due to disconnect */
1163     fragmented_message_done (s->frag_ctx, GNUNET_SYSERR);
1164   }
1165
1166   next = plugin->ipv4_queue_head;
1167   while (NULL != (udpw = next))
1168   {
1169     next = udpw->next;
1170     if (udpw->session == s)
1171     {
1172       dequeue (plugin, udpw);
1173       call_continuation(udpw, GNUNET_SYSERR);
1174       GNUNET_free (udpw);
1175     }
1176   }
1177   next = plugin->ipv6_queue_head;
1178   while (NULL != (udpw = next))
1179   {
1180     next = udpw->next;
1181     if (udpw->session == s)
1182     {
1183       dequeue (plugin, udpw);
1184       call_continuation(udpw, GNUNET_SYSERR);
1185       GNUNET_free (udpw);
1186     }
1187     udpw = next;
1188   }
1189   plugin->env->session_end (plugin->env->cls, &s->target, s);
1190
1191   if (NULL != s->frag_ctx)
1192   {
1193     if (NULL != s->frag_ctx->cont)
1194     {
1195       s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR,
1196                          s->frag_ctx->payload_size, s->frag_ctx->on_wire_size);
1197       LOG (GNUNET_ERROR_TYPE_DEBUG,
1198           "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1199           GNUNET_i2s (&s->target));
1200     }
1201   }
1202
1203   GNUNET_assert (GNUNET_YES ==
1204                  GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
1205                                                        &s->target.hashPubKey,
1206                                                        s));
1207   GNUNET_STATISTICS_set(plugin->env->stats,
1208                         "# UDP, sessions active",
1209                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1210                         GNUNET_NO);
1211   if (s->rc > 0)
1212     s->in_destroy = GNUNET_YES;
1213   else
1214     free_session (s);
1215 }
1216
1217 /**
1218  * Destroy a session, plugin is being unloaded.
1219  *
1220  * @param cls unused
1221  * @param key hash of public key of target peer
1222  * @param value a 'struct PeerSession*' to clean up
1223  * @return GNUNET_OK (continue to iterate)
1224  */
1225 static int
1226 disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1227 {
1228   disconnect_session(value);
1229   return GNUNET_OK;
1230 }
1231
1232
1233 /**
1234  * Disconnect from a remote node.  Clean up session if we have one for this peer
1235  *
1236  * @param cls closure for this call (should be handle to Plugin)
1237  * @param target the peeridentity of the peer to disconnect
1238  * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
1239  */
1240 static void
1241 udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1242 {
1243   struct Plugin *plugin = cls;
1244   GNUNET_assert (plugin != NULL);
1245
1246   GNUNET_assert (target != NULL);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Disconnecting from peer `%s'\n", GNUNET_i2s (target));
1249   /* Clean up sessions */
1250   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin);
1251 }
1252
1253
1254 /**
1255  * Session was idle, so disconnect it
1256  */
1257 static void
1258 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1259 {
1260   GNUNET_assert (NULL != cls);
1261   struct Session *s = cls;
1262
1263   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1264   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265               "Session %p was idle for %llu ms, disconnecting\n",
1266               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1267   /* call session destroy function */
1268   disconnect_session (s);
1269 }
1270
1271
1272 /**
1273  * Start session timeout
1274  */
1275 static void
1276 start_session_timeout (struct Session *s)
1277 {
1278   GNUNET_assert (NULL != s);
1279   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
1280   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1281                                                    &session_timeout,
1282                                                    s);
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "Timeout for session %p set to %llu ms\n",
1285               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1286 }
1287
1288
1289 /**
1290  * Increment session timeout due to activity
1291  */
1292 static void
1293 reschedule_session_timeout (struct Session *s)
1294 {
1295   GNUNET_assert (NULL != s);
1296   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1297
1298   GNUNET_SCHEDULER_cancel (s->timeout_task);
1299   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1300                                                    &session_timeout,
1301                                                    s);
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303               "Timeout rescheduled for session %p set to %llu ms\n",
1304               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1305 }
1306
1307
1308 /**
1309  * Cancel timeout
1310  */
1311 static void
1312 stop_session_timeout (struct Session *s)
1313 {
1314   GNUNET_assert (NULL != s);
1315
1316   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
1317   {
1318     GNUNET_SCHEDULER_cancel (s->timeout_task);
1319     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1320     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321                 "Timeout stopped for session %p canceled\n",
1322                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1323   }
1324 }
1325
1326
1327 static struct Session *
1328 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
1329                 const void *addr, size_t addrlen,
1330                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1331 {
1332   struct Session *s;
1333   const struct IPv4UdpAddress *t4;
1334   const struct IPv6UdpAddress *t6;
1335   struct sockaddr_in *v4;
1336   struct sockaddr_in6 *v6;
1337   size_t len;
1338
1339   switch (addrlen)
1340   {
1341   case sizeof (struct IPv4UdpAddress):
1342     if (NULL == plugin->sockv4)
1343     {
1344       return NULL;
1345     }
1346     t4 = addr;
1347     s = GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in));
1348     len = sizeof (struct sockaddr_in);
1349     v4 = (struct sockaddr_in *) &s[1];
1350     v4->sin_family = AF_INET;
1351 #if HAVE_SOCKADDR_IN_SIN_LEN
1352     v4->sin_len = sizeof (struct sockaddr_in);
1353 #endif
1354     v4->sin_port = t4->u4_port;
1355     v4->sin_addr.s_addr = t4->ipv4_addr;
1356     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in));
1357     break;
1358   case sizeof (struct IPv6UdpAddress):
1359     if (NULL == plugin->sockv6)
1360     {
1361       return NULL;
1362     }
1363     t6 = addr;
1364     s =
1365         GNUNET_malloc (sizeof (struct Session) + sizeof (struct sockaddr_in6));
1366     len = sizeof (struct sockaddr_in6);
1367     v6 = (struct sockaddr_in6 *) &s[1];
1368     v6->sin6_family = AF_INET6;
1369 #if HAVE_SOCKADDR_IN_SIN_LEN
1370     v6->sin6_len = sizeof (struct sockaddr_in6);
1371 #endif
1372     v6->sin6_port = t6->u6_port;
1373     v6->sin6_addr = t6->ipv6_addr;
1374     s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6));
1375     break;
1376   default:
1377     /* Must have a valid address to send to */
1378     GNUNET_break_op (0);
1379     return NULL;
1380   }
1381   s->addrlen = len;
1382   s->target = *target;
1383   s->sock_addr = (const struct sockaddr *) &s[1];
1384   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250);
1385   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1386   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1387   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1388   s->inbound = GNUNET_NO;
1389   start_session_timeout (s);
1390   return s;
1391 }
1392
1393
1394 static int
1395 session_cmp_it (void *cls,
1396                 const struct GNUNET_HashCode * key,
1397                 void *value)
1398 {
1399   struct SessionCompareContext * cctx = cls;
1400   const struct GNUNET_HELLO_Address *address = cctx->addr;
1401   struct Session *s = value;
1402
1403   socklen_t s_addrlen = s->addrlen;
1404
1405   LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing  address %s <-> %s\n",
1406       udp_address_to_string (NULL, (void *) address->address, address->address_length),
1407       GNUNET_a2s (s->sock_addr, s->addrlen));
1408   if ((address->address_length == sizeof (struct IPv4UdpAddress)) &&
1409       (s_addrlen == sizeof (struct sockaddr_in)))
1410   {
1411     struct IPv4UdpAddress * u4 = NULL;
1412     u4 = (struct IPv4UdpAddress *) address->address;
1413     const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr;
1414     if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) &&
1415         (u4->u4_port == s4->sin_port))
1416     {
1417       cctx->res = s;
1418       return GNUNET_NO;
1419     }
1420
1421   }
1422   if ((address->address_length == sizeof (struct IPv6UdpAddress)) &&
1423       (s_addrlen == sizeof (struct sockaddr_in6)))
1424   {
1425     struct IPv6UdpAddress * u6 = NULL;
1426     u6 = (struct IPv6UdpAddress *) address->address;
1427     const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr;
1428     if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) &&
1429         (u6->u6_port == s6->sin6_port))
1430     {
1431       cctx->res = s;
1432       return GNUNET_NO;
1433     }
1434   }
1435   return GNUNET_YES;
1436 }
1437
1438 /**
1439  * Function obtain the network type for a session
1440  *
1441  * @param cls closure ('struct Plugin*')
1442  * @param session the session
1443  * @return the network type in HBO or GNUNET_SYSERR
1444  */
1445 static enum GNUNET_ATS_Network_Type
1446 udp_get_network (void *cls, void *session)
1447 {
1448         struct Session *s = (struct Session *) session;
1449
1450         return ntohl(s->ats.value);
1451 }
1452
1453 /**
1454  * Creates a new outbound session the transport service will use to send data to the
1455  * peer
1456  *
1457  * @param cls the plugin
1458  * @param address the address
1459  * @return the session or NULL of max connections exceeded
1460  */
1461 static struct Session *
1462 udp_plugin_lookup_session (void *cls,
1463                  const struct GNUNET_HELLO_Address *address)
1464 {
1465   struct Plugin * plugin = cls;
1466   struct IPv6UdpAddress * udp_a6;
1467   struct IPv4UdpAddress * udp_a4;
1468
1469   GNUNET_assert (plugin != NULL);
1470   GNUNET_assert (address != NULL);
1471
1472
1473   if ((address->address == NULL) ||
1474       ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1475       (address->address_length != sizeof (struct IPv6UdpAddress))))
1476   {
1477     LOG (GNUNET_ERROR_TYPE_WARNING,
1478         _("Trying to create session for address of unexpected length %u (should be %u or %u)\n"),
1479         address->address_length,
1480         sizeof (struct IPv4UdpAddress),
1481         sizeof (struct IPv6UdpAddress));
1482     return NULL;
1483   }
1484
1485   if (address->address_length == sizeof (struct IPv4UdpAddress))
1486   {
1487     if (plugin->sockv4 == NULL)
1488       return NULL;
1489     udp_a4 = (struct IPv4UdpAddress *) address->address;
1490     if (udp_a4->u4_port == 0)
1491       return NULL;
1492   }
1493
1494   if (address->address_length == sizeof (struct IPv6UdpAddress))
1495   {
1496     if (plugin->sockv6 == NULL)
1497       return NULL;
1498     udp_a6 = (struct IPv6UdpAddress *) address->address;
1499     if (udp_a6->u6_port == 0)
1500       return NULL;
1501   }
1502
1503   /* check if session already exists */
1504   struct SessionCompareContext cctx;
1505   cctx.addr = address;
1506   cctx.res = NULL;
1507   LOG (GNUNET_ERROR_TYPE_DEBUG,
1508        "Looking for existing session for peer `%s' `%s' \n", 
1509        GNUNET_i2s (&address->peer), 
1510        udp_address_to_string(NULL, address->address, address->address_length));
1511   GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx);
1512   if (cctx.res != NULL)
1513   {
1514     LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res);
1515     return cctx.res;
1516   }
1517   return NULL;
1518 }
1519
1520 static struct Session *
1521 udp_plugin_create_session (void *cls,
1522                   const struct GNUNET_HELLO_Address *address)
1523 {
1524   struct Session * s = NULL;
1525
1526   /* otherwise create new */
1527   s = create_session (plugin,
1528       &address->peer,
1529       address->address,
1530       address->address_length,
1531       NULL, NULL);
1532   LOG (GNUNET_ERROR_TYPE_DEBUG,
1533        "Creating new session %p for peer `%s' address `%s'\n",
1534        s,
1535        GNUNET_i2s(&address->peer),
1536        udp_address_to_string(NULL,address->address,address->address_length));
1537   GNUNET_assert (GNUNET_OK ==
1538                  GNUNET_CONTAINER_multihashmap_put (plugin->sessions,
1539                                                     &s->target.hashPubKey,
1540                                                     s,
1541                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1542   GNUNET_STATISTICS_set(plugin->env->stats,
1543                         "# UDP, sessions active",
1544                         GNUNET_CONTAINER_multihashmap_size(plugin->sessions),
1545                         GNUNET_NO);
1546   return s;
1547 }
1548
1549
1550
1551 /**
1552  * Creates a new outbound session the transport service will use to send data to the
1553  * peer
1554  *
1555  * @param cls the plugin
1556  * @param address the address
1557  * @return the session or NULL of max connections exceeded
1558  */
1559 static struct Session *
1560 udp_plugin_get_session (void *cls,
1561                   const struct GNUNET_HELLO_Address *address)
1562 {
1563   struct Session * s = NULL;
1564
1565   /* otherwise create new */
1566   if (NULL != (s = udp_plugin_lookup_session(cls, address)))
1567         return s;
1568   else
1569         return udp_plugin_create_session (cls, address);
1570 }
1571
1572
1573 static void 
1574 enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
1575 {
1576   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1577       GNUNET_break (0);
1578   else
1579   {
1580     GNUNET_STATISTICS_update (plugin->env->stats,
1581                               "# UDP, total, bytes in buffers",
1582                               udpw->msg_size, GNUNET_NO);
1583     plugin->bytes_in_buffer += udpw->msg_size;
1584   }
1585   GNUNET_STATISTICS_update (plugin->env->stats,
1586                             "# UDP, total, msgs in buffers",
1587                             1, GNUNET_NO);
1588   if (udpw->session->addrlen == sizeof (struct sockaddr_in))
1589     GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
1590                                  plugin->ipv4_queue_tail, udpw);
1591   if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
1592     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1593                                  plugin->ipv6_queue_tail, udpw);
1594 }
1595
1596
1597
1598 /**
1599  * Fragment message was transmitted via UDP, let fragmentation know
1600  * to send the next fragment now.
1601  *
1602  * @param cls the 'struct UDPMessageWrapper' of the fragment
1603  * @param target destination peer (ignored)
1604  * @param result GNUNET_OK on success (ignored)
1605  * @param payload bytes payload sent
1606  * @param physical bytes physical sent
1607  */
1608 static void
1609 send_next_fragment (void *cls,
1610                     const struct GNUNET_PeerIdentity *target,
1611                     int result, size_t payload, size_t physical)
1612 {
1613   struct UDP_MessageWrapper *udpw = cls;
1614   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);  
1615 }
1616
1617
1618 /**
1619  * Function that is called with messages created by the fragmentation
1620  * module.  In the case of the 'proc' callback of the
1621  * GNUNET_FRAGMENT_context_create function, this function must
1622  * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
1623  *
1624  * @param cls closure, the 'struct FragmentationContext'
1625  * @param msg the message that was created
1626  */
1627 static void
1628 enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
1629 {
1630   struct UDP_FragmentationContext *frag_ctx = cls;
1631   struct Plugin *plugin = frag_ctx->plugin;
1632   struct UDP_MessageWrapper * udpw;
1633   size_t msg_len = ntohs (msg->size);
1634  
1635   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1636        "Enqueuing fragment with %u bytes\n", msg_len);
1637   frag_ctx->fragments_used ++;
1638   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1639   udpw->session = frag_ctx->session;
1640   udpw->msg_buf = (char *) &udpw[1];
1641   udpw->msg_size = msg_len;
1642   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
1643   udpw->cont = &send_next_fragment;
1644   udpw->cont_cls = udpw;
1645   udpw->timeout = frag_ctx->timeout;
1646   udpw->frag_ctx = frag_ctx;
1647   udpw->msg_type = MSG_FRAGMENTED;
1648   memcpy (udpw->msg_buf, msg, msg_len);
1649   enqueue (plugin, udpw);
1650   schedule_select (plugin);
1651 }
1652
1653
1654 /**
1655  * Function that can be used by the transport service to transmit
1656  * a message using the plugin.   Note that in the case of a
1657  * peer disconnecting, the continuation MUST be called
1658  * prior to the disconnect notification itself.  This function
1659  * will be called with this peer's HELLO message to initiate
1660  * a fresh connection to another peer.
1661  *
1662  * @param cls closure
1663  * @param s which session must be used
1664  * @param msgbuf the message to transmit
1665  * @param msgbuf_size number of bytes in 'msgbuf'
1666  * @param priority how important is the message (most plugins will
1667  *                 ignore message priority and just FIFO)
1668  * @param to how long to wait at most for the transmission (does not
1669  *                require plugins to discard the message after the timeout,
1670  *                just advisory for the desired delay; most plugins will ignore
1671  *                this as well)
1672  * @param cont continuation to call once the message has
1673  *        been transmitted (or if the transport is ready
1674  *        for the next transmission call; or if the
1675  *        peer disconnected...); can be NULL
1676  * @param cont_cls closure for cont
1677  * @return number of bytes used (on the physical network, with overheads);
1678  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1679  *         and does NOT mean that the message was not transmitted (DV)
1680  */
1681 static ssize_t
1682 udp_plugin_send (void *cls,
1683                   struct Session *s,
1684                   const char *msgbuf, size_t msgbuf_size,
1685                   unsigned int priority,
1686                   struct GNUNET_TIME_Relative to,
1687                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1688 {
1689   struct Plugin *plugin = cls;
1690   size_t udpmlen = msgbuf_size + sizeof (struct UDPMessage);
1691   struct UDP_FragmentationContext * frag_ctx;
1692   struct UDP_MessageWrapper * udpw;
1693   struct UDPMessage *udp;
1694   char mbuf[udpmlen];
1695   GNUNET_assert (plugin != NULL);
1696   GNUNET_assert (s != NULL);
1697
1698   if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL))
1699     return GNUNET_SYSERR;
1700   if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL))
1701     return GNUNET_SYSERR;
1702   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1703   {
1704     GNUNET_break (0);
1705     return GNUNET_SYSERR;
1706   }
1707   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s))
1708   {
1709     GNUNET_break (0);
1710     return GNUNET_SYSERR;
1711   }
1712   LOG (GNUNET_ERROR_TYPE_DEBUG,
1713        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1714        udpmlen,
1715        GNUNET_i2s (&s->target),
1716        GNUNET_a2s(s->sock_addr, s->addrlen));
1717
1718
1719   /* Message */
1720   udp = (struct UDPMessage *) mbuf;
1721   udp->header.size = htons (udpmlen);
1722   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1723   udp->reserved = htonl (0);
1724   udp->sender = *plugin->env->my_identity;
1725
1726   reschedule_session_timeout(s);
1727   if (udpmlen <= UDP_MTU)
1728   {
1729     /* unfragmented message */
1730     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1731     udpw->session = s;
1732     udpw->msg_buf = (char *) &udpw[1];
1733     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1734     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1735     udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1736     udpw->cont = cont;
1737     udpw->cont_cls = cont_cls;
1738     udpw->frag_ctx = NULL;
1739     udpw->msg_type = MSG_UNFRAGMENTED;
1740     memcpy (udpw->msg_buf, udp, sizeof (struct UDPMessage));
1741     memcpy (&udpw->msg_buf[sizeof (struct UDPMessage)], msgbuf, msgbuf_size);
1742     enqueue (plugin, udpw);
1743
1744     GNUNET_STATISTICS_update (plugin->env->stats,
1745                               "# UDP, unfragmented msgs, messages, attempt",
1746                               1, GNUNET_NO);
1747     GNUNET_STATISTICS_update (plugin->env->stats,
1748                               "# UDP, unfragmented msgs, bytes payload, attempt",
1749                               udpw->payload_size, GNUNET_NO);
1750   }
1751   else
1752   {
1753     /* fragmented message */
1754     if  (s->frag_ctx != NULL)
1755       return GNUNET_SYSERR;
1756     memcpy (&udp[1], msgbuf, msgbuf_size);
1757     frag_ctx = GNUNET_malloc (sizeof (struct UDP_FragmentationContext));
1758     frag_ctx->plugin = plugin;
1759     frag_ctx->session = s;
1760     frag_ctx->cont = cont;
1761     frag_ctx->cont_cls = cont_cls;
1762     frag_ctx->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to);
1763     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1764     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1765     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1766                                                      UDP_MTU,
1767                                                      &plugin->tracker,
1768                                                      s->last_expected_msg_delay, 
1769                                                      s->last_expected_ack_delay, 
1770                                                      &udp->header,
1771                                                      &enqueue_fragment,
1772                                                      frag_ctx);    
1773     s->frag_ctx = frag_ctx;
1774     GNUNET_STATISTICS_update (plugin->env->stats,
1775                               "# UDP, fragmented msgs, messages, pending",
1776                               1, GNUNET_NO);
1777     GNUNET_STATISTICS_update (plugin->env->stats,
1778                               "# UDP, fragmented msgs, messages, attempt",
1779                               1, GNUNET_NO);
1780     GNUNET_STATISTICS_update (plugin->env->stats,
1781                               "# UDP, fragmented msgs, bytes payload, attempt",
1782                               frag_ctx->payload_size, GNUNET_NO);
1783   }
1784   schedule_select (plugin);
1785   return udpmlen;
1786 }
1787
1788
1789 /**
1790  * Our external IP address/port mapping has changed.
1791  *
1792  * @param cls closure, the 'struct LocalAddrList'
1793  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
1794  *     the previous (now invalid) one
1795  * @param addr either the previous or the new public IP address
1796  * @param addrlen actual lenght of the address
1797  */
1798 static void
1799 udp_nat_port_map_callback (void *cls, int add_remove,
1800                            const struct sockaddr *addr, socklen_t addrlen)
1801 {
1802   struct Plugin *plugin = cls;
1803   struct IPv4UdpAddress u4;
1804   struct IPv6UdpAddress u6;
1805   void *arg;
1806   size_t args;
1807
1808   LOG (GNUNET_ERROR_TYPE_INFO,
1809        "NAT notification to %s address `%s'\n",
1810        (GNUNET_YES == add_remove) ? "add" : "remove",
1811        GNUNET_a2s (addr, addrlen));
1812
1813   /* convert 'addr' to our internal format */
1814   switch (addr->sa_family)
1815   {
1816   case AF_INET:
1817     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
1818     memset (&u4, 0, sizeof (u4));
1819     u4.options = htonl(myoptions);
1820     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
1821     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
1822     arg = &u4;
1823     args = sizeof (struct IPv4UdpAddress);
1824     break;
1825   case AF_INET6:
1826     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
1827     memset (&u4, 0, sizeof (u4));
1828     u6.options = htonl(myoptions);
1829     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
1830             sizeof (struct in6_addr));
1831     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
1832     arg = &u6;
1833     args = sizeof (struct IPv6UdpAddress);
1834     break;
1835   default:
1836     GNUNET_break (0);
1837     return;
1838   }
1839   /* modify our published address list */
1840   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "udp");
1841 }
1842
1843
1844
1845 /**
1846  * Message tokenizer has broken up an incomming message. Pass it on
1847  * to the service.
1848  *
1849  * @param cls the 'struct Plugin'
1850  * @param client the 'struct SourceInformation'
1851  * @param hdr the actual message
1852  */
1853 static int
1854 process_inbound_tokenized_messages (void *cls, void *client,
1855                                     const struct GNUNET_MessageHeader *hdr)
1856 {
1857   struct Plugin *plugin = cls;
1858   struct SourceInformation *si = client;
1859   struct GNUNET_TIME_Relative delay;
1860
1861   GNUNET_assert (si->session != NULL);
1862   if (GNUNET_YES == si->session->in_destroy)
1863     return GNUNET_OK;
1864   /* setup ATS */
1865   GNUNET_break (ntohl(si->session->ats.value) != GNUNET_ATS_NET_UNSPECIFIED);
1866   delay = plugin->env->receive (plugin->env->cls,
1867                                 &si->sender,
1868                                 hdr,
1869                                 si->session,
1870                  (GNUNET_YES == si->session->inbound) ? NULL : si->arg,
1871                  (GNUNET_YES == si->session->inbound) ? 0 : si->args);
1872
1873   plugin->env->update_address_metrics (plugin->env->cls,
1874                                        &si->sender,
1875                                          (GNUNET_YES == si->session->inbound) ? NULL : si->arg,
1876                                          (GNUNET_YES == si->session->inbound) ? 0 : si->args,
1877                                        si->session,
1878                                        &si->session->ats, 1);
1879
1880   si->session->flow_delay_for_other_peer = delay;
1881   reschedule_session_timeout(si->session);
1882   return GNUNET_OK;
1883 }
1884
1885
1886 /**
1887  * We've received a UDP Message.  Process it (pass contents to main service).
1888  *
1889  * @param plugin plugin context
1890  * @param msg the message
1891  * @param sender_addr sender address
1892  * @param sender_addr_len number of bytes in sender_addr
1893  */
1894 static void
1895 process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
1896                      const struct sockaddr *sender_addr,
1897                      socklen_t sender_addr_len)
1898 {
1899   struct SourceInformation si;
1900   struct Session * s;
1901   struct IPv4UdpAddress u4;
1902   struct IPv6UdpAddress u6;
1903   const void *arg;
1904   size_t args;
1905
1906   if (0 != ntohl (msg->reserved))
1907   {
1908     GNUNET_break_op (0);
1909     return;
1910   }
1911   if (ntohs (msg->header.size) <
1912       sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
1913   {
1914     GNUNET_break_op (0);
1915     return;
1916   }
1917
1918   /* convert address */
1919   switch (sender_addr->sa_family)
1920   {
1921   case AF_INET:
1922     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
1923     memset (&u4, 0, sizeof (u4));
1924     u6.options = htonl (0);
1925     u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
1926     u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
1927     arg = &u4;
1928     args = sizeof (u4);
1929     break;
1930   case AF_INET6:
1931     GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
1932     memset (&u6, 0, sizeof (u6));
1933     u6.options = htonl (0);
1934     u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
1935     u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
1936     arg = &u6;
1937     args = sizeof (u6);
1938     break;
1939   default:
1940     GNUNET_break (0);
1941     return;
1942   }
1943   LOG (GNUNET_ERROR_TYPE_DEBUG,
1944        "Received message with %u bytes from peer `%s' at `%s'\n",
1945        (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
1946        GNUNET_a2s (sender_addr, sender_addr_len));
1947
1948   struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
1949   if (NULL == (s = udp_plugin_lookup_session (plugin, address)))
1950   {
1951                 s = udp_plugin_create_session(plugin, address);
1952                 s->inbound = GNUNET_YES;
1953           plugin->env->session_start (NULL, &address->peer, PLUGIN_NAME,
1954                         (GNUNET_YES == s->inbound) ? NULL : address->address,
1955                         (GNUNET_YES == s->inbound) ? 0 : address->address_length,
1956                   s, NULL, 0);
1957   }
1958   GNUNET_free (address);
1959
1960   /* iterate over all embedded messages */
1961   si.session = s;
1962   si.sender = msg->sender;
1963   si.arg = arg;
1964   si.args = args;
1965   s->rc++;
1966   GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
1967                              ntohs (msg->header.size) -
1968                              sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
1969   s->rc--;
1970   if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
1971     free_session (s);
1972 }
1973
1974
1975 /**
1976  * Scan the heap for a receive context with the given address.
1977  *
1978  * @param cls the 'struct FindReceiveContext'
1979  * @param node internal node of the heap
1980  * @param element value stored at the node (a 'struct ReceiveContext')
1981  * @param cost cost associated with the node
1982  * @return GNUNET_YES if we should continue to iterate,
1983  *         GNUNET_NO if not.
1984  */
1985 static int
1986 find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
1987                       void *element, GNUNET_CONTAINER_HeapCostType cost)
1988 {
1989   struct FindReceiveContext *frc = cls;
1990   struct DefragContext *e = element;
1991
1992   if ((frc->addr_len == e->addr_len) &&
1993       (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
1994   {
1995     frc->rc = e;
1996     return GNUNET_NO;
1997   }
1998   return GNUNET_YES;
1999 }
2000
2001
2002 /**
2003  * Process a defragmented message.
2004  *
2005  * @param cls the 'struct ReceiveContext'
2006  * @param msg the message
2007  */
2008 static void
2009 fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
2010 {
2011   struct DefragContext *rc = cls;
2012
2013   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2014   {
2015     GNUNET_break (0);
2016     return;
2017   }
2018   if (ntohs (msg->size) < sizeof (struct UDPMessage))
2019   {
2020     GNUNET_break (0);
2021     return;
2022   }
2023   process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
2024                        rc->src_addr, rc->addr_len);
2025 }
2026
2027
2028 struct LookupContext
2029 {
2030   const struct sockaddr * addr;
2031
2032   struct Session *res;
2033
2034   size_t addrlen;
2035 };
2036
2037
2038 static int
2039 lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
2040 {
2041   struct LookupContext *l_ctx = cls;
2042   struct Session * s = value;
2043
2044   if ((s->addrlen == l_ctx->addrlen) &&
2045       (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
2046   {
2047     l_ctx->res = s;
2048     return GNUNET_NO;
2049   }
2050   return GNUNET_YES;
2051 }
2052
2053
2054 /**
2055  * Transmit an acknowledgement.
2056  *
2057  * @param cls the 'struct ReceiveContext'
2058  * @param id message ID (unused)
2059  * @param msg ack to transmit
2060  */
2061 static void
2062 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
2063 {
2064   struct DefragContext *rc = cls;
2065   size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
2066   struct UDP_ACK_Message *udp_ack;
2067   uint32_t delay = 0;
2068   struct UDP_MessageWrapper *udpw;
2069   struct Session *s;
2070   struct LookupContext l_ctx;
2071
2072   l_ctx.addr = rc->src_addr;
2073   l_ctx.addrlen = rc->addr_len;
2074   l_ctx.res = NULL;
2075   GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
2076       &lookup_session_by_addr_it,
2077       &l_ctx);
2078   s = l_ctx.res;
2079
2080   if (NULL == s)
2081     return;
2082
2083   if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
2084     delay = s->flow_delay_for_other_peer.rel_value;
2085
2086   LOG (GNUNET_ERROR_TYPE_DEBUG,
2087        "Sending ACK to `%s' including delay of %u ms\n",
2088        GNUNET_a2s (rc->src_addr,
2089                    (rc->src_addr->sa_family ==
2090                     AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
2091                                                                      sockaddr_in6)),
2092        delay);
2093   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2094   udpw->msg_size = msize;
2095   udpw->payload_size = 0;
2096   udpw->session = s;
2097   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2098   udpw->msg_buf = (char *)&udpw[1];
2099   udpw->msg_type = MSG_ACK;
2100   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2101   udp_ack->header.size = htons ((uint16_t) msize);
2102   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2103   udp_ack->delay = htonl (delay);
2104   udp_ack->sender = *rc->plugin->env->my_identity;
2105   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2106   enqueue (rc->plugin, udpw);
2107 }
2108
2109
2110 static void 
2111 read_process_msg (struct Plugin *plugin,
2112                   const struct GNUNET_MessageHeader *msg,
2113                   const char *addr,
2114                   socklen_t fromlen)
2115 {
2116   if (ntohs (msg->size) < sizeof (struct UDPMessage))
2117   {
2118     GNUNET_break_op (0);
2119     return;
2120   }
2121   process_udp_message (plugin, (const struct UDPMessage *) msg,
2122                        (const struct sockaddr *) addr, fromlen);
2123 }
2124
2125
2126 static void 
2127 read_process_ack (struct Plugin *plugin,
2128                   const struct GNUNET_MessageHeader *msg,
2129                   char *addr,
2130                   socklen_t fromlen)
2131 {
2132   const struct GNUNET_MessageHeader *ack;
2133   const struct UDP_ACK_Message *udp_ack;
2134   struct LookupContext l_ctx;
2135   struct Session *s;
2136   struct GNUNET_TIME_Relative flow_delay;
2137
2138   if (ntohs (msg->size) <
2139       sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
2140   {
2141     GNUNET_break_op (0);
2142     return;
2143   }
2144   udp_ack = (const struct UDP_ACK_Message *) msg;
2145   l_ctx.addr = (const struct sockaddr *) addr;
2146   l_ctx.addrlen = fromlen;
2147   l_ctx.res = NULL;
2148   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
2149                                          &lookup_session_by_addr_it,
2150                                          &l_ctx);
2151   s = l_ctx.res;
2152
2153   if ((NULL == s) || (NULL == s->frag_ctx))
2154   {
2155     return;
2156   }
2157
2158   flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
2159   LOG (GNUNET_ERROR_TYPE_DEBUG, 
2160        "We received a sending delay of %llu\n",
2161        flow_delay.rel_value);
2162   s->flow_delay_from_other_peer =
2163       GNUNET_TIME_relative_to_absolute (flow_delay);
2164
2165   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2166   if (ntohs (ack->size) !=
2167       ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
2168   {
2169     GNUNET_break_op (0);
2170     return;
2171   }
2172
2173   if (0 != memcmp (&l_ctx.res->target, &udp_ack->sender, sizeof (struct GNUNET_PeerIdentity)))
2174     GNUNET_break (0);
2175   if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
2176   {
2177     LOG (GNUNET_ERROR_TYPE_DEBUG,
2178          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2179          (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2180          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2181     /* Expect more ACKs to arrive */
2182     return;
2183   }
2184
2185   LOG (GNUNET_ERROR_TYPE_DEBUG,
2186        "Message full ACK'ed\n",
2187        (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
2188        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2189
2190   /* Remove fragmented message after successful sending */
2191   fragmented_message_done (s->frag_ctx, GNUNET_OK);
2192 }
2193
2194
2195 static void 
2196 read_process_fragment (struct Plugin *plugin,
2197                        const struct GNUNET_MessageHeader *msg,
2198                        char *addr,
2199                        socklen_t fromlen)
2200 {
2201   struct DefragContext *d_ctx;
2202   struct GNUNET_TIME_Absolute now;
2203   struct FindReceiveContext frc;
2204
2205   frc.rc = NULL;
2206   frc.addr = (const struct sockaddr *) addr;
2207   frc.addr_len = fromlen;
2208
2209   LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
2210        (unsigned int) ntohs (msg->size),
2211        GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2212   /* Lookup existing receive context for this address */
2213   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2214                                  &find_receive_context,
2215                                  &frc);
2216   now = GNUNET_TIME_absolute_get ();
2217   d_ctx = frc.rc;
2218
2219   if (d_ctx == NULL)
2220   {
2221     /* Create a new defragmentation context */
2222     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
2223     memcpy (&d_ctx[1], addr, fromlen);
2224     d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
2225     d_ctx->addr_len = fromlen;
2226     d_ctx->plugin = plugin;
2227     d_ctx->defrag =
2228         GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
2229                                           UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
2230                                           &fragment_msg_proc, &ack_proc);
2231     d_ctx->hnode =
2232         GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
2233                                       (GNUNET_CONTAINER_HeapCostType)
2234                                       now.abs_value);
2235     LOG (GNUNET_ERROR_TYPE_DEBUG, 
2236          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2237          (unsigned int) ntohs (msg->size),
2238          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2239   }
2240   else
2241   {
2242     LOG (GNUNET_ERROR_TYPE_DEBUG,
2243          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2244          (unsigned int) ntohs (msg->size),
2245          GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
2246   }
2247
2248   if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2249   {
2250     /* keep this 'rc' from expiring */
2251     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
2252                                        (GNUNET_CONTAINER_HeapCostType)
2253                                        now.abs_value);
2254   }
2255   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2256       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2257   {
2258     /* remove 'rc' that was inactive the longest */
2259     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2260     GNUNET_assert (NULL != d_ctx);
2261     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2262     GNUNET_free (d_ctx);
2263   }
2264 }
2265
2266
2267 /**
2268  * Read and process a message from the given socket.
2269  *
2270  * @param plugin the overall plugin
2271  * @param rsock socket to read from
2272  */
2273 static void
2274 udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock)
2275 {
2276   socklen_t fromlen;
2277   char addr[32];
2278   char buf[65536] GNUNET_ALIGN;
2279   ssize_t size;
2280   const struct GNUNET_MessageHeader *msg;
2281
2282   fromlen = sizeof (addr);
2283   memset (&addr, 0, sizeof (addr));
2284   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf),
2285                                       (struct sockaddr *) &addr, &fromlen);
2286 #if MINGW
2287   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2288    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2289    * on this socket has failed.
2290    * Quote from MSDN:
2291    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2292    *   executing a hard or abortive close. The application should close
2293    *   the socket; it is no longer usable. On a UDP-datagram socket this
2294    *   error indicates a previous send operation resulted in an ICMP Port
2295    *   Unreachable message.
2296    */
2297   if ( (-1 == size) && (ECONNRESET == errno) )
2298     return;
2299 #endif
2300   if (-1 == size)
2301   {
2302     LOG (GNUNET_ERROR_TYPE_DEBUG,
2303         "UDP failed to receive data: %s\n", STRERROR (errno));
2304     /* Connection failure or something. Not a protocol violation. */
2305     return;
2306   }
2307   if (size < sizeof (struct GNUNET_MessageHeader))
2308   {
2309     LOG (GNUNET_ERROR_TYPE_WARNING,
2310         "UDP got %u bytes, which is not enough for a GNUnet message header\n",
2311         (unsigned int) size);
2312     /* _MAY_ be a connection failure (got partial message) */
2313     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2314     GNUNET_break_op (0);
2315     return;
2316   }
2317   msg = (const struct GNUNET_MessageHeader *) buf;
2318
2319   LOG (GNUNET_ERROR_TYPE_DEBUG,
2320        "UDP received %u-byte message from `%s' type %i\n", (unsigned int) size,
2321        GNUNET_a2s ((const struct sockaddr *) addr, fromlen), ntohs (msg->type));
2322
2323   if (size != ntohs (msg->size))
2324   {
2325     GNUNET_break_op (0);
2326     return;
2327   }
2328
2329   GNUNET_STATISTICS_update (plugin->env->stats,
2330                             "# UDP, total, bytes, received",
2331                             size, GNUNET_NO);
2332
2333   switch (ntohs (msg->type))
2334   {
2335   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2336     udp_broadcast_receive (plugin, &buf, size, addr, fromlen);
2337     return;
2338
2339   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2340     read_process_msg (plugin, msg, addr, fromlen);
2341     return;
2342
2343   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2344     read_process_ack (plugin, msg, addr, fromlen);
2345     return;
2346
2347   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2348     read_process_fragment (plugin, msg, addr, fromlen);
2349     return;
2350
2351   default:
2352     GNUNET_break_op (0);
2353     return;
2354   }
2355 }
2356
2357 static struct UDP_MessageWrapper *
2358 remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
2359                                     struct GNUNET_NETWORK_Handle *sock)
2360 {
2361   struct UDP_MessageWrapper *udpw = NULL;
2362   struct GNUNET_TIME_Relative remaining;
2363
2364   udpw = head;
2365   while (udpw != NULL)
2366   {
2367     /* Find messages with timeout */
2368     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2369     if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2370     {
2371       /* Message timed out */
2372       switch (udpw->msg_type) {
2373         case MSG_UNFRAGMENTED:
2374           GNUNET_STATISTICS_update (plugin->env->stats,
2375                                     "# UDP, total, bytes, sent, timeout",
2376                                     udpw->msg_size, GNUNET_NO);
2377           GNUNET_STATISTICS_update (plugin->env->stats,
2378                                     "# UDP, total, messages, sent, timeout",
2379                                     1, GNUNET_NO);
2380           GNUNET_STATISTICS_update (plugin->env->stats,
2381                                     "# UDP, unfragmented msgs, messages, sent, timeout",
2382                                     1, GNUNET_NO);
2383           GNUNET_STATISTICS_update (plugin->env->stats,
2384                                     "# UDP, unfragmented msgs, bytes, sent, timeout",
2385                                     udpw->payload_size, GNUNET_NO);
2386           /* Not fragmented message */
2387           LOG (GNUNET_ERROR_TYPE_DEBUG,
2388                "Message for peer `%s' with size %u timed out\n",
2389                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2390           call_continuation (udpw, GNUNET_SYSERR);
2391           /* Remove message */
2392           dequeue (plugin, udpw);
2393           GNUNET_free (udpw);
2394           break;
2395         case MSG_FRAGMENTED:
2396           /* Fragmented message */
2397           GNUNET_STATISTICS_update (plugin->env->stats,
2398                                     "# UDP, total, bytes, sent, timeout",
2399                                     udpw->frag_ctx->on_wire_size, GNUNET_NO);
2400           GNUNET_STATISTICS_update (plugin->env->stats,
2401                                     "# UDP, total, messages, sent, timeout",
2402                                     1, GNUNET_NO);
2403           call_continuation (udpw, GNUNET_SYSERR);
2404           LOG (GNUNET_ERROR_TYPE_DEBUG,
2405                "Fragment for message for peer `%s' with size %u timed out\n",
2406                GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size);
2407
2408
2409           GNUNET_STATISTICS_update (plugin->env->stats,
2410                                     "# UDP, fragmented msgs, messages, sent, timeout",
2411                                     1, GNUNET_NO);
2412           GNUNET_STATISTICS_update (plugin->env->stats,
2413                                     "# UDP, fragmented msgs, bytes, sent, timeout",
2414                                     udpw->frag_ctx->payload_size, GNUNET_NO);
2415           /* Remove fragmented message due to timeout */
2416           fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
2417           break;
2418         case MSG_ACK:
2419           GNUNET_STATISTICS_update (plugin->env->stats,
2420                                     "# UDP, total, bytes, sent, timeout",
2421                                     udpw->msg_size, GNUNET_NO);
2422           GNUNET_STATISTICS_update (plugin->env->stats,
2423                                     "# UDP, total, messages, sent, timeout",
2424                                     1, GNUNET_NO);
2425           LOG (GNUNET_ERROR_TYPE_DEBUG,
2426                "ACK Message for peer `%s' with size %u timed out\n",
2427                GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2428           call_continuation (udpw, GNUNET_SYSERR);
2429           dequeue (plugin, udpw);
2430           GNUNET_free (udpw);
2431           break;
2432         default:
2433           break;
2434       }
2435       if (sock == plugin->sockv4)
2436         udpw = plugin->ipv4_queue_head;
2437       else if (sock == plugin->sockv6)
2438         udpw = plugin->ipv6_queue_head;
2439       else
2440       {
2441         GNUNET_break (0); /* should never happen */
2442         udpw = NULL;
2443       }
2444       GNUNET_STATISTICS_update (plugin->env->stats,
2445                                 "# messages dismissed due to timeout",
2446                                 1, GNUNET_NO);
2447     }
2448     else
2449     {
2450       /* Message did not time out, check flow delay */
2451       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2452       if (GNUNET_TIME_UNIT_ZERO.rel_value == remaining.rel_value)
2453       {
2454         /* this message is not delayed */
2455         LOG (GNUNET_ERROR_TYPE_DEBUG, 
2456              "Message for peer `%s' (%u bytes) is not delayed \n",
2457              GNUNET_i2s(&udpw->session->target), udpw->payload_size);
2458         break; /* Found message to send, break */
2459       }
2460       else
2461       {
2462         /* Message is delayed, try next */
2463         LOG (GNUNET_ERROR_TYPE_DEBUG,
2464              "Message for peer `%s' (%u bytes) is delayed for %llu \n",
2465              GNUNET_i2s(&udpw->session->target), udpw->payload_size, remaining.rel_value);
2466         udpw = udpw->next;
2467       }
2468     }
2469   }
2470   return udpw;
2471 }
2472
2473
2474 static void
2475 analyze_send_error (struct Plugin *plugin,
2476                     const struct sockaddr * sa,
2477                     socklen_t slen,
2478                     int error)
2479 {
2480   static int network_down_error;
2481   struct GNUNET_ATS_Information type;
2482
2483  type = plugin->env->get_address_type (plugin->env->cls,sa, slen);
2484  if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) &&
2485      ((ENETUNREACH == errno) || (ENETDOWN == errno)))
2486  {
2487    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in)))
2488    {
2489      /* IPv4: "Network unreachable" or "Network down"
2490       *
2491       * This indicates we do not have connectivity
2492       */
2493      LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2494          _("UDP could not transmit message to `%s': "
2495            "Network seems down, please check your network configuration\n"),
2496          GNUNET_a2s (sa, slen));
2497    }
2498    if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6)))
2499    {
2500      /* IPv6: "Network unreachable" or "Network down"
2501       *
2502       * This indicates that this system is IPv6 enabled, but does not
2503       * have a valid global IPv6 address assigned or we do not have
2504       * connectivity
2505       */
2506
2507     LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2508         _("UDP could not transmit message to `%s': "
2509           "Please check your network configuration and disable IPv6 if your "
2510           "connection does not have a global IPv6 address\n"),
2511         GNUNET_a2s (sa, slen));
2512    }
2513  }
2514  else
2515  {
2516    LOG (GNUNET_ERROR_TYPE_WARNING,
2517       "UDP could not transmit message to `%s': `%s'\n",
2518       GNUNET_a2s (sa, slen), STRERROR (error));
2519  }
2520 }
2521
2522 static size_t
2523 udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
2524 {
2525   const struct sockaddr * sa;
2526   ssize_t sent;
2527   socklen_t slen;
2528
2529   struct UDP_MessageWrapper *udpw = NULL;
2530
2531   /* Find message to send */
2532   udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) ? plugin->ipv4_queue_head : plugin->ipv6_queue_head,
2533                                              sock);
2534   if (NULL == udpw)
2535     return 0; /* No message to send */
2536
2537   sa = udpw->session->sock_addr;
2538   slen = udpw->session->addrlen;
2539
2540   sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, sa, slen);
2541
2542   if (GNUNET_SYSERR == sent)
2543   {
2544     /* Failure */
2545     analyze_send_error (plugin, sa, slen, errno);
2546     call_continuation(udpw, GNUNET_SYSERR);
2547     GNUNET_STATISTICS_update (plugin->env->stats,
2548                             "# UDP, total, bytes, sent, failure",
2549                             sent, GNUNET_NO);
2550     GNUNET_STATISTICS_update (plugin->env->stats,
2551                               "# UDP, total, messages, sent, failure",
2552                               1, GNUNET_NO);
2553   }
2554   else
2555   {
2556     /* Success */
2557     LOG (GNUNET_ERROR_TYPE_DEBUG,
2558          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
2559          (unsigned int) (udpw->msg_size), GNUNET_i2s(&udpw->session->target) ,GNUNET_a2s (sa, slen), (int) sent,
2560          (sent < 0) ? STRERROR (errno) : "ok");
2561     GNUNET_STATISTICS_update (plugin->env->stats,
2562                               "# UDP, total, bytes, sent, success",
2563                               sent, GNUNET_NO);
2564     GNUNET_STATISTICS_update (plugin->env->stats,
2565                               "# UDP, total, messages, sent, success",
2566                               1, GNUNET_NO);
2567     if (NULL != udpw->frag_ctx)
2568         udpw->frag_ctx->on_wire_size += udpw->msg_size;
2569     call_continuation (udpw, GNUNET_OK);
2570   }
2571   dequeue (plugin, udpw);
2572   GNUNET_free (udpw);
2573   udpw = NULL;
2574
2575   return sent;
2576 }
2577
2578
2579 /**
2580  * We have been notified that our readset has something to read.  We don't
2581  * know which socket needs to be read, so we have to check each one
2582  * Then reschedule this function to be called again once more is available.
2583  *
2584  * @param cls the plugin handle
2585  * @param tc the scheduling context (for rescheduling this function again)
2586  */
2587 static void
2588 udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2589 {
2590   struct Plugin *plugin = cls;
2591
2592   plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
2593   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2594     return;
2595   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
2596        (NULL != plugin->sockv4) &&
2597        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)) )
2598     udp_select_read (plugin, plugin->sockv4);
2599   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2600        (NULL != plugin->sockv4) && 
2601        (NULL != plugin->ipv4_queue_head) &&
2602        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)) )
2603     udp_select_send (plugin, plugin->sockv4);   
2604   schedule_select (plugin);
2605 }
2606
2607
2608 /**
2609  * We have been notified that our readset has something to read.  We don't
2610  * know which socket needs to be read, so we have to check each one
2611  * Then reschedule this function to be called again once more is available.
2612  *
2613  * @param cls the plugin handle
2614  * @param tc the scheduling context (for rescheduling this function again)
2615  */
2616 static void
2617 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2618 {
2619   struct Plugin *plugin = cls;
2620
2621   plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
2622   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2623     return;
2624   if ( ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) &&
2625        (NULL != plugin->sockv6) &&
2626        (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)) )
2627     udp_select_read (plugin, plugin->sockv6);
2628   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
2629        (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL) &&
2630        (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )    
2631     udp_select_send (plugin, plugin->sockv6);
2632   schedule_select (plugin);
2633 }
2634
2635
2636 /**
2637  *
2638  * @return number of sockets that were successfully bound
2639  */
2640 static int
2641 setup_sockets (struct Plugin *plugin, 
2642                const struct sockaddr_in6 *bind_v6,
2643                const struct sockaddr_in *bind_v4)
2644 {
2645   int tries;
2646   int sockets_created = 0;
2647   struct sockaddr_in6 serverAddrv6;
2648   struct sockaddr_in serverAddrv4;
2649   struct sockaddr *serverAddr;
2650   struct sockaddr *addrs[2];
2651   socklen_t addrlens[2];
2652   socklen_t addrlen;
2653   int eno;
2654
2655   /* Create IPv6 socket */
2656   eno = EINVAL;
2657   if (plugin->enable_ipv6 == GNUNET_YES)
2658   {
2659     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
2660     if (NULL == plugin->sockv6)
2661     {
2662         GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2663       LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n");
2664       plugin->enable_ipv6 = GNUNET_NO;
2665     }
2666     else
2667     {
2668         memset (&serverAddrv6, '\0', sizeof (struct sockaddr_in6));
2669 #if HAVE_SOCKADDR_IN_SIN_LEN
2670       serverAddrv6.sin6_len = sizeof (struct sockaddr_in6);
2671 #endif
2672       serverAddrv6.sin6_family = AF_INET6;
2673       if (NULL != bind_v6)
2674         serverAddrv6.sin6_addr = bind_v6->sin6_addr;
2675       else
2676         serverAddrv6.sin6_addr = in6addr_any;
2677
2678       if (0 == plugin->port) /* autodetect */
2679                 serverAddrv6.sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2680       else
2681                 serverAddrv6.sin6_port = htons (plugin->port);
2682       addrlen = sizeof (struct sockaddr_in6);
2683       serverAddr = (struct sockaddr *) &serverAddrv6;
2684
2685       tries = 0;
2686       while (tries < 10)
2687       {
2688                 LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 `%s'\n",
2689                                  GNUNET_a2s (serverAddr, addrlen));
2690                 /* binding */
2691                 if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv6,
2692                                                              serverAddr, addrlen, 0))
2693                         break;
2694                 eno = errno;
2695                 if (0 != plugin->port)
2696                 {
2697                                 tries = 10; /* fail */
2698                                 break; /* bind failed on specific port */
2699                 }
2700                 /* autodetect */
2701                 serverAddrv6.sin6_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2702                 tries ++;
2703       }
2704       if (tries >= 10)
2705       {
2706         GNUNET_NETWORK_socket_close (plugin->sockv6);
2707         plugin->enable_ipv6 = GNUNET_NO;
2708         plugin->sockv6 = NULL;
2709       }
2710
2711       if (plugin->sockv6 != NULL)
2712       {
2713         LOG (GNUNET_ERROR_TYPE_DEBUG,
2714              "IPv6 socket created on port %s\n",
2715              GNUNET_a2s (serverAddr, addrlen));
2716         addrs[sockets_created] = (struct sockaddr *) &serverAddrv6;
2717         addrlens[sockets_created] = sizeof (struct sockaddr_in6);
2718         sockets_created++;
2719       }
2720       else
2721       {
2722           LOG (GNUNET_ERROR_TYPE_ERROR,
2723                "Failed to bind UDP socket to %s: %s\n",
2724                GNUNET_a2s (serverAddr, addrlen),
2725                STRERROR (eno));
2726       }
2727     }
2728   }
2729
2730   /* Create IPv4 socket */
2731   eno = EINVAL;
2732   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
2733   if (NULL == plugin->sockv4)
2734   {
2735     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket");
2736     LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv4 since it is not supported on this system!\n");
2737     plugin->enable_ipv4 = GNUNET_NO;
2738   }
2739   else
2740   {
2741     memset (&serverAddrv4, '\0', sizeof (struct sockaddr_in));
2742 #if HAVE_SOCKADDR_IN_SIN_LEN
2743     serverAddrv4.sin_len = sizeof (struct sockaddr_in);
2744 #endif
2745     serverAddrv4.sin_family = AF_INET;
2746     if (NULL != bind_v4)
2747       serverAddrv4.sin_addr = bind_v4->sin_addr;
2748     else
2749       serverAddrv4.sin_addr.s_addr = INADDR_ANY;
2750     
2751     if (0 == plugin->port)
2752       /* autodetect */
2753       serverAddrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2754     else
2755       serverAddrv4.sin_port = htons (plugin->port);
2756     
2757     
2758     addrlen = sizeof (struct sockaddr_in);
2759     serverAddr = (struct sockaddr *) &serverAddrv4;
2760     
2761     tries = 0;
2762     while (tries < 10)
2763     {
2764       LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 `%s'\n",
2765                         GNUNET_a2s (serverAddr, addrlen));
2766       
2767       /* binding */
2768       if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv4,
2769                                                    serverAddr, addrlen, 0))
2770                 break;
2771       eno = errno;
2772       if (0 != plugin->port)
2773       {
2774                 tries = 10; /* fail */
2775                 break; /* bind failed on specific port */
2776       }
2777       
2778       /* autodetect */
2779       serverAddrv4.sin_port = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + 32000);
2780       tries ++;
2781     }
2782     
2783     if (tries >= 10)
2784     {
2785       GNUNET_NETWORK_socket_close (plugin->sockv4);
2786       plugin->enable_ipv4 = GNUNET_NO;
2787       plugin->sockv4 = NULL;
2788     }
2789     
2790     if (plugin->sockv4 != NULL)
2791     {
2792       LOG (GNUNET_ERROR_TYPE_DEBUG,
2793                 "IPv4 socket created on port %s\n", GNUNET_a2s (serverAddr, addrlen));
2794       addrs[sockets_created] = (struct sockaddr *) &serverAddrv4;
2795       addrlens[sockets_created] = sizeof (struct sockaddr_in);
2796       sockets_created++;
2797     }
2798     else
2799     {             
2800       LOG (GNUNET_ERROR_TYPE_ERROR,
2801                 "Failed to bind UDP socket to %s: %s\n",
2802                 GNUNET_a2s (serverAddr, addrlen), STRERROR (eno));
2803     }
2804   }
2805   
2806   if (0 == sockets_created)
2807   {
2808                 LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n"));
2809                 return 0; /* No sockets created, return */
2810   }
2811
2812   /* Create file descriptors */
2813   if (plugin->enable_ipv4 == GNUNET_YES)
2814   {
2815                         plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
2816                         plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
2817                         GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
2818                         GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
2819                         if (NULL != plugin->sockv4)
2820                         {
2821                                 GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
2822                                 GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
2823                         }
2824   }
2825
2826   if (plugin->enable_ipv6 == GNUNET_YES)
2827   {
2828     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
2829     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
2830     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
2831     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
2832     if (NULL != plugin->sockv6)
2833     {
2834       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
2835       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
2836     }
2837   }
2838
2839   schedule_select (plugin);
2840   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2841                            GNUNET_NO, plugin->port,
2842                            sockets_created,
2843                            (const struct sockaddr **) addrs, addrlens,
2844                            &udp_nat_port_map_callback, NULL, plugin);
2845
2846   return sockets_created;
2847 }
2848
2849
2850 /**
2851  * The exported method. Makes the core api available via a global and
2852  * returns the udp transport API.
2853  *
2854  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
2855  * @return our 'struct GNUNET_TRANSPORT_PluginFunctions'
2856  */
2857 void *
2858 libgnunet_plugin_transport_udp_init (void *cls)
2859 {
2860   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2861   struct GNUNET_TRANSPORT_PluginFunctions *api;
2862   struct Plugin *p;
2863   unsigned long long port;
2864   unsigned long long aport;
2865   unsigned long long broadcast;
2866   unsigned long long udp_max_bps;
2867   unsigned long long enable_v6;
2868   char * bind4_address;
2869   char * bind6_address;
2870   char * fancy_interval;
2871   struct GNUNET_TIME_Relative interval;
2872   struct sockaddr_in serverAddrv4;
2873   struct sockaddr_in6 serverAddrv6;
2874   int res;
2875   int have_bind4;
2876   int have_bind6;
2877
2878   if (NULL == env->receive)
2879   {
2880     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2881        initialze the plugin or the API */
2882     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2883     api->cls = NULL;
2884     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
2885     api->address_to_string = &udp_address_to_string;
2886     api->string_to_address = &udp_string_to_address;
2887     return api;
2888   }
2889
2890   GNUNET_assert (NULL != env->stats);
2891
2892   /* Get port number: port == 0 : autodetect a port,
2893    *                                                                                            > 0 : use this port,
2894    *                                                              not given : 2086 default */
2895   if (GNUNET_OK !=
2896       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", "PORT",
2897                                              &port))
2898     port = 2086;
2899   if (GNUNET_OK !=
2900       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2901                                              "ADVERTISED_PORT", &aport))
2902     aport = port;
2903   if (port > 65535)
2904   {
2905     LOG (GNUNET_ERROR_TYPE_WARNING,
2906          _("Given `%s' option is out of range: %llu > %u\n"), "PORT", port,
2907          65535);
2908     return NULL;
2909   }
2910
2911   /* Protocols */
2912   if ((GNUNET_YES ==
2913        GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat",
2914                                              "DISABLEV6")))
2915     enable_v6 = GNUNET_NO;
2916   else
2917     enable_v6 = GNUNET_YES;
2918
2919   /* Addresses */
2920   have_bind4 = GNUNET_NO;
2921   memset (&serverAddrv4, 0, sizeof (serverAddrv4));
2922   if (GNUNET_YES ==
2923       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2924                                              "BINDTO", &bind4_address))
2925   {
2926     LOG (GNUNET_ERROR_TYPE_DEBUG,
2927          "Binding udp plugin to specific address: `%s'\n",
2928          bind4_address);
2929     if (1 != inet_pton (AF_INET, bind4_address, &serverAddrv4.sin_addr))
2930     {
2931       GNUNET_free (bind4_address);
2932       return NULL;
2933     }
2934     have_bind4 = GNUNET_YES;
2935   }
2936   GNUNET_free_non_null (bind4_address);
2937   have_bind6 = GNUNET_NO;
2938   memset (&serverAddrv6, 0, sizeof (serverAddrv6));
2939   if (GNUNET_YES ==
2940       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2941                                              "BINDTO6", &bind6_address))
2942   {
2943     LOG (GNUNET_ERROR_TYPE_DEBUG,
2944          "Binding udp plugin to specific address: `%s'\n",
2945          bind6_address);
2946     if (1 !=
2947         inet_pton (AF_INET6, bind6_address, &serverAddrv6.sin6_addr))
2948     {
2949       LOG (GNUNET_ERROR_TYPE_ERROR, _("Invalid IPv6 address: `%s'\n"),
2950            bind6_address);
2951       GNUNET_free (bind6_address);
2952       return NULL;
2953     }
2954     have_bind6 = GNUNET_YES;
2955   }
2956   GNUNET_free_non_null (bind6_address);
2957
2958   /* Initialize my flags */
2959   myoptions = 0;
2960
2961   /* Enable neighbour discovery */
2962   broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp",
2963                                             "BROADCAST");
2964   if (broadcast == GNUNET_SYSERR)
2965     broadcast = GNUNET_NO;
2966
2967   if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
2968                                            "BROADCAST_INTERVAL", &fancy_interval))
2969   {
2970     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
2971   }
2972   else
2973   {
2974      if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval))
2975      {
2976        interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
2977      }
2978      GNUNET_free (fancy_interval);
2979   }
2980
2981   /* Maximum datarate */
2982   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
2983                                              "MAX_BPS", &udp_max_bps))
2984   {
2985     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
2986   }
2987
2988   p = GNUNET_malloc (sizeof (struct Plugin));
2989   p->port = port;
2990   p->aport = aport;
2991   p->broadcast_interval = interval;
2992   p->enable_ipv6 = enable_v6;
2993   p->enable_ipv4 = GNUNET_YES; /* default */
2994   p->env = env;
2995   p->sessions = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
2996   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2997   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
2998   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
2999                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
3000   plugin = p;
3001
3002   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
3003   res = setup_sockets (p, (GNUNET_YES == have_bind6) ? &serverAddrv6 : NULL,
3004                                                                                 (GNUNET_YES == have_bind4) ? &serverAddrv4 : NULL);
3005   if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
3006   {
3007     LOG (GNUNET_ERROR_TYPE_ERROR,
3008          _("Failed to create network sockets, plugin failed\n"));
3009     GNUNET_CONTAINER_multihashmap_destroy (p->sessions);
3010     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3011     GNUNET_SERVER_mst_destroy (p->mst);
3012     GNUNET_free (p);
3013     return NULL;
3014   }
3015   else if (broadcast == GNUNET_YES)
3016   {
3017     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
3018     setup_broadcast (p, &serverAddrv6, &serverAddrv4);
3019   }
3020
3021   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
3022   api->cls = p;
3023   api->send = NULL;
3024   api->disconnect = &udp_disconnect;
3025   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3026   api->address_to_string = &udp_address_to_string;
3027   api->string_to_address = &udp_string_to_address;
3028   api->check_address = &udp_plugin_check_address;
3029   api->get_session = &udp_plugin_get_session;
3030   api->send = &udp_plugin_send;
3031   api->get_network = &udp_get_network;
3032
3033   return api;
3034 }
3035
3036
3037 static int
3038 heap_cleanup_iterator (void *cls,
3039                        struct GNUNET_CONTAINER_HeapNode *
3040                        node, void *element,
3041                        GNUNET_CONTAINER_HeapCostType
3042                        cost)
3043 {
3044   struct DefragContext * d_ctx = element;
3045
3046   GNUNET_CONTAINER_heap_remove_node (node);
3047   GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag);
3048   GNUNET_free (d_ctx);
3049
3050   return GNUNET_YES;
3051 }
3052
3053
3054 /**
3055  * The exported method. Makes the core api available via a global and
3056  * returns the udp transport API.
3057  *
3058  * @param cls our 'struct GNUNET_TRANSPORT_PluginEnvironment'
3059  * @return NULL
3060  */
3061 void *
3062 libgnunet_plugin_transport_udp_done (void *cls)
3063 {
3064   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3065   struct Plugin *plugin = api->cls;
3066
3067   if (NULL == plugin)
3068   {
3069     GNUNET_free (api);
3070     return NULL;
3071   }
3072
3073   stop_broadcast (plugin);
3074   if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
3075   {
3076     GNUNET_SCHEDULER_cancel (plugin->select_task);
3077     plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
3078   }
3079   if (plugin->select_task_v6 != GNUNET_SCHEDULER_NO_TASK)
3080   {
3081     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3082     plugin->select_task_v6 = GNUNET_SCHEDULER_NO_TASK;
3083   }
3084
3085   /* Closing sockets */
3086   if (GNUNET_YES ==plugin->enable_ipv4)
3087   {
3088                 if (plugin->sockv4 != NULL)
3089                 {
3090                         GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv4));
3091                         plugin->sockv4 = NULL;
3092                 }
3093                 GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3094                 GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3095   }
3096   if (GNUNET_YES ==plugin->enable_ipv6)
3097   {
3098                 if (plugin->sockv6 != NULL)
3099                 {
3100                         GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6));
3101                         plugin->sockv6 = NULL;
3102
3103                         GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3104                         GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3105                 }
3106   }
3107   if (NULL != plugin->nat)
3108         GNUNET_NAT_unregister (plugin->nat);
3109
3110   if (plugin->defrag_ctxs != NULL)
3111   {
3112     GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs,
3113         heap_cleanup_iterator, NULL);
3114     GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs);
3115     plugin->defrag_ctxs = NULL;
3116   }
3117   if (plugin->mst != NULL)
3118   {
3119     GNUNET_SERVER_mst_destroy(plugin->mst);
3120     plugin->mst = NULL;
3121   }
3122
3123   /* Clean up leftover messages */
3124   struct UDP_MessageWrapper * udpw;
3125   udpw = plugin->ipv4_queue_head;
3126   while (udpw != NULL)
3127   {
3128     struct UDP_MessageWrapper *tmp = udpw->next;
3129     dequeue (plugin, udpw);
3130     call_continuation(udpw, GNUNET_SYSERR);
3131     GNUNET_free (udpw);
3132
3133     udpw = tmp;
3134   }
3135   udpw = plugin->ipv6_queue_head;
3136   while (udpw != NULL)
3137   {
3138     struct UDP_MessageWrapper *tmp = udpw->next;
3139     dequeue (plugin, udpw);
3140     call_continuation(udpw, GNUNET_SYSERR);
3141     GNUNET_free (udpw);
3142
3143     udpw = tmp;
3144   }
3145
3146   /* Clean up sessions */
3147   LOG (GNUNET_ERROR_TYPE_DEBUG,
3148        "Cleaning up sessions\n");
3149   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, &disconnect_and_free_it, plugin);
3150   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessions);
3151
3152   plugin->nat = NULL;
3153   GNUNET_free (plugin);
3154   GNUNET_free (api);
3155 #if DEBUG_MALLOC
3156   struct Allocation *allocation;
3157   while (NULL != ahead)
3158   {
3159       allocation = ahead;
3160       GNUNET_CONTAINER_DLL_remove (ahead, atail, allocation);
3161       GNUNET_free (allocation);
3162   }
3163   struct Allocator *allocator;
3164   while (NULL != aehead)
3165   {
3166       allocator = aehead;
3167       GNUNET_CONTAINER_DLL_remove (aehead, aetail, allocator);
3168       GNUNET_free (allocator);
3169   }
3170 #endif
3171   return NULL;
3172 }
3173
3174
3175 /* end of plugin_transport_udp.c */