From 09f60ef3bac1f997ba86898b4140bcda70191740 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 25 Jun 2016 17:40:16 +0000 Subject: [PATCH] refactoring gns_api to use MQ API --- src/gns/gns_api.c | 416 ++++++++------------------ src/gns/gnunet-service-gns.c | 8 +- src/gns/gnunet-service-gns_resolver.c | 10 +- 3 files changed, 133 insertions(+), 301 deletions(-) diff --git a/src/gns/gns_api.c b/src/gns/gns_api.c index a1fe3680a..8f821f715 100644 --- a/src/gns/gns_api.c +++ b/src/gns/gns_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2013 GNUnet e.V. + Copyright (C) 2009-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -63,50 +63,20 @@ struct GNUNET_GNS_LookupRequest GNUNET_GNS_LookupResultProcessor lookup_proc; /** - * processor closure + * @e lookup_proc closure */ void *proc_cls; /** - * request id - */ - uint32_t r_id; - -}; - - -/** - * Entry in our list of messages to be (re-)transmitted. - */ -struct PendingMessage -{ - /** - * This is a doubly-linked list. - */ - struct PendingMessage *prev; - - /** - * This is a doubly-linked list. + * Envelope with the message for this queue entry. */ - struct PendingMessage *next; - - /** - * Size of the message. - */ - size_t size; + struct GNUNET_MQ_Envelope *env; /** * request id */ uint32_t r_id; - /** - * This message has been transmitted. GNUNET_NO if the message is - * in the "pending" DLL, GNUNET_YES if it has been transmitted to - * the service via the current client connection. - */ - int transmitted; - }; @@ -122,39 +92,24 @@ struct GNUNET_GNS_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request (or NULL). - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * Head of linked list of shorten messages we would like to transmit. - */ - struct PendingMessage *pending_head; - - /** - * Tail of linked list of shorten messages we would like to transmit. + * Connection to service (if available). */ - struct PendingMessage *pending_tail; + struct GNUNET_MQ_Handle *mq; /** - * Head of linked list of lookup messages we would like to transmit. + * Head of linked list of active lookup requests. */ struct GNUNET_GNS_LookupRequest *lookup_head; /** - * Tail of linked list of lookup messages we would like to transmit. + * Tail of linked list of active lookup requests. */ struct GNUNET_GNS_LookupRequest *lookup_tail; /** * Reconnect task */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * How long do we wait until we try to reconnect? @@ -166,37 +121,16 @@ struct GNUNET_GNS_Handle */ uint32_t r_id_gen; - /** - * Did we start our receive loop yet? - */ - int in_receive; - }; -/** - * Try to send messages from list of messages to send - * @param handle GNS_Handle - */ -static void -process_pending_messages (struct GNUNET_GNS_Handle *handle); - - /** * Reconnect to GNS service. * * @param handle the handle to the GNS service */ static void -reconnect (struct GNUNET_GNS_Handle *handle) -{ - GNUNET_assert (NULL == handle->client); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Trying to connect to GNS\n"); - handle->client = GNUNET_CLIENT_connect ("gns", handle->cfg); - GNUNET_assert (NULL != handle->client); - process_pending_messages (handle); -} +reconnect (struct GNUNET_GNS_Handle *handle); /** @@ -222,23 +156,10 @@ reconnect_task (void *cls) static void force_reconnect (struct GNUNET_GNS_Handle *handle) { - struct GNUNET_GNS_LookupRequest *lh; - struct PendingMessage *p; - - GNUNET_CLIENT_disconnect (handle->client); - handle->client = NULL; - handle->in_receive = GNUNET_NO; - for (lh = handle->lookup_head; NULL != lh; lh = lh->next) - { - p = (struct PendingMessage *) &lh[1]; - if (GNUNET_NO == p->transmitted) - continue; - p->transmitted = GNUNET_NO; - GNUNET_CONTAINER_DLL_insert (handle->pending_head, - handle->pending_tail, - p); - } - handle->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (handle->reconnect_backoff); + GNUNET_MQ_destroy (handle->mq); + handle->mq = NULL; + handle->reconnect_backoff + = GNUNET_TIME_STD_BACKOFF (handle->reconnect_backoff); handle->reconnect_task = GNUNET_SCHEDULER_add_delayed (handle->reconnect_backoff, &reconnect_task, @@ -247,209 +168,124 @@ force_reconnect (struct GNUNET_GNS_Handle *handle) /** - * Transmit the next pending message, called by notify_transmit_ready - * - * @param cls the closure - * @param size size of pending data - * @param buf buffer with pending data - * @return size data transmitted - */ -static size_t -transmit_pending (void *cls, size_t size, void *buf); - - -/** - * Handler for messages received from the GNS service - * - * @param cls the 'struct GNUNET_GNS_Handle' - * @param msg the incoming message - */ -static void -process_message (void *cls, const struct GNUNET_MessageHeader *msg); - - -/** - * Try to send messages from list of messages to send + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param handle the GNS handle + * @param cls closure with the `struct GNUNET_GNS_Handle *` + * @param error error code */ static void -process_pending_messages (struct GNUNET_GNS_Handle *handle) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - struct PendingMessage *p = handle->pending_head; - - if (NULL == handle->client) - return; /* wait for reconnect */ - if (NULL != handle->th) - return; /* transmission request already pending */ - - while ((NULL != p) && (p->transmitted == GNUNET_YES)) - p = p->next; - if (NULL == p) - return; /* no messages pending */ + struct GNUNET_GNS_Handle *handle = cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Trying to transmit %u bytes\n", - (unsigned int) p->size); - handle->th = - GNUNET_CLIENT_notify_transmit_ready (handle->client, - p->size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_pending, - handle); - GNUNET_break (NULL != handle->th); + force_reconnect (handle); } /** - * Transmit the next pending message, called by notify_transmit_ready + * Check validity of message received from the GNS service * - * @param cls the closure - * @param size size of pending data - * @param buf buffer with pending data - * @return size data transmitted + * @param cls the `struct GNUNET_GNS_Handle *` + * @param loookup_msg the incoming message */ -static size_t -transmit_pending (void *cls, size_t size, void *buf) +static int +check_result (void *cls, + const struct GNUNET_GNS_ClientLookupResultMessage *lookup_msg) { - struct GNUNET_GNS_Handle *handle = cls; - char *cbuf = buf; - struct PendingMessage *p; - size_t tsize; - - handle->th = NULL; - if ((0 == size) || (NULL == buf)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to GNS service failed!\n"); - force_reconnect (handle); - return 0; - } - if (NULL == (p = handle->pending_head)) - return 0; + size_t mlen = ntohs (lookup_msg->header.size) - sizeof (*lookup_msg); + uint32_t rd_count = ntohl (lookup_msg->rd_count); + struct GNUNET_GNSRECORD_Data rd[rd_count]; - tsize = 0; - while ((NULL != (p = handle->pending_head)) && (p->size <= size)) + if (GNUNET_SYSERR == + GNUNET_GNSRECORD_records_deserialize (mlen, + (const char*) &lookup_msg[1], + rd_count, + rd)) { - memcpy (&cbuf[tsize], &p[1], p->size); - tsize += p->size; - size -= p->size; - p->transmitted = GNUNET_YES; - GNUNET_CONTAINER_DLL_remove (handle->pending_head, - handle->pending_tail, - p); - if (GNUNET_YES != handle->in_receive) - { - GNUNET_CLIENT_receive (handle->client, &process_message, handle, - GNUNET_TIME_UNIT_FOREVER_REL); - handle->in_receive = GNUNET_YES; - } + GNUNET_break (0); + return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes\n", - (unsigned int) tsize); - process_pending_messages (handle); - return tsize; + return GNUNET_OK; } /** - * Process a given reply to the lookup request + * Handler for messages received from the GNS service * - * @param qe a queue entry - * @param msg the lookup message received + * @param cls the `struct GNUNET_GNS_Handle *` + * @param loookup_msg the incoming message */ static void -process_lookup_reply (struct GNUNET_GNS_LookupRequest *qe, - const struct GNUNET_GNS_ClientLookupResultMessage *msg) +handle_result (void *cls, + const struct GNUNET_GNS_ClientLookupResultMessage *lookup_msg) { - struct GNUNET_GNS_Handle *handle = qe->gns_handle; - struct PendingMessage *p = (struct PendingMessage *) &qe[1]; + struct GNUNET_GNS_Handle *handle = cls; + size_t mlen = ntohs (lookup_msg->header.size) - sizeof (*lookup_msg); + uint32_t rd_count = ntohl (lookup_msg->rd_count); + struct GNUNET_GNSRECORD_Data rd[rd_count]; + uint32_t r_id = ntohl (lookup_msg->id); + struct GNUNET_GNS_LookupRequest *lr; GNUNET_GNS_LookupResultProcessor proc; void *proc_cls; - uint32_t rd_count = ntohl (msg->rd_count); - struct GNUNET_GNSRECORD_Data rd[rd_count]; - size_t mlen; - if (GNUNET_YES != p->transmitted) - { - /* service send reply to query we never managed to send!? */ - GNUNET_break (0); - force_reconnect (handle); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received lookup reply from GNS service (%u records)\n", + (unsigned int) rd_count); + for (lr = handle->lookup_head; NULL != lr; lr = lr->next) + if (lr->r_id == r_id) + break; + if (NULL == lr) return; - } - mlen = ntohs (msg->header.size); - mlen -= sizeof (struct GNUNET_GNS_ClientLookupResultMessage); - proc = qe->lookup_proc; - proc_cls = qe->proc_cls; - GNUNET_CONTAINER_DLL_remove (handle->lookup_head, handle->lookup_tail, qe); - GNUNET_free (qe); - if (GNUNET_SYSERR == GNUNET_GNSRECORD_records_deserialize (mlen, - (const char*) &msg[1], - rd_count, - rd)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _("Failed to deserialize lookup reply from GNS service!\n")); - proc (proc_cls, 0, NULL); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received lookup reply from GNS service (%u records)\n", - (unsigned int) rd_count); - proc (proc_cls, rd_count, rd); - } + proc = lr->lookup_proc; + proc_cls = lr->proc_cls; + GNUNET_CONTAINER_DLL_remove (handle->lookup_head, + handle->lookup_tail, + lr); + GNUNET_free (lr); + GNUNET_assert (GNUNET_OK == + GNUNET_GNSRECORD_records_deserialize (mlen, + (const char*) &lookup_msg[1], + rd_count, + rd)); + proc (proc_cls, + rd_count, + rd); } /** - * Handler for messages received from the GNS service + * Reconnect to GNS service. * - * @param cls the 'struct GNUNET_GNS_Handle' - * @param msg the incoming message + * @param handle the handle to the GNS service */ static void -process_message (void *cls, const struct GNUNET_MessageHeader *msg) +reconnect (struct GNUNET_GNS_Handle *handle) { - struct GNUNET_GNS_Handle *handle = cls; - struct GNUNET_GNS_LookupRequest *lr; - const struct GNUNET_GNS_ClientLookupResultMessage *lookup_msg; - uint32_t r_id; - - if (NULL == msg) - { - force_reconnect (handle); - return; - } + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_GNS_LOOKUP_RESULT, + struct GNUNET_GNS_ClientLookupResultMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_result_handler (handle), + GNUNET_MQ_handler_end () + }; + struct GNUNET_GNS_LookupRequest *lh; - GNUNET_CLIENT_receive (handle->client, &process_message, handle, - GNUNET_TIME_UNIT_FOREVER_REL); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_GNS_LOOKUP_RESULT: - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got LOOKUP_RESULT msg\n"); - if (ntohs (msg->size) < sizeof (struct GNUNET_GNS_ClientLookupResultMessage)) - { - GNUNET_break (0); - force_reconnect (handle); - return; - } - lookup_msg = (const struct GNUNET_GNS_ClientLookupResultMessage *) msg; - r_id = ntohl (lookup_msg->id); - for (lr = handle->lookup_head; NULL != lr; lr = lr->next) - if (lr->r_id == r_id) - { - process_lookup_reply(lr, lookup_msg); - break; - } - break; - default: - GNUNET_break (0); - force_reconnect (handle); + GNUNET_assert (NULL == handle->mq); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Trying to connect to GNS\n"); + handle->mq = GNUNET_CLIENT_connecT (handle->cfg, + "gns", + handlers, + &mq_error_handler, + handle); + if (NULL == handle->mq) return; - } + for (lh = handle->lookup_head; NULL != lh; lh = lh->next) + GNUNET_MQ_send_copy (handle->mq, + lh->env); } @@ -467,6 +303,11 @@ GNUNET_GNS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) handle = GNUNET_new (struct GNUNET_GNS_Handle); handle->cfg = cfg; reconnect (handle); + if (NULL == handle->mq) + { + GNUNET_free (handle); + return NULL; + } return handle; } @@ -479,10 +320,10 @@ GNUNET_GNS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_GNS_disconnect (struct GNUNET_GNS_Handle *handle) { - if (NULL != handle->client) + if (NULL != handle->mq) { - GNUNET_CLIENT_disconnect (handle->client); - handle->client = NULL; + GNUNET_MQ_destroy (handle->mq); + handle->mq = NULL; } if (NULL != handle->reconnect_task) { @@ -502,16 +343,12 @@ GNUNET_GNS_disconnect (struct GNUNET_GNS_Handle *handle) void GNUNET_GNS_lookup_cancel (struct GNUNET_GNS_LookupRequest *lr) { - struct PendingMessage *p = (struct PendingMessage*) &lr[1]; - - GNUNET_assert (NULL != lr->gns_handle); - if (GNUNET_NO == p->transmitted) - GNUNET_CONTAINER_DLL_remove (lr->gns_handle->pending_head, - lr->gns_handle->pending_tail, - p); - GNUNET_CONTAINER_DLL_remove (lr->gns_handle->lookup_head, - lr->gns_handle->lookup_tail, + struct GNUNET_GNS_Handle *handle = lr->gns_handle; + + GNUNET_CONTAINER_DLL_remove (handle->lookup_head, + handle->lookup_tail, lr); + GNUNET_MQ_discard (lr->env); GNUNET_free (lr); } @@ -542,8 +379,7 @@ GNUNET_GNS_lookup (struct GNUNET_GNS_Handle *handle, /* IPC to shorten gns names, return shorten_handle */ struct GNUNET_GNS_ClientLookupMessage *lookup_msg; struct GNUNET_GNS_LookupRequest *lr; - size_t msize; - struct PendingMessage *pending; + size_t nlen; if (NULL == name) { @@ -553,28 +389,20 @@ GNUNET_GNS_lookup (struct GNUNET_GNS_Handle *handle, LOG (GNUNET_ERROR_TYPE_DEBUG, "Trying to lookup `%s' in GNS\n", name); - msize = sizeof (struct GNUNET_GNS_ClientLookupMessage) - + strlen (name) + 1; - if (msize > UINT16_MAX) + nlen = strlen (name) + 1; + if (nlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*lr)) { GNUNET_break (0); return NULL; } - lr = GNUNET_malloc (sizeof (struct GNUNET_GNS_LookupRequest) + - sizeof (struct PendingMessage) + msize); + lr = GNUNET_new (struct GNUNET_GNS_LookupRequest); lr->gns_handle = handle; lr->lookup_proc = proc; lr->proc_cls = proc_cls; lr->r_id = handle->r_id_gen++; - pending = (struct PendingMessage *)&lr[1]; - pending->size = msize; - pending->r_id = lr->r_id; - GNUNET_CONTAINER_DLL_insert_tail (handle->lookup_head, - handle->lookup_tail, lr); - - lookup_msg = (struct GNUNET_GNS_ClientLookupMessage *) &pending[1]; - lookup_msg->header.type = htons (GNUNET_MESSAGE_TYPE_GNS_LOOKUP); - lookup_msg->header.size = htons (msize); + lr->env = GNUNET_MQ_msg_extra (lookup_msg, + nlen, + GNUNET_MESSAGE_TYPE_GNS_LOOKUP); lookup_msg->id = htonl (lr->r_id); lookup_msg->options = htons ((uint16_t) options); lookup_msg->zone = *zone; @@ -584,11 +412,15 @@ GNUNET_GNS_lookup (struct GNUNET_GNS_Handle *handle, lookup_msg->have_key = htons (GNUNET_YES); lookup_msg->shorten_key = *shorten_zone_key; } - memcpy (&lookup_msg[1], name, strlen (name) + 1); - GNUNET_CONTAINER_DLL_insert_tail (handle->pending_head, - handle->pending_tail, - pending); - process_pending_messages (handle); + memcpy (&lookup_msg[1], + name, + nlen); + GNUNET_CONTAINER_DLL_insert (handle->lookup_head, + handle->lookup_tail, + lr); + if (NULL != handle->mq) + GNUNET_MQ_send_copy (handle->mq, + lr->env); return lr; } diff --git a/src/gns/gnunet-service-gns.c b/src/gns/gnunet-service-gns.c index 5e1b5830f..e2bb0ad8c 100644 --- a/src/gns/gnunet-service-gns.c +++ b/src/gns/gnunet-service-gns.c @@ -706,9 +706,8 @@ send_lookup_response (void* cls, size_t len; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending `%s' message with %d results\n", - "LOOKUP_RESULT", - rd_count); + "Sending LOOKUP_RESULT message with %u results\n", + (unsigned int) rd_count); len = GNUNET_GNSRECORD_records_get_size (rd_count, rd); rmsg = GNUNET_malloc (len + sizeof (struct GNUNET_GNS_ClientLookupResultMessage)); @@ -757,8 +756,7 @@ handle_lookup (void *cls, const struct GNUNET_GNS_ClientLookupMessage *sh_msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' message\n", - "LOOKUP"); + "Received LOOKUP message\n"); msg_size = ntohs (message->size); if (msg_size < sizeof (struct GNUNET_GNS_ClientLookupMessage)) { diff --git a/src/gns/gnunet-service-gns_resolver.c b/src/gns/gnunet-service-gns_resolver.c index daae46ab7..2dec9a2af 100644 --- a/src/gns/gnunet-service-gns_resolver.c +++ b/src/gns/gnunet-service-gns_resolver.c @@ -2537,12 +2537,14 @@ GNS_resolver_init (struct GNUNET_NAMECACHE_Handle *nc, dht_lookup_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); max_allowed_background_queries = max_bg_queries; - if (GNUNET_SYSERR == (use_cache = GNUNET_CONFIGURATION_get_value_yesno (c, - "gns", - "USE_CACHE"))) + if (GNUNET_SYSERR == (use_cache = + GNUNET_CONFIGURATION_get_value_yesno (c, + "gns", + "USE_CACHE"))) use_cache = GNUNET_YES; if (GNUNET_NO == use_cache) - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Namecache disabled\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Namecache disabled\n"); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (c, -- 2.25.1