From d9f888cdbb23b78f5936628d3adb7a77492b499c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 1 Mar 2015 22:54:42 +0000 Subject: [PATCH] major code cleanup in UDP plugin, seems to also fix bugs; specifically, I think I fixed a leak --- src/transport/plugin_transport_udp.c | 3169 +++++++++++++------------ src/transport/test_plugin_transport.c | 35 +- 2 files changed, 1633 insertions(+), 1571 deletions(-) diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index f6b1ac251..a84f3e749 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -348,40 +348,19 @@ struct UDP_FragmentationContext /** - * Message types included in a `struct UDP_MessageWrapper` + * Function called when a message is removed from the + * transmission queue. + * + * @param cls closure + * @param udpw message wrapper finished + * @param result #GNUNET_OK on success (message was sent) + * #GNUNET_SYSERR if the target disconnected + * or we had a timeout or other trouble sending */ -enum UDP_MessageType -{ - /** - * Uninitialized (error) - */ - UMT_UNDEFINED = 0, - - /** - * This queue entry represents a fragment of a message. - */ - UMT_MSG_FRAGMENTED = 1, - - /** - * This queue entry does not include a message, but merely - * represents that we finished sending a fragmented message - * (all fragments confirmed, or timeout). - */ - UMT_MSG_FRAGMENTED_COMPLETE = 2, - - /** - * This queue entry represents a unfragmented message - * (was small enough to not require fragmentation). - */ - UMT_MSG_UNFRAGMENTED = 3, - - /** - * This queue entry represents the acknowledgement of us - * receiving a fragment. - */ - UMT_MSG_ACK = 4 - -}; +typedef void +(*QueueContinuation) (void *cls, + struct UDP_MessageWrapper *udpw, + int result); /** @@ -410,7 +389,20 @@ struct UDP_MessageWrapper char *msg_buf; /** - * Function to call upon completion of the transmission, can be NULL. + * Function to call once the message wrapper is being removed + * from the queue (with success or failure). + */ + QueueContinuation qc; + + /** + * Closure for @e qc. + */ + void *qc_cls; + + /** + * External continuation to call upon completion of the + * transmission, NULL if this queue entry is not for a + * message from the application. */ GNUNET_TRANSPORT_TransmitContinuation cont; @@ -441,14 +433,11 @@ struct UDP_MessageWrapper */ size_t payload_size; - /** - * Message type (what does this entry in the queue represent). - */ - enum UDP_MessageType msg_type; - }; +GNUNET_NETWORK_STRUCT_BEGIN + /** * UDP ACK Message-Packet header. */ @@ -462,7 +451,7 @@ struct UDP_ACK_Message /** * Desired delay for flow control, in us (in NBO). */ - uint32_t delay; + uint32_t delay GNUNET_PACKED; /** * What is the identity of the sender @@ -471,6 +460,11 @@ struct UDP_ACK_Message }; +GNUNET_NETWORK_STRUCT_END + + +/* ************************* Monitoring *********** */ + /** * If a session monitor is attached, notify it about the new @@ -509,6 +503,126 @@ notify_session_monitor (struct Plugin *plugin, } +/** + * Return information about the given session to the monitor callback. + * + * @param cls the `struct Plugin` with the monitor callback (`sic`) + * @param peer peer we send information about + * @param value our `struct Session` to send information about + * @return #GNUNET_OK (continue to iterate) + */ +static int +send_session_info_iter (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *value) +{ + struct Plugin *plugin = cls; + struct Session *session = value; + + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_INIT); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); + return GNUNET_OK; +} + + +/** + * Begin monitoring sessions of a plugin. There can only + * be one active monitor per plugin (i.e. if there are + * multiple monitors, the transport service needs to + * multiplex the generated events over all of them). + * + * @param cls closure of the plugin + * @param sic callback to invoke, NULL to disable monitor; + * plugin will being by iterating over all active + * sessions immediately and then enter monitor mode + * @param sic_cls closure for @a sic + */ +static void +udp_plugin_setup_monitor (void *cls, + GNUNET_TRANSPORT_SessionInfoCallback sic, + void *sic_cls) +{ + struct Plugin *plugin = cls; + + plugin->sic = sic; + plugin->sic_cls = sic_cls; + if (NULL != sic) + { + GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions, + &send_session_info_iter, + plugin); + /* signal end of first iteration */ + sic (sic_cls, + NULL, + NULL); + } +} + + +/* ****************** Little Helpers ****************** */ + + +/** + * Function to free last resources associated with a session. + * + * @param s session to free + */ +static void +free_session (struct Session *s) +{ + if (NULL != s->address) + { + GNUNET_HELLO_address_free (s->address); + s->address = NULL; + } + if (NULL != s->frag_ctx) + { + GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, + NULL, + NULL); + GNUNET_free (s->frag_ctx); + s->frag_ctx = NULL; + } + GNUNET_free (s); +} + + +/** + * Function that is called to get the keepalive factor. + * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to + * calculate the interval between keepalive packets. + * + * @param cls closure with the `struct Plugin` + * @return keepalive factor + */ +static unsigned int +udp_query_keepalive_factor (void *cls) +{ + return 15; +} + + +/** + * Function obtain the network type for a session + * + * @param cls closure (`struct Plugin *`) + * @param session the session + * @return the network type + */ +static enum GNUNET_ATS_Network_Type +udp_get_network (void *cls, + struct Session *session) +{ + return session->scope; +} + + +/* ******************* Event loop ******************** */ + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -595,6 +709,9 @@ schedule_select_v6 (struct Plugin *plugin) } +/* ******************* Address to string and back ***************** */ + + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -622,31 +739,41 @@ udp_address_to_string (void *cls, uint16_t port; uint32_t options; - if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress))) + if (NULL == addr) + { + GNUNET_break_op (0); + return NULL; + } + + if (addrlen == sizeof(struct IPv6UdpAddress)) { t6 = addr; af = AF_INET6; options = ntohl (t6->options); port = ntohs (t6->u6_port); - memcpy (&a6, &t6->ipv6_addr, sizeof(a6)); + a6 = t6->ipv6_addr; sb = &a6; } - else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress))) + else if (addrlen == sizeof(struct IPv4UdpAddress)) { t4 = addr; af = AF_INET; options = ntohl (t4->options); port = ntohs (t4->u4_port); - memcpy (&a4, &t4->ipv4_addr, sizeof(a4)); + a4.s_addr = t4->ipv4_addr; sb = &a4; } else { + GNUNET_break_op (0); return NULL; } - inet_ntop (af, sb, buf, INET6_ADDRSTRLEN); - - GNUNET_snprintf (rbuf, sizeof(rbuf), + inet_ntop (af, + sb, + buf, + INET6_ADDRSTRLEN); + GNUNET_snprintf (rbuf, + sizeof(rbuf), (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u", @@ -659,8 +786,7 @@ udp_address_to_string (void *cls, /** - * Function called to convert a string address to - * a binary address. + * Function called to convert a string address to a binary address. * * @param cls closure (`struct Plugin *`) * @param addr string address @@ -688,27 +814,27 @@ udp_string_to_address (void *cls, plugin = NULL; optionstr = NULL; - if ((NULL == addr) || (addrlen == 0)) + if ((NULL == addr) || (0 == addrlen)) { - GNUNET_break(0); + GNUNET_break (0); return GNUNET_SYSERR; } if ('\0' != addr[addrlen - 1]) { - GNUNET_break(0); + GNUNET_break (0); return GNUNET_SYSERR; } if (strlen (addr) != addrlen - 1) { - GNUNET_break(0); + GNUNET_break (0); return GNUNET_SYSERR; } plugin = GNUNET_strdup (addr); optionstr = strchr (plugin, '.'); if (NULL == optionstr) { - GNUNET_break(0); - GNUNET_free(plugin); + GNUNET_break (0); + GNUNET_free (plugin); return GNUNET_SYSERR; } optionstr[0] = '\0'; @@ -717,52 +843,54 @@ udp_string_to_address (void *cls, address = strchr (optionstr, '.'); if (NULL == address) { - GNUNET_break(0); - GNUNET_free(plugin); + GNUNET_break (0); + GNUNET_free (plugin); return GNUNET_SYSERR; } address[0] = '\0'; address++; if (GNUNET_OK != - GNUNET_STRINGS_to_address_ip (address, strlen (address), + GNUNET_STRINGS_to_address_ip (address, + strlen (address), &socket_address)) { - GNUNET_break(0); - GNUNET_free(plugin); + GNUNET_break (0); + GNUNET_free (plugin); return GNUNET_SYSERR; } - GNUNET_free(plugin); switch (socket_address.ss_family) { case AF_INET: - { - struct IPv4UdpAddress *u4; - struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address; - u4 = GNUNET_new (struct IPv4UdpAddress); - u4->options = htonl (options); - u4->ipv4_addr = in4->sin_addr.s_addr; - u4->u4_port = in4->sin_port; - *buf = u4; - *added = sizeof(struct IPv4UdpAddress); - return GNUNET_OK; - } + { + struct IPv4UdpAddress *u4; + const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address; + + u4 = GNUNET_new (struct IPv4UdpAddress); + u4->options = htonl (options); + u4->ipv4_addr = in4->sin_addr.s_addr; + u4->u4_port = in4->sin_port; + *buf = u4; + *added = sizeof (struct IPv4UdpAddress); + return GNUNET_OK; + } case AF_INET6: - { - struct IPv6UdpAddress *u6; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address; - u6 = GNUNET_new (struct IPv6UdpAddress); - u6->options = htonl (options); - u6->ipv6_addr = in6->sin6_addr; - u6->u6_port = in6->sin6_port; - *buf = u6; - *added = sizeof(struct IPv6UdpAddress); - return GNUNET_OK; - } + { + struct IPv6UdpAddress *u6; + const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address; + + u6 = GNUNET_new (struct IPv6UdpAddress); + u6->options = htonl (options); + u6->ipv6_addr = in6->sin6_addr; + u6->u6_port = in6->sin6_port; + *buf = u6; + *added = sizeof (struct IPv6UdpAddress); + return GNUNET_OK; + } default: - GNUNET_break(0); + GNUNET_break (0); return GNUNET_SYSERR; } } @@ -785,13 +913,13 @@ append_port (void *cls, if (NULL == hostname) { /* Final call, done */ - ppc->asc (ppc->asc_cls, - NULL, - GNUNET_OK); GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head, plugin->ppc_dll_tail, ppc); ppc->resolver_handle = NULL; + ppc->asc (ppc->asc_cls, + NULL, + GNUNET_OK); GNUNET_free (ppc); return; } @@ -817,8 +945,7 @@ append_port (void *cls, /** - * Convert the transports address to a nice, human-readable - * format. + * Convert the transports address to a nice, human-readable format. * * @param cls closure with the `struct Plugin *` * @param type name of the transport that generated the address @@ -843,7 +970,7 @@ udp_plugin_address_pretty_printer (void *cls, { struct Plugin *plugin = cls; struct PrettyPrinterContext *ppc; - const void *sb; + const struct sockaddr *sb; size_t sbs; struct sockaddr_in a4; struct sockaddr_in6 a6; @@ -855,22 +982,26 @@ udp_plugin_address_pretty_printer (void *cls, if (addrlen == sizeof(struct IPv6UdpAddress)) { u6 = addr; - memset (&a6, 0, sizeof(a6)); + memset (&a6, + 0, + sizeof (a6)); a6.sin6_family = AF_INET6; #if HAVE_SOCKADDR_IN_SIN_LEN a6.sin6_len = sizeof (a6); #endif a6.sin6_port = u6->u6_port; - memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr)); + a6.sin6_addr = u6->ipv6_addr; port = ntohs (u6->u6_port); options = ntohl (u6->options); - sb = &a6; - sbs = sizeof(a6); + sb = (const struct sockaddr *) &a6; + sbs = sizeof (a6); } - else if (addrlen == sizeof(struct IPv4UdpAddress)) + else if (addrlen == sizeof (struct IPv4UdpAddress)) { u4 = addr; - memset (&a4, 0, sizeof(a4)); + memset (&a4, + 0, + sizeof(a4)); a4.sin_family = AF_INET; #if HAVE_SOCKADDR_IN_SIN_LEN a4.sin_len = sizeof (a4); @@ -879,15 +1010,19 @@ udp_plugin_address_pretty_printer (void *cls, a4.sin_addr.s_addr = u4->ipv4_addr; port = ntohs (u4->u4_port); options = ntohl (u4->options); - sb = &a4; + sb = (const struct sockaddr *) &a4; sbs = sizeof(a4); } else { /* invalid address */ GNUNET_break_op (0); - asc (asc_cls, NULL , GNUNET_SYSERR); - asc (asc_cls, NULL, GNUNET_OK); + asc (asc_cls, + NULL, + GNUNET_SYSERR); + asc (asc_cls, + NULL, + GNUNET_OK); return; } ppc = GNUNET_new (struct PrettyPrinterContext); @@ -896,7 +1031,7 @@ udp_plugin_address_pretty_printer (void *cls, ppc->asc_cls = asc_cls; ppc->port = port; ppc->options = options; - if (addrlen == sizeof(struct IPv6UdpAddress)) + if (addrlen == sizeof (struct IPv6UdpAddress)) ppc->ipv6 = GNUNET_YES; else ppc->ipv6 = GNUNET_NO; @@ -908,219 +1043,8 @@ udp_plugin_address_pretty_printer (void *cls, sbs, ! numeric, timeout, - &append_port, ppc); -} - - -/** - * FIXME. - */ -static void -call_continuation (struct UDP_MessageWrapper *udpw, - int result) -{ - struct Session *session = udpw->session; - struct Plugin *plugin = session->plugin; - size_t overhead; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Calling continuation for %u byte message to `%s' with result %s\n", - udpw->payload_size, - GNUNET_i2s (&udpw->session->target), - (GNUNET_OK == result) ? "OK" : "SYSERR"); - - if (udpw->msg_size >= udpw->payload_size) - overhead = udpw->msg_size - udpw->payload_size; - else - overhead = udpw->msg_size; - - switch (result) - { - case GNUNET_OK: - switch (udpw->msg_type) - { - case UMT_MSG_UNFRAGMENTED: - if (NULL != udpw->cont) - { - /* Transport continuation */ - udpw->cont (udpw->cont_cls, - &udpw->session->target, - result, - udpw->payload_size, - udpw->msg_size); - } - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, messages, sent, success", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes payload, sent, success", - udpw->payload_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes overhead, sent, success", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes overhead, sent", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes payload, sent", - udpw->payload_size, - GNUNET_NO); - break; - case UMT_MSG_FRAGMENTED_COMPLETE: - GNUNET_assert(NULL != udpw->frag_ctx); - if (udpw->frag_ctx->cont != NULL ) - udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, - &udpw->session->target, - GNUNET_OK, - udpw->frag_ctx->payload_size, - udpw->frag_ctx->on_wire_size); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, sent, success", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, success", - udpw->payload_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes overhead, sent, success", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes overhead, sent", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes payload, sent", - udpw->payload_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, pending", - -1, - GNUNET_NO); - break; - case UMT_MSG_FRAGMENTED: - /* Fragmented message: enqueue next fragment */ - if (NULL != udpw->cont) - udpw->cont (udpw->cont_cls, - &udpw->session->target, - result, - udpw->payload_size, - udpw->msg_size); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments, sent, success", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments bytes, sent, success", - udpw->msg_size, - GNUNET_NO); - break; - case UMT_MSG_ACK: - /* No continuation */ - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, ACK msgs, messages, sent, success", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, ACK msgs, bytes overhead, sent, success", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes overhead, sent", - overhead, - GNUNET_NO); - break; - default: - GNUNET_break(0); - break; - } - break; - case GNUNET_SYSERR: - switch (udpw->msg_type) - { - case UMT_MSG_UNFRAGMENTED: - /* Unfragmented message: failed to send */ - if (NULL != udpw->cont) - udpw->cont (udpw->cont_cls, - &udpw->session->target, - result, - udpw->payload_size, - overhead); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, messages, sent, failure", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes payload, sent, failure", - udpw->payload_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes overhead, sent, failure", - overhead, - GNUNET_NO); - break; - case UMT_MSG_FRAGMENTED_COMPLETE: - GNUNET_assert (NULL != udpw->frag_ctx); - if (udpw->frag_ctx->cont != NULL) - udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, - &udpw->session->target, - GNUNET_SYSERR, - udpw->frag_ctx->payload_size, - udpw->frag_ctx->on_wire_size); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, sent, failure", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, failure", - udpw->payload_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, failure", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, failure", - overhead, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, pending", - -1, - GNUNET_NO); - break; - case UMT_MSG_FRAGMENTED: - GNUNET_assert (NULL != udpw->frag_ctx); - /* Fragmented message: failed to send */ - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments, sent, failure", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments bytes, sent, failure", - udpw->msg_size, - GNUNET_NO); - break; - case UMT_MSG_ACK: - /* ACK message: failed to send */ - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, ACK msgs, messages, sent, failure", - 1, - GNUNET_NO); - break; - default: - GNUNET_break(0); - break; - } - break; - default: - GNUNET_break(0); - break; - } + &append_port, + ppc); } @@ -1131,13 +1055,14 @@ call_continuation (struct UDP_MessageWrapper *udpw, * * @param plugin global variables * @param in_port port number to check - * @return #GNUNET_OK if port is either open_port or adv_port + * @return #GNUNET_OK if port is either our open or advertised port */ static int -check_port (struct Plugin *plugin, +check_port (const struct Plugin *plugin, uint16_t in_port) { - if ((in_port == plugin->port) || (in_port == plugin->aport)) + if ( (plugin->port == in_port) || + (plugin->aport == in_port) ) return GNUNET_OK; return GNUNET_SYSERR; } @@ -1164,19 +1089,14 @@ udp_plugin_check_address (void *cls, size_t addrlen) { struct Plugin *plugin = cls; - struct IPv4UdpAddress *v4; - struct IPv6UdpAddress *v6; + const struct IPv4UdpAddress *v4; + const struct IPv6UdpAddress *v6; - if ( (addrlen != sizeof(struct IPv4UdpAddress)) && - (addrlen != sizeof(struct IPv6UdpAddress)) ) - { - GNUNET_break_op(0); - return GNUNET_SYSERR; - } - if (addrlen == sizeof(struct IPv4UdpAddress)) + if (sizeof(struct IPv4UdpAddress) == addrlen) { - v4 = (struct IPv4UdpAddress *) addr; - if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port))) + v4 = (const struct IPv4UdpAddress *) addr; + if (GNUNET_OK != check_port (plugin, + ntohs (v4->u4_port))) return GNUNET_SYSERR; if (GNUNET_OK != GNUNET_NAT_test_address (plugin->nat, @@ -1184,869 +1104,697 @@ udp_plugin_check_address (void *cls, sizeof (struct in_addr))) return GNUNET_SYSERR; } - else + else if (sizeof(struct IPv6UdpAddress) == addrlen) { - v6 = (struct IPv6UdpAddress *) addr; + v6 = (const struct IPv6UdpAddress *) addr; if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr)) { - GNUNET_break_op(0); + GNUNET_break_op (0); return GNUNET_SYSERR; } - if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port))) + if (GNUNET_OK != check_port (plugin, + ntohs (v6->u6_port))) return GNUNET_SYSERR; if (GNUNET_OK != GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr, - sizeof(struct in6_addr))) + sizeof (struct in6_addr))) return GNUNET_SYSERR; } - return GNUNET_OK; -} - - -/** - * Function to free last resources associated with a session. - * - * @param s session to free - */ -static void -free_session (struct Session *s) -{ - if (NULL != s->frag_ctx) - { - GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL ); - GNUNET_free(s->frag_ctx); - s->frag_ctx = NULL; - } - GNUNET_free(s); -} - - -/** - * Remove a message from the transmission queue. - * - * @param plugin the UDP plugin - * @param udpw message wrapper to queue - */ -static void -dequeue (struct Plugin *plugin, - struct UDP_MessageWrapper *udpw) -{ - struct Session *session = udpw->session; - - if (plugin->bytes_in_buffer < udpw->msg_size) - { - GNUNET_break (0); - } - else - { - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes in buffers", - -(long long) udpw->msg_size, - GNUNET_NO); - plugin->bytes_in_buffer -= udpw->msg_size; - } - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, msgs in buffers", - -1, GNUNET_NO); - if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress)) - GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head, - plugin->ipv4_queue_tail, - udpw); - else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress)) - GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head, - plugin->ipv6_queue_tail, - udpw); else { - GNUNET_break (0); - return; + GNUNET_break_op (0); + return GNUNET_SYSERR; } - GNUNET_assert (session->msgs_in_queue > 0); - session->msgs_in_queue--; - GNUNET_assert (session->bytes_in_queue >= udpw->msg_size); - session->bytes_in_queue -= udpw->msg_size; + return GNUNET_OK; } /** - * We have completed our (attempt) to transmit a message - * that had to be fragmented -- either because we got an - * ACK saying that all fragments were received, or because - * of timeout / disconnect. Clean up our state. + * Our external IP address/port mapping has changed. * - * @param fc fragmentation context to clean up - * @param result #GNUNET_OK if we succeeded (got ACK), - * #GNUNET_SYSERR if the transmission failed + * @param cls closure, the `struct Plugin` + * @param add_remove #GNUNET_YES to mean the new public IP address, + * #GNUNET_NO to mean the previous (now invalid) one + * @param addr either the previous or the new public IP address + * @param addrlen actual length of the @a addr */ static void -fragmented_message_done (struct UDP_FragmentationContext *fc, - int result) +udp_nat_port_map_callback (void *cls, + int add_remove, + const struct sockaddr *addr, + socklen_t addrlen) { - struct Plugin *plugin = fc->plugin; - struct Session *s = fc->session; - struct UDP_MessageWrapper *udpw; - struct UDP_MessageWrapper *tmp; - struct UDP_MessageWrapper dummy; + struct Plugin *plugin = cls; + struct GNUNET_HELLO_Address *address; + struct IPv4UdpAddress u4; + struct IPv6UdpAddress u6; + void *arg; + size_t args; LOG (GNUNET_ERROR_TYPE_DEBUG, - "%p : Fragmented message removed with result %s\n", - fc, - (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS"); - - /* Call continuation for fragmented message */ - memset (&dummy, 0, sizeof(dummy)); - dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE; - dummy.msg_size = s->frag_ctx->on_wire_size; - dummy.payload_size = s->frag_ctx->payload_size; - dummy.frag_ctx = s->frag_ctx; - dummy.cont = NULL; - dummy.cont_cls = NULL; - dummy.session = s; - call_continuation (&dummy, result); - /* Remove leftover fragments from queue */ - if (s->address->address_length == sizeof(struct IPv6UdpAddress)) + (GNUNET_YES == add_remove) + ? "NAT notification to add address `%s'\n" + : "NAT notification to remove address `%s'\n", + GNUNET_a2s (addr, + addrlen)); + /* convert 'address' to our internal format */ + switch (addr->sa_family) { - udpw = plugin->ipv6_queue_head; - while (NULL != udpw) + case AF_INET: { - tmp = udpw->next; - if ( (udpw->frag_ctx != NULL) && - (udpw->frag_ctx == s->frag_ctx) ) + const struct sockaddr_in *i4; + + GNUNET_assert (sizeof(struct sockaddr_in) == addrlen); + i4 = (const struct sockaddr_in *) addr; + if (0 == ntohs (i4->sin_port)) { - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free (udpw); + GNUNET_break (0); + return; } - udpw = tmp; + memset (&u4, + 0, + sizeof(u4)); + u4.options = htonl (plugin->myoptions); + u4.ipv4_addr = i4->sin_addr.s_addr; + u4.u4_port = i4->sin_port; + arg = &u4; + args = sizeof (struct IPv4UdpAddress); + break; } - } - if (s->address->address_length == sizeof(struct IPv4UdpAddress)) - { - udpw = plugin->ipv4_queue_head; - while (udpw != NULL ) + case AF_INET6: { - tmp = udpw->next; - if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx)) + const struct sockaddr_in6 *i6; + + GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen); + i6 = (const struct sockaddr_in6 *) addr; + if (0 == ntohs (i6->sin6_port)) { - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free(udpw); + GNUNET_break (0); + return; } - udpw = tmp; + memset (&u6, + 0, + sizeof(u6)); + u6.options = htonl (plugin->myoptions); + u6.ipv6_addr = i6->sin6_addr; + u6.u6_port = i6->sin6_port; + arg = &u6; + args = sizeof (struct IPv6UdpAddress); + break; } + default: + GNUNET_break (0); + return; } - notify_session_monitor (s->plugin, - s, - GNUNET_TRANSPORT_SS_UPDATE); - /* Destroy fragmentation context */ - GNUNET_FRAGMENT_context_destroy (fc->frag, - &s->last_expected_msg_delay, - &s->last_expected_ack_delay); - s->frag_ctx = NULL; - GNUNET_free (fc); + /* modify our published address list */ + address = GNUNET_HELLO_address_allocate (plugin->env->my_identity, + PLUGIN_NAME, + arg, + args, + GNUNET_HELLO_ADDRESS_INFO_NONE); + plugin->env->notify_address (plugin->env->cls, + add_remove, + address); + GNUNET_HELLO_address_free (address); } +/* ********************* Finding sessions ******************* */ + + /** - * Closure for #find_receive_context(). + * Closure for #session_cmp_it(). */ -struct FindReceiveContext +struct SessionCompareContext { /** - * Where to store the result. - */ - struct DefragContext *rc; - - /** - * Session associated with this context. - */ - struct Session *session; - - /** - * Address to find. + * Set to session matching the address. */ - const union UdpAddress *udp_addr; + struct Session *res; /** - * Number of bytes in @e udp_addr. + * Address we are looking for. */ - size_t udp_addr_len; - + const struct GNUNET_HELLO_Address *address; }; /** - * Scan the heap for a receive context with the given address. + * Find a session with a matching address. * - * @param cls the `struct FindReceiveContext` - * @param node internal node of the heap - * @param element value stored at the node (a `struct ReceiveContext`) - * @param cost cost associated with the node - * @return #GNUNET_YES if we should continue to iterate, - * #GNUNET_NO if not. + * @param cls the `struct SessionCompareContext *` + * @param key peer identity (unused) + * @param value the `struct Session *` + * @return #GNUNET_NO if we found the session, #GNUNET_OK if not */ static int -find_receive_context (void *cls, - struct GNUNET_CONTAINER_HeapNode *node, - void *element, - GNUNET_CONTAINER_HeapCostType cost) +session_cmp_it (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - struct FindReceiveContext *frc = cls; - struct DefragContext *e = element; + struct SessionCompareContext *cctx = cls; + struct Session *s = value; - if ( (frc->udp_addr_len == e->udp_addr_len) && - (0 == memcmp (frc->udp_addr, - e->udp_addr, - frc->udp_addr_len)) ) + if (0 == GNUNET_HELLO_address_cmp (s->address, + cctx->address)) { - frc->rc = e; + GNUNET_assert (GNUNET_NO == s->in_destroy); + cctx->res = s; return GNUNET_NO; } - return GNUNET_YES; + return GNUNET_OK; } /** - * Functions with this signature are called whenever we need - * to close a session due to a disconnect or failure to - * establish a connection. + * Locate an existing session the transport service is using to + * send data to another peer. Performs some basic sanity checks + * on the address and then tries to locate a matching session. * - * @param cls closure with the `struct Plugin` - * @param s session to close down - * @return #GNUNET_OK on success + * @param cls the plugin + * @param address the address we should locate the session by + * @return the session if it exists, or NULL if it is not found */ -static int -udp_disconnect_session (void *cls, - struct Session *s) +static struct Session * +udp_plugin_lookup_session (void *cls, + const struct GNUNET_HELLO_Address *address) { struct Plugin *plugin = cls; - struct UDP_MessageWrapper *udpw; - struct UDP_MessageWrapper *next; - struct FindReceiveContext frc; - - GNUNET_assert (GNUNET_YES != s->in_destroy); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Session %p to peer `%s' address ended\n", - s, - GNUNET_i2s (&s->target), - udp_address_to_string (plugin, - s->address->address, - s->address->address_length)); - /* stop timeout task */ - if (NULL != s->timeout_task) - { - GNUNET_SCHEDULER_cancel (s->timeout_task); - s->timeout_task = NULL; - } - if (NULL != s->frag_ctx) - { - /* Remove fragmented message due to disconnect */ - fragmented_message_done (s->frag_ctx, - GNUNET_SYSERR); - } + const struct IPv6UdpAddress *udp_a6; + const struct IPv4UdpAddress *udp_a4; + struct SessionCompareContext cctx; - frc.rc = NULL; - frc.udp_addr = s->address->address; - frc.udp_addr_len = s->address->address_length; - /* Lookup existing receive context for this address */ - if (NULL != plugin->defrag_ctxs) + if (NULL == address->address) { - GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs, - &find_receive_context, - &frc); - if (NULL != frc.rc) - { - struct DefragContext *d_ctx = frc.rc; - - GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode); - GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag); - GNUNET_free (d_ctx); - } + GNUNET_break (0); + return NULL; } - next = plugin->ipv4_queue_head; - while (NULL != (udpw = next)) + if (sizeof(struct IPv4UdpAddress) == address->address_length) { - next = udpw->next; - if (udpw->session == s) + if (NULL == plugin->sockv4) + return NULL; + udp_a4 = (const struct IPv4UdpAddress *) address->address; + if (0 == udp_a4->u4_port) { - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free(udpw); + GNUNET_break (0); + return NULL; } } - next = plugin->ipv6_queue_head; - while (NULL != (udpw = next)) + else if (sizeof(struct IPv6UdpAddress) == address->address_length) { - next = udpw->next; - if (udpw->session == s) + if (NULL == plugin->sockv6) + return NULL; + udp_a6 = (const struct IPv6UdpAddress *) address->address; + if (0 == udp_a6->u6_port) { - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free(udpw); + GNUNET_break (0); + return NULL; } } - notify_session_monitor (s->plugin, - s, - GNUNET_TRANSPORT_SS_DONE); - plugin->env->session_end (plugin->env->cls, - s->address, - s); - - if (NULL != s->frag_ctx) + else { - if (NULL != s->frag_ctx->cont) - { - s->frag_ctx->cont (s->frag_ctx->cont_cls, - &s->target, - GNUNET_SYSERR, - s->frag_ctx->payload_size, - s->frag_ctx->on_wire_size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Calling continuation for fragemented message to `%s' with result SYSERR\n", - GNUNET_i2s (&s->target)); - } + GNUNET_break (0); + return NULL; } - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, - &s->target, - s)); - GNUNET_STATISTICS_set (plugin->env->stats, - "# UDP sessions active", - GNUNET_CONTAINER_multipeermap_size (plugin->sessions), - GNUNET_NO); - if (s->rc > 0) - { - s->in_destroy = GNUNET_YES; - } - else - { - GNUNET_HELLO_address_free (s->address); - free_session (s); - } - return GNUNET_OK; + /* check if session already exists */ + cctx.address = address; + cctx.res = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Looking for existing session for peer `%s' with address `%s'\n", + GNUNET_i2s (&address->peer), + udp_address_to_string (plugin, + address->address, + address->address_length)); + GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, + &address->peer, + &session_cmp_it, + &cctx); + if (NULL == cctx.res) + return NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Found existing session %p\n", + cctx.res); + return cctx.res; } -/** - * Function that is called to get the keepalive factor. - * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to - * calculate the interval between keepalive packets. - * - * @param cls closure with the `struct Plugin` - * @return keepalive factor - */ -static unsigned int -udp_query_keepalive_factor (void *cls) -{ - return 15; -} +/* ********************** Timeout ****************** */ /** - * Destroy a session, plugin is being unloaded. + * Increment session timeout due to activity. * - * @param cls the `struct Plugin` - * @param key hash of public key of target peer - * @param value a `struct PeerSession *` to clean up - * @return #GNUNET_OK (continue to iterate) + * @param s session to reschedule timeout activity for */ -static int -disconnect_and_free_it (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +static void +reschedule_session_timeout (struct Session *s) { - struct Plugin *plugin = cls; - - udp_disconnect_session (plugin, value); - return GNUNET_OK; + if (GNUNET_YES == s->in_destroy) + return; + GNUNET_assert (NULL != s->timeout_task); + s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); } + /** - * Disconnect from a remote node. Clean up session if we have one for - * this peer. + * Function that will be called whenever the transport service wants to + * notify the plugin that a session is still active and in use and + * therefore the session timeout for this session has to be updated * - * @param cls closure for this call (should be handle to Plugin) - * @param target the peeridentity of the peer to disconnect - * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed + * @param cls closure with the `struct Plugin` + * @param peer which peer was the session for + * @param session which session is being updated */ static void -udp_disconnect (void *cls, - const struct GNUNET_PeerIdentity *target) +udp_plugin_update_session_timeout (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct Session *session) { struct Plugin *plugin = cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting from peer `%s'\n", - GNUNET_i2s (target)); - /* Clean up sessions */ - GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, - target, - &disconnect_and_free_it, - plugin); + if (GNUNET_YES != + GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions, + peer, + session)) + { + GNUNET_break (0); + return; + } + /* Reschedule session timeout */ + reschedule_session_timeout (session); } +/* ************************* Sending ************************ */ + + /** - * Session was idle, so disconnect it + * Remove the given message from the transmission queue and + * update all applicable statistics. * - * @param cls the `struct Session` to time out - * @param tc scheduler context + * @param plugin the UDP plugin + * @param udpw message wrapper to dequeue */ static void -session_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +dequeue (struct Plugin *plugin, + struct UDP_MessageWrapper *udpw) { - struct Session *s = cls; - struct Plugin *plugin = s->plugin; - struct GNUNET_TIME_Relative left; + struct Session *session = udpw->session; - s->timeout_task = NULL; - left = GNUNET_TIME_absolute_get_remaining (s->timeout); - if (left.rel_value_us > 0) + if (plugin->bytes_in_buffer < udpw->msg_size) { - /* not actually our turn yet, but let's at least update - the monitor, it may think we're about to die ... */ - notify_session_monitor (s->plugin, - s, - GNUNET_TRANSPORT_SS_UPDATE); - s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, - &session_timeout, - s); + GNUNET_break (0); + } + else + { + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total bytes in send buffers", + - (long long) udpw->msg_size, + GNUNET_NO); + plugin->bytes_in_buffer -= udpw->msg_size; + } + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total messages in send buffers", + -1, + GNUNET_NO); + if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length) + { + GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head, + plugin->ipv4_queue_tail, + udpw); + } + else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length) + { + GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head, + plugin->ipv6_queue_tail, + udpw); + } + else + { + GNUNET_break (0); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Session %p was idle for %s, disconnecting\n", - s, - GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT, - GNUNET_YES)); - /* call session destroy function */ - udp_disconnect_session (plugin, s); + GNUNET_assert (session->msgs_in_queue > 0); + session->msgs_in_queue--; + GNUNET_assert (session->bytes_in_queue >= udpw->msg_size); + session->bytes_in_queue -= udpw->msg_size; } /** - * Increment session timeout due to activity + * Enqueue a message for transmission and update statistics. * - * @param s session to reschedule timeout activity for + * @param plugin the UDP plugin + * @param udpw message wrapper to queue */ static void -reschedule_session_timeout (struct Session *s) +enqueue (struct Plugin *plugin, + struct UDP_MessageWrapper *udpw) { - if (GNUNET_YES == s->in_destroy) + struct Session *session = udpw->session; + + if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX) + { + GNUNET_break (0); + } + else + { + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total bytes in send buffers", + udpw->msg_size, + GNUNET_NO); + plugin->bytes_in_buffer += udpw->msg_size; + } + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total messages in send buffers", + 1, + GNUNET_NO); + if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length) + { + GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, + plugin->ipv4_queue_tail, + udpw); + } + else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length) + { + GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head, + plugin->ipv6_queue_tail, + udpw); + } + else + { + GNUNET_break (0); + udpw->cont (udpw->cont_cls, + &session->target, + GNUNET_SYSERR, + udpw->msg_size, + 0); + GNUNET_free (udpw); return; - GNUNET_assert(NULL != s->timeout_task); - s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); + } + session->msgs_in_queue++; + session->bytes_in_queue += udpw->msg_size; } /** - * Function obtain the network type for a session + * We have completed our (attempt) to transmit a message that had to + * be fragmented -- either because we got an ACK saying that all + * fragments were received, or because of timeout / disconnect. Clean + * up our state. * - * @param cls closure (`struct Plugin *`) - * @param session the session - * @return the network type - */ -static enum GNUNET_ATS_Network_Type -udp_get_network (void *cls, - struct Session *session) -{ - return session->scope; -} - - -/** - * Closure for #session_cmp_it(). + * @param frag_ctx fragmentation context to clean up + * @param result #GNUNET_OK if we succeeded (got ACK), + * #GNUNET_SYSERR if the transmission failed */ -struct SessionCompareContext +static void +fragmented_message_done (struct UDP_FragmentationContext *frag_ctx, + int result) { - /** - * Set to session matching the address. - */ - struct Session *res; - - /** - * Address we are looking for. - */ - const struct GNUNET_HELLO_Address *address; -}; - + struct Plugin *plugin = frag_ctx->plugin; + struct Session *s = frag_ctx->session; + struct UDP_MessageWrapper *udpw; + struct UDP_MessageWrapper *tmp; + size_t overhead; -/** - * Find a session with a matching address. - * - * @param cls the `struct SessionCompareContext *` - * @param key peer identity (unused) - * @param value the `struct Session *` - * @return #GNUNET_NO if we found the session, #GNUNET_OK if not - */ -static int -session_cmp_it (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) -{ - struct SessionCompareContext *cctx = cls; - struct Session *s = value; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p: Fragmented message removed with result %s\n", + frag_ctx, + (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS"); + /* Call continuation for fragmented message */ + if (frag_ctx->on_wire_size >= frag_ctx->payload_size) + overhead = frag_ctx->on_wire_size - frag_ctx->payload_size; + else + overhead = frag_ctx->on_wire_size; + if (NULL != frag_ctx->cont) + frag_ctx->cont (frag_ctx->cont_cls, + &s->target, + result, + s->frag_ctx->payload_size, + frag_ctx->on_wire_size); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented messages active", + -1, + GNUNET_NO); - if (0 == GNUNET_HELLO_address_cmp (s->address, - cctx->address)) + if (GNUNET_OK == result) { - cctx->res = s; - return GNUNET_NO; - } - return GNUNET_YES; -} - - -/** - * Locate an existing session the transport service is using to - * send data to another peer. Performs some basic sanity checks - * on the address and then tries to locate a matching session. - * - * @param cls the plugin - * @param address the address we should locate the session by - * @return the session if it exists, or NULL if it is not found - */ -static struct Session * -udp_plugin_lookup_session (void *cls, - const struct GNUNET_HELLO_Address *address) -{ - struct Plugin *plugin = cls; - const struct IPv6UdpAddress *udp_a6; - const struct IPv4UdpAddress *udp_a4; - struct SessionCompareContext cctx; - - if ( (NULL == address->address) || - ((address->address_length != sizeof (struct IPv4UdpAddress)) && - (address->address_length != sizeof (struct IPv6UdpAddress)))) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Trying to locate session for address of unexpected length %u (should be %u or %u)\n", - address->address_length, - sizeof (struct IPv4UdpAddress), - sizeof (struct IPv6UdpAddress)); - return NULL; + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, messages, sent, success", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, bytes payload, sent, success", + s->frag_ctx->payload_size, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, bytes overhead, sent, success", + overhead, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, bytes overhead, sent", + overhead, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, bytes payload, sent", + s->frag_ctx->payload_size, + GNUNET_NO); } - - if (address->address_length == sizeof(struct IPv4UdpAddress)) + else { - if (NULL == plugin->sockv4) - return NULL; - udp_a4 = (const struct IPv4UdpAddress *) address->address; - if (0 == udp_a4->u4_port) - return NULL; + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, messages, sent, failure", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, bytes payload, sent, failure", + s->frag_ctx->payload_size, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, bytes payload, sent, failure", + overhead, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, bytes payload, sent, failure", + overhead, + GNUNET_NO); } - if (address->address_length == sizeof(struct IPv6UdpAddress)) + /* Remove remaining fragments from queue, no need to transmit those + any longer. */ + if (s->address->address_length == sizeof(struct IPv6UdpAddress)) { - if (NULL == plugin->sockv6) - return NULL; - udp_a6 = (const struct IPv6UdpAddress *) address->address; - if (0 == udp_a6->u6_port) - return NULL; + udpw = plugin->ipv6_queue_head; + while (NULL != udpw) + { + tmp = udpw->next; + if ( (udpw->frag_ctx != NULL) && + (udpw->frag_ctx == frag_ctx) ) + { + dequeue (plugin, + udpw); + GNUNET_free (udpw); + } + udpw = tmp; + } } - - /* check if session already exists */ - cctx.address = address; - cctx.res = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Looking for existing session for peer `%s' `%s' \n", - GNUNET_i2s (&address->peer), - udp_address_to_string (plugin, - address->address, - address->address_length)); - GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, - &address->peer, - &session_cmp_it, - &cctx); - if (NULL != cctx.res) + if (s->address->address_length == sizeof(struct IPv4UdpAddress)) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Found existing session %p\n", - cctx.res); - return cctx.res; + udpw = plugin->ipv4_queue_head; + while (NULL != udpw) + { + tmp = udpw->next; + if ( (NULL != udpw->frag_ctx) && + (udpw->frag_ctx == frag_ctx) ) + { + dequeue (plugin, + udpw); + GNUNET_free (udpw); + } + udpw = tmp; + } } - return NULL; + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_UPDATE); + GNUNET_FRAGMENT_context_destroy (frag_ctx->frag, + &s->last_expected_msg_delay, + &s->last_expected_ack_delay); + s->frag_ctx = NULL; + GNUNET_free (frag_ctx); } /** - * Allocate a new session for the given endpoint address. - * Note that this function does not inform the service - * of the new session, this is the responsibility of the - * caller (if needed). + * We are finished with a fragment in the message queue. + * Notify the continuation and update statistics. * - * @param cls the `struct Plugin` - * @param address address of the other peer to use - * @param network_type network type the address belongs to - * @return NULL on error, otherwise session handle + * @param cls the `struct Plugin *` + * @param udpw the queue entry + * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static struct Session * -udp_plugin_create_session (void *cls, - const struct GNUNET_HELLO_Address *address, - enum GNUNET_ATS_Network_Type network_type) +static void +qc_fragment_sent (void *cls, + struct UDP_MessageWrapper *udpw, + int result) { struct Plugin *plugin = cls; - struct Session *s; - - s = GNUNET_new (struct Session); - s->plugin = plugin; - s->address = GNUNET_HELLO_address_copy (address); - s->target = address->peer; - s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - 250); - s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; - s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; - s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; - s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); - s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, - &session_timeout, s); - s->scope = network_type; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Creating new session %p for peer `%s' address `%s'\n", - s, - GNUNET_i2s (&address->peer), - udp_address_to_string (plugin, - address->address, - address->address_length)); - GNUNET_assert(GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (plugin->sessions, - &s->target, - s, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - GNUNET_STATISTICS_set (plugin->env->stats, - "# UDP sessions active", - GNUNET_CONTAINER_multipeermap_size (plugin->sessions), - GNUNET_NO); - return s; + GNUNET_assert (NULL != udpw->frag_ctx); + if (GNUNET_OK == result) + { + GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, fragments, sent, success", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, fragments bytes, sent, success", + udpw->msg_size, + GNUNET_NO); + } + else + { + fragmented_message_done (udpw->frag_ctx, + GNUNET_SYSERR); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, fragments, sent, failure", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, fragmented msgs, fragments bytes, sent, failure", + udpw->msg_size, + GNUNET_NO); + } } /** - * Function that will be called whenever the transport service wants to - * notify the plugin that a session is still active and in use and - * therefore the session timeout for this session has to be updated + * Function that is called with messages created by the fragmentation + * module. In the case of the `proc` callback of the + * #GNUNET_FRAGMENT_context_create() function, this function must + * eventually call #GNUNET_FRAGMENT_context_transmission_done(). * - * @param cls closure with the `struct Plugin` - * @param peer which peer was the session for - * @param session which session is being updated + * @param cls closure, the `struct UDP_FragmentationContext` + * @param msg the message that was created */ static void -udp_plugin_update_session_timeout (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct Session *session) +enqueue_fragment (void *cls, + const struct GNUNET_MessageHeader *msg) { - struct Plugin *plugin = cls; + struct UDP_FragmentationContext *frag_ctx = cls; + struct Plugin *plugin = frag_ctx->plugin; + struct UDP_MessageWrapper *udpw; + struct Session *session = frag_ctx->session; + size_t msg_len = ntohs (msg->size); - if (GNUNET_YES != - GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions, - peer, - session)) - { - GNUNET_break(0); - return; - } - /* Reschedule session timeout */ - reschedule_session_timeout (session); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Enqueuing fragment with %u bytes\n", + msg_len); + udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len); + udpw->session = session; + udpw->msg_buf = (char *) &udpw[1]; + udpw->msg_size = msg_len; + udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */ + udpw->timeout = frag_ctx->timeout; + udpw->frag_ctx = frag_ctx; + udpw->qc = &qc_fragment_sent; + udpw->qc_cls = plugin; + memcpy (udpw->msg_buf, + msg, + msg_len); + enqueue (plugin, + udpw); + if (sizeof (struct IPv4UdpAddress) == session->address->address_length) + schedule_select_v4 (plugin); + else + schedule_select_v6 (plugin); } /** - * Creates a new outbound session the transport service will use to - * send data to the peer. + * We are finished with a message from the message queue. + * Notify the continuation and update statistics. * * @param cls the `struct Plugin *` - * @param address the address - * @return the session or NULL of max connections exceeded + * @param udpw the queue entry + * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static struct Session * -udp_plugin_get_session (void *cls, - const struct GNUNET_HELLO_Address *address) +static void +qc_message_sent (void *cls, + struct UDP_MessageWrapper *udpw, + int result) { struct Plugin *plugin = cls; - struct Session *s; - enum GNUNET_ATS_Network_Type network_type; - struct IPv4UdpAddress *udp_v4; - struct IPv6UdpAddress *udp_v6; + size_t overhead; - if (NULL == address) + if (udpw->msg_size >= udpw->payload_size) + overhead = udpw->msg_size - udpw->payload_size; + else + overhead = udpw->msg_size; + + if (NULL != udpw->cont) + udpw->cont (udpw->cont_cls, + &udpw->session->target, + result, + udpw->payload_size, + overhead); + if (GNUNET_OK == result) { - GNUNET_break (0); - return NULL; + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, unfragmented msgs, messages, sent, success", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, unfragmented msgs, bytes payload, sent, success", + udpw->payload_size, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, unfragmented msgs, bytes overhead, sent, success", + overhead, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, bytes overhead, sent", + overhead, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, bytes payload, sent", + udpw->payload_size, + GNUNET_NO); } - if ( (address->address_length != sizeof(struct IPv4UdpAddress)) && - (address->address_length != sizeof(struct IPv6UdpAddress)) ) + else { - GNUNET_break_op (0); - return NULL; + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, unfragmented msgs, messages, sent, failure", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, unfragmented msgs, bytes payload, sent, failure", + udpw->payload_size, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, unfragmented msgs, bytes overhead, sent, failure", + overhead, + GNUNET_NO); } - if (NULL != (s = udp_plugin_lookup_session (cls, - address))) - return s; - - /* need to create new session */ - if (sizeof (struct IPv4UdpAddress) == address->address_length) - { - struct sockaddr_in v4; - - udp_v4 = (struct IPv4UdpAddress *) address->address; - memset (&v4, '\0', sizeof (v4)); - v4.sin_family = AF_INET; -#if HAVE_SOCKADDR_IN_SIN_LEN - v4.sin_len = sizeof (struct sockaddr_in); -#endif - v4.sin_port = udp_v4->u4_port; - v4.sin_addr.s_addr = udp_v4->ipv4_addr; - network_type = plugin->env->get_address_type (plugin->env->cls, - (const struct sockaddr *) &v4, - sizeof (v4)); - } - if (sizeof (struct IPv6UdpAddress) == address->address_length) - { - struct sockaddr_in6 v6; - - udp_v6 = (struct IPv6UdpAddress *) address->address; - memset (&v6, '\0', sizeof (v6)); - v6.sin6_family = AF_INET6; -#if HAVE_SOCKADDR_IN_SIN_LEN - v6.sin6_len = sizeof (struct sockaddr_in6); -#endif - v6.sin6_port = udp_v6->u6_port; - v6.sin6_addr = udp_v6->ipv6_addr; - network_type = plugin->env->get_address_type (plugin->env->cls, - (const struct sockaddr *) &v6, - sizeof (v6)); - } - return udp_plugin_create_session (cls, - address, - network_type); -} - - -/** - * Enqueue a message for transmission. - * - * @param plugin the UDP plugin - * @param udpw message wrapper to queue - */ -static void -enqueue (struct Plugin *plugin, - struct UDP_MessageWrapper *udpw) -{ - struct Session *session = udpw->session; - - if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX) - { - GNUNET_break (0); - } - else - { - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes in buffers", - udpw->msg_size, - GNUNET_NO); - plugin->bytes_in_buffer += udpw->msg_size; - } - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, msgs in buffers", - 1, GNUNET_NO); - if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress)) - GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head, - plugin->ipv4_queue_tail, - udpw); - else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress)) - GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head, - plugin->ipv6_queue_tail, - udpw); - else - { - GNUNET_break (0); - return; - } - session->msgs_in_queue++; - session->bytes_in_queue += udpw->msg_size; -} - - -/** - * Fragment message was transmitted via UDP, let fragmentation know - * to send the next fragment now. - * - * @param cls the `struct UDPMessageWrapper *` of the fragment - * @param target destination peer (ignored) - * @param result #GNUNET_OK on success (ignored) - * @param payload bytes payload sent - * @param physical bytes physical sent - */ -static void -send_next_fragment (void *cls, - const struct GNUNET_PeerIdentity *target, - int result, - size_t payload, - size_t physical) -{ - struct UDP_MessageWrapper *udpw = cls; - - GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag); -} - - -/** - * Function that is called with messages created by the fragmentation - * module. In the case of the 'proc' callback of the - * #GNUNET_FRAGMENT_context_create() function, this function must - * eventually call #GNUNET_FRAGMENT_context_transmission_done(). - * - * @param cls closure, the 'struct FragmentationContext' - * @param msg the message that was created - */ -static void -enqueue_fragment (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct UDP_FragmentationContext *frag_ctx = cls; - struct Plugin *plugin = frag_ctx->plugin; - struct UDP_MessageWrapper *udpw; - size_t msg_len = ntohs (msg->size); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Enqueuing fragment with %u bytes\n", - msg_len); - udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len); - udpw->session = frag_ctx->session; - udpw->msg_buf = (char *) &udpw[1]; - udpw->msg_size = msg_len; - udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */ - udpw->cont = &send_next_fragment; - udpw->cont_cls = udpw; - udpw->timeout = frag_ctx->timeout; - udpw->frag_ctx = frag_ctx; - udpw->msg_type = UMT_MSG_FRAGMENTED; - memcpy (udpw->msg_buf, msg, msg_len); - enqueue (plugin, - udpw); - if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress)) - schedule_select_v4 (plugin); - else - schedule_select_v6 (plugin); -} +} /** - * Function that can be used by the transport service to transmit - * a message using the plugin. Note that in the case of a - * peer disconnecting, the continuation MUST be called - * prior to the disconnect notification itself. This function - * will be called with this peer's HELLO message to initiate - * a fresh connection to another peer. + * Function that can be used by the transport service to transmit a + * message using the plugin. Note that in the case of a peer + * disconnecting, the continuation MUST be called prior to the + * disconnect notification itself. This function will be called with + * this peer's HELLO message to initiate a fresh connection to another + * peer. * * @param cls closure * @param s which session must be used * @param msgbuf the message to transmit - * @param msgbuf_size number of bytes in 'msgbuf' + * @param msgbuf_size number of bytes in @a msgbuf * @param priority how important is the message (most plugins will * ignore message priority and just FIFO) * @param to how long to wait at most for the transmission (does not @@ -2057,7 +1805,7 @@ enqueue_fragment (void *cls, * been transmitted (or if the transport is ready * for the next transmission call; or if the * peer disconnected...); can be NULL - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return number of bytes used (on the physical network, with overheads); * -1 on hard errors (i.e. address invalid); 0 is a legal value * and does NOT mean that the message was not transmitted (DV) @@ -2074,20 +1822,20 @@ udp_plugin_send (void *cls, { struct Plugin *plugin = cls; size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage); - struct UDP_FragmentationContext * frag_ctx; - struct UDP_MessageWrapper * udpw; + struct UDP_FragmentationContext *frag_ctx; + struct UDP_MessageWrapper *udpw; struct UDPMessage *udp; - char mbuf[udpmlen]; + char mbuf[udpmlen] GNUNET_ALIGN; - if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) && - (plugin->sockv6 == NULL) ) + if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) && + (NULL == plugin->sockv6) ) return GNUNET_SYSERR; - if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) && - (plugin->sockv4 == NULL) ) + if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) && + (NULL == plugin->sockv4) ) return GNUNET_SYSERR; if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { - GNUNET_break(0); + GNUNET_break (0); return GNUNET_SYSERR; } if (GNUNET_YES != @@ -2095,7 +1843,7 @@ udp_plugin_send (void *cls, &s->target, s)) { - GNUNET_break(0); + GNUNET_break (0); return GNUNET_SYSERR; } LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2106,19 +1854,19 @@ udp_plugin_send (void *cls, s->address->address, s->address->address_length)); - /* Message */ udp = (struct UDPMessage *) mbuf; udp->header.size = htons (udpmlen); udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE); udp->reserved = htonl (0); udp->sender = *plugin->env->my_identity; - /* We do not update the session time out here! - * Otherwise this session will not timeout since we send keep alive before - * session can timeout + /* We do not update the session time out here! Otherwise this + * session will not timeout since we send keep alive before session + * can timeout. * - * For UDP we update session timeout only on receive, this will cover keep - * alives, since remote peer will reply with keep alive response! + * For UDP we update session timeout only on receive, this will + * cover keep alives, since remote peer will reply with keep alive + * responses! */ if (udpmlen <= UDP_MTU) { @@ -2128,35 +1876,43 @@ udp_plugin_send (void *cls, udpw->msg_buf = (char *) &udpw[1]; udpw->msg_size = udpmlen; /* message size with UDP overhead */ udpw->payload_size = msgbuf_size; /* message size without UDP overhead */ - udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to); + udpw->timeout = GNUNET_TIME_relative_to_absolute (to); udpw->cont = cont; udpw->cont_cls = cont_cls; udpw->frag_ctx = NULL; - udpw->msg_type = UMT_MSG_UNFRAGMENTED; - memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage)); - memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size); - enqueue (plugin, udpw); - + udpw->qc = &qc_message_sent; + udpw->qc_cls = plugin; + memcpy (udpw->msg_buf, + udp, + sizeof (struct UDPMessage)); + memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], + msgbuf, + msgbuf_size); + enqueue (plugin, + udpw); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO); + "# UDP, unfragmented messages queued total", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes payload, attempt", + "# UDP, unfragmented bytes payload queued total", udpw->payload_size, GNUNET_NO); } else { /* fragmented message */ - if (s->frag_ctx != NULL) + if (NULL != s->frag_ctx) return GNUNET_SYSERR; - memcpy (&udp[1], msgbuf, msgbuf_size); + memcpy (&udp[1], + msgbuf, + msgbuf_size); frag_ctx = GNUNET_new (struct UDP_FragmentationContext); frag_ctx->plugin = plugin; frag_ctx->session = s; frag_ctx->cont = cont; frag_ctx->cont_cls = cont_cls; - frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), - to); + frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to); frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */ frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */ frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats, @@ -2169,124 +1925,552 @@ udp_plugin_send (void *cls, frag_ctx); s->frag_ctx = frag_ctx; GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, pending", + "# UDP, fragmented messages active", 1, GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, attempt", + "# UDP, fragmented messages, total", 1, GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, attempt", + "# UDP, fragmented bytes (payload)", frag_ctx->payload_size, GNUNET_NO); } - notify_session_monitor (s->plugin, - s, - GNUNET_TRANSPORT_SS_UPDATE); - if (s->address->address_length == sizeof (struct IPv4UdpAddress)) - schedule_select_v4 (plugin); + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_UPDATE); + if (s->address->address_length == sizeof (struct IPv4UdpAddress)) + schedule_select_v4 (plugin); + else + schedule_select_v6 (plugin); + return udpmlen; +} + + +/** + * Handle an ACK message. + * + * @param plugin the UDP plugin + * @param msg the (presumed) UDP ACK message + * @param udp_addr sender address + * @param udp_addr_len number of bytes in @a udp_addr + */ +static void +read_process_ack (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + const union UdpAddress *udp_addr, + socklen_t udp_addr_len) +{ + const struct GNUNET_MessageHeader *ack; + const struct UDP_ACK_Message *udp_ack; + struct GNUNET_HELLO_Address *address; + struct Session *s; + struct GNUNET_TIME_Relative flow_delay; + + if (ntohs (msg->size) + < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return; + } + udp_ack = (const struct UDP_ACK_Message *) msg; + address = GNUNET_HELLO_address_allocate (&udp_ack->sender, + PLUGIN_NAME, + udp_addr, + udp_addr_len, + GNUNET_HELLO_ADDRESS_INFO_NONE); + s = udp_plugin_lookup_session (plugin, + address); + if (NULL == s) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "UDP session of address %s for ACK not found\n", + udp_address_to_string (plugin, + address->address, + address->address_length)); + GNUNET_HELLO_address_free (address); + return; + } + if (NULL == s->frag_ctx) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Fragmentation context of address %s for ACK not found\n", + udp_address_to_string (plugin, + address->address, + address->address_length)); + GNUNET_HELLO_address_free (address); + return; + } + GNUNET_HELLO_address_free (address); + + flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "We received a sending delay of %s for %s\n", + GNUNET_STRINGS_relative_time_to_string (flow_delay, + GNUNET_YES), + GNUNET_i2s (&udp_ack->sender)); + s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay); + + ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; + if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message)) + { + GNUNET_break_op(0); + return; + } + + if (GNUNET_OK != + GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, + ack)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_i2s (&udp_ack->sender), + udp_address_to_string (plugin, + udp_addr, + udp_addr_len)); + /* Expect more ACKs to arrive */ + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message from %s at %s full ACK'ed\n", + GNUNET_i2s (&udp_ack->sender), + udp_address_to_string (plugin, + udp_addr, + udp_addr_len)); + + /* Remove fragmented message after successful sending */ + fragmented_message_done (s->frag_ctx, + GNUNET_OK); +} + + +/* ********************** Receiving ********************** */ + + +/** + * Closure for #find_receive_context(). + */ +struct FindReceiveContext +{ + /** + * Where to store the result. + */ + struct DefragContext *rc; + + /** + * Session associated with this context. + */ + struct Session *session; + + /** + * Address to find. + */ + const union UdpAddress *udp_addr; + + /** + * Number of bytes in @e udp_addr. + */ + size_t udp_addr_len; + +}; + + +/** + * Scan the heap for a receive context with the given address. + * + * @param cls the `struct FindReceiveContext` + * @param node internal node of the heap + * @param element value stored at the node (a `struct ReceiveContext`) + * @param cost cost associated with the node + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +find_receive_context (void *cls, + struct GNUNET_CONTAINER_HeapNode *node, + void *element, + GNUNET_CONTAINER_HeapCostType cost) +{ + struct FindReceiveContext *frc = cls; + struct DefragContext *e = element; + + if ( (frc->udp_addr_len == e->udp_addr_len) && + (0 == memcmp (frc->udp_addr, + e->udp_addr, + frc->udp_addr_len)) ) + { + frc->rc = e; + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * Message tokenizer has broken up an incomming message. Pass it on + * to the service. + * + * @param cls the `struct Plugin *` + * @param client the `struct Session *` + * @param hdr the actual message + * @return #GNUNET_OK (always) + */ +static int +process_inbound_tokenized_messages (void *cls, + void *client, + const struct GNUNET_MessageHeader *hdr) +{ + struct Plugin *plugin = cls; + struct Session *session = client; + + if (GNUNET_YES == session->in_destroy) + return GNUNET_OK; + reschedule_session_timeout (session); + session->flow_delay_for_other_peer + = plugin->env->receive (plugin->env->cls, + session->address, + session, + hdr); + return GNUNET_OK; +} + + +/** + * Functions with this signature are called whenever we need to close + * a session due to a disconnect or failure to establish a connection. + * + * @param cls closure with the `struct Plugin` + * @param s session to close down + * @return #GNUNET_OK on success + */ +static int +udp_disconnect_session (void *cls, + struct Session *s) +{ + struct Plugin *plugin = cls; + struct UDP_MessageWrapper *udpw; + struct UDP_MessageWrapper *next; + struct FindReceiveContext frc; + + GNUNET_assert (GNUNET_YES != s->in_destroy); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Session %p to peer `%s' at address %s ended\n", + s, + GNUNET_i2s (&s->target), + udp_address_to_string (plugin, + s->address->address, + s->address->address_length)); + if (NULL != s->timeout_task) + { + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = NULL; + } + if (NULL != s->frag_ctx) + { + /* Remove fragmented message due to disconnect */ + fragmented_message_done (s->frag_ctx, + GNUNET_SYSERR); + } + + frc.rc = NULL; + frc.udp_addr = s->address->address; + frc.udp_addr_len = s->address->address_length; + /* Lookup existing receive context for this address */ + if (NULL != plugin->defrag_ctxs) + { + GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs, + &find_receive_context, + &frc); + if (NULL != frc.rc) + { + struct DefragContext *d_ctx = frc.rc; + + GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode); + GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag); + GNUNET_free (d_ctx); + } + } + next = plugin->ipv4_queue_head; + while (NULL != (udpw = next)) + { + next = udpw->next; + if (udpw->session == s) + { + dequeue (plugin, + udpw); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); + GNUNET_free (udpw); + } + } + next = plugin->ipv6_queue_head; + while (NULL != (udpw = next)) + { + next = udpw->next; + if (udpw->session == s) + { + dequeue (plugin, + udpw); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); + GNUNET_free (udpw); + } + } + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_DONE); + plugin->env->session_end (plugin->env->cls, + s->address, + s); + + if ( (NULL != s->frag_ctx) && + (NULL != s->frag_ctx->cont) ) + { + /* The 'frag_ctx' itself will be freed in #free_session() a bit + later, as it might be in use right now */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Calling continuation for fragemented message to `%s' with result SYSERR\n", + GNUNET_i2s (&s->target)); + s->frag_ctx->cont (s->frag_ctx->cont_cls, + &s->target, + GNUNET_SYSERR, + s->frag_ctx->payload_size, + s->frag_ctx->on_wire_size); + } + + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (plugin->sessions, + &s->target, + s)); + GNUNET_STATISTICS_set (plugin->env->stats, + "# UDP sessions active", + GNUNET_CONTAINER_multipeermap_size (plugin->sessions), + GNUNET_NO); + if (s->rc > 0) + { + s->in_destroy = GNUNET_YES; + } else - schedule_select_v6 (plugin); - return udpmlen; + { + free_session (s); + } + return GNUNET_OK; } /** - * Our external IP address/port mapping has changed. + * Destroy a session, plugin is being unloaded. * - * @param cls closure, the `struct LocalAddrList` - * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean - * the previous (now invalid) one - * @param addr either the previous or the new public IP address - * @param addrlen actual lenght of the address + * @param cls the `struct Plugin` + * @param key hash of public key of target peer + * @param value a `struct PeerSession *` to clean up + * @return #GNUNET_OK (continue to iterate) + */ +static int +disconnect_and_free_it (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + struct Plugin *plugin = cls; + + udp_disconnect_session (plugin, + value); + return GNUNET_OK; +} + + +/** + * Disconnect from a remote node. Clean up session if we have one for + * this peer. + * + * @param cls closure for this call (should be handle to Plugin) + * @param target the peeridentity of the peer to disconnect + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed */ static void -udp_nat_port_map_callback (void *cls, - int add_remove, - const struct sockaddr *addr, - socklen_t addrlen) +udp_disconnect (void *cls, + const struct GNUNET_PeerIdentity *target) { struct Plugin *plugin = cls; - struct GNUNET_HELLO_Address *address; - struct IPv4UdpAddress u4; - struct IPv6UdpAddress u6; - void *arg; - size_t args; - LOG (GNUNET_ERROR_TYPE_INFO, - "NAT notification to %s address `%s'\n", - (GNUNET_YES == add_remove) ? "add" : "remove", - GNUNET_a2s (addr, addrlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting from peer `%s'\n", + GNUNET_i2s (target)); + GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions, + target, + &disconnect_and_free_it, + plugin); +} + - /* convert 'address' to our internal format */ - switch (addr->sa_family) +/** + * Session was idle, so disconnect it. + * + * @param cls the `struct Session` to time out + * @param tc scheduler context + */ +static void +session_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Session *s = cls; + struct Plugin *plugin = s->plugin; + struct GNUNET_TIME_Relative left; + + s->timeout_task = NULL; + left = GNUNET_TIME_absolute_get_remaining (s->timeout); + if (left.rel_value_us > 0) { - case AF_INET: - GNUNET_assert(addrlen == sizeof(struct sockaddr_in)); - memset (&u4, 0, sizeof(u4)); - u4.options = htonl (plugin->myoptions); - u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr; - u4.u4_port = ((struct sockaddr_in *) addr)->sin_port; - if (0 == ((struct sockaddr_in *) addr)->sin_port) - return; - arg = &u4; - args = sizeof(struct IPv4UdpAddress); - break; - case AF_INET6: - GNUNET_assert(addrlen == sizeof(struct sockaddr_in6)); - memset (&u6, 0, sizeof(u6)); - u6.options = htonl (plugin->myoptions); - if (0 == ((struct sockaddr_in6 *) addr)->sin6_port) - return; - memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr, - sizeof(struct in6_addr)); - u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port; - arg = &u6; - args = sizeof(struct IPv6UdpAddress); - break; - default: - GNUNET_break(0); + /* not actually our turn yet, but let's at least update + the monitor, it may think we're about to die ... */ + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_UPDATE); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, + &session_timeout, + s); return; } - /* modify our published address list */ - address = GNUNET_HELLO_address_allocate (plugin->env->my_identity, - PLUGIN_NAME, - arg, args, - GNUNET_HELLO_ADDRESS_INFO_NONE); - plugin->env->notify_address (plugin->env->cls, add_remove, address); - GNUNET_HELLO_address_free (address); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Session %p was idle for %s, disconnecting\n", + s, + GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT, + GNUNET_YES)); + /* call session destroy function */ + udp_disconnect_session (plugin, + s); } /** - * Message tokenizer has broken up an incomming message. Pass it on - * to the service. + * Allocate a new session for the given endpoint address. + * Note that this function does not inform the service + * of the new session, this is the responsibility of the + * caller (if needed). + * + * @param cls the `struct Plugin` + * @param address address of the other peer to use + * @param network_type network type the address belongs to + * @return NULL on error, otherwise session handle + */ +static struct Session * +udp_plugin_create_session (void *cls, + const struct GNUNET_HELLO_Address *address, + enum GNUNET_ATS_Network_Type network_type) +{ + struct Plugin *plugin = cls; + struct Session *s; + + s = GNUNET_new (struct Session); + s->plugin = plugin; + s->address = GNUNET_HELLO_address_copy (address); + s->target = address->peer; + s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + 250); + s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; + s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; + s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; + s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, + &session_timeout, + s); + s->scope = network_type; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new session %p for peer `%s' address `%s'\n", + s, + GNUNET_i2s (&address->peer), + udp_address_to_string (plugin, + address->address, + address->address_length)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (plugin->sessions, + &s->target, + s, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + GNUNET_STATISTICS_set (plugin->env->stats, + "# UDP sessions active", + GNUNET_CONTAINER_multipeermap_size (plugin->sessions), + GNUNET_NO); + return s; +} + + +/** + * Creates a new outbound session the transport service will use to + * send data to the peer. * * @param cls the `struct Plugin *` - * @param client the `struct Session *` - * @param hdr the actual message - * @return #GNUNET_OK (always) + * @param address the address + * @return the session or NULL of max connections exceeded */ -static int -process_inbound_tokenized_messages (void *cls, - void *client, - const struct GNUNET_MessageHeader *hdr) +static struct Session * +udp_plugin_get_session (void *cls, + const struct GNUNET_HELLO_Address *address) { struct Plugin *plugin = cls; - struct Session *session = client; - struct GNUNET_TIME_Relative delay; + struct Session *s; + enum GNUNET_ATS_Network_Type network_type; + const struct IPv4UdpAddress *udp_v4; + const struct IPv6UdpAddress *udp_v6; - if (GNUNET_YES == session->in_destroy) - return GNUNET_OK; - /* setup ATS */ - reschedule_session_timeout (session); - delay = plugin->env->receive (plugin->env->cls, - session->address, - session, - hdr); - session->flow_delay_for_other_peer = delay; - return GNUNET_OK; + if (NULL == address) + { + GNUNET_break (0); + return NULL; + } + if ( (address->address_length != sizeof(struct IPv4UdpAddress)) && + (address->address_length != sizeof(struct IPv6UdpAddress)) ) + { + GNUNET_break_op (0); + return NULL; + } + if (NULL != (s = udp_plugin_lookup_session (cls, + address))) + return s; + + /* need to create new session */ + if (sizeof (struct IPv4UdpAddress) == address->address_length) + { + struct sockaddr_in v4; + + udp_v4 = (const struct IPv4UdpAddress *) address->address; + memset (&v4, '\0', sizeof (v4)); + v4.sin_family = AF_INET; +#if HAVE_SOCKADDR_IN_SIN_LEN + v4.sin_len = sizeof (struct sockaddr_in); +#endif + v4.sin_port = udp_v4->u4_port; + v4.sin_addr.s_addr = udp_v4->ipv4_addr; + network_type = plugin->env->get_address_type (plugin->env->cls, + (const struct sockaddr *) &v4, + sizeof (v4)); + } + if (sizeof (struct IPv6UdpAddress) == address->address_length) + { + struct sockaddr_in6 v6; + + udp_v6 = (const struct IPv6UdpAddress *) address->address; + memset (&v6, '\0', sizeof (v6)); + v6.sin6_family = AF_INET6; +#if HAVE_SOCKADDR_IN_SIN_LEN + v6.sin6_len = sizeof (struct sockaddr_in6); +#endif + v6.sin6_port = udp_v6->u6_port; + v6.sin6_addr = udp_v6->ipv6_addr; + network_type = plugin->env->get_address_type (plugin->env->cls, + (const struct sockaddr *) &v6, + sizeof (v6)); + } + GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type); + return udp_plugin_create_session (cls, + address, + network_type); } @@ -2328,7 +2512,8 @@ process_udp_message (struct Plugin *plugin, udp_addr_len, GNUNET_HELLO_ADDRESS_INFO_NONE); if (NULL == - (s = udp_plugin_lookup_session (plugin, address))) + (s = udp_plugin_lookup_session (plugin, + address))) { s = udp_plugin_create_session (plugin, address, @@ -2346,7 +2531,6 @@ process_udp_message (struct Plugin *plugin, } GNUNET_free (address); - /* iterate over all embedded messages */ s->rc++; GNUNET_SERVER_mst_receive (plugin->mst, s, @@ -2355,7 +2539,8 @@ process_udp_message (struct Plugin *plugin, GNUNET_YES, GNUNET_NO); s->rc--; - if ((0 == s->rc) && (GNUNET_YES == s->in_destroy)) + if ( (0 == s->rc) && + (GNUNET_YES == s->in_destroy) ) free_session (s); } @@ -2375,22 +2560,55 @@ fragment_msg_proc (void *cls, if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE) { - GNUNET_break(0); + GNUNET_break_op (0); return; } if (ntohs (msg->size) < sizeof(struct UDPMessage)) { - GNUNET_break(0); + GNUNET_break_op (0); return; } - um = (const struct UDPMessage *) msg; - dc->sender = um->sender; - dc->have_sender = GNUNET_YES; - process_udp_message (dc->plugin, - um, - dc->udp_addr, - dc->udp_addr_len, - dc->network_type); + um = (const struct UDPMessage *) msg; + dc->sender = um->sender; + dc->have_sender = GNUNET_YES; + process_udp_message (dc->plugin, + um, + dc->udp_addr, + dc->udp_addr_len, + dc->network_type); +} + + +/** + * We finished sending an acknowledgement. Update + * statistics. + * + * @param cls the `struct Plugin` + * @param udpw message queue entry of the ACK + * @param result #GNUNET_OK if the transmission worked, + * #GNUNET_SYSERR if we failed to send the ACK + */ +static void +ack_message_sent (void *cls, + struct UDP_MessageWrapper *udpw, + int result) +{ + struct Plugin *plugin = cls; + + if (GNUNET_OK == result) + { + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, ACK messages sent", + 1, + GNUNET_NO); + } + else + { + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, ACK transmissions failed", + 1, + GNUNET_NO); + } } @@ -2407,6 +2625,7 @@ ack_proc (void *cls, const struct GNUNET_MessageHeader *msg) { struct DefragContext *rc = cls; + struct Plugin *plugin = rc->plugin; size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size); struct UDP_ACK_Message *udp_ack; uint32_t delay; @@ -2425,19 +2644,23 @@ ack_proc (void *cls, rc->udp_addr, rc->udp_addr_len, GNUNET_HELLO_ADDRESS_INFO_NONE); - s = udp_plugin_lookup_session (rc->plugin, + s = udp_plugin_lookup_session (plugin, address); GNUNET_HELLO_address_free (address); if (NULL == s) { LOG (GNUNET_ERROR_TYPE_ERROR, "Trying to transmit ACK to peer `%s' but no session found!\n", - udp_address_to_string (rc->plugin, + udp_address_to_string (plugin, rc->udp_addr, rc->udp_addr_len)); GNUNET_CONTAINER_heap_remove_node (rc->hnode); GNUNET_DEFRAGMENT_context_destroy (rc->defrag); GNUNET_free (rc); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, ACK transmissions failed", + 1, + GNUNET_NO); return; } if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX) @@ -2447,7 +2670,7 @@ ack_proc (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %s\n", - udp_address_to_string (rc->plugin, + udp_address_to_string (plugin, rc->udp_addr, rc->udp_addr_len), GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer, @@ -2458,120 +2681,25 @@ ack_proc (void *cls, udpw->session = s; udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; udpw->msg_buf = (char *) &udpw[1]; - udpw->msg_type = UMT_MSG_ACK; + udpw->qc = &ack_message_sent; + udpw->qc_cls = plugin; udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf; udp_ack->header.size = htons ((uint16_t) msize); udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); udp_ack->delay = htonl (delay); - udp_ack->sender = *rc->plugin->env->my_identity; - memcpy (&udp_ack[1], msg, ntohs (msg->size)); - enqueue (rc->plugin, udpw); - notify_session_monitor (s->plugin, + udp_ack->sender = *plugin->env->my_identity; + memcpy (&udp_ack[1], + msg, + ntohs (msg->size)); + enqueue (plugin, + udpw); + notify_session_monitor (plugin, s, GNUNET_TRANSPORT_SS_UPDATE); if (s->address->address_length == sizeof (struct IPv4UdpAddress)) - schedule_select_v4 (rc->plugin); + schedule_select_v4 (plugin); else - schedule_select_v6 (rc->plugin); -} - - -/** - * Handle an ACK message. - * - * @param plugin the UDP plugin - * @param msg the (presumed) UDP ACK message - * @param udp_addr sender address - * @param udp_addr_len number of bytes in @a udp_addr - */ -static void -read_process_ack (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - const union UdpAddress *udp_addr, - socklen_t udp_addr_len) -{ - const struct GNUNET_MessageHeader *ack; - const struct UDP_ACK_Message *udp_ack; - struct GNUNET_HELLO_Address *address; - struct Session *s; - struct GNUNET_TIME_Relative flow_delay; - - if (ntohs (msg->size) - < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader)) - { - GNUNET_break_op(0); - return; - } - udp_ack = (const struct UDP_ACK_Message *) msg; - address = GNUNET_HELLO_address_allocate (&udp_ack->sender, - PLUGIN_NAME, - udp_addr, - udp_addr_len, - GNUNET_HELLO_ADDRESS_INFO_NONE); - s = udp_plugin_lookup_session (plugin, - address); - if (NULL == s) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "UDP session of address %s for ACK not found\n", - udp_address_to_string (plugin, - address->address, - address->address_length)); - GNUNET_HELLO_address_free (address); - return; - } - if (NULL == s->frag_ctx) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Fragmentation context of address %s for ACK not found\n", - udp_address_to_string (plugin, - address->address, - address->address_length)); - GNUNET_HELLO_address_free (address); - return; - } - GNUNET_HELLO_address_free (address); - - flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "We received a sending delay of %s for %s\n", - GNUNET_STRINGS_relative_time_to_string (flow_delay, - GNUNET_YES), - GNUNET_i2s (&udp_ack->sender)); - s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay); - - ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; - if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message)) - { - GNUNET_break_op(0); - return; - } - - if (GNUNET_OK != - GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, - ack)) - { - LOG(GNUNET_ERROR_TYPE_DEBUG, - "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", - (unsigned int ) ntohs (msg->size), - GNUNET_i2s (&udp_ack->sender), - udp_address_to_string (plugin, - udp_addr, - udp_addr_len)); - /* Expect more ACKs to arrive */ - return; - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Message from %s at %s full ACK'ed\n", - GNUNET_i2s (&udp_ack->sender), - udp_address_to_string (plugin, - udp_addr, - udp_addr_len)); - - /* Remove fragmented message after successful sending */ - fragmented_message_done (s->frag_ctx, - GNUNET_OK); + schedule_select_v6 (plugin); } @@ -2625,10 +2753,10 @@ read_process_fragment (struct Plugin *plugin, &ack_proc); d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx, - (GNUNET_CONTAINER_HeapCostType) now.abs_value_us); + (GNUNET_CONTAINER_HeapCostType) now.abs_value_us); LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n", - (unsigned int ) ntohs (msg->size), + (unsigned int) ntohs (msg->size), udp_address_to_string (plugin, udp_addr, udp_addr_len)); @@ -2637,19 +2765,20 @@ read_process_fragment (struct Plugin *plugin, { LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n", - (unsigned int ) ntohs (msg->size), + (unsigned int) ntohs (msg->size), udp_address_to_string (plugin, udp_addr, udp_addr_len)); } if (GNUNET_OK == - GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg)) + GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, + msg)) { /* keep this 'rc' from expiring */ GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode, - (GNUNET_CONTAINER_HeapCostType) now.abs_value_us); + (GNUNET_CONTAINER_HeapCostType) now.abs_value_us); } if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) > UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG) @@ -2659,6 +2788,10 @@ read_process_fragment (struct Plugin *plugin, GNUNET_assert (NULL != d_ctx); GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag); GNUNET_free (d_ctx); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, Defragmentations aborted", + 1, + GNUNET_NO); } } @@ -2687,10 +2820,15 @@ udp_select_read (struct Plugin *plugin, size_t int_addr_len; enum GNUNET_ATS_Network_Type network_type; - fromlen = sizeof(addr); - memset (&addr, 0, sizeof(addr)); - size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf), - (struct sockaddr *) &addr, &fromlen); + fromlen = sizeof (addr); + memset (&addr, + 0, + sizeof(addr)); + size = GNUNET_NETWORK_socket_recvfrom (rsock, + buf, + sizeof(buf), + (struct sockaddr *) &addr, + &fromlen); sa = (const struct sockaddr *) &addr; #if MINGW /* On SOCK_DGRAM UDP sockets recvfrom might fail with a @@ -2703,8 +2841,9 @@ udp_select_read (struct Plugin *plugin, * error indicates a previous send operation resulted in an ICMP Port * Unreachable message. */ - if ( (-1 == size) && (ECONNRESET == errno) ) - return; + if ( (-1 == size) && + (ECONNRESET == errno) ) + return; #endif if (-1 == size) { @@ -2719,10 +2858,11 @@ udp_select_read (struct Plugin *plugin, LOG (GNUNET_ERROR_TYPE_WARNING, "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n", (unsigned int ) size, - GNUNET_a2s (sa, fromlen)); + GNUNET_a2s (sa, + fromlen)); /* _MAY_ be a connection failure (got partial message) */ /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */ - GNUNET_break_op(0); + GNUNET_break_op (0); return; } msg = (const struct GNUNET_MessageHeader *) buf; @@ -2743,7 +2883,7 @@ udp_select_read (struct Plugin *plugin, return; } GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, received", + "# UDP, total bytes received", size, GNUNET_NO); network_type = plugin->env->get_address_type (plugin->env->cls, @@ -2828,7 +2968,7 @@ static struct UDP_MessageWrapper * remove_timeout_messages_and_select (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) { - struct UDP_MessageWrapper *udpw = NULL; + struct UDP_MessageWrapper *udpw; struct GNUNET_TIME_Relative remaining; struct Session *session; int removed; @@ -2845,95 +2985,26 @@ remove_timeout_messages_and_select (struct Plugin *plugin, if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us) { /* Message timed out */ - switch (udpw->msg_type) - { - case UMT_MSG_UNFRAGMENTED: - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, sent, timeout", - udpw->msg_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, messages, sent, timeout", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, messages, sent, timeout", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes, sent, timeout", - udpw->payload_size, - GNUNET_NO); - /* Not fragmented message */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Message for peer `%s' with size %u timed out\n", - GNUNET_i2s (&udpw->session->target), - udpw->payload_size); - call_continuation (udpw, GNUNET_SYSERR); - /* Remove message */ - removed = GNUNET_YES; - dequeue (plugin, udpw); - GNUNET_free(udpw); - break; - case UMT_MSG_FRAGMENTED: - /* Fragmented message */ - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, sent, timeout", - udpw->frag_ctx->on_wire_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, messages, sent, timeout", - 1, - GNUNET_NO); - call_continuation (udpw, - GNUNET_SYSERR); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Fragment for message for peer `%s' with size %u timed out\n", - GNUNET_i2s (&udpw->session->target), - udpw->frag_ctx->payload_size); - - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, sent, timeout", - 1, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes, sent, timeout", - udpw->frag_ctx->payload_size, - GNUNET_NO); - /* Remove fragmented message due to timeout */ - fragmented_message_done (udpw->frag_ctx, - GNUNET_SYSERR); - break; - case UMT_MSG_ACK: - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, sent, timeout", - udpw->msg_size, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, messages, sent, timeout", - 1, - GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "ACK Message for peer `%s' with size %u timed out\n", - GNUNET_i2s (&udpw->session->target), - udpw->payload_size); - call_continuation (udpw, - GNUNET_SYSERR); - removed = GNUNET_YES; - dequeue (plugin, - udpw); - GNUNET_free (udpw); - break; - default: - break; - } + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); + /* Remove message */ + removed = GNUNET_YES; + dequeue (plugin, + udpw); + GNUNET_free (udpw); + if (sock == plugin->sockv4) + { udpw = plugin->ipv4_queue_head; + } else if (sock == plugin->sockv6) + { udpw = plugin->ipv6_queue_head; + } else { - GNUNET_break(0); /* should never happen */ + GNUNET_break (0); /* should never happen */ udpw = NULL; } GNUNET_STATISTICS_update (plugin->env->stats, @@ -2945,7 +3016,7 @@ remove_timeout_messages_and_select (struct Plugin *plugin, { /* Message did not time out, check flow delay */ remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer); - if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us) + if (0 == remaining.rel_value_us) { /* this message is not delayed */ LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2976,7 +3047,13 @@ remove_timeout_messages_and_select (struct Plugin *plugin, /** - * FIXME. + * We failed to transmit a message via UDP. Generate + * a descriptive error message. + * + * @param plugin our plugin + * @param sa target address we were trying to reach + * @param slen number of bytes in @a sa + * @param error the errno value returned from the sendto() call */ static void analyze_send_error (struct Plugin *plugin, @@ -2986,10 +3063,13 @@ analyze_send_error (struct Plugin *plugin, { enum GNUNET_ATS_Network_Type type; - type = plugin->env->get_address_type (plugin->env->cls, sa, slen); - if (((GNUNET_ATS_NET_LAN == type) - || (GNUNET_ATS_NET_WAN == type)) - && ((ENETUNREACH == errno)|| (ENETDOWN == errno))) + type = plugin->env->get_address_type (plugin->env->cls, + sa, + slen); + if ( ( (GNUNET_ATS_NET_LAN == type) || + (GNUNET_ATS_NET_WAN == type) ) && + ( (ENETUNREACH == errno) || + (ENETDOWN == errno) ) ) { if (slen == sizeof (struct sockaddr_in)) { @@ -3000,7 +3080,8 @@ analyze_send_error (struct Plugin *plugin, LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, _("UDP could not transmit message to `%s': " "Network seems down, please check your network configuration\n"), - GNUNET_a2s (sa, slen)); + GNUNET_a2s (sa, + slen)); } if (slen == sizeof (struct sockaddr_in6)) { @@ -3020,7 +3101,9 @@ analyze_send_error (struct Plugin *plugin, { LOG (GNUNET_ERROR_TYPE_WARNING, "UDP could not transmit message to `%s': `%s'\n", - GNUNET_a2s (sa, slen), STRERROR (error)); + GNUNET_a2s (sa, + slen), + STRERROR (error)); } } @@ -3038,7 +3121,7 @@ udp_select_send (struct Plugin *plugin, { ssize_t sent; socklen_t slen; - struct sockaddr *a; + const struct sockaddr *a; const struct IPv4UdpAddress *u4; struct sockaddr_in a4; const struct IPv6UdpAddress *u6; @@ -3052,35 +3135,39 @@ udp_select_send (struct Plugin *plugin, if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length) { u4 = udpw->session->address->address; - memset (&a4, 0, sizeof(a4)); + memset (&a4, + 0, + sizeof(a4)); a4.sin_family = AF_INET; #if HAVE_SOCKADDR_IN_SIN_LEN a4.sin_len = sizeof (a4); #endif a4.sin_port = u4->u4_port; - memcpy (&a4.sin_addr, - &u4->ipv4_addr, - sizeof(struct in_addr)); - a = (struct sockaddr *) &a4; + a4.sin_addr.s_addr = u4->ipv4_addr; + a = (const struct sockaddr *) &a4; slen = sizeof (a4); } else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length) { u6 = udpw->session->address->address; - memset (&a6, 0, sizeof(a6)); + memset (&a6, + 0, + sizeof(a6)); a6.sin6_family = AF_INET6; #if HAVE_SOCKADDR_IN_SIN_LEN a6.sin6_len = sizeof (a6); #endif a6.sin6_port = u6->u6_port; - memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr)); - a = (struct sockaddr *) &a6; + a6.sin6_addr = u6->ipv6_addr; + a = (const struct sockaddr *) &a6; slen = sizeof (a6); } else { - call_continuation (udpw, - GNUNET_OK); + GNUNET_break (0); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); dequeue (plugin, udpw); notify_session_monitor (plugin, @@ -3101,8 +3188,9 @@ udp_select_send (struct Plugin *plugin, a, slen, errno); - call_continuation (udpw, - GNUNET_SYSERR); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, total, bytes, sent, failure", sent, @@ -3119,7 +3207,8 @@ udp_select_send (struct Plugin *plugin, "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n", (unsigned int) (udpw->msg_size), GNUNET_i2s (&udpw->session->target), - GNUNET_a2s (a, slen), + GNUNET_a2s (a, + slen), (int ) sent, (sent < 0) ? STRERROR (errno) : "ok"); GNUNET_STATISTICS_update (plugin->env->stats, @@ -3132,7 +3221,9 @@ udp_select_send (struct Plugin *plugin, GNUNET_NO); if (NULL != udpw->frag_ctx) udpw->frag_ctx->on_wire_size += udpw->msg_size; - call_continuation (udpw, GNUNET_OK); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_OK); } dequeue (plugin, udpw); @@ -3144,6 +3235,9 @@ udp_select_send (struct Plugin *plugin, } +/* ***************** Event loop (part 2) *************** */ + + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -3202,6 +3296,9 @@ udp_plugin_select_v6 (void *cls, } +/* ******************* Initialization *************** */ + + /** * Setup the UDP sockets (for IPv4 and IPv6) for the plugin. * @@ -3219,8 +3316,8 @@ setup_sockets (struct Plugin *plugin, int sockets_created = 0; struct sockaddr_in6 server_addrv6; struct sockaddr_in server_addrv4; - struct sockaddr *server_addr; - struct sockaddr *addrs[2]; + const struct sockaddr *server_addr; + const struct sockaddr *addrs[2]; socklen_t addrlens[2]; socklen_t addrlen; int eno; @@ -3229,16 +3326,20 @@ setup_sockets (struct Plugin *plugin, eno = EINVAL; if (GNUNET_YES == plugin->enable_ipv6) { - plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0); + plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, + SOCK_DGRAM, + 0); if (NULL == plugin->sockv6) { - LOG(GNUNET_ERROR_TYPE_WARNING, - "Disabling IPv6 since it is not supported on this system!\n"); + LOG (GNUNET_ERROR_TYPE_INFO, + _("Disabling IPv6 since it is not supported on this system!\n")); plugin->enable_ipv6 = GNUNET_NO; } else { - memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6)); + memset (&server_addrv6, + 0, + sizeof(struct sockaddr_in6)); #if HAVE_SOCKADDR_IN_SIN_LEN server_addrv6.sin6_len = sizeof (struct sockaddr_in6); #endif @@ -3250,19 +3351,21 @@ setup_sockets (struct Plugin *plugin, if (0 == plugin->port) /* autodetect */ server_addrv6.sin6_port - = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, + 33537) + 32000); else server_addrv6.sin6_port = htons (plugin->port); - addrlen = sizeof(struct sockaddr_in6); - server_addr = (struct sockaddr *) &server_addrv6; + addrlen = sizeof (struct sockaddr_in6); + server_addr = (const struct sockaddr *) &server_addrv6; tries = 0; while (tries < 10) { LOG(GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv6 `%s'\n", - GNUNET_a2s (server_addr, addrlen)); + GNUNET_a2s (server_addr, + addrlen)); /* binding */ if (GNUNET_OK == GNUNET_NETWORK_socket_bind (plugin->sockv6, @@ -3277,7 +3380,8 @@ setup_sockets (struct Plugin *plugin, } /* autodetect */ server_addrv6.sin6_port - = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, + 33537) + 32000); tries++; } @@ -3294,17 +3398,19 @@ setup_sockets (struct Plugin *plugin, if (NULL != plugin->sockv6) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "IPv6 socket created on port %s\n", - GNUNET_a2s (server_addr, addrlen)); - addrs[sockets_created] = (struct sockaddr *) &server_addrv6; - addrlens[sockets_created] = sizeof(struct sockaddr_in6); + "IPv6 UDP socket created listinging at %s\n", + GNUNET_a2s (server_addr, + addrlen)); + addrs[sockets_created] = server_addr; + addrlens[sockets_created] = addrlen; sockets_created++; } else { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Failed to bind UDP socket to %s: %s\n", - GNUNET_a2s (server_addr, addrlen), + LOG (GNUNET_ERROR_TYPE_WARNING, + _("Failed to bind UDP socket to %s: %s\n"), + GNUNET_a2s (server_addr, + addrlen), STRERROR (eno)); } } @@ -3312,18 +3418,22 @@ setup_sockets (struct Plugin *plugin, /* Create IPv4 socket */ eno = EINVAL; - plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0); + plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, + SOCK_DGRAM, + 0); if (NULL == plugin->sockv4) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "socket"); - LOG (GNUNET_ERROR_TYPE_WARNING, - "Disabling IPv4 since it is not supported on this system!\n"); + LOG (GNUNET_ERROR_TYPE_INFO, + _("Disabling IPv4 since it is not supported on this system!\n")); plugin->enable_ipv4 = GNUNET_NO; } else { - memset (&server_addrv4, '\0', sizeof(struct sockaddr_in)); + memset (&server_addrv4, + 0, + sizeof(struct sockaddr_in)); #if HAVE_SOCKADDR_IN_SIN_LEN server_addrv4.sin_len = sizeof (struct sockaddr_in); #endif @@ -3342,15 +3452,16 @@ setup_sockets (struct Plugin *plugin, else server_addrv4.sin_port = htons (plugin->port); - addrlen = sizeof(struct sockaddr_in); - server_addr = (struct sockaddr *) &server_addrv4; + addrlen = sizeof (struct sockaddr_in); + server_addr = (const struct sockaddr *) &server_addrv4; tries = 0; while (tries < 10) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding to IPv4 `%s'\n", - GNUNET_a2s (server_addr, addrlen)); + GNUNET_a2s (server_addr, + addrlen)); /* binding */ if (GNUNET_OK == @@ -3367,7 +3478,8 @@ setup_sockets (struct Plugin *plugin, /* autodetect */ server_addrv4.sin_port - = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537) + = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, + 33537) + 32000); tries++; } @@ -3386,16 +3498,18 @@ setup_sockets (struct Plugin *plugin, { LOG (GNUNET_ERROR_TYPE_DEBUG, "IPv4 socket created on port %s\n", - GNUNET_a2s (server_addr, addrlen)); - addrs[sockets_created] = (struct sockaddr *) &server_addrv4; - addrlens[sockets_created] = sizeof(struct sockaddr_in); + GNUNET_a2s (server_addr, + addrlen)); + addrs[sockets_created] = server_addr; + addrlens[sockets_created] = addrlen; sockets_created++; } else { LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to bind UDP socket to %s: %s\n"), - GNUNET_a2s (server_addr, addrlen), + GNUNET_a2s (server_addr, + addrlen), STRERROR (eno)); } } @@ -3412,75 +3526,15 @@ setup_sockets (struct Plugin *plugin, GNUNET_NO, plugin->port, sockets_created, - (const struct sockaddr **) addrs, + addrs, addrlens, &udp_nat_port_map_callback, NULL, plugin); - return sockets_created; } -/** - * Return information about the given session to the - * monitor callback. - * - * @param cls the `struct Plugin` with the monitor callback (`sic`) - * @param peer peer we send information about - * @param value our `struct Session` to send information about - * @return #GNUNET_OK (continue to iterate) - */ -static int -send_session_info_iter (void *cls, - const struct GNUNET_PeerIdentity *peer, - void *value) -{ - struct Plugin *plugin = cls; - struct Session *session = value; - - notify_session_monitor (plugin, - session, - GNUNET_TRANSPORT_SS_INIT); - notify_session_monitor (plugin, - session, - GNUNET_TRANSPORT_SS_UP); - return GNUNET_OK; -} - - -/** - * Begin monitoring sessions of a plugin. There can only - * be one active monitor per plugin (i.e. if there are - * multiple monitors, the transport service needs to - * multiplex the generated events over all of them). - * - * @param cls closure of the plugin - * @param sic callback to invoke, NULL to disable monitor; - * plugin will being by iterating over all active - * sessions immediately and then enter monitor mode - * @param sic_cls closure for @a sic - */ -static void -udp_plugin_setup_monitor (void *cls, - GNUNET_TRANSPORT_SessionInfoCallback sic, - void *sic_cls) -{ - struct Plugin *plugin = cls; - - plugin->sic = sic; - plugin->sic_cls = sic_cls; - if (NULL != sic) - { - GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions, - &send_session_info_iter, - plugin); - /* signal end of first iteration */ - sic (sic_cls, NULL, NULL); - } -} - - /** * The exported method. Makes the core api available via a global and * returns the udp transport API. @@ -3502,7 +3556,6 @@ libgnunet_plugin_transport_udp_init (void *cls) unsigned long long enable_broadcasting_recv; char *bind4_address; char *bind6_address; - char *fancy_interval; struct GNUNET_TIME_Relative interval; struct sockaddr_in server_addrv4; struct sockaddr_in6 server_addrv6; @@ -3527,54 +3580,76 @@ libgnunet_plugin_transport_udp_init (void *cls) if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", - "PORT", &port)) + "PORT", + &port)) port = 2086; + if (port > 65535) + { + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, + "transport-udp", + "PORT", + _("must be in [0,65535]")); + return NULL; + } if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", - "ADVERTISED_PORT", &aport)) + "ADVERTISED_PORT", + &aport)) aport = port; - if (port > 65535) + if (aport > 65535) { - LOG (GNUNET_ERROR_TYPE_WARNING, - _("Given `%s' option is out of range: %llu > %u\n"), - "PORT", port, - 65535); + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, + "transport-udp", + "ADVERTISED_PORT", + _("must be in [0,65535]")); return NULL; } - /* Protocols */ if (GNUNET_YES == - GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6")) + GNUNET_CONFIGURATION_get_value_yesno (env->cfg, + "nat", + "DISABLEV6")) enable_v6 = GNUNET_NO; else enable_v6 = GNUNET_YES; - /* Addresses */ have_bind4 = GNUNET_NO; - memset (&server_addrv4, 0, sizeof(server_addrv4)); + memset (&server_addrv4, + 0, + sizeof (server_addrv4)); if (GNUNET_YES == - GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", - "BINDTO", &bind4_address)) + GNUNET_CONFIGURATION_get_value_string (env->cfg, + "transport-udp", + "BINDTO", + &bind4_address)) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Binding udp plugin to specific address: `%s'\n", + "Binding UDP plugin to specific address: `%s'\n", bind4_address); if (1 != inet_pton (AF_INET, bind4_address, &server_addrv4.sin_addr)) { + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, + "transport-udp", + "BINDTO", + _("must be valid IPv4 address")); GNUNET_free (bind4_address); return NULL; } have_bind4 = GNUNET_YES; } - GNUNET_free_non_null(bind4_address); + GNUNET_free_non_null (bind4_address); have_bind6 = GNUNET_NO; - memset (&server_addrv6, 0, sizeof(server_addrv6)); + memset (&server_addrv6, + 0, + sizeof (server_addrv6)); if (GNUNET_YES == - GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", - "BINDTO6", &bind6_address)) + GNUNET_CONFIGURATION_get_value_string (env->cfg, + "transport-udp", + "BINDTO6", + &bind6_address)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding udp plugin to specific address: `%s'\n", @@ -3583,9 +3658,10 @@ libgnunet_plugin_transport_udp_init (void *cls) bind6_address, &server_addrv6.sin6_addr)) { - LOG (GNUNET_ERROR_TYPE_ERROR, - _("Invalid IPv6 address: `%s'\n"), - bind6_address); + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, + "transport-udp", + "BINDTO6", + _("must be valid IPv6 address")); GNUNET_free (bind6_address); return NULL; } @@ -3593,40 +3669,35 @@ libgnunet_plugin_transport_udp_init (void *cls) } GNUNET_free_non_null (bind6_address); - /* Enable neighbour discovery */ enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, - "transport-udp", "BROADCAST"); + "transport-udp", + "BROADCAST"); if (enable_broadcasting == GNUNET_SYSERR) enable_broadcasting = GNUNET_NO; enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, - "transport-udp", "BROADCAST_RECEIVE"); + "transport-udp", + "BROADCAST_RECEIVE"); if (enable_broadcasting_recv == GNUNET_SYSERR) enable_broadcasting_recv = GNUNET_YES; if (GNUNET_SYSERR == - GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", - "BROADCAST_INTERVAL", - &fancy_interval)) - { - interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10); - } - else + GNUNET_CONFIGURATION_get_value_time (env->cfg, + "transport-udp", + "BROADCAST_INTERVAL", + &interval)) { - if (GNUNET_SYSERR == - GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval)) - { - interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); - } - GNUNET_free(fancy_interval); + interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 10); } - - /* Maximum datarate */ if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", - "MAX_BPS", &udp_max_bps)) + GNUNET_CONFIGURATION_get_value_number (env->cfg, + "transport-udp", + "MAX_BPS", + &udp_max_bps)) { - udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */ + /* 50 MB/s == infinity for practical purposes */ + udp_max_bps = 1024 * 1024 * 50; } p = GNUNET_new (struct Plugin); @@ -3638,9 +3709,9 @@ libgnunet_plugin_transport_udp_init (void *cls) p->enable_broadcasting = enable_broadcasting; p->enable_broadcasting_receiving = enable_broadcasting_recv; p->env = env; - p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); - p->defrag_ctxs = GNUNET_CONTAINER_heap_create ( - GNUNET_CONTAINER_HEAP_ORDER_MIN); + p->sessions = GNUNET_CONTAINER_multipeermap_create (16, + GNUNET_NO); + p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p); GNUNET_BANDWIDTH_tracker_init (&p->tracker, @@ -3648,15 +3719,15 @@ libgnunet_plugin_transport_udp_init (void *cls) NULL, GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps), 30); - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Setting up sockets\n"); res = setup_sockets (p, (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL, (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL); - if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL))) + if ( (0 == res) || + ( (NULL == p->sockv4) && + (NULL == p->sockv6) ) ) { LOG (GNUNET_ERROR_TYPE_ERROR, - _("Failed to create network sockets, plugin failed\n")); + _("Failed to create UDP network sockets\n")); GNUNET_CONTAINER_multipeermap_destroy (p->sessions); GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs); GNUNET_SERVER_mst_destroy (p->mst); @@ -3665,11 +3736,12 @@ libgnunet_plugin_transport_udp_init (void *cls) } /* Setup broadcasting and receiving beacons */ - setup_broadcast (p, &server_addrv6, &server_addrv4); + setup_broadcast (p, + &server_addrv6, + &server_addrv4); api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions); api->cls = p; - api->send = NULL; api->disconnect_session = &udp_disconnect_session; api->query_keepalive_factor = &udp_query_keepalive_factor; api->disconnect_peer = &udp_disconnect; @@ -3724,12 +3796,11 @@ libgnunet_plugin_transport_udp_done (void *cls) struct GNUNET_TRANSPORT_PluginFunctions *api = cls; struct Plugin *plugin = api->cls; struct PrettyPrinterContext *cur; - struct PrettyPrinterContext *next; struct UDP_MessageWrapper *udpw; if (NULL == plugin) { - GNUNET_free(api); + GNUNET_free (api); return NULL; } stop_broadcast (plugin); @@ -3743,25 +3814,17 @@ libgnunet_plugin_transport_udp_done (void *cls) GNUNET_SCHEDULER_cancel (plugin->select_task_v6); plugin->select_task_v6 = NULL; } - - /* Closing sockets */ - if (GNUNET_YES == plugin->enable_ipv4) + if (NULL != plugin->sockv4) { - if (NULL != plugin->sockv4) - { - GNUNET_break (GNUNET_OK == - GNUNET_NETWORK_socket_close (plugin->sockv4)); - plugin->sockv4 = NULL; - } + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (plugin->sockv4)); + plugin->sockv4 = NULL; } - if (GNUNET_YES == plugin->enable_ipv6) + if (NULL != plugin->sockv6) { - if (NULL != plugin->sockv6) - { - GNUNET_break (GNUNET_OK == - GNUNET_NETWORK_socket_close (plugin->sockv6)); - plugin->sockv6 = NULL; - } + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (plugin->sockv6)); + plugin->sockv6 = NULL; } if (NULL != plugin->nat) { @@ -3771,7 +3834,8 @@ libgnunet_plugin_transport_udp_done (void *cls) if (NULL != plugin->defrag_ctxs) { GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs, - &heap_cleanup_iterator, NULL); + &heap_cleanup_iterator, + NULL); GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs); plugin->defrag_ctxs = NULL; } @@ -3780,39 +3844,32 @@ libgnunet_plugin_transport_udp_done (void *cls) GNUNET_SERVER_mst_destroy (plugin->mst); plugin->mst = NULL; } - - /* Clean up leftover messages */ - udpw = plugin->ipv4_queue_head; - while (NULL != udpw) + while (NULL != (udpw = plugin->ipv4_queue_head)) { - struct UDP_MessageWrapper *tmp = udpw->next; - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free(udpw); - udpw = tmp; + dequeue (plugin, + udpw); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); + GNUNET_free (udpw); } - udpw = plugin->ipv6_queue_head; - while (NULL != udpw) + while (NULL != (udpw = plugin->ipv6_queue_head)) { - struct UDP_MessageWrapper *tmp = udpw->next; - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free(udpw); - udpw = tmp; + dequeue (plugin, + udpw); + udpw->qc (udpw->qc_cls, + udpw, + GNUNET_SYSERR); + GNUNET_free (udpw); } - - /* Clean up sessions */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Cleaning up sessions\n"); GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions, - &disconnect_and_free_it, plugin); + &disconnect_and_free_it, + plugin); GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions); - next = plugin->ppc_dll_head; - for (cur = next; NULL != cur; cur = next) + while (NULL != (cur = plugin->ppc_dll_head)) { - GNUNET_break(0); - next = cur->next; + GNUNET_break (0); GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head, plugin->ppc_dll_tail, cur); diff --git a/src/transport/test_plugin_transport.c b/src/transport/test_plugin_transport.c index df7c7b23f..84c9362e9 100644 --- a/src/transport/test_plugin_transport.c +++ b/src/transport/test_plugin_transport.c @@ -342,16 +342,16 @@ test_addr_string (void *cls, if (NULL == w->addrstring) { GNUNET_break(0); - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, - "Plugin cannot convert address to string!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Plugin cannot convert address to string!\n"); end_badly_now (); return; } - GNUNET_log(GNUNET_ERROR_TYPE_INFO, - "Plugin added address `%s'\n", - w->addrstring); - GNUNET_log(GNUNET_ERROR_TYPE_INFO, - "Testing address_to_string: OK\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Plugin added address `%s'\n", + w->addrstring); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Testing address_to_string: OK\n"); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Testing: string_to_address \n"); s2a = NULL; @@ -385,14 +385,16 @@ test_addr_string (void *cls, } else if (0 != memcmp (s2a, w->address->address, s2a_len)) { - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, - "Plugin creates different address length when converting back and forth %i!\n", - memcmp (s2a, w->address->address, s2a_len)); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Plugin creates different address length when converting back and forth %i!\n", + memcmp (s2a, + w->address->address, + s2a_len)); } else { - GNUNET_log(GNUNET_ERROR_TYPE_INFO, - "Testing string_to_address: OK\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Testing string_to_address: OK\n"); } GNUNET_free(s2a); @@ -406,10 +408,13 @@ test_addr_string (void *cls, &address_pretty_printer_cb, w); if (GNUNET_OK != - api->check_address (api->cls, w->address->address, w->address->address_length)) + api->check_address (api->cls, + w->address->address, + w->address->address_length)) { - GNUNET_break(0); - GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Plugin refuses added address!\n"); + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Plugin refuses added address!\n"); end_badly_now (); return; } -- 2.25.1