X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport_neighbours.c;h=01546ded4d059e1a58fa928012b364ae55419fe2;hb=9bbe1dc9c3dfa9de90759e540d715de6547e43cd;hp=15bb0bdab5846ca30e09390b1e34c6e82ba94ff4;hpb=6115a1150c65bd4a33ed61c6e96594c4a73d86ac;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index 15bb0bdab..01546ded4 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -157,6 +157,26 @@ struct SessionKeepAliveMessage uint32_t nonce GNUNET_PACKED; }; + +/** + * Message a peer sends to another when connected to indicate that + * the other peer should limit transmissions to the indicated + * quota. + */ +struct SessionQuotaMessage +{ + /** + * Header of type #GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_QUOTA. + */ + struct GNUNET_MessageHeader header; + + /** + * Quota to use (for sending), in bytes per second. + */ + uint32_t quota GNUNET_PACKED; +}; + + /** * Message we send to the other peer to notify him that we intentionally * are disconnecting (to reduce timeouts). This is just a friendly @@ -336,7 +356,7 @@ struct NeighbourMapEntry /** * Main task that drives this peer (timeouts, keepalives, etc.). - * Always runs the 'master_task'. + * Always runs the #master_task(). */ struct GNUNET_SCHEDULER_Task *task; @@ -371,7 +391,7 @@ struct NeighbourMapEntry /** * Time where we should cut the connection (timeout) if we don't * make progress in the state machine (or get a KEEPALIVE_RESPONSE - * if we are in #S_CONNECTED). + * if we are in #GNUNET_TRANSPORT_PS_CONNECTED). */ struct GNUNET_TIME_Absolute timeout; @@ -387,6 +407,13 @@ struct NeighbourMapEntry */ unsigned int quota_violation_count; + /** + * Latest quota the other peer send us in bytes per second. + * We should not send more, least the other peer throttle + * receiving our traffic. + */ + struct GNUNET_BANDWIDTH_Value32NBO neighbour_receive_quota; + /** * The current state of the peer. */ @@ -641,13 +668,15 @@ test_connected (struct NeighbourMapEntry *n) /** * Send information about a new outbound quota to our clients. + * Note that the outbound quota is enforced client-side (i.e. + * in libgnunettransport). * * @param target affected peer * @param quota new quota */ static void -send_outbound_quota (const struct GNUNET_PeerIdentity *target, - struct GNUNET_BANDWIDTH_Value32NBO quota) +send_outbound_quota_to_clients (const struct GNUNET_PeerIdentity *target, + struct GNUNET_BANDWIDTH_Value32NBO quota) { struct QuotaSetMessage q_msg; @@ -826,6 +855,9 @@ set_alternative_address (struct NeighbourMapEntry *n, n->alternative_address.session = session; n->alternative_address.ats_active = GNUNET_NO; n->alternative_address.keep_alive_nonce = 0; + GNUNET_assert (GNUNET_YES == + GST_ats_is_known (n->alternative_address.address, + n->alternative_address.session)); } @@ -839,37 +871,29 @@ set_alternative_address (struct NeighbourMapEntry *n, * address must be setup) * @param bandwidth_in inbound quota to be used when connection is up * @param bandwidth_out outbound quota to be used when connection is up - * @param is_active #GNUNET_YES to mark this as the active address with ATS */ static void set_primary_address (struct NeighbourMapEntry *n, const struct GNUNET_HELLO_Address *address, struct Session *session, struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, - int is_active) + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out) { if (session == n->primary_address.session) { - if (is_active != n->primary_address.ats_active) + GST_validation_set_address_use (n->primary_address.address, + GNUNET_YES); + if (n->primary_address.bandwidth_in.value__ != bandwidth_in.value__) { - n->primary_address.ats_active = is_active; - GST_validation_set_address_use (n->primary_address.address, - is_active); + n->primary_address.bandwidth_in = bandwidth_in; + GST_neighbours_set_incoming_quota (&address->peer, + bandwidth_in); } - if (GNUNET_YES == is_active) + if (n->primary_address.bandwidth_out.value__ != bandwidth_out.value__) { - if (n->primary_address.bandwidth_in.value__ != bandwidth_in.value__) - { - n->primary_address.bandwidth_in = bandwidth_in; - GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); - } - if (n->primary_address.bandwidth_out.value__ != bandwidth_out.value__) - { - n->primary_address.bandwidth_out = bandwidth_out; - send_outbound_quota (&address->peer, - bandwidth_out); - } + n->primary_address.bandwidth_out = bandwidth_out; + send_outbound_quota_to_clients (&address->peer, + bandwidth_out); } return; } @@ -897,18 +921,17 @@ set_primary_address (struct NeighbourMapEntry *n, n->primary_address.bandwidth_in = bandwidth_in; n->primary_address.bandwidth_out = bandwidth_out; n->primary_address.session = session; - n->primary_address.ats_active = is_active; n->primary_address.keep_alive_nonce = 0; - if (GNUNET_YES == is_active) - { - /* subsystems about address use */ - GST_validation_set_address_use (n->primary_address.address, - GNUNET_YES); - GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); - send_outbound_quota (&address->peer, - bandwidth_out); - } - + GNUNET_assert (GNUNET_YES == + GST_ats_is_known (n->primary_address.address, + n->primary_address.session)); + /* subsystems about address use */ + GST_validation_set_address_use (n->primary_address.address, + GNUNET_YES); + GST_neighbours_set_incoming_quota (&address->peer, + bandwidth_in); + send_outbound_quota_to_clients (&address->peer, + bandwidth_out); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Neighbour `%s' switched to address `%s'\n", GNUNET_i2s (&n->id), @@ -1046,7 +1069,7 @@ send_with_session (struct NeighbourMapEntry *n, struct GNUNET_TRANSPORT_PluginFunctions *papi; struct GNUNET_TIME_Relative result = GNUNET_TIME_UNIT_FOREVER_REL; - GNUNET_assert (n->primary_address.session != NULL); + GNUNET_assert (NULL != n->primary_address.session); if ( ((NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name)) || (-1 == papi->send (papi->cls, n->primary_address.session, @@ -1236,13 +1259,20 @@ disconnect_neighbour (struct NeighbourMapEntry *n) static void transmit_send_continuation (void *cls, const struct GNUNET_PeerIdentity *receiver, - int success, size_t size_payload, size_t physical) + int success, + size_t size_payload, + size_t physical) { struct MessageQueue *mq = cls; struct NeighbourMapEntry *n; if (NULL == (n = lookup_neighbour (receiver))) { + if (NULL != mq->cont) + mq->cont (mq->cont_cls, + GNUNET_SYSERR /* not connected */, + size_payload, + 0); GNUNET_free (mq); return; /* disconnect or other error while transmitting, can happen */ } @@ -1253,7 +1283,8 @@ transmit_send_continuation (void *cls, n->is_active = NULL; if (NULL != n->task) GNUNET_SCHEDULER_cancel (n->task); - n->task = GNUNET_SCHEDULER_add_now (&master_task, n); + n->task = GNUNET_SCHEDULER_add_now (&master_task, + n); } if (bytes_in_send_queue < mq->message_buf_size) { @@ -1267,18 +1298,17 @@ transmit_send_continuation (void *cls, GNUNET_break (0); } - GNUNET_break (size_payload == mq->message_buf_size); bytes_in_send_queue -= mq->message_buf_size; GNUNET_STATISTICS_set (GST_stats, - gettext_noop - ("# bytes in message queue for other peers"), - bytes_in_send_queue, GNUNET_NO); + gettext_noop ("# bytes in message queue for other peers"), + bytes_in_send_queue, + GNUNET_NO); if (GNUNET_OK == success) GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages transmitted to other peers"), - 1, GNUNET_NO); + gettext_noop ("# messages transmitted to other peers"), + 1, + GNUNET_NO); else GNUNET_STATISTICS_update (GST_stats, gettext_noop @@ -1291,7 +1321,10 @@ transmit_send_continuation (void *cls, mq->message_buf_size, (success == GNUNET_OK) ? "success" : "FAILURE"); if (NULL != mq->cont) - mq->cont (mq->cont_cls, success, size_payload, physical); + mq->cont (mq->cont_cls, + success, + size_payload, + physical); GNUNET_free (mq); } @@ -1339,9 +1372,9 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) if (timeout.rel_value_us > 0) break; GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages timed out while in transport queue"), - 1, GNUNET_NO); + gettext_noop ("# messages timed out while in transport queue"), + 1, + GNUNET_NO); GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); @@ -1396,10 +1429,11 @@ send_keepalive (struct NeighbourMapEntry *n) nonce = 0; /* 0 indicates 'not set' */ while (0 == nonce) - nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending keep alive to peer `%s' with nonce %u\n", + "Sending KEEPALIVE to peer `%s' with nonce %u\n", GNUNET_i2s (&n->id), nonce); m.header.size = htons (sizeof (struct SessionKeepAliveMessage)); @@ -1414,7 +1448,7 @@ send_keepalive (struct NeighbourMapEntry *n) GNUNET_YES, NULL, NULL); GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# keepalives sent"), + gettext_noop ("# KEEPALIVES sent"), 1, GNUNET_NO); n->primary_address.keep_alive_nonce = nonce; @@ -1440,9 +1474,12 @@ GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour, struct SessionKeepAliveMessage msg; if (sizeof (struct SessionKeepAliveMessage) != ntohs (m->size)) + { + GNUNET_break_op (0); return; + } - msg_in = (struct SessionKeepAliveMessage *) m; + msg_in = (const struct SessionKeepAliveMessage *) m; if (NULL == (n = lookup_neighbour (neighbour))) { GNUNET_STATISTICS_update (GST_stats, @@ -1461,8 +1498,13 @@ GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received keep alive request from peer `%s' with nonce %u\n", - GNUNET_i2s (&n->id), ntohl (msg_in->nonce)); + "Received KEEPALIVE request from peer `%s' with nonce %u\n", + GNUNET_i2s (&n->id), + ntohl (msg_in->nonce)); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# KEEPALIVES received in good order"), + 1, + GNUNET_NO); /* send reply to allow neighbour to measure latency */ msg.header.size = htons (sizeof (struct SessionKeepAliveMessage)); @@ -1493,61 +1535,70 @@ GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, struct NeighbourMapEntry *n; const struct SessionKeepAliveMessage *msg; struct GNUNET_TRANSPORT_PluginFunctions *papi; - struct GNUNET_ATS_Information ats; struct GNUNET_TIME_Relative latency; if (sizeof (struct SessionKeepAliveMessage) != ntohs (m->size)) + { + GNUNET_break_op (0); return; + } msg = (const struct SessionKeepAliveMessage *) m; if (NULL == (n = lookup_neighbour (neighbour))) { GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# KEEPALIVE_RESPONSE messages discarded (not connected)"), - 1, GNUNET_NO); + gettext_noop ("# KEEPALIVE_RESPONSEs discarded (not connected)"), + 1, + GNUNET_NO); return; } if ( (GNUNET_TRANSPORT_PS_CONNECTED != n->state) || (GNUNET_YES != n->expect_latency_response) ) { GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# KEEPALIVE_RESPONSE messages discarded (not expected)"), - 1, GNUNET_NO); + gettext_noop ("# KEEPALIVE_RESPONSEs discarded (not expected)"), + 1, + GNUNET_NO); return; } if (NULL == n->primary_address.address) { GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# KEEPALIVE_RESPONSE messages discarded (address changed)"), - 1, GNUNET_NO); + gettext_noop ("# KEEPALIVE_RESPONSEs discarded (address changed)"), + 1, + GNUNET_NO); return; } if (n->primary_address.keep_alive_nonce != ntohl (msg->nonce)) { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# KEEPALIVE_RESPONSE messages discarded (wrong nonce)"), - 1, GNUNET_NO); + if (0 == n->primary_address.keep_alive_nonce) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# KEEPALIVE_RESPONSEs discarded (no nonce)"), + 1, + GNUNET_NO); + else + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# KEEPALIVE_RESPONSEs discarded (bad nonce)"), + 1, + GNUNET_NO); return; } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received keep alive response from peer `%s' for session %p\n", - GNUNET_i2s (&n->id), n->primary_address.session); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# KEEPALIVE_RESPONSEs received (OK)"), + 1, + GNUNET_NO); - } /* Update session timeout here */ if (NULL != (papi = GST_plugins_find (n->primary_address.address->transport_name))) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Updating session for peer `%s' for session %p\n", - GNUNET_i2s (&n->id), n->primary_address.session); - papi->update_session_timeout (papi->cls, &n->id, n->primary_address.session); + "Updating session for peer `%s' for session %p\n", + GNUNET_i2s (&n->id), + n->primary_address.session); + papi->update_session_timeout (papi->cls, + &n->id, + n->primary_address.session); } else { @@ -1562,18 +1613,13 @@ GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, latency = GNUNET_TIME_absolute_get_duration (n->last_keep_alive_time); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Latency for peer `%s' is %s\n", + "Received KEEPALIVE_RESPONSE from peer `%s', latency is %s\n", GNUNET_i2s (&n->id), GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_YES)); - /* append latency */ - ats.type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); - ats.value = htonl ( (latency.rel_value_us > UINT32_MAX) - ? UINT32_MAX - : (uint32_t) latency.rel_value_us ); - GST_ats_update_metrics (n->primary_address.address, - n->primary_address.session, - &ats, 1); + GST_ats_update_delay (n->primary_address.address, + GNUNET_TIME_relative_divide (latency, + 2)); } @@ -1590,8 +1636,9 @@ GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, * @return how long to wait before reading more from this sender */ struct GNUNET_TIME_Relative -GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity - *sender, ssize_t size, int *do_forward) +GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity *sender, + ssize_t size, + int *do_forward) { struct NeighbourMapEntry *n; struct GNUNET_TIME_Relative ret; @@ -1692,14 +1739,20 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, { GNUNET_break (0); if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR, msg_size, 0); + cont (cont_cls, + GNUNET_SYSERR, + msg_size, + 0); return; } if (GNUNET_YES != test_connected (n)) { GNUNET_break (0); if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR, msg_size, 0); + cont (cont_cls, + GNUNET_SYSERR, + msg_size, + 0); return; } bytes_in_send_queue += msg_size; @@ -1715,10 +1768,13 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, mq->message_buf_size = msg_size; mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueueing %u bytes to send to peer %s\n", - msg_size, GNUNET_i2s (target)); - - GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Enqueueing %u bytes to send to peer %s\n", + msg_size, + GNUNET_i2s (target)); + GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, + n->messages_tail, + mq); if (NULL != n->task) GNUNET_SCHEDULER_cancel (n->task); n->task = GNUNET_SCHEDULER_add_now (&master_task, n); @@ -1890,6 +1946,7 @@ send_syn (struct NeighbourAddress *na) disconnect_neighbour (n); break; } + return; } GST_neighbours_notify_data_sent (na->address, na->session, @@ -2083,6 +2140,7 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer) n->id = *peer; n->ack_state = ACK_UNDEFINED; n->last_util_transmission = GNUNET_TIME_absolute_get(); + n->neighbour_receive_quota = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, &inbound_bw_tracker_update, n, @@ -2123,16 +2181,6 @@ struct BlacklistCheckSwitchContext */ struct GST_BlacklistCheck *blc; - /** - * Address we are asking the blacklist subsystem about. - */ - struct GNUNET_HELLO_Address *address; - - /** - * Session we should use in conjunction with @e address, can be NULL. - */ - struct Session *session; - /** * Inbound bandwidth that was assigned to @e address. */ @@ -2151,11 +2199,17 @@ struct BlacklistCheckSwitchContext * * @param cls blc_ctx bl context * @param peer the peer - * @param result the result + * @param address address associated with the request + * @param session session associated with the request + * @param result #GNUNET_OK if the connection is allowed, + * #GNUNET_NO if not, + * #GNUNET_SYSERR if operation was aborted */ static void try_connect_bl_check_cont (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + struct Session *session, int result) { struct BlacklistCheckSwitchContext *blc_ctx = cls; @@ -2263,7 +2317,9 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) (blc = GST_blacklist_test_allowed (target, NULL, &try_connect_bl_check_cont, - blc_ctx))) + blc_ctx, + NULL, + NULL))) { blc_ctx->blc = blc; } @@ -2274,7 +2330,7 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) * We received a 'SYN' message from the other peer. * Consider switching to it. * - * @param message possibly a 'struct TransportSynMessage' (check format) + * @param message possibly a `struct TransportSynMessage` (check format) * @param peer identity of the peer to switch the address for * @return #GNUNET_OK if the message was fine, #GNUNET_SYSERR on serious error */ @@ -2425,6 +2481,7 @@ try_run_fast_ats_update (const struct GNUNET_HELLO_Address *address, struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out) { struct NeighbourMapEntry *n; + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_min; n = lookup_neighbour (&address->peer); if ( (NULL == n) || @@ -2439,13 +2496,24 @@ try_run_fast_ats_update (const struct GNUNET_HELLO_Address *address, /* switch to a different session, but keeping same address; could happen if there is a 2nd inbound connection */ n->primary_address.session = session; + GNUNET_assert (GNUNET_YES == + GST_ats_is_known (n->primary_address.address, + n->primary_address.session)); + } + if (n->primary_address.bandwidth_in.value__ != bandwidth_in.value__) + { + n->primary_address.bandwidth_in = bandwidth_in; + GST_neighbours_set_incoming_quota (&address->peer, + bandwidth_in); + } + if (n->primary_address.bandwidth_out.value__ != bandwidth_out.value__) + { + n->primary_address.bandwidth_out = bandwidth_out; + bandwidth_min = GNUNET_BANDWIDTH_value_min (bandwidth_out, + n->neighbour_receive_quota); + send_outbound_quota_to_clients (&address->peer, + bandwidth_min); } - n->primary_address.bandwidth_in = bandwidth_in; - n->primary_address.bandwidth_out = bandwidth_out; - GST_neighbours_set_incoming_quota (&address->peer, - bandwidth_in); - send_outbound_quota (&address->peer, - bandwidth_out); return GNUNET_OK; } @@ -2457,54 +2525,66 @@ try_run_fast_ats_update (const struct GNUNET_HELLO_Address *address, * @param cls the `struct BlacklistCheckSwitchContext` with * the information about the future address * @param peer the peer we may switch addresses on - * @param result #GNUNET_NO if we are not allowed to use the new - * address + * @param address address associated with the request + * @param session session associated with the request + * @param result #GNUNET_OK if the connection is allowed, + * #GNUNET_NO if not, + * #GNUNET_SYSERR if operation was aborted */ static void switch_address_bl_check_cont (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + struct Session *session, int result) { struct BlacklistCheckSwitchContext *blc_ctx = cls; struct GNUNET_TRANSPORT_PluginFunctions *papi; struct NeighbourMapEntry *n; - if (result == GNUNET_NO) + if (GNUNET_SYSERR == result) + goto cleanup; + + papi = GST_plugins_find (address->transport_name); + GNUNET_assert (NULL != papi); + + if (GNUNET_NO == result) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Blacklist denied to switch to suggested address `%s' session %p for peer `%s'\n", - GST_plugins_a2s (blc_ctx->address), - blc_ctx->session, - GNUNET_i2s (&blc_ctx->address->peer)); + GST_plugins_a2s (address), + session, + GNUNET_i2s (peer)); GNUNET_STATISTICS_update (GST_stats, "# ATS suggestions ignored (blacklist denied)", 1, GNUNET_NO); - /* FIXME: tell plugin to force killing session here and now - (note: _proper_ plugin API for this does not yet exist) */ - GST_ats_block_address (blc_ctx->address, - blc_ctx->session); + papi->disconnect_session (papi->cls, + session); + if (GNUNET_YES != + GNUNET_HELLO_address_check_option (address, + GNUNET_HELLO_ADDRESS_INFO_INBOUND)) + GST_ats_block_address (address, + NULL); goto cleanup; } - papi = GST_plugins_find (blc_ctx->address->transport_name); - GNUNET_assert (NULL != papi); - if (NULL == blc_ctx->session) + if (NULL == session) { /* need to create a session, ATS only gave us an address */ - blc_ctx->session = papi->get_session (papi->cls, - blc_ctx->address); + session = papi->get_session (papi->cls, + address); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Obtained new session for peer `%s' and address '%s': %p\n", - GNUNET_i2s (&blc_ctx->address->peer), - GST_plugins_a2s (blc_ctx->address), - blc_ctx->session); - if (NULL != blc_ctx->session) - GST_ats_new_session (blc_ctx->address, - blc_ctx->session); + GNUNET_i2s (&address->peer), + GST_plugins_a2s (address), + session); + if (NULL != session) + GST_ats_new_session (address, + session); } - if (NULL == blc_ctx->session) + if (NULL == session) { /* session creation failed, bad!, fail! */ GNUNET_STATISTICS_update (GST_stats, @@ -2514,10 +2594,10 @@ switch_address_bl_check_cont (void *cls, /* No session could be obtained, remove blacklist check and clean up */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to obtain new session for peer `%s' and address '%s'\n", - GNUNET_i2s (&blc_ctx->address->peer), - GST_plugins_a2s (blc_ctx->address)); - GST_ats_block_address (blc_ctx->address, - blc_ctx->session); + GNUNET_i2s (&address->peer), + GST_plugins_a2s (address)); + GST_ats_block_address (address, + session); goto cleanup; } @@ -2525,8 +2605,8 @@ switch_address_bl_check_cont (void *cls, it is theoretically possible that the situation changed in the meantime, hence we check again here */ if (GNUNET_OK == - try_run_fast_ats_update (blc_ctx->address, - blc_ctx->session, + try_run_fast_ats_update (address, + session, blc_ctx->bandwidth_in, blc_ctx->bandwidth_out)) goto cleanup; /* was just a minor update, we're done */ @@ -2540,26 +2620,25 @@ switch_address_bl_check_cont (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer `%s' switches to address `%s'\n", - GNUNET_i2s (&blc_ctx->address->peer), - GST_plugins_a2s (blc_ctx->address)); + GNUNET_i2s (&address->peer), + GST_plugins_a2s (address)); switch (n->state) { case GNUNET_TRANSPORT_PS_NOT_CONNECTED: GNUNET_break (0); - GST_ats_block_address (blc_ctx->address, - blc_ctx->session); + GST_ats_block_address (address, + session); free_neighbour (n); return; case GNUNET_TRANSPORT_PS_INIT_ATS: /* We requested an address and ATS suggests one: * set primary address and send SYN message*/ set_primary_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, - blc_ctx->bandwidth_out, - GNUNET_NO); + blc_ctx->bandwidth_out); if (ACK_SEND_SYN_ACK == n->ack_state) { /* Send pending SYN_ACK message */ @@ -2577,11 +2656,10 @@ switch_address_bl_check_cont (void *cls, * Switch and send new SYN */ /* ATS suggests a different address, switch again */ set_primary_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, - blc_ctx->bandwidth_out, - GNUNET_NO); + blc_ctx->bandwidth_out); if (ACK_SEND_SYN_ACK == n->ack_state) { /* Send pending SYN_ACK message */ @@ -2598,11 +2676,10 @@ switch_address_bl_check_cont (void *cls, /* We requested an address and ATS suggests one: * set primary address and send SYN_ACK message*/ set_primary_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, - blc_ctx->bandwidth_out, - GNUNET_NO); + blc_ctx->bandwidth_out); /* Send an ACK message as a response to the SYN msg */ set_state_and_timeout (n, GNUNET_TRANSPORT_PS_SYN_RECV_ACK, @@ -2623,11 +2700,10 @@ switch_address_bl_check_cont (void *cls, n->connect_ack_timestamp); } set_primary_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, - blc_ctx->bandwidth_out, - GNUNET_NO); + blc_ctx->bandwidth_out); set_state_and_timeout (n, GNUNET_TRANSPORT_PS_SYN_RECV_ACK, GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEOUT)); @@ -2635,12 +2711,12 @@ switch_address_bl_check_cont (void *cls, case GNUNET_TRANSPORT_PS_CONNECTED: GNUNET_assert (NULL != n->primary_address.address); GNUNET_assert (NULL != n->primary_address.session); - GNUNET_break (n->primary_address.session != blc_ctx->session); + GNUNET_break (n->primary_address.session != session); /* ATS asks us to switch a life connection; see if we can get a SYN_ACK on it before we actually do this! */ set_alternative_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, blc_ctx->bandwidth_out); set_state_and_timeout (n, @@ -2654,11 +2730,10 @@ switch_address_bl_check_cont (void *cls, break; case GNUNET_TRANSPORT_PS_RECONNECT_ATS: set_primary_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, - blc_ctx->bandwidth_out, - GNUNET_NO); + blc_ctx->bandwidth_out); if (ACK_SEND_SYN_ACK == n->ack_state) { /* Send pending SYN_ACK message */ @@ -2675,11 +2750,10 @@ switch_address_bl_check_cont (void *cls, /* ATS asks us to switch while we were trying to reconnect; switch to new address and send SYN again */ set_primary_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, - blc_ctx->bandwidth_out, - GNUNET_NO); + blc_ctx->bandwidth_out); set_state_and_timeout (n, GNUNET_TRANSPORT_PS_RECONNECT_SENT, GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT)); @@ -2687,8 +2761,8 @@ switch_address_bl_check_cont (void *cls, break; case GNUNET_TRANSPORT_PS_SWITCH_SYN_SENT: if ( (0 == GNUNET_HELLO_address_cmp (n->primary_address.address, - blc_ctx->address)) && - (n->primary_address.session == blc_ctx->session) ) + address)) && + (n->primary_address.session == session) ) { /* ATS switches back to still-active session */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2701,8 +2775,8 @@ switch_address_bl_check_cont (void *cls, } /* ATS asks us to switch a life connection, send */ set_alternative_address (n, - blc_ctx->address, - blc_ctx->session, + address, + session, blc_ctx->bandwidth_in, blc_ctx->bandwidth_out); set_state_and_timeout (n, @@ -2731,7 +2805,6 @@ switch_address_bl_check_cont (void *cls, GNUNET_CONTAINER_DLL_remove (pending_bc_head, pending_bc_tail, blc_ctx); - GNUNET_HELLO_address_free (blc_ctx->address); GNUNET_free (blc_ctx); } @@ -2791,14 +2864,14 @@ GST_neighbours_switch_to_address (const struct GNUNET_HELLO_Address *address, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ATS suggests address '%s' for peer `%s'\n", + "ATS suggests address '%s' for peer `%s' at %u/%u speed\n", GST_plugins_a2s (address), - GNUNET_i2s (&address->peer)); + GNUNET_i2s (&address->peer), + (unsigned int) ntohl (bandwidth_in.value__), + (unsigned int) ntohl (bandwidth_out.value__)); /* Perform blacklist check */ blc_ctx = GNUNET_new (struct BlacklistCheckSwitchContext); - blc_ctx->address = GNUNET_HELLO_address_copy (address); - blc_ctx->session = session; blc_ctx->bandwidth_in = bandwidth_in; blc_ctx->bandwidth_out = bandwidth_out; GNUNET_CONTAINER_DLL_insert (pending_bc_head, @@ -2807,7 +2880,9 @@ GST_neighbours_switch_to_address (const struct GNUNET_HELLO_Address *address, if (NULL != (blc = GST_blacklist_test_allowed (&address->peer, address->transport_name, &switch_address_bl_check_cont, - blc_ctx))) + blc_ctx, + address, + session))) { blc_ctx->blc = blc; } @@ -2829,12 +2904,12 @@ send_utilization_data (void *cls, void *value) { struct NeighbourMapEntry *n = value; - struct GNUNET_ATS_Information atsi[4]; uint32_t bps_in; uint32_t bps_out; struct GNUNET_TIME_Relative delta; - if (GNUNET_YES != test_connected (n)) + if ( (GNUNET_YES != test_connected (n)) || + (NULL == n->primary_address.address) ) return GNUNET_OK; delta = GNUNET_TIME_absolute_get_difference (n->last_util_transmission, GNUNET_TIME_absolute_get ()); @@ -2845,19 +2920,14 @@ send_utilization_data (void *cls, if ((0 != n->util_total_bytes_sent) && (0 != delta.rel_value_us)) bps_out = (1000LL * 1000LL * n->util_total_bytes_sent) / delta.rel_value_us; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s' total: received %u Bytes/s, sent %u Bytes/s\n", GNUNET_i2s (key), bps_in, bps_out); - atsi[0].type = htonl (GNUNET_ATS_UTILIZATION_OUT); - atsi[0].value = htonl (bps_out); - atsi[1].type = htonl (GNUNET_ATS_UTILIZATION_IN); - atsi[1].value = htonl (bps_in); - GST_ats_update_metrics (n->primary_address.address, - n->primary_address.session, - atsi, 2); + GST_ats_update_utilization (n->primary_address.address, + bps_in, + bps_out); n->util_total_bytes_recv = 0; n->util_total_bytes_sent = 0; n->last_util_transmission = GNUNET_TIME_absolute_get (); @@ -3183,8 +3253,7 @@ GST_neighbours_handle_session_syn_ack (const struct GNUNET_MessageHeader *messag n->primary_address.address, n->primary_address.session, n->primary_address.bandwidth_in, - n->primary_address.bandwidth_out, - GNUNET_YES); + n->primary_address.bandwidth_out); send_session_ack_message (n); break; case GNUNET_TRANSPORT_PS_SYN_RECV_ATS: @@ -3225,8 +3294,7 @@ GST_neighbours_handle_session_syn_ack (const struct GNUNET_MessageHeader *messag n->alternative_address.address, n->alternative_address.session, n->alternative_address.bandwidth_in, - n->alternative_address.bandwidth_out, - GNUNET_YES); + n->alternative_address.bandwidth_out); GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# Successful attempts to switch addresses"), 1, @@ -3382,6 +3450,9 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, /* Destroy the inbound address since it cannot be used */ free_address (&n->primary_address); n->primary_address = n->alternative_address; + GNUNET_assert (GNUNET_YES == + GST_ats_is_known (n->primary_address.address, + n->primary_address.session)); memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress)); @@ -3432,9 +3503,9 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message, return GNUNET_SYSERR; } GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# ACK messages received"), - 1, GNUNET_NO); + gettext_noop ("# ACK messages received"), + 1, + GNUNET_NO); if (NULL == (n = lookup_neighbour (&address->peer))) { GNUNET_break_op (0); @@ -3457,8 +3528,9 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message, now wait for the ACK to finally be connected - If we sent a SYN_ACK to this peer before */ - if ( (GNUNET_TRANSPORT_PS_SYN_RECV_ACK != n->state) && - (ACK_SEND_ACK != n->ack_state)) + if ( ( (GNUNET_TRANSPORT_PS_SYN_RECV_ACK != n->state) && + (ACK_SEND_ACK != n->ack_state) ) || + (NULL == n->primary_address.address) ) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received unexpected ACK message from peer `%s' in state %s/%s\n", @@ -3467,7 +3539,8 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message, print_ack_state (n->ack_state)); GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# unexpected ACK messages"), 1, + gettext_noop ("# unexpected ACK messages"), + 1, GNUNET_NO); return GNUNET_OK; } @@ -3482,13 +3555,21 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message, GNUNET_TRANSPORT_PS_CONNECTED, GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT)); - /* Set primary address to used */ - set_primary_address (n, - n->primary_address.address, - n->primary_address.session, - n->primary_address.bandwidth_in, - n->primary_address.bandwidth_out, - GNUNET_YES); + if (NULL == n->primary_address.address) { + /* See issue #3693. + * We are in state = PSY_SYN_RECV_ACK or ack_state = ACK_SEND_ACK, which + * really means we did try (and succeed) to send a SYN and are waiting for + * an ACK. + * That suggests that the primary_address used to be non-NULL, but maybe it + * got reset to NULL without the state being changed appropriately? + */ + GNUNET_break (0); + return GNUNET_OK; + } + + /* Reset backoff for primary address */ + GST_ats_block_reset (n->primary_address.address, + n->primary_address.session); return GNUNET_OK; } @@ -3507,7 +3588,9 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) /** - * Change the incoming quota for the given peer. + * Change the incoming quota for the given peer. Updates + * our own receive rate and informs the neighbour about + * the new quota. * * @param neighbour identity of peer to change qutoa for * @param quota new quota @@ -3531,7 +3614,21 @@ GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, ntohl (quota.value__), GNUNET_i2s (&n->id)); GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); if (0 != ntohl (quota.value__)) + { + struct SessionQuotaMessage sqm; + + sqm.header.size = htons (sizeof (struct SessionQuotaMessage)); + sqm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_QUOTA); + sqm.quota = quota.value__; + (void) send_with_session (n, + &sqm, + sizeof (sqm), + UINT32_MAX - 1, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, + NULL, NULL); return; + } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Disconnecting peer `%4s' due to SET_QUOTA\n", GNUNET_i2s (&n->id)); @@ -3563,6 +3660,54 @@ delayed_disconnect (void *cls, } +/** + * We received a quoat message from the given peer, + * validate and process. + * + * @param peer sender of the message + * @param msg the quota message + */ +void +GST_neighbours_handle_quota_message (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) +{ + struct NeighbourMapEntry *n; + const struct SessionQuotaMessage *sqm; + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_min; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received QUOTA message from peer `%s'\n", + GNUNET_i2s (peer)); + if (ntohs (msg->size) != sizeof (struct SessionQuotaMessage)) + { + GNUNET_break_op (0); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# quota messages ignored (malformed)"), + 1, + GNUNET_NO); + return; + } + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# QUOTA messages received"), + 1, GNUNET_NO); + sqm = (const struct SessionQuotaMessage *) msg; + if (NULL == (n = lookup_neighbour (peer))) + { + /* gone already */ + return; + } + n->neighbour_receive_quota + = GNUNET_BANDWIDTH_value_max (GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + GNUNET_BANDWIDTH_value_init (ntohl (sqm->quota))); + + bandwidth_min = GNUNET_BANDWIDTH_value_min (n->primary_address.bandwidth_out, + n->neighbour_receive_quota); + send_outbound_quota_to_clients (peer, + bandwidth_min); +} + + /** * We received a disconnect message from the given peer, * validate and process. @@ -3577,7 +3722,7 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer struct NeighbourMapEntry *n; const struct SessionDisconnectMessage *sdm; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received DISCONNECT message from peer `%s'\n", GNUNET_i2s (peer)); if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage)) @@ -3585,7 +3730,8 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer GNUNET_break_op (0); GNUNET_STATISTICS_update (GST_stats, gettext_noop - ("# disconnect messages ignored (malformed)"), 1, + ("# disconnect messages ignored (malformed)"), + 1, GNUNET_NO); return; } @@ -3637,7 +3783,11 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer GNUNET_break_op (0); return; } - n->delayed_disconnect_task = GNUNET_SCHEDULER_add_now (&delayed_disconnect, n); + if (NULL == n->delayed_disconnect_task) + { + n->delayed_disconnect_task = GNUNET_SCHEDULER_add_now (&delayed_disconnect, + n); + } } @@ -3711,7 +3861,9 @@ GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) return; /* can happen during shutdown */ ic.cb = cb; ic.cb_cls = cb_cls; - GNUNET_CONTAINER_multipeermap_iterate (neighbours, &neighbours_iterate, &ic); + GNUNET_CONTAINER_multipeermap_iterate (neighbours, + &neighbours_iterate, + &ic); } @@ -3830,8 +3982,6 @@ GST_neighbours_stop () GST_blacklist_test_cancel (cur->blc); cur->blc = NULL; } - if (NULL != cur->address) - GNUNET_HELLO_address_free (cur->address); GNUNET_free (cur); } }