From a856620f5547c9cc2b8f5a2319b2b44bcc44c733 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Thu, 10 Nov 2011 17:24:42 +0000 Subject: [PATCH] improved fast_reconnect improved continuation management (no peer references) but - ats address update and destroying sessions is not working correctly... --- .../gnunet-service-transport_neighbours.c | 239 ++++++++++++++---- 1 file changed, 186 insertions(+), 53 deletions(-) diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index c9e8e1624..d62e4249a 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c @@ -59,6 +59,7 @@ #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3) +#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) @@ -451,13 +452,6 @@ reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) #if DEBUG_TRANSPORT #endif - /* This jut a temporary debug message to check if a the value - * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines - */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), - print_state (n->state)); GNUNET_STATISTICS_update (GST_stats, gettext_noop @@ -468,7 +462,7 @@ reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) n->state = S_NOT_CONNECTED; /* destroying address */ - GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); + GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session); /* request new address */ if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) @@ -505,7 +499,7 @@ change (struct NeighbourMapEntry *n, int state, int line) case S_DISCONNECT: break; case S_FAST_RECONNECT: - if ((state == S_CONNECT_SENT) || (state == S_DISCONNECT)) + if ((state == S_CONNECTED) || (state == S_DISCONNECT)) allowed = GNUNET_YES; break; default: @@ -597,7 +591,7 @@ send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf, return GNUNET_SYSERR; } - if ((session == NULL) && (address == NULL)) + if ((session == NULL) && (address->address_length == 0)) { if (cont != NULL) cont (cont_cls, target, GNUNET_SYSERR); @@ -709,6 +703,17 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) if (NULL == mq) return; /* no more messages */ + if (n->address == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No address for peer `%s'\n", + GNUNET_i2s (&n->id)); + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); + GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); + return; + } + if (GST_plugins_find (n->address->transport_name) == NULL) { GNUNET_break (0); @@ -718,7 +723,7 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) n->is_active = mq; mq->n = n; - if ( (n->session == NULL) && (NULL == n->address) ) + if ((n->address->address_length == 0) && (n->session == NULL)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", @@ -855,8 +860,9 @@ disconnect_neighbour (struct NeighbourMapEntry *n) if (is_disconnecting (n)) return; + /* send DISCONNECT MESSAGE */ - if ((previous_state == S_CONNECTED)|| is_connecting (n)) + if ((previous_state == S_CONNECTED) || is_connecting (n)) { if (GNUNET_OK == send_disconnect (&n->id, n->address, @@ -915,6 +921,10 @@ disconnect_neighbour (struct NeighbourMapEntry *n) case S_FAST_RECONNECT: GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1, GNUNET_NO); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# fast reconnects failed"), + 1, GNUNET_NO); disconnect_notify_cb (callback_cls, &n->id); default: break; @@ -1077,6 +1087,12 @@ GST_neighbours_stop () disconnect_notify_cb = NULL; } +struct ContinutionContext +{ + struct GNUNET_HELLO_Address *address; + + struct Session *session; +}; /** * We tried to send a SESSION_CONNECT message to another peer. If this @@ -1090,34 +1106,53 @@ static void send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target, int success) { - struct GNUNET_HELLO_Address *address = cls; - struct NeighbourMapEntry *n; + struct ContinutionContext * cc = cls; + struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer); if (GNUNET_YES != success) - GNUNET_ATS_address_destroyed (GST_ats, address, NULL); + GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL); + //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); if ( (NULL == neighbours) || - (NULL == (n = lookup_neighbour (&address->peer))) || - (n->state == S_DISCONNECT) || - (GNUNET_YES == success) ) + (NULL == n) || + (n->state == S_DISCONNECT)) { - GNUNET_HELLO_address_free (address); + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); return; } + + if ((GNUNET_YES == success) && + ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) + { + change_state (n, S_CONNECT_SENT); + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); + return; + } + + if ((GNUNET_NO == success) && + ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) + { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n", - GNUNET_i2s (&n->id), - GST_plugins_a2s (n->address), - n->session); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n", + GNUNET_i2s (&n->id), + GST_plugins_a2s (n->address), + n->session); #endif - change_state (n, S_NOT_CONNECTED); - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = - GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); - GNUNET_HELLO_address_free (address); + change_state (n, S_NOT_CONNECTED); + GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL); + //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); + + if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (n->ats_suggest); + n->ats_suggest = + GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel, + n); + GNUNET_ATS_suggest_address (GST_ats, &n->id); + } + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); } @@ -1133,13 +1168,25 @@ send_switch_address_continuation (void *cls, const struct GNUNET_PeerIdentity *target, int success) { - struct NeighbourMapEntry *n = cls; + struct ContinutionContext * cc = cls; + struct NeighbourMapEntry *n; - GNUNET_assert (n != NULL); - if (is_disconnecting (n)) + if (neighbours == NULL) + { + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); + return; /* neighbour is going away */ + } + + n = lookup_neighbour(&cc->address->peer); + if ((n == NULL) || (is_disconnecting (n))) + { + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); return; /* neighbour is going away */ + } - GNUNET_assert (n->state == S_CONNECTED); + GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT)); if (GNUNET_YES != success) { #if DEBUG_TRANSPORT @@ -1148,8 +1195,8 @@ send_switch_address_continuation (void *cls, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); #endif - - GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); + GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL); + //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) GNUNET_SCHEDULER_cancel (n->ats_suggest); @@ -1157,10 +1204,53 @@ send_switch_address_continuation (void *cls, GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n); GNUNET_ATS_suggest_address (GST_ats, &n->id); + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); return; } /* Tell ATS that switching addresses was successful */ - GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES); + switch (n->state) { + case S_CONNECTED: + GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES); + break; + case S_FAST_RECONNECT: +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Successful fast reconnect to peer `%s'\n", GNUNET_i2s (&n->id)); +#endif + change_state (n, S_CONNECTED); + neighbours_connected++; + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, + GNUNET_NO); + + GST_validation_set_address_use (&n->id, + cc->address, + cc->session, + GNUNET_YES); + + GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES); + + if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) + n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n); + + /* Updating quotas */ + struct QuotaSetMessage q_msg; + GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending outbound quota of %u Bps for peer `%s' to all clients\n", + ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer)); +#endif + q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); + q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); + q_msg.quota = n->bandwidth_out; + q_msg.peer = (*target); + GST_clients_broadcast (&q_msg.header, GNUNET_NO); + default: + break; + } + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); } /** @@ -1176,11 +1266,30 @@ send_connect_ack_continuation (void *cls, const struct GNUNET_PeerIdentity *target, int success) { - struct NeighbourMapEntry *n = cls; + struct ContinutionContext * cc = cls; + struct NeighbourMapEntry *n; + + if (neighbours == NULL) + { + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); + return; /* neighbour is going away */ + } + + n = lookup_neighbour(&cc->address->peer); + if ((n == NULL) || (is_disconnecting (n))) + { + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); + return; /* neighbour is going away */ + } - GNUNET_assert (n != NULL); if (GNUNET_YES == success) + { + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); return; /* sending successful */ + } /* sending failed, ask for next address */ #if DEBUG_TRANSPORT @@ -1192,7 +1301,8 @@ send_connect_ack_continuation (void *cls, #endif change_state (n, S_NOT_CONNECTED); - GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); + GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL); + //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) GNUNET_SCHEDULER_cancel (n->ats_suggest); @@ -1200,8 +1310,10 @@ send_connect_ack_continuation (void *cls, GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n); GNUNET_ATS_suggest_address (GST_ats, &n->id); -} + GNUNET_HELLO_address_free (cc->address); + GNUNET_free (cc); +} /** * For an existing neighbour record, set the active connection to @@ -1229,6 +1341,7 @@ GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer, { struct NeighbourMapEntry *n; struct SessionConnectMessage connect_msg; + struct ContinutionContext * cc; size_t msg_len; size_t ret; @@ -1294,7 +1407,7 @@ GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer, GST_clients_broadcast (&q_msg.header, GNUNET_NO); return GNUNET_NO; } - if (n->state == S_CONNECTED) + if (n->state == S_CONNECTED) { /* mark old address as no longer used */ GNUNET_assert (NULL != n->address); @@ -1320,7 +1433,6 @@ GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer, { case S_NOT_CONNECTED: case S_CONNECT_SENT: - case S_FAST_RECONNECT: msg_len = sizeof (struct SessionConnectMessage); connect_msg.header.size = htons (msg_len); connect_msg.header.type = @@ -1328,13 +1440,16 @@ GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer, connect_msg.reserved = htonl (0); connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - change_state (n, S_CONNECT_SENT); + + cc = GNUNET_malloc(sizeof (struct ContinutionContext)); + cc->session = session; + cc->address = GNUNET_HELLO_address_copy (address); ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session, address, GNUNET_YES, &send_connect_continuation, - GNUNET_HELLO_address_copy (address)); + cc); return GNUNET_NO; case S_CONNECT_RECV: /* We received a CONNECT message and asked ATS for an address */ @@ -1345,14 +1460,18 @@ GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer, connect_msg.reserved = htonl (0); connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + cc = GNUNET_malloc(sizeof (struct ContinutionContext)); + cc->session = session; + cc->address = GNUNET_HELLO_address_copy (address); ret = send_with_plugin (&n->id, (const void *) &connect_msg, msg_len, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session, address, GNUNET_YES, - &send_connect_ack_continuation, n); + &send_connect_ack_continuation, cc); return GNUNET_NO; case S_CONNECTED: - /* connected peer is switching addresses */ + case S_FAST_RECONNECT: + /* connected peer is switching addresses or tries fast reconnect*/ GST_validation_set_address_use (&n->id, n->address, n->session, @@ -1364,11 +1483,14 @@ GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer, connect_msg.reserved = htonl (0); connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + cc = GNUNET_malloc(sizeof (struct ContinutionContext)); + cc->session = session; + cc->address = GNUNET_HELLO_address_copy (address); ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session, address, GNUNET_YES, - &send_switch_address_continuation, n); + &send_switch_address_continuation, cc); if (ret == GNUNET_SYSERR) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1511,7 +1633,6 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) return GNUNET_YES; } - /** * A session was terminated. Take note. * @@ -1541,22 +1662,34 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, if (session != n->session) return; /* doesn't affect us */ if (n->state == S_CONNECTED) + { GST_validation_set_address_use (&n->id, n->address, n->session, GNUNET_NO); - n->session = NULL; + GNUNET_ATS_address_in_use (GST_ats,n->address, n->session, GNUNET_NO); + } + + + //GNUNET_ATS_address_destroyed(GST_ats, n->address, n->session); + if (NULL != n->address) { GNUNET_HELLO_address_free (n->address); n->address = NULL; } + n->session = NULL; /* not connected anymore anyway, shouldn't matter */ if ((S_CONNECTED != n->state) && (!is_connecting (n))) return; - change_state (n, S_FAST_RECONNECT); + /* connected, try fast reconnect */ +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Trying fast reconnect to peer `%s'\n", GNUNET_i2s (peer)); +#endif + change_state (n, S_FAST_RECONNECT); GNUNET_assert (neighbours_connected > 0); neighbours_connected--; -- 2.25.1