/*
This file is part of GNUnet
- Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2010-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
#include "gnunet_hello_lib.h"
#include "gnunet_util_lib.h"
#include "gnunet_fragmentation_lib.h"
-#include "gnunet_nat_lib.h"
+#include "gnunet_nat_service.h"
#include "gnunet_protocols.h"
#include "gnunet_resolver_service.h"
#include "gnunet_signatures.h"
/**
* Session with another peer.
*/
-struct Session
+struct GNUNET_ATS_Session
{
/**
* Which peer is this session for?
*/
struct GNUNET_PeerIdentity target;
+ /**
+ * Tokenizer for inbound messages.
+ */
+ struct GNUNET_MessageStreamTokenizer *mst;
+
/**
* Plugin this session belongs to.
*/
struct GNUNET_TIME_Relative flow_delay_for_other_peer;
/**
- * Desired delay for next sending we received from other peer
+ * Desired delay for transmissions we received from other peer.
+ * This is for full messages, the value needs to be adjusted for
+ * fragmented messages.
*/
- struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+ struct GNUNET_TIME_Relative flow_delay_from_other_peer;
/**
* Session timeout task
*/
struct GNUNET_TIME_Absolute timeout;
+ /**
+ * What time did we last transmit?
+ */
+ struct GNUNET_TIME_Absolute last_transmit_time;
+
/**
* expected delay for ACKs
*/
/**
* The session this fragmentation context belongs to
*/
- struct Session *session;
+ struct GNUNET_ATS_Session *session;
/**
* Function to call upon completion of the transmission.
*/
void *cont_cls;
+ /**
+ * Start time.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Transmission time for the next fragment. Incremented by
+ * the @e flow_delay_from_other_peer for each fragment when
+ * we setup the fragments.
+ */
+ struct GNUNET_TIME_Absolute next_frag_time;
+
+ /**
+ * Desired delay for transmissions we received from other peer.
+ * Adjusted to be per fragment (UDP_MTU), even though on the
+ * wire it was for "full messages".
+ */
+ struct GNUNET_TIME_Relative flow_delay_from_other_peer;
+
/**
* Message timeout
*/
/**
* Session this message belongs to
*/
- struct Session *session;
+ struct GNUNET_ATS_Session *session;
/**
* DLL of messages, previous element
*/
struct UDP_FragmentationContext *frag_ctx;
+ /**
+ * Message enqueue time.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Desired transmission time for this message, based on the
+ * flow limiting information we got from the other peer.
+ */
+ struct GNUNET_TIME_Absolute transmission_time;
+
/**
* Message timeout.
*/
/**
* Desired delay for flow control, in us (in NBO).
+ * A value of UINT32_MAX indicates that the other
+ * peer wants us to disconnect.
*/
uint32_t delay GNUNET_PACKED;
*/
static void
notify_session_monitor (struct Plugin *plugin,
- struct Session *session,
+ struct GNUNET_ATS_Session *session,
enum GNUNET_TRANSPORT_SessionState state)
{
struct GNUNET_TRANSPORT_SessionInfo info;
*
* @param cls the `struct Plugin` with the monitor callback (`sic`)
* @param peer peer we send information about
- * @param value our `struct Session` to send information about
+ * @param value our `struct GNUNET_ATS_Session` to send information about
* @return #GNUNET_OK (continue to iterate)
*/
static int
void *value)
{
struct Plugin *plugin = cls;
- struct Session *session = value;
+ struct GNUNET_ATS_Session *session = value;
notify_session_monitor (plugin,
session,
* @param s session to free
*/
static void
-free_session (struct Session *s)
+free_session (struct GNUNET_ATS_Session *s)
{
if (NULL != s->address)
{
GNUNET_free (s->frag_ctx);
s->frag_ctx = NULL;
}
+ if (NULL != s->mst)
+ {
+ GNUNET_MST_destroy (s->mst);
+ s->mst = NULL;
+ }
GNUNET_free (s);
}
* @return the network type
*/
static enum GNUNET_ATS_Network_Type
-udp_get_network (void *cls,
- struct Session *session)
+udp_plugin_get_network (void *cls,
+ struct GNUNET_ATS_Session *session)
{
return session->scope;
}
+/**
+ * Function obtain the network type for an address.
+ *
+ * @param cls closure (`struct Plugin *`)
+ * @param address the address
+ * @return the network type
+ */
+static enum GNUNET_ATS_Network_Type
+udp_plugin_get_network_for_address (void *cls,
+ const struct GNUNET_HELLO_Address *address)
+{
+ struct Plugin *plugin = cls;
+ size_t addrlen;
+ struct sockaddr_in a4;
+ struct sockaddr_in6 a6;
+ const struct IPv4UdpAddress *u4;
+ const struct IPv6UdpAddress *u6;
+ const void *sb;
+ size_t sbs;
+
+ addrlen = address->address_length;
+ if (addrlen == sizeof(struct IPv6UdpAddress))
+ {
+ GNUNET_assert (NULL != address->address); /* make static analysis happy */
+ u6 = address->address;
+ memset (&a6, 0, sizeof(a6));
+#if HAVE_SOCKADDR_IN_SIN_LEN
+ a6.sin6_len = sizeof (a6);
+#endif
+ a6.sin6_family = AF_INET6;
+ a6.sin6_port = u6->u6_port;
+ GNUNET_memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
+ sb = &a6;
+ sbs = sizeof(a6);
+ }
+ else if (addrlen == sizeof(struct IPv4UdpAddress))
+ {
+ GNUNET_assert (NULL != address->address); /* make static analysis happy */
+ u4 = address->address;
+ memset (&a4, 0, sizeof(a4));
+#if HAVE_SOCKADDR_IN_SIN_LEN
+ a4.sin_len = sizeof (a4);
+#endif
+ a4.sin_family = AF_INET;
+ a4.sin_port = u4->u4_port;
+ a4.sin_addr.s_addr = u4->ipv4_addr;
+ sb = &a4;
+ sbs = sizeof(a4);
+ }
+ else
+ {
+ GNUNET_break (0);
+ return GNUNET_ATS_NET_UNSPECIFIED;
+ }
+ return plugin->env->get_address_type (plugin->env->cls,
+ sb,
+ sbs);
+}
+
+
/* ******************* Event loop ******************** */
/**
* Then reschedule this function to be called again once more is available.
*
* @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
*/
static void
-udp_plugin_select_v4 (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v4 (void *cls);
/**
* Then reschedule this function to be called again once more is available.
*
* @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
*/
static void
-udp_plugin_select_v6 (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v6 (void *cls);
/**
schedule_select_v4 (struct Plugin *plugin)
{
struct GNUNET_TIME_Relative min_delay;
+ struct GNUNET_TIME_Relative delay;
struct UDP_MessageWrapper *udpw;
+ struct UDP_MessageWrapper *min_udpw;
if ( (GNUNET_YES == plugin->enable_ipv4) &&
(NULL != plugin->sockv4) )
/* Find a message ready to send:
* Flow delay from other peer is expired or not set (0) */
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+ min_udpw = NULL;
for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
- min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ {
+ delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
+ if (delay.rel_value_us < min_delay.rel_value_us)
+ {
+ min_delay = delay;
+ min_udpw = udpw;
+ }
+ }
if (NULL != plugin->select_task_v4)
GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+ if (NULL != min_udpw)
+ {
+ if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Calculated flow delay for UDPv4 at %s for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES),
+ GNUNET_i2s (&min_udpw->session->target));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated flow delay for UDPv4 at %s for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES),
+ GNUNET_i2s (&min_udpw->session->target));
+ }
+ }
plugin->select_task_v4
= GNUNET_SCHEDULER_add_read_net (min_delay,
plugin->sockv4,
schedule_select_v6 (struct Plugin *plugin)
{
struct GNUNET_TIME_Relative min_delay;
+ struct GNUNET_TIME_Relative delay;
struct UDP_MessageWrapper *udpw;
+ struct UDP_MessageWrapper *min_udpw;
if ( (GNUNET_YES == plugin->enable_ipv6) &&
(NULL != plugin->sockv6) )
{
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+ min_udpw = NULL;
for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
- min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ {
+ delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
+ if (delay.rel_value_us < min_delay.rel_value_us)
+ {
+ min_delay = delay;
+ min_udpw = udpw;
+ }
+ }
if (NULL != plugin->select_task_v6)
GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
+ if (NULL != min_udpw)
+ {
+ if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Calculated flow delay for UDPv6 at %s for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES),
+ GNUNET_i2s (&min_udpw->session->target));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated flow delay for UDPv6 at %s for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES),
+ GNUNET_i2s (&min_udpw->session->target));
+ }
+ }
plugin->select_task_v6
= GNUNET_SCHEDULER_add_read_net (min_delay,
plugin->sockv6,
if (sizeof(struct IPv4UdpAddress) == addrlen)
{
+ struct sockaddr_in s4;
+
v4 = (const struct IPv4UdpAddress *) addr;
if (GNUNET_OK != check_port (plugin,
ntohs (v4->u4_port)))
return GNUNET_SYSERR;
+ memset (&s4, 0, sizeof (s4));
+ s4.sin_family = AF_INET;
+#if HAVE_SOCKADDR_IN_SIN_LEN
+ s4.sin_len = sizeof (s4);
+#endif
+ s4.sin_port = v4->u4_port;
+ s4.sin_addr.s_addr = v4->ipv4_addr;
+
if (GNUNET_OK !=
- GNUNET_NAT_test_address (plugin->nat,
- &v4->ipv4_addr,
- sizeof (struct in_addr)))
+ GNUNET_NAT_test_address (plugin->nat,
+ &s4,
+ sizeof (struct sockaddr_in)))
return GNUNET_SYSERR;
}
else if (sizeof(struct IPv6UdpAddress) == addrlen)
{
+ struct sockaddr_in6 s6;
+
v6 = (const struct IPv6UdpAddress *) addr;
if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (GNUNET_OK != check_port (plugin,
- ntohs (v6->u6_port)))
- return GNUNET_SYSERR;
+ return GNUNET_OK; /* plausible, if unlikely... */
+ memset (&s6, 0, sizeof (s6));
+ s6.sin6_family = AF_INET6;
+#if HAVE_SOCKADDR_IN_SIN_LEN
+ s6.sin6_len = sizeof (s6);
+#endif
+ s6.sin6_port = v6->u6_port;
+ s6.sin6_addr = v6->ipv6_addr;
+
if (GNUNET_OK !=
- GNUNET_NAT_test_address (plugin->nat,
- &v6->ipv6_addr,
- sizeof (struct in6_addr)))
+ GNUNET_NAT_test_address (plugin->nat,
+ &s6,
+ sizeof(struct sockaddr_in6)))
return GNUNET_SYSERR;
}
else
* @param cls closure, the `struct Plugin`
* @param add_remove #GNUNET_YES to mean the new public IP address,
* #GNUNET_NO to mean the previous (now invalid) one
+ * @param ac address class the address belongs to
* @param addr either the previous or the new public IP address
* @param addrlen actual length of the @a addr
*/
static void
udp_nat_port_map_callback (void *cls,
int add_remove,
+ enum GNUNET_NAT_AddressClass ac,
const struct sockaddr *addr,
socklen_t addrlen)
{
GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
i4 = (const struct sockaddr_in *) addr;
if (0 == ntohs (i4->sin_port))
- {
- GNUNET_break (0);
- return;
- }
+ return; /* Port = 0 means unmapped, ignore these for UDP. */
memset (&u4,
0,
sizeof(u4));
GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
i6 = (const struct sockaddr_in6 *) addr;
if (0 == ntohs (i6->sin6_port))
- {
- GNUNET_break (0);
- return;
- }
+ return; /* Port = 0 means unmapped, ignore these for UDP. */
memset (&u6,
0,
sizeof(u6));
return;
}
/* modify our published address list */
+ /* TODO: use 'ac' here in the future... */
address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
PLUGIN_NAME,
arg,
/**
* Closure for #session_cmp_it().
*/
-struct SessionCompareContext
+struct GNUNET_ATS_SessionCompareContext
{
/**
* Set to session matching the address.
*/
- struct Session *res;
+ struct GNUNET_ATS_Session *res;
/**
* Address we are looking for.
/**
* Find a session with a matching address.
*
- * @param cls the `struct SessionCompareContext *`
+ * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
* @param key peer identity (unused)
- * @param value the `struct Session *`
+ * @param value the `struct GNUNET_ATS_Session *`
* @return #GNUNET_NO if we found the session, #GNUNET_OK if not
*/
static int
const struct GNUNET_PeerIdentity *key,
void *value)
{
- struct SessionCompareContext *cctx = cls;
- struct Session *s = value;
+ struct GNUNET_ATS_SessionCompareContext *cctx = cls;
+ struct GNUNET_ATS_Session *s = value;
if (0 == GNUNET_HELLO_address_cmp (s->address,
cctx->address))
* @param address the address we should locate the session by
* @return the session if it exists, or NULL if it is not found
*/
-static struct Session *
+static struct GNUNET_ATS_Session *
udp_plugin_lookup_session (void *cls,
const struct GNUNET_HELLO_Address *address)
{
struct Plugin *plugin = cls;
const struct IPv6UdpAddress *udp_a6;
const struct IPv4UdpAddress *udp_a4;
- struct SessionCompareContext cctx;
+ struct GNUNET_ATS_SessionCompareContext cctx;
if (NULL == address->address)
{
* @param s session to reschedule timeout activity for
*/
static void
-reschedule_session_timeout (struct Session *s)
+reschedule_session_timeout (struct GNUNET_ATS_Session *s)
{
if (GNUNET_YES == s->in_destroy)
return;
static void
udp_plugin_update_session_timeout (void *cls,
const struct GNUNET_PeerIdentity *peer,
- struct Session *session)
+ struct GNUNET_ATS_Session *session)
{
struct Plugin *plugin = cls;
dequeue (struct Plugin *plugin,
struct UDP_MessageWrapper *udpw)
{
- struct Session *session = udpw->session;
+ struct GNUNET_ATS_Session *session = udpw->session;
if (plugin->bytes_in_buffer < udpw->msg_size)
{
enqueue (struct Plugin *plugin,
struct UDP_MessageWrapper *udpw)
{
- struct Session *session = udpw->session;
+ struct GNUNET_ATS_Session *session = udpw->session;
if (GNUNET_YES == session->in_destroy)
{
GNUNET_break (0);
+ GNUNET_free (udpw);
return;
}
- if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
+ if (plugin->bytes_in_buffer > INT64_MAX - udpw->msg_size)
{
GNUNET_break (0);
}
int result)
{
struct Plugin *plugin = frag_ctx->plugin;
- struct Session *s = frag_ctx->session;
+ struct GNUNET_ATS_Session *s = frag_ctx->session;
struct UDP_MessageWrapper *udpw;
struct UDP_MessageWrapper *tmp;
size_t overhead;
+ struct GNUNET_TIME_Relative delay;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%p: Fragmented message removed with result %s\n",
overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
else
overhead = frag_ctx->on_wire_size;
+ delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
+ if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Fragmented message acknowledged after %s (expected at %s)\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES),
+ GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmented message acknowledged after %s (expected at %s)\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES),
+ GNUNET_STRINGS_absolute_time_to_string (frag_ctx->next_frag_time));
+ }
+
if (NULL != frag_ctx->cont)
frag_ctx->cont (frag_ctx->cont_cls,
&s->target,
GNUNET_assert (NULL != udpw->frag_ctx);
if (GNUNET_OK == result)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragment of message with %u bytes transmitted to %s\n",
+ (unsigned int) udpw->payload_size,
+ GNUNET_i2s (&udpw->session->target));
GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, fragmented msgs, fragments, sent, success",
}
else
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to transmit fragment of message with %u bytes to %s\n",
+ (unsigned int) udpw->payload_size,
+ GNUNET_i2s (&udpw->session->target));
fragmented_message_done (udpw->frag_ctx,
GNUNET_SYSERR);
GNUNET_STATISTICS_update (plugin->env->stats,
struct UDP_FragmentationContext *frag_ctx = cls;
struct Plugin *plugin = frag_ctx->plugin;
struct UDP_MessageWrapper *udpw;
- struct Session *session = frag_ctx->session;
+ struct GNUNET_ATS_Session *session = frag_ctx->session;
size_t msg_len = ntohs (msg->size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
udpw->msg_size = msg_len;
udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
udpw->timeout = frag_ctx->timeout;
+ udpw->start_time = frag_ctx->start_time;
+ udpw->transmission_time = frag_ctx->next_frag_time;
+ frag_ctx->next_frag_time
+ = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
+ frag_ctx->flow_delay_from_other_peer);
udpw->frag_ctx = frag_ctx;
udpw->qc = &qc_fragment_sent;
udpw->qc_cls = plugin;
- memcpy (udpw->msg_buf,
- msg,
- msg_len);
+ GNUNET_memcpy (udpw->msg_buf,
+ msg,
+ msg_len);
enqueue (plugin,
udpw);
- if (sizeof (struct IPv4UdpAddress) == session->address->address_length)
+ if (session->address->address_length == sizeof (struct IPv4UdpAddress))
schedule_select_v4 (plugin);
else
schedule_select_v6 (plugin);
{
struct Plugin *plugin = cls;
size_t overhead;
+ struct GNUNET_TIME_Relative delay;
if (udpw->msg_size >= udpw->payload_size)
overhead = udpw->msg_size - udpw->payload_size;
overhead = udpw->msg_size;
if (NULL != udpw->cont)
+ {
+ delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
+ if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Message sent via UDP with delay of %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message sent via UDP with delay of %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
udpw->cont (udpw->cont_cls,
&udpw->session->target,
result,
udpw->payload_size,
overhead);
+ }
if (GNUNET_OK == result)
{
GNUNET_STATISTICS_update (plugin->env->stats,
*/
static ssize_t
udp_plugin_send (void *cls,
- struct Session *s,
+ struct GNUNET_ATS_Session *s,
const char *msgbuf,
size_t msgbuf_size,
unsigned int priority,
struct UDP_MessageWrapper *udpw;
struct UDPMessage *udp;
char mbuf[udpmlen] GNUNET_ALIGN;
+ struct GNUNET_TIME_Relative latency;
if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
(NULL == plugin->sockv6) )
if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
(NULL == plugin->sockv4) )
return GNUNET_SYSERR;
- if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ if (udpmlen >= GNUNET_MAX_MESSAGE_SIZE)
{
GNUNET_break (0);
return GNUNET_SYSERR;
udpw->msg_buf = (char *) &udpw[1];
udpw->msg_size = udpmlen; /* message size with UDP overhead */
udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
+ udpw->start_time = GNUNET_TIME_absolute_get ();
udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
+ udpw->transmission_time = s->last_transmit_time;
+ s->last_transmit_time
+ = GNUNET_TIME_absolute_add (s->last_transmit_time,
+ s->flow_delay_from_other_peer);
udpw->cont = cont;
udpw->cont_cls = cont_cls;
udpw->frag_ctx = NULL;
udpw->qc = &qc_message_sent;
udpw->qc_cls = plugin;
- memcpy (udpw->msg_buf,
+ GNUNET_memcpy (udpw->msg_buf,
udp,
sizeof (struct UDPMessage));
- memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
+ GNUNET_memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
msgbuf,
msgbuf_size);
enqueue (plugin,
GNUNET_NO);
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, unfragmented bytes payload queued total",
- udpw->payload_size,
+ msgbuf_size,
GNUNET_NO);
+ if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+ schedule_select_v4 (plugin);
+ else
+ schedule_select_v6 (plugin);
}
else
{
/* fragmented message */
if (NULL != s->frag_ctx)
return GNUNET_SYSERR;
- memcpy (&udp[1],
+ GNUNET_memcpy (&udp[1],
msgbuf,
msgbuf_size);
frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
frag_ctx->session = s;
frag_ctx->cont = cont;
frag_ctx->cont_cls = cont_cls;
+ frag_ctx->start_time = GNUNET_TIME_absolute_get ();
+ frag_ctx->next_frag_time = s->last_transmit_time;
+ frag_ctx->flow_delay_from_other_peer
+ = GNUNET_TIME_relative_divide (s->flow_delay_from_other_peer,
+ 1 + (msgbuf_size /
+ UDP_MTU));
frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
&enqueue_fragment,
frag_ctx);
s->frag_ctx = frag_ctx;
+ s->last_transmit_time = frag_ctx->next_frag_time;
+ latency = GNUNET_TIME_absolute_get_remaining (s->last_transmit_time);
+ if (latency.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
+ GNUNET_STRINGS_relative_time_to_string (latency,
+ GNUNET_YES),
+ GNUNET_i2s (&s->target),
+ (unsigned int) s->msgs_in_queue);
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Enqueued fragments will take %s for transmission to %s (queue size: %u)\n",
+ GNUNET_STRINGS_relative_time_to_string (latency,
+ GNUNET_YES),
+ GNUNET_i2s (&s->target),
+ (unsigned int) s->msgs_in_queue);
+
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, fragmented messages active",
1,
notify_session_monitor (s->plugin,
s,
GNUNET_TRANSPORT_SS_UPDATE);
- if (s->address->address_length == sizeof (struct IPv4UdpAddress))
- schedule_select_v4 (plugin);
- else
- schedule_select_v6 (plugin);
return udpmlen;
}
-/**
- * Handle an ACK message.
- *
- * @param plugin the UDP plugin
- * @param msg the (presumed) UDP ACK message
- * @param udp_addr sender address
- * @param udp_addr_len number of bytes in @a udp_addr
- */
-static void
-read_process_ack (struct Plugin *plugin,
- const struct GNUNET_MessageHeader *msg,
- const union UdpAddress *udp_addr,
- socklen_t udp_addr_len)
-{
- const struct GNUNET_MessageHeader *ack;
- const struct UDP_ACK_Message *udp_ack;
- struct GNUNET_HELLO_Address *address;
- struct Session *s;
- struct GNUNET_TIME_Relative flow_delay;
-
- if (ntohs (msg->size)
- < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
- {
- GNUNET_break_op (0);
- return;
- }
- udp_ack = (const struct UDP_ACK_Message *) msg;
- address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
- PLUGIN_NAME,
- udp_addr,
- udp_addr_len,
- GNUNET_HELLO_ADDRESS_INFO_NONE);
- s = udp_plugin_lookup_session (plugin,
- address);
- if (NULL == s)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "UDP session of address %s for ACK not found\n",
- udp_address_to_string (plugin,
- address->address,
- address->address_length));
- GNUNET_HELLO_address_free (address);
- return;
- }
- if (NULL == s->frag_ctx)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
- "Fragmentation context of address %s for ACK not found\n",
- udp_address_to_string (plugin,
- address->address,
- address->address_length));
- GNUNET_HELLO_address_free (address);
- return;
- }
- GNUNET_HELLO_address_free (address);
-
- flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "We received a sending delay of %s for %s\n",
- GNUNET_STRINGS_relative_time_to_string (flow_delay,
- GNUNET_YES),
- GNUNET_i2s (&udp_ack->sender));
- s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
-
- ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
- if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
- {
- GNUNET_break_op(0);
- return;
- }
-
- if (GNUNET_OK !=
- GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
- ack))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
- (unsigned int) ntohs (msg->size),
- GNUNET_i2s (&udp_ack->sender),
- udp_address_to_string (plugin,
- udp_addr,
- udp_addr_len));
- /* Expect more ACKs to arrive */
- return;
- }
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Message from %s at %s full ACK'ed\n",
- GNUNET_i2s (&udp_ack->sender),
- udp_address_to_string (plugin,
- udp_addr,
- udp_addr_len));
-
- /* Remove fragmented message after successful sending */
- fragmented_message_done (s->frag_ctx,
- GNUNET_OK);
-}
-
-
/* ********************** Receiving ********************** */
/**
* Session associated with this context.
*/
- struct Session *session;
+ struct GNUNET_ATS_Session *session;
/**
* Address to find.
}
-/**
- * Message tokenizer has broken up an incomming message. Pass it on
- * to the service.
- *
- * @param cls the `struct Plugin *`
- * @param client the `struct Session *`
- * @param hdr the actual message
- * @return #GNUNET_OK (always)
- */
-static int
-process_inbound_tokenized_messages (void *cls,
- void *client,
- const struct GNUNET_MessageHeader *hdr)
-{
- struct Plugin *plugin = cls;
- struct Session *session = client;
-
- if (GNUNET_YES == session->in_destroy)
- return GNUNET_OK;
- reschedule_session_timeout (session);
- session->flow_delay_for_other_peer
- = plugin->env->receive (plugin->env->cls,
- session->address,
- session,
- hdr);
- return GNUNET_OK;
-}
-
-
/**
* Functions with this signature are called whenever we need to close
* a session due to a disconnect or failure to establish a connection.
*/
static int
udp_disconnect_session (void *cls,
- struct Session *s)
+ struct GNUNET_ATS_Session *s)
{
struct Plugin *plugin = cls;
struct UDP_MessageWrapper *udpw;
}
+/**
+ * Handle a #GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK message.
+ *
+ * @param plugin the UDP plugin
+ * @param msg the (presumed) UDP ACK message
+ * @param udp_addr sender address
+ * @param udp_addr_len number of bytes in @a udp_addr
+ */
+static void
+read_process_ack (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ const union UdpAddress *udp_addr,
+ socklen_t udp_addr_len)
+{
+ const struct GNUNET_MessageHeader *ack;
+ const struct UDP_ACK_Message *udp_ack;
+ struct GNUNET_HELLO_Address *address;
+ struct GNUNET_ATS_Session *s;
+ struct GNUNET_TIME_Relative flow_delay;
+
+ /* check message format */
+ if (ntohs (msg->size)
+ < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+ ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+ if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
+ {
+ GNUNET_break_op(0);
+ return;
+ }
+
+ /* Locate session */
+ address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
+ PLUGIN_NAME,
+ udp_addr,
+ udp_addr_len,
+ GNUNET_HELLO_ADDRESS_INFO_NONE);
+ s = udp_plugin_lookup_session (plugin,
+ address);
+ if (NULL == s)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "UDP session of address %s for ACK not found\n",
+ udp_address_to_string (plugin,
+ address->address,
+ address->address_length));
+ GNUNET_HELLO_address_free (address);
+ return;
+ }
+ if (NULL == s->frag_ctx)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+ "Fragmentation context of address %s for ACK (%s) not found\n",
+ udp_address_to_string (plugin,
+ address->address,
+ address->address_length),
+ GNUNET_FRAGMENT_print_ack (ack));
+ GNUNET_HELLO_address_free (address);
+ return;
+ }
+ GNUNET_HELLO_address_free (address);
+
+ /* evaluate flow delay: how long should we wait between messages? */
+ if (UINT32_MAX == ntohl (udp_ack->delay))
+ {
+ /* Other peer asked for us to terminate the session */
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Asked to disconnect UDP session of %s\n",
+ GNUNET_i2s (&udp_ack->sender));
+ udp_disconnect_session (plugin,
+ s);
+ return;
+ }
+ flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
+ if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "We received a sending delay of %s for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (flow_delay,
+ GNUNET_YES),
+ GNUNET_i2s (&udp_ack->sender));
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "We received a sending delay of %s for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (flow_delay,
+ GNUNET_YES),
+ GNUNET_i2s (&udp_ack->sender));
+ /* Flow delay is for the reassembled packet, however, our delay
+ is per packet, so we need to adjust: */
+ s->flow_delay_from_other_peer = flow_delay;
+
+ /* Handle ACK */
+ if (GNUNET_OK !=
+ GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
+ ack))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_i2s (&udp_ack->sender),
+ udp_address_to_string (plugin,
+ udp_addr,
+ udp_addr_len));
+ /* Expect more ACKs to arrive */
+ return;
+ }
+
+ /* Remove fragmented message after successful sending */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message from %s at %s full ACK'ed\n",
+ GNUNET_i2s (&udp_ack->sender),
+ udp_address_to_string (plugin,
+ udp_addr,
+ udp_addr_len));
+ fragmented_message_done (s->frag_ctx,
+ GNUNET_OK);
+}
+
+
+/**
+ * Message tokenizer has broken up an incomming message. Pass it on
+ * to the service.
+ *
+ * @param cls the `struct GNUNET_ATS_Session *`
+ * @param hdr the actual message
+ * @return #GNUNET_OK (always)
+ */
+static int
+process_inbound_tokenized_messages (void *cls,
+ const struct GNUNET_MessageHeader *hdr)
+{
+ struct GNUNET_ATS_Session *session = cls;
+ struct Plugin *plugin = session->plugin;
+
+ if (GNUNET_YES == session->in_destroy)
+ return GNUNET_OK;
+ reschedule_session_timeout (session);
+ session->flow_delay_for_other_peer
+ = plugin->env->receive (plugin->env->cls,
+ session->address,
+ session,
+ hdr);
+ return GNUNET_OK;
+}
+
+
/**
* Destroy a session, plugin is being unloaded.
*
/**
* Session was idle, so disconnect it.
*
- * @param cls the `struct Session` to time out
- * @param tc scheduler context
+ * @param cls the `struct GNUNET_ATS_Session` to time out
*/
static void
-session_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+session_timeout (void *cls)
{
- struct Session *s = cls;
+ struct GNUNET_ATS_Session *s = cls;
struct Plugin *plugin = s->plugin;
struct GNUNET_TIME_Relative left;
* @param network_type network type the address belongs to
* @return NULL on error, otherwise session handle
*/
-static struct Session *
+static struct GNUNET_ATS_Session *
udp_plugin_create_session (void *cls,
const struct GNUNET_HELLO_Address *address,
enum GNUNET_ATS_Network_Type network_type)
{
struct Plugin *plugin = cls;
- struct Session *s;
+ struct GNUNET_ATS_Session *s;
- s = GNUNET_new (struct Session);
+ s = GNUNET_new (struct GNUNET_ATS_Session);
+ s->mst = GNUNET_MST_create (&process_inbound_tokenized_messages,
+ s);
s->plugin = plugin;
s->address = GNUNET_HELLO_address_copy (address);
s->target = address->peer;
+ s->last_transmit_time = GNUNET_TIME_absolute_get ();
s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
250);
s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
- s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
+ s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
* @param address the address
* @return the session or NULL of max connections exceeded
*/
-static struct Session *
+static struct GNUNET_ATS_Session *
udp_plugin_get_session (void *cls,
const struct GNUNET_HELLO_Address *address)
{
struct Plugin *plugin = cls;
- struct Session *s;
- enum GNUNET_ATS_Network_Type network_type;
+ struct GNUNET_ATS_Session *s;
+ enum GNUNET_ATS_Network_Type network_type = GNUNET_ATS_NET_UNSPECIFIED;
const struct IPv4UdpAddress *udp_v4;
const struct IPv6UdpAddress *udp_v6;
size_t udp_addr_len,
enum GNUNET_ATS_Network_Type network_type)
{
- struct Session *s;
+ struct GNUNET_ATS_Session *s;
struct GNUNET_HELLO_Address *address;
GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
GNUNET_free (address);
s->rc++;
- GNUNET_SERVER_mst_receive (plugin->mst,
- s,
- (const char *) &msg[1],
- ntohs (msg->header.size) - sizeof(struct UDPMessage),
- GNUNET_YES,
- GNUNET_NO);
+ GNUNET_MST_from_buffer (s->mst,
+ (const char *) &msg[1],
+ ntohs (msg->header.size) - sizeof(struct UDPMessage),
+ GNUNET_YES,
+ GNUNET_NO);
s->rc--;
if ( (0 == s->rc) &&
(GNUNET_YES == s->in_destroy) )
struct UDP_ACK_Message *udp_ack;
uint32_t delay;
struct UDP_MessageWrapper *udpw;
- struct Session *s;
+ struct GNUNET_ATS_Session *s;
struct GNUNET_HELLO_Address *address;
if (GNUNET_NO == rc->have_sender)
{
/* tried to defragment but never succeeded, hence will not ACK */
- GNUNET_break_op (0);
+ /* This can happen if we just lost msgs */
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# UDP, fragments discarded without ACK",
+ 1,
+ GNUNET_NO);
return;
}
address = GNUNET_HELLO_address_allocate (&rc->sender,
GNUNET_NO);
return;
}
- if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
+ if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us ==
+ s->flow_delay_for_other_peer.rel_value_us)
+ delay = UINT32_MAX;
+ else if (s->flow_delay_for_other_peer.rel_value_us < UINT32_MAX)
delay = s->flow_delay_for_other_peer.rel_value_us;
else
- delay = UINT32_MAX;
-
+ delay = UINT32_MAX - 1; /* largest value we can communicate */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending ACK to `%s' including delay of %s\n",
udp_address_to_string (plugin,
udpw->msg_size = msize;
udpw->payload_size = 0;
udpw->session = s;
+ udpw->start_time = GNUNET_TIME_absolute_get ();
udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
udpw->msg_buf = (char *) &udpw[1];
udpw->qc = &ack_message_sent;
udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
udp_ack->delay = htonl (delay);
udp_ack->sender = *plugin->env->my_identity;
- memcpy (&udp_ack[1],
+ GNUNET_memcpy (&udp_ack[1],
msg,
ntohs (msg->size));
enqueue (plugin,
{
/* Create a new defragmentation context */
d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
- memcpy (&d_ctx[1],
+ GNUNET_memcpy (&d_ctx[1],
udp_addr,
udp_addr_len);
d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
msg))
{
/* keep this 'rc' from expiring */
- GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
- d_ctx->hnode,
+ GNUNET_CONTAINER_heap_update_cost (d_ctx->hnode,
(GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
}
if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
sizeof(addr));
size = GNUNET_NETWORK_socket_recvfrom (rsock,
buf,
- sizeof(buf),
+ sizeof (buf),
(struct sockaddr *) &addr,
&fromlen);
sa = (const struct sockaddr *) &addr;
/* Connection failure or something. Not a protocol violation. */
return;
}
+
+ /* Check if this is a STUN packet */
+ if (GNUNET_NO !=
+ GNUNET_NAT_stun_handle_packet (plugin->nat,
+ (const struct sockaddr *) &addr,
+ fromlen,
+ buf,
+ size))
+ return; /* was STUN, do not process further */
+
if (size < sizeof(struct GNUNET_MessageHeader))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
GNUNET_break_op (0);
return;
}
+
msg = (const struct GNUNET_MessageHeader *) buf;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"UDP received %u-byte message from `%s' type %u\n",
if (size != ntohs (msg->size))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "UDP malformed message header from %s\n",
+ "UDP malformed message (size %u) header from %s\n",
(unsigned int) size,
GNUNET_a2s (sa,
fromlen));
{
struct UDP_MessageWrapper *udpw;
struct GNUNET_TIME_Relative remaining;
- struct Session *session;
+ struct GNUNET_ATS_Session *session;
int removed;
removed = GNUNET_NO;
if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
{
/* Message timed out */
- udpw->qc (udpw->qc_cls,
- udpw,
- GNUNET_SYSERR);
- /* Remove message */
removed = GNUNET_YES;
dequeue (plugin,
udpw);
+ udpw->qc (udpw->qc_cls,
+ udpw,
+ GNUNET_SYSERR);
GNUNET_free (udpw);
if (sock == plugin->sockv4)
}
else
{
- /* Message did not time out, check flow delay */
- remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
+ /* Message did not time out, check transmission time */
+ remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
if (0 == remaining.rel_value_us)
{
/* this message is not delayed */
else
{
GNUNET_break (0);
+ dequeue (plugin,
+ udpw);
udpw->qc (udpw->qc_cls,
udpw,
GNUNET_SYSERR);
- dequeue (plugin,
- udpw);
notify_session_monitor (plugin,
udpw->session,
GNUNET_TRANSPORT_SS_UPDATE);
udpw->msg_size,
a,
slen);
+ udpw->session->last_transmit_time
+ = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
+ udpw->session->last_transmit_time);
+ dequeue (plugin,
+ udpw);
if (GNUNET_SYSERR == sent)
{
/* Failure */
udpw,
GNUNET_OK);
}
- dequeue (plugin,
- udpw);
notify_session_monitor (plugin,
udpw->session,
GNUNET_TRANSPORT_SS_UPDATE);
* Then reschedule this function to be called again once more is available.
*
* @param cls the plugin handle
- * @param tc the scheduling context
*/
static void
-udp_plugin_select_v4 (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v4 (void *cls)
{
struct Plugin *plugin = cls;
+ const struct GNUNET_SCHEDULER_TaskContext *tc;
plugin->select_task_v4 = NULL;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ if (NULL == plugin->sockv4)
return;
+ tc = GNUNET_SCHEDULER_get_task_context ();
if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
- (NULL != plugin->sockv4) &&
(GNUNET_NETWORK_fdset_isset (tc->read_ready,
plugin->sockv4)))
udp_select_read (plugin,
* Then reschedule this function to be called again once more is available.
*
* @param cls the plugin handle
- * @param tc the scheduling context
*/
static void
-udp_plugin_select_v6 (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v6 (void *cls)
{
struct Plugin *plugin = cls;
+ const struct GNUNET_SCHEDULER_TaskContext *tc;
plugin->select_task_v6 = NULL;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ if (NULL == plugin->sockv6)
return;
+ tc = GNUNET_SCHEDULER_get_task_context ();
if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
- (NULL != plugin->sockv6) &&
(GNUNET_NETWORK_fdset_isset (tc->read_ready,
plugin->sockv6)) )
udp_select_read (plugin,
plugin->sockv6);
+
udp_select_send (plugin,
plugin->sockv6);
schedule_select_v6 (plugin);
* @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
* @return number of sockets that were successfully bound
*/
-static int
+static unsigned int
setup_sockets (struct Plugin *plugin,
const struct sockaddr_in6 *bind_v6,
const struct sockaddr_in *bind_v4)
{
int tries;
- int sockets_created = 0;
+ unsigned int sockets_created = 0;
struct sockaddr_in6 server_addrv6;
struct sockaddr_in server_addrv4;
const struct sockaddr *server_addr;
schedule_select_v4 (plugin);
schedule_select_v6 (plugin);
plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
- GNUNET_NO,
- plugin->port,
+ "transport-udp",
+ IPPROTO_UDP,
sockets_created,
addrs,
addrlens,
unsigned long long port;
unsigned long long aport;
unsigned long long udp_max_bps;
- unsigned long long enable_v6;
- unsigned long long enable_broadcasting;
- unsigned long long enable_broadcasting_recv;
+ int enable_v6;
+ int enable_broadcasting;
+ int enable_broadcasting_recv;
char *bind4_address;
char *bind6_address;
struct GNUNET_TIME_Relative interval;
struct sockaddr_in server_addrv4;
struct sockaddr_in6 server_addrv6;
- int res;
+ unsigned int res;
int have_bind4;
int have_bind6;
p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
GNUNET_NO);
p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
- p);
GNUNET_BANDWIDTH_tracker_init (&p->tracker,
NULL,
NULL,
_("Failed to create UDP network sockets\n"));
GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
- GNUNET_SERVER_mst_destroy (p->mst);
+ if (NULL != p->nat)
+ GNUNET_NAT_unregister (p->nat);
GNUNET_free (p);
return NULL;
}
api->check_address = &udp_plugin_check_address;
api->get_session = &udp_plugin_get_session;
api->send = &udp_plugin_send;
- api->get_network = &udp_get_network;
+ api->get_network = &udp_plugin_get_network;
+ api->get_network_for_address = &udp_plugin_get_network_for_address;
api->update_session_timeout = &udp_plugin_update_session_timeout;
api->setup_monitor = &udp_plugin_setup_monitor;
return api;
GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
plugin->defrag_ctxs = NULL;
}
- if (NULL != plugin->mst)
- {
- GNUNET_SERVER_mst_destroy (plugin->mst);
- plugin->mst = NULL;
- }
while (NULL != (udpw = plugin->ipv4_queue_head))
{
dequeue (plugin,
plugin->ppc_dll_tail,
cur);
GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
+ if (NULL != cur->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (cur->timeout_task);
+ cur->timeout_task = NULL;
+ }
GNUNET_free (cur);
}
GNUNET_free (plugin);