From 3140154d46212e08e0d73ed891a66213a6813075 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 24 Jun 2016 20:17:39 +0000 Subject: [PATCH] refactoring datastore API to use MQ API, also fixing misc. bugs in new mysql backend --- src/datastore/datastore.h | 33 +- src/datastore/datastore_api.c | 1231 ++++++++--------- src/datastore/gnunet-datastore.c | 6 +- src/datastore/gnunet-service-datastore.c | 111 +- src/datastore/perf_datastore_api.c | 5 - src/datastore/plugin_datastore_mysql.c | 454 +++--- src/datastore/test_datastore_api.c | 41 +- src/datastore/test_datastore_api_management.c | 39 +- src/datastore/test_plugin_datastore.c | 48 +- .../test_plugin_datastore_data_mysql.conf | 3 +- src/fs/fs_publish.c | 3 - src/fs/fs_publish_ublock.c | 1 - src/fs/fs_unindex.c | 5 +- src/fs/gnunet-service-fs_cadet_server.c | 1 - src/fs/gnunet-service-fs_indexing.c | 8 +- src/fs/gnunet-service-fs_pr.c | 8 +- src/fs/gnunet-service-fs_push.c | 1 - src/fs/gnunet-service-fs_put.c | 1 - src/include/gnunet_datastore_service.h | 16 +- src/include/gnunet_protocols.h | 5 + 20 files changed, 1079 insertions(+), 941 deletions(-) diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h index 5767ae6a0..dc3d9d1f2 100644 --- a/src/datastore/datastore.h +++ b/src/datastore/datastore.h @@ -106,12 +106,10 @@ struct ReleaseReserveMessage * Message to the datastore service asking about specific * content. */ -struct GetMessage +struct GetKeyMessage { /** - * Type is GNUNET_MESSAGE_TYPE_DATASTORE_GET. Size - * can either be "sizeof(struct GetMessage)" or - * "sizeof(struct GetMessage) - sizeof(struct GNUNET_HashCode)"! + * Type is #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY. */ struct GNUNET_MessageHeader header; @@ -126,14 +124,37 @@ struct GetMessage uint64_t offset GNUNET_PACKED; /** - * Desired key (optional). Check the "size" of the - * header to see if the key is actually present. + * Desired key. */ struct GNUNET_HashCode key; }; +/** + * Message to the datastore service asking about specific + * content. + */ +struct GetMessage +{ + /** + * Type is #GNUNET_MESSAGE_TYPE_DATASTORE_GET. + */ + struct GNUNET_MessageHeader header; + + /** + * Desired content type. (actually an enum GNUNET_BLOCK_Type) + */ + uint32_t type GNUNET_PACKED; + + /** + * Offset of the result. + */ + uint64_t offset GNUNET_PACKED; + +}; + + /** * Message to the datastore service asking about zero * anonymity content. diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index b2de3d35d..285634759 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -21,7 +21,7 @@ /** * @file datastore/datastore_api.c * @brief Management for the datastore for files stored on a GNUnet node. Implements - * a priority queue for requests (with timeouts). + * a priority queue for requests * @author Christian Grothoff */ #include "platform.h" @@ -95,7 +95,6 @@ union QueueContext }; - /** * Entry in our priority queue. */ @@ -117,13 +116,6 @@ struct GNUNET_DATASTORE_QueueEntry */ struct GNUNET_DATASTORE_Handle *h; - /** - * Response processor (NULL if we are not waiting for a response). - * This struct should be used for the closure, function-specific - * arguments can be passed via 'qc'. - */ - GNUNET_CLIENT_MessageHandler response_proc; - /** * Function to call after transmission of the request. */ @@ -140,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry union QueueContext qc; /** - * Task for timeout signalling. - */ - struct GNUNET_SCHEDULER_Task *task; - - /** - * Timeout for the current operation. + * Envelope of the request to transmit, NULL after + * transmission. */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_MQ_Envelope *env; /** * Priority in the queue. @@ -161,22 +149,13 @@ struct GNUNET_DATASTORE_QueueEntry unsigned int max_queue; /** - * Number of bytes in the request message following - * this struct. 32-bit value for nicer memory - * access (and overall struct alignment). - */ - uint32_t message_size; - - /** - * Has this message been transmitted to the service? - * Only ever #GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a - * multiple of 64 bits. + * Expected response type. */ - int was_transmitted; + uint16_t response_type; }; + /** * Handle to the datastore service. */ @@ -191,18 +170,13 @@ struct GNUNET_DATASTORE_Handle /** * Current connection to the datastore service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Handle for statistics. */ struct GNUNET_STATISTICS_Handle *stats; - /** - * Current transmit handle. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * Current head of priority queue. */ @@ -216,7 +190,7 @@ struct GNUNET_DATASTORE_Handle /** * Task for trying to reconnect. */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * How quickly should we retry? Used for exponential back-off on @@ -236,11 +210,6 @@ struct GNUNET_DATASTORE_Handle */ unsigned int result_count; - /** - * Are we currently trying to receive from the service? - */ - int in_receive; - /** * We should ignore the next message(s) from the service. */ @@ -249,6 +218,110 @@ struct GNUNET_DATASTORE_Handle }; +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls); + + +/** + * Disconnect from the service and then try reconnecting to the datastore service + * after some delay. + * + * @param h handle to datastore to disconnect and reconnect + */ +static void +do_disconnect (struct GNUNET_DATASTORE_Handle *h) +{ + if (NULL == h->mq) + { + GNUNET_break (0); + return; + } + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + h->skip_next_messages = 0; + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); +} + + +/** + * Free a queue entry. Removes the given entry from the + * queue and releases associated resources. Does NOT + * call the callback. + * + * @param qe entry to free. + */ +static void +free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h = qe->h; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + h->queue_size--; + GNUNET_free (qe); +} + + +/** + * Handle error in sending drop request to datastore. + * + * @param cls closure with the datastore handle + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ error, reconnecting to DATASTORE\n"); + do_disconnect (h); + qe = h->queue_head; + if ( (NULL != qe) && + (NULL == qe->env) ) + { + union QueueContext qc = qe->qc; + uint16_t rt = qe->response_type; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to receive response from database.\n"); + free_queue_entry (qe); + switch (rt) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qc.sc.cont) + qc.sc.cont (qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("DATASTORE disconnected")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qc.rc.proc) + qc.rc.proc (qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + } +} + + /** * Connect to the datastore service. * @@ -258,22 +331,27 @@ struct GNUNET_DATASTORE_Handle struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - c = GNUNET_CLIENT_connect ("datastore", cfg); - if (c == NULL) - return NULL; /* oops */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Establishing DATASTORE connection!\n"); h = GNUNET_new (struct GNUNET_DATASTORE_Handle); - h->client = c; h->cfg = cfg; - h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); + try_reconnect (h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } + h->stats = GNUNET_STATISTICS_create ("datastore-api", + cfg); return h; } /** - * Task used by 'transmit_drop' to disconnect the datastore. + * Task used by to disconnect from the datastore after + * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message. * * @param cls the datastore handle */ @@ -282,39 +360,29 @@ disconnect_after_drop (void *cls) { struct GNUNET_DATASTORE_Handle *h = cls; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Drop sent, disconnecting\n"); GNUNET_DATASTORE_disconnect (h, GNUNET_NO); } /** - * Transmit DROP message to datastore service. + * Handle error in sending drop request to datastore. * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param size number of bytes that can be copied to @a buf - * @param buf where to copy the drop message - * @return number of bytes written to @a buf + * @param cls closure with the datastore handle + * @param error error code */ -static size_t -transmit_drop (void *cls, size_t size, void *buf) +static void +disconnect_on_mq_error (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_MessageHeader *hdr; - if (buf == NULL) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h); - return 0; - } - GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); - hdr = buf; - hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); - hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); - GNUNET_SCHEDULER_add_now (&disconnect_after_drop, - h); - return sizeof (struct GNUNET_MessageHeader); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Failed to ask datastore to drop tables\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); } @@ -333,15 +401,10 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } if (NULL != h->reconnect_task) { @@ -350,25 +413,52 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } while (NULL != (qe = h->queue_head)) { - GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (h, NULL); + switch (qe->response_type) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qe->qc.sc.cont) + qe->qc.sc.cont (qe->qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("Disconnected from DATASTORE")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qe->qc.rc.proc) + qe->qc.rc.proc (qe->qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + free_queue_entry (qe); } if (GNUNET_YES == drop) { - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (NULL != h->client) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Re-connecting to issue DROP!\n"); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "datastore", + NULL, + &disconnect_on_mq_error, + h); + if (NULL != h->mq) { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof (struct - GNUNET_MessageHeader), - GNUNET_TIME_UNIT_SECONDS, - GNUNET_YES, - &transmit_drop, - h)) - return; - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (hdr, + GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + GNUNET_MQ_notify_sent (env, + &disconnect_after_drop, + h); + GNUNET_MQ_send (h->mq, + env); + return; } GNUNET_break (0); } @@ -379,68 +469,38 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } -/** - * A request has timed out (before being transmitted to the service). - * - * @param cls the `struct GNUNET_DATASTORE_QueueEntry` - */ -static void -timeout_queue_entry (void *cls) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# queue entry timeouts"), - 1, - GNUNET_NO); - qe->task = NULL; - GNUNET_assert (GNUNET_NO == qe->was_transmitted); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout of request in datastore queue\n"); - /* response_proc's expect request at the head of the queue! */ - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_CONTAINER_DLL_insert (h->queue_head, - h->queue_tail, - qe); - GNUNET_assert (h->queue_head == qe); - qe->response_proc (qe->h, NULL); -} - - /** * Create a new entry for our priority queue (and possibly discard other entires if * the queue is getting too long). * * @param h handle to the datastore - * @param msize size of the message to queue + * @param env envelope with the message to queue * @param queue_priority priority of the entry * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation - * @param response_proc function to call with replies (can be NULL) + * @param expected_type which type of response do we expect, + * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA * @param qc client context (NOT a closure for @a response_proc) * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry (struct GNUNET_DATASTORE_Handle *h, - size_t msize, + struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_CLIENT_MessageHandler response_proc, + uint16_t expected_type, const union QueueContext *qc) { - struct GNUNET_DATASTORE_QueueEntry *ret; + struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_DATASTORE_QueueEntry *pos; unsigned int c; c = 0; pos = h->queue_head; - while ((pos != NULL) && (c < max_queue_size) && - (pos->priority >= queue_priority)) + while ( (NULL != pos) && + (c < max_queue_size) && + (pos->priority >= queue_priority) ) { c++; pos = pos->next; @@ -451,18 +511,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# queue overflows"), 1, GNUNET_NO); + GNUNET_MQ_discard (env); return NULL; } - ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); - ret->h = h; - ret->response_proc = response_proc; - ret->qc = *qc; - ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); - ret->priority = queue_priority; - ret->max_queue = max_queue_size; - ret->message_size = msize; - ret->was_transmitted = GNUNET_NO; - if (pos == NULL) + qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); + qe->h = h; + qe->env = env; + qe->response_type = expected_type; + qe->qc = *qc; + qe->priority = queue_priority; + qe->max_queue = max_queue_size; + if (NULL == pos) { /* append at the tail */ pos = h->queue_tail; @@ -472,49 +531,23 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, pos = pos->prev; /* do not insert at HEAD if HEAD query was already * transmitted and we are still receiving replies! */ - if ((pos == NULL) && (h->queue_head->was_transmitted)) + if ( (NULL == pos) && + (NULL == h->queue_head->env) ) pos = h->queue_head; } c++; #if INSANE_STATISTICS GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), - 1, GNUNET_NO); + 1, + GNUNET_NO); #endif GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, - ret); + qe); h->queue_size++; - ret->task = GNUNET_SCHEDULER_add_delayed (timeout, - &timeout_queue_entry, - ret); - for (pos = ret->next; NULL != pos; pos = pos->next) - { - if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) - { - GNUNET_assert (NULL != pos->response_proc); - /* move 'pos' element to head so that it will be - * killed on 'NULL' call below */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Dropping request from datastore queue\n"); - /* response_proc's expect request at the head of the queue! */ - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - pos); - GNUNET_CONTAINER_DLL_insert (h->queue_head, - h->queue_tail, - pos); - GNUNET_STATISTICS_update (h->stats, - gettext_noop - ("# Requests dropped from datastore queue"), 1, - GNUNET_NO); - GNUNET_assert (h->queue_head == pos); - pos->response_proc (h, NULL); - break; - } - } - return ret; + return qe; } @@ -525,333 +558,346 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h); - - -/** - * Try reconnecting to the datastore service. - * - * @param cls the `struct GNUNET_DATASTORE_Handle` - */ -static void -try_reconnect (void *cls) +process_queue (struct GNUNET_DATASTORE_Handle *h) { - struct GNUNET_DATASTORE_Handle *h = cls; - - h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); - h->reconnect_task = NULL; - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (h->client == NULL) - { - LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n"); - return; - } - GNUNET_STATISTICS_update (h->stats, - gettext_noop - ("# datastore connections (re)created"), 1, - GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); - process_queue (h); -} - + struct GNUNET_DATASTORE_QueueEntry *qe; -/** - * Disconnect from the service and then try reconnecting to the datastore service - * after some delay. - * - * @param h handle to datastore to disconnect and reconnect - */ -static void -do_disconnect (struct GNUNET_DATASTORE_Handle *h) -{ - if (NULL == h->client) + if (NULL == (qe = h->queue_head)) { + /* no entry in queue */ LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client NULL in disconnect, will not try to reconnect\n"); + "Queue empty\n"); return; } - GNUNET_CLIENT_disconnect (h->client); - h->skip_next_messages = 0; - h->client = NULL; - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->retry_time, - &try_reconnect, - h); -} - - -/** - * Function called whenever we receive a message from - * the service. Calls the appropriate handler. - * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param msg the received message - */ -static void -receive_cb (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving reply from datastore\n"); - if (h->skip_next_messages > 0) + if (NULL == qe->env) { - h->skip_next_messages--; - process_queue (h); + /* waiting for replies */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Head request already transmitted\n"); return; } - if (NULL == (qe = h->queue_head)) + if (NULL == h->mq) { - GNUNET_break (0); - process_queue (h); + /* waiting for reconnect */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Not connected\n"); return; } - qe->response_proc (h, msg); + GNUNET_MQ_send (h->mq, + qe->env); + qe->env = NULL; } + + /** - * Transmit request from queue to datastore service. + * Function called to check status message from the service. * - * @param cls the `struct GNUNET_DATASTORE_Handle` - * @param size number of bytes that can be copied to @a buf - * @param buf where to copy the drop message - * @return number of bytes written to @a buf + * @param cls closure + * @param sm status message received + * @return #GNUNET_OK if the message is well-formed */ -static size_t -transmit_request (void *cls, - size_t size, - void *buf) +static int +check_status (void *cls, + const struct StatusMessage *sm) { - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - size_t msize; + uint16_t msize = ntohs (sm->header.size) - sizeof (*sm); + int32_t status = ntohl (sm->status); - h->th = NULL; - if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ - if (NULL == buf) + if (msize > 0) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to DATASTORE.\n"); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - do_disconnect (h); - return 0; + const char *emsg = (const char *) &sm[1]; + + if ('\0' != emsg[msize - 1]) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } } - if (size < (msize = qe->message_size)) + else if (GNUNET_SYSERR == status) { - process_queue (h); - return 0; + GNUNET_break (0); + return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte request to DATASTORE\n", - msize); - memcpy (buf, &qe[1], msize); - qe->was_transmitted = GNUNET_YES; - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = NULL; - GNUNET_assert (GNUNET_NO == h->in_receive); - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - &receive_cb, h, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); -#if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# bytes sent to datastore"), msize, - GNUNET_NO); -#endif - return msize; + return GNUNET_OK; } /** - * Process entries in the queue (or do nothing if we are already - * doing so). + * Function called to handle status message from the service. * - * @param h handle to the datastore + * @param cls closure + * @param sm status message received */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h) +handle_status (void *cls, + const struct StatusMessage *sm) { + struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; + const char *emsg; + int32_t status = ntohl (sm->status); - if (NULL == (qe = h->queue_head)) - { - /* no entry in queue */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queue empty\n"); - return; - } - if (GNUNET_YES == qe->was_transmitted) + if (h->skip_next_messages > 0) { - /* waiting for replies */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Head request already transmitted\n"); + h->skip_next_messages--; + process_queue (h); return; } - if (NULL != h->th) + if (NULL == (qe = h->queue_head)) { - /* request pending */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Pending transmission request\n"); + GNUNET_break (0); + do_disconnect (h); return; } - if (NULL == h->client) + if (NULL != qe->env) { - /* waiting for reconnect */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Not connected\n"); + GNUNET_break (0); + do_disconnect (h); return; } - if (GNUNET_YES == h->in_receive) + if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type) { - /* wait for response to previous query */ + GNUNET_break (0); + do_disconnect (h); return; } + rc = qe->qc.sc; + free_queue_entry (qe); + if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) + emsg = (const char *) &sm[1]; + else + emsg = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing %u byte request to DATASTORE\n", - qe->message_size); - h->th - = GNUNET_CLIENT_notify_transmit_ready (h->client, - qe->message_size, - GNUNET_TIME_absolute_get_remaining (qe->timeout), - GNUNET_YES, - &transmit_request, - h); - GNUNET_assert (GNUNET_NO == h->in_receive); - GNUNET_break (NULL != h->th); -} - - -/** - * Dummy continuation used to do nothing (but be non-zero). - * - * @param cls closure - * @param result result - * @param min_expiration expiration time - * @param emsg error message - */ -static void -drop_status_cont (void *cls, int32_t result, - struct GNUNET_TIME_Absolute min_expiration, - const char *emsg) -{ - /* do nothing */ + "Received status %d/%s\n", + (int) status, + emsg); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), + 1, + GNUNET_NO); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.cont) + rc.cont (rc.cont_cls, + status, + GNUNET_TIME_absolute_ntoh (sm->min_expiration), + emsg); } /** - * Free a queue entry. Removes the given entry from the - * queue and releases associated resources. Does NOT - * call the callback. + * Check data message we received from the service. * - * @param qe entry to free. + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ -static void -free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) -{ - struct GNUNET_DATASTORE_Handle *h = qe->h; - - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - if (qe->task != NULL) +static int +check_data (void *cls, + const struct DataMessage *dm) +{ + uint16_t msize = ntohs (dm->header.size) - sizeof (*dm); + + if (msize != ntohl (dm->size)) { - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = NULL; + GNUNET_break (0); + return GNUNET_SYSERR; } - h->queue_size--; - qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ - GNUNET_free (qe); + return GNUNET_OK; } /** - * Type of a function to call when we receive a message - * from the service. + * Handle data message we got from the service. * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ static void -process_status_message (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_data (void *cls, + const struct DataMessage *dm) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; - struct StatusContext rc; - const struct StatusMessage *sm; - const char *emsg; - int32_t status; - int was_transmitted; + struct ResultContext rc; - if (NULL == (qe = h->queue_head)) + if (h->skip_next_messages > 0) + { + process_queue (h); + return; + } + qe = h->queue_head; + if (NULL == qe) { GNUNET_break (0); do_disconnect (h); return; } - rc = qe->qc.sc; - if (NULL == msg) + if (NULL != qe->env) { - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (was_transmitted == GNUNET_YES) - do_disconnect (h); - else - process_queue (h); - if (NULL != rc.cont) - rc.cont (rc.cont_cls, GNUNET_SYSERR, - GNUNET_TIME_UNIT_ZERO_ABS, - _("Failed to receive status response from database.")); + GNUNET_break (0); + do_disconnect (h); + return; + } + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) + { + GNUNET_break (0); + do_disconnect (h); return; } - GNUNET_assert (GNUNET_YES == qe->was_transmitted); +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Results received"), + 1, + GNUNET_NO); +#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), + ntohl (dm->type), + ntohl (dm->size), + GNUNET_h2s (&dm->key)); + rc = qe->qc.rc; free_queue_entry (qe); - if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + &dm->key, + ntohl (dm->size), + &dm[1], + ntohl (dm->type), + ntohl (dm->priority), + ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); +} + + +/** + * Type of a function to call when we receive a + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service. + * + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param msg message received + */ +static void +handle_data_end (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; + + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } + qe = h->queue_head; + if (NULL == qe) { GNUNET_break (0); - h->retry_time = GNUNET_TIME_UNIT_ZERO; do_disconnect (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, GNUNET_SYSERR, - GNUNET_TIME_UNIT_ZERO_ABS, - _("Error reading response from datastore service")); return; } - sm = (const struct StatusMessage *) msg; - status = ntohl (sm->status); - emsg = NULL; - if (ntohs (msg->size) > sizeof (struct StatusMessage)) + if (NULL != qe->env) { - emsg = (const char *) &sm[1]; - if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } + GNUNET_break (0); + do_disconnect (h); + return; } - if ((status == GNUNET_SYSERR) && (emsg == NULL)) + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) { GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); + do_disconnect (h); + return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg); + rc = qe->qc.rc; + free_queue_entry (qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", + h->queue_size); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + h->result_count = 0; + process_queue (h); + /* signal end of iteration */ + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + NULL, + 0, + NULL, + 0, + 0, + 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); +} + + +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls) +{ + GNUNET_MQ_hd_var_size (status, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + struct StatusMessage); + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + struct DataMessage); + GNUNET_MQ_hd_fixed_size (data_end, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, + struct GNUNET_MessageHeader); + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_status_handler (h), + make_data_handler (h), + make_data_end_handler (h), + GNUNET_MQ_handler_end () + }; + + h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); + h->reconnect_task = NULL; + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "datastore", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# status messages received"), 1, + gettext_noop ("# datastore connections (re)created"), + 1, GNUNET_NO); - h->retry_time = GNUNET_TIME_UNIT_ZERO; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Reconnected to DATASTORE\n"); process_queue (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, status, - GNUNET_TIME_absolute_ntoh (sm->min_expiration), - emsg); +} + + +/** + * Dummy continuation used to do nothing (but be non-zero). + * + * @param cls closure + * @param result result + * @param min_expiration expiration time + * @param emsg error message + */ +static void +drop_status_cont (void *cls, + int32_t result, + struct GNUNET_TIME_Absolute min_expiration, + const char *emsg) +{ + /* do nothing */ } @@ -874,7 +920,6 @@ process_status_message (void *cls, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -894,33 +939,51 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct DataMessage *dm; - size_t msize; union QueueContext qc; + if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return NULL; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to put %u bytes of data under key `%s' for %s\n", size, GNUNET_h2s (key), GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), GNUNET_YES)); - msize = sizeof (struct DataMessage) + size; - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_PUT); + dm->rid = htonl (rid); + dm->size = htonl ((uint32_t) size); + dm->type = htonl (type); + dm->priority = htonl (priority); + dm->anonymity = htonl (anonymity); + dm->replication = htonl (replication); + dm->reserved = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (expiration); + dm->key = *key; + memcpy (&dm[1], + data, + size); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; qe = make_queue_entry (h, - msize, + env, queue_priority, max_queue_size, - timeout, - &process_status_message, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, &qc); - if (qe == NULL) + if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n"); @@ -930,20 +993,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# PUT requests executed"), 1, GNUNET_NO); - dm = (struct DataMessage *) &qe[1]; - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons (msize); - dm->rid = htonl (rid); - dm->size = htonl ((uint32_t) size); - dm->type = htonl (type); - dm->priority = htonl (priority); - dm->anonymity = htonl (anonymity); - dm->replication = htonl (replication); - dm->reserved = htonl (0); - dm->uid = GNUNET_htonll (0); - dm->expiration = GNUNET_TIME_absolute_hton (expiration); - dm->key = *key; - memcpy (&dm[1], data, size); process_queue (h); return qe; } @@ -972,6 +1021,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReserveMessage *rm; union QueueContext qc; @@ -981,14 +1031,18 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, "Asked to reserve %llu bytes of data and %u entries\n", (unsigned long long) amount, (unsigned int) entries); + env = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); + qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; qe = make_queue_entry (h, - sizeof (struct ReserveMessage), + env, UINT_MAX, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_status_message, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, &qc); if (NULL == qe) { @@ -1000,11 +1054,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# RESERVE requests executed"), 1, GNUNET_NO); - rm = (struct ReserveMessage *) &qe[1]; - rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons (sizeof (struct ReserveMessage)); - rm->entries = htonl (entries); - rm->amount = GNUNET_htonll (amount); process_queue (h); return qe; } @@ -1024,7 +1073,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -1036,29 +1084,31 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReleaseReserveMessage *rrm; union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); + env = GNUNET_MQ_msg (rrm, + GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->rid = htonl (rid); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; qe = make_queue_entry (h, - sizeof (struct ReleaseReserveMessage), + env, queue_priority, max_queue_size, - timeout, - &process_status_message, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, &qc); - if (qe == NULL) + if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to release reserve\n"); @@ -1068,10 +1118,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# RELEASE RESERVE requests executed"), 1, GNUNET_NO); - rrm = (struct ReleaseReserveMessage *) &qe[1]; - rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl (rid); process_queue (h); return qe; } @@ -1087,7 +1133,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -1101,26 +1146,36 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct UpdateMessage *um; union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to update entry %llu raising priority by %u and expiration to %s\n", uid, (unsigned int) priority, GNUNET_STRINGS_absolute_time_to_string (expiration)); + env = GNUNET_MQ_msg (um, + GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); + um->priority = htonl (priority); + um->expiration = GNUNET_TIME_absolute_hton (expiration); + um->uid = GNUNET_htonll (uid); + qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, - max_queue_size, timeout, &process_status_message, &qc); - if (qe == NULL) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n"); @@ -1129,12 +1184,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, GNUNET_STATISTICS_update (h->stats, gettext_noop ("# UPDATE requests executed"), 1, GNUNET_NO); - um = (struct UpdateMessage *) &qe[1]; - um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons (sizeof (struct UpdateMessage)); - um->priority = htonl (priority); - um->expiration = GNUNET_TIME_absolute_hton (expiration); - um->uid = GNUNET_htonll (uid); process_queue (h); return qe; } @@ -1154,7 +1203,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -1168,161 +1216,64 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, const void *data, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; - size_t msize; + struct GNUNET_MQ_Envelope *env; union QueueContext qc; - if (cont == NULL) + if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return NULL; + } + if (NULL == cont) cont = &drop_status_cont; LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n", size, GNUNET_h2s (key)); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); + dm->rid = htonl (0); + dm->size = htonl (size); + dm->type = htonl (0); + dm->priority = htonl (0); + dm->anonymity = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); + dm->key = *key; + memcpy (&dm[1], + data, + size); + qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - msize = sizeof (struct DataMessage) + size; - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + qe = make_queue_entry (h, - msize, + env, queue_priority, max_queue_size, - timeout, - &process_status_message, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, &qc); - if (qe == NULL) + if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n"); return NULL; } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# REMOVE requests executed"), 1, + gettext_noop ("# REMOVE requests executed"), + 1, GNUNET_NO); - dm = (struct DataMessage *) &qe[1]; - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons (msize); - dm->rid = htonl (0); - dm->size = htonl (size); - dm->type = htonl (0); - dm->priority = htonl (0); - dm->anonymity = htonl (0); - dm->uid = GNUNET_htonll (0); - dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); - dm->key = *key; - memcpy (&dm[1], data, size); process_queue (h); return qe; } -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` - * @param msg message received, NULL on timeout or fatal error - */ -static void -process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - struct ResultContext rc; - const struct DataMessage *dm; - int was_transmitted; - - if (NULL == msg) - { - qe = h->queue_head; - GNUNET_assert (NULL != qe); - rc = qe->qc.rc; - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (GNUNET_YES == was_transmitted) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to receive response from database.\n"); - do_disconnect (h); - } - else - { - process_queue (h); - } - if (NULL != rc.proc) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) - { - GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); - qe = h->queue_head; - rc = qe->qc.rc; - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - free_queue_entry (qe); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set, new queue size is %u\n", h->queue_size); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - h->result_count = 0; - process_queue (h); - if (NULL != rc.proc) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - qe = h->queue_head; - GNUNET_assert (NULL != qe); - rc = qe->qc.rc; - if (GNUNET_YES != qe->was_transmitted) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - if ((ntohs (msg->size) < sizeof (struct DataMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || - (ntohs (msg->size) != - sizeof (struct DataMessage) + - ntohl (((const struct DataMessage *) msg)->size))) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } -#if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, - GNUNET_NO); -#endif - dm = (const struct DataMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), - ntohl (dm->size), GNUNET_h2s (&dm->key)); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - process_queue (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), - ntohl (dm->priority), ntohl (dm->anonymity), - GNUNET_TIME_absolute_ntoh (dm->expiration), - GNUNET_ntohll (dm->uid)); -} - /** * Get a random value from the datastore for content replication. @@ -1335,7 +1286,6 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. @@ -1347,23 +1297,27 @@ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct GNUNET_MessageHeader *m; union QueueContext qc; GNUNET_assert (NULL != proc); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get replication entry in %s\n", - GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); + "Asked to get replication entry\n"); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1374,9 +1328,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# GET REPLICATION requests executed"), 1, GNUNET_NO); - m = (struct GNUNET_MessageHeader *) &qe[1]; - m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); - m->size = htons (sizeof (struct GNUNET_MessageHeader)); process_queue (h); return qe; } @@ -1393,7 +1344,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param type allowed type for the operation (never zero) * @param proc function to call on a random value; it * will be called once with a value (if available) @@ -1407,31 +1357,32 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct GetZeroAnonymityMessage *m; union QueueContext qc; GNUNET_assert (NULL != proc); GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get %llu-th zero-anonymity entry of type %d in %s\n", + "Asked to get %llu-th zero-anonymity entry of type %d\n", (unsigned long long) offset, - type, - GNUNET_STRINGS_relative_time_to_string (timeout, - GNUNET_YES)); + type); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; qe = make_queue_entry (h, - sizeof (struct GetZeroAnonymityMessage), + env, queue_priority, max_queue_size, - timeout, - &process_result_message, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, &qc); if (NULL == qe) { @@ -1443,11 +1394,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, gettext_noop ("# GET ZERO ANONYMITY requests executed"), 1, GNUNET_NO); - m = (struct GetZeroAnonymityMessage *) &qe[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); - m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); - m->type = htonl ((uint32_t) type); - m->offset = GNUNET_htonll (offset); process_queue (h); return qe; } @@ -1467,7 +1413,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on each matching value; * will be called once with a NULL value at the end * @param proc_cls closure for @a proc @@ -1481,28 +1426,44 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; + struct GetKeyMessage *gkm; struct GetMessage *gm; union QueueContext qc; GNUNET_assert (NULL != proc); LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to look for data of type %u under key `%s'\n", - (unsigned int) type, GNUNET_h2s (key)); + (unsigned int) type, + GNUNET_h2s (key)); + if (NULL == key) + { + env = GNUNET_MQ_msg (gm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); + gm->offset = GNUNET_htonll (offset); + } + else + { + env = GNUNET_MQ_msg (gkm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); + gkm->type = htonl (type); + gkm->offset = GNUNET_htonll (offset); + gkm->key = *key; + } qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; qe = make_queue_entry (h, - sizeof (struct GetMessage), + env, queue_priority, max_queue_size, - timeout, - &process_result_message, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, &qc); - if (qe == NULL) + if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", @@ -1515,20 +1476,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1, GNUNET_NO); #endif - gm = (struct GetMessage *) &qe[1]; - gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl (type); - gm->offset = GNUNET_htonll (offset); - if (key != NULL) - { - gm->header.size = htons (sizeof (struct GetMessage)); - gm->key = *key; - } - else - { - gm->header.size = - htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)); - } process_queue (h); return qe; } @@ -1543,16 +1490,14 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, void GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { - struct GNUNET_DATASTORE_Handle *h; + struct GNUNET_DATASTORE_Handle *h = qe->h; - GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); - h = qe->h; LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, - qe->was_transmitted, + NULL == qe->env, h->queue_head == qe); - if (GNUNET_YES == qe->was_transmitted) + if (NULL == qe->env) { free_queue_entry (qe); h->skip_next_messages++; diff --git a/src/datastore/gnunet-datastore.c b/src/datastore/gnunet-datastore.c index ddca4ee06..b3d14c43c 100644 --- a/src/datastore/gnunet-datastore.c +++ b/src/datastore/gnunet-datastore.c @@ -137,7 +137,8 @@ do_finish (void *cls, static void do_put (void *cls, const struct GNUNET_HashCode *key, - size_t size, const void *data, + size_t size, + const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, @@ -158,7 +159,7 @@ do_put (void *cls, priority, anonymity, 0 /* FIXME: replication is lost... */, expiration, - 0, 1, GNUNET_TIME_UNIT_FOREVER_REL, + 0, 1, &do_finish, NULL); } @@ -173,7 +174,6 @@ do_get () offset, NULL, GNUNET_BLOCK_TYPE_ANY, 0, 1, - GNUNET_TIME_UNIT_FOREVER_REL, &do_put, NULL); } diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index a67d1c772..57527991c 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -416,16 +416,20 @@ delete_expired (void *cls) * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available - * - * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue + * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue * (continue on call to "next", of course), - * GNUNET_NO to delete the item and continue (if supported) + * #GNUNET_NO to delete the item and continue (if supported) */ static int -quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size, - const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, uint64_t uid) +quota_processor (void *cls, + const struct GNUNET_HashCode *key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { unsigned long long *need = cls; @@ -473,12 +477,15 @@ manage_space (unsigned long long need) unsigned long long last; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to free up %llu bytes of cache space\n", need); + "Asked to free up %llu bytes of cache space\n", + need); last = 0; while ((need > 0) && (last != need)) { last = need; - plugin->api->get_expiration (plugin->api->cls, "a_processor, &need); + plugin->api->get_expiration (plugin->api->cls, + "a_processor, + &need); } } @@ -1068,7 +1075,7 @@ handle_put (void *cls, /** - * Handle GET-message. + * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message. * * @param cls closure * @param client identification of the client @@ -1080,28 +1087,52 @@ handle_get (void *cls, const struct GNUNET_MessageHeader *message) { const struct GetMessage *msg; - uint16_t size; - size = ntohs (message->size); - if ((size != sizeof (struct GetMessage)) && - (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode))) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } msg = (const struct GetMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Processing GET request of type %u\n", + ntohl (msg->type)); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# GET requests received"), + 1, + GNUNET_NO); + GNUNET_SERVER_client_keep (client); + plugin->api->get_key (plugin->api->cls, + GNUNET_ntohll (msg->offset), + NULL, + NULL, + ntohl (msg->type), + &transmit_item, + client); +} + +/** + * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message. + * + * @param cls closure + * @param client identification of the client + * @param message the actual message + */ +static void +handle_get_key (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GetKeyMessage *msg; + + msg = (const struct GetKeyMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing GET request for `%s' of type %u\n", GNUNET_h2s (&msg->key), ntohl (msg->type)); GNUNET_STATISTICS_update (stats, - gettext_noop ("# GET requests received"), + gettext_noop ("# GET KEY requests received"), 1, GNUNET_NO); GNUNET_SERVER_client_keep (client); - if ( (size == sizeof (struct GetMessage)) && - (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) ) + if (GNUNET_YES != + GNUNET_CONTAINER_bloomfilter_test (filter, + &msg->key)) { /* don't bother database... */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1112,14 +1143,19 @@ handle_get (void *cls, ("# requests filtered by bloomfilter"), 1, GNUNET_NO); - transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + transmit_item (client, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset), - ((size == - sizeof (struct GetMessage)) ? &msg->key : NULL), NULL, - ntohl (msg->type), &transmit_item, client); + plugin->api->get_key (plugin->api->cls, + GNUNET_ntohll (msg->offset), + &msg->key, + NULL, + ntohl (msg->type), + &transmit_item, + client); } @@ -1369,7 +1405,8 @@ disk_utilization_change_cb (void *cls, _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"), (long long) payload, (long long) -delta); - plugin->api->estimate_size (plugin->api->cls, &payload); + plugin->api->estimate_size (plugin->api->cls, + &payload); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("New payload: %lld\n"), (long long) payload); @@ -1474,7 +1511,10 @@ static const struct GNUNET_SERVER_MessageHandler handlers[] = { {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, sizeof (struct UpdateMessage)}, - {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, + {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, + sizeof (struct GetMessage) }, + {&handle_get_key, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY, + sizeof (struct GetKeyMessage) }, {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, sizeof (struct GNUNET_MessageHeader)}, @@ -1555,6 +1595,10 @@ process_stat_done (void *cls, "Failed to obtain value from statistics service, recomputing it\n"); plugin->api->estimate_size (plugin->api->cls, &payload); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("New payload: %lld\n"), + (long long) payload); + } if (GNUNET_YES == refresh_bf) { @@ -1624,7 +1668,13 @@ cleaning_task (void *cls) expired_kill_task = NULL; } if (GNUNET_YES == do_drop) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping database!\n"); plugin->api->drop (plugin->api->cls); + payload = 0; + last_sync++; + } if (NULL != plugin) { unload_plugin (plugin); @@ -1651,7 +1701,8 @@ cleaning_task (void *cls) sync_stats (); if (NULL != stats) { - GNUNET_STATISTICS_destroy (stats, GNUNET_YES); + GNUNET_STATISTICS_destroy (stats, + GNUNET_YES); stats = NULL; } GNUNET_free (quota_stat_name); diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c index 97774198c..4f1f99a5c 100644 --- a/src/datastore/perf_datastore_api.c +++ b/src/datastore/perf_datastore_api.c @@ -332,7 +332,6 @@ delete_value (void *cls, key, size, data, 1, 1, - TIMEOUT, &remove_next, crc)); } @@ -396,7 +395,6 @@ run_continuation (void *cls) (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), 1, 1, - TIMEOUT, &check_success, crc)); break; case RP_CUT: @@ -404,7 +402,6 @@ run_continuation (void *cls) GNUNET_assert (NULL != GNUNET_DATASTORE_get_for_replication (datastore, 1, 1, - TIMEOUT, &delete_value, crc)); break; @@ -466,7 +463,6 @@ run_continuation (void *cls) (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), 1, 1, - TIMEOUT, &check_success, crc)); break; @@ -573,7 +569,6 @@ run (void *cls, 0, 0, 0, GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS), 0, 1, - TIMEOUT, &run_tests, crc)) { FPRINTF (stderr, diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index bae40d17d..0ae5c1a2e 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -180,7 +180,7 @@ struct Plugin #define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?" struct GNUNET_MYSQL_StatementHandle *dec_repl; -#define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" +#define SELECT_SIZE "SELECT SUM(LENGTH(value)+256) FROM gn090" struct GNUNET_MYSQL_StatementHandle *get_size; #define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid "\ @@ -221,23 +221,22 @@ struct Plugin * * @param plugin plugin context * @param uid unique ID of the entry to delete - * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error + * @return #GNUNET_OK on success, #GNUNET_NO if no such value exists, #GNUNET_SYSERR on error */ static int -do_delete_entry (struct Plugin *plugin, unsigned long long uid) +do_delete_entry (struct Plugin *plugin, + unsigned long long uid) { int ret; uint64_t uid64 = (uint64_t) uid; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deleting value %llu from gn090 table\n", - uid); - struct GNUNET_MY_QueryParam params_delete[] = { GNUNET_MY_query_param_uint64 (&uid64), GNUNET_MY_query_param_end }; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Deleting value %llu from gn090 table\n", + uid); ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->delete_entry_by_uid, params_delete); @@ -247,7 +246,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid) } GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Deleting value %llu from gn090 table failed\n", - uid); + (unsigned long long) uid); return ret; } @@ -256,7 +255,7 @@ do_delete_entry (struct Plugin *plugin, unsigned long long uid) * Get an estimate of how much space the database is * currently using. * - * @param cls our "struct Plugin *" + * @param cls our `struct Plugin *` * @return number of bytes used on disk */ static void @@ -266,26 +265,33 @@ mysql_plugin_estimate_size (void *cls, struct Plugin *plugin = cls; uint64_t total; int ret; - struct GNUNET_MY_QueryParam params_get[] = { GNUNET_MY_query_param_end }; - struct GNUNET_MY_ResultSpec results_get[] = { GNUNET_MY_result_spec_uint64 (&total), GNUNET_MY_result_spec_end }; - ret = GNUNET_MY_exec_prepared (plugin->mc, plugin->get_size, params_get); - if (GNUNET_OK == ret) - { - if (GNUNET_OK == GNUNET_MY_extract_result (plugin->get_size, results_get)) - { - *estimate = (unsigned long long)total; - } - } - else - *estimate = 0; + ret = GNUNET_MY_exec_prepared (plugin->mc, + plugin->get_size, + params_get); + *estimate = 0; + total = UINT64_MAX; + if ( (GNUNET_OK == ret) && + (GNUNET_OK == + GNUNET_MY_extract_result (plugin->get_size, + results_get)) ) + { + *estimate = (unsigned long long) total; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Size estimate for MySQL payload is %lld\n", + (long long) total); + GNUNET_assert (UINT64_MAX != total); + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->get_size, + NULL)); + } } @@ -294,7 +300,7 @@ mysql_plugin_estimate_size (void *cls, * * @param cls closure * @param key key for the item - * @param size number of bytes in data + * @param size number of bytes in @a data * @param data content stored * @param type type of the content * @param priority priority of the content @@ -302,7 +308,7 @@ mysql_plugin_estimate_size (void *cls, * @param replication replication-level for the content * @param expiration expiration time for the content * @param cont continuation called with success or failure status - * @param cont_cls continuation closure + * @param cont_cls closure for @a cont */ static void mysql_plugin_put (void *cls, @@ -318,12 +324,9 @@ mysql_plugin_put (void *cls, void *cont_cls) { struct Plugin *plugin = cls; - uint64_t lexpiration = expiration.abs_value_us; uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); - unsigned long lsize = 0; - struct GNUNET_HashCode vhash; struct GNUNET_MY_QueryParam params_insert[] = { GNUNET_MY_query_param_uint32 (&replication), @@ -334,7 +337,7 @@ mysql_plugin_put (void *cls, GNUNET_MY_query_param_uint64 (&lrvalue), GNUNET_MY_query_param_auto_from_type (key), GNUNET_MY_query_param_auto_from_type (&vhash), - GNUNET_MY_query_param_fixed_size (data, lsize), + GNUNET_MY_query_param_fixed_size (data, size), GNUNET_MY_query_param_end }; @@ -344,7 +347,6 @@ mysql_plugin_put (void *cls, cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large")); return; } - lsize = size; GNUNET_CRYPTO_hash (data, size, &vhash); @@ -354,15 +356,28 @@ mysql_plugin_put (void *cls, plugin->insert_entry, params_insert)) { - cont (cont_cls, key, size, GNUNET_SYSERR, _("MySQL statement run failure")); + cont (cont_cls, + key, + size, + GNUNET_SYSERR, + _("MySQL statement run failure")); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Inserted value `%s' with size %u into gn090 table\n", - GNUNET_h2s (key), (unsigned int) size); + GNUNET_h2s (key), + (unsigned int) size); if (size > 0) - plugin->env->duc (plugin->env->cls, size); - cont (cont_cls, key, size, GNUNET_OK, NULL); + plugin->env->duc (plugin->env->cls, + size); + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->insert_entry, + NULL)); + cont (cont_cls, + key, + size, + GNUNET_OK, + NULL); } @@ -390,18 +405,22 @@ mysql_plugin_put (void *cls, * @param cons_cls continuation closure */ static void -mysql_plugin_update (void *cls, uint64_t uid, int delta, +mysql_plugin_update (void *cls, + uint64_t uid, + int delta, struct GNUNET_TIME_Absolute expire, - PluginUpdateCont cont, void *cont_cls) + PluginUpdateCont cont, + void *cont_cls) { struct Plugin *plugin = cls; - uint32_t idelta = (uint32_t)delta; + uint32_t idelta = (uint32_t) delta; uint64_t lexpire = expire.abs_value_us; int ret; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Updating value %llu adding %d to priority and maxing exp at %s\n", - (unsigned long long)uid, delta, + (unsigned long long) uid, + delta, GNUNET_STRINGS_absolute_time_to_string (expire)); struct GNUNET_MY_QueryParam params_update[] = { @@ -416,12 +435,21 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta, plugin->update_entry, params_update); - if (ret != GNUNET_OK) + if (GNUNET_OK != ret) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update value %llu\n", + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to update value %llu\n", (unsigned long long) uid); } - cont (cont_cls, ret, NULL); + else + { + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->update_entry, + NULL)); + } + cont (cont_cls, + ret, + NULL); } @@ -432,7 +460,7 @@ mysql_plugin_update (void *cls, uint64_t uid, int delta, * @param plugin the plugin handle * @param stmt select statement to run * @param proc function to call on result - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @param params_select arguments to initialize stmt */ static void @@ -474,7 +502,7 @@ execute_select (struct Plugin *plugin, ret = GNUNET_MY_extract_result (stmt, results_select); - if (ret <= 0) + if (GNUNET_OK != ret) { proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); @@ -489,6 +517,9 @@ execute_select (struct Plugin *plugin, (unsigned int) anonymity, GNUNET_STRINGS_absolute_time_to_string (expiration)); GNUNET_assert (value_size < MAX_DATUM_SIZE); + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (stmt, + NULL)); ret = proc (proc_cls, &key, value_size, @@ -498,7 +529,8 @@ execute_select (struct Plugin *plugin, anonymity, expiration, uid); - if (ret == GNUNET_NO) + GNUNET_MY_cleanup_result (results_select); + if (GNUNET_NO == ret) { do_delete_entry (plugin, uid); if (0 != value_size) @@ -538,16 +570,15 @@ mysql_plugin_get_key (void *cls, struct Plugin *plugin = cls; int ret; uint64_t total; - - total = -1; struct GNUNET_MY_ResultSpec results_get[] = { GNUNET_MY_result_spec_uint64 (&total), GNUNET_MY_result_spec_end }; - if (type != 0) + total = UINT64_MAX; + if (0 != type) { - if (vhash != NULL) + if (NULL != vhash) { struct GNUNET_MY_QueryParam params_get[] = { GNUNET_MY_query_param_auto_from_type (key), @@ -560,9 +591,15 @@ mysql_plugin_get_key (void *cls, GNUNET_MY_exec_prepared (plugin->mc, plugin->count_entry_by_hash_vhash_and_type, params_get); - ret = - GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type, - results_get); + GNUNET_break (GNUNET_OK == ret); + if (GNUNET_OK == ret) + ret = + GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type, + results_get); + if (GNUNET_OK == ret) + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->count_entry_by_hash_vhash_and_type, + NULL)); } else { @@ -576,14 +613,20 @@ mysql_plugin_get_key (void *cls, GNUNET_MY_exec_prepared (plugin->mc, plugin->count_entry_by_hash_and_type, params_get); - ret = - GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type, - results_get); + GNUNET_break (GNUNET_OK == ret); + if (GNUNET_OK == ret) + ret = + GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type, + results_get); + if (GNUNET_OK == ret) + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_type, + NULL)); } } else { - if (vhash != NULL) + if (NULL != vhash) { struct GNUNET_MY_QueryParam params_get[] = { GNUNET_MY_query_param_auto_from_type (key), @@ -595,9 +638,15 @@ mysql_plugin_get_key (void *cls, GNUNET_MY_exec_prepared (plugin->mc, plugin->count_entry_by_hash_and_vhash, params_get); - ret = - GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash, - results_get); + GNUNET_break (GNUNET_OK == ret); + if (GNUNET_OK == ret) + ret = + GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash, + results_get); + if (GNUNET_OK == ret) + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->count_entry_by_hash_and_vhash, + NULL)); } else { @@ -610,12 +659,19 @@ mysql_plugin_get_key (void *cls, GNUNET_MY_exec_prepared (plugin->mc, plugin->count_entry_by_hash, params_get); - ret = - GNUNET_MY_extract_result (plugin->count_entry_by_hash, - results_get); + GNUNET_break (GNUNET_OK == ret); + if (GNUNET_OK == ret) + ret = + GNUNET_MY_extract_result (plugin->count_entry_by_hash, + results_get); + if (GNUNET_OK == ret) + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->count_entry_by_hash, + NULL)); } } - if ((ret != GNUNET_OK) || (0 >= total)) + if ( (GNUNET_OK != ret) || + (0 >= total) ) { proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; @@ -640,7 +696,8 @@ mysql_plugin_get_key (void *cls, execute_select (plugin, plugin->select_entry_by_hash_vhash_and_type, - proc, proc_cls, + proc, + proc_cls, params_select); } else @@ -654,7 +711,8 @@ mysql_plugin_get_key (void *cls, execute_select (plugin, plugin->select_entry_by_hash_and_type, - proc, proc_cls, + proc, + proc_cls, params_select); } } @@ -671,7 +729,8 @@ mysql_plugin_get_key (void *cls, execute_select (plugin, plugin->select_entry_by_hash_and_vhash, - proc, proc_cls, + proc, + proc_cls, params_select); } else @@ -684,28 +743,31 @@ mysql_plugin_get_key (void *cls, execute_select (plugin, plugin->select_entry_by_hash, - proc, proc_cls, + proc, + proc_cls, params_select); } } - + } /** * Get a zero-anonymity datum from the datastore. * - * @param cls our "struct Plugin*" + * @param cls our `struct Plugin *` * @param offset offset of the result * @param type entries of which type should be considered? * Use 0 for any type. * @param proc function to call on a matching value or NULL - * @param proc_cls closure for iter + * @param proc_cls closure for @a proc */ static void -mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset, +mysql_plugin_get_zero_anonymity (void *cls, + uint64_t offset, enum GNUNET_BLOCK_Type type, - PluginDatumProcessor proc, void *proc_cls) + PluginDatumProcessor proc, + void *proc_cls) { struct Plugin *plugin = cls; uint32_t typei = (uint32_t) type; @@ -719,8 +781,10 @@ mysql_plugin_get_zero_anonymity (void *cls, uint64_t offset, GNUNET_MY_query_param_end }; - execute_select (plugin, plugin->zero_iter, - proc, proc_cls, + execute_select (plugin, + plugin->zero_iter, + proc, + proc_cls, params_zero_iter); } @@ -749,13 +813,13 @@ struct ReplCtx /** - * Wrapper for the processor for 'mysql_plugin_get_replication'. + * Wrapper for the processor for #mysql_plugin_get_replication(). * Decrements the replication counter and calls the original * iterator. * * @param cls closure * @param key key for the content - * @param size number of bytes in data + * @param size number of bytes in @a data * @param data content stored * @param type type of the content * @param priority priority of the content @@ -763,19 +827,18 @@ struct ReplCtx * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available - * - * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue + * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue * (continue on call to "next", of course), - * GNUNET_NO to delete the item and continue (if supported) + * #GNUNET_NO to delete the item and continue (if supported) */ static int repl_proc (void *cls, - const struct GNUNET_HashCode * key, + const struct GNUNET_HashCode *key, uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, + const void *data, + enum GNUNET_BLOCK_Type type, uint32_t priority, - uint32_t anonymity, + uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { @@ -784,21 +847,26 @@ repl_proc (void *cls, int ret; int iret; - ret = - rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity, - expiration, uid); + ret = rc->proc (rc->proc_cls, + key, + size, + data, + type, + priority, + anonymity, + expiration, + uid); if (NULL != key) { - struct GNUNET_MY_QueryParam params_proc[] = { - GNUNET_MY_query_param_uint64 (&uid), - GNUNET_MY_query_param_end - }; - - iret = - GNUNET_MY_exec_prepared (plugin->mc, - plugin->dec_repl, - params_proc); - if (iret == GNUNET_SYSERR) + struct GNUNET_MY_QueryParam params_proc[] = { + GNUNET_MY_query_param_uint64 (&uid), + GNUNET_MY_query_param_end + }; + + iret = GNUNET_MY_exec_prepared (plugin->mc, + plugin->dec_repl, + params_proc); + if (GNUNET_SYSERR == iret) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to reduce replication counter\n"); @@ -813,35 +881,29 @@ repl_proc (void *cls, * Get a random item for replication. Returns a single, not expired, * random item from those with the highest replication counters. The * item's replication counter is decremented by one IF it was positive - * before. Call 'proc' with all values ZERO or NULL if the datastore + * before. Call @a proc with all values ZERO or NULL if the datastore * is empty. * * @param cls closure * @param proc function to call the value (once only). - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc */ static void -mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc, +mysql_plugin_get_replication (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; uint64_t rvalue; uint32_t repl; - struct ReplCtx rc; - rc.plugin = plugin; - rc.proc = proc; - rc.proc_cls = proc_cls; - struct GNUNET_MY_QueryParam params_get[] = { GNUNET_MY_query_param_end }; - struct GNUNET_MY_ResultSpec results_get[] = { GNUNET_MY_result_spec_uint32 (&repl), GNUNET_MY_result_spec_end }; - struct GNUNET_MY_QueryParam params_select[] = { GNUNET_MY_query_param_uint32 (&repl), GNUNET_MY_query_param_uint64 (&rvalue), @@ -850,27 +912,36 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc, GNUNET_MY_query_param_end }; + rc.plugin = plugin; + rc.proc = proc; + rc.proc_cls = proc_cls; + if (1 != - GNUNET_MY_exec_prepared (plugin->mc, plugin->max_repl, params_get)) + GNUNET_MY_exec_prepared (plugin->mc, + plugin->max_repl, + params_get)) { proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - if (1 != - GNUNET_MY_extract_result (plugin->max_repl, results_get)) + if (GNUNET_OK != + GNUNET_MY_extract_result (plugin->max_repl, + results_get)) { - proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; } + GNUNET_break (GNUNET_NO == + GNUNET_MY_extract_result (plugin->max_repl, + NULL)); + rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT64_MAX); - rvalue = - (unsigned long long) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT64_MAX); - - execute_select (plugin, + execute_select (plugin, plugin->select_replication, - &repl_proc, &rc, + &repl_proc, + &rc, params_select); } @@ -880,69 +951,91 @@ mysql_plugin_get_replication (void *cls, PluginDatumProcessor proc, * * @param cls closure * @param proc function to call on each key - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc */ static void mysql_plugin_get_keys (void *cls, - PluginKeyProcessor proc, - void *proc_cls) + PluginKeyProcessor proc, + void *proc_cls) { struct Plugin *plugin = cls; - char *query = "SELECT hash FROM gn090"; int ret; MYSQL_STMT *statement; - struct GNUNET_MYSQL_StatementHandle *statements_handle_select = NULL; - - + unsigned int cnt; struct GNUNET_HashCode key; - - statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys); - - statements_handle_select = GNUNET_MYSQL_statement_prepare (plugin->mc, - query); - GNUNET_assert (proc != NULL); - + struct GNUNET_HashCode last; struct GNUNET_MY_QueryParam params_select[] = { GNUNET_MY_query_param_end }; - struct GNUNET_MY_ResultSpec results_select[] = { GNUNET_MY_result_spec_auto_from_type (&key), GNUNET_MY_result_spec_end }; - if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, - statements_handle_select, - params_select)) + GNUNET_assert (NULL != proc); + statement = GNUNET_MYSQL_statement_get_stmt (plugin->get_all_keys); + if (GNUNET_OK != + GNUNET_MY_exec_prepared (plugin->mc, + plugin->get_all_keys, + params_select)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("`%s' for `%s' failed at %s:%d with error: %s\n"), - "mysql_stmt_execute", query, __FILE__, __LINE__, + "mysql_stmt_execute", + GET_ALL_KEYS, + __FILE__, + __LINE__, mysql_stmt_error (statement)); GNUNET_MYSQL_statements_invalidate (plugin->mc); proc (proc_cls, NULL, 0); return; } - - ret = GNUNET_MY_extract_result (statements_handle_select, - results_select); - - if (ret != MYSQL_NO_DATA) + ret = GNUNET_YES; + cnt = 0; + while (ret == GNUNET_YES) + { + ret = GNUNET_MY_extract_result (plugin->get_all_keys, + results_select); + if (0 != memcmp (&last, + &key, + sizeof (key))) + { + if (0 != cnt) + proc (proc_cls, + &last, + cnt); + cnt = 1; + last = key; + } + else + { + cnt++; + } + } + if (0 != cnt) + proc (proc_cls, + &last, + cnt); + /* finally, let app know we are done */ + proc (proc_cls, + NULL, + 0); + if (GNUNET_SYSERR == ret) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), - "mysql_stmt_fetch", __FILE__, __LINE__, - mysql_stmt_error (statement)); + "mysql_stmt_fetch", + __FILE__, + __LINE__, + mysql_stmt_error (statement)); GNUNET_MYSQL_statements_invalidate (plugin->mc); return; } - - mysql_stmt_reset (statement); } /** - * Context for 'expi_proc' function. + * Context for #expi_proc() function. */ struct ExpiCtx { @@ -958,7 +1051,7 @@ struct ExpiCtx PluginDatumProcessor proc; /** - * Closure for proc. + * Closure for @e proc. */ void *proc_cls; }; @@ -966,7 +1059,7 @@ struct ExpiCtx /** - * Wrapper for the processor for 'mysql_plugin_get_expiration'. + * Wrapper for the processor for #mysql_plugin_get_expiration(). * If no expired value was found, we do a second query for * low-priority content. * @@ -980,83 +1073,94 @@ struct ExpiCtx * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available - * - * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue + * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue * (continue on call to "next", of course), - * GNUNET_NO to delete the item and continue (if supported) + * #GNUNET_NO to delete the item and continue (if supported) */ static int expi_proc (void *cls, - const struct GNUNET_HashCode * key, + const struct GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, - uint32_t anonymity, + uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { struct ExpiCtx *rc = cls; struct Plugin *plugin = rc->plugin; - struct GNUNET_MY_QueryParam params_select[] = { GNUNET_MY_query_param_end }; if (NULL == key) { - execute_select (plugin, plugin->select_priority, rc->proc, rc->proc_cls, + execute_select (plugin, + plugin->select_priority, + rc->proc, + rc->proc_cls, params_select); return GNUNET_SYSERR; } - return rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity, - expiration, uid); + return rc->proc (rc->proc_cls, + key, + size, + data, + type, + priority, + anonymity, + expiration, + uid); } /** * Get a random item for expiration. - * Call 'proc' with all values ZERO or NULL if the datastore is empty. + * Call @a proc with all values ZERO or NULL if the datastore is empty. * * @param cls closure * @param proc function to call the value (once only). - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc */ static void -mysql_plugin_get_expiration (void *cls, PluginDatumProcessor proc, +mysql_plugin_get_expiration (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - uint64_t nt; + struct GNUNET_TIME_Absolute now; + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_absolute_time (&now), + GNUNET_MY_query_param_end + }; struct ExpiCtx rc; rc.plugin = plugin; rc.proc = proc; rc.proc_cls = proc_cls; - nt = GNUNET_TIME_absolute_get ().abs_value_us; - - struct GNUNET_MY_QueryParam params_select[] = { - GNUNET_MY_query_param_uint64 (&nt), - GNUNET_MY_query_param_end - }; - - execute_select (plugin, plugin->select_expiration, expi_proc, &rc, + now = GNUNET_TIME_absolute_get (); + execute_select (plugin, + plugin->select_expiration, + expi_proc, + &rc, params_select); - } /** * Drop database. * - * @param cls the "struct Plugin*" + * @param cls the `struct Plugin *` */ static void mysql_plugin_drop (void *cls) { struct Plugin *plugin = cls; - if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "DROP TABLE gn090")) + if (GNUNET_OK != + GNUNET_MYSQL_statement_run (plugin->mc, + "DROP TABLE gn090")) return; /* error */ plugin->env->duc (plugin->env->cls, 0); } @@ -1065,8 +1169,8 @@ mysql_plugin_drop (void *cls) /** * Entry point for the plugin. * - * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*" - * @return our "struct Plugin*" + * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment *` + * @return our `struct Plugin *` */ void * libgnunet_plugin_datastore_mysql_init (void *cls) @@ -1077,7 +1181,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls) plugin = GNUNET_new (struct Plugin); plugin->env = env; - plugin->mc = GNUNET_MYSQL_context_create (env->cfg, "datastore-mysql"); + plugin->mc = GNUNET_MYSQL_context_create (env->cfg, + "datastore-mysql"); if (NULL == plugin->mc) { GNUNET_free (plugin); @@ -1155,7 +1260,8 @@ libgnunet_plugin_datastore_mysql_init (void *cls) /** * Exit point from the plugin. - * @param cls our "struct Plugin*" + * + * @param cls our `struct Plugin *` * @return always NULL */ void * diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c index 7d4565de6..6ebfee01e 100644 --- a/src/datastore/test_datastore_api.c +++ b/src/datastore/test_datastore_api.c @@ -412,7 +412,7 @@ run_continuation (void *cls) GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i), get_data (crc->i), get_type (crc->i), get_priority (crc->i), get_anonymity (crc->i), 0, - get_expiration (crc->i), 1, 1, TIMEOUT, + get_expiration (crc->i), 1, 1, &check_success, crc); crc->i++; if (crc->i == ITERATIONS) @@ -423,10 +423,17 @@ run_continuation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing GET number %u\n", crc->i); - GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, - get_type (crc->i), 1, 1, TIMEOUT, - &check_value, crc); + GNUNET_CRYPTO_hash (&crc->i, + sizeof (int), + &crc->key); + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (crc->i), + 1, + 1, + &check_value, + crc); break; case RP_DEL: crc->i--; @@ -436,9 +443,14 @@ run_continuation (void *cls) crc->data = NULL; GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); GNUNET_assert (NULL != - GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, - get_type (crc->i), 1, 1, TIMEOUT, - &delete_value, crc)); + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (crc->i), + 1, + 1, + &delete_value, + crc)); break; case RP_DO_DEL: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -455,7 +467,7 @@ run_continuation (void *cls) } GNUNET_assert (NULL != GNUNET_DATASTORE_remove (datastore, &crc->key, crc->size, - crc->data, 1, 1, TIMEOUT, + crc->data, 1, 1, &check_success, crc)); break; case RP_DELVALIDATE: @@ -466,7 +478,7 @@ run_continuation (void *cls) GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); GNUNET_assert (NULL != GNUNET_DATASTORE_get_key (datastore, crc->offset, &crc->key, - get_type (crc->i), 1, 1, TIMEOUT, + get_type (crc->i), 1, 1, &check_nothing, crc)); break; case RP_RESERVE: @@ -479,7 +491,7 @@ run_continuation (void *cls) GNUNET_DATASTORE_put (datastore, crc->rid, &crc->key, get_size (42), get_data (42), get_type (42), get_priority (42), get_anonymity (42), 0, get_expiration (42), 1, 1, - TIMEOUT, &check_success, crc); + &check_success, crc); break; case RP_PUT_MULTIPLE_NEXT: crc->phase = RP_GET_MULTIPLE; @@ -493,7 +505,6 @@ run_continuation (void *cls) 0, get_expiration (43), 1, 1, - TIMEOUT, &check_success, crc); break; case RP_GET_MULTIPLE: @@ -502,7 +513,6 @@ run_continuation (void *cls) crc->offset, &crc->key, get_type (42), 1, 1, - TIMEOUT, &check_multiple, crc)); break; case RP_GET_MULTIPLE_NEXT: @@ -512,7 +522,6 @@ run_continuation (void *cls) &crc->key, get_type (42), 1, 1, - TIMEOUT, &check_multiple, crc)); break; case RP_UPDATE: @@ -521,7 +530,7 @@ run_continuation (void *cls) GNUNET_DATASTORE_update (datastore, crc->uid, 100, get_expiration (42), 1, - 1, TIMEOUT, + 1, &check_success, crc); break; case RP_UPDATE_VALIDATE: @@ -531,7 +540,6 @@ run_continuation (void *cls) &crc->key, get_type (42), 1, 1, - TIMEOUT, &check_update, crc)); break; case RP_DONE: @@ -631,7 +639,6 @@ run (void *cls, GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS), 0, 1, - TIMEOUT, &run_tests, crc)) { FPRINTF (stderr, diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c index c9fec79e3..954e61bec 100644 --- a/src/datastore/test_datastore_api_management.c +++ b/src/datastore/test_datastore_api_management.c @@ -193,10 +193,18 @@ run_continuation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing `%s' number %u\n", "PUT", crc->i); GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_put (datastore, 0, &crc->key, get_size (crc->i), - get_data (crc->i), get_type (crc->i), - get_priority (crc->i), get_anonymity (crc->i), 0, - get_expiration (crc->i), 1, 1, TIMEOUT, + GNUNET_DATASTORE_put (datastore, + 0, + &crc->key, + get_size (crc->i), + get_data (crc->i), + get_type (crc->i), + get_priority (crc->i), + get_anonymity (crc->i), + 0, + get_expiration (crc->i), + 1, + 1, &check_success, crc); crc->i++; if (crc->i == ITERATIONS) @@ -213,7 +221,8 @@ run_continuation (void *cls) crc->i); GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key, - get_type (crc->i), 1, 1, TIMEOUT, &check_value, + get_type (crc->i), 1, 1, + &check_value, crc); break; case RP_GET_FAIL: @@ -221,7 +230,8 @@ run_continuation (void *cls) crc->i); GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); GNUNET_DATASTORE_get_key (datastore, crc->offset++, &crc->key, - get_type (crc->i), 1, 1, TIMEOUT, &check_nothing, + get_type (crc->i), 1, 1, + &check_nothing, crc); break; case RP_DONE: @@ -266,11 +276,18 @@ run (void *cls, now = GNUNET_TIME_absolute_get (); datastore = GNUNET_DATASTORE_connect (cfg); if (NULL == - GNUNET_DATASTORE_put (datastore, 0, &zkey, 4, "TEST", - GNUNET_BLOCK_TYPE_TEST, 0, 0, 0, - GNUNET_TIME_relative_to_absolute - (GNUNET_TIME_UNIT_SECONDS), 0, 1, - GNUNET_TIME_UNIT_MINUTES, &run_tests, crc)) + GNUNET_DATASTORE_put (datastore, + 0, + &zkey, + 4, + "TEST", + GNUNET_BLOCK_TYPE_TEST, + 0, 0, 0, + GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS), + 0, + 1, + &run_tests, + crc)) { FPRINTF (stderr, "%s", "Test 'put' operation failed.\n"); GNUNET_free (crc); diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c index 9ec0c53a2..9b85d57da 100644 --- a/src/datastore/test_plugin_datastore.c +++ b/src/datastore/test_plugin_datastore.c @@ -109,11 +109,14 @@ put_continuation (void *cls, if (GNUNET_OK != status) { - FPRINTF (stderr, "ERROR: `%s'\n", msg); + FPRINTF (stderr, + "ERROR: `%s'\n", + msg); } else { - crc->api->estimate_size (crc->api->cls, &cs); + crc->api->estimate_size (crc->api->cls, + &cs); GNUNET_assert (os <= cs); os = cs; stored_bytes += size; @@ -184,22 +187,30 @@ static uint64_t guid; static int -iterate_one_shot (void *cls, const struct GNUNET_HashCode * key, uint32_t size, - const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, uint64_t uid) +iterate_one_shot (void *cls, + const struct GNUNET_HashCode *key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { struct CpsRunContext *crc = cls; - GNUNET_assert (key != NULL); + GNUNET_assert (NULL != key); guid = uid; crc->phase++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found result type=%u, priority=%u, size=%u, expire=%s, key %s\n", - type, priority, size, + (unsigned int) type, + (unsigned int) priority, + (unsigned int) size, GNUNET_STRINGS_absolute_time_to_string (expiration), GNUNET_h2s (key)); - GNUNET_SCHEDULER_add_now (&test, crc); + GNUNET_SCHEDULER_add_now (&test, + crc); return GNUNET_OK; } @@ -219,11 +230,14 @@ unload_plugin (struct GNUNET_DATASTORE_PluginFunctions *api, char *libname; if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE", + GNUNET_CONFIGURATION_get_value_string (cfg, + "DATASTORE", + "DATABASE", &name)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("No `%s' specified for `%s' in configuration!\n"), "DATABASE", + _("No `%s' specified for `%s' in configuration!\n"), + "DATABASE", "DATASTORE"); return; } @@ -290,8 +304,16 @@ test (void *cls) break; } gen_key (5, &key); - crc->api->get_key (crc->api->cls, crc->offset++, &key, NULL, - GNUNET_BLOCK_TYPE_ANY, &iterate_one_shot, crc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Looking for %s\n", + GNUNET_h2s (&key)); + crc->api->get_key (crc->api->cls, + crc->offset++, + &key, + NULL, + GNUNET_BLOCK_TYPE_ANY, + &iterate_one_shot, + crc); break; case RP_UPDATE: crc->api->update (crc->api->cls, diff --git a/src/datastore/test_plugin_datastore_data_mysql.conf b/src/datastore/test_plugin_datastore_data_mysql.conf index ac7a3cde1..18eda687e 100644 --- a/src/datastore/test_plugin_datastore_data_mysql.conf +++ b/src/datastore/test_plugin_datastore_data_mysql.conf @@ -6,5 +6,4 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-datastore-plugin-mysql/ DATABASE = mysql [datastore-mysql] -DATABASE = gnunet - +DATABASE = gnunetcheck diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c index 530a7ac9b..89cc2714c 100644 --- a/src/fs/fs_publish.c +++ b/src/fs/fs_publish.c @@ -266,7 +266,6 @@ publish_sblocks_cont (void *cls, { pc->qre = GNUNET_DATASTORE_release_reserve (pc->dsh, pc->rid, UINT_MAX, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, &finish_release_reserve, pc); } else @@ -526,7 +525,6 @@ block_proc (void *cls, p->bo.replication_level, p->bo.expiration_time, -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ds_put_cont, pc); return; } @@ -547,7 +545,6 @@ block_proc (void *cls, p->bo.replication_level, p->bo.expiration_time, -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ds_put_cont, pc); } diff --git a/src/fs/fs_publish_ublock.c b/src/fs/fs_publish_ublock.c index 9ea7bf41b..7de9ea689 100644 --- a/src/fs/fs_publish_ublock.c +++ b/src/fs/fs_publish_ublock.c @@ -294,7 +294,6 @@ GNUNET_FS_publish_ublock_ (struct GNUNET_FS_Handle *h, bo->replication_level, bo->expiration_time, -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ublock_put_cont, uc); } else diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c index 2e9c17217..2c4cb6ae6 100644 --- a/src/fs/fs_unindex.c +++ b/src/fs/fs_unindex.c @@ -215,7 +215,7 @@ unindex_process (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending REMOVE request to DATASTORE service\n"); GNUNET_DATASTORE_remove (uc->dsh, &chk->query, size, data, -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &process_cont, uc); + &process_cont, uc); uc->chk = *chk; } @@ -552,7 +552,6 @@ process_kblock_for_unindex (void *cls, data, 0 /* priority */, 1 /* queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, &continue_after_remove, uc); return; @@ -563,7 +562,6 @@ process_kblock_for_unindex (void *cls, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0 /* priority */, 1 /* queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, &process_kblock_for_unindex, uc); } @@ -615,7 +613,6 @@ GNUNET_FS_unindex_do_remove_kblocks_ (struct GNUNET_FS_UnindexContext *uc) GNUNET_BLOCK_TYPE_FS_UBLOCK, 0 /* priority */, 1 /* queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, &process_kblock_for_unindex, uc); } diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c index ccf6b3c5c..c9d838fef 100644 --- a/src/fs/gnunet-service-fs_cadet_server.c +++ b/src/fs/gnunet-service-fs_cadet_server.c @@ -445,7 +445,6 @@ request_cb (void *cls, ntohl (sqm->type), 0 /* priority */, GSF_datastore_queue_size, - GNUNET_TIME_UNIT_FOREVER_REL, &handle_datastore_reply, sc); if (NULL == sc->qe) { diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c index 9687b24bc..87de0986d 100644 --- a/src/fs/gnunet-service-fs_indexing.c +++ b/src/fs/gnunet-service-fs_indexing.c @@ -523,7 +523,7 @@ GNUNET_FS_handle_on_demand_block (const struct GNUNET_HashCode * key, uint32_t s { GNUNET_break (0); GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL); + &remove_cont, NULL); return GNUNET_SYSERR; } odb = (const struct OnDemandBlock *) data; @@ -542,7 +542,7 @@ GNUNET_FS_handle_on_demand_block (const struct GNUNET_HashCode * key, uint32_t s ("# index blocks removed: original file inaccessible"), 1, GNUNET_YES); GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL); + &remove_cont, NULL); return GNUNET_SYSERR; } if ((NULL == @@ -560,7 +560,7 @@ GNUNET_FS_handle_on_demand_block (const struct GNUNET_HashCode * key, uint32_t s if (fh != NULL) GNUNET_DISK_file_close (fh); GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL); + &remove_cont, NULL); return GNUNET_SYSERR; } GNUNET_DISK_file_close (fh); @@ -574,7 +574,7 @@ GNUNET_FS_handle_on_demand_block (const struct GNUNET_HashCode * key, uint32_t s _("Indexed file `%s' changed at offset %llu\n"), fn, (unsigned long long) off); GNUNET_DATASTORE_remove (dsh, key, size, data, -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, &remove_cont, NULL); + &remove_cont, NULL); return GNUNET_SYSERR; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index a7a62a743..d82b2a954 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -1149,7 +1149,6 @@ handle_dht_reply (void *cls, 1 /* anonymity */ , 0 /* replication */ , exp, 1 + prq.priority, MAX_DATASTORE_QUEUE, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, pmc)) { put_migration_continuation (pmc, @@ -1472,7 +1471,6 @@ process_local_reply (void *cls, pr->public_data.options)) ? UINT_MAX : GSF_datastore_queue_size /* max queue size */ , - GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); if (NULL != pr->qe) return; /* we're done */ @@ -1492,7 +1490,7 @@ process_local_reply (void *cls, { GNUNET_break (0); GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, - GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); + NULL, NULL); pr->qe_start = GNUNET_TIME_absolute_get (); pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, @@ -1512,7 +1510,6 @@ process_local_reply (void *cls, pr->public_data.options)) ? UINT_MAX : GSF_datastore_queue_size /* max queue size */ , - GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); if (NULL == pr->qe) { @@ -1574,7 +1571,6 @@ process_local_reply (void *cls, public_data.options)) ? UINT_MAX : GSF_datastore_queue_size /* max queue size */ , - GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); /* check if we successfully queued another datastore request; * if so, return, otherwise call our continuation (if we have @@ -1681,7 +1677,6 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, public_data.options)) ? UINT_MAX : GSF_datastore_queue_size /* max queue size */ , - GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); if (NULL != pr->qe) return; @@ -1795,7 +1790,6 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, prq.priority, 1 /* anonymity */ , 0 /* replication */ , expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, pmc)) { put_migration_continuation (pmc, diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index d7a15fad6..71a8e81e4 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -582,7 +582,6 @@ gather_migration_blocks (void *cls) value_found = GNUNET_NO; mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, &process_migration_content, NULL); if (NULL == mig_qe) consider_gathering (); diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index 3517c7b24..bb4cb4ecb 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -225,7 +225,6 @@ gather_dht_put_blocks (void *cls) po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, po->dht_put_type, &process_dht_put_content, po); if (NULL == po->dht_qe) diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h index e4b445bbb..f594d8fa6 100644 --- a/src/include/gnunet_datastore_service.h +++ b/src/include/gnunet_datastore_service.h @@ -92,7 +92,7 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, * operation. * * @param cls closure - * @param success #GNUNET_SYSERR on failure (including timeout/queue drop) + * @param success #GNUNET_SYSERR on failure * #GNUNET_NO if content was already there * #GNUNET_YES (or other positive value) on success * @param min_expiration minimum expiration time required for 0-priority content to be stored @@ -149,7 +149,6 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -169,7 +168,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls); @@ -188,7 +186,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -199,7 +196,6 @@ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls); @@ -214,7 +210,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -228,7 +223,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls); @@ -246,7 +240,6 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to @@ -260,7 +253,6 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, const void *data, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls); @@ -305,7 +297,6 @@ typedef void * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on a matching value; * or with a NULL value if no datum matches * @param proc_cls closure for @a proc @@ -319,7 +310,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls); @@ -339,7 +329,6 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param type allowed type for the operation (never zero) * @param proc function to call on a random value; it * will be called once with a value (if available) @@ -353,7 +342,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls); @@ -370,7 +358,6 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. @@ -382,7 +369,6 @@ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 8a429ba41..1435612d7 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -459,6 +459,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_DATASTORE_DROP 103 +/** + * Message sent by datastore client to get data by key. + */ +#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY 104 + /******************************************************************************* * FS message types -- 2.25.1