From 7262f6743a026cf110d3aedc7e39951280d65fa1 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 21 Jun 2016 19:07:03 +0000 Subject: [PATCH] convert namecache to new MQ API --- src/include/gnunet_namecache_service.h | 21 +- src/namecache/namecache_api.c | 518 +++++++++---------------- 2 files changed, 195 insertions(+), 344 deletions(-) diff --git a/src/include/gnunet_namecache_service.h b/src/include/gnunet_namecache_service.h index a6d293575..2c4ca29c3 100644 --- a/src/include/gnunet_namecache_service.h +++ b/src/include/gnunet_namecache_service.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2012, 2013 GNUnet e.V. + Copyright (C) 2012, 2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -98,9 +98,10 @@ GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h); * #GNUNET_YES (or other positive value) on success * @param emsg NULL on success, otherwise an error message */ -typedef void (*GNUNET_NAMECACHE_ContinuationWithStatus) (void *cls, - int32_t success, - const char *emsg); +typedef void +(*GNUNET_NAMECACHE_ContinuationWithStatus) (void *cls, + int32_t success, + const char *emsg); @@ -112,7 +113,7 @@ typedef void (*GNUNET_NAMECACHE_ContinuationWithStatus) (void *cls, * @param block block to store * @param cont continuation to call when done * @param cont_cls closure for @a cont - * @return handle to abort the request + * @return handle to abort the request, NULL on error */ struct GNUNET_NAMECACHE_QueueEntry * GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, @@ -127,8 +128,9 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, * @param cls closure * @param block block that was stored in the namecache */ -typedef void (*GNUNET_NAMECACHE_BlockProcessor) (void *cls, - const struct GNUNET_GNSRECORD_Block *block); +typedef void +(*GNUNET_NAMECACHE_BlockProcessor) (void *cls, + const struct GNUNET_GNSRECORD_Block *block); /** @@ -141,12 +143,13 @@ typedef void (*GNUNET_NAMECACHE_BlockProcessor) (void *cls, * @param proc function to call on the matching block, or with * NULL if there is no matching block * @param proc_cls closure for @a proc - * @return a handle that can be used to cancel + * @return a handle that can be used to cancel, NULL on error */ struct GNUNET_NAMECACHE_QueueEntry * GNUNET_NAMECACHE_lookup_block (struct GNUNET_NAMECACHE_Handle *h, const struct GNUNET_HashCode *derived_hash, - GNUNET_NAMECACHE_BlockProcessor proc, void *proc_cls); + GNUNET_NAMECACHE_BlockProcessor proc, + void *proc_cls); /** diff --git a/src/namecache/namecache_api.c b/src/namecache/namecache_api.c index 15a750448..51cbacf12 100644 --- a/src/namecache/namecache_api.c +++ b/src/namecache/namecache_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2010-2013 GNUnet e.V. + Copyright (C) 2010-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -90,31 +90,6 @@ struct GNUNET_NAMECACHE_QueueEntry }; -/** - * Message in linked list we should send to the service. The - * actual binary message follows this struct. - */ -struct PendingMessage -{ - - /** - * Kept in a DLL. - */ - struct PendingMessage *next; - - /** - * Kept in a DLL. - */ - struct PendingMessage *prev; - - /** - * Size of the message. - */ - size_t size; - -}; - - /** * Connection to the NAMECACHE service. */ @@ -127,25 +102,15 @@ struct GNUNET_NAMECACHE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Message queue to service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Currently pending transmission request (or NULL). */ struct GNUNET_CLIENT_TransmitHandle *th; - /** - * Head of linked list of pending messages to send to the service - */ - struct PendingMessage *pending_head; - - /** - * Tail of linked list of pending messages to send to the service - */ - struct PendingMessage *pending_tail; - /** * Head of pending namecache queue entries */ @@ -159,7 +124,7 @@ struct GNUNET_NAMECACHE_Handle /** * Reconnect task */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * Delay introduced before we reconnect. @@ -171,11 +136,6 @@ struct GNUNET_NAMECACHE_Handle */ int reconnect; - /** - * Did we start to receive yet? - */ - int is_receiving; - /** * The last operation id used for a NAMECACHE operation */ @@ -194,285 +154,165 @@ force_reconnect (struct GNUNET_NAMECACHE_Handle *h); /** - * Handle an incoming message of type - * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE. + * Find queue entry for the given @a rid. * - * @param qe the respective entry in the message queue - * @param msg the message we received - * @param size the message size - * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client + * @param h handle to search + * @param rid request ID to look for + * @return NULL if not found, otherwise the queue entry (removed from the queue) */ -static int -handle_lookup_block_response (struct GNUNET_NAMECACHE_QueueEntry *qe, - const struct LookupBlockResponseMessage *msg, - size_t size) +static struct GNUNET_NAMECACHE_QueueEntry * +find_qe (struct GNUNET_NAMECACHE_Handle *h, + uint32_t rid) { - struct GNUNET_GNSRECORD_Block *block; - char buf[size + sizeof (struct GNUNET_GNSRECORD_Block) - - sizeof (struct LookupBlockResponseMessage)]; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s'\n", - "LOOKUP_BLOCK_RESPONSE"); - if (0 == GNUNET_TIME_absolute_ntoh (msg->expire).abs_value_us) - { - /* no match found */ - if (NULL != qe->block_proc) - qe->block_proc (qe->block_proc_cls, NULL); - return GNUNET_OK; - } + struct GNUNET_NAMECACHE_QueueEntry *qe; - block = (struct GNUNET_GNSRECORD_Block *) buf; - block->signature = msg->signature; - block->derived_key = msg->derived_key; - block->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); - block->purpose.size = htonl (size - sizeof (struct LookupBlockResponseMessage) + - sizeof (struct GNUNET_TIME_AbsoluteNBO) + - sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose)); - block->expiration_time = msg->expire; - memcpy (&block[1], - &msg[1], - size - sizeof (struct LookupBlockResponseMessage)); - if (GNUNET_OK != - GNUNET_GNSRECORD_block_verify (block)) + for (qe = h->op_head; qe != NULL; qe = qe->next) { - GNUNET_break (0); - return GNUNET_SYSERR; + if (qe->op_id == rid) + { + GNUNET_CONTAINER_DLL_remove (h->op_head, + h->op_tail, + qe); + return qe; + } } - if (NULL != qe->block_proc) - qe->block_proc (qe->block_proc_cls, block); - else - GNUNET_break (0); - return GNUNET_OK; + return NULL; } /** * Handle an incoming message of type - * #GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE + * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE. * - * @param qe the respective entry in the message queue + * @param cls the `struct GNUNET_NAMECACHE_Handle` * @param msg the message we received - * @param size the message size - * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client */ static int -handle_block_cache_response (struct GNUNET_NAMECACHE_QueueEntry *qe, - const struct BlockCacheResponseMessage *msg, - size_t size) +check_lookup_block_response (void *cls, + const struct LookupBlockResponseMessage *msg) { - int res; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s'\n", - "BLOCK_CACHE_RESPONSE"); - res = ntohl (msg->op_result); - /* TODO: add actual error message from namecache to response... */ - if (NULL != qe->cont) - qe->cont (qe->cont_cls, - res, - (GNUNET_OK == res) - ? NULL - : _("Namecache failed to cache block")); + /* any length will do, format validation is in handler */ return GNUNET_OK; } /** - * Handle incoming messages for record operations + * Handle an incoming message of type + * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE. * - * @param qe the respective zone iteration handle + * @param cls the `struct GNUNET_NAMECACHE_Handle` * @param msg the message we received - * @param type the message type in host byte order - * @param size the message size - * @return #GNUNET_OK on success, #GNUNET_NO if we notified the client about - * the error, #GNUNET_SYSERR on error and we did NOT notify the client - */ -static int -manage_record_operations (struct GNUNET_NAMECACHE_QueueEntry *qe, - const struct GNUNET_MessageHeader *msg, - uint16_t type, - size_t size) -{ - /* handle different message type */ - switch (type) - { - case GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE: - if (size < sizeof (struct LookupBlockResponseMessage)) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - return handle_lookup_block_response (qe, (const struct LookupBlockResponseMessage *) msg, size); - case GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE: - if (size != sizeof (struct BlockCacheResponseMessage)) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - return handle_block_cache_response (qe, (const struct BlockCacheResponseMessage *) msg, size); - default: - GNUNET_break (0); - return GNUNET_SYSERR; - } -} - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls the `struct GNUNET_NAMECACHE_SchedulingHandle` - * @param msg message received, NULL on timeout or fatal error */ static void -process_namecache_message (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_lookup_block_response (void *cls, + const struct LookupBlockResponseMessage *msg) { struct GNUNET_NAMECACHE_Handle *h = cls; - const struct GNUNET_NAMECACHE_Header *gm; + size_t size; struct GNUNET_NAMECACHE_QueueEntry *qe; - uint16_t size; - uint16_t type; - uint32_t r_id; - int ret; - if (NULL == msg) - { - force_reconnect (h); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received LOOKUP_BLOCK_RESPONSE\n"); + qe = find_qe (h, + ntohl (msg->gns_header.r_id)); + if (NULL == qe) return; - } - size = ntohs (msg->size); - type = ntohs (msg->type); - if (size < sizeof (struct GNUNET_NAMECACHE_Header)) + if (0 == GNUNET_TIME_absolute_ntoh (msg->expire).abs_value_us) { - GNUNET_break_op (0); - GNUNET_CLIENT_receive (h->client, - &process_namecache_message, h, - GNUNET_TIME_UNIT_FOREVER_REL); + /* no match found */ + if (NULL != qe->block_proc) + qe->block_proc (qe->block_proc_cls, + NULL); + GNUNET_free (qe); return; } - gm = (const struct GNUNET_NAMECACHE_Header *) msg; - r_id = ntohl (gm->r_id); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message type %u size %u op %u\n", - (unsigned int) type, - (unsigned int) size, - (unsigned int) r_id); - - /* Is it a record related operation ? */ - for (qe = h->op_head; qe != NULL; qe = qe->next) - if (qe->op_id == r_id) - break; - if (NULL != qe) + size = ntohs (msg->gns_header.header.size) + - sizeof (struct LookupBlockResponseMessage); { - ret = manage_record_operations (qe, msg, type, size); - if (GNUNET_SYSERR == ret) + char buf[size + sizeof (struct GNUNET_GNSRECORD_Block)] GNUNET_ALIGN; + struct GNUNET_GNSRECORD_Block *block; + + block = (struct GNUNET_GNSRECORD_Block *) buf; + block->signature = msg->signature; + block->derived_key = msg->derived_key; + block->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); + block->purpose.size = htonl (size + + sizeof (struct GNUNET_TIME_AbsoluteNBO) + + sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose)); + block->expiration_time = msg->expire; + memcpy (&block[1], + &msg[1], + size); + if (GNUNET_OK != + GNUNET_GNSRECORD_block_verify (block)) { - /* protocol error, need to reconnect */ - h->reconnect = GNUNET_YES; + GNUNET_break (0); + if (NULL != qe->block_proc) + qe->block_proc (qe->block_proc_cls, + NULL); + force_reconnect (h); } else { - /* client was notified about success or failure, clean up 'qe' */ - GNUNET_CONTAINER_DLL_remove (h->op_head, - h->op_tail, - qe); - GNUNET_free (qe); + if (NULL != qe->block_proc) + qe->block_proc (qe->block_proc_cls, + block); } } - if (GNUNET_YES == h->reconnect) - { - force_reconnect (h); - return; - } - GNUNET_CLIENT_receive (h->client, &process_namecache_message, h, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_free (qe); } /** - * Transmit messages from the message queue to the service - * (if there are any, and if we are not already trying). - * - * @param h handle to use - */ -static void -do_transmit (struct GNUNET_NAMECACHE_Handle *h); - - -/** - * We can now transmit a message to NAMECACHE. Do it. + * Handle an incoming message of type + * #GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE * * @param cls the `struct GNUNET_NAMECACHE_Handle` - * @param size number of bytes we can transmit - * @param buf where to copy the messages - * @return number of bytes copied into @a buf + * @param msg the message we received + * @param size the message size + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client */ -static size_t -transmit_message_to_namecache (void *cls, - size_t size, - void *buf) +static void +handle_block_cache_response (void *cls, + const struct BlockCacheResponseMessage *msg) { struct GNUNET_NAMECACHE_Handle *h = cls; - struct PendingMessage *p; - size_t ret; - char *cbuf; + struct GNUNET_NAMECACHE_QueueEntry *qe; + int res; - h->th = NULL; - if ((0 == size) || (NULL == buf)) - { - force_reconnect (h); - return 0; - } - ret = 0; - cbuf = buf; - while ( (NULL != (p = h->pending_head)) && - (p->size <= size) ) - { - memcpy (&cbuf[ret], &p[1], p->size); - ret += p->size; - size -= p->size; - GNUNET_CONTAINER_DLL_remove (h->pending_head, - h->pending_tail, - p); - if (GNUNET_NO == h->is_receiving) - { - h->is_receiving = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - &process_namecache_message, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - GNUNET_free (p); - } - do_transmit (h); - return ret; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received BLOCK_CACHE_RESPONSE\n"); + qe = find_qe (h, + ntohl (msg->gns_header.r_id)); + if (NULL == qe) + return; + res = ntohl (msg->op_result); + /* TODO: add actual error message from namecache to response... */ + if (NULL != qe->cont) + qe->cont (qe->cont_cls, + res, + (GNUNET_OK == res) + ? NULL + : _("Namecache failed to cache block")); + GNUNET_free (qe); } /** - * Transmit messages from the message queue to the service - * (if there are any, and if we are not already trying). + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param h handle to use + * @param cls closure with the `struct GNUNET_NAMECACHE_Handle *` + * @param error error code */ static void -do_transmit (struct GNUNET_NAMECACHE_Handle *h) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - struct PendingMessage *p; - - if (NULL != h->th) - return; /* transmission request already pending */ - if (NULL == (p = h->pending_head)) - return; /* transmission queue empty */ - if (NULL == h->client) - return; /* currently reconnecting */ - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, p->size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_message_to_namecache, - h); - GNUNET_break (NULL != h->th); + struct GNUNET_NAMECACHE_Handle *h = cls; + + force_reconnect (h); } @@ -484,10 +324,23 @@ do_transmit (struct GNUNET_NAMECACHE_Handle *h) static void reconnect (struct GNUNET_NAMECACHE_Handle *h) { - GNUNET_assert (NULL == h->client); - h->client = GNUNET_CLIENT_connect ("namecache", h->cfg); - GNUNET_assert (NULL != h->client); - do_transmit (h); + GNUNET_MQ_hd_var_size (lookup_block_response, + GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE, + struct LookupBlockResponseMessage); + GNUNET_MQ_hd_fixed_size (block_cache_response, + GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE, + struct BlockCacheResponseMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_lookup_block_response_handler (h), + make_block_cache_response_handler (h), + GNUNET_MQ_handler_end () + }; + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "namecache", + handlers, + &mq_error_handler, + h); } @@ -514,17 +367,24 @@ reconnect_task (void *cls) static void force_reconnect (struct GNUNET_NAMECACHE_Handle *h) { - if (NULL != h->th) + struct GNUNET_NAMECACHE_QueueEntry *qe; + + h->reconnect = GNUNET_NO; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + while (NULL != (qe = h->op_head)) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; + GNUNET_CONTAINER_DLL_remove (h->op_head, + h->op_tail, + qe); + if (NULL != qe->cont) + qe->cont (qe->cont_cls, + GNUNET_SYSERR, + _("Error communicating with namecache service")); + GNUNET_free (qe); } - h->reconnect = GNUNET_NO; - GNUNET_CLIENT_disconnect (h->client); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to namecache\n"); - h->is_receiving = GNUNET_NO; - h->client = NULL; h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect_task, @@ -558,8 +418,12 @@ GNUNET_NAMECACHE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) h = GNUNET_new (struct GNUNET_NAMECACHE_Handle); h->cfg = cfg; - h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect_task, h); - h->last_op_id_used = 0; + reconnect (h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } return h; } @@ -573,31 +437,20 @@ GNUNET_NAMECACHE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h) { - struct PendingMessage *p; struct GNUNET_NAMECACHE_QueueEntry *q; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up\n"); - GNUNET_assert (NULL != h); - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - while (NULL != (p = h->pending_head)) - { - GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, p); - GNUNET_free (p); - } GNUNET_break (NULL == h->op_head); while (NULL != (q = h->op_head)) { - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, q); + GNUNET_CONTAINER_DLL_remove (h->op_head, + h->op_tail, + q); GNUNET_free (q); } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } if (NULL != h->reconnect_task) { @@ -615,8 +468,8 @@ GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h) * @param h handle to the namecache * @param block block to store * @param cont continuation to call when done - * @param cont_cls closure for cont - * @return handle to abort the request + * @param cont_cls closure for @a cont + * @return handle to abort the request, NULL on error */ struct GNUNET_NAMECACHE_QueueEntry * GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, @@ -625,13 +478,13 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, void *cont_cls) { struct GNUNET_NAMECACHE_QueueEntry *qe; - struct PendingMessage *pe; struct BlockCacheMessage *msg; + struct GNUNET_MQ_Envelope *env; uint32_t rid; size_t blen; - size_t msg_size; - GNUNET_assert (NULL != h); + if (NULL == h->mq) + return NULL; blen = ntohl (block->purpose.size) - sizeof (struct GNUNET_TIME_AbsoluteNBO) - sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose); @@ -641,27 +494,22 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, qe->cont = cont; qe->cont_cls = cont_cls; qe->op_id = rid; - GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, qe); - - /* setup msg */ - msg_size = sizeof (struct BlockCacheMessage) + blen; - pe = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); - pe->size = msg_size; - msg = (struct BlockCacheMessage *) &pe[1]; - msg->gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE); - msg->gns_header.header.size = htons (msg_size); + GNUNET_CONTAINER_DLL_insert_tail (h->op_head, + h->op_tail, + qe); + /* send msg */ + env = GNUNET_MQ_msg_extra (msg, + blen, + GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE); msg->gns_header.r_id = htonl (rid); msg->expire = block->expiration_time; msg->signature = block->signature; msg->derived_key = block->derived_key; - memcpy (&msg[1], &block[1], blen); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending `%s' message with size %u and expiration %s\n", - "NAMECACHE_BLOCK_CACHE", - (unsigned int) msg_size, - GNUNET_STRINGS_absolute_time_to_string (GNUNET_TIME_absolute_ntoh (msg->expire))); - GNUNET_CONTAINER_DLL_insert_tail (h->pending_head, h->pending_tail, pe); - do_transmit (h); + memcpy (&msg[1], + &block[1], + blen); + GNUNET_MQ_send (h->mq, + env); return qe; } @@ -674,41 +522,40 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, * @param derived_hash hash of zone key combined with name to lookup * @param proc function to call on the matching block, or with * NULL if there is no matching block - * @param proc_cls closure for proc - * @return a handle that can be used to cancel + * @param proc_cls closure for @a proc + * @return a handle that can be used to cancel, NULL on error */ struct GNUNET_NAMECACHE_QueueEntry * GNUNET_NAMECACHE_lookup_block (struct GNUNET_NAMECACHE_Handle *h, const struct GNUNET_HashCode *derived_hash, - GNUNET_NAMECACHE_BlockProcessor proc, void *proc_cls) + GNUNET_NAMECACHE_BlockProcessor proc, + void *proc_cls) { struct GNUNET_NAMECACHE_QueueEntry *qe; - struct PendingMessage *pe; struct LookupBlockMessage *msg; - size_t msg_size; + struct GNUNET_MQ_Envelope *env; uint32_t rid; + if (NULL == h->mq) + return NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Looking for block under %s\n", GNUNET_h2s (derived_hash)); - rid = get_op_id(h); + rid = get_op_id (h); qe = GNUNET_new (struct GNUNET_NAMECACHE_QueueEntry); qe->nsh = h; qe->block_proc = proc; qe->block_proc_cls = proc_cls; qe->op_id = rid; - GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, qe); - - msg_size = sizeof (struct LookupBlockMessage); - pe = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); - pe->size = msg_size; - msg = (struct LookupBlockMessage *) &pe[1]; - msg->gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK); - msg->gns_header.header.size = htons (msg_size); + GNUNET_CONTAINER_DLL_insert_tail (h->op_head, + h->op_tail, + qe); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK); msg->gns_header.r_id = htonl (rid); msg->query = *derived_hash; - GNUNET_CONTAINER_DLL_insert_tail (h->pending_head, h->pending_tail, pe); - do_transmit (h); + GNUNET_MQ_send (h->mq, + env); return qe; } @@ -724,8 +571,9 @@ GNUNET_NAMECACHE_cancel (struct GNUNET_NAMECACHE_QueueEntry *qe) { struct GNUNET_NAMECACHE_Handle *h = qe->nsh; - GNUNET_assert (NULL != qe); - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, qe); + GNUNET_CONTAINER_DLL_remove (h->op_head, + h->op_tail, + qe); GNUNET_free(qe); } -- 2.25.1