From 8e7b3d117600e84a4625149204e8d6e1d6472946 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 23 Jun 2014 11:40:41 +0000 Subject: [PATCH] adding missing monitoring callbacks for TCP --- src/transport/plugin_transport_tcp.c | 213 ++++++++++++++++++--------- 1 file changed, 143 insertions(+), 70 deletions(-) diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index f3791c138..01c3a0b1a 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -820,10 +820,10 @@ tcp_disconnect_session (void *cls, struct Plugin *plugin = cls; struct PendingMessage *pm; - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting session of peer `%s' address `%s'\n", - GNUNET_i2s (&session->target), - tcp_address_to_string (NULL, + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting session of peer `%s' address `%s'\n", + GNUNET_i2s (&session->target), + tcp_address_to_string (NULL, session->address->address, session->address->address_length)); @@ -834,9 +834,10 @@ tcp_disconnect_session (void *cls, session->timeout = GNUNET_TIME_UNIT_ZERO_ABS; } - if (GNUNET_YES - == GNUNET_CONTAINER_multipeermap_remove (plugin->sessionmap, - &session->target, session)) + if (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (plugin->sessionmap, + &session->target, + session)) { GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop ("# TCP sessions active"), -1, GNUNET_NO); @@ -859,9 +860,12 @@ tcp_disconnect_session (void *cls, session->transmit_handle = NULL; } plugin->env->unregister_quota_notification (plugin->env->cls, - &session->target, PLUGIN_NAME, session); + &session->target, + PLUGIN_NAME, + session); session->plugin->env->session_end (session->plugin->env->cls, - session->address, session); + session->address, + session); if (GNUNET_SCHEDULER_NO_TASK != session->nat_connection_timeout) { @@ -898,6 +902,12 @@ tcp_disconnect_session (void *cls, 0); GNUNET_free (pm); } + GNUNET_assert (0 == session->msgs_in_queue); + GNUNET_assert (0 == session->bytes_in_queue); + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_UP); + if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK ) { GNUNET_SCHEDULER_cancel (session->receive_delay_task); @@ -1008,9 +1018,9 @@ create_session (struct Plugin *plugin, else GNUNET_assert(NULL == client); - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Creating new session for peer `%4s'\n", - GNUNET_i2s (&address->peer)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new session for peer `%4s'\n", + GNUNET_i2s (&address->peer)); session = GNUNET_new (struct Session); session->last_activity = GNUNET_TIME_absolute_get (); session->plugin = plugin; @@ -1037,17 +1047,30 @@ create_session (struct Plugin *plugin, pm); session->msgs_in_queue++; session->bytes_in_queue += pm->message_size; + session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + session); if (GNUNET_YES != is_nat) { GNUNET_STATISTICS_update (plugin->env->stats, - gettext_noop ("# TCP sessions active"), 1, GNUNET_NO); + gettext_noop ("# TCP sessions active"), + 1, + GNUNET_NO); + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_UP); + } + else + { + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_HANDSHAKE); } plugin->env->register_quota_notification (plugin->env->cls, - &address->peer, PLUGIN_NAME, session); - session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &session_timeout, - session); + &address->peer, + PLUGIN_NAME, + session); return session; } @@ -1130,13 +1153,19 @@ do_transmit (void *cls, size_t size, void *buf) &pid, GNUNET_SYSERR, pos->message_size, 0); - GNUNET_free(pos); + GNUNET_free (pos); } GNUNET_STATISTICS_update (plugin->env->stats, - gettext_noop ("# bytes currently in TCP buffers"), -(int64_t) ret, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, gettext_noop - ("# bytes discarded by TCP (timeout)"), ret, GNUNET_NO); + gettext_noop ("# bytes currently in TCP buffers"), -(int64_t) ret, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes discarded by TCP (timeout)"), + ret, + GNUNET_NO); + if (0 < ret) + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_UP); return 0; } /* copy all pending messages that would fit */ @@ -1167,6 +1196,9 @@ do_transmit (void *cls, size_t size, void *buf) size -= pos->message_size; GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos); } + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_UP); /* schedule 'continuation' before callbacks so that callbacks that * cancel everything don't cause us to use a session that no longer * exists... */ @@ -1260,6 +1292,7 @@ session_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value) return GNUNET_YES; } + /** * Check that the given session is known to the plugin and * is in one of our maps. @@ -1320,9 +1353,14 @@ find_session (struct Plugin *plugin, struct Session *session) * and does NOT mean that the message was not transmitted (DV) */ static ssize_t -tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, - size_t msgbuf_size, unsigned int priority, struct GNUNET_TIME_Relative to, - GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) +tcp_plugin_send (void *cls, + struct Session *session, + const char *msgbuf, + size_t msgbuf_size, + unsigned int priority, + struct GNUNET_TIME_Relative to, + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) { struct Plugin * plugin = cls; struct PendingMessage *pm; @@ -1364,34 +1402,41 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, session->pending_messages_tail, pm); + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_UP); session->msgs_in_queue++; session->bytes_in_queue += pm->message_size; process_pending_messages (session); return msgbuf_size; } - else if (GNUNET_YES - == GNUNET_CONTAINER_multipeermap_contains_value (plugin->nat_wait_conns, - &session->target, session)) + else if (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_contains_value (plugin->nat_wait_conns, + &session->target, + session)) { - LOG(GNUNET_ERROR_TYPE_DEBUG, - "This NAT WAIT session for peer `%s' is not yet ready!\n", - GNUNET_i2s (&session->target)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "This NAT WAIT session for peer `%s' is not yet ready!\n", + GNUNET_i2s (&session->target)); GNUNET_STATISTICS_update (plugin->env->stats, - gettext_noop ("# bytes currently in TCP buffers"), msgbuf_size, - GNUNET_NO); - + gettext_noop ("# bytes currently in TCP buffers"), msgbuf_size, + GNUNET_NO); /* append pm to pending_messages list */ GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, session->pending_messages_tail, pm); session->msgs_in_queue++; session->bytes_in_queue += pm->message_size; + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_HANDSHAKE); return msgbuf_size; } else { LOG(GNUNET_ERROR_TYPE_ERROR, - "Invalid session %p\n", session); + "Invalid session %p\n", + session); if (NULL != cont) cont (cont_cls, &session->target, @@ -1445,6 +1490,7 @@ session_lookup_it (void *cls, return GNUNET_NO; } + /** * Task cleaning up a NAT connection attempt after timeout * @@ -1452,7 +1498,8 @@ session_lookup_it (void *cls, * @param tc scheduler context (unused) */ static void -nat_connect_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +nat_connect_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Session *session = cls; @@ -1464,9 +1511,11 @@ nat_connect_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) tcp_disconnect_session (session->plugin, session); } + static void tcp_plugin_update_session_timeout (void *cls, - const struct GNUNET_PeerIdentity *peer, struct Session *session) + const struct GNUNET_PeerIdentity *peer, + struct Session *session) { struct Plugin *plugin = cls; @@ -1493,10 +1542,12 @@ delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_SERVER_receive_done (session->client, GNUNET_OK); } -static void tcp_plugin_update_inbound_delay (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct Session *session, - struct GNUNET_TIME_Relative delay) + +static void +tcp_plugin_update_inbound_delay (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct Session *session, + struct GNUNET_TIME_Relative delay) { if (GNUNET_SCHEDULER_NO_TASK == session->receive_delay_task) return; @@ -1520,7 +1571,8 @@ static void tcp_plugin_update_inbound_delay (void *cls, * @return the session if the address is valid, NULL otherwise */ static struct Session * -tcp_plugin_get_session (void *cls, const struct GNUNET_HELLO_Address *address) +tcp_plugin_get_session (void *cls, + const struct GNUNET_HELLO_Address *address) { struct Plugin *plugin = cls; struct Session *session = NULL; @@ -1709,9 +1761,11 @@ tcp_plugin_get_session (void *cls, const struct GNUNET_HELLO_Address *address) return session; } + static int -session_disconnect_it (void *cls, const struct GNUNET_PeerIdentity *key, - void *value) +session_disconnect_it (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { struct Plugin *plugin = cls; struct Session *session = value; @@ -1722,6 +1776,7 @@ session_disconnect_it (void *cls, const struct GNUNET_PeerIdentity *key, return GNUNET_YES; } + /** * Function that can be called to force a disconnect from the * specified neighbour. This should also cancel all previously @@ -1739,19 +1794,25 @@ session_disconnect_it (void *cls, const struct GNUNET_PeerIdentity *key, * to be cancelled */ static void -tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) +tcp_plugin_disconnect (void *cls, + const struct GNUNET_PeerIdentity *target) { struct Plugin *plugin = cls; LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s'\n", GNUNET_i2s (target)); - GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessionmap, target, - &session_disconnect_it, plugin); - GNUNET_CONTAINER_multipeermap_get_multiple (plugin->nat_wait_conns, target, - &session_disconnect_it, plugin); + GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessionmap, + target, + &session_disconnect_it, + plugin); + GNUNET_CONTAINER_multipeermap_get_multiple (plugin->nat_wait_conns, + target, + &session_disconnect_it, + plugin); } + /** * Running pretty printers: head */ @@ -2052,8 +2113,9 @@ tcp_plugin_check_address (void *cls, const void *addr, size_t addrlen) * @param message the actual message */ static void -handle_tcp_nat_probe (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_tcp_nat_probe (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { struct Plugin *plugin = cls; struct Session *session; @@ -2170,8 +2232,9 @@ handle_tcp_nat_probe (void *cls, struct GNUNET_SERVER_Client *client, * @param message the actual message */ static void -handle_tcp_welcome (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_tcp_welcome (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { struct Plugin *plugin = cls; const struct WelcomeMessage *wm = (const struct WelcomeMessage *) message; @@ -2308,8 +2371,9 @@ handle_tcp_welcome (void *cls, struct GNUNET_SERVER_Client *client, * @param message the actual message */ static void -handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_tcp_data (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { struct Plugin *plugin = cls; struct Session *session; @@ -2373,13 +2437,18 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client, distance.value = htonl ((uint32_t) session->ats_address_network_type); GNUNET_break(session->ats_address_network_type != GNUNET_ATS_NET_UNSPECIFIED); - GNUNET_assert( - GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap, - &session->target, session)); - - delay = plugin->env->receive (plugin->env->cls, session->address, session, message); - plugin->env->update_address_metrics (plugin->env->cls, session->address, - session, &distance, 1); + GNUNET_assert(GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap, + &session->target, + session)); + + delay = plugin->env->receive (plugin->env->cls, + session->address, + session, + message); + plugin->env->update_address_metrics (plugin->env->cls, + session->address, + session, + &distance, 1); reschedule_session_timeout (session); if (0 == delay.rel_value_us) { @@ -2397,6 +2466,7 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client, } } + /** * Functions with this signature are called whenever a peer * is disconnected on the network level. @@ -2415,11 +2485,11 @@ disconnect_notify (void *cls, struct GNUNET_SERVER_Client *client) session = lookup_session_by_client (plugin, client); if (NULL == session) return; /* unknown, nothing to do */ - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Destroying session of `%4s' with %s due to network-level disconnect.\n", - GNUNET_i2s (&session->target), - tcp_address_to_string (session->plugin, session->address->address, - session->address->address_length)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying session of `%4s' with %s due to network-level disconnect.\n", + GNUNET_i2s (&session->target), + tcp_address_to_string (session->plugin, session->address->address, + session->address->address_length)); if (plugin->cur_connections == plugin->max_connections) GNUNET_SERVER_resume (plugin->server); /* Resume server */ @@ -2430,10 +2500,13 @@ disconnect_notify (void *cls, struct GNUNET_SERVER_Client *client) plugin->cur_connections--; GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop - ("# network-level TCP disconnect events"), 1, GNUNET_NO); + ("# network-level TCP disconnect events"), + 1, + GNUNET_NO); tcp_disconnect_session (plugin, session); } + /** * We can now send a probe message, copy into buffer to really send. * @@ -2464,7 +2537,7 @@ notify_send_probe (void *cls, GNUNET_assert(size >= sizeof(tcp_probe_ctx->message)); memcpy (buf, &tcp_probe_ctx->message, sizeof(tcp_probe_ctx->message)); GNUNET_SERVER_connect_socket (tcp_probe_ctx->plugin->server, - tcp_probe_ctx->sock); + tcp_probe_ctx->sock); ret = sizeof(tcp_probe_ctx->message); GNUNET_free(tcp_probe_ctx); return ret; -- 2.25.1