From 9a3d2cb8ea446ae1291286b94a7784676ea134cb Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 23 Jun 2016 17:31:13 +0000 Subject: [PATCH] convert perf API to new MQ API --- src/ats/ats_api_performance.c | 655 ++++++++++++++-------------------- 1 file changed, 268 insertions(+), 387 deletions(-) diff --git a/src/ats/ats_api_performance.c b/src/ats/ats_api_performance.c index 48bb2ebf4..60827e174 100644 --- a/src/ats/ats_api_performance.c +++ b/src/ats/ats_api_performance.c @@ -1,21 +1,21 @@ /* - This file is part of GNUnet. - Copyright (C) 2010,2011 GNUnet e.V. - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. + This file is part of GNUnet. + Copyright (C) 2010, 2011, 2016 GNUnet e.V. + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file ats/ats_api_performance.c @@ -31,35 +31,6 @@ #define LOG(kind,...) GNUNET_log_from(kind, "ats-performance-api", __VA_ARGS__) -/** - * Message in linked list we should send to the ATS service. The - * actual binary message follows this struct. - */ -struct PendingMessage -{ - - /** - * Kept in a DLL. - */ - struct PendingMessage *next; - - /** - * Kept in a DLL. - */ - struct PendingMessage *prev; - - /** - * Size of the message. - */ - size_t size; - - /** - * Is this the 'ATS_START' message? - */ - int is_init; -}; - - /** * Linked list of pending reservations. */ @@ -185,17 +156,7 @@ struct GNUNET_ATS_PerformanceHandle /** * Connection to ATS service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Head of list of messages for the ATS service. - */ - struct PendingMessage *pending_head; - - /** - * Tail of list of messages for the ATS service - */ - struct PendingMessage *pending_tail; + struct GNUNET_MQ_Handle *mq; /** * Head of linked list of pending reservation requests. @@ -273,182 +234,162 @@ reconnect_task (void *cls) /** - * Transmit messages from the message queue to the service - * (if there are any, and if we are not already trying). - * - * @param ph handle to use - */ -static void -do_transmit (struct GNUNET_ATS_PerformanceHandle *ph); - - -/** - * Type of a function to call when we receive a message - * from the service. + * Reconnect to the ATS service, something went wrong. * - * @param cls the `struct GNUNET_ATS_SchedulingHandle` - * @param msg message received, NULL on timeout or fatal error + * @param ph handle to reconnect */ static void -process_ats_message (void *cls, - const struct GNUNET_MessageHeader *msg); - - -/** - * We can now transmit a message to ATS. Do it. - * - * @param cls the `struct GNUNET_ATS_PerformanceHandle` - * @param size number of bytes we can transmit to ATS - * @param buf where to copy the messages - * @return number of bytes copied into @a buf - */ -static size_t -transmit_message_to_ats (void *cls, - size_t size, - void *buf) +do_reconnect (struct GNUNET_ATS_PerformanceHandle *ph) { - struct GNUNET_ATS_PerformanceHandle *ph = cls; - struct PendingMessage *p; - size_t ret; - char *cbuf; - - ph->th = NULL; - ret = 0; - cbuf = buf; - while ((NULL != (p = ph->pending_head)) && (p->size <= size)) + struct GNUNET_ATS_ReservationContext *rc; + struct GNUNET_ATS_AddressListHandle *alh; + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_zero; + + if (NULL != ph->mq) { - memcpy (&cbuf[ret], &p[1], p->size); - ret += p->size; - size -= p->size; - GNUNET_CONTAINER_DLL_remove (ph->pending_head, - ph->pending_tail, - p); - GNUNET_free(p); + GNUNET_MQ_destroy (ph->mq); + ph->mq = NULL; } - do_transmit (ph); - if (GNUNET_NO == ph->in_receive) + while (NULL != (rc = ph->reservation_head)) { - ph->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (ph->client, - &process_ats_message, - ph, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_CONTAINER_DLL_remove (ph->reservation_head, + ph->reservation_tail, + rc); + if (NULL != rc->rcb) + rc->rcb (rc->rcb_cls, + NULL, + 0, + GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_free (rc); } - return ret; + bandwidth_zero.value__ = htonl (0); + while (NULL != (alh = ph->addresslist_head)) + { + GNUNET_CONTAINER_DLL_remove (ph->addresslist_head, + ph->addresslist_tail, + alh); + if (NULL != alh->cb) + alh->cb (alh->cb_cls, + NULL, + GNUNET_NO, + bandwidth_zero, + bandwidth_zero, + NULL); + GNUNET_free (alh); + } + if (NULL != ph->addr_info_cb) + { + /* Indicate reconnect */ + ph->addr_info_cb (ph->addr_info_cb_cls, + NULL, + GNUNET_NO, + bandwidth_zero, + bandwidth_zero, + NULL); + } + ph->backoff = GNUNET_TIME_STD_BACKOFF (ph->backoff); + ph->task = GNUNET_SCHEDULER_add_delayed (ph->backoff, + &reconnect_task, + ph); } /** - * Transmit messages from the message queue to the service - * (if there are any, and if we are not already trying). + * We received a peer information message. Validate and process it. * - * @param ph handle to use + * @param cls our context with the callback + * @param pi the message + * @return #GNUNET_OK if the message was well-formed */ -static void -do_transmit (struct GNUNET_ATS_PerformanceHandle *ph) +static int +check_peer_information (void *cls, + const struct PeerInformationMessage *pi) { - struct PendingMessage *p; + const char *plugin_address; + const char *plugin_name; + uint16_t plugin_address_length; + uint16_t plugin_name_length; - if (NULL != ph->th) - return; - if (NULL == (p = ph->pending_head)) - return; - if (NULL == ph->client) - return; /* currently reconnecting */ - ph->th = GNUNET_CLIENT_notify_transmit_ready (ph->client, - p->size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_message_to_ats, ph); + plugin_address_length = ntohs (pi->address_length); + plugin_name_length = ntohs (pi->plugin_name_length); + plugin_address = (const char *) &pi[1]; + plugin_name = &plugin_address[plugin_address_length]; + if ( (plugin_address_length + plugin_name_length + + sizeof(struct PeerInformationMessage) != ntohs (pi->header.size)) || + (plugin_name[plugin_name_length - 1] != '\0')) + { + GNUNET_break(0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** * We received a peer information message. Validate and process it. * - * @param ph our context with the callback - * @param msg the message + * @param cls our context with the callback + * @param pi the message * @return #GNUNET_OK if the message was well-formed */ -static int -process_pi_message (struct GNUNET_ATS_PerformanceHandle *ph, - const struct GNUNET_MessageHeader *msg) +static void +handle_peer_information (void *cls, + const struct PeerInformationMessage *pi) { - const struct PeerInformationMessage *pi; + struct GNUNET_ATS_PerformanceHandle *ph = cls; const char *plugin_address; const char *plugin_name; struct GNUNET_HELLO_Address address; uint16_t plugin_address_length; - uint16_t plugin_name_length; int addr_active; struct GNUNET_ATS_Properties prop; - if (ntohs (msg->size) < sizeof(struct PeerInformationMessage)) - { - GNUNET_break(0); - return GNUNET_SYSERR; - } - pi = (const struct PeerInformationMessage *) msg; + if (NULL == ph->addr_info_cb) + return; plugin_address_length = ntohs (pi->address_length); - plugin_name_length = ntohs (pi->plugin_name_length); addr_active = (int) ntohl (pi->address_active); plugin_address = (const char *) &pi[1]; plugin_name = &plugin_address[plugin_address_length]; - if ((plugin_address_length + plugin_name_length - + sizeof(struct PeerInformationMessage) != ntohs (msg->size)) - || (plugin_name[plugin_name_length - 1] != '\0')) - { - GNUNET_break(0); - return GNUNET_SYSERR; - } - if (NULL != ph->addr_info_cb) - { - GNUNET_ATS_properties_ntoh (&prop, - &pi->properties); - address.peer = pi->peer; - address.local_info = (enum GNUNET_HELLO_AddressInfo) ntohl (pi->address_local_info); - address.address = plugin_address; - address.address_length = plugin_address_length; - address.transport_name = plugin_name; - ph->addr_info_cb (ph->addr_info_cb_cls, - &address, - addr_active, - pi->bandwidth_out, - pi->bandwidth_in, - &prop); - } - return GNUNET_OK; + GNUNET_ATS_properties_ntoh (&prop, + &pi->properties); + address.peer = pi->peer; + address.local_info = (enum GNUNET_HELLO_AddressInfo) ntohl (pi->address_local_info); + address.address = plugin_address; + address.address_length = plugin_address_length; + address.transport_name = plugin_name; + ph->addr_info_cb (ph->addr_info_cb_cls, + &address, + addr_active, + pi->bandwidth_out, + pi->bandwidth_in, + &prop); } /** * We received a reservation result message. Validate and process it. * - * @param ph our context with the callback - * @param msg the message - * @return #GNUNET_OK if the message was well-formed + * @param cls our context with the callback + * @param rr the message */ -static int -process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph, - const struct GNUNET_MessageHeader *msg) +static void +handle_reservation_result (void *cls, + const struct ReservationResultMessage *rr) { - const struct ReservationResultMessage *rr; + struct GNUNET_ATS_PerformanceHandle *ph = cls; struct GNUNET_ATS_ReservationContext *rc; int32_t amount; - if (ntohs (msg->size) < sizeof(struct ReservationResultMessage)) - { - GNUNET_break(0); - return GNUNET_SYSERR; - } - rr = (const struct ReservationResultMessage *) msg; amount = ntohl (rr->amount); rc = ph->reservation_head; - if (0 != memcmp (&rr->peer, &rc->peer, sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&rr->peer, + &rc->peer, + sizeof(struct GNUNET_PeerIdentity))) { GNUNET_break(0); - return GNUNET_SYSERR; + reconnect (ph); + return; } GNUNET_CONTAINER_DLL_remove (ph->reservation_head, ph->reservation_tail, @@ -457,41 +398,71 @@ process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph, (NULL != rc->rcb) ) { /* tell client if not cancelled */ - if (rc->rcb != NULL ) + if (NULL != rc->rcb) rc->rcb (rc->rcb_cls, &rr->peer, amount, GNUNET_TIME_relative_ntoh (rr->res_delay)); - GNUNET_free(rc); - return GNUNET_OK; + GNUNET_free (rc); + return; } /* amount non-zero, but client cancelled, consider undo! */ if (GNUNET_YES != rc->undo) { - GNUNET_free(rc); - return GNUNET_OK; /* do not try to undo failed undos or negative amounts */ + GNUNET_free (rc); + return; /* do not try to undo failed undos or negative amounts */ } - GNUNET_free(rc); + GNUNET_free (rc); (void) GNUNET_ATS_reserve_bandwidth (ph, &rr->peer, -amount, NULL, NULL); - return GNUNET_OK; } /** - * We received a PeerInformationMessage. Validate and process it. + * We received a PeerInformationMessage. Validate it. * - * @param ph our context with the callback + * @param cls our context with the callback * @param msg the message * @return #GNUNET_OK if the message was well-formed */ static int -process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph, - const struct GNUNET_MessageHeader *msg) +check_address_list (void *cls, + const struct PeerInformationMessage *pi) +{ + const char *plugin_address; + const char *plugin_name; + uint16_t plugin_address_length; + uint16_t plugin_name_length; + + plugin_address_length = ntohs (pi->address_length); + plugin_name_length = ntohs (pi->plugin_name_length); + plugin_address = (const char *) &pi[1]; + plugin_name = &plugin_address[plugin_address_length]; + if ( (plugin_address_length + plugin_name_length + + sizeof (struct PeerInformationMessage) != ntohs (pi->header.size)) || + (plugin_name[plugin_name_length - 1] != '\0') ) + { + GNUNET_break(0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * We received a #GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_RESPONSE. + * Process it. + * + * @param cls our context with the callback + * @param msg the message + */ +static void +handle_address_list (void *cls, + const struct PeerInformationMessage *pi) { - const struct PeerInformationMessage *pi; + struct GNUNET_ATS_PerformanceHandle *ph = cls; struct GNUNET_ATS_AddressListHandle *alh; struct GNUNET_ATS_AddressListHandle *next; const char *plugin_address; @@ -505,25 +476,12 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph, uint32_t active; uint32_t id; - if (ntohs (msg->size) < sizeof(struct PeerInformationMessage)) - { - GNUNET_break(0); - return GNUNET_SYSERR; - } - pi = (const struct PeerInformationMessage *) msg; id = ntohl (pi->id); active = ntohl (pi->address_active); plugin_address_length = ntohs (pi->address_length); plugin_name_length = ntohs (pi->plugin_name_length); plugin_address = (const char *) &pi[1]; plugin_name = &plugin_address[plugin_address_length]; - if ( (plugin_address_length + plugin_name_length - + sizeof (struct PeerInformationMessage) != ntohs (msg->size)) || - (plugin_name[plugin_name_length - 1] != '\0') ) - { - GNUNET_break(0); - return GNUNET_SYSERR; - } LOG (GNUNET_ERROR_TYPE_DEBUG, "Received ATS_ADDRESSLIST_RESPONSE message for peer %s and plugin %s\n", GNUNET_i2s (&pi->peer), @@ -537,10 +495,7 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph, break; } if (NULL == alh) - { - /* was canceled */ - return GNUNET_SYSERR; - } + return; /* was canceled */ memset (&allzeros, '\0', sizeof (allzeros)); if ( (0 == memcmp (&allzeros, &pi->peer, sizeof(allzeros))) && @@ -562,7 +517,7 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph, bandwidth_zero, NULL); GNUNET_free (alh); - return GNUNET_OK; + return; } address.peer = pi->peer; @@ -582,87 +537,24 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph, pi->bandwidth_in, &prop); } - return GNUNET_OK; } /** - * Type of a function to call when we receive a message - * from the service. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls the 'struct GNUNET_ATS_SchedulingHandle' - * @param msg message received, NULL on timeout or fatal error + * @param cls closure with the `struct GNUNET_ATS_PerformanceHandle *` + * @param error error code */ static void -process_ats_message (void *cls, - const struct GNUNET_MessageHeader *msg) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_ATS_PerformanceHandle *ph = cls; - if (NULL == msg) - goto reconnect; - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION: - if (GNUNET_OK != process_pi_message (ph, msg)) - { - GNUNET_break (0); - goto reconnect; - } - break; - case GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT: - if (GNUNET_OK != process_rr_message (ph, msg)) - { - GNUNET_break (0); - goto reconnect; - } - break; - case GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_RESPONSE: - if (GNUNET_OK != process_ar_message (ph, msg)) - { - GNUNET_break (0); - goto reconnect; - } - break; - default: - GNUNET_break (0); - goto reconnect; - } - ph->backoff = GNUNET_TIME_UNIT_ZERO; - GNUNET_CLIENT_receive (ph->client, - &process_ats_message, - ph, - GNUNET_TIME_UNIT_FOREVER_REL); - return; - - reconnect: - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Reconnecting!\n"); - if (NULL != ph->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (ph->th); - ph->th = NULL; - } - if (NULL != ph->client) - { - GNUNET_CLIENT_disconnect (ph->client); - ph->client = NULL; - ph->in_receive = GNUNET_NO; - if (NULL != ph->addr_info_cb) - { - /* Indicate reconnect */ - ph->addr_info_cb (ph->addr_info_cb_cls, - NULL, - GNUNET_NO, - GNUNET_BANDWIDTH_value_init (0), - GNUNET_BANDWIDTH_value_init (0), - NULL); - } - } - ph->backoff = GNUNET_TIME_STD_BACKOFF (ph->backoff); - ph->task = GNUNET_SCHEDULER_add_delayed (ph->backoff, - &reconnect_task, - ph); + do_reconnect (ph); } @@ -674,30 +566,39 @@ process_ats_message (void *cls, static void reconnect (struct GNUNET_ATS_PerformanceHandle *ph) { - struct PendingMessage *p; + GNUNET_MQ_hd_var_size (peer_information, + GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION, + struct PeerInformationMessage); + GNUNET_MQ_hd_fixed_size (reservation_result, + GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT, + struct ReservationResultMessage); + GNUNET_MQ_hd_var_size (address_list, + GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_RESPONSE, + struct PeerInformationMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_peer_information_handler (ph), + make_reservation_result_handler (ph), + make_address_list_handler (ph), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; struct ClientStartMessage *init; - GNUNET_assert (NULL == ph->client); - ph->client = GNUNET_CLIENT_connect ("ats", - ph->cfg); - GNUNET_assert (NULL != ph->client); - if ((NULL == (p = ph->pending_head)) || (GNUNET_YES != p->is_init)) - { - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct ClientStartMessage)); - p->size = sizeof(struct ClientStartMessage); - p->is_init = GNUNET_YES; - init = (struct ClientStartMessage *) &p[1]; - init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START); - init->header.size = htons (sizeof(struct ClientStartMessage)); - init->start_flag = htonl ( (NULL == ph->addr_info_cb) - ? START_FLAG_PERFORMANCE_NO_PIC - : START_FLAG_PERFORMANCE_WITH_PIC); - GNUNET_CONTAINER_DLL_insert (ph->pending_head, - ph->pending_tail, - p); - } - do_transmit (ph); + GNUNET_assert (NULL == ph->mq); + ph->mq = GNUNET_CLIENT_connecT (ph->cfg, + "ats", + handlers, + &mq_error_handler, + ph); + if (NULL == ph->mq) + return; + env = GNUNET_MQ_msg (init, + GNUNET_MESSAGE_TYPE_ATS_START); + init->start_flag = htonl ( (NULL == ph->addr_info_cb) + ? START_FLAG_PERFORMANCE_NO_PIC + : START_FLAG_PERFORMANCE_WITH_PIC); + GNUNET_MQ_send (ph->mq, + env); } @@ -721,8 +622,12 @@ GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg, ph->cfg = cfg; ph->addr_info_cb = addr_info_cb; ph->addr_info_cb_cls = addr_info_cb_cls; - ph->id = 0; reconnect (ph); + if (NULL == ph->mq) + { + GNUNET_free (ph); + return NULL; + } return ph; } @@ -735,17 +640,9 @@ GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph) { - struct PendingMessage *p; struct GNUNET_ATS_ReservationContext *rc; struct GNUNET_ATS_AddressListHandle *alh; - while (NULL != (p = ph->pending_head)) - { - GNUNET_CONTAINER_DLL_remove (ph->pending_head, - ph->pending_tail, - p); - GNUNET_free (p); - } while (NULL != (alh = ph->addresslist_head)) { GNUNET_CONTAINER_DLL_remove (ph->addresslist_head, @@ -761,16 +658,15 @@ GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph) GNUNET_break (NULL == rc->rcb); GNUNET_free (rc); } - if (NULL != ph->task) { GNUNET_SCHEDULER_cancel (ph->task); ph->task = NULL; } - if (NULL != ph->client) + if (NULL != ph->mq) { - GNUNET_CLIENT_disconnect (ph->client); - ph->client = NULL; + GNUNET_MQ_destroy (ph->mq); + ph->mq = NULL; } GNUNET_free (ph); } @@ -779,7 +675,7 @@ GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph) /** * Reserve inbound bandwidth from the given peer. ATS will look at * the current amount of traffic we receive from the peer and ensure - * that the peer could add 'amount' of data to its stream. + * that the peer could add @a amount of data to its stream. * * @param ph performance handle * @param peer identifies the peer @@ -794,12 +690,15 @@ struct GNUNET_ATS_ReservationContext * GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph, const struct GNUNET_PeerIdentity *peer, int32_t amount, - GNUNET_ATS_ReservationCallback rcb, void *rcb_cls) + GNUNET_ATS_ReservationCallback rcb, + void *rcb_cls) { struct GNUNET_ATS_ReservationContext *rc; - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *env; struct ReservationRequestMessage *m; + if (NULL == ph->mq) + return NULL; rc = GNUNET_new (struct GNUNET_ATS_ReservationContext); rc->size = amount; rc->peer = *peer; @@ -811,20 +710,12 @@ GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph, GNUNET_CONTAINER_DLL_insert_tail (ph->reservation_head, ph->reservation_tail, rc); - - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct ReservationRequestMessage)); - p->size = sizeof(struct ReservationRequestMessage); - p->is_init = GNUNET_NO; - m = (struct ReservationRequestMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_RESERVATION_REQUEST); - m->header.size = htons (sizeof(struct ReservationRequestMessage)); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_ATS_RESERVATION_REQUEST); m->amount = htonl (amount); m->peer = *peer; - GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head, - ph->pending_tail, - p); - do_transmit (ph); + GNUNET_MQ_send (ph->mq, + env); return rc; } @@ -832,7 +723,7 @@ GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph, /** * Cancel request for reserving bandwidth. * - * @param rc context returned by the original GNUNET_ATS_reserve_bandwidth call + * @param rc context returned by the original #GNUNET_ATS_reserve_bandwidth() call */ void GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc) @@ -844,7 +735,7 @@ GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc) /** * Get information about addresses known to the ATS subsystem. * - * @param handle the performance handle to use + * @param ph the performance handle to use * @param peer peer idm can be NULL for all peers * @param all #GNUNET_YES to get information about all addresses or #GNUNET_NO to * get only address currently used @@ -854,24 +745,28 @@ GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc) * @return ats performance context */ struct GNUNET_ATS_AddressListHandle* -GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *handle, +GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *ph, const struct GNUNET_PeerIdentity *peer, int all, GNUNET_ATS_AddressInformationCallback infocb, void *infocb_cls) { struct GNUNET_ATS_AddressListHandle *alh; - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *env; struct AddressListRequestMessage *m; + if (NULL == ph->mq) + return NULL; if (NULL == infocb) + { + GNUNET_break (0); return NULL; + } alh = GNUNET_new (struct GNUNET_ATS_AddressListHandle); - alh->id = handle->id; - handle->id++; + alh->id = ph->id++; alh->cb = infocb; alh->cb_cls = infocb_cls; - alh->ph = handle; + alh->ph = ph; alh->all_addresses = all; if (NULL == peer) { @@ -882,25 +777,17 @@ GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *hand alh->all_peers = GNUNET_NO; alh->peer = *peer; } - GNUNET_CONTAINER_DLL_insert (handle->addresslist_head, - handle->addresslist_tail, + GNUNET_CONTAINER_DLL_insert (ph->addresslist_head, + ph->addresslist_tail, alh); - - p = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct AddressListRequestMessage)); - p->size = sizeof (struct AddressListRequestMessage); - m = (struct AddressListRequestMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_REQUEST); - m->header.size = htons (sizeof(struct AddressListRequestMessage)); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_REQUEST); m->all = htonl (all); m->id = htonl (alh->id); if (NULL != peer) m->peer = *peer; - GNUNET_CONTAINER_DLL_insert_tail (handle->pending_head, - handle->pending_tail, - p); - do_transmit (handle); - + GNUNET_MQ_send (ph->mq, + env); return alh; } @@ -908,15 +795,17 @@ GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *hand /** * Cancel a pending address listing operation * - * @param handle the handle of the request to cancel + * @param alh the handle of the request to cancel */ void -GNUNET_ATS_performance_list_addresses_cancel (struct GNUNET_ATS_AddressListHandle *handle) +GNUNET_ATS_performance_list_addresses_cancel (struct GNUNET_ATS_AddressListHandle *alh) { - GNUNET_CONTAINER_DLL_remove (handle->ph->addresslist_head, - handle->ph->addresslist_tail, - handle); - GNUNET_free (handle); + struct GNUNET_ATS_PerformanceHandle *ph = alh->ph; + + GNUNET_CONTAINER_DLL_remove (ph->addresslist_head, + ph->addresslist_tail, + alh); + GNUNET_free (alh); } @@ -947,16 +836,18 @@ GNUNET_ATS_print_preference_type (uint32_t type) */ void GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *ph, - const struct GNUNET_PeerIdentity *peer, ...) + const struct GNUNET_PeerIdentity *peer, + ...) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *env; struct ChangePreferenceMessage *m; - size_t msize; uint32_t count; struct PreferenceInformation *pi; va_list ap; enum GNUNET_ATS_PreferenceKind kind; + if (NULL == ph->mq) + return; count = 0; va_start(ap, peer); while (GNUNET_ATS_PREFERENCE_END != @@ -977,14 +868,9 @@ GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *p } } va_end(ap); - msize = count * sizeof(struct PreferenceInformation) - + sizeof(struct ChangePreferenceMessage); - p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - p->size = msize; - p->is_init = GNUNET_NO; - m = (struct ChangePreferenceMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_CHANGE); - m->header.size = htons (msize); + env = GNUNET_MQ_msg_extra (m, + count * sizeof(struct PreferenceInformation), + GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_CHANGE); m->num_preferences = htonl (count); m->peer = *peer; pi = (struct PreferenceInformation *) &m[1]; @@ -1011,8 +897,8 @@ GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *p } } va_end(ap); - GNUNET_CONTAINER_DLL_insert_tail(ph->pending_head, ph->pending_tail, p); - do_transmit (ph); + GNUNET_MQ_send (ph->mq, + env); } @@ -1028,16 +914,18 @@ GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *p void GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_TIME_Relative scope, ...) + const struct GNUNET_TIME_Relative scope, + ...) { - struct PendingMessage *p; + struct GNUNET_MQ_Envelope *env; struct FeedbackPreferenceMessage *m; - size_t msize; uint32_t count; struct PreferenceInformation *pi; va_list ap; enum GNUNET_ATS_PreferenceKind kind; + if (NULL == ph->mq) + return; count = 0; va_start(ap, scope); while (GNUNET_ATS_PREFERENCE_END != @@ -1058,14 +946,9 @@ GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph, } } va_end(ap); - msize = count * sizeof(struct PreferenceInformation) - + sizeof(struct FeedbackPreferenceMessage); - p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); - p->size = msize; - p->is_init = GNUNET_NO; - m = (struct FeedbackPreferenceMessage *) &p[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_FEEDBACK); - m->header.size = htons (msize); + env = GNUNET_MQ_msg_extra (m, + count * sizeof(struct PreferenceInformation), + GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_FEEDBACK); m->scope = GNUNET_TIME_relative_hton (scope); m->num_feedback = htonl (count); m->peer = *peer; @@ -1093,10 +976,8 @@ GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph, } } va_end(ap); - GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head, - ph->pending_tail, - p); - do_transmit (ph); + GNUNET_MQ_send (ph->mq, + env); } /* end of ats_api_performance.c */ -- 2.25.1