X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fats%2Fgnunet-service-ats_scheduling.c;h=4a20f909dbc496a8e915d5224ee98551fcd0f86b;hb=bace3fe85e0dccc59a1d2352e028021e528608b5;hp=8125b8664c2e538b451a89545a58f09d6a176de7;hpb=5419748cc47ea35b77a616ff269aae822cab8cea;p=oweals%2Fgnunet.git diff --git a/src/ats/gnunet-service-ats_scheduling.c b/src/ats/gnunet-service-ats_scheduling.c index 8125b8664..4a20f909d 100644 --- a/src/ats/gnunet-service-ats_scheduling.c +++ b/src/ats/gnunet-service-ats_scheduling.c @@ -25,98 +25,49 @@ * @author Christian Grothoff */ #include "platform.h" +#include "gnunet-service-ats.h" #include "gnunet-service-ats_addresses.h" #include "gnunet-service-ats_scheduling.h" #include "ats.h" -/** - * We keep clients that are interested in scheduling in a linked list. - * This list typically has only one entry (for the - * gnunet-service-transport process); however, it is possible that - * there is more than one (at least briefly) because after a crash a - * new one may connect before we've been notified to clean up the old - * process. - */ -struct SchedulingClient -{ - /** - * Next in doubly-linked list. - */ - struct SchedulingClient * next; - - /** - * Previous in doubly-linked list. - */ - struct SchedulingClient * prev; - - /** - * Actual handle to the client. - */ - struct GNUNET_SERVER_Client *client; - -}; - - -/** - * Head of linked list of all clients to this service. - */ -static struct SchedulingClient *sc_head; - -/** - * Tail of linked list of all clients to this service. - */ -static struct SchedulingClient *sc_tail; - /** * Context for sending messages to clients. */ static struct GNUNET_SERVER_NotificationContext *nc; -static unsigned long long total_quota_in; - -static unsigned long long total_quota_out; - - /** - * Find the scheduling client associated with the given - * handle. - * - * @param client server handle - * @return internal handle + * Actual handle to the client. */ -static struct SchedulingClient * -find_client (struct GNUNET_SERVER_Client *client) -{ - struct SchedulingClient * sc; +static struct GNUNET_SERVER_Client *my_client; - for (sc = sc_head; sc != NULL; sc = sc->next) - if (sc->client == client) - return sc; - return NULL; -} +/** + * Handle to address subsystem + */ +static struct GAS_Addresses_Handle *address_handle; /** * Register a new scheduling client. * * @param client handle of the new client + * @return GNUNET_OK on success, GNUNET_SYSERR on error */ -void +int GAS_scheduling_add_client (struct GNUNET_SERVER_Client *client) { - struct SchedulingClient *sc; - - GNUNET_break (NULL == find_client (client)); - sc = GNUNET_malloc (sizeof (struct SchedulingClient)); - sc->client = client; + if (my_client != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "This ATS already has a scheduling client, refusing new scheduling client for now.\n"); + return GNUNET_SYSERR; + } + my_client = client; GNUNET_SERVER_notification_context_add (nc, client); - GNUNET_SERVER_client_keep (client); - GNUNET_CONTAINER_DLL_insert(sc_head, sc_tail, sc); + return GNUNET_OK; } - /** * Unregister a client (which may have been a scheduling client, * but this is not assured). @@ -126,15 +77,10 @@ GAS_scheduling_add_client (struct GNUNET_SERVER_Client *client) void GAS_scheduling_remove_client (struct GNUNET_SERVER_Client *client) { - struct SchedulingClient * sc; - - sc = find_client (client); - if (NULL == sc) + if (my_client != client) return; - GNUNET_CONTAINER_DLL_remove (sc_head, sc_tail, sc); - GAS_address_client_disconnected (client); - GNUNET_SERVER_client_drop (client); - GNUNET_free (sc); + GAS_addresses_destroy_all (address_handle); + my_client = NULL; } @@ -146,7 +92,6 @@ GAS_scheduling_remove_client (struct GNUNET_SERVER_Client *client) * @param plugin_name 0-termintated string specifying the transport plugin * @param plugin_addr binary address for the plugin to use * @param plugin_addr_len number of bytes in plugin_addr - * @param session_client which client gave us this session_id? * @param session_id session ID to use for the given client (other clients will see 0) * @param atsi performance data for the address * @param atsi_count number of performance records in 'ats' @@ -154,53 +99,59 @@ GAS_scheduling_remove_client (struct GNUNET_SERVER_Client *client) * @param bandwidth_in assigned inbound bandwidth */ void -GAS_scheduling_transmit_address_suggestion (const struct GNUNET_PeerIdentity *peer, - const char *plugin_name, - const void *plugin_addr, size_t plugin_addr_len, - struct GNUNET_SERVER_Client *session_client, - uint32_t session_id, - const struct GNUNET_TRANSPORT_ATS_Information *atsi, - uint32_t atsi_count, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in) +GAS_scheduling_transmit_address_suggestion (const struct GNUNET_PeerIdentity + *peer, const char *plugin_name, + const void *plugin_addr, + size_t plugin_addr_len, + uint32_t session_id, + const struct GNUNET_ATS_Information + *atsi, uint32_t atsi_count, + struct GNUNET_BANDWIDTH_Value32NBO + bandwidth_out, + struct GNUNET_BANDWIDTH_Value32NBO + bandwidth_in) { - struct SchedulingClient *sc; struct AddressSuggestionMessage *msg; size_t plugin_name_length = strlen (plugin_name) + 1; - size_t msize = sizeof (struct AddressSuggestionMessage) + atsi_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) - + plugin_addr_len + plugin_name_length; - char buf[msize]; - struct GNUNET_TRANSPORT_ATS_Information *atsp; + size_t msize = + sizeof (struct AddressSuggestionMessage) + + atsi_count * sizeof (struct GNUNET_ATS_Information) + plugin_addr_len + + plugin_name_length; + char buf[msize] GNUNET_ALIGN; + struct GNUNET_ATS_Information *atsp; char *addrp; + if (my_client == NULL) + return; + GNUNET_STATISTICS_update (GSA_stats, "# address suggestions made", 1, + GNUNET_NO); GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - GNUNET_assert (atsi_count < GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - msg = (struct AddressSuggestionMessage*) buf; + GNUNET_assert (atsi_count < + GNUNET_SERVER_MAX_MESSAGE_SIZE / + sizeof (struct GNUNET_ATS_Information)); + msg = (struct AddressSuggestionMessage *) buf; msg->header.size = htons (msize); msg->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION); msg->ats_count = htonl (atsi_count); msg->peer = *peer; msg->address_length = htons (plugin_addr_len); msg->plugin_name_length = htons (plugin_name_length); - /* session ID is set only if 'client' is the same... */ + msg->session_id = htonl (session_id); msg->bandwidth_out = bandwidth_out; msg->bandwidth_in = bandwidth_in; - atsp = (struct GNUNET_TRANSPORT_ATS_Information* ) &msg[1]; - memcpy (atsp, atsi, sizeof (struct GNUNET_TRANSPORT_ATS_Information) * atsi_count); - addrp = (char*) &atsp[atsi_count]; + atsp = (struct GNUNET_ATS_Information *) &msg[1]; + memcpy (atsp, atsi, sizeof (struct GNUNET_ATS_Information) * atsi_count); + addrp = (char *) &atsp[atsi_count]; memcpy (addrp, plugin_addr, plugin_addr_len); strcpy (&addrp[plugin_addr_len], plugin_name); - for (sc = sc_head; sc != NULL; sc = sc->next) - { - if (sc->client == session_client) - msg->session_id = htonl (session_id); - else - msg->session_id = htonl (0); - GNUNET_SERVER_notification_context_unicast (nc, - sc->client, - &msg->header, - GNUNET_YES); - } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ATS sends quota for peer `%s': (in/out) %u/%u\n", + GNUNET_i2s (peer), ntohl (bandwidth_in.value__), + ntohl (bandwidth_out.value__)); + + GNUNET_SERVER_notification_context_unicast (nc, my_client, &msg->header, + GNUNET_YES); } @@ -213,14 +164,120 @@ GAS_scheduling_transmit_address_suggestion (const struct GNUNET_PeerIdentity *pe */ void GAS_handle_request_address (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *message) +{ + const struct RequestAddressMessage *msg = + (const struct RequestAddressMessage *) message; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message\n", + "REQUEST_ADDRESS"); + GNUNET_break (0 == ntohl (msg->reserved)); + GAS_addresses_request_address (address_handle, &msg->peer); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + +/** + * Handle 'request address' messages from clients. + * + * @param cls unused, NULL + * @param client client that sent the request + * @param message the request message + */ +void +GAS_handle_request_address_cancel (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - const struct RequestAddressMessage * msg = (const struct RequestAddressMessage *) message; + const struct RequestAddressMessage *msg = + (const struct RequestAddressMessage *) message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message\n", "REQUEST_ADDRESS"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message\n", + "REQUEST_ADDRESS_CANCEL"); GNUNET_break (0 == ntohl (msg->reserved)); - GAS_addresses_request_address (&msg->peer); + + GAS_addresses_request_address_cancel (address_handle, &msg->peer); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + +/** + * Handle 'reset backoff' messages from clients. + * + * @param cls unused, NULL + * @param client client that sent the request + * @param message the request message + */ +void +GAS_handle_reset_backoff (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct ResetBackoffMessage *msg = + (const struct ResetBackoffMessage *) message; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message\n", + "RESET_BACKOFF"); + GNUNET_break (0 == ntohl (msg->reserved)); + GAS_addresses_handle_backoff_reset (address_handle, &msg->peer); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + +/** + * Handle 'address add' messages from clients. + * + * @param cls unused, NULL + * @param client client that sent the request + * @param message the request message + */ +void +GAS_handle_address_add (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct AddressUpdateMessage *m; + const struct GNUNET_ATS_Information *atsi; + const char *address; + const char *plugin_name; + uint16_t address_length; + uint16_t plugin_name_length; + uint32_t ats_count; + uint16_t size; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message\n", + "ADDRESS_ADD"); + size = ntohs (message->size); + if (size < sizeof (struct AddressUpdateMessage)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + m = (const struct AddressUpdateMessage *) message; + ats_count = ntohl (m->ats_count); + address_length = ntohs (m->address_length); + plugin_name_length = ntohs (m->plugin_name_length); + atsi = (const struct GNUNET_ATS_Information *) &m[1]; + address = (const char *) &atsi[ats_count]; + if (plugin_name_length != 0) + plugin_name = &address[address_length]; + else + plugin_name = ""; + + if ((address_length + plugin_name_length + + ats_count * sizeof (struct GNUNET_ATS_Information) + + sizeof (struct AddressUpdateMessage) != ntohs (message->size)) || + (ats_count > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) || + ((plugin_name_length > 0) && (plugin_name[plugin_name_length - 1] != '\0'))) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_STATISTICS_update (GSA_stats, "# address updates received", 1, + GNUNET_NO); + GAS_addresses_add (address_handle, &m->peer, plugin_name, address, address_length, + ntohl (m->session_id), atsi, ats_count); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -234,11 +291,10 @@ GAS_handle_request_address (void *cls, struct GNUNET_SERVER_Client *client, */ void GAS_handle_address_update (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) - + const struct GNUNET_MessageHeader *message) { - const struct AddressUpdateMessage * m; - const struct GNUNET_TRANSPORT_ATS_Information *atsi; + const struct AddressUpdateMessage *m; + const struct GNUNET_ATS_Information *atsi; const char *address; const char *plugin_name; uint16_t address_length; @@ -246,111 +302,178 @@ GAS_handle_address_update (void *cls, struct GNUNET_SERVER_Client *client, uint32_t ats_count; uint16_t size; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' message\n", - "ADDRESS_UPDATE"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message\n", + "ADDRESS_UPDATE"); size = ntohs (message->size); - if (size <= sizeof (struct AddressUpdateMessage)) + if (size < sizeof (struct AddressUpdateMessage)) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - m = (const struct AddressUpdateMessage*) message; + m = (const struct AddressUpdateMessage *) message; ats_count = ntohl (m->ats_count); address_length = ntohs (m->address_length); - plugin_name_length = ntohs (m->plugin_name_length); - atsi = (const struct GNUNET_TRANSPORT_ATS_Information*) &m[1]; - address = (const char*) &atsi[ats_count]; + plugin_name_length = ntohs (m->plugin_name_length); + atsi = (const struct GNUNET_ATS_Information *) &m[1]; + address = (const char *) &atsi[ats_count]; if (plugin_name_length != 0) plugin_name = &address[address_length]; else plugin_name = ""; - if ( (address_length + - plugin_name_length + - ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) + - sizeof (struct AddressUpdateMessage) != ntohs (message->size)) || - (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_TRANSPORT_ATS_Information)) || - (plugin_name[plugin_name_length - 1] != '\0') ) + + if ((address_length + plugin_name_length + + ats_count * sizeof (struct GNUNET_ATS_Information) + + sizeof (struct AddressUpdateMessage) != ntohs (message->size)) || + (ats_count > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) || + ((plugin_name_length > 0) && (plugin_name[plugin_name_length - 1] != '\0'))) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - GAS_address_update (&m->peer, - plugin_name, - address, - address_length, - client, - ntohl (m->session_id), - atsi, - ats_count); + GNUNET_STATISTICS_update (GSA_stats, "# address updates received", 1, + GNUNET_NO); + GAS_addresses_update (address_handle, &m->peer, plugin_name, address, address_length, + ntohl (m->session_id), atsi, ats_count); GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** - * Handle 'address destroyed' messages from clients. + * Handle 'address in use' messages from clients. * * @param cls unused, NULL * @param client client that sent the request * @param message the request message */ void -GAS_handle_address_destroyed (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) - +GAS_handle_address_in_use (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - const struct AddressDestroyedMessage * m; + const struct AddressUseMessage *m; const char *address; const char *plugin_name; + int res; uint16_t address_length; uint16_t plugin_name_length; + uint16_t size; + uint16_t in_use; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' message of size %u %u\n", - "ADDRESS_DESTROYED", ntohs (message->size), sizeof (struct AddressDestroyedMessage)); size = ntohs (message->size); - if (size < sizeof (struct AddressDestroyedMessage)) + if (size < sizeof (struct AddressUseMessage)) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - m = (const struct AddressDestroyedMessage*) message; - GNUNET_break (0 == ntohl (m->reserved)); + m = (const struct AddressUseMessage *) message; + address_length = ntohs (m->address_length); - plugin_name_length = ntohs (m->plugin_name_length); - address = (const char*) &m[1]; + plugin_name_length = ntohs (m->plugin_name_length); + + address = (const char *) &m[1]; if (plugin_name_length != 0) plugin_name = &address[address_length]; else plugin_name = ""; - if ( (address_length + - plugin_name_length + - sizeof (struct AddressDestroyedMessage) != ntohs (message->size))) + if ((address_length + plugin_name_length + + sizeof (struct AddressUseMessage) != ntohs (message->size)) || + ((plugin_name_length > 0) && + (plugin_name[plugin_name_length - 1] != '\0'))) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + in_use = ntohs (m->in_use); + res = GAS_addresses_in_use (address_handle, + &m->peer, + plugin_name, + address, + address_length, + ntohl (m->session_id), + in_use); + + if (res == GNUNET_OK) + GNUNET_SERVER_receive_done (client, GNUNET_OK); + else + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + } + +} + +/** + * Handle 'address destroyed' messages from clients. + * + * @param cls unused, NULL + * @param client client that sent the request + * @param message the request message + */ +void +GAS_handle_address_destroyed (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct AddressDestroyedMessage *m; + struct SessionReleaseMessage srm; + const char *address; + const char *plugin_name; + uint16_t address_length; + uint16_t plugin_name_length; + uint16_t size; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' message of size %u %u\n", + "ADDRESS_DESTROYED", ntohs (message->size), + sizeof (struct AddressDestroyedMessage)); + size = ntohs (message->size); + if ((size < sizeof (struct AddressDestroyedMessage)) || (client != my_client)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + m = (const struct AddressDestroyedMessage *) message; + GNUNET_break (0 == ntohl (m->reserved)); + address_length = ntohs (m->address_length); + plugin_name_length = ntohs (m->plugin_name_length); + address = (const char *) &m[1]; if (plugin_name_length != 0) - if (plugin_name[plugin_name_length - 1] != '\0') - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - - GAS_address_destroyed (&m->peer, - plugin_name, - address, - address_length, - client, - ntohl (m->session_id)); + plugin_name = &address[address_length]; + else + plugin_name = ""; + if ((address_length + plugin_name_length + + sizeof (struct AddressDestroyedMessage) != ntohs (message->size))) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + if ((plugin_name_length == 0) || + (plugin_name[plugin_name_length - 1] != '\0')) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_STATISTICS_update (GSA_stats, "# addresses destroyed", 1, GNUNET_NO); + GAS_addresses_destroy (address_handle, &m->peer, plugin_name, + address, address_length, + ntohl (m->session_id)); + if (0 != ntohl (m->session_id)) + { + srm.header.type = ntohs (GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE); + srm.header.size = ntohs (sizeof (struct SessionReleaseMessage)); + srm.session_id = m->session_id; + srm.peer = m->peer; + GNUNET_SERVER_notification_context_unicast (nc, client, &srm.header, + GNUNET_NO); + } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -359,22 +482,12 @@ GAS_handle_address_destroyed (void *cls, struct GNUNET_SERVER_Client *client, * Initialize scheduling subsystem. * * @param server handle to our server - * @param cfg configuration to use + * @param ah the address handle to use */ void -GAS_scheduling_init (struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *cfg) +GAS_scheduling_init (struct GNUNET_SERVER_Handle *server, struct GAS_Addresses_Handle *ah) { - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_number (cfg, - "core", - "TOTAL_QUOTA_IN", - &total_quota_in)); - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_number (cfg, - "core", - "TOTAL_QUOTA_OUT", - &total_quota_out)); + address_handle = ah; nc = GNUNET_SERVER_notification_context_create (server, 128); } @@ -385,8 +498,13 @@ GAS_scheduling_init (struct GNUNET_SERVER_Handle *server, void GAS_scheduling_done () { + if (NULL != my_client) + { + my_client = NULL; + } GNUNET_SERVER_notification_context_destroy (nc); nc = NULL; + }