X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fplugin_transport_udp.c;h=9bfe9f0c2c61ade8c9c0162fd100e232bd3edbab;hb=6c471eeb15e27f8226492b4860a3c2acb94c5f25;hp=ae710c73cf1b81806b5a6f88b8a66b76a9a4ccb6;hpb=28b2cc084550fd228907b7637adb3c54d16b6b7f;p=oweals%2Fgnunet.git diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index ae710c73c..9bfe9f0c2 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -93,13 +93,13 @@ struct Session */ struct GNUNET_PeerIdentity target; + struct FragmentationContext * frag_ctx; + /** * Address of the other peer */ const struct sockaddr *sock_addr; - size_t addrlen; - /** * Desired delay for next sending we send to other peer */ @@ -110,6 +110,11 @@ struct Session */ struct GNUNET_TIME_Absolute flow_delay_from_other_peer; + /** + * Session timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** * expected delay for ACKs */ @@ -117,7 +122,8 @@ struct Session struct GNUNET_ATS_Information ats; - struct FragmentationContext * frag_ctx; + size_t addrlen; + unsigned int rc; @@ -147,12 +153,12 @@ struct SourceInformation */ const void *arg; + struct Session *session; /** * Number of bytes in source address. */ size_t args; - struct Session *session; }; @@ -171,12 +177,13 @@ struct FindReceiveContext */ const struct sockaddr *addr; + struct Session *session; + /** * Number of bytes in 'addr'. */ socklen_t addr_len; - struct Session *session; }; @@ -229,9 +236,6 @@ struct FragmentationContext struct GNUNET_FRAGMENT_Context * frag; struct Session * session; - struct GNUNET_TIME_Absolute timeout; - - /** * Function to call upon completion of the transmission. */ @@ -242,6 +246,8 @@ struct FragmentationContext */ void *cont_cls; + struct GNUNET_TIME_Absolute timeout; + size_t bytes_to_send; }; @@ -252,9 +258,6 @@ struct UDPMessageWrapper struct UDPMessageWrapper *prev; struct UDPMessageWrapper *next; char *udp; - size_t msg_size; - - struct GNUNET_TIME_Absolute timeout; /** * Function to call upon completion of the transmission. @@ -268,6 +271,9 @@ struct UDPMessageWrapper struct FragmentationContext *frag_ctx; + size_t msg_size; + + struct GNUNET_TIME_Absolute timeout; }; @@ -293,6 +299,11 @@ struct UDP_ACK_Message }; +/** + * Encapsulation of all of the state of the plugin. + */ +struct Plugin * plugin; + /** * We have been notified that our readset has something to read. We don't @@ -305,6 +316,7 @@ struct UDP_ACK_Message static void udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -316,6 +328,27 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); static void udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s); + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s); + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s); + + + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -380,13 +413,13 @@ udp_address_to_string (void *cls, const void *addr, size_t addrlen) * If the function returns GNUNET_SYSERR, its contents are undefined. * @return GNUNET_OK on success, GNUNET_SYSERR on failure */ -int +static int udp_string_to_address (void *cls, const char *addr, uint16_t addrlen, void **buf, size_t *added) { struct sockaddr_storage socket_address; - - if ((NULL == addr) || (addrlen == 0)) + + if ((NULL == addr) || (0 == addrlen)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -402,37 +435,40 @@ udp_string_to_address (void *cls, const char *addr, uint16_t addrlen, return GNUNET_SYSERR; } - int ret = GNUNET_STRINGS_to_address_ip (addr, strlen (addr), - &socket_address); - - if (ret != GNUNET_OK) + if (GNUNET_OK != GNUNET_STRINGS_to_address_ip (addr, strlen (addr), + &socket_address)) { return GNUNET_SYSERR; } - if (socket_address.ss_family == AF_INET) - { - struct IPv4UdpAddress *u4; - struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address; - u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress)); - u4->ipv4_addr = in4->sin_addr.s_addr; - u4->u4_port = in4->sin_port; - *buf = u4; - *added = sizeof (struct IPv4UdpAddress); - return GNUNET_OK; - } - else if (socket_address.ss_family == AF_INET6) + switch (socket_address.ss_family) { - struct IPv6UdpAddress *u6; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address; - u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress)); - u6->ipv6_addr = in6->sin6_addr; - u6->u6_port = in6->sin6_port; - *buf = u6; - *added = sizeof (struct IPv6UdpAddress); - return GNUNET_OK; + case AF_INET: + { + struct IPv4UdpAddress *u4; + struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address; + u4 = GNUNET_malloc (sizeof (struct IPv4UdpAddress)); + u4->ipv4_addr = in4->sin_addr.s_addr; + u4->u4_port = in4->sin_port; + *buf = u4; + *added = sizeof (struct IPv4UdpAddress); + return GNUNET_OK; + } + case AF_INET6: + { + struct IPv6UdpAddress *u6; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address; + u6 = GNUNET_malloc (sizeof (struct IPv6UdpAddress)); + u6->ipv6_addr = in6->sin6_addr; + u6->u6_port = in6->sin6_port; + *buf = u6; + *added = sizeof (struct IPv6UdpAddress); + return GNUNET_OK; + } + default: + GNUNET_break (0); + return GNUNET_SYSERR; } - return GNUNET_SYSERR; } @@ -539,10 +575,11 @@ udp_plugin_address_pretty_printer (void *cls, const char *type, GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc); } + static void call_continuation (struct UDPMessageWrapper *udpw, int result) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for %u byte message to `%s' with result %s\n", udpw->msg_size, GNUNET_i2s (&udpw->session->target), (GNUNET_OK == result) ? "OK" : "SYSERR"); @@ -553,6 +590,7 @@ call_continuation (struct UDPMessageWrapper *udpw, int result) } + /** * Check if the given port is plausible (must be either our listen * port or our advertised port). If it is neither, we return @@ -571,7 +609,6 @@ check_port (struct Plugin *plugin, uint16_t in_port) } - /** * Function that will be called to check if a binary address for this * plugin is well-formed and corresponds to an address for THIS peer @@ -649,18 +686,15 @@ free_session (struct Session *s) /** - * Destroy a session, plugin is being unloaded. + * Functions with this signature are called whenever we need + * to close a session due to a disconnect or failure to + * establish a connection. * - * @param cls unused - * @param key hash of public key of target peer - * @param value a 'struct PeerSession*' to clean up - * @return GNUNET_OK (continue to iterate) + * @param s session to close down */ -static int -disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) +static void +disconnect_session (struct Session *s) { - struct Plugin *plugin = cls; - struct Session *s = value; struct UDPMessageWrapper *udpw; struct UDPMessageWrapper *next; @@ -670,6 +704,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) s, GNUNET_i2s (&s->target), GNUNET_a2s (s->sock_addr, s->addrlen)); + stop_session_timeout(s); next = plugin->ipv4_queue_head; while (NULL != (udpw = next)) { @@ -700,7 +735,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) if (NULL != s->frag_ctx->cont) { s->frag_ctx->cont (s->frag_ctx->cont_cls, &s->target, GNUNET_SYSERR); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for fragemented message to `%s' with result SYSERR\n", GNUNET_i2s (&s->target)); } @@ -718,6 +753,20 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) s->in_destroy = GNUNET_YES; else free_session (s); +} + +/** + * Destroy a session, plugin is being unloaded. + * + * @param cls unused + * @param key hash of public key of target peer + * @param value a 'struct PeerSession*' to clean up + * @return GNUNET_OK (continue to iterate) + */ +static int +disconnect_and_free_it (void *cls, const struct GNUNET_HashCode * key, void *value) +{ + disconnect_session(value); return GNUNET_OK; } @@ -742,6 +791,7 @@ udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin); } + static struct Session * create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, const void *addr, size_t addrlen, @@ -796,21 +846,18 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, GNUNET_break_op (0); return NULL; } - s->addrlen = len; s->target = *target; s->sock_addr = (const struct sockaddr *) &s[1]; - s->flow_delay_for_other_peer = GNUNET_TIME_relative_get_zero(); - s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero(); s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS; - + start_session_timeout(s); return s; } static int session_cmp_it (void *cls, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode * key, void *value) { struct SessionCompareContext * cctx = cls; @@ -819,12 +866,9 @@ session_cmp_it (void *cls, socklen_t s_addrlen = s->addrlen; -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n", + LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n", udp_address_to_string (NULL, (void *) address->address, address->address_length), GNUNET_a2s (s->sock_addr, s->addrlen)); -#endif - if ((address->address_length == sizeof (struct IPv4UdpAddress)) && (s_addrlen == sizeof (struct sockaddr_in))) { @@ -852,8 +896,6 @@ session_cmp_it (void *cls, return GNUNET_NO; } } - - return GNUNET_YES; } @@ -909,15 +951,14 @@ udp_plugin_get_session (void *cls, struct SessionCompareContext cctx; cctx.addr = address; cctx.res = NULL; -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length)); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Looking for existing session for peer `%s' `%s' \n", + GNUNET_i2s (&address->peer), + udp_address_to_string(NULL, address->address, address->address_length)); GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx); if (cctx.res != NULL) { -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res); return cctx.res; } @@ -927,13 +968,11 @@ udp_plugin_get_session (void *cls, address->address, address->address_length, NULL, NULL); -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new session %p for peer `%s' address `%s'\n", - s, - GNUNET_i2s(&address->peer), - udp_address_to_string(NULL,address->address,address->address_length)); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new session %p for peer `%s' address `%s'\n", + s, + GNUNET_i2s(&address->peer), + udp_address_to_string(NULL,address->address,address->address_length)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (plugin->sessions, &s->target.hashPubKey, @@ -995,13 +1034,10 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) struct Plugin *plugin = frag_ctx->plugin; struct UDPMessageWrapper * udpw; struct Session *s; - size_t msg_len = ntohs (msg->size); -#if VERBOSE_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); -#endif - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len); udpw->session = frag_ctx->session; s = udpw->session; @@ -1013,10 +1049,8 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) udpw->timeout = frag_ctx->timeout; udpw->frag_ctx = frag_ctx; memcpy (udpw->udp, msg, msg_len); - enqueue (plugin, udpw); - if (s->addrlen == sizeof (struct sockaddr_in)) { if (plugin->with_v4_ws == GNUNET_NO) @@ -1033,7 +1067,6 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) plugin->with_v4_ws = GNUNET_YES; } } - else if (s->addrlen == sizeof (struct sockaddr_in6)) { if (plugin->with_v6_ws == GNUNET_NO) @@ -1090,7 +1123,6 @@ udp_plugin_send (void *cls, { struct Plugin *plugin = cls; size_t mlen = msgbuf_size + sizeof (struct UDPMessage); - struct UDPMessageWrapper * udpw; struct UDPMessage *udp; char mbuf[mlen]; @@ -1099,29 +1131,24 @@ udp_plugin_send (void *cls, if ((s->addrlen == sizeof (struct sockaddr_in6)) && (plugin->sockv6 == NULL)) return GNUNET_SYSERR; - - if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL)) - return GNUNET_SYSERR; - - + if ((s->addrlen == sizeof (struct sockaddr_in)) && (plugin->sockv4 == NULL)) + return GNUNET_SYSERR; if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); return GNUNET_SYSERR; } - if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s)) { GNUNET_break (0); return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP transmits %u-byte message to `%s' using address `%s'\n", - mlen, - GNUNET_i2s (&s->target), - GNUNET_a2s(s->sock_addr, s->addrlen)); - + mlen, + GNUNET_i2s (&s->target), + GNUNET_a2s(s->sock_addr, s->addrlen)); + /* Message */ udp = (struct UDPMessage *) mbuf; udp->header.size = htons (mlen); @@ -1129,6 +1156,7 @@ udp_plugin_send (void *cls, udp->reserved = htonl (0); udp->sender = *plugin->env->my_identity; + reschedule_session_timeout(s); if (mlen <= UDP_MTU) { udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen); @@ -1186,7 +1214,6 @@ udp_plugin_send (void *cls, plugin->with_v4_ws = GNUNET_YES; } } - else if (s->addrlen == sizeof (struct sockaddr_in6)) { if (plugin->with_v6_ws == GNUNET_NO) @@ -1263,7 +1290,7 @@ udp_nat_port_map_callback (void *cls, int add_remove, * @param client the 'struct SourceInformation' * @param hdr the actual message */ -static void +static int process_inbound_tokenized_messages (void *cls, void *client, const struct GNUNET_MessageHeader *hdr) { @@ -1274,21 +1301,22 @@ process_inbound_tokenized_messages (void *cls, void *client, GNUNET_assert (si->session != NULL); if (GNUNET_YES == si->session->in_destroy) - return; + return GNUNET_OK; /* setup ATS */ ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); ats[0].value = htonl (1); ats[1] = si->session->ats; GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED); - delay = plugin->env->receive (plugin->env->cls, - &si->sender, - hdr, - (const struct GNUNET_ATS_Information *) &ats, 2, - NULL, - si->arg, - si->args); + &si->sender, + hdr, + (const struct GNUNET_ATS_Information *) &ats, 2, + NULL, + si->arg, + si->args); si->session->flow_delay_for_other_peer = delay; + reschedule_session_timeout(si->session); + return GNUNET_OK; } @@ -1421,16 +1449,19 @@ fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) rc->src_addr, rc->addr_len); } + struct LookupContext { const struct sockaddr * addr; - size_t addrlen; struct Session *res; + + size_t addrlen; }; + static int -lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value) +lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value) { struct LookupContext *l_ctx = cls; struct Session * s = value; @@ -1444,6 +1475,7 @@ lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value) return GNUNET_YES; } + /** * Transmit an acknowledgement. * @@ -1455,7 +1487,6 @@ static void ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) { struct DefragContext *rc = cls; - size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); struct UDP_ACK_Message *udp_ack; uint32_t delay = 0; @@ -1485,14 +1516,10 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) sockaddr_in6)), delay); udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize); - udpw->cont = NULL; - udpw->cont_cls = NULL; - udpw->frag_ctx = NULL; udpw->msg_size = msize; udpw->session = s; - udpw->timeout = GNUNET_TIME_absolute_get_forever(); + udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; udpw->udp = (char *)&udpw[1]; - udp_ack = (struct UDP_ACK_Message *) udpw->udp; udp_ack->header.size = htons ((uint16_t) msize); udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); @@ -1504,10 +1531,11 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) } -static void read_process_msg (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - char *addr, - socklen_t fromlen) +static void +read_process_msg (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + const char *addr, + socklen_t fromlen) { if (ntohs (msg->size) < sizeof (struct UDPMessage)) { @@ -1516,18 +1544,19 @@ static void read_process_msg (struct Plugin *plugin, } process_udp_message (plugin, (const struct UDPMessage *) msg, (const struct sockaddr *) addr, fromlen); - return; } -static void read_process_ack (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - char *addr, - socklen_t fromlen) + +static void +read_process_ack (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) { const struct GNUNET_MessageHeader *ack; const struct UDP_ACK_Message *udp_ack; struct LookupContext l_ctx; - struct Session *s = NULL; + struct Session *s; struct GNUNET_TIME_Relative flow_delay; if (ntohs (msg->size) < @@ -1536,9 +1565,7 @@ static void read_process_ack (struct Plugin *plugin, GNUNET_break_op (0); return; } - udp_ack = (const struct UDP_ACK_Message *) msg; - l_ctx.addr = (const struct sockaddr *) addr; l_ctx.addrlen = fromlen; l_ctx.res = NULL; @@ -1566,10 +1593,10 @@ static void read_process_ack (struct Plugin *plugin, if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack)) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", - (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", + (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); return; } @@ -1579,12 +1606,12 @@ static void read_process_ack (struct Plugin *plugin, GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag); - struct UDPMessageWrapper * udpw = NULL; - struct UDPMessageWrapper * tmp = NULL; + struct UDPMessageWrapper * udpw; + struct UDPMessageWrapper * tmp; if (s->addrlen == sizeof (struct sockaddr_in6)) { udpw = plugin->ipv6_queue_head; - while (udpw!= NULL) + while (NULL != udpw) { tmp = udpw->next; if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx)) @@ -1612,7 +1639,7 @@ static void read_process_ack (struct Plugin *plugin, if (s->frag_ctx->cont != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for fragmented message to `%s' with result %s\n", GNUNET_i2s (&s->target), "OK"); s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK); @@ -1620,19 +1647,19 @@ static void read_process_ack (struct Plugin *plugin, GNUNET_free (s->frag_ctx); s->frag_ctx = NULL; - return; } -static void read_process_fragment (struct Plugin *plugin, - const struct GNUNET_MessageHeader *msg, - char *addr, - socklen_t fromlen) + +static void +read_process_fragment (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) { struct DefragContext *d_ctx; struct GNUNET_TIME_Absolute now; struct FindReceiveContext frc; - frc.rc = NULL; frc.addr = (const struct sockaddr *) addr; frc.addr_len = fromlen; @@ -1663,15 +1690,17 @@ static void read_process_fragment (struct Plugin *plugin, GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx, (GNUNET_CONTAINER_HeapCostType) now.abs_value); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n", - (unsigned int) ntohs (msg->size), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Created new defragmentation context for %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); } else { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n", - (unsigned int) ntohs (msg->size), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Found existing defragmentation context for %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); } if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg)) @@ -1692,6 +1721,7 @@ static void read_process_fragment (struct Plugin *plugin, } } + /** * Read and process a message from the given socket. * @@ -1711,8 +1741,15 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) memset (&addr, 0, sizeof (addr)); size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf), (struct sockaddr *) &addr, &fromlen); - - if (size < sizeof (struct GNUNET_MessageHeader)) +#if MINGW + /* On SOCK_DGRAM UDP sockets recvfrom might fail with a + * WSAECONNRESET error to indicate that previous sendto() (???) + * on this socket has failed. + */ + if ( (-1 == size) && (ECONNRESET == errno) ) + return; +#endif + if ( (-1 == size) || (size < sizeof (struct GNUNET_MessageHeader))) { GNUNET_break_op (0); return; @@ -1760,8 +1797,6 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) ssize_t sent; size_t slen; struct GNUNET_TIME_Absolute max; - struct GNUNET_TIME_Absolute ; - struct UDPMessageWrapper *udpw = NULL; if (sock == plugin->sockv4) @@ -1791,16 +1826,18 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) call_continuation(udpw, GNUNET_SYSERR); if (udpw->frag_ctx != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n", - GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Fragmented message for peer `%s' with size %u timed out\n", + GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send); udpw->session->last_expected_delay = GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag); GNUNET_free (udpw->frag_ctx); udpw->session->frag_ctx = NULL; } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n", - GNUNET_i2s(&udpw->session->target), udpw->msg_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for peer `%s' with size %u timed out\n", + GNUNET_i2s(&udpw->session->target), udpw->msg_size); } if (sock == plugin->sockv4) @@ -1822,16 +1859,18 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) if (delta.rel_value == 0) { /* this message is not delayed */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is not delayed \n", - GNUNET_i2s(&udpw->session->target), udpw->msg_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for peer `%s' (%u bytes) is not delayed \n", + GNUNET_i2s(&udpw->session->target), udpw->msg_size); break; } else { /* this message is delayed, try next */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %llu \n", - GNUNET_i2s(&udpw->session->target), udpw->msg_size, - delta); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for peer `%s' (%u bytes) is delayed for %llu \n", + GNUNET_i2s(&udpw->session->target), udpw->msg_size, + delta); udpw = udpw->next; } } @@ -1859,9 +1898,9 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) * have a valid global IPv6 address assigned */ LOG (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, - _("UDP could not message to `%s': `%s'\n, " \ - "Please check your network configuration and disable IPv6 if your\n" \ - "connection does not have a global IPv6 address"), + _("UDP could not message to `%s': `%s'. " + "Please check your network configuration and disable IPv6 if your " + "connection does not have a global IPv6 address\n"), GNUNET_a2s (sa, slen), STRERROR (errno)); } @@ -1893,6 +1932,7 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) return sent; } + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -2005,7 +2045,7 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0); if (NULL == plugin->sockv6) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n"); + LOG (GNUNET_ERROR_TYPE_WARNING, "Disabling IPv6 since it is not supported on this system!\n"); plugin->enable_ipv6 = GNUNET_NO; } else @@ -2102,7 +2142,7 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct } if (sockets_created == 0) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n")); + LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UDP sockets\n")); plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, @@ -2142,6 +2182,82 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct return sockets_created; } +/** + * Session was idle, so disconnect it + */ +static void +session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_assert (NULL != cls); + struct Session *s = cls; + + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + + /* call session destroy function */ + disconnect_session(s); + +} + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); + + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + + if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) + { + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n", + s); + } +} /** * The exported method. Makes the core api available via a global and @@ -2155,8 +2271,7 @@ libgnunet_plugin_transport_udp_init (void *cls) { struct GNUNET_TRANSPORT_PluginEnvironment *env = cls; struct GNUNET_TRANSPORT_PluginFunctions *api; - struct Plugin *plugin; - + struct Plugin *p; unsigned long long port; unsigned long long aport; unsigned long long broadcast; @@ -2165,10 +2280,8 @@ libgnunet_plugin_transport_udp_init (void *cls) char * bind4_address; char * bind6_address; struct GNUNET_TIME_Relative interval; - struct sockaddr_in serverAddrv4; struct sockaddr_in6 serverAddrv6; - int res; if (NULL == env->receive) @@ -2212,7 +2325,6 @@ libgnunet_plugin_transport_udp_init (void *cls) else enable_v6 = GNUNET_YES; - /* Addresses */ memset (&serverAddrv6, 0, sizeof (serverAddrv6)); memset (&serverAddrv4, 0, sizeof (serverAddrv4)); @@ -2249,7 +2361,6 @@ libgnunet_plugin_transport_udp_init (void *cls) } } - /* Enable neighbour discovery */ broadcast = GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "transport-udp", "BROADCAST"); @@ -2269,23 +2380,23 @@ libgnunet_plugin_transport_udp_init (void *cls) udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */ } - plugin = GNUNET_malloc (sizeof (struct Plugin)); + p = GNUNET_malloc (sizeof (struct Plugin)); api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions)); - GNUNET_BANDWIDTH_tracker_init (&plugin->tracker, + GNUNET_BANDWIDTH_tracker_init (&p->tracker, GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30); - - - plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); - plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); - plugin->port = port; - plugin->aport = aport; - plugin->broadcast_interval = interval; - plugin->enable_ipv6 = enable_v6; - plugin->env = env; - - api->cls = plugin; + p->sessions = GNUNET_CONTAINER_multihashmap_create (10); + p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p); + p->port = port; + p->aport = aport; + p->broadcast_interval = interval; + p->enable_ipv6 = enable_v6; + p->env = env; + + plugin = p; + + api->cls = p; api->send = NULL; api->disconnect = &udp_disconnect; api->address_pretty_printer = &udp_plugin_address_pretty_printer; @@ -2296,11 +2407,11 @@ libgnunet_plugin_transport_udp_init (void *cls) api->send = &udp_plugin_send; LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n"); - res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4); - if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL))) + res = setup_sockets (p, &serverAddrv6, &serverAddrv4); + if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL))) { LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n"); - GNUNET_free (plugin); + GNUNET_free (p); GNUNET_free (api); return NULL; } @@ -2308,7 +2419,7 @@ libgnunet_plugin_transport_udp_init (void *cls) if (broadcast == GNUNET_YES) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n"); - setup_broadcast (plugin, &serverAddrv6, &serverAddrv4); + setup_broadcast (p, &serverAddrv6, &serverAddrv4); } GNUNET_free_non_null (bind4_address);