X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fplugin_transport_udp.c;h=940bed254f220887257959352ac401bf2afa595d;hb=fc293a0b11c9819e95045dde70e46c160d1cfd60;hp=b14e14357c48df790014df80154277063e44f713;hpb=8b7ba67e4e201aaa44ef60b51396e29dc379d1fd;p=oweals%2Fgnunet.git diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index b14e14357..940bed254 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -93,13 +93,13 @@ struct Session */ struct GNUNET_PeerIdentity target; + struct FragmentationContext * frag_ctx; + /** * Address of the other peer */ const struct sockaddr *sock_addr; - size_t addrlen; - /** * Desired delay for next sending we send to other peer */ @@ -110,6 +110,11 @@ struct Session */ struct GNUNET_TIME_Absolute flow_delay_from_other_peer; + /** + * Session timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** * expected delay for ACKs */ @@ -117,7 +122,8 @@ struct Session struct GNUNET_ATS_Information ats; - struct FragmentationContext * frag_ctx; + size_t addrlen; + unsigned int rc; @@ -147,12 +153,12 @@ struct SourceInformation */ const void *arg; + struct Session *session; /** * Number of bytes in source address. */ size_t args; - struct Session *session; }; @@ -171,12 +177,13 @@ struct FindReceiveContext */ const struct sockaddr *addr; + struct Session *session; + /** * Number of bytes in 'addr'. */ socklen_t addr_len; - struct Session *session; }; @@ -229,9 +236,6 @@ struct FragmentationContext struct GNUNET_FRAGMENT_Context * frag; struct Session * session; - struct GNUNET_TIME_Absolute timeout; - - /** * Function to call upon completion of the transmission. */ @@ -242,6 +246,8 @@ struct FragmentationContext */ void *cont_cls; + struct GNUNET_TIME_Absolute timeout; + size_t bytes_to_send; }; @@ -252,9 +258,6 @@ struct UDPMessageWrapper struct UDPMessageWrapper *prev; struct UDPMessageWrapper *next; char *udp; - size_t msg_size; - - struct GNUNET_TIME_Absolute timeout; /** * Function to call upon completion of the transmission. @@ -268,6 +271,9 @@ struct UDPMessageWrapper struct FragmentationContext *frag_ctx; + size_t msg_size; + + struct GNUNET_TIME_Absolute timeout; }; @@ -293,6 +299,11 @@ struct UDP_ACK_Message }; +/** + * Encapsulation of all of the state of the plugin. + */ +struct Plugin * plugin; + /** * We have been notified that our readset has something to read. We don't @@ -305,6 +316,7 @@ struct UDP_ACK_Message static void udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -316,6 +328,27 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); static void udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s); + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s); + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s); + + + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -380,13 +413,13 @@ udp_address_to_string (void *cls, const void *addr, size_t addrlen) * If the function returns GNUNET_SYSERR, its contents are undefined. * @return GNUNET_OK on success, GNUNET_SYSERR on failure */ -int +static int udp_string_to_address (void *cls, const char *addr, uint16_t addrlen, void **buf, size_t *added) { struct sockaddr_storage socket_address; - - if ((NULL == addr) || (addrlen == 0)) + + if ((NULL == addr) || (0 == addrlen)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -402,37 +435,40 @@ udp_string_to_address (void *cls, const char *addr, uint16_t addrlen, return GNUNET_SYSERR; } - int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr), - &socket_address); - - if (ret != GNUNET_OK) + if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr), + &socket_address)) { return GNUNET_SYSERR; } - if (socket_address.ss_family == AF_INET) + switch (socket_address.ss_family) { - struct IPv4UdpAddress *u4; - struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address; - u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress)); - u4->ipv4_addr = in4->sin_addr.s_addr; - u4->u4_port = in4->sin_port; - *buf = u4; - *added = sizeof (struct IPv4UdpAddress); - return GNUNET_OK; - } - else if (socket_address.ss_family == AF_INET6) - { - struct IPv6UdpAddress *u6; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address; - u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress)); - u6->ipv6_addr = in6->sin6_addr; - u6->u6_port = in6->sin6_port; - *buf = u6; - *added = sizeof (struct IPv6UdpAddress); - return GNUNET_OK; + case AF_INET: + { + struct IPv4UdpAddress *u4; + struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address; + u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress)); + u4->ipv4_addr = in4->sin_addr.s_addr; + u4->u4_port = in4->sin_port; + *buf = u4; + *added = sizeof (struct IPv4UdpAddress); + return GNUNET_OK; + } + case AF_INET6: + { + struct IPv6UdpAddress *u6; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address; + u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress)); + u6->ipv6_addr = in6->sin6_addr; + u6->u6_port = in6->sin6_port; + *buf = u6; + *added = sizeof (struct IPv6UdpAddress); + return GNUNET_OK; + } + default: + GNUNET_break (0); + return GNUNET_SYSERR; } - return GNUNET_SYSERR; } @@ -539,10 +575,11 @@ udp_plugin_address_pretty_printer (void *cls, const char *type, GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc); } + static void call_continuation (struct UDPMessageWrapper *udpw, int result) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for %u byte message to `%s' with result %s\n", udpw->msg_size, GNUNET_i2s (&udpw->session->target), (GNUNET_OK == result) ? "OK" : "SYSERR"); @@ -553,6 +590,7 @@ call_continuation (struct UDPMessageWrapper *udpw, int result) } + /** * Check if the given port is plausible (must be either our listen * port or our advertised port). If it is neither, we return @@ -571,7 +609,6 @@ check_port (struct Plugin *plugin, uint16_t in_port) } - /** * Function that will be called to check if a binary address for this * plugin is well-formed and corresponds to an address for THIS peer @@ -649,18 +686,15 @@ free_session (struct Session *s) /** - * Destroy a session, plugin is being unloaded. + * Functions with this signature are called whenever we need + * to close a session due to a disconnect or failure to + * establish a connection. * - * @param cls unused - * @param key hash of public key of target peer - * @param value a 'struct PeerSession*' to clean up - * @return GNUNET_OK (continue to iterate) + * @param s session to close down */ -static int -disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) +static void +disconnect_session (struct Session *s) { - struct Plugin *plugin = cls; - struct Session *s = value; struct UDPMessageWrapper *udpw; struct UDPMessageWrapper *next; @@ -670,6 +704,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) s, GNUNET_i2s (&s->target), GNUNET_a2s (s->sock_addr, s->addrlen)); + stop_session_timeout (s); next = plugin->ipv4_queue_head; while (NULL != (udpw = next)) { @@ -700,7 +735,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) if (NULL != s->frag_ctx->cont) { s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for fragemented message to `%s' with result SYSERR\n", GNUNET_i2s (&s->target)); } @@ -718,6 +753,20 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) s->in_destroy = GNUNET_YES; else free_session (s); +} + +/** + * Destroy a session, plugin is being unloaded. + * + * @param cls unused + * @param key hash of public key of target peer + * @param value a 'struct PeerSession*' to clean up + * @return GNUNET_OK (continue to iterate) + */ +static int +disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value) +{ + disconnect_session(value); return GNUNET_OK; } @@ -742,6 +791,7 @@ udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin); } + static struct Session * create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, const void *addr, size_t addrlen, @@ -796,21 +846,18 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, GNUNET_break_op (0); return NULL; } - s->addrlen = len; s->target = *target; s->sock_addr = (const struct sockaddr *) &s[1]; - s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero(); - s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero(); s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS; - + start_session_timeout(s); return s; } static int session_cmp_it (void *cls, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode * key, void *value) { struct SessionCompareContext * cctx = cls; @@ -819,12 +866,9 @@ session_cmp_it (void *cls, socklen_t s_addrlen = s->addrlen; -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n", + LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n", udp_address_to_string (NULL, (void *) address->address, address->address_length), GNUNET_a2s (s->sock_addr, s->addrlen)); -#endif - if ((address->address_length == sizeof (struct IPv4UdpAddress)) && (s_addrlen == sizeof (struct sockaddr_in))) { @@ -852,8 +896,6 @@ session_cmp_it (void *cls, return GNUNET_NO; } } - - return GNUNET_YES; } @@ -909,15 +951,14 @@ udp_plugin_get_session (void *cls, struct SessionCompareContext cctx; cctx.addr = address; cctx.res = NULL; -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length)); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Looking for existing session for peer `%s' `%s' \n", + GNUNET_i2s (&address->peer), + udp_address_to_string(NULL, address->address, address->address_length)); GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx); if (cctx.res != NULL) { -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res); return cctx.res; } @@ -927,13 +968,11 @@ udp_plugin_get_session (void *cls, address->address, address->address_length, NULL, NULL); -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new session %p for peer `%s' address `%s'\n", - s, - GNUNET_i2s(&address->peer), - udp_address_to_string(NULL,address->address,address->address_length)); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new session %p for peer `%s' address `%s'\n", + s, + GNUNET_i2s(&address->peer), + udp_address_to_string(NULL,address->address,address->address_length)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (plugin->sessions, &s->target.hashPubKey, @@ -995,13 +1034,10 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) struct Plugin *plugin = frag_ctx->plugin; struct UDPMessageWrapper * udpw; struct Session *s; - size_t msg_len = ntohs (msg->size); -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); -#endif - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len); udpw->session = frag_ctx->session; s = udpw->session; @@ -1013,10 +1049,8 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) udpw->timeout = frag_ctx->timeout; udpw->frag_ctx = frag_ctx; memcpy (udpw->udp, msg, msg_len); - enqueue (plugin, udpw); - if (s->addrlen == sizeof (struct sockaddr_in)) { if (plugin->with_v4_ws == GNUNET_NO) @@ -1033,7 +1067,6 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) plugin->with_v4_ws = GNUNET_YES; } } - else if (s->addrlen == sizeof (struct sockaddr_in6)) { if (plugin->with_v6_ws == GNUNET_NO) @@ -1090,7 +1123,6 @@ udp_plugin_send (void *cls, { struct Plugin *plugin = cls; size_t mlen = msgbuf_size + sizeof (struct UDPMessage); - struct UDPMessageWrapper * udpw; struct UDPMessage *udp; char mbuf[mlen]; @@ -1099,29 +1131,24 @@ udp_plugin_send (void *cls, if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL)) return GNUNET_SYSERR; - - if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL)) - return GNUNET_SYSERR; - - + if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL)) + return GNUNET_SYSERR; if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); return GNUNET_SYSERR; } - if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s)) { GNUNET_break (0); return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP transmits %u-byte message to `%s' using address `%s'\n", - mlen, - GNUNET_i2s (&s->target), - GNUNET_a2s(s->sock_addr, s->addrlen)); - + mlen, + GNUNET_i2s (&s->target), + GNUNET_a2s(s->sock_addr, s->addrlen)); + /* Message */ udp = (struct UDPMessage *) mbuf; udp->header.size = htons (mlen); @@ -1129,6 +1156,7 @@ udp_plugin_send (void *cls, udp->reserved = htonl (0); udp->sender = *plugin->env->my_identity; + reschedule_session_timeout(s); if (mlen <= UDP_MTU) { udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen); @@ -1186,7 +1214,6 @@ udp_plugin_send (void *cls, plugin->with_v4_ws = GNUNET_YES; } } - else if (s->addrlen == sizeof (struct sockaddr_in6)) { if (plugin->with_v6_ws == GNUNET_NO) @@ -1263,7 +1290,7 @@ udp_nat_port_map_callback (void *cls, int add_remove, * @param client the 'struct SourceInformation' * @param hdr the actual message */ -static void +static int process_inbound_tokenized_messages (void *cls, void *client, const struct GNUNET_MessageHeader *hdr) { @@ -1274,21 +1301,22 @@ process_inbound_tokenized_messages (void *cls, void *client, GNUNET_assert (si->session != NULL); if (GNUNET_YES == si->session->in_destroy) - return; + return GNUNET_OK; /* setup ATS */ ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); ats[0].value = htonl (1); ats[1] = si->session->ats; GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED); - delay = plugin->env->receive (plugin->env->cls, - &si->sender, - hdr, - (const struct GNUNET_ATS_Information *) &ats, 2, - NULL, - si->arg, - si->args); + &si->sender, + hdr, + (const struct GNUNET_ATS_Information *) &ats, 2, + si->session, + si->arg, + si->args); si->session->flow_delay_for_other_peer = delay; + reschedule_session_timeout(si->session); + return GNUNET_OK; } @@ -1421,16 +1449,19 @@ fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) rc->src_addr, rc->addr_len); } + struct LookupContext { const struct sockaddr * addr; - size_t addrlen; struct Session *res; + + size_t addrlen; }; + static int -lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value) +lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value) { struct LookupContext *l_ctx = cls; struct Session * s = value; @@ -1444,6 +1475,7 @@ lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value) return GNUNET_YES; } + /** * Transmit an acknowledgement. * @@ -1455,7 +1487,6 @@ static void ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) { struct DefragContext *rc = cls; - size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); struct UDP_ACK_Message *udp_ack; uint32_t delay = 0; @@ -1485,14 +1516,10 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) sockaddr_in6)), delay); udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize); - udpw->cont = NULL; - udpw->cont_cls = NULL; - udpw->frag_ctx = NULL; udpw->msg_size = msize; udpw->session = s; - udpw->timeout = GNUNET_TIME_absolute_get_forever(); + udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; udpw->udp = (char *)&udpw[1]; - udp_ack = (struct UDP_ACK_Message *) udpw->udp; udp_ack->header.size = htons ((uint16_t) msize); udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); @@ -1504,10 +1531,11 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) } -static void read_process_msg (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - char *addr, - socklen_t fromlen) +static void +read_process_msg (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + const char *addr, + socklen_t fromlen) { if (ntohs (msg->size) < sizeof (struct UDPMessage)) { @@ -1516,18 +1544,19 @@ static void read_process_msg (struct Plugin *plugin, } process_udp_message (plugin, (const struct UDPMessage *) msg, (const struct sockaddr *) addr, fromlen); - return; } -static void read_process_ack (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - char *addr, - socklen_t fromlen) + +static void +read_process_ack (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) { const struct GNUNET_MessageHeader *ack; const struct UDP_ACK_Message *udp_ack; struct LookupContext l_ctx; - struct Session *s = NULL; + struct Session *s; struct GNUNET_TIME_Relative flow_delay; if (ntohs (msg->size) < @@ -1536,9 +1565,7 @@ static void read_process_ack (struct Plugin *plugin, GNUNET_break_op (0); return; } - udp_ack = (const struct UDP_ACK_Message *) msg; - l_ctx.addr = (const struct sockaddr *) addr; l_ctx.addrlen = fromlen; l_ctx.res = NULL; @@ -1566,10 +1593,10 @@ static void read_process_ack (struct Plugin *plugin, if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack)) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", - (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", + (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); return; } @@ -1579,12 +1606,12 @@ static void read_process_ack (struct Plugin *plugin, GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag); - struct UDPMessageWrapper * udpw = NULL; - struct UDPMessageWrapper * tmp = NULL; + struct UDPMessageWrapper * udpw; + struct UDPMessageWrapper * tmp; if (s->addrlen == sizeof (struct sockaddr_in6)) { udpw = plugin->ipv6_queue_head; - while (udpw!= NULL) + while (NULL != udpw) { tmp = udpw->next; if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx)) @@ -1612,7 +1639,7 @@ static void read_process_ack (struct Plugin *plugin, if (s->frag_ctx->cont != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for fragmented message to `%s' with result %s\n", GNUNET_i2s (&s->target), "OK"); s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK); @@ -1620,19 +1647,19 @@ static void read_process_ack (struct Plugin *plugin, GNUNET_free (s->frag_ctx); s->frag_ctx = NULL; - return; } -static void read_process_fragment (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - char *addr, - socklen_t fromlen) + +static void +read_process_fragment (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) { struct DefragContext *d_ctx; struct GNUNET_TIME_Absolute now; struct FindReceiveContext frc; - frc.rc = NULL; frc.addr = (const struct sockaddr *) addr; frc.addr_len = fromlen; @@ -1663,15 +1690,17 @@ static void read_process_fragment (struct Plugin *plugin, GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx, (GNUNET_CONTAINER_HeapCostType) now.abs_value); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n", - (unsigned int) ntohs (msg->size), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Created new defragmentation context for %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n", - (unsigned int) ntohs (msg->size), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Found existing defragmentation context for %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); } if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg)) @@ -1692,6 +1721,7 @@ static void read_process_fragment (struct Plugin *plugin, } } + /** * Read and process a message from the given socket. * @@ -1711,8 +1741,15 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) memset (&addr, 0, sizeof (addr)); size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf), (struct sockaddr *) &addr, &fromlen); - - if (size < sizeof (struct GNUNET_MessageHeader)) +#if MINGW + /* On SOCK_DGRAM UDP sockets recvfrom might fail with a + * WSAECONNRESET error to indicate that previous sendto() (???) + * on this socket has failed. + */ + if ( (-1 == size) && (ECONNRESET == errno) ) + return; +#endif + if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader))) { GNUNET_break_op (0); return; @@ -1760,9 +1797,8 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) ssize_t sent; size_t slen; struct GNUNET_TIME_Absolute max; - struct GNUNET_TIME_Absolute ; - struct UDPMessageWrapper *udpw = NULL; + static int network_down_error; if (sock == plugin->sockv4) { @@ -1791,16 +1827,18 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) call_continuation(udpw, GNUNET_SYSERR); if (udpw->frag_ctx != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n", - GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Fragmented message for peer `%s' with size %u timed out\n", + GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send); udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag); GNUNET_free (udpw->frag_ctx); udpw->session->frag_ctx = NULL; } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n", - GNUNET_i2s(&udpw->session->target), udpw->msg_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for peer `%s' with size %u timed out\n", + GNUNET_i2s(&udpw->session->target), udpw->msg_size); } if (sock == plugin->sockv4) @@ -1822,16 +1860,18 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) if (delta.rel_value == 0) { /* this message is not delayed */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n", - GNUNET_i2s(&udpw->session->target), udpw->msg_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for peer `%s' (%u bytes) is not delayed \n", + GNUNET_i2s(&udpw->session->target), udpw->msg_size); break; } else { /* this message is delayed, try next */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n", - GNUNET_i2s(&udpw->session->target), udpw->msg_size, - delta); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for peer `%s' (%u bytes) is delayed for %llu \n", + GNUNET_i2s(&udpw->session->target), udpw->msg_size, + delta); udpw = udpw->next; } } @@ -1847,10 +1887,46 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) if (GNUNET_SYSERR == sent) { - LOG (GNUNET_ERROR_TYPE_ERROR, + const struct GNUNET_ATS_Information type = plugin->env->get_address_type + (plugin->env->cls,sa, slen); + + if (((GNUNET_ATS_NET_LAN == ntohl(type.value)) || (GNUNET_ATS_NET_WAN == ntohl(type.value))) && + ((ENETUNREACH == errno) || (ENETDOWN == errno))) + { + if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in))) + { + /* IPv4: "Network unreachable" or "Network down" + * + * This indicates we do not have connectivity + */ + LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, + _("UDP could not transmit message to `%s': " + "Network seems down, please check your network configuration\n"), + GNUNET_a2s (sa, slen)); + } + if ((network_down_error == GNUNET_NO) && (slen == sizeof (struct sockaddr_in6))) + { + /* IPv6: "Network unreachable" or "Network down" + * + * This indicates that this system is IPv6 enabled, but does not + * have a valid global IPv6 address assigned or we do not have + * connectivity + */ + + LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, + _("UDP could not transmit message to `%s': " + "Please check your network configuration and disable IPv6 if your " + "connection does not have a global IPv6 address\n"), + GNUNET_a2s (sa, slen)); + } + } + else + { + LOG (GNUNET_ERROR_TYPE_WARNING, "UDP could not transmit %u-byte message to `%s': `%s'\n", (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), STRERROR (errno)); + } call_continuation(udpw, GNUNET_SYSERR); } else @@ -1860,6 +1936,7 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, (sent < 0) ? STRERROR (errno) : "ok"); call_continuation(udpw, GNUNET_OK); + network_down_error = GNUNET_NO; } if (sock == plugin->sockv4) @@ -1872,6 +1949,7 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) return sent; } + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -1984,7 +2062,7 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0); if (NULL == plugin->sockv6) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n"); + LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n"); plugin->enable_ipv6 = GNUNET_NO; } else @@ -2081,7 +2159,7 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct } if (sockets_created == 0) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n")); + LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n")); plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, @@ -2121,6 +2199,79 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct return sockets_created; } +/** + * Session was idle, so disconnect it + */ +static void +session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_assert (NULL != cls); + struct Session *s = cls; + + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Session %p was idle for %llu ms, disconnecting\n", + s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + /* call session destroy function */ + disconnect_session(s); +} + + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout for session %p set to %llu ms\n", + s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout rescheduled for session %p set to %llu ms\n", + s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + + if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) + { + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout stopped for session %p canceled\n", + s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + } +} + + /** * The exported method. Makes the core api available via a global and @@ -2134,8 +2285,7 @@ libgnunet_plugin_transport_udp_init (void *cls) { struct GNUNET_TRANSPORT_PluginEnvironment *env = cls; struct GNUNET_TRANSPORT_PluginFunctions *api; - struct Plugin *plugin; - + struct Plugin *p; unsigned long long port; unsigned long long aport; unsigned long long broadcast; @@ -2143,11 +2293,10 @@ libgnunet_plugin_transport_udp_init (void *cls) unsigned long long enable_v6; char * bind4_address; char * bind6_address; + char * fancy_interval; struct GNUNET_TIME_Relative interval; - struct sockaddr_in serverAddrv4; struct sockaddr_in6 serverAddrv6; - int res; if (NULL == env->receive) @@ -2191,7 +2340,6 @@ libgnunet_plugin_transport_udp_init (void *cls) else enable_v6 = GNUNET_YES; - /* Addresses */ memset (&serverAddrv6, 0, sizeof (serverAddrv6)); memset (&serverAddrv4, 0, sizeof (serverAddrv4)); @@ -2228,18 +2376,25 @@ libgnunet_plugin_transport_udp_init (void *cls) } } - /* Enable neighbour discovery */ broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp", "BROADCAST"); if (broadcast == GNUNET_SYSERR) broadcast = GNUNET_NO; - if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-udp", - "BROADCAST_INTERVAL", &interval)) + if (GNUNET_SYSERR == GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp", + "BROADCAST_INTERVAL", &fancy_interval)) { interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10); } + else + { + if (GNUNET_SYSERR == GNUNET_STRINGS_fancy_time_to_relative(fancy_interval, &interval)) + { + interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); + } + GNUNET_free (fancy_interval); + } /* Maximum datarate */ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp", @@ -2248,23 +2403,23 @@ libgnunet_plugin_transport_udp_init (void *cls) udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */ } - plugin = GNUNET_malloc (sizeof (struct Plugin)); + p = GNUNET_malloc (sizeof (struct Plugin)); api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions)); - GNUNET_BANDWIDTH_tracker_init (&plugin->tracker, + GNUNET_BANDWIDTH_tracker_init (&p->tracker, GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30); - - - plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); - plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); - plugin->port = port; - plugin->aport = aport; - plugin->broadcast_interval = interval; - plugin->enable_ipv6 = enable_v6; - plugin->env = env; - - api->cls = plugin; + p->sessions = GNUNET_CONTAINER_multihashmap_create (10); + p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p); + p->port = port; + p->aport = aport; + p->broadcast_interval = interval; + p->enable_ipv6 = enable_v6; + p->env = env; + + plugin = p; + + api->cls = p; api->send = NULL; api->disconnect = &udp_disconnect; api->address_pretty_printer = &udp_plugin_address_pretty_printer; @@ -2275,11 +2430,11 @@ libgnunet_plugin_transport_udp_init (void *cls) api->send = &udp_plugin_send; LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n"); - res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4); - if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL))) + res = setup_sockets (p, &serverAddrv6, &serverAddrv4); + if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL))) { LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n"); - GNUNET_free (plugin); + GNUNET_free (p); GNUNET_free (api); return NULL; } @@ -2287,7 +2442,7 @@ libgnunet_plugin_transport_udp_init (void *cls) if (broadcast == GNUNET_YES) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n"); - setup_broadcast (plugin, &serverAddrv6, &serverAddrv4); + setup_broadcast (p, &serverAddrv6, &serverAddrv4); } GNUNET_free_non_null (bind4_address);