From deb14caf5519f62ca11734a938f7d6024c1242a5 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 12 May 2010 14:22:36 +0000 Subject: [PATCH] stuff --- src/datastore/Makefile.am | 2 +- src/datastore/datastore_api.c | 1073 +++++++++++------ src/datastore/perf_datastore_api.c | 10 +- src/datastore/test_datastore_api.c | 38 +- src/datastore/test_datastore_api_management.c | 10 +- 5 files changed, 746 insertions(+), 387 deletions(-) diff --git a/src/datastore/Makefile.am b/src/datastore/Makefile.am index 1768af60f..a3882d676 100644 --- a/src/datastore/Makefile.am +++ b/src/datastore/Makefile.am @@ -64,7 +64,7 @@ check_PROGRAMS = \ perf_datastore_api \ perf_plugin_datastore -#TESTS = $(check_PROGRAMS) +TESTS = $(check_PROGRAMS) test_datastore_api_SOURCES = \ test_datastore_api.c diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 72f7faed7..3e47f84d1 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -20,11 +20,13 @@ /** * @file datastore/datastore_api.c - * @brief Management for the datastore for files stored on a GNUnet node + * @brief Management for the datastore for files stored on a GNUnet node. Implements + * a priority queue for requests (with timeouts). * @author Christian Grothoff */ #include "platform.h" #include "gnunet_arm_service.h" +#include "gnunet_constants.h" #include "gnunet_datastore_service.h" #include "datastore.h" @@ -49,6 +51,29 @@ struct QueueEntry */ struct GNUNET_DATASTORE_Handle *h; + /** + * Response processor (NULL if we are not waiting for a response). + * This struct should be used for the closure, function-specific + * arguments can be passed via 'client_ctx'. + */ + GNUNET_CLIENT_MessageHandler response_proc; + + /** + * Specific context (variable argument that + * can be used by the response processor). + */ + void *client_ctx; + + /** + * Function to call after transmission of the request. + */ + GNUNET_DATASTORE_ContinuationWithStatus cont; + + /** + * Closure for 'cont'. + */ + void *cont_cls; + /** * Task for timeout signalling. */ @@ -72,34 +97,23 @@ struct QueueEntry /** * Number of bytes in the request message following - * this struct. + * this struct. 32-bit value for nicer memory + * access (and overall struct alignment). */ - uint16_t message_size; + uint32_t message_size; /** * Has this message been transmitted to the service? * Only ever GNUNET_YES for the head of the queue. + * Note that the overall struct should end at a + * multiple of 64 bits. */ - int16_t was_transmitted; - - /** - * Response processor (NULL if we are not waiting for a response). - * This struct should be used for the closure, function-specific - * arguments can be passed via 'client_ctx'. - */ - GNUNET_CLIENT_MessageHandler response_proc; - - /** - * Specific context (variable argument that - * can be used by the response processor). - */ - void *client_ctx; + int32_t was_transmitted; }; /** - * Handle to the datastore service. Followed - * by 65536 bytes used for storing messages. + * Handle to the datastore service. */ struct GNUNET_DATASTORE_Handle { @@ -119,6 +133,11 @@ struct GNUNET_DATASTORE_Handle */ struct GNUNET_CLIENT_Connection *client; + /** + * Current transmit handle. + */ + struct GNUNET_CLIENT_TransmitHandle *th; + /** * Current head of priority queue. */ @@ -129,6 +148,17 @@ struct GNUNET_DATASTORE_Handle */ struct QueueEntry *queue_tail; + /** + * Task for trying to reconnect. + */ + GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + + /** + * How quickly should we retry? Used for exponential back-off on + * connect-errors. + */ + struct GNUNET_TIME_Relative retry_time; + /** * Number of entries in the queue. */ @@ -214,6 +244,9 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, if (h->client != NULL) GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (h->sched, + h->reconnect_task); h->client = NULL; while (NULL != (qe = h->queue_head)) { @@ -245,131 +278,271 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } -#if 0 /** - * Type of a function to call when we receive a message - * from the service. This specific function is used - * to handle messages of type "struct StatusMessage". + * A request has timed out (before being transmitted to the service). * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls the 'struct QueueEntry' + * @param tc scheduler context */ -static void -with_status_response_handler (void *cls, - const struct - GNUNET_MessageHeader * msg) +static void +timeout_queue_entry (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc; - const struct StatusMessage *sm; - const char *emsg; - int status; + struct QueueEntry *qe = cls; + struct GNUNET_DATASTORE_Handle *h = qe->h; + + qe->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (qe->was_transmitted == GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + if (qe->cont != NULL) + qe->cont (qe->cont_cls, GNUNET_NO, _("timeout")); + if (qe->response_proc != NULL) + qe->response_proc (qe, NULL); + GNUNET_free (qe); +} - h->message_size = 0; - if (msg == NULL) + +/** + * Create a new entry for our priority queue (and possibly discard other entires if + * the queue is getting too long). + * + * @param h handle to the datastore + * @param msize size of the message to queue + * @param queue_priority priority of the entry + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout timeout for the operation + * @param cont continuation to call when done with request transmission (can be NULL) + * @param cont_cls closure for cont + * @param response_proc function to call with replies (can be NULL) + * @param client_ctx client context (NOT a closure for response_proc) + * @return NULL if the queue is full (and this entry was dropped) + */ +static struct QueueEntry * +make_queue_entry (struct GNUNET_DATASTORE_Handle *h, + size_t msize, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls, + GNUNET_CLIENT_MessageHandler response_proc, + void *client_ctx) +{ + struct QueueEntry *ret; + struct QueueEntry *pos; + unsigned int c; + + c = 0; + pos = h->queue_head; + while ( (pos != NULL) && + (c < max_queue_size) && + (pos->priority >= queue_priority) ) { - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - cont (h->response_proc_cls, - GNUNET_SYSERR, - _("Timeout trying to read response from datastore service")); - return; + c++; + pos = pos->next; } - if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) + if (c >= max_queue_size) + return NULL; + if (pos == NULL) { - GNUNET_break (0); - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - cont (h->response_proc_cls, - GNUNET_SYSERR, - _("Error reading response from datastore service")); - return; + /* append at the tail */ + pos = h->queue_tail; } - sm = (const struct StatusMessage*) msg; - status = ntohl(sm->status); - emsg = NULL; - if (ntohs(msg->size) > sizeof(struct StatusMessage)) + else { - emsg = (const char*) &sm[1]; - if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') + pos = pos->prev; + /* do not insert at HEAD if HEAD query was already + transmitted and we are still receiving replies! */ + if ( (pos == NULL) && + (h->queue_head->was_transmitted) ) + pos = h->queue_head; + } + ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize); + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, + h->queue_tail, + pos, + ret); + ret->h = h; + ret->response_proc = response_proc; + ret->client_ctx = client_ctx; + ret->cont = cont; + ret->cont_cls = cont_cls; + ret->task = GNUNET_SCHEDULER_add_delayed (h->sched, + timeout, + &timeout_queue_entry, + ret); + ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); + ret->priority = queue_priority; + ret->max_queue = max_queue_size; + ret->message_size = msize; + ret->was_transmitted = GNUNET_NO; + h->queue_size++; + c++; + pos = ret->next; + while (pos != NULL) + { + if (pos->max_queue < h->queue_size) { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + pos); + GNUNET_SCHEDULER_cancel (h->sched, + pos->task); + if (pos->cont != NULL) + pos->cont (pos->cont_cls, GNUNET_NO, _("Message queue full")); + if (pos->response_proc != NULL) + pos->response_proc (pos, NULL); + GNUNET_free (pos); + h->queue_size--; + break; } - } - if ( (status == GNUNET_SYSERR) && - (emsg == NULL) ) - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); + pos = pos->next; } - h->response_proc = NULL; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received status %d/%s\n", - status, - emsg); -#endif - cont (h->response_proc_cls, - status, - emsg); + return ret; } /** - * Helper function that will initiate the - * transmission of a message to the datastore - * service. The message must already be prepared - * and stored in the buffer at the end of the - * handle. The message must be of a type that - * expects a "StatusMessage" in response. + * Process entries in the queue (or do nothing if we are already + * doing so). + * + * @param h handle to the datastore + */ +static void +process_queue (struct GNUNET_DATASTORE_Handle *h); + + +/** + * Try reconnecting to the datastore service. * - * @param h handle to the service with prepared message - * @param cont function to call with result - * @param cont_cls closure - * @param timeout timeout for the operation + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param tc scheduler context */ static void -transmit_for_status (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +try_reconnect (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - const struct GNUNET_MessageHeader *hdr; - uint16_t msize; + struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_assert (cont != NULL); - hdr = (const struct GNUNET_MessageHeader*) &h[1]; - msize = ntohs(hdr->size); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte message of type %u to datastore service\n", - msize, - ntohs(hdr->type)); -#endif - GNUNET_assert (h->response_proc == NULL); - h->response_proc = cont; - h->response_proc_cls = cont_cls; - h->timeout = GNUNET_TIME_relative_to_absolute (timeout); - h->message_size = msize; - if (GNUNET_OK != - GNUNET_CLIENT_transmit_and_get_response (h->client, - hdr, - timeout, - GNUNET_YES, - &with_status_response_handler, - h)) + if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value) + h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY; + else + h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2); + if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value) + h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT; + h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); + if (h->client == NULL) + return; + process_queue (h); +} + + +/** + * Disconnect from the service and then try reconnecting to the datastore service + * after some delay. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param tc scheduler context + */ +static void +do_disconnect (struct GNUNET_DATASTORE_Handle *h) +{ + GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->client = NULL; + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched, + h->retry_time, + &try_reconnect, + h); +} + + +/** + * Transmit request from queue to datastore service. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param size number of bytes that can be copied to buf + * @param buf where to copy the drop message + * @return number of bytes written to buf + */ +static size_t +transmit_request (void *cls, + size_t size, + void *buf) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct QueueEntry *qe; + size_t msize; + + h->th = NULL; + if (NULL == (qe = h->queue_head)) + return 0; /* no entry in queue */ + if (buf == NULL) { - GNUNET_break (0); - h->response_proc = NULL; - h->message_size = 0; - cont (cont_cls, - GNUNET_SYSERR, - _("Not ready to transmit request to datastore service")); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to transmit request to database.\n")); + do_disconnect (h); + return 0; + } + if (size < (msize = qe->message_size)) + { + process_queue (h); + return 0; + } + memcpy (buf, &qe[1], msize); + qe->was_transmitted = GNUNET_YES; + if (qe->cont != NULL) + qe->cont (qe->cont_cls, GNUNET_OK, NULL); + GNUNET_SCHEDULER_cancel (h->sched, + qe->task); + qe->task = GNUNET_SCHEDULER_NO_TASK; + if (qe->response_proc == NULL) + { + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + GNUNET_free (qe); + process_queue (h); } + else + { + GNUNET_CLIENT_receive (h->client, + qe->response_proc, + qe, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); + } + return msize; +} + + +/** + * Process entries in the queue (or do nothing if we are already + * doing so). + * + * @param h handle to the datastore + */ +static void +process_queue (struct GNUNET_DATASTORE_Handle *h) +{ + struct QueueEntry *qe; + + if (NULL == (qe = h->queue_head)) + return; /* no entry in queue */ + if (qe->was_transmitted == GNUNET_YES) + return; /* waiting for replies */ + if (h->th != NULL) + return; /* request pending */ + if (h->client == NULL) + return; /* waiting for reconnect */ + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, + qe->message_size, + GNUNET_TIME_absolute_get_remaining (qe->timeout), + GNUNET_YES, + &transmit_request, + h); } @@ -388,6 +561,9 @@ transmit_for_status (struct GNUNET_DATASTORE_Handle *h, * @param priority priority of the content * @param anonymity anonymity-level for the content * @param expiration expiration time for the content + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for cont @@ -402,10 +578,13 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t priority, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { + struct QueueEntry *qe; struct DataMessage *dm; size_t msize; @@ -417,7 +596,12 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, #endif msize = sizeof(struct DataMessage) + size; GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - dm = (struct DataMessage*) &h[1]; + qe = make_queue_entry (h, msize, + queue_priority, max_queue_size, timeout, + cont, cont_cls, NULL, NULL); + if (qe == NULL) + return; + dm = (struct DataMessage* ) &qe[1]; dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); dm->header.size = htons(msize); dm->rid = htonl(rid); @@ -429,7 +613,100 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, dm->expiration = GNUNET_TIME_absolute_hton(expiration); dm->key = *key; memcpy (&dm[1], data, size); - transmit_for_status (h, cont, cont_cls, timeout); + process_queue (h); +} + + +/** + * Context for processing status messages. + */ +struct StatusContext +{ + /** + * Continuation to call with the status. + */ + GNUNET_DATASTORE_ContinuationWithStatus cont; + + /** + * Closure for cont. + */ + void *cont_cls; + +}; + + +/** + * Type of a function to call when we receive a message + * from the service. + * + * @param cls closure + * @param msg message received, NULL on timeout or fatal error + */ +static void +process_status_message (void *cls, + const struct + GNUNET_MessageHeader * msg) +{ + struct QueueEntry *qe = cls; + struct GNUNET_DATASTORE_Handle *h = qe->h; + struct StatusContext *rc = qe->client_ctx; + const struct StatusMessage *sm; + const char *emsg; + int32_t status; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + GNUNET_free (qe); + if (msg == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from database.\n")); + do_disconnect (h); + return; + } + + if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || + (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) + { + GNUNET_break (0); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + rc->cont (rc->cont_cls, + GNUNET_SYSERR, + _("Error reading response from datastore service")); + GNUNET_free (rc); + return; + } + sm = (const struct StatusMessage*) msg; + status = ntohl(sm->status); + emsg = NULL; + if (ntohs(msg->size) > sizeof(struct StatusMessage)) + { + emsg = (const char*) &sm[1]; + if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') + { + GNUNET_break (0); + emsg = _("Invalid error message received from datastore service"); + } + } + if ( (status == GNUNET_SYSERR) && + (emsg == NULL) ) + { + GNUNET_break (0); + emsg = _("Invalid error message received from datastore service"); + } +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received status %d/%s\n", + (int) status, + emsg); +#endif + rc->cont (rc->cont_cls, + status, + emsg); + GNUNET_free (rc); + process_queue (h); } @@ -441,27 +718,48 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore * @param amount how much space (in bytes) should be reserved (for content only) * @param entries how many entries will be created (to calculate per-entry overhead) + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response (or before dying in queue) * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) + void *cont_cls) { + struct QueueEntry *qe; struct ReserveMessage *rm; + struct StatusContext *scont; - rm = (struct ReserveMessage*) &h[1]; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to reserve %llu bytes of data and %u entries'\n", + (unsigned long long) amount, + (unsigned int) entries); +#endif + scont = GNUNET_malloc (sizeof (struct StatusContext)); + scont->cont = cont; + scont->cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof(struct ReserveMessage), + queue_priority, max_queue_size, timeout, + NULL, NULL, &process_status_message, scont); + if (qe == NULL) + return; + rm = (struct ReserveMessage*) &qe[1]; rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); rm->header.size = htons(sizeof (struct ReserveMessage)); rm->entries = htonl(entries); rm->amount = GNUNET_htonll(amount); - transmit_for_status (h, cont, cont_cls, timeout); + process_queue (h); } @@ -473,24 +771,48 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore * @param rid reservation ID (value of "success" in original continuation * from the "reserve" function). + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, int rid, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) + void *cont_cls) { + struct QueueEntry *qe; struct ReleaseReserveMessage *rrm; + struct StatusContext *scont; - rrm = (struct ReleaseReserveMessage*) &h[1]; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to reserve %llu bytes of data and %u entries'\n", + (unsigned long long) amount, + (unsigned int) entries); +#endif + scont = GNUNET_malloc (sizeof (struct StatusContext)); + scont->cont = cont; + scont->cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), + queue_priority, max_queue_size, timeout, + NULL, NULL, &process_status_message, scont); + if (qe == NULL) + return; + rrm = (struct ReleaseReserveMessage*) &qe[1]; rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); rrm->rid = htonl(rid); - transmit_for_status (h, cont, cont_cls, timeout); + process_queue (h); } @@ -501,264 +823,334 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param uid identifier for the value * @param priority how much to increase the priority of the value * @param expiration new expiration value should be MAX of existing and this argument + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, unsigned long long uid, uint32_t priority, struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) + void *cont_cls) { + struct QueueEntry *qe; struct UpdateMessage *um; + struct StatusContext *scont; - um = (struct UpdateMessage*) &h[1]; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to update entry %llu raising priority by %u and expiration to %llu\n", + uid, + (unsigned int) priority, + (unsigned long long) expiration.value); +#endif + scont = GNUNET_malloc (sizeof (struct StatusContext)); + scont->cont = cont; + scont->cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof(struct UpdateMessage), + queue_priority, max_queue_size, timeout, + NULL, NULL, &process_status_message, scont); + if (qe == NULL) + return; + um = (struct UpdateMessage*) &qe[1]; um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); um->header.size = htons(sizeof (struct UpdateMessage)); um->priority = htonl(priority); um->expiration = GNUNET_TIME_absolute_hton(expiration); um->uid = GNUNET_htonll(uid); - transmit_for_status (h, cont, cont_cls, timeout); + process_queue (h); } /** - * Helper function that will initiate the transmission of a message to - * the datastore service. The message must already be prepared and - * stored in the buffer at the end of the handle. The message must be - * of a type that expects a "DataMessage" in response. + * Explicitly remove some content from the database. + * The "cont"inuation will be called with status + * "GNUNET_OK" if content was removed, "GNUNET_NO" + * if no matching entry was found and "GNUNET_SYSERR" + * on all other types of errors. * - * @param h handle to the service with prepared message - * @param cont function to call with result - * @param cont_cls closure - * @param timeout timeout for the operation + * @param h handle to the datastore + * @param key key for the value + * @param size number of bytes in data + * @param data content stored + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param cont continuation to call when done + * @param cont_cls closure for cont */ -static void -transmit_for_result (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_Iterator cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout); +void +GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, + const GNUNET_HashCode *key, + uint32_t size, + const void *data, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) +{ + struct QueueEntry *qe; + struct DataMessage *dm; + size_t msize; + struct StatusContext *scont; + +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to remove %u bytes under key `%s'\n", + size, + GNUNET_h2s (key)); +#endif + scont = GNUNET_malloc (sizeof (struct StatusContext)); + scont->cont = cont; + scont->cont_cls = cont_cls; + msize = sizeof(struct DataMessage) + size; + GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); + qe = make_queue_entry (h, msize, + queue_priority, max_queue_size, timeout, + NULL, NULL, &process_status_message, scont); + if (qe == NULL) + return; + dm = (struct DataMessage*) &qe[1]; + dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); + dm->header.size = htons(msize); + dm->rid = htonl(0); + dm->size = htonl(size); + dm->type = htonl(0); + dm->priority = htonl(0); + dm->anonymity = htonl(0); + dm->uid = GNUNET_htonll(0); + dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS); + dm->key = *key; + memcpy (&dm[1], data, size); + process_queue (h); +} + + + +/** + * Context for processing result messages. + */ +struct ResultContext +{ + /** + * Iterator to call with the result. + */ + GNUNET_DATASTORE_Iterator iter; + + /** + * Closure for iter. + */ + void *iter_cls; + + /** + * Automatically get the next result, or wait for a call to + * GNUNET_DATASTORE_get_next? GNUNET_YES means we automatically + * get the next one (if there are more). + */ + int get_next; + +}; /** * Type of a function to call when we receive a message - * from the service. This specific function is used - * to handle messages of type "struct DataMessage". + * from the service. * * @param cls closure * @param msg message received, NULL on timeout or fatal error */ static void -with_result_response_handler (void *cls, - const struct - GNUNET_MessageHeader * msg) +process_result_message (void *cls, + const struct GNUNET_MessageHeader * msg) { - struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_Iterator cont = h->response_proc; + struct QueueEntry *qe = cls; + struct GNUNET_DATASTORE_Handle *h = qe->h; + struct ResultContext *rc = qe->client_ctx; const struct DataMessage *dm; - size_t msize; - struct GNUNET_TIME_Relative remaining; if (msg == NULL) { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got disconnected from datastore\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from datastore\n")); #endif - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - remaining = GNUNET_TIME_absolute_get_remaining (h->timeout); - if (remaining.value > 0) - { - transmit_for_result (h, - cont, - h->response_proc_cls, - remaining); - } - else - { - h->message_size = 0; - cont (h->response_proc_cls, + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + GNUNET_free (qe); + do_disconnect (h); + rc->iter (rc->iter_cls, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - } + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (rc); return; } - h->message_size = 0; if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) { GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); - h->response_proc = NULL; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received end of result set\n"); #endif - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + GNUNET_free (qe); + rc->iter (rc->iter_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (rc); + process_queue (h); return; } if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ) + (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || + (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) { GNUNET_break (0); - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - h->response_proc = NULL; - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + GNUNET_free (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + rc->iter (rc->iter_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (rc); return; } dm = (const struct DataMessage*) msg; - msize = ntohl(dm->size); - if (ntohs(msg->size) != msize + sizeof(struct DataMessage)) - { - GNUNET_break (0); - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - h->response_proc = NULL; - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; - } #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received result %llu with type %u and size %u with key %s\n", (unsigned long long) GNUNET_ntohll(dm->uid), ntohl(dm->type), - msize, + ntohl(dm->size), GNUNET_h2s(&dm->key)); #endif - cont (h->response_proc_cls, - &dm->key, - msize, - &dm[1], - ntohl(dm->type), - ntohl(dm->priority), - ntohl(dm->anonymity), - GNUNET_TIME_absolute_ntoh(dm->expiration), - GNUNET_ntohll(dm->uid)); + rc->iter (rc->iter_cls, + &dm->key, + ntohl(dm->size), + &dm[1], + ntohl(dm->type), + ntohl(dm->priority), + ntohl(dm->anonymity), + GNUNET_TIME_absolute_ntoh(dm->expiration), + GNUNET_ntohll(dm->uid)); + if (rc->get_next == GNUNET_YES) + GNUNET_CLIENT_receive (h->client, + qe->response_proc, + qe, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); } /** - * Function called to trigger obtaining the next result - * from the datastore. - * - * @param h handle to the datastore - * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort - * iteration (with a final call to "iter" with key/data == NULL). - */ -void -GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, - int more) -{ - GNUNET_DATASTORE_Iterator cont; - - if (GNUNET_YES == more) - { - GNUNET_CLIENT_receive (h->client, - &with_result_response_handler, - h, - GNUNET_TIME_absolute_get_remaining (h->timeout)); - return; - } - cont = h->response_proc; - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); -} - - -/** - * Helper function that will initiate the transmission of a message to - * the datastore service. The message must already be prepared and - * stored in the buffer at the end of the handle. The message must be - * of a type that expects a "DataMessage" in response. + * Get a random value from the datastore. * - * @param h handle to the service with prepared message - * @param cont function to call with result - * @param cont_cls closure - * @param timeout timeout for the operation + * @param h handle to the datastore + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param iter function to call on a random value; it + * will be called once with a value (if available) + * and always once with a value of NULL. + * @param iter_cls closure for iter */ -static void -transmit_for_result (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_Iterator cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +void +GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls) { - const struct GNUNET_MessageHeader *hdr; - uint16_t msize; + struct QueueEntry *qe; + struct GNUNET_MessageHeader *m; + struct ResultContext *rcont; - GNUNET_assert (cont != NULL); - hdr = (const struct GNUNET_MessageHeader*) &h[1]; - msize = ntohs(hdr->size); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte message of type %u to datastore service\n", - msize, - ntohs(hdr->type)); + "Asked to get random entry in %llu ms\n", + (unsigned long long) timeout.value); #endif - GNUNET_assert (h->response_proc == NULL); - h->response_proc = cont; - h->response_proc_cls = cont_cls; - h->timeout = GNUNET_TIME_relative_to_absolute (timeout); - h->message_size = msize; - if (GNUNET_OK != - GNUNET_CLIENT_transmit_and_get_response (h->client, - hdr, - timeout, - GNUNET_YES, - &with_result_response_handler, - h)) - { - GNUNET_break (0); - h->response_proc = NULL; - h->message_size = 0; - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - } + rcont = GNUNET_malloc (sizeof (struct ResultContext)); + rcont->iter = iter; + rcont->iter_cls = iter_cls; + rcont->get_next = GNUNET_YES; + qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), + queue_priority, max_queue_size, timeout, + NULL, NULL, &process_result_message, rcont); + if (qe == NULL) + return; + m = (struct GNUNET_MessageHeader*) &qe[1]; + m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); + m->size = htons(sizeof (struct GNUNET_MessageHeader)); + process_queue (h); } + /** * Iterate over the results for a particular key - * in the datastore. + * in the datastore. The iterator will only be called + * once initially; if the first call did contain a + * result, further results can be obtained by calling + * "GNUNET_DATASTORE_get_next" with the given argument. * * @param h handle to the datastore * @param key maybe NULL (to match all entries) * @param type desired type, 0 for any + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - GNUNET_DATASTORE_Iterator iter, void *iter_cls, - struct GNUNET_TIME_Relative timeout) + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls) { + struct QueueEntry *qe; struct GetMessage *gm; + struct ResultContext *rcont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to look for data under key `%s'\n", + "Asked to look for data of type %u under key `%s'\n", + (unsigned int) type, GNUNET_h2s (key)); #endif - gm = (struct GetMessage*) &h[1]; + rcont = GNUNET_malloc (sizeof (struct ResultContext)); + rcont->iter = iter; + rcont->iter_cls = iter_cls; + rcont->get_next = GNUNET_NO; + qe = make_queue_entry (h, sizeof(struct GetMessage), + queue_priority, max_queue_size, timeout, + NULL, NULL, &process_result_message, rcont); + if (qe == NULL) + return; + gm = (struct GetMessage*) &qe[1]; gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); gm->type = htonl(type); if (key != NULL) @@ -770,80 +1162,47 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, { gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); } - GNUNET_assert (h->response_proc == NULL); - transmit_for_result (h, iter, iter_cls, timeout); + process_queue (h); } /** - * Get a random value from the datastore. - * + * Function called to trigger obtaining the next result + * from the datastore. + * * @param h handle to the datastore - * @param iter function to call on a random value; it - * will be called exactly once; if no values - * are available, the value will be NULL. - * @param iter_cls closure for iter - * @param timeout how long to wait at most for a response + * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort + * iteration (with a final call to "iter" with key/data == NULL). */ -void -GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_Iterator iter, void *iter_cls, - struct GNUNET_TIME_Relative timeout) +void +GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, + int more) { - struct GNUNET_MessageHeader *m; + struct QueueEntry *qe = h->queue_head; + struct ResultContext *rc = qe->client_ctx; - m = (struct GNUNET_MessageHeader*) &h[1]; - m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); - m->size = htons(sizeof (struct GNUNET_MessageHeader)); - GNUNET_assert (h->response_proc == NULL); - transmit_for_result (h, iter, iter_cls, timeout); + GNUNET_assert (NULL != qe); + GNUNET_assert (&process_result_message == qe->response_proc); + GNUNET_assert (rc->get_next == GNUNET_NO); + if (GNUNET_YES == more) + { + GNUNET_CLIENT_receive (h->client, + qe->response_proc, + qe, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); + return; + } + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + GNUNET_free (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + rc->iter (rc->iter_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (rc); } -/** - * Explicitly remove some content from the database. - * - * @param h handle to the datastore - * @param key key for the value - * @param size number of bytes in data - * @param data content stored - * @param cont continuation to call when done - * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response - */ -void -GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - uint32_t size, const void *data, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) -{ - struct DataMessage *dm; - size_t msize; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to remove %u bytes of data under key `%s'\n", - size, - GNUNET_h2s (key)); -#endif - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - dm = (struct DataMessage*) &h[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons(msize); - dm->rid = htonl(0); - dm->size = htonl(size); - dm->type = htonl(0); - dm->priority = htonl(0); - dm->anonymity = htonl(0); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS); - dm->key = *key; - memcpy (&dm[1], data, size); - transmit_for_status (h, cont, cont_cls, timeout); -} -#endif - /* end of datastore_api.c */ diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c index b92fefe4d..23ff7c49a 100644 --- a/src/datastore/perf_datastore_api.c +++ b/src/datastore/perf_datastore_api.c @@ -186,9 +186,9 @@ do_delete (void *cls, &crc->key, crc->esize, crc->data, + 1, 1, TIMEOUT, &remove_next, - crc, - TIMEOUT); + crc); } @@ -275,16 +275,16 @@ run_continuation (void *cls, GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000))), - TIMEOUT, + 1, 1, TIMEOUT, &check_success, crc); break; case RP_CUT: /* trim down below MAX_SIZE again */ GNUNET_DATASTORE_get_random (datastore, + 1, 1, TIMEOUT, &delete_value, - crc, - TIMEOUT); + crc); break; case RP_REPORT: printf ( diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c index 07f1dc426..53e61d092 100644 --- a/src/datastore/test_datastore_api.c +++ b/src/datastore/test_datastore_api.c @@ -355,7 +355,7 @@ run_continuation (void *cls, get_priority (crc->i), get_anonymity (crc->i), get_expiration (crc->i), - TIMEOUT, + 1, 1, TIMEOUT, &check_success, crc); crc->i++; @@ -374,9 +374,9 @@ run_continuation (void *cls, GNUNET_DATASTORE_get (datastore, &crc->key, get_type (crc->i), + 1, 1, TIMEOUT, &check_value, - crc, - TIMEOUT); + crc); break; case RP_DEL: crc->i--; @@ -390,9 +390,9 @@ run_continuation (void *cls, GNUNET_DATASTORE_get (datastore, &crc->key, get_type (crc->i), + 1, 1, TIMEOUT, &delete_value, - crc, - TIMEOUT); + crc); break; case RP_DO_DEL: #if VERBOSE @@ -414,9 +414,9 @@ run_continuation (void *cls, &crc->key, crc->size, crc->data, + 1, 1, TIMEOUT, &check_success, - crc, - TIMEOUT); + crc); break; case RP_DELVALIDATE: crc->i--; @@ -430,18 +430,18 @@ run_continuation (void *cls, GNUNET_DATASTORE_get (datastore, &crc->key, get_type (crc->i), + 1, 1, TIMEOUT, &check_nothing, - crc, - TIMEOUT); + crc); break; case RP_RESERVE: crc->phase = RP_PUT_MULTIPLE; GNUNET_DATASTORE_reserve (datastore, 128*1024, 2, + 1, 1, TIMEOUT, &get_reserved, - crc, - TIMEOUT); + crc); break; case RP_PUT_MULTIPLE: crc->phase = RP_PUT_MULTIPLE_NEXT; @@ -454,7 +454,7 @@ run_continuation (void *cls, get_priority (42), get_anonymity (42), get_expiration (42), - TIMEOUT, + 1, 1, TIMEOUT, &check_success, crc); break; @@ -469,7 +469,7 @@ run_continuation (void *cls, get_priority (43), get_anonymity (43), get_expiration (43), - TIMEOUT, + 1, 1, TIMEOUT, &check_success, crc); break; @@ -477,9 +477,9 @@ run_continuation (void *cls, GNUNET_DATASTORE_get (datastore, &crc->key, get_type (42), + 1, 1, TIMEOUT, &check_multiple, - crc, - TIMEOUT); + crc); break; case RP_GET_MULTIPLE_NEXT: case RP_GET_MULTIPLE_DONE: @@ -492,17 +492,17 @@ run_continuation (void *cls, crc->uid, 100, get_expiration (42), + 1, 1, TIMEOUT, &check_success, - crc, - TIMEOUT); + crc); break; case RP_UPDATE_VALIDATE: GNUNET_DATASTORE_get (datastore, &crc->key, get_type (42), + 1, 1, TIMEOUT, &check_update, - crc, - TIMEOUT); + crc); break; case RP_UPDATE_DONE: GNUNET_assert (0); diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c index 169cef554..0bdea5135 100644 --- a/src/datastore/test_datastore_api_management.c +++ b/src/datastore/test_datastore_api_management.c @@ -235,7 +235,7 @@ run_continuation (void *cls, get_priority (crc->i), get_anonymity (crc->i), get_expiration (crc->i), - TIMEOUT, + 1, 1, TIMEOUT, &check_success, crc); crc->i++; @@ -259,9 +259,9 @@ run_continuation (void *cls, GNUNET_DATASTORE_get (datastore, &crc->key, get_type (crc->i), + 1, 1, TIMEOUT, &check_value, - crc, - TIMEOUT); + crc); break; case RP_GET_FAIL: #if VERBOSE @@ -274,9 +274,9 @@ run_continuation (void *cls, GNUNET_DATASTORE_get (datastore, &crc->key, get_type (crc->i), + 1, 1, TIMEOUT, &check_nothing, - crc, - TIMEOUT); + crc); break; case RP_DONE: GNUNET_assert (0 == crc->i); -- 2.25.1