From c0bb6897499d729673abd2f7d3ff1027c95b93fc Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 19 Mar 2010 13:22:43 +0000 Subject: [PATCH] fixing disconnect handling, making TCP plugin ready for bi-di use --- src/transport/gnunet-service-transport.c | 9 +- src/transport/plugin_transport.h | 9 +- src/transport/plugin_transport_tcp.c | 280 ++++++++++++++--------- src/transport/plugin_transport_udp.c | 9 +- 4 files changed, 188 insertions(+), 119 deletions(-) diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index a2190535c..75580ce79 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -305,11 +305,10 @@ struct TransportPlugin GNUNET_SCHEDULER_TaskIdentifier address_update_task; /** - * Set to GNUNET_YES if we need to scrap the existing - * list of "addresses" and start fresh when we receive - * the next address update from a transport. Set to - * GNUNET_NO if we should just add the new address - * to the list and wait for the commit call. + * Set to GNUNET_YES if we need to scrap the existing list of + * "addresses" and start fresh when we receive the next address + * update from a transport. Set to GNUNET_NO if we should just add + * the new address to the list and wait for the commit call. */ int rebuild; diff --git a/src/transport/plugin_transport.h b/src/transport/plugin_transport.h index 6e5e2479e..0291f9bb4 100644 --- a/src/transport/plugin_transport.h +++ b/src/transport/plugin_transport.h @@ -45,7 +45,7 @@ * @param cls closure * @param peer (claimed) identity of the other peer * @param message the message, NULL if we only care about - * learning about the delay until we should receive again + * learning about the delay until we should receive again -- FIXME! * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) * @param sender_address binary address of the sender (if observed) * @param sender_address_len number of bytes in sender_address @@ -205,8 +205,10 @@ typedef void * is "on its own" (i.e. re-use existing TCP connection)) * @param addrlen length of the address in bytes * @param force_address GNUNET_YES if the plugin MUST use the given address, - * otherwise the plugin may use other addresses or - * existing connections (if available) + * GNUNET_NO means the plugin may use any other address and + * GNUNET_SYSERR means that only reliable existing + * bi-directional connections should be used (regardless + * of address) * @param cont continuation to call once the message has * been transmitted (or if the transport is ready * for the next transmission call; or if the @@ -351,7 +353,6 @@ struct GNUNET_TRANSPORT_PluginFunctions */ GNUNET_TRANSPORT_CheckAddress check_address; - }; diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 6786b9c34..afba750f2 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -17,13 +17,11 @@ Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ - /** * @file transport/plugin_transport_tcp.c * @brief Implementation of the TCP transport service * @author Christian Grothoff */ - #include "platform.h" #include "gnunet_hello_lib.h" #include "gnunet_connection_lib.h" @@ -185,6 +183,11 @@ struct Session */ int expecting_welcome; + /** + * Was this a connection that was inbound (we accepted)? (GNUNET_YES/GNUNET_NO) + */ + int inbound; + }; @@ -347,12 +350,17 @@ static size_t do_transmit (void *cls, size_t size, void *buf) { struct Session *session = cls; - struct PendingMessage *pm; + struct GNUNET_PeerIdentity pid; + struct Plugin *plugin; + struct PendingMessage *pos; + struct PendingMessage *hd; + struct PendingMessage *tl; + struct GNUNET_TIME_Absolute now; char *cbuf; - size_t ret; session->transmit_handle = NULL; + plugin = session->plugin; if (buf == NULL) { #if DEBUG_TCP @@ -361,63 +369,97 @@ do_transmit (void *cls, size_t size, void *buf) "Timeout trying to transmit to peer `%4s', discarding message queue.\n", GNUNET_i2s (&session->target)); #endif - /* timeout */ - while (NULL != (pm = session->pending_messages_head)) - { + /* timeout; cancel all messages that have already expired */ + hd = NULL; + tl = NULL; + ret = 0; + now = GNUNET_TIME_absolute_get (); + while ( (NULL != (pos = session->pending_messages_head)) && + (pos->timeout.value <= now.value) ) + { GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, session->pending_messages_tail, - pm); + pos); #if DEBUG_TCP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "Failed to transmit %u byte message to `%4s'.\n", - pm->message_size, + pos->message_size, GNUNET_i2s (&session->target)); #endif - GNUNET_STATISTICS_update (session->plugin->env->stats, - gettext_noop ("# bytes currently in TCP buffers"), - - (int64_t) pm->message_size, - GNUNET_NO); - GNUNET_STATISTICS_update (session->plugin->env->stats, - gettext_noop ("# bytes discarded by TCP (timeout)"), - pm->message_size, - GNUNET_NO); - if (pm->transmit_cont != NULL) - pm->transmit_cont (pm->transmit_cont_cls, - &session->target, GNUNET_SYSERR); - GNUNET_free (pm); + ret += pos->message_size; + GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos); } + /* do this call before callbacks (so that if callbacks destroy + session, they have a chance to cancel actions done by this + call) */ + process_pending_messages (session); + pid = session->target; + /* no do callbacks and do not use session again since + the callbacks may abort the session */ + while (NULL != (pos = hd)) + { + GNUNET_CONTAINER_DLL_remove (hd, tl, pos); + if (pos->transmit_cont != NULL) + pos->transmit_cont (pos->transmit_cont_cls, + &pid, GNUNET_SYSERR); + GNUNET_free (pos); + } + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + - (int64_t) ret, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes discarded by TCP (timeout)"), + ret, + GNUNET_NO); return 0; } + /* copy all pending messages that would fit */ ret = 0; cbuf = buf; - while (NULL != (pm = session->pending_messages_head)) + hd = NULL; + tl = NULL; + while (NULL != (pos = session->pending_messages_head)) { - if (size < pm->message_size) + if (ret + pos->message_size > size) break; - memcpy (cbuf, pm->msg, pm->message_size); - cbuf += pm->message_size; - ret += pm->message_size; - size -= pm->message_size; GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, session->pending_messages_tail, - pm); - GNUNET_STATISTICS_update (session->plugin->env->stats, - gettext_noop ("# bytes currently in TCP buffers"), - - (int64_t) pm->message_size, - GNUNET_NO); - if (pm->transmit_cont != NULL) - pm->transmit_cont (pm->transmit_cont_cls, - &session->target, GNUNET_OK); - GNUNET_free (pm); + pos); + GNUNET_assert (size >= pos->message_size); + memcpy (cbuf, pos->msg, pos->message_size); + cbuf += pos->message_size; + ret += pos->message_size; + size -= pos->message_size; + GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos); + } + /* schedule 'continuation' before callbacks so that callbacks that + cancel everything don't cause us to use a session that no longer + exists... */ + process_pending_messages (session); + pid = session->target; + /* we'll now call callbacks that may cancel the session; hence + we should not use 'session' after this point */ + while (NULL != (pos = hd)) + { + GNUNET_CONTAINER_DLL_remove (hd, tl, pos); + if (pos->transmit_cont != NULL) + pos->transmit_cont (pos->transmit_cont_cls, + &pid, GNUNET_OK); + GNUNET_free (pos); } - if (session->client != NULL) - process_pending_messages (session); + GNUNET_assert (hd == NULL); + GNUNET_assert (tl == NULL); #if DEBUG_TCP > 1 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "Transmitting %u bytes\n", ret); #endif - GNUNET_STATISTICS_update (session->plugin->env->stats, + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + - (int64_t) ret, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, gettext_noop ("# bytes transmitted via TCP"), ret, GNUNET_NO); @@ -435,6 +477,7 @@ static void process_pending_messages (struct Session *session) { struct PendingMessage *pm; + GNUNET_assert (session->client != NULL); if (session->transmit_handle != NULL) return; @@ -518,36 +561,15 @@ disconnect_session (struct Session *session) &session->target, GNUNET_SYSERR); GNUNET_free (pm); } - if (GNUNET_NO == session->expecting_welcome) - { -#if DEBUG_TCP - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "tcp", - "Notifying transport service about loss of connection with `%4s'.\n", - GNUNET_i2s (&session->target)); -#endif - /* Data session that actually went past the initial handshake; - transport service may know about this one, so we need to - notify transport service about disconnect */ - // FIXME: we should have a very clear connect-disconnect - // protocol with gnunet-service-transport! - // FIXME: but this is not possible for all plugins, so what gives? - } + GNUNET_break (session->client != NULL); if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (session->plugin->env->sched, session->receive_delay_task); - if (session->client != NULL) - { - GNUNET_SERVER_receive_done (session->client, - GNUNET_SYSERR); - } + GNUNET_SERVER_receive_done (session->client, + GNUNET_SYSERR); } - if (session->client != NULL) - { - GNUNET_SERVER_client_drop (session->client); - session->client = NULL; - } + GNUNET_SERVER_client_drop (session->client); GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop ("# TCP sessions active"), -1, @@ -579,8 +601,10 @@ disconnect_session (struct Session *session) * is "on its own" (i.e. re-use existing TCP connection)) * @param addrlen length of the address in bytes * @param force_address GNUNET_YES if the plugin MUST use the given address, - * otherwise the plugin may use other addresses or - * existing connections (if available) + * GNUNET_NO means the plugin may use any other address and + * GNUNET_SYSERR means that only reliable existing + * bi-directional connections should be used (regardless + * of address) * @param cont continuation to call once the message has * been transmitted (or if the transport is ready * for the next transmission call; or if the @@ -604,6 +628,7 @@ tcp_plugin_send (void *cls, { struct Plugin *plugin = cls; struct Session *session; + struct Session *next; struct PendingMessage *pm; struct GNUNET_CONNECTION_Handle *sa; int af; @@ -612,22 +637,43 @@ tcp_plugin_send (void *cls, gettext_noop ("# bytes TCP was asked to transmit"), msgbuf_size, GNUNET_NO); - session = plugin->sessions; /* FIXME: we could do this a cheaper with a hash table where we could restrict the iteration to entries that match the target peer... */ - while ( (session != NULL) && - ( (session->client == NULL) || - (0 != memcmp (target, - &session->target, - sizeof (struct GNUNET_PeerIdentity))) || - ( (GNUNET_YES == force_address) && - (addr != NULL) && - ( (addrlen != session->connect_alen) || - (0 != memcmp (session->connect_addr, - addr, - addrlen)) ) ) ) ) - session = session->next; + next = plugin->sessions; + while (NULL != (session = next)) + { + next = session->next; + if (session->client == NULL) + continue; + if (0 != memcmp (target, + &session->target, + sizeof (struct GNUNET_PeerIdentity))) + continue; + if (GNUNET_SYSERR == force_address) + { + if (session->expecting_welcome == GNUNET_NO) + break; /* established and reliable (TCP!) */ + else + continue; /* not established */ + } + if (GNUNET_NO == force_address) + break; + GNUNET_break (GNUNET_YES == force_address); + if (addr == NULL) + { + GNUNET_break (0); + break; + } + if (session->inbound == GNUNET_YES) + continue; + if (addrlen != session->connect_alen) + continue; + if (0 == memcmp (session->connect_addr, + addr, + addrlen)) + break; + } if ( (session == NULL) && (addr == NULL) ) { @@ -646,9 +692,13 @@ tcp_plugin_send (void *cls, if (session == NULL) { if (sizeof (struct sockaddr_in) == addrlen) - af = AF_INET; + { + af = AF_INET; + } else if (sizeof (struct sockaddr_in6) == addrlen) - af = AF_INET6; + { + af = AF_INET6; + } else { GNUNET_break_op (0); @@ -672,7 +722,6 @@ tcp_plugin_send (void *cls, GNUNET_NO); return -1; } - #if DEBUG_TCP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", @@ -692,7 +741,6 @@ tcp_plugin_send (void *cls, } GNUNET_assert (session != NULL); GNUNET_assert (session->client != NULL); - GNUNET_STATISTICS_update (plugin->env->stats, gettext_noop ("# bytes currently in TCP buffers"), msgbuf_size, @@ -740,10 +788,12 @@ tcp_plugin_send (void *cls, * to be cancelled */ static void -tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) +tcp_plugin_disconnect (void *cls, + const struct GNUNET_PeerIdentity *target) { struct Plugin *plugin = cls; struct Session *session; + struct Session *next; struct PendingMessage *pm; #if DEBUG_TCP @@ -752,44 +802,53 @@ tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) "Asked to cancel session with `%4s'\n", GNUNET_i2s (target)); #endif - session = plugin->sessions; - while (NULL != session) + next = plugin->sessions; + while (NULL != (session = next)) { - if (0 == memcmp (target, + next = session->next; + if (0 != memcmp (target, &session->target, sizeof (struct GNUNET_PeerIdentity))) + continue; + pm = session->pending_messages_head; + while (pm != NULL) { - pm = session->pending_messages_head; - while (pm != NULL) - { - pm->transmit_cont = NULL; - pm->transmit_cont_cls = NULL; - pm = pm->next; - } - if (session->client != NULL) - { - GNUNET_SERVER_client_drop (session->client); - session->client = NULL; - } - /* rest of the clean-up of the session will be done as part of - disconnect_notify which should be triggered any time now - (or which may be triggering this call in the first place) */ + pm->transmit_cont = NULL; + pm->transmit_cont_cls = NULL; + pm = pm->next; } - session = session->next; + disconnect_session (session); } } +/** + * Context for address to string conversion. + */ struct PrettyPrinterContext { + /** + * Function to call with the result. + */ GNUNET_TRANSPORT_AddressStringCallback asc; + + /** + * Clsoure for 'asc'. + */ void *asc_cls; + + /** + * Port to add after the IP address. + */ uint16_t port; }; /** * Append our port and forward the result. + * + * @param cls the 'struct PrettyPrinterContext*' + * @param hostname hostname part of the address */ static void append_port (void *cls, const char *hostname) @@ -873,6 +932,8 @@ tcp_plugin_address_pretty_printer (void *cls, * our listen port or our advertised port). If it is * neither, we return one of these two ports at random. * + * @param plugin global variables + * @param in_port port number to check * @return either in_port or a more plausible port */ static uint16_t @@ -890,7 +951,7 @@ check_port (struct Plugin *plugin, uint16_t in_port) * Another peer has suggested an address for this peer and transport * plugin. Check that this could be a valid address. * - * @param cls closure + * @param cls closure, our 'struct Plugin*' * @param addr pointer to the address * @param addrlen length of addr * @return GNUNET_OK if this is a plausible address for this peer @@ -967,6 +1028,7 @@ handle_tcp_welcome (void *cls, GNUNET_SERVER_client_keep (client); session = create_session (plugin, &wm->clientIdentity, client); + session->inbound = GNUNET_YES; if (GNUNET_OK == GNUNET_SERVER_client_get_address (client, &vaddr, &alen)) { @@ -1010,6 +1072,9 @@ handle_tcp_welcome (void *cls, /** * Task to signal the server that we can continue * receiving from the TCP client now. + * + * @param cls the 'struct Session*' + * @param tc task context (unused) */ static void delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) @@ -1056,7 +1121,6 @@ handle_tcp_data (void *cls, } if ( (NULL == session) || (GNUNET_NO != session->expecting_welcome)) { - GNUNET_break_op (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } @@ -1264,10 +1328,10 @@ libgnunet_plugin_transport_tcp_init (void *cls) if (aport != bport) GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "tcp", - _ - ("TCP transport advertises itself as being on port %llu\n"), + _("TCP transport advertises itself as being on port %llu\n"), aport); - GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify, + GNUNET_SERVER_disconnect_notify (plugin->server, + &disconnect_notify, plugin); /* FIXME: do the two calls below periodically again and not just once (since the info we get might change...) */ diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 739716b17..bdec32490 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -186,8 +186,11 @@ udp_transport_server_stop (void *cls) * @param timeout when should we time out (give up) if we can not transmit? * @param addr the addr to send the message to, needs to be a sockaddr for us * @param addrlen the len of addr - * @param force_address not used, we had better have an address to send to - * because we are stateless!! + * @param force_address GNUNET_YES if the plugin MUST use the given address, + * GNUNET_NO means the plugin may use any other address and + * GNUNET_SYSERR means that only reliable existing + * bi-directional connections should be used (regardless + * of address) * @param cont continuation to call once the message has * been transmitted (or if the transport is ready * for the next transmission call; or if the @@ -222,6 +225,8 @@ udp_plugin_send (void *cls, #endif return -1; /* Can never send if we don't have an address!! */ } + if (force_address == GNUNET_SYSERR) + return -1; /* never reliable */ /* Build the message to be sent */ message = GNUNET_malloc (sizeof (struct UDPMessage) + msgbuf_size); -- 2.25.1