From 64d5a91798af32dd3a5183ee5e9ddafcbc376ead Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Thu, 27 Oct 2011 13:01:11 +0000 Subject: [PATCH] more changes: including connection switching --- .../gnunet-service-transport_neighbours_fsm.c | 325 ++++++++++-------- 1 file changed, 191 insertions(+), 134 deletions(-) diff --git a/src/transport/gnunet-service-transport_neighbours_fsm.c b/src/transport/gnunet-service-transport_neighbours_fsm.c index e920580a9..862608004 100644 --- a/src/transport/gnunet-service-transport_neighbours_fsm.c +++ b/src/transport/gnunet-service-transport_neighbours_fsm.c @@ -365,59 +365,40 @@ is_disconnecting (struct NeighbourMapEntry * n) return GNUNET_NO; } -static int -change (struct NeighbourMapEntry * n, int state, int line) +static const char * +print_state (int state) { - char * old = NULL; - char * new = NULL; - - switch (n->state) { - case S_CONNECTED: - old = "S_CONNECTED"; - break; - case S_CONNECT_RECV: - old = "S_CONNECT_RECV"; - break; - case S_CONNECT_RECV_ACK_SENT: - old = "S_CONNECT_RECV_ACK_SENT"; - break; - case S_CONNECT_SENT: - old = "S_CONNECT_SENT"; - break; - case S_DISCONNECT: - old = "S_DISCONNECT"; - break; - case S_NOT_CONNECTED: - old = "S_NOT_CONNECTED"; - break; - default: - GNUNET_break (0); - break; - } - switch (state) { case S_CONNECTED: - new = "S_CONNECTED"; + return "S_CONNECTED"; break; case S_CONNECT_RECV: - new = "S_CONNECT_RECV"; + return "S_CONNECT_RECV"; break; case S_CONNECT_RECV_ACK_SENT: - new = "S_CONNECT_RECV_ACK_SENT"; + return"S_CONNECT_RECV_ACK_SENT"; break; case S_CONNECT_SENT: - new = "S_CONNECT_SENT"; + return "S_CONNECT_SENT"; break; case S_DISCONNECT: - new = "S_DISCONNECT"; + return "S_DISCONNECT"; break; case S_NOT_CONNECTED: - new = "S_NOT_CONNECTED"; + return "S_NOT_CONNECTED"; break; default: GNUNET_break (0); break; } + return NULL; +} + +static int +change (struct NeighbourMapEntry * n, int state, int line) +{ + char * old = strdup(print_state(n->state)); + char * new = strdup(print_state(state)); /* allowed transitions */ int allowed = GNUNET_NO; @@ -471,18 +452,21 @@ change (struct NeighbourMapEntry * n, int state, int line) "Illegal state transition from `%s' to `%s' in line %u \n", old, new, line); GNUNET_break (0); + GNUNET_free (old); + GNUNET_free (new); return GNUNET_SYSERR; } n->state = state; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n", GNUNET_i2s (&n->id), n, old, new, line); + GNUNET_free (old); + GNUNET_free (new); return GNUNET_OK; } static ssize_t -send_with_plugin (void *cls, - const struct GNUNET_PeerIdentity * target, +send_with_plugin ( const struct GNUNET_PeerIdentity * target, const char *msgbuf, size_t msgbuf_size, uint32_t priority, @@ -584,7 +568,6 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) struct MessageQueue *mq; struct GNUNET_TIME_Relative timeout; ssize_t ret; - struct GNUNET_TRANSPORT_PluginFunctions *papi; if (n->is_active != NULL) { @@ -609,8 +592,7 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) if (NULL == mq) return; /* no more messages */ - papi = GST_plugins_find (n->plugin_name); - if (papi == NULL) + if (GST_plugins_find (n->plugin_name) == NULL) { GNUNET_break (0); return; @@ -628,13 +610,13 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); return; } - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "try_transmission_to_peer\n"); - papi = GST_plugins_find (n->plugin_name); - ret = - papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size, - 0 /* priority -- remove from plugin API? */ , - timeout, n->session, n->addr, n->addrlen, GNUNET_YES, - &transmit_send_continuation, mq); + + ret = send_with_plugin (&n->id, + mq->message_buf, mq->message_buf_size, 0, + timeout, + n->session, n->plugin_name, n->addr, n->addrlen, + GNUNET_YES, + &transmit_send_continuation, mq); if (ret == -1) { /* failure, but 'send' would not call continuation in this case, @@ -654,7 +636,6 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) static void transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "transmission_task\n"); struct NeighbourMapEntry *n = cls; GNUNET_assert (NULL != lookup_neighbour(&n->id)); n->transmission_task = GNUNET_SCHEDULER_NO_TASK; @@ -679,6 +660,18 @@ GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb, neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); } +static void +send_disconnect_cont (void *cls, + const struct GNUNET_PeerIdentity * target, + int result) +{ + struct NeighbourMapEntry *n = cls; + + if (result == GNUNET_OK) + change_state (n, S_DISCONNECT); + else + change_state (n, S_NOT_CONNECTED); +} static int send_disconnect (struct NeighbourMapEntry *n) @@ -703,10 +696,10 @@ send_disconnect (struct NeighbourMapEntry *n) &disconnect_msg.purpose, &disconnect_msg.signature)); - ret = send_with_plugin(NULL, &n->id, + ret = send_with_plugin(&n->id, (const char *) &disconnect_msg, sizeof (disconnect_msg), UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name, n->addr, n->addrlen, - GNUNET_YES, NULL, n); + GNUNET_YES, &send_disconnect_cont, n); if (ret == GNUNET_SYSERR) return GNUNET_SYSERR; @@ -738,13 +731,11 @@ disconnect_neighbour (struct NeighbourMapEntry *n) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Sent DISCONNECT_MSG to `%s'\n", GNUNET_i2s (&n->id)); - change_state (n, S_DISCONNECT); } else { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not send DISCONNECT_MSG to `%s'\n", GNUNET_i2s (&n->id)); - change_state (n, S_NOT_CONNECTED); } } @@ -851,7 +842,7 @@ neighbour_keepalive_task (void *cls, m.size = htons (sizeof (struct GNUNET_MessageHeader)); m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE); - send_with_plugin(NULL, &n->id, (const void *) &m, + send_with_plugin(&n->id, (const void *) &m, sizeof (m), UINT32_MAX /* priority */ , GNUNET_TIME_UNIT_FOREVER_REL, @@ -921,6 +912,8 @@ send_connect_continuation (void *cls, struct NeighbourMapEntry *n = cls; GNUNET_assert (n != NULL); + GNUNET_assert (!is_connected(n)); + if (GNUNET_YES == n->in_disconnect) return; /* neighbour is going away */ if (GNUNET_YES != success) @@ -934,7 +927,6 @@ send_connect_continuation (void *cls, n->addrlen), n->session); #endif - change_state(n, S_NOT_CONNECTED); GNUNET_ATS_address_destroyed (GST_ats, &n->id, @@ -949,6 +941,52 @@ send_connect_continuation (void *cls, change_state(n, S_CONNECT_SENT); } + +/** + * We tried to switch addresses with an peer already connected. If it failed, + * we should tell ATS to not use this address anymore (until it is re-validated). + * + * @param cls the 'struct NeighbourMapEntry' + * @param success GNUNET_OK on success + */ +static void +send_switch_address_continuation (void *cls, + const struct GNUNET_PeerIdentity * target, + int success) + +{ + struct NeighbourMapEntry *n = cls; + + GNUNET_assert (n != NULL); + if (GNUNET_YES == n->in_disconnect) + return; /* neighbour is going away */ + + GNUNET_assert (n->state == S_CONNECTED); + if (GNUNET_YES != success) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n", + GNUNET_i2s (&n->id), n->plugin_name, + (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, + n->addr, + n->addrlen), + n->session); +#endif + change_state(n, S_NOT_CONNECTED); + + GNUNET_ATS_address_destroyed (GST_ats, + &n->id, + n->plugin_name, + n->addr, + n->addrlen, + NULL); + + GNUNET_ATS_suggest_address(GST_ats, &n->id); + return; + } +} + /** * We tried to send a SESSION_CONNECT message to another peer. If this * succeeded, we change the state. If it failed, we should tell @@ -965,11 +1003,6 @@ send_connect_ack_continuation (void *cls, { struct NeighbourMapEntry *n = cls; - //FIMXE comeplete this - GNUNET_break (0); -return; - - GNUNET_assert (n != NULL); if (GNUNET_YES == n->in_disconnect) return; /* neighbour is going away */ @@ -996,7 +1029,7 @@ return; GNUNET_ATS_suggest_address(GST_ats, &n->id); return; } - change_state(n, S_CONNECT_SENT); + //change_state(n, S_CONNECT_SENT); } /** @@ -1026,14 +1059,6 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, size_t msg_len; size_t ret; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n", - GNUNET_i2s (peer), plugin_name, - (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, - address, - address_len), - session); - GNUNET_assert (neighbours != NULL); n = lookup_neighbour (peer); if (NULL == n) @@ -1046,16 +1071,17 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, return GNUNET_NO; } - #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n", - GNUNET_i2s (peer), plugin_name, +#endif + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n", + plugin_name, (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, address, address_len), - session); -#endif + session, (is_connected(n) ? "CONNECTED" : "NOT CONNECTED"), + GNUNET_i2s (peer)); + GNUNET_free_non_null (n->addr); n->addr = GNUNET_malloc (address_len); memcpy (n->addr, address, address_len); @@ -1079,7 +1105,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - ret =send_with_plugin (NULL, peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_continuation, n); + ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_continuation, n); if (ret == GNUNET_SYSERR) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1090,7 +1116,9 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, address_len), session); } + return GNUNET_NO; } + /* We received a CONNECT message and asked ATS for an address */ else if (n->state == S_CONNECT_RECV) { msg_len = sizeof (struct SessionConnectMessage); @@ -1100,22 +1128,41 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, connect_msg.reserved = htonl (0); connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - ret = send_with_plugin(NULL, &n->id, (const void *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_ack_continuation, n); + ret = send_with_plugin(&n->id, (const void *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_connect_ack_continuation, n); if (ret == GNUNET_SYSERR) { change_state (n, S_NOT_CONNECTED); GNUNET_break (0); - return GNUNET_NO; } + return GNUNET_NO; } - else + /* connected peer is switching addresses */ + else if (n->state == S_CONNECTED) { - GNUNET_break (0); - } - - + msg_len = sizeof (struct SessionConnectMessage); + connect_msg.header.size = htons (msg_len); + connect_msg.header.type = + htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); + connect_msg.reserved = htonl (0); + connect_msg.timestamp = + GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, 0, GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name, address, address_len, GNUNET_YES, &send_switch_address_continuation, n); + if (ret == GNUNET_SYSERR) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n", + GNUNET_i2s (peer), plugin_name, + (address_len == 0) ? "" : GST_plugins_a2s (plugin_name, + address, + address_len), + session); + } + return GNUNET_NO; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Invalid connection state to switch addresses "); + GNUNET_break (0); return GNUNET_NO; } @@ -1222,10 +1269,11 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, GNUNET_assert (neighbours != NULL); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +#endif + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n", session, GNUNET_i2s (peer)); -#endif + n = lookup_neighbour (peer); if (NULL == n) return; @@ -1246,6 +1294,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT, &neighbour_timeout_task, n); /* try QUICKLY to re-establish a connection, reduce timeout! */ +// change_state (n, S_NOT_CONNECTED); GNUNET_ATS_suggest_address (GST_ats, peer); } @@ -1267,7 +1316,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, { struct NeighbourMapEntry *n; struct MessageQueue *mq; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u\n", __LINE__); + GNUNET_assert (neighbours != NULL); n = lookup_neighbour (target); @@ -1291,7 +1340,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, cont (cont_cls, GNUNET_SYSERR); return; } - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u %X %s\n", __LINE__ , n->session, GST_plugins_a2s(n->plugin_name, n->addr, n->addrlen)); + if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0)) { GNUNET_STATISTICS_update (GST_stats, @@ -1308,7 +1357,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, cont (cont_cls, GNUNET_SYSERR); return; } - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u\n", __LINE__); + GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader)); GNUNET_STATISTICS_update (GST_stats, gettext_noop @@ -1324,8 +1373,6 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_send %u\n", __LINE__); - if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && (NULL == n->is_active)) n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); @@ -1642,47 +1689,6 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer GST_neighbours_force_disconnect (peer); } -static void neighbour_connected (struct NeighbourMapEntry *n, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count, int send_connect_ack) -{ - struct GNUNET_MessageHeader msg; - size_t msg_len; - int ret; - - if (is_connected(n)) - return; - - change_state (n, S_CONNECTED); - n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, - &neighbour_keepalive_task, - n); - - if (send_connect_ack) - { - /* send CONNECT_ACK (SYN_ACK)*/ - msg_len = sizeof (msg); - msg.size = htons (msg_len); - msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); - - ret = send_with_plugin (NULL, &n->id, (const char *) &msg, msg_len, 0, - GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name, n->addr, n->addrlen, GNUNET_YES, NULL, NULL); - if (ret == GNUNET_SYSERR) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n", - GNUNET_i2s (&n->id), n->plugin_name, - (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, - n->addr, - n->addrlen), - n->session); - } - neighbours_connected++; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, - GNUNET_NO); - connect_notify_cb (callback_cls, &n->id, ats, ats_count); -} - - /** * We received a 'SESSION_CONNECT_ACK' message from the other peer. * Consider switching to it. @@ -1707,8 +1713,12 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, uint32_t ats_count) { const struct SessionConnectMessage *scm; + struct GNUNET_MessageHeader msg; struct GNUNET_TIME_Absolute ts; struct NeighbourMapEntry *n; + size_t msg_len; + size_t ret; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_handle_connect_ack SYN/ACK\n"); if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) @@ -1743,7 +1753,33 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, plugin_name, sender_address, sender_address_len, session, ats, ats_count); - neighbour_connected (n, ats, ats_count, GNUNET_YES); + change_state (n, S_CONNECTED); + n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, + &neighbour_keepalive_task, + n); + /* send ACK (ACK)*/ + msg_len = sizeof (msg); + msg.size = htons (msg_len); + msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); + + ret = send_with_plugin (&n->id, (const char *) &msg, msg_len, 0, + GNUNET_TIME_UNIT_FOREVER_REL, + n->session, n->plugin_name, n->addr, n->addrlen, + GNUNET_YES, NULL, NULL); + + if (ret == GNUNET_SYSERR) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n", + GNUNET_i2s (&n->id), n->plugin_name, + (n->addrlen == 0) ? "" : GST_plugins_a2s (n->plugin_name, + n->addr, + n->addrlen), + n->session); + + neighbours_connected++; + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, + GNUNET_NO); + connect_notify_cb (callback_cls, &n->id, ats, ats_count); } void @@ -1773,7 +1809,8 @@ GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, if (n->state != S_CONNECT_RECV) { - send_disconnect(n); + send_disconnect (n); + change_state (n, S_DISCONNECT); GNUNET_break (0); return; } @@ -1793,7 +1830,15 @@ GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, plugin_name, sender_address, sender_address_len, session, ats, ats_count); - neighbour_connected (n, ats, ats_count, GNUNET_NO); + change_state (n, S_CONNECTED); + n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, + &neighbour_keepalive_task, + n); + + neighbours_connected++; + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, + GNUNET_NO); + connect_notify_cb (callback_cls, &n->id, ats, ats_count); } struct BlackListCheckContext @@ -1855,7 +1900,7 @@ handle_connect_blacklist_cont (void *cls, GNUNET_free (bcc); - if (n->state > S_NOT_CONNECTED) + if (n->state != S_NOT_CONNECTED) return; change_state (n, S_CONNECT_RECV); @@ -1887,6 +1932,7 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, uint32_t ats_count) { const struct SessionConnectMessage *scm; + struct NeighbourMapEntry * n; struct BlackListCheckContext * bcc = NULL; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "GST_neighbours_handle_connect SYN\n"); @@ -1900,7 +1946,18 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, scm = (const struct SessionConnectMessage *) message; GNUNET_break_op (ntohl (scm->reserved) == 0); + n = lookup_neighbour(peer); + if (n != NULL) + { + /* connected peer switches addresses */ + if (is_connected(n)) + { + GNUNET_ATS_address_update(GST_ats, peer, plugin_name, sender_address, sender_address_len, session, ats, ats_count); + return; + } + } + /* we are not connected to this peer */ /* do blacklist check*/ bcc = GNUNET_malloc (sizeof (struct BlackListCheckContext) + sizeof (struct GNUNET_ATS_Information) * ats_count + -- 2.25.1