From 67733608f6251367c11277b2f2fe0ee9d8281ccc Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 17 Jan 2015 23:30:08 +0000 Subject: [PATCH] use modern MQ API, use multipeermap instead of DLL --- src/ats/ats_api_scheduling.c | 666 +++++++++++++++-------------------- 1 file changed, 280 insertions(+), 386 deletions(-) diff --git a/src/ats/ats_api_scheduling.c b/src/ats/ats_api_scheduling.c index 496a18b12..08155eaef 100644 --- a/src/ats/ats_api_scheduling.c +++ b/src/ats/ats_api_scheduling.c @@ -27,38 +27,16 @@ #include "gnunet_ats_service.h" #include "ats.h" - -#define INTERFACE_PROCESSING_INTERVALL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) - -#define NOT_FOUND 0 - /** - * Message in linked list we should send to the ATS service. The - * actual binary message follows this struct. + * How frequently do we scan the interfaces for changes to the addresses? */ -struct PendingMessage -{ - - /** - * Kept in a DLL. - */ - struct PendingMessage *next; - - /** - * Kept in a DLL. - */ - struct PendingMessage *prev; +#define INTERFACE_PROCESSING_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) - /** - * Size of the message. - */ - size_t size; - /** - * Is this the 'ATS_START' message? - */ - int is_init; -}; +/** + * Session ID we use if there is no session / slot. + */ +#define NOT_FOUND 0 /** @@ -83,29 +61,49 @@ struct SessionRecord }; +/** + * We keep a list of our local networks so we can answer + * LAN vs. WAN questions. Note: WLAN is not detected yet. + * (maybe we can do that heuristically based on interface + * name in the future?) + */ struct ATS_Network { - struct ATS_Network * next; + /** + * Kept in a DLL. + */ + struct ATS_Network *next; - struct ATS_Network * prev; + /** + * Kept in a DLL. + */ + struct ATS_Network *prev; + /** + * Network address. + */ struct sockaddr *network; + /** + * Netmask to determine what is in the LAN. + */ struct sockaddr *netmask; + /** + * How long are @e network and @e netmask? + */ socklen_t length; }; /** - * Handle for address suggestions + * Handle for ATS address suggestion requests. */ struct GNUNET_ATS_SuggestHandle { - struct GNUNET_ATS_SuggestHandle *prev; - - struct GNUNET_ATS_SuggestHandle *next; - + /** + * ID of the peer for which address suggestion was requested. + */ struct GNUNET_PeerIdentity id; }; @@ -132,14 +130,11 @@ struct GNUNET_ATS_SchedulingHandle void *suggest_cb_cls; /** - * DLL for suggestions head - */ - struct GNUNET_ATS_SuggestHandle *sug_head; - - /** - * DLL for suggestions tail + * Map with the identities of all the peers for which we would + * like to have address suggestions. The key is the PID, the + * value is currently the `struct GNUNET_ATS_SuggestHandle` */ - struct GNUNET_ATS_SuggestHandle *sug_tail; + struct GNUNET_CONTAINER_MultiPeerMap *sug_requests; /** * Connection to ATS service. @@ -147,27 +142,17 @@ struct GNUNET_ATS_SchedulingHandle struct GNUNET_CLIENT_Connection *client; /** - * Head of list of messages for the ATS service. - */ - struct PendingMessage *pending_head; - - /** - * Tail of list of messages for the ATS service - */ - struct PendingMessage *pending_tail; - - /** - * Current request for transmission to ATS. + * Message queue for sending requests to the ATS service. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** - * Head of network list + * Head of LAN networks list. */ struct ATS_Network *net_head; /** - * Tail of network list + * Tail of LAN networks list. */ struct ATS_Network *net_tail; @@ -185,7 +170,7 @@ struct GNUNET_ATS_SchedulingHandle struct GNUNET_SCHEDULER_Task *task; /** - * Task retrieving interfaces from the system + * Task for periodically refreshing our LAN network list. */ struct GNUNET_SCHEDULER_Task *interface_task; @@ -194,10 +179,6 @@ struct GNUNET_ATS_SchedulingHandle */ unsigned int session_array_size; - /** - * Should we reconnect to ATS due to some serious error? - */ - int reconnect; }; @@ -235,101 +216,18 @@ reconnect_task (void *cls, static void force_reconnect (struct GNUNET_ATS_SchedulingHandle *sh) { - sh->reconnect = GNUNET_NO; - GNUNET_CLIENT_disconnect (sh->client); - sh->client = NULL; - sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &reconnect_task, - sh); -} - - -/** - * Transmit messages from the message queue to the service - * (if there are any, and if we are not already trying). - * - * @param sh handle to use - */ -static void -do_transmit (struct GNUNET_ATS_SchedulingHandle *sh); - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls the `struct GNUNET_ATS_SchedulingHandle` - * @param msg message received, NULL on timeout or fatal error - */ -static void -process_ats_message (void *cls, - const struct GNUNET_MessageHeader *msg); - - -/** - * We can now transmit a message to ATS. Do it. - * - * @param cls the `struct GNUNET_ATS_SchedulingHandle` - * @param size number of bytes we can transmit to ATS - * @param buf where to copy the messages - * @return number of bytes copied into @a buf - */ -static size_t -transmit_message_to_ats (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_ATS_SchedulingHandle *sh = cls; - struct PendingMessage *p; - size_t ret; - char *cbuf; - - sh->th = NULL; - if ((0 == size) || (NULL == buf)) + if (NULL != sh->mq) { - force_reconnect (sh); - return 0; + GNUNET_MQ_destroy (sh->mq); + sh->mq = NULL; } - ret = 0; - cbuf = buf; - while ((NULL != (p = sh->pending_head)) && (p->size <= size)) + if (NULL != sh->client) { - memcpy (&cbuf[ret], - &p[1], - p->size); - ret += p->size; - size -= p->size; - GNUNET_CONTAINER_DLL_remove (sh->pending_head, - sh->pending_tail, - p); - GNUNET_free (p); + GNUNET_CLIENT_disconnect (sh->client); + sh->client = NULL; } - do_transmit (sh); - return ret; -} - - -/** - * Transmit messages from the message queue to the service - * (if there are any, and if we are not already trying). - * - * @param sh handle to use - */ -static void -do_transmit (struct GNUNET_ATS_SchedulingHandle *sh) -{ - struct PendingMessage *p; - - if (NULL != sh->th) - return; - if (NULL == (p = sh->pending_head)) - return; - if (NULL == sh->client) - return; /* currently reconnecting */ - sh->th = - GNUNET_CLIENT_notify_transmit_ready (sh->client, p->size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_message_to_ats, + sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &reconnect_task, sh); } @@ -371,7 +269,7 @@ find_session (struct GNUNET_ATS_SchedulingHandle *sh, sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break (0); - sh->reconnect = GNUNET_YES; + force_reconnect (sh); return NULL; } /* This check exploits the fact that first field of a session object @@ -381,9 +279,11 @@ find_session (struct GNUNET_ATS_SchedulingHandle *sh, memcmp (peer, sh->session_array[session_id].session, sizeof (struct GNUNET_PeerIdentity))) { - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-scheduling-api", - "Session %p belongs to peer `%s'\n", - sh->session_array[session_id].session, GNUNET_i2s_full ((struct GNUNET_PeerIdentity *) &sh->session_array[session_id].peer)); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "ats-scheduling-api", + "Session %p belongs to peer `%s'\n", + sh->session_array[session_id].session, + GNUNET_i2s_full ((struct GNUNET_PeerIdentity *) &sh->session_array[session_id].peer)); /* GNUNET_break (0); sh->reconnect = GNUNET_YES; @@ -536,18 +436,18 @@ release_session (struct GNUNET_ATS_SchedulingHandle *sh, uint32_t session_id, const struct GNUNET_PeerIdentity *peer) { - - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-scheduling-api", - "Release sessionID %u from peer %s in %p\n", - (unsigned int) session_id, GNUNET_i2s (peer), sh); - + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "ats-scheduling-api", + "Release sessionID %u from peer %s in %p\n", + (unsigned int) session_id, + GNUNET_i2s (peer), + sh); if (session_id >= sh->session_array_size) { GNUNET_break (0); - sh->reconnect = GNUNET_YES; + force_reconnect (sh); return; } - /* this slot should have been removed from remove_session before */ GNUNET_assert (sh->session_array[session_id].session == NULL); @@ -556,33 +456,48 @@ release_session (struct GNUNET_ATS_SchedulingHandle *sh, sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break (0); - sh->reconnect = GNUNET_YES; + force_reconnect (sh); return; } sh->session_array[session_id].slot_used = GNUNET_NO; - memset (&sh->session_array[session_id].peer, 0, + memset (&sh->session_array[session_id].peer, + 0, sizeof (struct GNUNET_PeerIdentity)); } +/** + * Type of a function to call when we receive a session release + * message from the service. + * + * @param cls the `struct GNUNET_ATS_SchedulingHandle` + * @param msg message received, NULL on timeout or fatal error + */ static void -process_release_message (struct GNUNET_ATS_SchedulingHandle *sh, - const struct SessionReleaseMessage *srm) +process_ats_session_release_message (void *cls, + const struct GNUNET_MessageHeader *msg) { - release_session (sh, ntohl (srm->session_id), &srm->peer); + struct GNUNET_ATS_SchedulingHandle *sh = cls; + const struct SessionReleaseMessage *srm; + + srm = (const struct SessionReleaseMessage *) msg; + + release_session (sh, + ntohl (srm->session_id), + &srm->peer); } /** - * Type of a function to call when we receive a message - * from the service. + * Type of a function to call when we receive a address suggestion + * message from the service. * * @param cls the `struct GNUNET_ATS_SchedulingHandle` * @param msg message received, NULL on timeout or fatal error */ static void -process_ats_message (void *cls, - const struct GNUNET_MessageHeader *msg) +process_ats_address_suggestion_message (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_ATS_SchedulingHandle *sh = cls; const struct AddressSuggestionMessage *m; @@ -595,23 +510,7 @@ process_ats_message (void *cls, struct GNUNET_HELLO_Address address; struct Session *s; - if (NULL == msg) - { - force_reconnect (sh); - return; - } - if ((ntohs (msg->type) == GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE) && - (ntohs (msg->size) == sizeof (struct SessionReleaseMessage))) - { - process_release_message (sh, (const struct SessionReleaseMessage *) msg); - GNUNET_CLIENT_receive (sh->client, &process_ats_message, sh, - GNUNET_TIME_UNIT_FOREVER_REL); - if (GNUNET_YES == sh->reconnect) - force_reconnect (sh); - return; - } - if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION) || - (ntohs (msg->size) <= sizeof (struct AddressSuggestionMessage))) + if (ntohs (msg->size) <= sizeof (struct AddressSuggestionMessage)) { GNUNET_break (0); force_reconnect (sh); @@ -642,21 +541,17 @@ process_ats_message (void *cls, else { s = find_session (sh, session_id, &m->peer); - if (s == NULL) + if (NULL == s) { - - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "ats-scheduling-api", - "ATS tries to use outdated session `%s'\n", - GNUNET_i2s (&m->peer)); - GNUNET_CLIENT_receive (sh->client, &process_ats_message, sh, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "ats-scheduling-api", + "ATS tries to use outdated session `%s'\n", + GNUNET_i2s (&m->peer)); return; } } - if (NULL == sh->suggest_cb) - return; - + return; address.peer = m->peer; address.address = plugin_address; address.address_length = plugin_address_length; @@ -667,23 +562,40 @@ process_ats_message (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "ATS returned invalid address for peer `%s' transport `%s' address length %i, session_id %i\n", - GNUNET_i2s (&address.peer), address.transport_name, - plugin_address_length, session_id); + GNUNET_i2s (&address.peer), + address.transport_name, + plugin_address_length, + session_id); GNUNET_break_op (0); - GNUNET_CLIENT_receive (sh->client, &process_ats_message, sh, - GNUNET_TIME_UNIT_FOREVER_REL); return; } - sh->suggest_cb (sh->suggest_cb_cls, - (const struct GNUNET_PeerIdentity *) &m->peer, - &address, s, m->bandwidth_out, - m->bandwidth_in, atsi, ats_count); + &m->peer, + &address, + s, + m->bandwidth_out, + m->bandwidth_in, + atsi, ats_count); +} - GNUNET_CLIENT_receive (sh->client, &process_ats_message, sh, - GNUNET_TIME_UNIT_FOREVER_REL); - if (GNUNET_YES == sh->reconnect) - force_reconnect (sh); + +/** + * We encountered an error handling the MQ to the + * ATS service. Reconnect. + * + * @param cls the `struct GNUNET_ATS_SchedulingHandle` + * @param error details about the error + */ +static void +error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_ATS_SchedulingHandle *sh = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "ATS connection died (code %d), reconnecting\n", + (int) error); + force_reconnect (sh); } @@ -695,36 +607,39 @@ process_ats_message (void *cls, static void reconnect (struct GNUNET_ATS_SchedulingHandle *sh) { - struct PendingMessage *p; + static const struct GNUNET_MQ_MessageHandler handlers[] = + { { &process_ats_session_release_message, + GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE, + sizeof (struct SessionReleaseMessage) }, + { &process_ats_address_suggestion_message, + GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION, + 0 }, + { NULL, 0, 0 } }; + struct GNUNET_MQ_Envelope *ev; struct ClientStartMessage *init; GNUNET_assert (NULL == sh->client); sh->client = GNUNET_CLIENT_connect ("ats", sh->cfg); - GNUNET_assert (NULL != sh->client); - GNUNET_CLIENT_receive (sh->client, - &process_ats_message, sh, - GNUNET_TIME_UNIT_FOREVER_REL); - if ( (NULL == (p = sh->pending_head)) || - (GNUNET_YES != p->is_init) ) + if (NULL == sh->client) { - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct ClientStartMessage)); - p->size = sizeof (struct ClientStartMessage); - p->is_init = GNUNET_YES; - init = (struct ClientStartMessage *) &p[1]; - init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START); - init->header.size = htons (sizeof (struct ClientStartMessage)); - init->start_flag = htonl (START_FLAG_SCHEDULING); - GNUNET_CONTAINER_DLL_insert (sh->pending_head, - sh->pending_tail, - p); + force_reconnect (sh); + return; } - do_transmit (sh); + sh->mq = GNUNET_MQ_queue_for_connection_client (sh->client, + handlers, + &error_handler, + sh); + ev = GNUNET_MQ_msg (init, + GNUNET_MESSAGE_TYPE_ATS_START); + init->start_flag = htonl (START_FLAG_SCHEDULING); + GNUNET_MQ_send (sh->mq, ev); + // FIXME: iterate over addresses... + // FIXME: iterate over peermap for address suggestion requests! } /** - * Delete the current network list. + * Delete all entries from the current network list. * * @param sh scheduling handle to clean up */ @@ -743,6 +658,20 @@ delete_networks (struct GNUNET_ATS_SchedulingHandle *sh) } +/** + * Function invoked for each interface found. Adds the interface's + * network addresses to the respective DLL, so we can distinguish + * between LAN and WAN. + * + * @param cls closure + * @param name name of the interface (can be NULL for unknown) + * @param isDefault is this presumably the default interface + * @param addr address of this interface (can be NULL for unknown or unassigned) + * @param broadcast_addr the broadcast address (can be NULL for unknown or unassigned) + * @param netmask the network mask (can be NULL for unknown or unassigned) + * @param addrlen length of the address + * @return #GNUNET_OK to continue iteration + */ static int interface_proc (void *cls, const char *name, @@ -759,7 +688,7 @@ interface_proc (void *cls, /* Skipping IPv4 loopback addresses since we have special check */ if (addr->sa_family == AF_INET) { - struct sockaddr_in * a4 = (struct sockaddr_in *) addr; + const struct sockaddr_in *a4 = (const struct sockaddr_in *) addr; if ((a4->sin_addr.s_addr & htonl(0xff000000)) == htonl (0x7f000000)) return GNUNET_OK; @@ -767,19 +696,19 @@ interface_proc (void *cls, /* Skipping IPv6 loopback addresses since we have special check */ if (addr->sa_family == AF_INET6) { - struct sockaddr_in6 * a6 = (struct sockaddr_in6 *) addr; + const struct sockaddr_in6 *a6 = (const struct sockaddr_in6 *) addr; if (IN6_IS_ADDR_LOOPBACK (&a6->sin6_addr)) return GNUNET_OK; } if (addr->sa_family == AF_INET) { - struct sockaddr_in *addr4 = (struct sockaddr_in *) addr; - struct sockaddr_in *netmask4 = (struct sockaddr_in *) netmask; - struct sockaddr_in *tmp = NULL; + const struct sockaddr_in *addr4 = (const struct sockaddr_in *) addr; + const struct sockaddr_in *netmask4 = (const struct sockaddr_in *) netmask; + struct sockaddr_in *tmp; struct sockaddr_in network4; - net = GNUNET_malloc(sizeof (struct ATS_Network) + 2 * sizeof (struct sockaddr_in)); + net = GNUNET_malloc (sizeof (struct ATS_Network) + 2 * sizeof (struct sockaddr_in)); tmp = (struct sockaddr_in *) &net[1]; net->network = (struct sockaddr *) &tmp[0]; net->netmask = (struct sockaddr *) &tmp[1]; @@ -798,12 +727,12 @@ interface_proc (void *cls, if (addr->sa_family == AF_INET6) { - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *) addr; - struct sockaddr_in6 *netmask6 = (struct sockaddr_in6 *) netmask; - struct sockaddr_in6 * tmp = NULL; + const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *) addr; + const struct sockaddr_in6 *netmask6 = (const struct sockaddr_in6 *) netmask; + struct sockaddr_in6 * tmp; struct sockaddr_in6 network6; - net = GNUNET_malloc(sizeof (struct ATS_Network) + 2 * sizeof (struct sockaddr_in6)); + net = GNUNET_malloc (sizeof (struct ATS_Network) + 2 * sizeof (struct sockaddr_in6)); tmp = (struct sockaddr_in6 *) &net[1]; net->network = (struct sockaddr *) &tmp[0]; net->netmask = (struct sockaddr *) &tmp[1]; @@ -814,7 +743,7 @@ interface_proc (void *cls, #if HAVE_SOCKADDR_IN_SIN_LEN network6.sin6_len = sizeof (network6); #endif - int c = 0; + unsigned int c = 0; uint32_t *addr_elem = (uint32_t *) &addr6->sin6_addr; uint32_t *mask_elem = (uint32_t *) &netmask6->sin6_addr; uint32_t *net_elem = (uint32_t *) &network6.sin6_addr; @@ -824,19 +753,23 @@ interface_proc (void *cls, memcpy (net->netmask, netmask6, sizeof (struct sockaddr_in6)); memcpy (net->network, &network6, sizeof (struct sockaddr_in6)); } + if (NULL == net) + return GNUNET_OK; /* odd / unsupported address family */ /* Store in list */ - if (net != NULL) - { #if VERBOSE_ATS - char * netmask = GNUNET_strdup (GNUNET_a2s((struct sockaddr *) net->netmask, addrlen)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding network `%s', netmask `%s'\n", - GNUNET_a2s((struct sockaddr *) net->network, addrlen), - netmask); - GNUNET_free (netmask); -# endif - GNUNET_CONTAINER_DLL_insert(sh->net_head, sh->net_tail, net); - } + char * netmask = GNUNET_strdup (GNUNET_a2s((struct sockaddr *) net->netmask, addrlen)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding network `%s', netmask `%s'\n", + GNUNET_a2s ((struct sockaddr *) net->network, + addrlen), + netmask); + GNUNET_free (netmask); +#endif + GNUNET_CONTAINER_DLL_insert (sh->net_head, + sh->net_tail, + net); + return GNUNET_OK; } @@ -857,7 +790,7 @@ get_addresses (void *cls, delete_networks (sh); GNUNET_OS_network_interfaces_list (&interface_proc, sh); - sh->interface_task = GNUNET_SCHEDULER_add_delayed (INTERFACE_PROCESSING_INTERVALL, + sh->interface_task = GNUNET_SCHEDULER_add_delayed (INTERFACE_PROCESSING_INTERVAL, &get_addresses, sh); } @@ -974,7 +907,7 @@ GNUNET_ATS_address_get_type (struct GNUNET_ATS_SchedulingHandle *sh, } /* Check local networks */ - while ((cur != NULL) && (type == GNUNET_ATS_NET_UNSPECIFIED)) + while ((NULL != cur) && (GNUNET_ATS_NET_UNSPECIFIED == type)) { if (addrlen != cur->length) { @@ -983,18 +916,18 @@ GNUNET_ATS_address_get_type (struct GNUNET_ATS_SchedulingHandle *sh, } if (addr->sa_family == AF_INET) { - struct sockaddr_in * a4 = (struct sockaddr_in *) addr; - struct sockaddr_in * net4 = (struct sockaddr_in *) cur->network; - struct sockaddr_in * mask4 = (struct sockaddr_in *) cur->netmask; + const struct sockaddr_in *a4 = (const struct sockaddr_in *) addr; + const struct sockaddr_in *net4 = (const struct sockaddr_in *) cur->network; + const struct sockaddr_in *mask4 = (const struct sockaddr_in *) cur->netmask; if (((a4->sin_addr.s_addr & mask4->sin_addr.s_addr)) == net4->sin_addr.s_addr) type = GNUNET_ATS_NET_LAN; } if (addr->sa_family == AF_INET6) { - struct sockaddr_in6 * a6 = (struct sockaddr_in6 *) addr; - struct sockaddr_in6 * net6 = (struct sockaddr_in6 *) cur->network; - struct sockaddr_in6 * mask6 = (struct sockaddr_in6 *) cur->netmask; + const struct sockaddr_in6 *a6 = (const struct sockaddr_in6 *) addr; + const struct sockaddr_in6 *net6 = (const struct sockaddr_in6 *) cur->network; + const struct sockaddr_in6 *mask6 = (const struct sockaddr_in6 *) cur->netmask; int res = GNUNET_YES; int c = 0; @@ -1046,9 +979,11 @@ GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_array_grow (sh->session_array, sh->session_array_size, 4); + sh->sug_requests = GNUNET_CONTAINER_multipeermap_create (32, + GNUNET_YES); GNUNET_OS_network_interfaces_list (&interface_proc, sh); - sh->interface_task = GNUNET_SCHEDULER_add_delayed (INTERFACE_PROCESSING_INTERVALL, + sh->interface_task = GNUNET_SCHEDULER_add_delayed (INTERFACE_PROCESSING_INTERVAL, &get_addresses, sh); reconnect (sh); @@ -1056,6 +991,28 @@ GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg, } +/** + * Function called to free all `struct GNUNET_ATS_SuggestHandles` + * in the map. + * + * @param cls NULL + * @param key the key + * @param value the value to free + * @return #GNUNET_OK (continue to iterate) + */ +static int +free_sug_handle (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + struct GNUNET_ATS_SuggestHandle *cur = value; + + GNUNET_free (cur); + return GNUNET_OK; +} + + + /** * Client is done with ATS scheduling, release resources. * @@ -1064,15 +1021,10 @@ GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh) { - struct PendingMessage *p; - struct GNUNET_ATS_SuggestHandle *cur; - - while (NULL != (p = sh->pending_head)) + if (NULL != sh->mq) { - GNUNET_CONTAINER_DLL_remove (sh->pending_head, - sh->pending_tail, - p); - GNUNET_free (p); + GNUNET_MQ_destroy (sh->mq); + sh->mq = NULL; } if (NULL != sh->client) { @@ -1084,19 +1036,16 @@ GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh) GNUNET_SCHEDULER_cancel (sh->task); sh->task = NULL; } - while (NULL != (cur = sh->sug_head)) - { - GNUNET_CONTAINER_DLL_remove (sh->sug_head, - sh->sug_tail, - cur); - GNUNET_free (cur); - } - delete_networks (sh); + GNUNET_CONTAINER_multipeermap_iterate (sh->sug_requests, + &free_sug_handle, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (sh->sug_requests); if (NULL != sh->interface_task) { GNUNET_SCHEDULER_cancel (sh->interface_task); sh->interface_task = NULL; } + delete_networks (sh); GNUNET_array_grow (sh->session_array, sh->session_array_size, 0); @@ -1115,22 +1064,13 @@ void GNUNET_ATS_reset_backoff (struct GNUNET_ATS_SchedulingHandle *sh, const struct GNUNET_PeerIdentity *peer) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct ResetBackoffMessage *m; - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct ResetBackoffMessage)); - p->size = sizeof (struct ResetBackoffMessage); - p->is_init = GNUNET_NO; - m = (struct ResetBackoffMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_RESET_BACKOFF); - m->header.size = htons (sizeof (struct ResetBackoffMessage)); + ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_RESET_BACKOFF); m->reserved = htonl (0); m->peer = *peer; - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, - sh->pending_tail, - p); - do_transmit (sh); + GNUNET_MQ_send (sh->mq, ev); } @@ -1148,29 +1088,23 @@ struct GNUNET_ATS_SuggestHandle * GNUNET_ATS_suggest_address (struct GNUNET_ATS_SchedulingHandle *sh, const struct GNUNET_PeerIdentity *peer) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct RequestAddressMessage *m; struct GNUNET_ATS_SuggestHandle *s; - // FIXME: ATS needs to remember this in case of - // a disconnect! - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct RequestAddressMessage)); - p->size = sizeof (struct RequestAddressMessage); - m = (struct RequestAddressMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS); - m->header.size = htons (sizeof (struct RequestAddressMessage)); - m->reserved = htonl (0); - m->peer = *peer; - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, - sh->pending_tail, - p); - do_transmit (sh); s = GNUNET_new (struct GNUNET_ATS_SuggestHandle); s->id = *peer; - GNUNET_CONTAINER_DLL_insert_tail (sh->sug_head, - sh->sug_tail, - s); + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (sh->sug_requests, + &s->id, + s, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (NULL == sh->mq) + return s; + ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS); + m->reserved = htonl (0); + m->peer = *peer; + GNUNET_MQ_send (sh->mq, ev); return s; } @@ -1185,36 +1119,28 @@ void GNUNET_ATS_suggest_address_cancel (struct GNUNET_ATS_SchedulingHandle *sh, const struct GNUNET_PeerIdentity *peer) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct RequestAddressMessage *m; struct GNUNET_ATS_SuggestHandle *s; - for (s = sh->sug_head; NULL != s; s = s->next) - if (0 == memcmp (peer, &s->id, sizeof (s->id))) - break; + s = GNUNET_CONTAINER_multipeermap_get (sh->sug_requests, + peer); if (NULL == s) { GNUNET_break (0); return; } - GNUNET_CONTAINER_DLL_remove (sh->sug_head, - sh->sug_tail, - s); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (sh->sug_requests, + &s->id, + s)); GNUNET_free (s); - - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct RequestAddressMessage)); - p->size = sizeof (struct RequestAddressMessage); - p->is_init = GNUNET_NO; - m = (struct RequestAddressMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS_CANCEL); - m->header.size = htons (sizeof (struct RequestAddressMessage)); + if (NULL == sh->mq) + return; + ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS_CANCEL); m->reserved = htonl (0); m->peer = *peer; - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, - sh->pending_tail, - p); - do_transmit (sh); + GNUNET_MQ_send (sh->mq, ev); } @@ -1242,8 +1168,9 @@ GNUNET_ATS_session_known (struct GNUNET_ATS_SchedulingHandle *sh, /** - * We have a new address ATS should know. Addresses have to be added with this - * function before they can be: updated, set in use and destroyed + * We have a new address ATS should know. Addresses have to be added + * with this function before they can be: updated, set in use and + * destroyed. * * @param sh handle * @param address the address @@ -1259,8 +1186,7 @@ GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh, const struct GNUNET_ATS_Information *ats, uint32_t ats_count) { - - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct AddressUpdateMessage *m; struct GNUNET_ATS_Information *am; char *pm; @@ -1278,9 +1204,9 @@ GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh, ? 0 : strlen (address->transport_name) + 1; - msize = sizeof (struct AddressUpdateMessage) + address->address_length + + msize = address->address_length + ats_count * sizeof (struct GNUNET_ATS_Information) + namelen; - if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + if ((msize + sizeof (struct AddressUpdateMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (address->address_length >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (namelen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (ats_count >= @@ -1303,12 +1229,7 @@ GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh, GNUNET_break (NOT_FOUND != s); } - p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - p->size = msize; - p->is_init = GNUNET_NO; - m = (struct AddressUpdateMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD); - m->header.size = htons (msize); + ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD); m->ats_count = htonl (ats_count); m->peer = address->peer; m->address_length = htons (address->address_length); @@ -1322,19 +1243,17 @@ GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh, address->transport_name, session, s); - am = (struct GNUNET_ATS_Information *) &m[1]; - memcpy (am, ats, ats_count * sizeof (struct GNUNET_ATS_Information)); + memcpy (am, + ats, + ats_count * sizeof (struct GNUNET_ATS_Information)); pm = (char *) &am[ats_count]; memcpy (pm, address->address, address->address_length); if (NULL != address->transport_name) memcpy (&pm[address->address_length], address->transport_name, namelen); - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, - sh->pending_tail, - p); - do_transmit (sh); + GNUNET_MQ_send (sh->mq, ev); return GNUNET_OK; } @@ -1362,7 +1281,7 @@ GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh, const struct GNUNET_ATS_Information *ats, uint32_t ats_count) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct AddressUpdateMessage *m; struct GNUNET_ATS_Information *am; char *pm; @@ -1383,10 +1302,9 @@ GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh, namelen = (address->transport_name == NULL) ? 0 : strlen (address->transport_name) + 1; - msize = - sizeof (struct AddressUpdateMessage) + address->address_length + + msize = address->address_length + ats_count * sizeof (struct GNUNET_ATS_Information) + namelen; - if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + if ((msize + sizeof (struct AddressUpdateMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (address->address_length >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (namelen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (ats_count >= @@ -1403,18 +1321,12 @@ GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh, return GNUNET_NO; } - p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - p->size = msize; - p->is_init = GNUNET_NO; - m = (struct AddressUpdateMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE); - m->header.size = htons (msize); + ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE); m->ats_count = htonl (ats_count); m->peer = address->peer; m->address_length = htons (address->address_length); m->address_local_info = htonl ((uint32_t) address->local_info); m->plugin_name_length = htons (namelen); - m->session_id = htonl (s); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1431,8 +1343,7 @@ GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh, pm = (char *) &am[ats_count]; memcpy (pm, address->address, address->address_length); memcpy (&pm[address->address_length], address->transport_name, namelen); - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, sh->pending_tail, p); - do_transmit (sh); + GNUNET_MQ_send (sh->mq, ev); return GNUNET_YES; } @@ -1452,19 +1363,18 @@ GNUNET_ATS_address_in_use (struct GNUNET_ATS_SchedulingHandle *sh, struct Session *session, int in_use) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct AddressUseMessage *m; char *pm; size_t namelen; size_t msize; uint32_t s = 0; - GNUNET_assert (NULL != address); namelen = (address->transport_name == NULL) ? 0 : strlen (address->transport_name) + 1; - msize = sizeof (struct AddressUseMessage) + address->address_length + namelen; - if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + msize = address->address_length + namelen; + if ((msize + sizeof (struct AddressUseMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (address->address_length >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (namelen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) { @@ -1472,7 +1382,7 @@ GNUNET_ATS_address_in_use (struct GNUNET_ATS_SchedulingHandle *sh, return; } - if (session != NULL) + if (NULL != session) { s = find_session_id (sh, session, &address->peer); if ((s == NOT_FOUND) && (GNUNET_NO == in_use)) @@ -1492,12 +1402,7 @@ GNUNET_ATS_address_in_use (struct GNUNET_ATS_SchedulingHandle *sh, } } - p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - p->size = msize; - p->is_init = GNUNET_NO; - m = (struct AddressUseMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_IN_USE); - m->header.size = htons (msize); + ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_IN_USE); m->peer = address->peer; m->in_use = htons (in_use); m->address_length = htons (address->address_length); @@ -1513,8 +1418,7 @@ GNUNET_ATS_address_in_use (struct GNUNET_ATS_SchedulingHandle *sh, pm = (char *) &m[1]; memcpy (pm, address->address, address->address_length); memcpy (&pm[address->address_length], address->transport_name, namelen); - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, sh->pending_tail, p); - do_transmit (sh); + GNUNET_MQ_send (sh->mq, ev); } @@ -1533,7 +1437,7 @@ GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh, const struct GNUNET_HELLO_Address *address, struct Session *session) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *ev; struct AddressDestroyedMessage *m; char *pm; size_t namelen; @@ -1545,14 +1449,11 @@ GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh, GNUNET_break (0); return; } - - GNUNET_assert (address->transport_name != NULL); + GNUNET_assert (NULL != address->transport_name); namelen = strlen (address->transport_name) + 1; GNUNET_assert (namelen > 1); - msize = - sizeof (struct AddressDestroyedMessage) + address->address_length + - namelen; - if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + msize = address->address_length + namelen; + if ((msize + sizeof (struct AddressDestroyedMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (address->address_length >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (namelen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) { @@ -1570,13 +1471,7 @@ GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh, return; } - p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - p->size = msize; - p->is_init = GNUNET_NO; - m = (struct AddressDestroyedMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED); - m->header.size = htons (msize); - m->reserved = htonl (0); + ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED); m->peer = address->peer; m->address_length = htons (address->address_length); m->address_local_info = htonl ((uint32_t) address->local_info); @@ -1590,8 +1485,7 @@ GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh, pm = (char *) &m[1]; memcpy (pm, address->address, address->address_length); memcpy (&pm[address->address_length], address->transport_name, namelen); - GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head, sh->pending_tail, p); - do_transmit (sh); + GNUNET_MQ_send (sh->mq, ev); remove_session (sh, s, &address->peer); } -- 2.25.1