X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdatastore%2Fdatastore_api.c;h=916e6acaef930121af9a4bfa04f0be4c72437d35;hb=0945dcf2c250dea65d520ef26f9917e9be3ac4ac;hp=cca67f6bca999ed6d2cb7e5dce06751abf8e2193;hpb=bb5fe91d23b0938baa3c4f0e92a83df659df216a;p=oweals%2Fgnunet.git diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index cca67f6bc..916e6acae 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2004-2013 GNUnet e.V. + Copyright (C) 2004-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -21,7 +21,7 @@ /** * @file datastore/datastore_api.c * @brief Management for the datastore for files stored on a GNUnet node. Implements - * a priority queue for requests (with timeouts). + * a priority queue for requests * @author Christian Grothoff */ #include "platform.h" @@ -57,7 +57,7 @@ struct StatusContext GNUNET_DATASTORE_ContinuationWithStatus cont; /** - * Closure for cont. + * Closure for @e cont. */ void *cont_cls; @@ -75,7 +75,7 @@ struct ResultContext GNUNET_DATASTORE_DatumProcessor proc; /** - * Closure for proc. + * Closure for @e proc. */ void *proc_cls; @@ -95,7 +95,6 @@ union QueueContext }; - /** * Entry in our priority queue. */ @@ -117,20 +116,13 @@ struct GNUNET_DATASTORE_QueueEntry */ struct GNUNET_DATASTORE_Handle *h; - /** - * Response processor (NULL if we are not waiting for a response). - * This struct should be used for the closure, function-specific - * arguments can be passed via 'qc'. - */ - GNUNET_CLIENT_MessageHandler response_proc; - /** * Function to call after transmission of the request. */ GNUNET_DATASTORE_ContinuationWithStatus cont; /** - * Closure for 'cont'. + * Closure for @e cont. */ void *cont_cls; @@ -140,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry union QueueContext qc; /** - * Task for timeout signalling. - */ - struct GNUNET_SCHEDULER_Task * task; - - /** - * Timeout for the current operation. + * Envelope of the request to transmit, NULL after + * transmission. */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_MQ_Envelope *env; /** * Priority in the queue. @@ -161,22 +149,13 @@ struct GNUNET_DATASTORE_QueueEntry unsigned int max_queue; /** - * Number of bytes in the request message following - * this struct. 32-bit value for nicer memory - * access (and overall struct alignment). - */ - uint32_t message_size; - - /** - * Has this message been transmitted to the service? - * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a - * multiple of 64 bits. + * Expected response type. */ - int was_transmitted; + uint16_t response_type; }; + /** * Handle to the datastore service. */ @@ -191,18 +170,13 @@ struct GNUNET_DATASTORE_Handle /** * Current connection to the datastore service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Handle for statistics. */ struct GNUNET_STATISTICS_Handle *stats; - /** - * Current transmit handle. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * Current head of priority queue. */ @@ -216,7 +190,7 @@ struct GNUNET_DATASTORE_Handle /** * Task for trying to reconnect. */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * How quickly should we retry? Used for exponential back-off on @@ -236,11 +210,6 @@ struct GNUNET_DATASTORE_Handle */ unsigned int result_count; - /** - * Are we currently trying to receive from the service? - */ - int in_receive; - /** * We should ignore the next message(s) from the service. */ @@ -249,6 +218,111 @@ struct GNUNET_DATASTORE_Handle }; +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls); + + +/** + * Disconnect from the service and then try reconnecting to the datastore service + * after some delay. + * + * @param h handle to datastore to disconnect and reconnect + */ +static void +do_disconnect (struct GNUNET_DATASTORE_Handle *h) +{ + if (NULL == h->mq) + { + GNUNET_break (0); + return; + } + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + h->skip_next_messages = 0; + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); +} + + +/** + * Free a queue entry. Removes the given entry from the + * queue and releases associated resources. Does NOT + * call the callback. + * + * @param qe entry to free. + */ +static void +free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h = qe->h; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + h->queue_size--; + if (NULL != qe->env) + GNUNET_MQ_discard (qe->env); + GNUNET_free (qe); +} + + +/** + * Handle error in sending drop request to datastore. + * + * @param cls closure with the datastore handle + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ error, reconnecting to DATASTORE\n"); + do_disconnect (h); + qe = h->queue_head; + if ( (NULL != qe) && + (NULL == qe->env) ) + { + union QueueContext qc = qe->qc; + uint16_t rt = qe->response_type; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to receive response from database.\n"); + free_queue_entry (qe); + switch (rt) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qc.sc.cont) + qc.sc.cont (qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("DATASTORE disconnected")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qc.rc.proc) + qc.rc.proc (qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + } +} + /** * Connect to the datastore service. @@ -259,64 +333,58 @@ struct GNUNET_DATASTORE_Handle struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - c = GNUNET_CLIENT_connect ("datastore", cfg); - if (c == NULL) - return NULL; /* oops */ - h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); - h->client = c; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Establishing DATASTORE connection!\n"); + h = GNUNET_new (struct GNUNET_DATASTORE_Handle); h->cfg = cfg; - h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); + try_reconnect (h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } + h->stats = GNUNET_STATISTICS_create ("datastore-api", + cfg); return h; } /** - * Task used by 'transmit_drop' to disconnect the datastore. + * Task used by to disconnect from the datastore after + * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message. * * @param cls the datastore handle - * @param tc scheduler context */ static void -disconnect_after_drop (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +disconnect_after_drop (void *cls) { struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Drop sent, disconnecting\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); } /** - * Transmit DROP message to datastore service. + * Handle error in sending drop request to datastore. * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param size number of bytes that can be copied to @a buf - * @param buf where to copy the drop message - * @return number of bytes written to @a buf + * @param cls closure with the datastore handle + * @param error error code */ -static size_t -transmit_drop (void *cls, size_t size, void *buf) +static void +disconnect_on_mq_error (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_MessageHeader *hdr; - if (buf == NULL) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h); - return 0; - } - GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); - hdr = buf; - hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); - hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); - GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h); - return sizeof (struct GNUNET_MessageHeader); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Failed to ask datastore to drop tables\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); } @@ -333,16 +401,12 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, { struct GNUNET_DATASTORE_QueueEntry *qe; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (NULL != h->client) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Datastore disconnect\n"); + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } if (NULL != h->reconnect_task) { @@ -351,24 +415,52 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } while (NULL != (qe = h->queue_head)) { - GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (h, NULL); + switch (qe->response_type) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qe->qc.sc.cont) + qe->qc.sc.cont (qe->qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("Disconnected from DATASTORE")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qe->qc.rc.proc) + qe->qc.rc.proc (qe->qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + free_queue_entry (qe); } if (GNUNET_YES == drop) { - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (NULL != h->client) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Re-connecting to issue DROP!\n"); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, + "datastore", + NULL, + &disconnect_on_mq_error, + h); + if (NULL != h->mq) { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof (struct - GNUNET_MessageHeader), - GNUNET_TIME_UNIT_SECONDS, - GNUNET_YES, - &transmit_drop, h)) - return; - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (hdr, + GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + GNUNET_MQ_notify_sent (env, + &disconnect_after_drop, + h); + GNUNET_MQ_send (h->mq, + env); + return; } GNUNET_break (0); } @@ -379,88 +471,68 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } -/** - * A request has timed out (before being transmitted to the service). - * - * @param cls the `struct GNUNET_DATASTORE_QueueEntry` - * @param tc scheduler context - */ -static void -timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# queue entry timeouts"), 1, - GNUNET_NO); - qe->task = NULL; - GNUNET_assert (GNUNET_NO == qe->was_transmitted); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout of request in datastore queue\n"); - /* response_proc's expect request at the head of the queue! */ - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_CONTAINER_DLL_insert (h->queue_head, - h->queue_tail, - qe); - GNUNET_assert (h->queue_head == qe); - qe->response_proc (qe->h, NULL); -} - - /** * Create a new entry for our priority queue (and possibly discard other entires if * the queue is getting too long). * * @param h handle to the datastore - * @param msize size of the message to queue + * @param env envelope with the message to queue * @param queue_priority priority of the entry * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation - * @param response_proc function to call with replies (can be NULL) + * @param expected_type which type of response do we expect, + * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA * @param qc client context (NOT a closure for @a response_proc) * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry (struct GNUNET_DATASTORE_Handle *h, - size_t msize, + struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_CLIENT_MessageHandler response_proc, + uint16_t expected_type, const union QueueContext *qc) { - struct GNUNET_DATASTORE_QueueEntry *ret; + struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_DATASTORE_QueueEntry *pos; unsigned int c; - c = 0; - pos = h->queue_head; - while ((pos != NULL) && (c < max_queue_size) && - (pos->priority >= queue_priority)) + if ( (NULL != h->queue_tail) && + (h->queue_tail->priority >= queue_priority) ) + { + c = h->queue_size; + pos = NULL; + } + else + { + c = 0; + pos = h->queue_head; + } + while ( (NULL != pos) && + (c < max_queue_size) && + (pos->priority >= queue_priority) ) { c++; pos = pos->next; } if (c >= max_queue_size) { - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1, + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue overflows"), + 1, GNUNET_NO); + GNUNET_MQ_discard (env); return NULL; } - ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); - ret->h = h; - ret->response_proc = response_proc; - ret->qc = *qc; - ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); - ret->priority = queue_priority; - ret->max_queue = max_queue_size; - ret->message_size = msize; - ret->was_transmitted = GNUNET_NO; - if (pos == NULL) + qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); + qe->h = h; + qe->env = env; + qe->response_type = expected_type; + qe->qc = *qc; + qe->priority = queue_priority; + qe->max_queue = max_queue_size; + if (NULL == pos) { /* append at the tail */ pos = h->queue_tail; @@ -470,39 +542,23 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, pos = pos->prev; /* do not insert at HEAD if HEAD query was already * transmitted and we are still receiving replies! */ - if ((pos == NULL) && (h->queue_head->was_transmitted)) + if ( (NULL == pos) && + (NULL == h->queue_head->env) ) pos = h->queue_head; } c++; #if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), - 1, GNUNET_NO); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue entries created"), + 1, + GNUNET_NO); #endif - GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, + h->queue_tail, + pos, + qe); h->queue_size++; - ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); - for (pos = ret->next; NULL != pos; pos = pos->next) - { - if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) - { - GNUNET_assert (NULL != pos->response_proc); - /* move 'pos' element to head so that it will be - * killed on 'NULL' call below */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Dropping request from datastore queue\n"); - /* response_proc's expect request at the head of the queue! */ - GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); - GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); - GNUNET_STATISTICS_update (h->stats, - gettext_noop - ("# Requests dropped from datastore queue"), 1, - GNUNET_NO); - GNUNET_assert (h->queue_head == pos); - pos->response_proc (h, NULL); - break; - } - } - return ret; + return qe; } @@ -513,77 +569,88 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h); - - -/** - * Try reconnecting to the datastore service. - * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param tc scheduler context - */ -static void -try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +process_queue (struct GNUNET_DATASTORE_Handle *h) { - struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; - h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); - h->reconnect_task = NULL; - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (h->client == NULL) + if (NULL == (qe = h->queue_head)) { - LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n"); + /* no entry in queue */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty\n"); return; } - GNUNET_STATISTICS_update (h->stats, - gettext_noop - ("# datastore connections (re)created"), 1, - GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); - process_queue (h); + if (NULL == qe->env) + { + /* waiting for replies */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Head request already transmitted\n"); + return; + } + if (NULL == h->mq) + { + /* waiting for reconnect */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Not connected\n"); + return; + } + GNUNET_MQ_send (h->mq, + qe->env); + qe->env = NULL; } + + /** - * Disconnect from the service and then try reconnecting to the datastore service - * after some delay. + * Function called to check status message from the service. * - * @param h handle to datastore to disconnect and reconnect + * @param cls closure + * @param sm status message received + * @return #GNUNET_OK if the message is well-formed */ -static void -do_disconnect (struct GNUNET_DATASTORE_Handle *h) +static int +check_status (void *cls, + const struct StatusMessage *sm) { - if (NULL == h->client) + uint16_t msize = ntohs (sm->header.size) - sizeof (*sm); + int32_t status = ntohl (sm->status); + + if (msize > 0) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client NULL in disconnect, will not try to reconnect\n"); - return; + const char *emsg = (const char *) &sm[1]; + + if ('\0' != emsg[msize - 1]) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } } - GNUNET_CLIENT_disconnect (h->client); - h->skip_next_messages = 0; - h->client = NULL; - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h); + else if (GNUNET_SYSERR == status) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Function called whenever we receive a message from - * the service. Calls the appropriate handler. + * Function called to handle status message from the service. * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param msg the received message + * @param cls closure + * @param sm status message received */ static void -receive_cb (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_status (void *cls, + const struct StatusMessage *sm) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; + const char *emsg; + int32_t status = ntohl (sm->status); - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving reply from datastore\n"); if (h->skip_next_messages > 0) { h->skip_next_messages--; @@ -593,248 +660,255 @@ receive_cb (void *cls, if (NULL == (qe = h->queue_head)) { GNUNET_break (0); - process_queue (h); + do_disconnect (h); return; } - qe->response_proc (h, msg); + if (NULL != qe->env) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + rc = qe->qc.sc; + free_queue_entry (qe); + if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) + emsg = (const char *) &sm[1]; + else + emsg = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received status %d/%s\n", + (int) status, + emsg); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), + 1, + GNUNET_NO); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.cont) + rc.cont (rc.cont_cls, + status, + GNUNET_TIME_absolute_ntoh (sm->min_expiration), + emsg); } /** - * Transmit request from queue to datastore service. + * Check data message we received from the service. * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param size number of bytes that can be copied to @a buf - * @param buf where to copy the drop message - * @return number of bytes written to @a buf + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ -static size_t -transmit_request (void *cls, - size_t size, - void *buf) +static int +check_data (void *cls, + const struct DataMessage *dm) { - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - size_t msize; + uint16_t msize = ntohs (dm->header.size) - sizeof (*dm); - h->th = NULL; - if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to DATASTORE.\n"); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - do_disconnect (h); - return 0; - } - if (size < (msize = qe->message_size)) + if (msize != ntohl (dm->size)) { - process_queue (h); - return 0; + GNUNET_break (0); + return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte request to DATASTORE\n", - msize); - memcpy (buf, &qe[1], msize); - qe->was_transmitted = GNUNET_YES; - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = NULL; - GNUNET_assert (GNUNET_NO == h->in_receive); - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - &receive_cb, h, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); -#if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# bytes sent to datastore"), msize, - GNUNET_NO); -#endif - return msize; + return GNUNET_OK; } /** - * Process entries in the queue (or do nothing if we are already - * doing so). + * Handle data message we got from the service. * - * @param h handle to the datastore + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h) +handle_data (void *cls, + const struct DataMessage *dm) { + struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; - if (NULL == (qe = h->queue_head)) - { - /* no entry in queue */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queue empty\n"); - return; - } - if (GNUNET_YES == qe->was_transmitted) - { - /* waiting for replies */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Head request already transmitted\n"); - return; - } - if (NULL != h->th) + if (h->skip_next_messages > 0) { - /* request pending */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Pending transmission request\n"); + process_queue (h); return; } - if (NULL == h->client) + qe = h->queue_head; + if (NULL == qe) { - /* waiting for reconnect */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Not connected\n"); + GNUNET_break (0); + do_disconnect (h); return; } - if (GNUNET_YES == h->in_receive) + if (NULL != qe->env) { - /* wait for response to previous query */ + GNUNET_break (0); + do_disconnect (h); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing %u byte request to DATASTORE\n", - qe->message_size); - h->th - = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, - GNUNET_TIME_absolute_get_remaining (qe->timeout), - GNUNET_YES, - &transmit_request, h); - GNUNET_assert (GNUNET_NO == h->in_receive); - GNUNET_break (NULL != h->th); -} - - -/** - * Dummy continuation used to do nothing (but be non-zero). - * - * @param cls closure - * @param result result - * @param min_expiration expiration time - * @param emsg error message - */ -static void -drop_status_cont (void *cls, int32_t result, - struct GNUNET_TIME_Absolute min_expiration, - const char *emsg) -{ - /* do nothing */ -} - - -/** - * Free a queue entry. Removes the given entry from the - * queue and releases associated resources. Does NOT - * call the callback. - * - * @param qe entry to free. - */ -static void -free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) -{ - struct GNUNET_DATASTORE_Handle *h = qe->h; - - GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); - if (qe->task != NULL) + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) { - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = NULL; + GNUNET_break (0); + do_disconnect (h); + return; } - h->queue_size--; - qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ - GNUNET_free (qe); +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Results received"), + 1, + GNUNET_NO); +#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), + ntohl (dm->type), + ntohl (dm->size), + GNUNET_h2s (&dm->key)); + rc = qe->qc.rc; + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + &dm->key, + ntohl (dm->size), + &dm[1], + ntohl (dm->type), + ntohl (dm->priority), + ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); } /** - * Type of a function to call when we receive a message - * from the service. + * Type of a function to call when we receive a + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service. * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param msg message received */ static void -process_status_message (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_data_end (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; - struct StatusContext rc; - const struct StatusMessage *sm; - const char *emsg; - int32_t status; - int was_transmitted; + struct ResultContext rc; - if (NULL == (qe = h->queue_head)) + if (h->skip_next_messages > 0) { - GNUNET_break (0); - do_disconnect (h); + h->skip_next_messages--; + process_queue (h); return; } - rc = qe->qc.sc; - if (NULL == msg) + qe = h->queue_head; + if (NULL == qe) { - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (was_transmitted == GNUNET_YES) - do_disconnect (h); - else - process_queue (h); - if (NULL != rc.cont) - rc.cont (rc.cont_cls, GNUNET_SYSERR, - GNUNET_TIME_UNIT_ZERO_ABS, - _("Failed to receive status response from database.")); + GNUNET_break (0); + do_disconnect (h); return; } - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - free_queue_entry (qe); - if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) + if (NULL != qe->env) { GNUNET_break (0); - h->retry_time = GNUNET_TIME_UNIT_ZERO; do_disconnect (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, GNUNET_SYSERR, - GNUNET_TIME_UNIT_ZERO_ABS, - _("Error reading response from datastore service")); return; } - sm = (const struct StatusMessage *) msg; - status = ntohl (sm->status); - emsg = NULL; - if (ntohs (msg->size) > sizeof (struct StatusMessage)) - { - emsg = (const char *) &sm[1]; - if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } - } - if ((status == GNUNET_SYSERR) && (emsg == NULL)) + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) { GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); + do_disconnect (h); + return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg); + rc = qe->qc.rc; + free_queue_entry (qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", + h->queue_size); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + h->result_count = 0; + process_queue (h); + /* signal end of iteration */ + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + NULL, + 0, + NULL, + 0, + 0, + 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); +} + + +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (status, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + struct StatusMessage, + h), + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + struct DataMessage, + h), + GNUNET_MQ_hd_fixed_size (data_end, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, + struct GNUNET_MessageHeader, + h), + GNUNET_MQ_handler_end () + }; + + h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); + h->reconnect_task = NULL; + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, + "datastore", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# status messages received"), 1, + gettext_noop ("# datastore connections (re)created"), + 1, GNUNET_NO); - h->retry_time = GNUNET_TIME_UNIT_ZERO; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Reconnected to DATASTORE\n"); process_queue (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, status, - GNUNET_TIME_absolute_ntoh (sm->min_expiration), - emsg); +} + + +/** + * Dummy continuation used to do nothing (but be non-zero). + * + * @param cls closure + * @param result result + * @param min_expiration expiration time + * @param emsg error message + */ +static void +drop_status_cont (void *cls, + int32_t result, + struct GNUNET_TIME_Absolute min_expiration, + const char *emsg) +{ + /* do nothing */ } @@ -857,7 +931,6 @@ process_status_message (void *cls, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -877,40 +950,29 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct DataMessage *dm; - size_t msize; union QueueContext qc; + if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return NULL; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s' for %s\n", size, + "Asked to put %u bytes of data under key `%s' for %s\n", + size, GNUNET_h2s (key), GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), GNUNET_YES)); - msize = sizeof (struct DataMessage) + size; - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - qc.sc.cont = cont; - qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, - msize, - queue_priority, - max_queue_size, - timeout, - &process_status_message, &qc); - if (qe == NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n"); - return NULL; - } - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"), - 1, GNUNET_NO); - dm = (struct DataMessage *) &qe[1]; - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons (msize); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_PUT); dm->rid = htonl (rid); dm->size = htonl ((uint32_t) size); dm->type = htonl (type); @@ -921,7 +983,27 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, dm->uid = GNUNET_htonll (0); dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->key = *key; - memcpy (&dm[1], data, size); + GNUNET_memcpy (&dm[1], + data, + size); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for PUT\n"); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# PUT requests executed"), + 1, + GNUNET_NO); process_queue (h); return qe; } @@ -943,12 +1025,14 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, +GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, + uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReserveMessage *rm; union QueueContext qc; @@ -956,15 +1040,21 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, cont = &drop_status_cont; LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to reserve %llu bytes of data and %u entries\n", - (unsigned long long) amount, (unsigned int) entries); + (unsigned long long) amount, + (unsigned int) entries); + env = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); + qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; qe = make_queue_entry (h, - sizeof (struct ReserveMessage), + env, UINT_MAX, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_status_message, &qc); + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -972,13 +1062,9 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, return NULL; } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# RESERVE requests executed"), 1, + gettext_noop ("# RESERVE requests executed"), + 1, GNUNET_NO); - rm = (struct ReserveMessage *) &qe[1]; - rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons (sizeof (struct ReserveMessage)); - rm->entries = htonl (entries); - rm->amount = GNUNET_htonll (amount); process_queue (h); return qe; } @@ -998,7 +1084,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -1007,25 +1092,34 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - uint32_t rid, unsigned int queue_priority, + uint32_t rid, + unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReleaseReserveMessage *rrm; union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to release reserve %d\n", + rid); + env = GNUNET_MQ_msg (rrm, + GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->rid = htonl (rid); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); - if (qe == NULL) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to release reserve\n"); @@ -1035,72 +1129,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# RELEASE RESERVE requests executed"), 1, GNUNET_NO); - rrm = (struct ReleaseReserveMessage *) &qe[1]; - rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl (rid); - process_queue (h); - return qe; -} - - -/** - * Update a value in the datastore. - * - * @param h handle to the datastore - * @param uid identifier for the value - * @param priority how much to increase the priority of the value - * @param expiration new expiration value should be MAX of existing and this argument - * @param queue_priority ranking of this request in the priority queue - * @param max_queue_size at what queue size should this request be dropped - * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response - * @param cont continuation to call when done - * @param cont_cls closure for @a cont - * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) - */ -struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, - uint32_t priority, - struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) -{ - struct GNUNET_DATASTORE_QueueEntry *qe; - struct UpdateMessage *um; - union QueueContext qc; - - if (cont == NULL) - cont = &drop_status_cont; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to update entry %llu raising priority by %u and expiration to %s\n", - uid, - (unsigned int) priority, - GNUNET_STRINGS_absolute_time_to_string (expiration)); - qc.sc.cont = cont; - qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, - max_queue_size, timeout, &process_status_message, &qc); - if (qe == NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for UPDATE\n"); - return NULL; - } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# UPDATE requests executed"), 1, - GNUNET_NO); - um = (struct UpdateMessage *) &qe[1]; - um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons (sizeof (struct UpdateMessage)); - um->priority = htonl (priority); - um->expiration = GNUNET_TIME_absolute_hton (expiration); - um->uid = GNUNET_htonll (uid); process_queue (h); return qe; } @@ -1120,7 +1148,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -1129,39 +1156,33 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const struct GNUNET_HashCode * key, size_t size, - const void *data, unsigned int queue_priority, + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; - size_t msize; + struct GNUNET_MQ_Envelope *env; union QueueContext qc; - if (cont == NULL) - cont = &drop_status_cont; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n", - size, GNUNET_h2s (key)); - qc.sc.cont = cont; - qc.sc.cont_cls = cont_cls; - msize = sizeof (struct DataMessage) + size; - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, - &process_status_message, &qc); - if (qe == NULL) + if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n"); + GNUNET_break (0); return NULL; } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# REMOVE requests executed"), 1, - GNUNET_NO); - dm = (struct DataMessage *) &qe[1]; - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons (msize); + if (NULL == cont) + cont = &drop_status_cont; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to remove %u bytes under key `%s'\n", + size, + GNUNET_h2s (key)); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); dm->rid = htonl (0); dm->size = htonl (size); dm->type = htonl (0); @@ -1170,116 +1191,35 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, dm->uid = GNUNET_htonll (0); dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); dm->key = *key; - memcpy (&dm[1], data, size); - process_queue (h); - return qe; -} - + GNUNET_memcpy (&dm[1], + data, + size); -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` - * @param msg message received, NULL on timeout or fatal error - */ -static void -process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - struct ResultContext rc; - const struct DataMessage *dm; - int was_transmitted; + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; - if (NULL == msg) - { - qe = h->queue_head; - GNUNET_assert (NULL != qe); - rc = qe->qc.rc; - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (GNUNET_YES == was_transmitted) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to receive response from database.\n"); - do_disconnect (h); - } - else - { - process_queue (h); - } - if (NULL != rc.proc) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) { - GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); - qe = h->queue_head; - rc = qe->qc.rc; - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - free_queue_entry (qe); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set, new queue size is %u\n", h->queue_size); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - h->result_count = 0; - process_queue (h); - if (NULL != rc.proc) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - qe = h->queue_head; - GNUNET_assert (NULL != qe); - rc = qe->qc.rc; - if (GNUNET_YES != qe->was_transmitted) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - if ((ntohs (msg->size) < sizeof (struct DataMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || - (ntohs (msg->size) != - sizeof (struct DataMessage) + - ntohl (((const struct DataMessage *) msg)->size))) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; + "Could not create queue entry for REMOVE\n"); + return NULL; } -#if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# REMOVE requests executed"), + 1, GNUNET_NO); -#endif - dm = (const struct DataMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), - ntohl (dm->size), GNUNET_h2s (&dm->key)); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; process_queue (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), - ntohl (dm->priority), ntohl (dm->anonymity), - GNUNET_TIME_absolute_ntoh (dm->expiration), - GNUNET_ntohll (dm->uid)); + return qe; } + /** * Get a random value from the datastore for content replication. * Returns a single, random value among those with the highest @@ -1291,7 +1231,6 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. @@ -1303,23 +1242,27 @@ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct GNUNET_MessageHeader *m; union QueueContext qc; GNUNET_assert (NULL != proc); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get replication entry in %s\n", - GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); + "Asked to get replication entry\n"); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1330,9 +1273,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# GET REPLICATION requests executed"), 1, GNUNET_NO); - m = (struct GNUNET_MessageHeader *) &qe[1]; - m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); - m->size = htons (sizeof (struct GNUNET_MessageHeader)); process_queue (h); return qe; } @@ -1349,7 +1289,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param type allowed type for the operation (never zero) * @param proc function to call on a random value; it * will be called once with a value (if available) @@ -1363,26 +1302,33 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct GetZeroAnonymityMessage *m; union QueueContext qc; GNUNET_assert (NULL != proc); GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get %llu-th zero-anonymity entry of type %d in %s\n", - (unsigned long long) offset, type, - GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); + "Asked to get %llu-th zero-anonymity entry of type %d\n", + (unsigned long long) offset, + type); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1393,11 +1339,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# GET ZERO ANONYMITY requests executed"), 1, GNUNET_NO); - m = (struct GetZeroAnonymityMessage *) &qe[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); - m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); - m->type = htonl ((uint32_t) type); - m->offset = GNUNET_htonll (offset); process_queue (h); return qe; } @@ -1417,7 +1358,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on each matching value; * will be called once with a NULL value at the end * @param proc_cls closure for @a proc @@ -1427,34 +1367,51 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, - const struct GNUNET_HashCode * key, + const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; + struct GetKeyMessage *gkm; struct GetMessage *gm; union QueueContext qc; GNUNET_assert (NULL != proc); LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to look for data of type %u under key `%s'\n", - (unsigned int) type, GNUNET_h2s (key)); + (unsigned int) type, + GNUNET_h2s (key)); + if (NULL == key) + { + env = GNUNET_MQ_msg (gm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); + gm->offset = GNUNET_htonll (offset); + } + else + { + env = GNUNET_MQ_msg (gkm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); + gkm->type = htonl (type); + gkm->offset = GNUNET_htonll (offset); + gkm->key = *key; + } qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; qe = make_queue_entry (h, - sizeof (struct GetMessage), + env, queue_priority, max_queue_size, - timeout, - &process_result_message, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, &qc); - if (qe == NULL) + if (NULL == qe) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not queue request for `%s'\n", GNUNET_h2s (key)); return NULL; } @@ -1464,20 +1421,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1, GNUNET_NO); #endif - gm = (struct GetMessage *) &qe[1]; - gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl (type); - gm->offset = GNUNET_htonll (offset); - if (key != NULL) - { - gm->header.size = htons (sizeof (struct GetMessage)); - gm->key = *key; - } - else - { - gm->header.size = - htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)); - } process_queue (h); return qe; } @@ -1492,14 +1435,14 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, void GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { - struct GNUNET_DATASTORE_Handle *h; + struct GNUNET_DATASTORE_Handle *h = qe->h; - GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); - h = qe->h; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, - qe->was_transmitted, h->queue_head == qe); - if (GNUNET_YES == qe->was_transmitted) + "Pending DATASTORE request %p cancelled (%d, %d)\n", + qe, + NULL == qe->env, + h->queue_head == qe); + if (NULL == qe->env) { free_queue_entry (qe); h->skip_next_messages++;