From 27ed8fcbc85a361864948edb517d47804c2b5a56 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 26 Apr 2011 18:19:15 +0000 Subject: [PATCH] datastore and fs fixes from Easter --- TODO | 26 +- src/core/core_api.c | 2 + src/datastore/datastore.h | 10 + src/datastore/datastore_api.c | 279 +++--- src/datastore/gnunet-service-datastore.c | 242 ++--- src/datastore/perf_datastore_api.c | 1 + src/datastore/perf_plugin_datastore.c | 78 +- .../perf_plugin_datastore_data_postgres.conf | 2 +- src/datastore/plugin_datastore_mysql.c | 790 +++++---------- src/datastore/plugin_datastore_postgres.c | 525 ++++------ src/datastore/plugin_datastore_sqlite.c | 918 ++++-------------- src/datastore/plugin_datastore_template.c | 110 +-- src/datastore/test_datastore_api.c | 285 +++--- .../test_datastore_api_data_sqlite.conf | 2 +- src/datastore/test_datastore_api_management.c | 68 +- src/datastore/test_plugin_datastore.c | 89 +- src/fs/Makefile.am | 4 +- src/fs/fs_download.c | 19 +- src/fs/fs_test_lib_data.conf | 6 +- src/fs/gnunet-pseudonym.c | 2 +- src/fs/gnunet-service-fs_cp.c | 7 +- src/fs/gnunet-service-fs_indexing.c | 2 +- src/fs/gnunet-service-fs_indexing.h | 2 +- src/fs/gnunet-service-fs_pe.c | 2 +- src/fs/gnunet-service-fs_pr.c | 147 ++- src/fs/gnunet-service-fs_put.c | 174 ++-- src/fs/test_fs_download_data.conf | 3 +- src/fs/test_gnunet_fs_idx.py.in | 2 +- src/fs/test_gnunet_fs_ns_data.conf | 2 +- ...test_gnunet_service_fs_migration_data.conf | 2 +- src/include/gnunet_datastore_plugin.h | 143 ++- src/include/gnunet_datastore_service.h | 109 +-- 32 files changed, 1446 insertions(+), 2607 deletions(-) diff --git a/TODO b/TODO index bd94dcfbe..b9bfaf729 100644 --- a/TODO +++ b/TODO @@ -1,23 +1,31 @@ 0.9.0pre3: [2'11] -* DATASTORE: - - postgres support currently not implemented -* NAT/UPNP: [Milan / Ayush / MW] +* NAT/UPNP: [Milan / MW] - [#1609] code clean up - testing - integration with transport service: + test TCP + implement UDP, HTTP/HTTPS * Transport: - - UDP fragmentation -* FS/CORE [CG] - - adjust service to deal with new datastore API (also crashes all over the place still, - likely related). + - ATS crashes [MW] + - UDP fragmentation [MW] +* CORE: + - Core API's peer_change_preference leaks 'irc' and + Core API's notify_transmit_ready leaks 'th'! +* FS [CG] + - test*.py fail - download of 100 MB file from 'leach' peer hung due to failure of core-api to call back after a change preference request (structs indicate request was transmitted but reply never received?) + => try again! + - test_gnunet_service_fs_p2p: + => sometimes DATASTORE get operation fails to queue on target (why?) + => do we need to just make the queue larger? + - with core queue size of 1, we get notify_transmit_ready + from core API returning NULL (why? ok? just have larger queue?) - other runs (-L DEBUG) with downloads using the new 'trust' test show non-deterministic results (for any set of peers) -* FS: [CG] + - implement 'SUPPORT_DELAYS' + - consider re-issue GSF_dht_lookup_ after non-DHT reply received - implement multi-peer FS performance tests + gauger them! + insert + download @@ -59,6 +67,8 @@ => If MiM attacker uses vetoed address, blacklist the specific IP for the presumed neighbour! - need to periodically probe latency/transport cost changes & possibly switch transport +* DATASTORE: [CG] + - check indexes / SQL for performance * DV: [Nate?] - proper bandwidth allocation - performance tests diff --git a/src/core/core_api.c b/src/core/core_api.c index 185e09d65..efb00c111 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -1842,6 +1842,7 @@ change_preference_send_continuation (void *cls, struct GNUNET_CORE_InformationRequestContext *irc = cls; irc->cm = NULL; + // FIXME: who frees 'irc'? } @@ -1901,6 +1902,7 @@ GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h, irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext)); irc->h = h; irc->pr = pr; + // FIXME: who frees 'irc'? (if not cancelled?) cm = GNUNET_malloc (sizeof (struct ControlMessage) + sizeof (struct RequestInfoMessage)); cm->cont = &change_preference_send_continuation; diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h index 55ca7c8e5..d66ec0e95 100644 --- a/src/datastore/datastore.h +++ b/src/datastore/datastore.h @@ -113,6 +113,11 @@ struct GetMessage */ uint32_t type GNUNET_PACKED; + /** + * Offset of the result. + */ + uint64_t offset GNUNET_PACKED; + /** * Desired key (optional). Check the "size" of the * header to see if the key is actually present. @@ -138,6 +143,11 @@ struct GetZeroAnonymityMessage */ uint32_t type GNUNET_PACKED; + /** + * Offset of the result. + */ + uint64_t offset GNUNET_PACKED; + }; diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 2bba2e8ee..99060bd60 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -63,14 +63,14 @@ struct StatusContext struct ResultContext { /** - * Iterator to call with the result. + * Function to call with the result. */ - GNUNET_DATASTORE_Iterator iter; + GNUNET_DATASTORE_DatumProcessor proc; /** - * Closure for iter. + * Closure for proc. */ - void *iter_cls; + void *proc_cls; }; @@ -168,12 +168,6 @@ struct GNUNET_DATASTORE_QueueEntry */ int was_transmitted; - /** - * Are we expecting a single message in response to this - * request (and, if it is data, no 'END' message)? - */ - int one_shot; - }; /** @@ -241,10 +235,9 @@ struct GNUNET_DATASTORE_Handle int in_receive; /** - * We should either receive (and ignore) an 'END' message or force a - * disconnect for the next message from the service. + * We should ignore the next message(s) from the service. */ - unsigned int expect_end_or_disconnect; + unsigned int skip_next_messages; }; @@ -335,7 +328,7 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, while (NULL != (qe = h->queue_head)) { GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (qe, NULL); + qe->response_proc (h, NULL); } if (GNUNET_YES == drop) { @@ -378,7 +371,7 @@ timeout_queue_entry (void *cls, GNUNET_NO); qe->task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (qe->was_transmitted == GNUNET_NO); - qe->response_proc (qe, NULL); + qe->response_proc (qe->h, NULL); } @@ -394,7 +387,7 @@ timeout_queue_entry (void *cls, * @param timeout timeout for the operation * @param response_proc function to call with replies (can be NULL) * @param qc client context (NOT a closure for response_proc) - * @return NULL if the queue is full (and this entry was dropped) + * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry (struct GNUNET_DATASTORE_Handle *h, @@ -418,6 +411,14 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, c++; pos = pos->next; } + if (c >= max_queue_size) + { + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue overflows"), + 1, + GNUNET_NO); + return NULL; + } ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); ret->h = h; ret->response_proc = response_proc; @@ -451,15 +452,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, pos, ret); h->queue_size++; - if (c > max_queue_size) - { - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# queue overflows"), - 1, - GNUNET_NO); - response_proc (ret, NULL); - return NULL; - } ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); @@ -469,7 +461,15 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, if (pos->max_queue < h->queue_size) { GNUNET_assert (pos->response_proc != NULL); - pos->response_proc (pos, NULL); + /* move 'pos' element to head so that it will be + killed on 'NULL' call below */ + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + pos); + GNUNET_CONTAINER_DLL_insert (h->queue_head, + h->queue_tail, + pos); + pos->response_proc (h, NULL); break; } pos = pos->next; @@ -550,6 +550,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) GNUNET_NO); #endif GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->skip_next_messages = 0; h->client = NULL; h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, @@ -700,6 +701,7 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) qe->task = GNUNET_SCHEDULER_NO_TASK; } h->queue_size--; + qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ GNUNET_free (qe); } @@ -724,16 +726,22 @@ process_status_message (void *cls, int was_transmitted; h->in_receive = GNUNET_NO; + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } if (NULL == (qe = h->queue_head)) { GNUNET_break (0); do_disconnect (h); return; } - was_transmitted = qe->was_transmitted; rc = qe->qc.sc; if (msg == NULL) { + was_transmitted = qe->was_transmitted; free_queue_entry (qe); if (NULL == h->client) return; /* forced disconnect */ @@ -1114,7 +1122,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode *key, - size_t size, + size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, @@ -1186,45 +1194,35 @@ process_result_message (void *cls, struct GNUNET_DATASTORE_QueueEntry *qe; struct ResultContext rc; const struct DataMessage *dm; - int was_transmitted; h->in_receive = GNUNET_NO; + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } if (msg == NULL) { - if (NULL != (qe = h->queue_head)) + qe = h->queue_head; + GNUNET_assert (NULL != qe); + if (qe->was_transmitted == GNUNET_YES) { - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); rc = qe->qc.rc; - if (was_transmitted == GNUNET_YES) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from database.\n")); - do_disconnect (h); - } - else - { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Request dropped due to finite datastore queue length.\n"); -#endif - } - if (rc.iter != NULL) - rc.iter (rc.iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from database.\n")); + do_disconnect (h); } + free_queue_entry (qe); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) { GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); - if (h->expect_end_or_disconnect > 0) - { - h->expect_end_or_disconnect--; - process_queue (h); - return; - } qe = h->queue_head; rc = qe->qc.rc; GNUNET_assert (GNUNET_YES == qe->was_transmitted); @@ -1234,8 +1232,8 @@ process_result_message (void *cls, "Received end of result set, new queue size is %u\n", h->queue_size); #endif - if (rc.iter != NULL) - rc.iter (rc.iter_cls, + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); h->retry_time.rel_value = 0; @@ -1243,13 +1241,6 @@ process_result_message (void *cls, process_queue (h); return; } - if (h->expect_end_or_disconnect > 0) - { - /* only 'END' allowed, must reconnect */ - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - return; - } qe = h->queue_head; rc = qe->qc.rc; GNUNET_assert (GNUNET_YES == qe->was_transmitted); @@ -1261,40 +1252,16 @@ process_result_message (void *cls, free_queue_entry (qe); h->retry_time = GNUNET_TIME_UNIT_ZERO; do_disconnect (h); - if (rc.iter != NULL) - rc.iter (rc.iter_cls, + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, GNUNET_NO); - if (rc.iter == NULL) - { - h->result_count++; - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# Excess results received"), - 1, - GNUNET_NO); - if (h->result_count > MAX_EXCESS_RESULTS) - { - free_queue_entry (qe); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# Forced database connection resets"), - 1, - GNUNET_NO); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - return; - } - if (GNUNET_YES == qe->one_shot) - free_queue_entry (qe); - else - GNUNET_DATASTORE_iterate_get_next (h); - return; - } dm = (const struct DataMessage*) msg; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1304,10 +1271,9 @@ process_result_message (void *cls, ntohl(dm->size), GNUNET_h2s(&dm->key)); #endif - if (GNUNET_YES == qe->one_shot) - free_queue_entry (qe); + free_queue_entry (qe); h->retry_time.rel_value = 0; - rc.iter (rc.iter_cls, + rc.proc (rc.proc_cls, &dm->key, ntohl(dm->size), &dm[1], @@ -1331,33 +1297,33 @@ process_result_message (void *cls, * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param iter function to call on a random value; it + * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. - * @param iter_cls closure for iter + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_MessageHeader *m; union QueueContext qc; + GNUNET_assert (NULL != proc); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n", (unsigned long long) timeout.rel_value); #endif - qc.rc.iter = iter; - qc.rc.iter_cls = iter_cls; + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), queue_priority, max_queue_size, timeout, &process_result_message, &qc); @@ -1369,7 +1335,6 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } - qe->one_shot = GNUNET_YES; GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET REPLICATION requests executed"), 1, @@ -1383,43 +1348,50 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, /** - * Get a zero-anonymity value from the datastore. + * Get a single zero-anonymity value from the datastore. * * @param h handle to the datastore + * @param offset offset of the result (mod #num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param type allowed type for the operation - * @param iter function to call on a random value; it + * @param type allowed type for the operation (never zero) + * @param proc function to call on a random value; it * will be called once with a value (if available) - * and always once with a value of NULL. - * @param iter_cls closure for iter + * or with NULL if none value exists. + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_BLOCK_Type type, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GetZeroAnonymityMessage *m; union QueueContext qc; + GNUNET_assert (NULL != proc); GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get zero-anonymity entry in %llu ms\n", + "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", + (unsigned long long) offset, + type, (unsigned long long) timeout.rel_value); #endif - qc.rc.iter = iter; - qc.rc.iter_cls = iter_cls; + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), queue_priority, max_queue_size, timeout, &process_result_message, &qc); @@ -1427,7 +1399,7 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, { #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for zero-anonymity iteration\n"); + "Could not create queue entry for zero-anonymity procation\n"); #endif return NULL; } @@ -1439,55 +1411,57 @@ GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); process_queue (h); return qe; } - /** - * Iterate over the results for a particular key - * in the datastore. The iterator will only be called - * once initially; if the first call did contain a - * result, further results can be obtained by calling - * "GNUNET_DATASTORE_iterate_get_next" with the given argument. + * Get a result for a particular key from the datastore. The processor + * will only be called once. * * @param h handle to the datastore + * @param offset offset of the result (mod #num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param key maybe NULL (to match all entries) * @param type desired type, 0 for any * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param iter function to call on each matching value; + * @param proc function to call on each matching value; * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GetMessage *gm; union QueueContext qc; + GNUNET_assert (NULL != proc); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to look for data of type %u under key `%s'\n", (unsigned int) type, GNUNET_h2s (key)); #endif - qc.rc.iter = iter; - qc.rc.iter_cls = iter_cls; + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; qe = make_queue_entry (h, sizeof(struct GetMessage), queue_priority, max_queue_size, timeout, &process_result_message, &qc); @@ -1507,6 +1481,7 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, gm = (struct GetMessage*) &qe[1]; gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); gm->type = htonl(type); + gm->offset = GNUNET_htonll (offset); if (key != NULL) { gm->header.size = htons(sizeof (struct GetMessage)); @@ -1521,25 +1496,6 @@ GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, } -/** - * Function called to trigger obtaining the next result - * from the datastore. - * - * @param h handle to the datastore - */ -void -GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; - - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - &process_result_message, - h, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); -} - - /** * Cancel a datastore operation. The final callback from the * operation must not have been done yet. @@ -1551,6 +1507,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { struct GNUNET_DATASTORE_Handle *h; + GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); h = qe->h; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1562,7 +1519,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) if (GNUNET_YES == qe->was_transmitted) { free_queue_entry (qe); - h->expect_end_or_disconnect++; + h->skip_next_messages++; return; } free_queue_entry (qe); diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 566a227c1..deab62dd0 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -209,18 +209,6 @@ sync_stats () - -/** - * Function called once the transmit operation has - * either failed or succeeded. - * - * @param cls closure - * @param status GNUNET_OK on success, GNUNET_SYSERR on error - */ -typedef void (*TransmitContinuation)(void *cls, - int status); - - /** * Context for transmitting replies to clients. */ @@ -252,22 +240,6 @@ struct TransmitCallbackContext */ struct GNUNET_SERVER_Client *client; - /** - * Function to call once msg has been transmitted - * (or at least added to the buffer). - */ - TransmitContinuation tc; - - /** - * Closure for tc. - */ - void *tc_cls; - - /** - * GNUNET_YES if we are supposed to signal the server - * completion of the client's request. - */ - int end; }; @@ -330,7 +302,6 @@ delete_expired (void *cls, */ static int expired_processor (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -396,7 +367,7 @@ delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { expired_kill_task = GNUNET_SCHEDULER_NO_TASK; - plugin->api->expiration_get (plugin->api->cls, + plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL); } @@ -424,7 +395,6 @@ delete_expired (void *cls, */ static int quota_processor (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -487,7 +457,7 @@ manage_space (unsigned long long need) (last != need) ) { last = need; - plugin->api->expiration_get (plugin->api->cls, + plugin->api->get_expiration (plugin->api->cls, "a_processor, &need); } @@ -521,14 +491,7 @@ transmit_callback (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Transmission to client failed!\n")); - if (tcc->tc != NULL) - tcc->tc (tcc->tc_cls, GNUNET_SYSERR); - if (GNUNET_YES == tcc->end) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Disconnecting client due to transmission failure!\n")); - GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR); - } + GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR); GNUNET_SERVER_client_drop (tcc->client); GNUNET_free (tcc->msg); GNUNET_free (tcc); @@ -536,23 +499,7 @@ transmit_callback (void *cls, } GNUNET_assert (size >= msize); memcpy (buf, tcc->msg, msize); - if (tcc->tc != NULL) - tcc->tc (tcc->tc_cls, GNUNET_OK); - if (GNUNET_YES == tcc->end) - { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Done processing client request\n"); -#endif - GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK); - } - else - { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Response transmitted, more pending!\n"); -#endif - } + GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK); GNUNET_SERVER_client_drop (tcc->client); GNUNET_free (tcc->msg); GNUNET_free (tcc); @@ -567,16 +514,10 @@ transmit_callback (void *cls, * @param msg message to transmit, will be freed! * @param tc function to call afterwards * @param tc_cls closure for tc - * @param end is this the last response (and we should - * signal the server completion accodingly after - * transmitting this message)? */ static void transmit (struct GNUNET_SERVER_Client *client, - struct GNUNET_MessageHeader *msg, - TransmitContinuation tc, - void *tc_cls, - int end) + struct GNUNET_MessageHeader *msg) { struct TransmitCallbackContext *tcc; @@ -586,17 +527,13 @@ transmit (struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Shutdown in progress, aborting transmission.\n"); #endif + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); GNUNET_free (msg); - if (NULL != tc) - tc (tc_cls, GNUNET_SYSERR); return; } tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext)); tcc->msg = msg; tcc->client = client; - tcc->tc = tc; - tcc->tc_cls = tc_cls; - tcc->end = end; if (NULL == (tcc->th = GNUNET_SERVER_notify_transmit_ready (client, ntohs(msg->size), @@ -605,14 +542,7 @@ transmit (struct GNUNET_SERVER_Client *client, tcc))) { GNUNET_break (0); - if (GNUNET_YES == end) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Forcefully disconnecting client.\n")); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - } - if (NULL != tc) - tc (tc_cls, GNUNET_SYSERR); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); GNUNET_free (msg); GNUNET_free (tcc); return; @@ -653,33 +583,10 @@ transmit_status (struct GNUNET_SERVER_Client *client, sm->status = htonl(code); if (slen > 0) memcpy (&sm[1], msg, slen); - transmit (client, &sm->header, NULL, NULL, GNUNET_YES); + transmit (client, &sm->header); } -/** - * Function called once the transmit operation has - * either failed or succeeded. - * - * @param next_cls closure for calling "next_request" callback - * @param status GNUNET_OK on success, GNUNET_SYSERR on error - */ -static void -get_next(void *next_cls, - int status) -{ - if (status != GNUNET_OK) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Failed to transmit an item to the client; aborting iteration.\n")); - if (plugin != NULL) - plugin->api->next_request (next_cls, GNUNET_YES); - return; - } - if (next_cls != NULL) - plugin->api->next_request (next_cls, GNUNET_NO); -} - /** * Function that will transmit the given datastore entry @@ -702,7 +609,6 @@ get_next(void *next_cls, */ static int transmit_item (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -727,10 +633,11 @@ transmit_item (void *cls, end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader)); end->size = htons(sizeof(struct GNUNET_MessageHeader)); end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END); - transmit (client, end, NULL, NULL, GNUNET_YES); + transmit (client, end); GNUNET_SERVER_client_drop (client); return GNUNET_OK; } + GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE); dm = GNUNET_malloc (sizeof(struct DataMessage) + size); dm->header.size = htons(sizeof(struct DataMessage) + size); dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA); @@ -754,8 +661,7 @@ transmit_item (void *cls, gettext_noop ("# results found"), 1, GNUNET_NO); - transmit (client, &dm->header, &get_next, next_cls, - (next_cls != NULL) ? GNUNET_NO : GNUNET_YES); + transmit (client, &dm->header); return GNUNET_OK; } @@ -939,11 +845,6 @@ struct PutContext * Client to notify on completion. */ struct GNUNET_SERVER_Client *client; - - /** - * Did we find the data already in the database? - */ - int is_present; /* followed by the 'struct DataMessage' */ }; @@ -1009,7 +910,6 @@ execute_put (struct GNUNET_SERVER_Client *client, * matches the put and if none match executes the put. * * @param cls closure, pointer to the client (of type 'struct PutContext'). - * @param next_cls closure to use to ask for the next item * @param key key for the content * @param size number of bytes in data * @param data content stored @@ -1020,12 +920,11 @@ execute_put (struct GNUNET_SERVER_Client *client, * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available * - * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue, - * GNUNET_NO to delete the item and continue (if supported) + * @return GNUNET_OK usually + * GNUNET_NO to delete the item */ static int check_present (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -1041,13 +940,10 @@ check_present (void *cls, dm = (const struct DataMessage*) &pc[1]; if (key == NULL) { - if (pc->is_present == GNUNET_YES) - transmit_status (pc->client, GNUNET_NO, NULL); - else - execute_put (pc->client, dm); + execute_put (pc->client, dm); GNUNET_SERVER_client_drop (pc->client); GNUNET_free (pc); - return GNUNET_SYSERR; + return GNUNET_OK; } if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) || (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || @@ -1056,12 +952,19 @@ check_present (void *cls, data, size)) ) ) { - pc->is_present = GNUNET_YES; - plugin->api->next_request (next_cls, GNUNET_YES); +#if DEBUG_MYSQL + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Result already present in datastore\n"); +#endif + transmit_status (pc->client, GNUNET_NO, NULL); + GNUNET_SERVER_client_drop (pc->client); + GNUNET_free (pc); } else { - plugin->api->next_request (next_cls, GNUNET_NO); + execute_put (pc->client, dm); + GNUNET_SERVER_client_drop (pc->client); + GNUNET_free (pc); } return GNUNET_OK; } @@ -1083,6 +986,7 @@ handle_put (void *cls, int rid; struct ReservationList *pos; struct PutContext *pc; + GNUNET_HashCode vhash; uint32_t size; if ( (dm == NULL) || @@ -1124,16 +1028,18 @@ handle_put (void *cls, if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key)) { + GNUNET_CRYPTO_hash (&dm[1], size, &vhash); pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage)); pc->client = client; GNUNET_SERVER_client_keep (client); memcpy (&pc[1], dm, size + sizeof (struct DataMessage)); - plugin->api->get (plugin->api->cls, - &dm->key, - NULL, - ntohl (dm->type), - &check_present, - pc); + plugin->api->get_key (plugin->api->cls, + 0, + &dm->key, + &vhash, + ntohl (dm->type), + &check_present, + pc); return; } execute_put (client, dm); @@ -1192,16 +1098,17 @@ handle_get (void *cls, 1, GNUNET_NO); transmit_item (client, - NULL, NULL, 0, NULL, 0, 0, 0, + NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - plugin->api->get (plugin->api->cls, - ((size == sizeof(struct GetMessage)) ? &msg->key : NULL), - NULL, - ntohl(msg->type), - &transmit_item, - client); + plugin->api->get_key (plugin->api->cls, + GNUNET_ntohll (msg->offset), + ((size == sizeof(struct GetMessage)) ? &msg->key : NULL), + NULL, + ntohl(msg->type), + &transmit_item, + client); } @@ -1265,7 +1172,7 @@ handle_get_replication (void *cls, 1, GNUNET_NO); GNUNET_SERVER_client_keep (client); - plugin->api->replication_get (plugin->api->cls, + plugin->api->get_replication (plugin->api->cls, &transmit_item, client); } @@ -1303,37 +1210,20 @@ handle_get_zero_anonymity (void *cls, 1, GNUNET_NO); GNUNET_SERVER_client_keep (client); - plugin->api->iter_zero_anonymity (plugin->api->cls, - type, - &transmit_item, - client); + plugin->api->get_zero_anonymity (plugin->api->cls, + GNUNET_ntohll (msg->offset), + type, + &transmit_item, + client); } -/** - * Context for the 'remove_callback'. - */ -struct RemoveContext -{ - /** - * Client for whom we're doing the remvoing. - */ - struct GNUNET_SERVER_Client *client; - - /** - * GNUNET_YES if we managed to remove something. - */ - int found; -}; - - /** * Callback function that will cause the item that is passed * in to be deleted (by returning GNUNET_NO). */ static int remove_callback (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -1343,7 +1233,7 @@ remove_callback (void *cls, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { - struct RemoveContext *rc = cls; + struct GNUNET_SERVER_Client *client = cls; if (key == NULL) { @@ -1352,15 +1242,10 @@ remove_callback (void *cls, "No further matches for `%s' request.\n", "REMOVE"); #endif - if (GNUNET_YES == rc->found) - transmit_status (rc->client, GNUNET_OK, NULL); - else - transmit_status (rc->client, GNUNET_NO, _("Content not found")); - GNUNET_SERVER_client_drop (rc->client); - GNUNET_free (rc); + transmit_status (client, GNUNET_NO, _("Content not found")); + GNUNET_SERVER_client_drop (client); return GNUNET_OK; /* last item */ } - rc->found = GNUNET_YES; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Item %llu matches `%s' request for key `%s' and type %u.\n", @@ -1375,7 +1260,8 @@ remove_callback (void *cls, GNUNET_YES); GNUNET_CONTAINER_bloomfilter_remove (filter, key); - plugin->api->next_request (next_cls, GNUNET_YES); + transmit_status (client, GNUNET_OK, NULL); + GNUNET_SERVER_client_drop (client); return GNUNET_NO; } @@ -1394,7 +1280,6 @@ handle_remove (void *cls, { const struct DataMessage *dm = check_data (message); GNUNET_HashCode vhash; - struct RemoveContext *rc; if (dm == NULL) { @@ -1413,18 +1298,17 @@ handle_remove (void *cls, gettext_noop ("# REMOVE requests received"), 1, GNUNET_NO); - rc = GNUNET_malloc (sizeof(struct RemoveContext)); GNUNET_SERVER_client_keep (client); - rc->client = client; GNUNET_CRYPTO_hash (&dm[1], ntohl(dm->size), &vhash); - plugin->api->get (plugin->api->cls, - &dm->key, - &vhash, - (enum GNUNET_BLOCK_Type) ntohl(dm->type), - &remove_callback, - rc); + plugin->api->get_key (plugin->api->cls, + 0, + &dm->key, + &vhash, + (enum GNUNET_BLOCK_Type) ntohl(dm->type), + &remove_callback, + client); } @@ -1469,7 +1353,7 @@ disk_utilization_change_cb (void *cls, _("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"), (long long) payload, (long long) -delta); - payload = plugin->api->get_size (plugin->api->cls); + payload = plugin->api->estimate_size (plugin->api->cls); sync_stats (); return; } @@ -1518,7 +1402,7 @@ process_stat_done (void *cls, stat_get = NULL; if (stats_worked == GNUNET_NO) - payload = plugin->api->get_size (plugin->api->cls); + payload = plugin->api->estimate_size (plugin->api->cls); } @@ -1636,8 +1520,6 @@ cleaning_task (void *cls, GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th); GNUNET_SERVER_client_drop (tcc->client); } - if (NULL != tcc->tc) - tcc->tc (tcc->tc_cls, GNUNET_SYSERR); GNUNET_free (tcc->msg); GNUNET_free (tcc); } diff --git a/src/datastore/perf_datastore_api.c b/src/datastore/perf_datastore_api.c index 6ea65c68d..2f0eb0de9 100644 --- a/src/datastore/perf_datastore_api.c +++ b/src/datastore/perf_datastore_api.c @@ -385,6 +385,7 @@ check () GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv, "perf-datastore-api", "nohelp", options, &run, NULL); + sleep (1); /* give datastore chance to process 'DROP' */ if (0 != GNUNET_OS_process_kill (proc, SIGTERM)) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); diff --git a/src/datastore/perf_plugin_datastore.c b/src/datastore/perf_plugin_datastore.c index 2903a8f28..6befa120c 100644 --- a/src/datastore/perf_plugin_datastore.c +++ b/src/datastore/perf_plugin_datastore.c @@ -37,7 +37,7 @@ * those take too long to run them in the usual "make check" * sequence. Hence the value used for shipping is tiny. */ -#define MAX_SIZE 1024LL * 1024 * 128 +#define MAX_SIZE 1024LL * 1024 * 32 #define ITERATIONS 2 @@ -81,6 +81,7 @@ struct CpsRunContext enum RunPhase phase; unsigned int cnt; unsigned int iter; + uint64_t offset; }; @@ -100,7 +101,8 @@ disk_utilization_change_cb (void *cls, static void -putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k) +putValue (struct GNUNET_DATASTORE_PluginFunctions * api, + int i, int k) { char value[65536]; size_t size; @@ -156,7 +158,6 @@ test (void *cls, static int iterate_zeros (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -171,7 +172,18 @@ iterate_zeros (void *cls, int i; const char *cdata = data; - if (key == NULL) + GNUNET_assert (key != NULL); + GNUNET_assert (size >= 8); + memcpy (&i, &cdata[4], sizeof (i)); + hits[i/8] |= (1 << (i % 8)); + +#if VERBOSE + fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n", + type, priority, size, + (unsigned long long) expiration.abs_value); +#endif + crc->cnt++; + if (crc->cnt == PUT_10 / 4 - 1) { char buf[256]; unsigned int bc; @@ -192,42 +204,17 @@ iterate_zeros (void *cls, crc->cnt); GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms"); memset (hits, 0, sizeof (hits)); - if ( (int) (PUT_10 / 4 - crc->cnt) > 2) - { - fprintf (stderr, - "Got %d items, expected %d\n", - (int) crc->cnt, (int) PUT_10 / 4); - GNUNET_break (0); - crc->phase = RP_ERROR; - } - else - { - crc->phase++; - crc->cnt = 0; - crc->start = GNUNET_TIME_absolute_get (); - } - GNUNET_SCHEDULER_add_now (&test, crc); - return GNUNET_OK; + crc->phase++; + crc->cnt = 0; + crc->start = GNUNET_TIME_absolute_get (); } - GNUNET_assert (size >= 8); - memcpy (&i, &cdata[4], sizeof (i)); - hits[i/8] |= (1 << (i % 8)); - -#if VERBOSE - fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n", - type, priority, size, - (unsigned long long) expiration.abs_value); -#endif - crc->cnt++; - crc->api->next_request (next_cls, - GNUNET_NO); + GNUNET_SCHEDULER_add_now (&test, crc); return GNUNET_OK; } static int expiration_get (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -281,7 +268,6 @@ expiration_get (void *cls, static int replication_get (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -323,6 +309,7 @@ replication_get (void *cls, GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms"); memset (hits, 0, sizeof (hits)); crc->phase++; + crc->offset = 0; crc->cnt = 0; crc->start = GNUNET_TIME_absolute_get (); } @@ -386,7 +373,15 @@ test (void *cls, int j; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - crc->phase = RP_ERROR; + { + GNUNET_break (0); + crc->phase = RP_ERROR; + } +#if VERBOSE + fprintf (stderr, "In phase %d, iteration %u\n", + crc->phase, + crc->cnt); +#endif switch (crc->phase) { case RP_ERROR: @@ -419,17 +414,19 @@ test (void *cls, GNUNET_SCHEDULER_add_now (&test, crc); break; case RP_REP_GET: - crc->api->replication_get (crc->api->cls, + crc->api->get_replication (crc->api->cls, &replication_get, crc); break; case RP_ZA_GET: - crc->api->iter_zero_anonymity (crc->api->cls, 1, - &iterate_zeros, - crc); + crc->api->get_zero_anonymity (crc->api->cls, + crc->offset++, + 1, + &iterate_zeros, + crc); break; case RP_EXP_GET: - crc->api->expiration_get (crc->api->cls, + crc->api->get_expiration (crc->api->cls, &expiration_get, crc); break; @@ -549,7 +546,6 @@ main (int argc, char *argv[]) char *pos; char dir_name[128]; - if (1) return 0; /* determine name of plugin to use */ plugin_name = argv[0]; while (NULL != (pos = strstr(plugin_name, "_"))) diff --git a/src/datastore/perf_plugin_datastore_data_postgres.conf b/src/datastore/perf_plugin_datastore_data_postgres.conf index c2a181bc7..b7cf174e9 100644 --- a/src/datastore/perf_plugin_datastore_data_postgres.conf +++ b/src/datastore/perf_plugin_datastore_data_postgres.conf @@ -20,7 +20,7 @@ DATABASE = postgres # REJECT_FROM = # REJECT_FROM6 = # PREFIX = - +# DEBUG = YES [dht] AUTOSTART = NO diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index c3d9212d3..2eefd9b04 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -160,58 +160,6 @@ struct GNUNET_MysqlStatementHandle }; -/** - * Context for the universal iterator. - */ -struct NextRequestClosure; - -/** - * Type of a function that will prepare - * the next iteration. - * - * @param cls closure - * @param nc the next context; NULL for the last - * call which gives the callback a chance to - * clean up the closure - * @return GNUNET_OK on success, GNUNET_NO if there are - * no more values, GNUNET_SYSERR on error - */ -typedef int (*PrepareFunction)(void *cls, - struct NextRequestClosure *nc); - - -struct NextRequestClosure -{ - struct Plugin *plugin; - - struct GNUNET_TIME_Absolute now; - - /** - * Function to call to prepare the next - * iteration. - */ - PrepareFunction prep; - - /** - * Closure for prep. - */ - void *prep_cls; - - MYSQL_BIND rbind[7]; - - enum GNUNET_BLOCK_Type type; - - PluginIterator dviter; - - void *dviter_cls; - - unsigned int count; - - int end_it; - - int one_shot; -}; - /** * Context for all functions in this plugin. @@ -243,16 +191,6 @@ struct Plugin */ char *cnffile; - /** - * Closure of the 'next_task' (must be freed if 'next_task' is cancelled). - */ - struct NextRequestClosure *next_task_nc; - - /** - * Pending task with scheduler for running the next request. - */ - GNUNET_SCHEDULER_TaskIdentifier next_task; - /** * Prepared statements. */ @@ -295,7 +233,7 @@ struct Plugin #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" struct GNUNET_MysqlStatementHandle *get_size; -#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?" +#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *zero_iter; #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\ @@ -372,7 +310,6 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg) } - /** * Free a prepared statement. * @@ -381,8 +318,7 @@ get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg) */ static void prepared_statement_destroy (struct Plugin *plugin, - struct GNUNET_MysqlStatementHandle - *s) + struct GNUNET_MysqlStatementHandle *s) { GNUNET_CONTAINER_DLL_remove (plugin->shead, plugin->stail, @@ -397,6 +333,8 @@ prepared_statement_destroy (struct Plugin *plugin, /** * Close database connection and all prepared statements (we got a DB * disconnect error). + * + * @param plugin plugin context */ static int iclose (struct Plugin *plugin) @@ -420,10 +358,11 @@ iclose (struct Plugin *plugin) * Open the connection with the database (and initialize * our default options). * + * @param plugin plugin context * @return GNUNET_OK on success */ static int -iopen (struct Plugin *ret) +iopen (struct Plugin *plugin) { char *mysql_dbname; char *mysql_server; @@ -433,67 +372,67 @@ iopen (struct Plugin *ret) my_bool reconnect; unsigned int timeout; - ret->dbf = mysql_init (NULL); - if (ret->dbf == NULL) + plugin->dbf = mysql_init (NULL); + if (plugin->dbf == NULL) return GNUNET_SYSERR; - if (ret->cnffile != NULL) - mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile); - mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client"); + if (plugin->cnffile != NULL) + mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile); + mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client"); reconnect = 0; - mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect); - mysql_options (ret->dbf, + mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect); + mysql_options (plugin->dbf, MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout); - mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8"); + mysql_options(plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8"); timeout = 60; /* in seconds */ - mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout); - mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout); + mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout); + mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout); mysql_dbname = NULL; - if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg, "datastore-mysql", "DATABASE")) GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg, "datastore-mysql", "DATABASE", &mysql_dbname)); else mysql_dbname = GNUNET_strdup ("gnunet"); mysql_user = NULL; - if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg, "datastore-mysql", "USER")) { GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg, "datastore-mysql", "USER", &mysql_user)); } mysql_password = NULL; - if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg, "datastore-mysql", "PASSWORD")) { GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg, "datastore-mysql", "PASSWORD", &mysql_password)); } mysql_server = NULL; - if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg, "datastore-mysql", "HOST")) { GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg, "datastore-mysql", "HOST", &mysql_server)); } mysql_port = 0; - if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg, "datastore-mysql", "PORT")) { GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql", + GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, "datastore-mysql", "PORT", &mysql_port)); } GNUNET_assert (mysql_dbname != NULL); - mysql_real_connect (ret->dbf, + mysql_real_connect (plugin->dbf, mysql_server, mysql_user, mysql_password, mysql_dbname, @@ -503,10 +442,10 @@ iopen (struct Plugin *ret) GNUNET_free_non_null (mysql_user); GNUNET_free_non_null (mysql_password); GNUNET_free (mysql_dbname); - if (mysql_error (ret->dbf)[0]) + if (mysql_error (plugin->dbf)[0]) { LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, - "mysql_real_connect", ret); + "mysql_real_connect", plugin); return GNUNET_SYSERR; } return GNUNET_OK; @@ -686,19 +625,6 @@ init_params (struct Plugin *plugin, return GNUNET_OK; } -/** - * Type of a callback that will be called for each - * data set returned from MySQL. - * - * @param cls user-defined argument - * @param num_values number of elements in values - * @param values values returned by MySQL - * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort - */ -typedef int (*GNUNET_MysqlDataProcessor) (void *cls, - unsigned int num_values, - MYSQL_BIND *values); - /** * Run a prepared SELECT statement. @@ -708,40 +634,31 @@ typedef int (*GNUNET_MysqlDataProcessor) (void *cls, * @param result_size number of elements in results array * @param results pointer to already initialized MYSQL_BIND * array (of sufficient size) for passing results - * @param processor function to call on each result - * @param processor_cls extra argument to processor - * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective + * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective * values (size + buffer-reference for pointers); terminated * with "-1" - * @return GNUNET_SYSERR on error, otherwise - * the number of successfully affected (or queried) rows + * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result) */ static int -prepared_statement_run_select (struct Plugin *plugin, - struct GNUNET_MysqlStatementHandle *s, - unsigned int result_size, - MYSQL_BIND *results, - GNUNET_MysqlDataProcessor processor, void *processor_cls, - ...) +prepared_statement_run_select_va (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s, + unsigned int result_size, + MYSQL_BIND *results, + va_list ap) { - va_list ap; int ret; unsigned int rsize; - int total; if (GNUNET_OK != prepare_statement (plugin, s)) { GNUNET_break (0); return GNUNET_SYSERR; } - va_start (ap, processor_cls); if (GNUNET_OK != init_params (plugin, s, ap)) { GNUNET_break (0); - va_end (ap); return GNUNET_SYSERR; } - va_end (ap); rsize = mysql_stmt_field_count (s->statement); if (rsize > result_size) { @@ -757,29 +674,53 @@ prepared_statement_run_select (struct Plugin *plugin, iclose (plugin); return GNUNET_SYSERR; } - - total = 0; - while (1) + ret = mysql_stmt_fetch (s->statement); + if (ret == MYSQL_NO_DATA) + return GNUNET_NO; + if (ret != 0) { - ret = mysql_stmt_fetch (s->statement); - if (ret == MYSQL_NO_DATA) - break; - if (ret != 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("`%s' failed at %s:%d with error: %s\n"), - "mysql_stmt_fetch", - __FILE__, __LINE__, mysql_stmt_error (s->statement)); - iclose (plugin); - return GNUNET_SYSERR; - } - if (processor != NULL) - if (GNUNET_OK != processor (processor_cls, rsize, results)) - break; - total++; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_fetch", + __FILE__, __LINE__, mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; } mysql_stmt_reset (s->statement); - return total; + return GNUNET_OK; +} + + +/** + * Run a prepared SELECT statement. + * + * @param plugin plugin context + * @param s statement to run + * @param result_size number of elements in results array + * @param results pointer to already initialized MYSQL_BIND + * array (of sufficient size) for passing results + * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective + * values (size + buffer-reference for pointers); terminated + * with "-1" + * @return GNUNET_SYSERR on error, otherwise + * the number of successfully affected (or queried) rows + */ +static int +prepared_statement_run_select (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s, + unsigned int result_size, + MYSQL_BIND *results, + ...) +{ + va_list ap; + int ret; + + va_start (ap, results); + ret = prepared_statement_run_select_va (plugin, s, + result_size, results, + ap); + va_end (ap); + return ret; } @@ -853,23 +794,6 @@ do_delete_entry (struct Plugin *plugin, } -/** - * Function that simply returns GNUNET_OK - * - * @param cls closure, not used - * @param num_values not used - * @param values not used - * @return GNUNET_OK - */ -static int -return_ok (void *cls, - unsigned int num_values, - MYSQL_BIND *values) -{ - return GNUNET_OK; -} - - /** * Get an estimate of how much space the database is * currently using. @@ -878,7 +802,7 @@ return_ok (void *cls, * @return number of bytes used on disk */ static unsigned long long -mysql_plugin_get_size (void *cls) +mysql_plugin_estimate_size (void *cls) { struct Plugin *plugin = cls; MYSQL_BIND cbind[1]; @@ -893,7 +817,6 @@ mysql_plugin_get_size (void *cls) prepared_statement_run_select (plugin, plugin->get_size, 1, cbind, - &return_ok, NULL, -1)) return 0; return total; @@ -929,7 +852,6 @@ mysql_plugin_put (void *cls, { struct Plugin *plugin = cls; unsigned int irepl = replication; - unsigned int itype = type; unsigned int ipriority = priority; unsigned int ianonymity = anonymity; unsigned long long lexpiration = expiration.abs_value; @@ -952,7 +874,7 @@ mysql_plugin_put (void *cls, plugin->insert_entry, NULL, MYSQL_TYPE_LONG, &irepl, GNUNET_YES, - MYSQL_TYPE_LONG, &itype, GNUNET_YES, + MYSQL_TYPE_LONG, &type, GNUNET_YES, MYSQL_TYPE_LONG, &ipriority, GNUNET_YES, MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES, MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES, @@ -1034,20 +956,23 @@ mysql_plugin_update (void *cls, } - - /** - * Continuation of "mysql_next_request". + * Run the given select statement and call 'proc' on the resulting + * values (which must be in particular positions). * - * @param next_cls the next context - * @param tc the task context (unused) + * @param plugin the plugin handle + * @param stmt select statement to run + * @param proc function to call on result + * @param proc_cls closure for proc + * @param ... arguments to initialize stmt */ static void -mysql_next_request_cont (void *next_cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +execute_select (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *stmt, + PluginDatumProcessor proc, void *proc_cls, + ...) { - struct NextRequestClosure *nrc = next_cls; - struct Plugin *plugin; + va_list ap; int ret; unsigned int type; unsigned int priority; @@ -1059,19 +984,10 @@ mysql_next_request_cont (void *next_cls, char value[GNUNET_DATASTORE_MAX_VALUE_SIZE]; GNUNET_HashCode key; struct GNUNET_TIME_Absolute expiration; - MYSQL_BIND *rbind = nrc->rbind; - - plugin = nrc->plugin; - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - plugin->next_task_nc = NULL; + MYSQL_BIND rbind[7]; - if (GNUNET_YES == nrc->end_it) - goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->now = GNUNET_TIME_absolute_get (); hashSize = sizeof (GNUNET_HashCode); - memset (nrc->rbind, 0, sizeof (nrc->rbind)); - rbind = nrc->rbind; + memset (rbind, 0, sizeof (rbind)); rbind[0].buffer_type = MYSQL_TYPE_LONG; rbind[0].buffer = &type; rbind[0].is_unsigned = 1; @@ -1096,16 +1012,28 @@ mysql_next_request_cont (void *next_cls, rbind[6].buffer = &uid; rbind[6].is_unsigned = 1; - if (GNUNET_OK != nrc->prep (nrc->prep_cls, - nrc)) - goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + va_start (ap, proc_cls); + ret = prepared_statement_run_select_va (plugin, + stmt, + 7, rbind, + ap); + va_end (ap); + if (ret <= 0) + { + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; + } GNUNET_assert (size <= sizeof(value)); if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || (hashSize != sizeof (GNUNET_HashCode)) ) { GNUNET_break (0); - goto END_SET; + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; } #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1116,18 +1044,13 @@ mysql_next_request_cont (void *next_cls, anonymity, exp); #endif + GNUNET_assert (size < MAX_DATUM_SIZE); expiration.abs_value = exp; - ret = nrc->dviter (nrc->dviter_cls, - (nrc->one_shot == GNUNET_YES) ? NULL : nrc, - &key, - size, value, - type, priority, anonymity, expiration, - uid); - if (ret == GNUNET_SYSERR) - { - nrc->end_it = GNUNET_YES; - return; - } + ret = proc (proc_cls, + &key, + size, value, + type, priority, anonymity, expiration, + uid); if (ret == GNUNET_NO) { do_delete_entry (plugin, uid); @@ -1135,189 +1058,50 @@ mysql_next_request_cont (void *next_cls, plugin->env->duc (plugin->env->cls, - size); } - if (nrc->one_shot == GNUNET_YES) - GNUNET_free (nrc); - return; - END_SET: - /* call dviter with "end of set" */ - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->dviter (nrc->dviter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->prep (nrc->prep_cls, NULL); - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_free (nrc); } -/** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -static void -mysql_plugin_next_request (void *next_cls, - int end_it) -{ - struct NextRequestClosure *nrc = next_cls; - - if (GNUNET_YES == end_it) - nrc->end_it = GNUNET_YES; - nrc->plugin->next_task_nc = nrc; - nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont, - nrc); -} - /** - * Context for 'get_statement_prepare'. - */ -struct GetContext -{ - GNUNET_HashCode key; - GNUNET_HashCode vhash; - - unsigned int prio; - unsigned int anonymity; - unsigned long long expiration; - unsigned long long vkey; - unsigned long long total; - unsigned int off; - unsigned int count; - int have_vhash; -}; - - -static int -get_statement_prepare (void *cls, - struct NextRequestClosure *nrc) -{ - struct GetContext *gc = cls; - struct Plugin *plugin; - int ret; - unsigned long hashSize; - - if (NULL == nrc) - { - GNUNET_free (gc); - return GNUNET_NO; - } - if (gc->count == gc->total) - return GNUNET_NO; - plugin = nrc->plugin; - hashSize = sizeof (GNUNET_HashCode); - if (++gc->off >= gc->total) - gc->off = 0; -#if DEBUG_MYSQL - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Obtaining result number %d/%lld at offset %u for GET `%s'\n", - gc->count+1, - gc->total, - gc->off, - GNUNET_h2s (&gc->key)); -#endif - if (nrc->type != 0) - { - if (gc->have_vhash) - { - ret = prepared_statement_run_select (plugin, - plugin->select_entry_by_hash_vhash_and_type, - 7, nrc->rbind, - &return_ok, NULL, - MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, - MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, - MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, - MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, - -1); - } - else - { - ret = - prepared_statement_run_select (plugin, - plugin->select_entry_by_hash_and_type, - 7, nrc->rbind, - &return_ok, NULL, - MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, - MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, - MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, - -1); - } - } - else - { - if (gc->have_vhash) - { - ret = - prepared_statement_run_select (plugin, - plugin->select_entry_by_hash_and_vhash, - 7, nrc->rbind, - &return_ok, NULL, - MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, - MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, - MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, - -1); - } - else - { - ret = - prepared_statement_run_select (plugin, - plugin->select_entry_by_hash, - 7, nrc->rbind, - &return_ok, NULL, - MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, - MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, - -1); - } - } - gc->count++; - return ret; -} - - -/** - * Iterate over the results for a particular key in the datastore. + * Get one of the results for a particular key in the datastore. * * @param cls closure - * @param key maybe NULL (to match all entries) + * @param offset offset of the result (mod #num-results); + * specific ordering does not matter for the offset + * @param key key to match, never NULL * @param vhash hash of the value, maybe NULL (to * match all values that have the right key). * Note that for DBlocks there is no difference * betwen key and vhash, but for other blocks * there may be! * @param type entries of which type are relevant? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * Use 0 for any type. + * @param proc function to call on each matching value; however, + * after the first call to "proc", the plugin must wait + * until "NextRequest" was called before giving the processor + * the next item; finally, the "proc" should be called once + * once with a NULL value at the end ("next_cls" should be NULL + * for that last call) + * @param proc_cls closure for proc */ static void -mysql_plugin_get (void *cls, - const GNUNET_HashCode *key, - const GNUNET_HashCode *vhash, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, void *iter_cls) +mysql_plugin_get_key (void *cls, + uint64_t offset, + const GNUNET_HashCode *key, + const GNUNET_HashCode *vhash, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - unsigned int itype = type; int ret; MYSQL_BIND cbind[1]; - struct GetContext *gc; - struct NextRequestClosure *nrc; long long total; unsigned long hashSize; unsigned long hashSize2; + unsigned long long off; GNUNET_assert (key != NULL); - if (iter == NULL) - return; + GNUNET_assert (NULL != proc); hashSize = sizeof (GNUNET_HashCode); hashSize2 = sizeof (GNUNET_HashCode); memset (cbind, 0, sizeof (cbind)); @@ -1333,10 +1117,9 @@ mysql_plugin_get (void *cls, prepared_statement_run_select (plugin, plugin->count_entry_by_hash_vhash_and_type, 1, cbind, - &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, - MYSQL_TYPE_LONG, &itype, GNUNET_YES, + MYSQL_TYPE_LONG, &type, GNUNET_YES, -1); } else @@ -1345,9 +1128,8 @@ mysql_plugin_get (void *cls, prepared_statement_run_select (plugin, plugin->count_entry_by_hash_and_type, 1, cbind, - &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, - MYSQL_TYPE_LONG, &itype, GNUNET_YES, + MYSQL_TYPE_LONG, &type, GNUNET_YES, -1); } } @@ -1359,7 +1141,6 @@ mysql_plugin_get (void *cls, prepared_statement_run_select (plugin, plugin->count_entry_by_hash_and_vhash, 1, cbind, - &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, -1); @@ -1371,79 +1152,81 @@ mysql_plugin_get (void *cls, prepared_statement_run_select (plugin, plugin->count_entry_by_hash, 1, cbind, - &return_ok, NULL, MYSQL_TYPE_BLOB, key, hashSize, &hashSize, -1); } } if ((ret != GNUNET_OK) || (0 >= total)) { - iter (iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } + offset = offset % total; + off = (unsigned long long) offset; #if DEBUG_MYSQL GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Iterating over %lld results for GET `%s'\n", + "Obtaining %llu/%lld result for GET `%s'\n", + off, total, GNUNET_h2s (key)); #endif - gc = GNUNET_malloc (sizeof (struct GetContext)); - gc->key = *key; - if (vhash != NULL) + + if (type != GNUNET_BLOCK_TYPE_ANY) { - gc->have_vhash = GNUNET_YES; - gc->vhash = *vhash; + if (NULL != vhash) + { + execute_select (plugin, + plugin->select_entry_by_hash_vhash_and_type, + proc, proc_cls, + MYSQL_TYPE_BLOB, key, hashSize, &hashSize, + MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, + MYSQL_TYPE_LONG, &type, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, + -1); + } + else + { + execute_select (plugin, + plugin->select_entry_by_hash_and_type, + proc, proc_cls, + MYSQL_TYPE_BLOB, key, hashSize, &hashSize, + MYSQL_TYPE_LONG, &type, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, + -1); + } + } + else + { + if (NULL != vhash) + { + execute_select (plugin, + plugin->select_entry_by_hash_and_vhash, + proc, proc_cls, + MYSQL_TYPE_BLOB, key, hashSize, &hashSize, + MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, + MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, + -1); + } + else + { + execute_select (plugin, + plugin->select_entry_by_hash, + proc, proc_cls, + MYSQL_TYPE_BLOB, key, hashSize, &hashSize, + MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, + -1); + } } - gc->total = total; - gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total); - - - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->plugin = plugin; - nrc->type = type; - nrc->dviter = iter; - nrc->dviter_cls = iter_cls; - nrc->prep = &get_statement_prepare; - nrc->prep_cls = gc; - mysql_plugin_next_request (nrc, GNUNET_NO); -} - - -/** - * Run the prepared statement to get the next data item ready. - * - * @param cls not used - * @param nrc closure for the next request iterator - * @return GNUNET_OK on success, GNUNET_NO if there is no additional item - */ -static int -iterator_zero_prepare (void *cls, - struct NextRequestClosure *nrc) -{ - struct Plugin *plugin; - int ret; - - if (nrc == NULL) - return GNUNET_NO; - plugin = nrc->plugin; - ret = prepared_statement_run_select (plugin, - plugin->zero_iter, - 7, nrc->rbind, - &return_ok, NULL, - MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES, - -1); - nrc->count++; - return ret; } /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * Get a zero-anonymity datum from the datastore. * * @param cls our "struct Plugin*" + * @param offset offset of the result * @param type entries of which type should be considered? * Use 0 for any type. * @param iter function to call on each matching value; @@ -1451,47 +1234,27 @@ iterator_zero_prepare (void *cls, * @param iter_cls closure for iter */ static void -mysql_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +mysql_plugin_get_zero_anonymity (void *cls, + uint64_t offset, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; - - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->plugin = plugin; - nrc->type = type; - nrc->dviter = iter; - nrc->dviter_cls = iter_cls; - nrc->prep = &iterator_zero_prepare; - mysql_plugin_next_request (nrc, GNUNET_NO); -} + unsigned long long off; + off = (unsigned long long) offset; + execute_select (plugin, + plugin->zero_iter, + proc, proc_cls, + MYSQL_TYPE_LONG, &type, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, + -1); -/** - * Run the SELECT statement for the replication function. - * - * @param cls the 'struct Plugin' - * @param nrc the context (not used) - */ -static int -replication_prepare (void *cls, - struct NextRequestClosure *nrc) -{ - struct Plugin *plugin = cls; - - return prepared_statement_run_select (plugin, - plugin->select_replication, - 7, nrc->rbind, - &return_ok, NULL, - -1); } - /** - * Context for 'repl_iter' function. + * Context for 'repl_proc' function. */ struct ReplCtx { @@ -1504,22 +1267,21 @@ struct ReplCtx /** * Function to call for the result (or the NULL). */ - PluginIterator iter; + PluginDatumProcessor proc; /** - * Closure for iter. + * Closure for proc. */ - void *iter_cls; + void *proc_cls; }; /** - * Wrapper for the iterator for 'sqlite_plugin_replication_get'. + * Wrapper for the processor for 'mysql_plugin_get_replication'. * Decrements the replication counter and calls the original * iterator. * * @param cls closure - * @param next_cls closure to pass to the "next" function. * @param key key for the content * @param size number of bytes in data * @param data content stored @@ -1535,8 +1297,7 @@ struct ReplCtx * GNUNET_NO to delete the item and continue (if supported) */ static int -repl_iter (void *cls, - void *next_cls, +repl_proc (void *cls, const GNUNET_HashCode *key, uint32_t size, const void *data, @@ -1552,8 +1313,8 @@ repl_iter (void *cls, int ret; int iret; - ret = rc->iter (rc->iter_cls, - next_cls, key, + ret = rc->proc (rc->proc_cls, + key, size, data, type, priority, anonymity, expiration, uid); @@ -1561,10 +1322,10 @@ repl_iter (void *cls, { oid = (unsigned long long) uid; iret = prepared_statement_run (plugin, - plugin->dec_repl, - NULL, - MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, - -1); + plugin->dec_repl, + NULL, + MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, + -1); if (iret == GNUNET_SYSERR) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -1577,94 +1338,56 @@ repl_iter (void *cls, /** - * Get a random item for replication. Returns a single, not expired, random item - * from those with the highest replication counters. The item's - * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Get a random item for replication. Returns a single, not expired, + * random item from those with the highest replication counters. The + * item's replication counter is decremented by one IF it was positive + * before. Call 'proc' with all values ZERO or NULL if the datastore + * is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param iter_cls closure for proc */ static void -mysql_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +mysql_plugin_get_replication (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; struct ReplCtx rc; rc.plugin = plugin; - rc.iter = iter; - rc.iter_cls = iter_cls; - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->plugin = plugin; - nrc->now = GNUNET_TIME_absolute_get (); - nrc->prep = &replication_prepare; - nrc->prep_cls = plugin; - nrc->type = 0; - nrc->dviter = &repl_iter; - nrc->dviter_cls = &rc; - nrc->end_it = GNUNET_NO; - nrc->one_shot = GNUNET_YES; - mysql_next_request_cont (nrc, NULL); -} - + rc.proc = proc; + rc.proc_cls = proc_cls; + execute_select (plugin, + plugin->select_replication, + &repl_proc, &rc, + -1); -/** - * Run the SELECT statement for the expiration function. - * - * @param cls the 'struct Plugin' - * @param nrc the context (not used) - * @return GNUNET_OK on success, GNUNET_NO if there are - * no more values, GNUNET_SYSERR on error - */ -static int -expiration_prepare (void *cls, - struct NextRequestClosure *nrc) -{ - struct Plugin *plugin = cls; - long long nt; - - if (NULL == nrc) - return GNUNET_NO; - nt = (long long) nrc->now.abs_value; - return prepared_statement_run_select - (plugin, - plugin->select_expiration, - 7, nrc->rbind, - &return_ok, NULL, - MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, - -1); } /** * Get a random item for expiration. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Call 'proc' with all values ZERO or NULL if the datastore is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for proc */ static void -mysql_plugin_expiration_get (void *cls, - PluginIterator iter, void *iter_cls) +mysql_plugin_get_expiration (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; - - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->plugin = plugin; - nrc->now = GNUNET_TIME_absolute_get (); - nrc->prep = &expiration_prepare; - nrc->prep_cls = plugin; - nrc->type = 0; - nrc->dviter = iter; - nrc->dviter_cls = iter_cls; - nrc->end_it = GNUNET_NO; - nrc->one_shot = GNUNET_YES; - mysql_next_request_cont (nrc, NULL); + long long nt; + + nt = (long long) GNUNET_TIME_absolute_get().abs_value; + execute_select (plugin, + plugin->select_expiration, + proc, proc_cls, + MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, + -1); + } @@ -1760,14 +1483,13 @@ libgnunet_plugin_datastore_mysql_init (void *cls) api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); api->cls = plugin; - api->get_size = &mysql_plugin_get_size; + api->estimate_size = &mysql_plugin_estimate_size; api->put = &mysql_plugin_put; - api->next_request = &mysql_plugin_next_request; - api->get = &mysql_plugin_get; - api->replication_get = &mysql_plugin_replication_get; - api->expiration_get = &mysql_plugin_expiration_get; api->update = &mysql_plugin_update; - api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity; + api->get_key = &mysql_plugin_get_key; + api->get_replication = &mysql_plugin_get_replication; + api->get_expiration = &mysql_plugin_get_expiration; + api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity; api->drop = &mysql_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql", _("Mysql database running\n")); @@ -1787,14 +1509,6 @@ libgnunet_plugin_datastore_mysql_done (void *cls) struct Plugin *plugin = api->cls; iclose (plugin); - if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (plugin->next_task); - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL); - GNUNET_free (plugin->next_task_nc); - plugin->next_task_nc = NULL; - } GNUNET_free_non_null (plugin->cnffile); GNUNET_free (plugin); GNUNET_free (api); diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index aea87fdf4..cb077f06a 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c @@ -43,103 +43,6 @@ #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS -/** - * Closure for 'postgres_next_request_cont'. - */ -struct NextRequestClosure -{ - /** - * Global plugin data. - */ - struct Plugin *plugin; - - /** - * Function to call for each matching entry. - */ - PluginIterator iter; - - /** - * Closure for 'iter'. - */ - void *iter_cls; - - /** - * Parameters for the prepared statement. - */ - const char *paramValues[5]; - - /** - * Name of the prepared statement to run. - */ - const char *pname; - - /** - * Size of values pointed to by paramValues. - */ - int paramLengths[5]; - - /** - * Number of paramters in paramValues/paramLengths. - */ - int nparams; - - /** - * Current time (possible parameter), big-endian. - */ - uint64_t bnow; - - /** - * Key (possible parameter) - */ - GNUNET_HashCode key; - - /** - * Hash of value (possible parameter) - */ - GNUNET_HashCode vhash; - - /** - * Number of entries found so far - */ - unsigned long long count; - - /** - * Offset this iteration starts at. - */ - uint64_t off; - - /** - * Current offset to use in query, big-endian. - */ - uint64_t blimit_off; - - /** - * Current total number of entries found so far, big-endian. - */ - uint64_t bcount; - - /** - * Overall number of matching entries. - */ - unsigned long long total; - - /** - * Type of block (possible paramter), big-endian. - */ - uint32_t btype; - - /** - * Flag set to GNUNET_YES to stop iteration. - */ - int end_it; - - /** - * Flag to indicate that there should only be one result. - */ - int one_shot; -}; - - /** * Context for all functions in this plugin. */ @@ -155,16 +58,6 @@ struct Plugin */ PGconn *dbh; - /** - * Closure of the 'next_task' (must be freed if 'next_task' is cancelled). - */ - struct NextRequestClosure *next_task_nc; - - /** - * Pending task with scheduler for running the next request. - */ - GNUNET_SCHEDULER_TaskIdentifier next_task; - }; @@ -434,7 +327,7 @@ init_connection (struct Plugin *plugin) pq_prepare (plugin, "select_non_anonymous", "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " - "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1", + "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2", 1, __LINE__)) || (GNUNET_OK != @@ -482,11 +375,13 @@ static int delete_by_rowid (struct Plugin *plugin, unsigned int rowid) { - const char *paramValues[] = { (const char *) &rowid }; - int paramLengths[] = { sizeof (rowid) }; + uint32_t browid; + const char *paramValues[] = { (const char *) &browid }; + int paramLengths[] = { sizeof (browid) }; const int paramFormats[] = { 1 }; PGresult *ret; + browid = htonl (rowid); ret = PQexecPrepared (plugin->dbh, "delrow", 1, paramValues, paramLengths, paramFormats, 1); @@ -510,7 +405,7 @@ delete_by_rowid (struct Plugin *plugin, * @return number of bytes used on disk */ static unsigned long long -postgres_plugin_get_size (void *cls) +postgres_plugin_estimate_size (void *cls) { struct Plugin *plugin = cls; unsigned long long total; @@ -619,22 +514,20 @@ postgres_plugin_put (void *cls, /** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. + * Function invoked to process the result and call + * the processor. * - * @param next_cls the 'struct NextRequestClosure' - * @param tc scheduler context + * @param plugin global plugin data + * @param proc function to call the value (once only). + * @param proc_cls closure for proc + * @param res result from exec */ static void -postgres_next_request_cont (void *next_cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +process_result (struct Plugin *plugin, + PluginDatumProcessor proc, void *proc_cls, + PGresult *res) { - struct NextRequestClosure *nrc = next_cls; - struct Plugin *plugin = nrc->plugin; - const int paramFormats[] = { 1, 1, 1, 1, 1 }; int iret; - PGresult *res; enum GNUNET_BLOCK_Type type; uint32_t anonymity; uint32_t priority; @@ -643,38 +536,11 @@ postgres_next_request_cont (void *next_cls, struct GNUNET_TIME_Absolute expiration_time; GNUNET_HashCode key; - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - plugin->next_task_nc = NULL; - if ( (GNUNET_YES == nrc->end_it) || - (nrc->count == nrc->total) ) - { -#if DEBUG_POSTGRES - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "datastore-postgres", - "Ending iteration (%s)\n", - (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set"); -#endif - nrc->iter (nrc->iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); - return; - } - if (nrc->off == nrc->total) - nrc->off = 0; - nrc->blimit_off = GNUNET_htonll (nrc->off); - nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count); - res = PQexecPrepared (plugin->dbh, - nrc->pname, - nrc->nparams, - nrc->paramValues, - nrc->paramLengths, - paramFormats, 1); if (GNUNET_OK != check_result (plugin, res, PGRES_TUPLES_OK, "PQexecPrepared", - nrc->pname, + "select", __LINE__)) { #if DEBUG_POSTGRES @@ -682,10 +548,9 @@ postgres_next_request_cont (void *next_cls, "datastore-postgres", "Ending iteration (postgres error)\n"); #endif - nrc->iter (nrc->iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } @@ -697,11 +562,10 @@ postgres_next_request_cont (void *next_cls, "datastore-postgres", "Ending iteration (no more results)\n"); #endif - nrc->iter (nrc->iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); PQclear (res); - GNUNET_free (nrc); return; } if ((1 != PQntuples (res)) || @@ -710,11 +574,10 @@ postgres_next_request_cont (void *next_cls, (sizeof (uint32_t) != PQfsize (res, 6))) { GNUNET_break (0); - nrc->iter (nrc->iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); PQclear (res); - GNUNET_free (nrc); return; } rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6)); @@ -727,10 +590,9 @@ postgres_next_request_cont (void *next_cls, GNUNET_break (0); PQclear (res); delete_by_rowid (plugin, rowid); - nrc->iter (nrc->iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } @@ -749,33 +611,23 @@ postgres_next_request_cont (void *next_cls, (unsigned int) size, (unsigned int) type); #endif - iret = nrc->iter (nrc->iter_cls, - (nrc->one_shot == GNUNET_YES) ? NULL : nrc, - &key, - size, - PQgetvalue (res, 0, 5), - (enum GNUNET_BLOCK_Type) type, - priority, - anonymity, - expiration_time, - rowid); + iret = proc (proc_cls, + &key, + size, + PQgetvalue (res, 0, 5), + (enum GNUNET_BLOCK_Type) type, + priority, + anonymity, + expiration_time, + rowid); PQclear (res); - if (iret != GNUNET_NO) - { - nrc->count++; - nrc->off++; - } - if (iret == GNUNET_SYSERR) + if (iret == GNUNET_NO) { #if DEBUG_POSTGRES - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "datastore-postgres", - "Ending iteration (client error)\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Processor asked for item %u to be removed.\n", + rowid); #endif - return; - } - if (iret == GNUNET_NO) - { if (GNUNET_OK == delete_by_rowid (plugin, rowid)) { #if DEBUG_POSTGRES @@ -794,34 +646,6 @@ postgres_next_request_cont (void *next_cls, #endif } } - if (nrc->one_shot == GNUNET_YES) - GNUNET_free (nrc); -} - - -/** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -static void -postgres_plugin_next_request (void *next_cls, - int end_it) -{ - struct NextRequestClosure *nrc = next_cls; - - if (GNUNET_YES == end_it) - nrc->end_it = GNUNET_YES; - nrc->plugin->next_task_nc = nrc; - nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont, - nrc); } @@ -843,62 +667,62 @@ postgres_plugin_next_request (void *next_cls, * @param iter_cls closure for iter */ static void -postgres_plugin_get (void *cls, - const GNUNET_HashCode * key, - const GNUNET_HashCode * vhash, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, void *iter_cls) +postgres_plugin_get_key (void *cls, + uint64_t offset, + const GNUNET_HashCode *key, + const GNUNET_HashCode *vhash, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; const int paramFormats[] = { 1, 1, 1, 1, 1 }; + int paramLengths[4]; + const char *paramValues[4]; + int nparams; + const char *pname; PGresult *ret; + uint64_t total; + uint64_t blimit_off; + uint32_t btype; GNUNET_assert (key != NULL); - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->plugin = plugin; - nrc->iter = iter; - nrc->iter_cls = iter_cls; - nrc->key = *key; - if (vhash != NULL) - nrc->vhash = *vhash; - nrc->paramValues[0] = (const char*) &nrc->key; - nrc->paramLengths[0] = sizeof (GNUNET_HashCode); - nrc->btype = htonl (type); + paramValues[0] = (const char*) key; + paramLengths[0] = sizeof (GNUNET_HashCode); + btype = htonl (type); if (type != 0) { if (vhash != NULL) { - nrc->paramValues[1] = (const char *) &nrc->vhash; - nrc->paramLengths[1] = sizeof (nrc->vhash); - nrc->paramValues[2] = (const char *) &nrc->btype; - nrc->paramLengths[2] = sizeof (nrc->btype); - nrc->paramValues[3] = (const char *) &nrc->blimit_off; - nrc->paramLengths[3] = sizeof (nrc->blimit_off); - nrc->nparams = 4; - nrc->pname = "getvt"; + paramValues[1] = (const char *) vhash; + paramLengths[1] = sizeof (GNUNET_HashCode); + paramValues[2] = (const char *) &btype; + paramLengths[2] = sizeof (btype); + paramValues[3] = (const char *) &blimit_off; + paramLengths[3] = sizeof (blimit_off); + nparams = 4; + pname = "getvt"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3", 3, NULL, - nrc->paramValues, - nrc->paramLengths, + paramValues, + paramLengths, paramFormats, 1); } else { - nrc->paramValues[1] = (const char *) &nrc->btype; - nrc->paramLengths[1] = sizeof (nrc->btype); - nrc->paramValues[2] = (const char *) &nrc->blimit_off; - nrc->paramLengths[2] = sizeof (nrc->blimit_off); - nrc->nparams = 3; - nrc->pname = "gett"; + paramValues[1] = (const char *) &btype; + paramLengths[1] = sizeof (btype); + paramValues[2] = (const char *) &blimit_off; + paramLengths[2] = sizeof (blimit_off); + nparams = 3; + pname = "gett"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2", 2, NULL, - nrc->paramValues, - nrc->paramLengths, + paramValues, + paramLengths, paramFormats, 1); } } @@ -906,32 +730,32 @@ postgres_plugin_get (void *cls, { if (vhash != NULL) { - nrc->paramValues[1] = (const char *) &nrc->vhash; - nrc->paramLengths[1] = sizeof (nrc->vhash); - nrc->paramValues[2] = (const char *) &nrc->blimit_off; - nrc->paramLengths[2] = sizeof (nrc->blimit_off); - nrc->nparams = 3; - nrc->pname = "getv"; + paramValues[1] = (const char *) vhash; + paramLengths[1] = sizeof (GNUNET_HashCode); + paramValues[2] = (const char *) &blimit_off; + paramLengths[2] = sizeof (blimit_off); + nparams = 3; + pname = "getv"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2", 2, NULL, - nrc->paramValues, - nrc->paramLengths, + paramValues, + paramLengths, paramFormats, 1); } else { - nrc->paramValues[1] = (const char *) &nrc->blimit_off; - nrc->paramLengths[1] = sizeof (nrc->blimit_off); - nrc->nparams = 2; - nrc->pname = "get"; + paramValues[1] = (const char *) &blimit_off; + paramLengths[1] = sizeof (blimit_off); + nparams = 2; + pname = "get"; ret = PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1", 1, NULL, - nrc->paramValues, - nrc->paramLengths, + paramValues, + paramLengths, paramFormats, 1); } } @@ -939,13 +763,12 @@ postgres_plugin_get (void *cls, ret, PGRES_TUPLES_OK, "PQexecParams", - nrc->pname, + pname, __LINE__)) { - iter (iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); return; } if ((PQntuples (ret) != 1) || @@ -954,26 +777,30 @@ postgres_plugin_get (void *cls, { GNUNET_break (0); PQclear (ret); - iter (iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); return; } - nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); + total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0)); PQclear (ret); - if (nrc->total == 0) + if (total == 0) { - iter (iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, + proc (proc_cls, + NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (nrc); return; } - nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, - nrc->total); - postgres_plugin_next_request (nrc, - GNUNET_NO); + blimit_off = GNUNET_htonll (offset % total); + ret = PQexecPrepared (plugin->dbh, + pname, + nparams, + paramValues, + paramLengths, + paramFormats, 1); + process_result (plugin, + proc, proc_cls, + ret); } @@ -989,28 +816,33 @@ postgres_plugin_get (void *cls, * @param iter_cls closure for iter */ static void -postgres_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +postgres_plugin_get_zero_anonymity (void *cls, + uint64_t offset, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; + uint32_t btype; + uint64_t boff; + const int paramFormats[] = { 1, 1 }; + int paramLengths[] = { sizeof (btype), sizeof (boff) }; + const char *paramValues[] = { (const char*) &btype, (const char*) &boff }; + PGresult *ret; - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->total = UINT32_MAX; - nrc->btype = htonl ((uint32_t) type); - nrc->plugin = plugin; - nrc->iter = iter; - nrc->iter_cls = iter_cls; - nrc->pname = "select_non_anonymous"; - nrc->nparams = 1; - nrc->paramLengths[0] = sizeof (nrc->bcount); - nrc->paramValues[0] = (const char*) &nrc->bcount; - postgres_plugin_next_request (nrc, - GNUNET_NO); + btype = htonl ((uint32_t) type); + boff = GNUNET_htonll (offset); + ret = PQexecPrepared (plugin->dbh, + "select_non_anonymous", + 2, + paramValues, + paramLengths, + paramFormats, 1); + process_result (plugin, + proc, proc_cls, + ret); } + /** * Context for 'repl_iter' function. */ @@ -1025,12 +857,12 @@ struct ReplCtx /** * Function to call for the result (or the NULL). */ - PluginIterator iter; + PluginDatumProcessor proc; /** - * Closure for iter. + * Closure for proc. */ - void *iter_cls; + void *proc_cls; }; @@ -1056,8 +888,7 @@ struct ReplCtx * GNUNET_NO to delete the item and continue (if supported) */ static int -repl_iter (void *cls, - void *next_cls, +repl_proc (void *cls, const GNUNET_HashCode *key, uint32_t size, const void *data, @@ -1073,8 +904,8 @@ repl_iter (void *cls, PGresult *qret; uint32_t boid; - ret = rc->iter (rc->iter_cls, - next_cls, key, + ret = rc->proc (rc->proc_cls, + key, size, data, type, priority, anonymity, expiration, uid); @@ -1107,32 +938,30 @@ repl_iter (void *cls, * Get a random item for replication. Returns a single, not expired, random item * from those with the highest replication counters. The item's * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Call 'proc' with all values ZERO or NULL if the datastore is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for iter */ static void -postgres_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +postgres_plugin_get_replication (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; struct ReplCtx rc; + PGresult *ret; rc.plugin = plugin; - rc.iter = iter; - rc.iter_cls = iter_cls; - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->one_shot = GNUNET_YES; - nrc->total = 1; - nrc->plugin = plugin; - nrc->iter = &repl_iter; - nrc->iter_cls = &rc; - nrc->pname = "select_replication_order"; - nrc->nparams = 0; - postgres_next_request_cont (nrc, NULL); + rc.proc = proc; + rc.proc_cls = proc_cls; + ret = PQexecPrepared (plugin->dbh, + "select_replication_order", + 0, + NULL, NULL, NULL, 1); + process_result (plugin, + &repl_proc, &rc, + ret); } @@ -1141,29 +970,31 @@ postgres_plugin_replication_get (void *cls, * Call 'iter' with all values ZERO or NULL if the datastore is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for iter */ static void -postgres_plugin_expiration_get (void *cls, - PluginIterator iter, void *iter_cls) +postgres_plugin_get_expiration (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure *nrc; uint64_t btime; + const int paramFormats[] = { 1 }; + int paramLengths[] = { sizeof (btime) }; + const char *paramValues[] = { (const char*) &btime }; + PGresult *ret; btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value); - nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); - nrc->one_shot = GNUNET_YES; - nrc->total = 1; - nrc->plugin = plugin; - nrc->iter = iter; - nrc->iter_cls = iter_cls; - nrc->pname = "select_expiration_order"; - nrc->nparams = 1; - nrc->paramValues[0] = (const char *) &btime; - nrc->paramLengths[0] = sizeof (btime); - postgres_next_request_cont (nrc, NULL); + ret = PQexecPrepared (plugin->dbh, + "select_expiration_order", + 1, + paramValues, + paramLengths, + paramFormats, + 1); + process_result (plugin, + proc, proc_cls, + ret); } @@ -1260,14 +1091,13 @@ libgnunet_plugin_datastore_postgres_init (void *cls) } api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); api->cls = plugin; - api->get_size = &postgres_plugin_get_size; + api->estimate_size = &postgres_plugin_estimate_size; api->put = &postgres_plugin_put; - api->next_request = &postgres_plugin_next_request; - api->get = &postgres_plugin_get; - api->replication_get = &postgres_plugin_replication_get; - api->expiration_get = &postgres_plugin_expiration_get; api->update = &postgres_plugin_update; - api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity; + api->get_key = &postgres_plugin_get_key; + api->get_replication = &postgres_plugin_get_replication; + api->get_expiration = &postgres_plugin_get_expiration; + api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity; api->drop = &postgres_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres", @@ -1287,13 +1117,6 @@ libgnunet_plugin_datastore_postgres_done (void *cls) struct GNUNET_DATASTORE_PluginFunctions *api = cls; struct Plugin *plugin = api->cls; - if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (plugin->next_task); - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_free (plugin->next_task_nc); - plugin->next_task_nc = NULL; - } PQfinish (plugin->dbh); GNUNET_free (plugin); GNUNET_free (api); diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index 6e77ec364..3710b7eb7 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -108,19 +108,14 @@ struct Plugin sqlite3_stmt *selExpi; /** - * Precompiled SQL for insertion. - */ - sqlite3_stmt *insertContent; - - /** - * Closure of the 'next_task' (must be freed if 'next_task' is cancelled). + * Precompiled SQL for expiration selection. */ - struct NextContext *next_task_nc; + sqlite3_stmt *selZeroAnon; /** - * Pending task with scheduler for running the next request. + * Precompiled SQL for insertion. */ - GNUNET_SCHEDULER_TaskIdentifier next_task; + sqlite3_stmt *insertContent; /** * Should the database be dropped on shutdown? @@ -326,6 +321,11 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 " " ORDER BY prio ASC LIMIT 1", &plugin->selExpi) != SQLITE_OK) || + (sq_prepare (plugin->dbh, + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " + "WHERE (anonLevel = 0 AND type=?1) " + "ORDER BY hash DESC LIMIT 1 OFFSET ?2", + &plugin->selZeroAnon) != SQLITE_OK) || (sq_prepare (plugin->dbh, "INSERT INTO gn090 (repl, type, prio, " "anonLevel, expire, hash, vhash, value) " @@ -367,6 +367,8 @@ database_shutdown (struct Plugin *plugin) sqlite3_finalize (plugin->selRepl); if (plugin->selExpi != NULL) sqlite3_finalize (plugin->selExpi); + if (plugin->selZeroAnon != NULL) + sqlite3_finalize (plugin->selZeroAnon); if (plugin->insertContent != NULL) sqlite3_finalize (plugin->insertContent); result = sqlite3_close(plugin->dbh); @@ -435,247 +437,6 @@ delete_by_rowid (struct Plugin* plugin, } -/** - * Context for the universal iterator. - */ -struct NextContext; - -/** - * Type of a function that will prepare - * the next iteration. - * - * @param cls closure - * @param nc the next context; NULL for the last - * call which gives the callback a chance to - * clean up the closure - * @return GNUNET_OK on success, GNUNET_NO if there are - * no more values, GNUNET_SYSERR on error - */ -typedef int (*PrepareFunction)(void *cls, - struct NextContext *nc); - - -/** - * Context we keep for the "next request" callback. - */ -struct NextContext -{ - /** - * Internal state. - */ - struct Plugin *plugin; - - /** - * Function to call on the next value. - */ - PluginIterator iter; - - /** - * Closure for iter. - */ - void *iter_cls; - - /** - * Function to call to prepare the next - * iteration. - */ - PrepareFunction prep; - - /** - * Closure for prep. - */ - void *prep_cls; - - /** - * Statement that the iterator will get the data - * from (updated or set by prep). - */ - sqlite3_stmt *stmt; - - /** - * Row ID of the last result. - */ - unsigned long long last_rowid; - - /** - * Key of the last result. - */ - GNUNET_HashCode lastKey; - - /** - * Priority of the last value visited. - */ - unsigned int lastPriority; - - /** - * Number of results processed so far. - */ - unsigned int count; - - /** - * Set to GNUNET_YES if we must stop now. - */ - int end_it; -}; - - -/** - * Continuation of "sqlite_next_request". - * - * @param cls the 'struct NextContext*' - * @param tc the task context (unused) - */ -static void -sqlite_next_request_cont (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NextContext * nc = cls; - struct Plugin *plugin; - unsigned long long rowid; - int ret; - unsigned int size; - unsigned int hsize; - uint32_t anonymity; - uint32_t priority; - enum GNUNET_BLOCK_Type type; - const GNUNET_HashCode *key; - struct GNUNET_TIME_Absolute expiration; - - plugin = nc->plugin; - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - plugin->next_task_nc = NULL; - if ( (GNUNET_YES == nc->end_it) || - (GNUNET_OK != (nc->prep(nc->prep_cls, - nc))) ) - { -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Iteration completes after %u results\n", - nc->count); -#endif - END: - nc->iter (nc->iter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - nc->prep (nc->prep_cls, NULL); - GNUNET_free (nc); - return; - } - - type = sqlite3_column_int (nc->stmt, 0); - priority = sqlite3_column_int (nc->stmt, 1); - anonymity = sqlite3_column_int (nc->stmt, 2); - expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3); - hsize = sqlite3_column_bytes (nc->stmt, 4); - key = sqlite3_column_blob (nc->stmt, 4); - size = sqlite3_column_bytes (nc->stmt, 5); - rowid = sqlite3_column_int64 (nc->stmt, 6); - if (hsize != sizeof (GNUNET_HashCode)) - { - GNUNET_break (0); - if (SQLITE_OK != sqlite3_reset (nc->stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - if (GNUNET_OK == delete_by_rowid (plugin, rowid)) - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); - goto END; - } -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n", - type, - GNUNET_h2s(key), - priority, - (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value, - (long long) expiration.abs_value); -#endif - if (size > MAX_ITEM_SIZE) - { - GNUNET_break (0); - if (SQLITE_OK != sqlite3_reset (nc->stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - if (GNUNET_OK == delete_by_rowid (plugin, rowid)) - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); - goto END; - } - { - char data[size]; - - memcpy (data, sqlite3_column_blob (nc->stmt, 5), size); - nc->count++; - nc->last_rowid = rowid; - nc->lastPriority = priority; - nc->lastKey = *key; - if (SQLITE_OK != sqlite3_reset (nc->stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - ret = nc->iter (nc->iter_cls, nc, - &nc->lastKey, - size, data, - type, priority, - anonymity, expiration, - rowid); - } - switch (ret) - { - case GNUNET_SYSERR: - nc->end_it = GNUNET_YES; - break; - case GNUNET_NO: - if (GNUNET_OK == delete_by_rowid (plugin, rowid)) - { -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Removed entry %llu (%u bytes)\n", - (unsigned long long) rowid, - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#endif - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); - } - break; - case GNUNET_YES: - break; - default: - GNUNET_break (0); - } -} - - -/** - * Function invoked on behalf of a "PluginIterator" asking the - * database plugin to call the iterator with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -static void -sqlite_next_request (void *next_cls, - int end_it) -{ - struct NextContext * nc= next_cls; - - if (GNUNET_YES == end_it) - nc->end_it = GNUNET_YES; - nc->plugin->next_task_nc = nc; - nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont, - nc); -} - - /** * Store an item in the datastore. * @@ -849,355 +610,147 @@ sqlite_plugin_update (void *cls, /** - * Internal context for an iteration. - */ -struct ZeroIterContext -{ - /** - * First iterator statement for zero-anonymity iteration. - */ - sqlite3_stmt *stmt_1; - - /** - * Second iterator statement for zero-anonymity iteration. - */ - sqlite3_stmt *stmt_2; - - /** - * Desired type for blocks returned by this iterator. - */ - enum GNUNET_BLOCK_Type type; -}; - - -/** - * Prepare our SQL query to obtain the next record from the database. + * Execute statement that gets a row and call the callback + * with the result. Resets the statement afterwards. * - * @param cls our "struct ZeroIterContext" - * @param nc NULL to terminate the iteration, otherwise our context for - * getting the next result. - * @return GNUNET_OK on success, GNUNET_NO if there are no more results, - * GNUNET_SYSERR on error (or end of iteration) + * @param plugin the plugin + * @param stmt the statement + * @param proc processor to call + * @param proc_cls closure for 'proc' */ -static int -zero_iter_next_prepare (void *cls, - struct NextContext *nc) +static void +execute_get (struct Plugin *plugin, + sqlite3_stmt *stmt, + PluginDatumProcessor proc, void *proc_cls) { - struct ZeroIterContext *ic = cls; - struct Plugin *plugin; + int n; + struct GNUNET_TIME_Absolute expiration; + unsigned long long rowid; + unsigned int size; int ret; - if (nc == NULL) - { -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to clean up iterator state.\n"); -#endif - sqlite3_finalize (ic->stmt_1); - sqlite3_finalize (ic->stmt_2); - return GNUNET_SYSERR; - } - plugin = nc->plugin; - - /* first try iter 1 */ -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Restricting to results larger than the last priority %u and key `%s'\n", - nc->lastPriority, - GNUNET_h2s (&nc->lastKey)); -#endif - if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) || - (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2, - &nc->lastKey, - sizeof (GNUNET_HashCode), - SQLITE_TRANSIENT)) ) + n = sqlite3_step (stmt); + switch (n) { - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); - if (SQLITE_OK != sqlite3_reset (ic->stmt_1)) + case SQLITE_ROW: + size = sqlite3_column_bytes (stmt, 5); + rowid = sqlite3_column_int64 (stmt, 6); + if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode)) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, + "sqlite", + _("Invalid data in database. Trying to fix (by deletion).\n")); + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); + break; + } + expiration.abs_value = sqlite3_column_int64 (stmt, 3); + ret = proc (proc_cls, + sqlite3_column_blob (stmt, 4) /* key */, + size, + sqlite3_column_blob (stmt, 5) /* data */, + sqlite3_column_int (stmt, 0) /* type */, + sqlite3_column_int (stmt, 1) /* priority */, + sqlite3_column_int (stmt, 2) /* anonymity */, + expiration, + rowid); + if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_reset"); - return GNUNET_SYSERR; - } - if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1))) - { -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Result found using iterator 1\n"); -#endif - nc->stmt = ic->stmt_1; - return GNUNET_OK; - } - if (ret != SQLITE_DONE) - { - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); - if (SQLITE_OK != sqlite3_reset (ic->stmt_1)) + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + if ( (GNUNET_NO == ret) && + (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); + return; + case SQLITE_DONE: + /* database must be empty */ + if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_reset"); - return GNUNET_SYSERR; - } - if (SQLITE_OK != sqlite3_reset (ic->stmt_1)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_reset"); - - /* now try iter 2 */ - if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority)) - { - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); - return GNUNET_SYSERR; - } - if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) - { -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Result found using iterator 2\n"); -#endif - nc->stmt = ic->stmt_2; - return GNUNET_OK; - } - if (ret != SQLITE_DONE) - { + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + break; + case SQLITE_BUSY: + case SQLITE_ERROR: + case SQLITE_MISUSE: + default: LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - if (SQLITE_OK != sqlite3_reset (ic->stmt_2)) + if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - return GNUNET_SYSERR; + GNUNET_break (0); + database_shutdown (plugin); + database_setup (plugin->env->cfg, + plugin); + break; } - if (SQLITE_OK != sqlite3_reset (ic->stmt_2)) + if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_reset"); -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No result found using either iterator\n"); -#endif - return GNUNET_NO; + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); } + /** * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * the given processor for the item. * * @param cls our plugin context * @param type entries of which type should be considered? * Use 0 for any type. - * @param iter function to call on each matching value; + * @param proc function to call on each matching value; * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc_cls closure for proc */ static void -sqlite_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +sqlite_plugin_get_zero_anonymity (void *cls, + uint64_t offset, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, + void *proc_cls) { struct Plugin *plugin = cls; - struct GNUNET_TIME_Absolute now; - struct NextContext *nc; - struct ZeroIterContext *ic; - sqlite3_stmt *stmt_1; - sqlite3_stmt *stmt_2; - char *q; + sqlite3_stmt *stmt; GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q, - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " - "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) " - "ORDER BY hash DESC LIMIT 1", - (unsigned long long) now.abs_value, - type); - if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK) + stmt = plugin->selZeroAnon; + if ( (SQLITE_OK != sqlite3_bind_int (stmt, 1, type)) || + (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, offset)) ) { LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (q); - return; - } - GNUNET_free (q); - GNUNET_asprintf (&q, - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 " - "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) " - "ORDER BY prio DESC, hash DESC LIMIT 1", - (unsigned long long) now.abs_value, - type); - if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK) - { - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); - sqlite3_finalize (stmt_1); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (q); + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind_XXXX"); + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - GNUNET_free (q); - nc = GNUNET_malloc (sizeof(struct NextContext) + - sizeof(struct ZeroIterContext)); - nc->plugin = plugin; - nc->iter = iter; - nc->iter_cls = iter_cls; - nc->stmt = NULL; - ic = (struct ZeroIterContext*) &nc[1]; - ic->stmt_1 = stmt_1; - ic->stmt_2 = stmt_2; - ic->type = type; - nc->prep = &zero_iter_next_prepare; - nc->prep_cls = ic; - nc->lastPriority = INT32_MAX; - memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode)); - sqlite_next_request (nc, GNUNET_NO); + execute_get (plugin, stmt, proc, proc_cls); } -/** - * Context for get_next_prepare. - */ -struct GetNextContext -{ - - /** - * Our prepared statement. - */ - sqlite3_stmt *stmt; - - /** - * Plugin handle. - */ - struct Plugin *plugin; - - /** - * Key for the query. - */ - GNUNET_HashCode key; - - /** - * Vhash for the query. - */ - GNUNET_HashCode vhash; - - /** - * Expected total number of results. - */ - unsigned int total; - - /** - * Offset to add for the selected result. - */ - unsigned int off; - - /** - * Is vhash set? - */ - int have_vhash; - - /** - * Desired block type. - */ - enum GNUNET_BLOCK_Type type; - -}; - - -/** - * Prepare the stmt in 'nc' for the next round of execution, selecting the - * next return value. - * - * @param cls our "struct GetNextContext*" - * @param nc the general context - * @return GNUNET_YES if there are more results, - * GNUNET_NO if there are no more results, - * GNUNET_SYSERR on internal error - */ -static int -get_next_prepare (void *cls, - struct NextContext *nc) -{ - struct GetNextContext *gnc = cls; - int ret; - int limit_off; - unsigned int sqoff; - - if (nc == NULL) - { - sqlite3_finalize (gnc->stmt); - return GNUNET_SYSERR; - } - if (nc->count == gnc->total) - return GNUNET_NO; - if (nc->count + gnc->off == gnc->total) - nc->last_rowid = 0; - if (nc->count == 0) - limit_off = gnc->off; - else - limit_off = 0; - sqlite3_reset (nc->stmt); - sqoff = 1; - ret = sqlite3_bind_blob (nc->stmt, - sqoff++, - &gnc->key, - sizeof (GNUNET_HashCode), - SQLITE_TRANSIENT); - if ((gnc->have_vhash) && (ret == SQLITE_OK)) - ret = sqlite3_bind_blob (nc->stmt, - sqoff++, - &gnc->vhash, - sizeof (GNUNET_HashCode), SQLITE_TRANSIENT); - if ((gnc->type != 0) && (ret == SQLITE_OK)) - ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type); - if (ret == SQLITE_OK) - ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off); - if (ret != SQLITE_OK) - return GNUNET_SYSERR; -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Preparing to GET for key `%s' with type %d at offset %u\n", - GNUNET_h2s (&gnc->key), - gnc->type, - limit_off); -#endif - ret = sqlite3_step (nc->stmt); - switch (ret) - { - case SQLITE_ROW: - return GNUNET_OK; - case SQLITE_DONE: - return GNUNET_NO; - default: - LOG_SQLITE (gnc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); - return GNUNET_SYSERR; - } -} - /** - * Iterate over the results for a particular key - * in the datastore. + * Get results for a particular key in the datastore. * * @param cls closure + * @param offset offset (mod count). * @param key key to match, never NULL * @param vhash hash of the value, maybe NULL (to * match all values that have the right key). @@ -1206,27 +759,27 @@ get_next_prepare (void *cls, * there may be! * @param type entries of which type are relevant? * Use 0 for any type. - * @param iter function to call on each matching value; + * @param proc function to call on each matching value; * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc_cls closure for proc */ static void -sqlite_plugin_get (void *cls, - const GNUNET_HashCode *key, - const GNUNET_HashCode *vhash, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, void *iter_cls) +sqlite_plugin_get_key (void *cls, + uint64_t offset, + const GNUNET_HashCode *key, + const GNUNET_HashCode *vhash, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; - struct GetNextContext *gnc; - struct NextContext *nc; int ret; int total; + int limit_off; + unsigned int sqoff; sqlite3_stmt *stmt; char scratch[256]; - unsigned int sqoff; - GNUNET_assert (iter != NULL); + GNUNET_assert (proc != NULL); GNUNET_assert (key != NULL); GNUNET_snprintf (scratch, sizeof (scratch), "SELECT count(*) FROM gn090 WHERE hash=?%s%s", @@ -1236,7 +789,7 @@ sqlite_plugin_get (void *cls, { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare"); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } sqoff = 1; @@ -1253,7 +806,7 @@ sqlite_plugin_get (void *cls, LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR, "sqlite_bind"); sqlite3_finalize (stmt); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } ret = sqlite3_step (stmt); @@ -1263,147 +816,64 @@ sqlite_plugin_get (void *cls, GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, "sqlite_step"); sqlite3_finalize (stmt); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } total = sqlite3_column_int (stmt, 0); sqlite3_finalize (stmt); if (0 == total) { - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } + limit_off = (int) (offset % total); + if (limit_off < 0) + limit_off += total; GNUNET_snprintf (scratch, sizeof (scratch), "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ " "FROM gn090 WHERE hash=?%s%s " "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?", vhash == NULL ? "" : " AND vhash=?", type == 0 ? "" : " AND type=?"); - if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK) { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare"); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - nc = GNUNET_malloc (sizeof(struct NextContext) + - sizeof(struct GetNextContext)); - nc->plugin = plugin; - nc->iter = iter; - nc->iter_cls = iter_cls; - nc->stmt = stmt; - gnc = (struct GetNextContext*) &nc[1]; - gnc->total = total; - gnc->type = type; - gnc->key = *key; - gnc->plugin = plugin; - gnc->stmt = stmt; /* alias used for freeing at the end! */ - if (NULL != vhash) - { - gnc->have_vhash = GNUNET_YES; - gnc->vhash = *vhash; - } - gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total); - nc->prep = &get_next_prepare; - nc->prep_cls = gnc; - sqlite_next_request (nc, GNUNET_NO); -} - - -/** - * Execute statement that gets a row and call the callback - * with the result. Resets the statement afterwards. - * - * @param plugin the plugin - * @param stmt the statement - * @param iter iterator to call - * @param iter_cls closure for 'iter' - */ -static void -execute_get (struct Plugin *plugin, - sqlite3_stmt *stmt, - PluginIterator iter, void *iter_cls) -{ - int n; - struct GNUNET_TIME_Absolute expiration; - unsigned long long rowid; - unsigned int size; - int ret; - - n = sqlite3_step (stmt); - switch (n) + sqoff = 1; + ret = sqlite3_bind_blob (stmt, + sqoff++, + key, + sizeof (GNUNET_HashCode), + SQLITE_TRANSIENT); + if ((vhash != NULL) && (ret == SQLITE_OK)) + ret = sqlite3_bind_blob (stmt, + sqoff++, + vhash, + sizeof (GNUNET_HashCode), SQLITE_TRANSIENT); + if ((type != 0) && (ret == SQLITE_OK)) + ret = sqlite3_bind_int (stmt, sqoff++, type); + if (ret == SQLITE_OK) + ret = sqlite3_bind_int64 (stmt, sqoff++, limit_off); + if (ret != SQLITE_OK) { - case SQLITE_ROW: - size = sqlite3_column_bytes (stmt, 5); - rowid = sqlite3_column_int64 (stmt, 6); - if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode)) - { - GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, - "sqlite", - _("Invalid data in database. Trying to fix (by deletion).\n")); - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - if (GNUNET_OK == delete_by_rowid (plugin, rowid)) - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); - break; - } - expiration.abs_value = sqlite3_column_int64 (stmt, 3); - ret = iter (iter_cls, - NULL, - sqlite3_column_blob (stmt, 4) /* key */, - size, - sqlite3_column_blob (stmt, 5) /* data */, - sqlite3_column_int (stmt, 0) /* type */, - sqlite3_column_int (stmt, 1) /* priority */, - sqlite3_column_int (stmt, 2) /* anonymity */, - expiration, - rowid); - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - if ( (GNUNET_NO == ret) && - (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); - return; - case SQLITE_DONE: - /* database must be empty */ - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - break; - case SQLITE_BUSY: - case SQLITE_ERROR: - case SQLITE_MISUSE: - default: LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sqlite3_reset"); - GNUNET_break (0); - database_shutdown (plugin); - database_setup (plugin->env->cfg, - plugin); - break; + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite_bind"); + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; } - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + execute_get (plugin, stmt, proc, proc_cls); + sqlite3_finalize (stmt); } + /** - * Context for 'repl_iter' function. + * Context for 'repl_proc' function. */ struct ReplCtx { @@ -1416,22 +886,21 @@ struct ReplCtx /** * Function to call for the result (or the NULL). */ - PluginIterator iter; + PluginDatumProcessor proc; /** - * Closure for iter. + * Closure for proc. */ - void *iter_cls; + void *proc_cls; }; /** - * Wrapper for the iterator for 'sqlite_plugin_replication_get'. + * Wrapper for the processor for 'sqlite_plugin_replication_get'. * Decrements the replication counter and calls the original - * iterator. + * processor. * * @param cls closure - * @param next_cls closure to pass to the "next" function. * @param key key for the content * @param size number of bytes in data * @param data content stored @@ -1442,13 +911,11 @@ struct ReplCtx * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available * - * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue - * (continue on call to "next", of course), - * GNUNET_NO to delete the item and continue (if supported) + * @return GNUNET_OK for normal return, + * GNUNET_NO to delete the item */ static int -repl_iter (void *cls, - void *next_cls, +repl_proc (void *cls, const GNUNET_HashCode *key, uint32_t size, const void *data, @@ -1462,8 +929,8 @@ repl_iter (void *cls, struct Plugin *plugin = rc->plugin; int ret; - ret = rc->iter (rc->iter_cls, - next_cls, key, + ret = rc->proc (rc->proc_cls, + key, size, data, type, priority, anonymity, expiration, uid); @@ -1494,15 +961,15 @@ repl_iter (void *cls, * Get a random item for replication. Returns a single random item * from those with the highest replication counters. The item's * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Call 'proc' with all values ZERO or NULL if the datastore is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for proc */ static void -sqlite_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +sqlite_plugin_get_replication (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; struct ReplCtx rc; @@ -1513,24 +980,24 @@ sqlite_plugin_replication_get (void *cls, "Getting random block based on replication order.\n"); #endif rc.plugin = plugin; - rc.iter = iter; - rc.iter_cls = iter_cls; - execute_get (plugin, plugin->selRepl, &repl_iter, &rc); + rc.proc = proc; + rc.proc_cls = proc_cls; + execute_get (plugin, plugin->selRepl, &repl_proc, &rc); } /** * Get a random item that has expired or has low priority. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Call 'proc' with all values ZERO or NULL if the datastore is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for proc */ static void -sqlite_plugin_expiration_get (void *cls, - PluginIterator iter, void *iter_cls) +sqlite_plugin_get_expiration (void *cls, + PluginDatumProcessor proc, void *proc_cls) { struct Plugin *plugin = cls; sqlite3_stmt *stmt; @@ -1550,11 +1017,11 @@ sqlite_plugin_expiration_get (void *cls, if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, + proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - execute_get (plugin, stmt, iter, iter_cls); + execute_get (plugin, stmt, proc, proc_cls); } @@ -1579,7 +1046,7 @@ sqlite_plugin_drop (void *cls) * @return the size of the database on disk (estimate) */ static unsigned long long -sqlite_plugin_get_size (void *cls) +sqlite_plugin_estimate_size (void *cls) { struct Plugin *plugin = cls; sqlite3_stmt *stmt; @@ -1653,14 +1120,13 @@ libgnunet_plugin_datastore_sqlite_init (void *cls) } api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); api->cls = &plugin; - api->get_size = &sqlite_plugin_get_size; + api->estimate_size = &sqlite_plugin_estimate_size; api->put = &sqlite_plugin_put; - api->next_request = &sqlite_next_request; - api->get = &sqlite_plugin_get; - api->replication_get = &sqlite_plugin_replication_get; - api->expiration_get = &sqlite_plugin_expiration_get; api->update = &sqlite_plugin_update; - api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity; + api->get_key = &sqlite_plugin_get_key; + api->get_replication = &sqlite_plugin_get_replication; + api->get_expiration = &sqlite_plugin_get_expiration; + api->get_zero_anonymity = &sqlite_plugin_get_zero_anonymity; api->drop = &sqlite_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "sqlite", _("Sqlite database running\n")); @@ -1684,27 +1150,9 @@ libgnunet_plugin_datastore_sqlite_done (void *cls) #if DEBUG_SQLITE GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", - "sqlite plugin is doneing\n"); + "sqlite plugin is done\n"); #endif - if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK) - { -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Canceling next task\n"); -#endif - GNUNET_SCHEDULER_cancel (plugin->next_task); - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Prep'ing next task\n"); -#endif - plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL); - GNUNET_free (plugin->next_task_nc); - plugin->next_task_nc = NULL; - } fn = NULL; if (plugin->drop_on_shutdown) fn = GNUNET_strdup (plugin->fn); diff --git a/src/datastore/plugin_datastore_template.c b/src/datastore/plugin_datastore_template.c index 40b191538..6228e8c0c 100644 --- a/src/datastore/plugin_datastore_template.c +++ b/src/datastore/plugin_datastore_template.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2009 Christian Grothoff (and other contributing authors) + (C) 2009, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -47,7 +47,8 @@ struct Plugin * @param cls our "struct Plugin*" * @return number of bytes used on disk */ -static unsigned long long template_plugin_get_size (void *cls) +static unsigned long long +template_plugin_estimate_size (void *cls) { GNUNET_break (0); return 0; @@ -88,30 +89,11 @@ template_plugin_put (void *cls, /** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -static void -template_plugin_next_request (void *next_cls, - int end_it) -{ - GNUNET_break (0); -} - - -/** - * Iterate over the results for a particular key - * in the datastore. + * Get one of the results for a particular key in the datastore. * * @param cls closure + * @param offset offset of the result (mod #num-results); + * specific ordering does not matter for the offset * @param key maybe NULL (to match all entries) * @param vhash hash of the value, maybe NULL (to * match all values that have the right key). @@ -120,16 +102,17 @@ template_plugin_next_request (void *next_cls, * there may be! * @param type entries of which type are relevant? * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc function to call on each matching value; + * will be called with NULL if nothing matches + * @param proc_cls closure for proc */ static void -template_plugin_get (void *cls, - const GNUNET_HashCode * key, - const GNUNET_HashCode * vhash, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, void *iter_cls) +template_plugin_get_key (void *cls, + uint64_t offset, + const GNUNET_HashCode * key, + const GNUNET_HashCode * vhash, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls) { GNUNET_break (0); } @@ -137,34 +120,35 @@ template_plugin_get (void *cls, /** - * Get a random item for replication. Returns a single, not expired, random item - * from those with the highest replication counters. The item's - * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Get a random item for replication. Returns a single, not expired, + * random item from those with the highest replication counters. The + * item's replication counter is decremented by one IF it was positive + * before. Call 'proc' with all values ZERO or NULL if the datastore + * is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for proc */ static void -template_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +template_plugin_get_replication (void *cls, + PluginDatumProcessor proc, void *proc_cls) { GNUNET_break (0); } /** - * Get a random item for expiration. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Get a random item for expiration. Call 'proc' with all values ZERO + * or NULL if the datastore is empty. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for proc */ static void -template_plugin_expiration_get (void *cls, - PluginIterator iter, void *iter_cls) +template_plugin_get_expiration (void *cls, + PluginDatumProcessor proc, void *proc_cls) { GNUNET_break (0); } @@ -196,7 +180,8 @@ template_plugin_expiration_get (void *cls, static int template_plugin_update (void *cls, uint64_t uid, - int delta, struct GNUNET_TIME_Absolute expire, + int delta, + struct GNUNET_TIME_Absolute expire, char **msg) { GNUNET_break (0); @@ -206,21 +191,23 @@ template_plugin_update (void *cls, /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. + * Call the given processor on an item with zero anonymity. * * @param cls our "struct Plugin*" + * @param offset offset of the result (mod #num-results); + * specific ordering does not matter for the offset * @param type entries of which type should be considered? * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc function to call on each matching value; + * will be called with NULL if no value matches + * @param proc_cls closure for proc */ static void -template_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) +template_plugin_get_zero_anonymity (void *cls, + uint64_t offset, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, + void *proc_cls) { GNUNET_break (0); } @@ -253,14 +240,13 @@ libgnunet_plugin_datastore_template_init (void *cls) plugin->env = env; api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions)); api->cls = plugin; - api->get_size = &template_plugin_get_size; + api->estimate_size = &template_plugin_estimate_size; api->put = &template_plugin_put; - api->next_request = &template_plugin_next_request; - api->get = &template_plugin_get; - api->replication_get = &template_plugin_replication_get; - api->expiration_get = &template_plugin_expiration_get; api->update = &template_plugin_update; - api->iter_zero_anonymity = &template_plugin_iter_zero_anonymity; + api->get_key = &template_plugin_get_key; + api->get_replication = &template_plugin_get_replication; + api->get_expiration = &template_plugin_get_expiration; + api->get_zero_anonymity = &template_plugin_get_zero_anonymity; api->drop = &template_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "template", _("Template database running\n")); diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c index 4a4bbc439..deeee7164 100644 --- a/src/datastore/test_datastore_api.c +++ b/src/datastore/test_datastore_api.c @@ -102,20 +102,18 @@ get_expiration (int i) enum RunPhase { RP_DONE = 0, - RP_PUT, - RP_GET, - RP_DEL, - RP_DO_DEL, - RP_DELVALIDATE, - RP_RESERVE, - RP_PUT_MULTIPLE, - RP_PUT_MULTIPLE_NEXT, - RP_GET_MULTIPLE, - RP_GET_MULTIPLE_NEXT, /* 10 */ - RP_GET_MULTIPLE_DONE, - RP_UPDATE, - RP_UPDATE_VALIDATE, /* 13 */ - RP_UPDATE_DONE, + RP_PUT = 1, + RP_GET = 2, + RP_DEL = 3, + RP_DO_DEL = 4, + RP_DELVALIDATE = 5, + RP_RESERVE = 6, + RP_PUT_MULTIPLE = 7, + RP_PUT_MULTIPLE_NEXT = 8, + RP_GET_MULTIPLE = 9, + RP_GET_MULTIPLE_NEXT = 10, + RP_UPDATE = 11, + RP_UPDATE_VALIDATE = 12, RP_ERROR }; @@ -129,7 +127,9 @@ struct CpsRunContext void *data; size_t size; enum RunPhase phase; - unsigned long long uid; + uint64_t uid; + uint64_t offset; + uint64_t first_uid; }; @@ -144,16 +144,15 @@ check_success (void *cls, const char *msg) { struct CpsRunContext *crc = cls; + if (GNUNET_OK != success) { - ok = 42; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Operation not successfull: `%s'\n", msg); + "Operation %d/%d not successfull: `%s'\n", + crc->phase, + crc->i, + msg); crc->phase = RP_ERROR; - GNUNET_SCHEDULER_add_continuation (&run_continuation, - crc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return; } GNUNET_free_non_null (crc->data); crc->data = NULL; @@ -171,7 +170,8 @@ get_reserved (void *cls, struct CpsRunContext *crc = cls; if (0 >= success) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%s\n", msg); + "Error obtaining reservation: `%s'\n", + msg); GNUNET_assert (0 < success); crc->rid = success; GNUNET_SCHEDULER_add_continuation (&run_continuation, @@ -188,42 +188,48 @@ check_value (void *cls, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { - static int matched; struct CpsRunContext *crc = cls; int i; - if (key == NULL) - { - if (crc->i == 0) - { - crc->phase = RP_DEL; - crc->i = ITERATIONS; - } - GNUNET_assert (matched == GNUNET_YES); - matched = GNUNET_NO; - GNUNET_SCHEDULER_add_continuation (&run_continuation, - crc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return; - } i = crc->i; +#if 0 + fprintf (stderr, + "Check value got `%s' of size %u, type %d, expire %llu\n", + GNUNET_h2s (key), + (unsigned int) size, + type, + (unsigned long long) expiration.abs_value); + fprintf (stderr, + "Check value iteration %d wants size %u, type %d, expire %llu\n", + i, + (unsigned int) get_size (i), + get_type (i), + (unsigned long long) get_expiration(i).abs_value); +#endif GNUNET_assert (size == get_size (i)); GNUNET_assert (0 == memcmp (data, get_data(i), size)); GNUNET_assert (type == get_type (i)); GNUNET_assert (priority == get_priority (i)); GNUNET_assert (anonymity == get_anonymity(i)); GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value); - matched = GNUNET_YES; - GNUNET_DATASTORE_iterate_get_next (datastore); + crc->offset++; + if (crc->i == 0) + { + crc->phase = RP_DEL; + crc->i = ITERATIONS; + } + GNUNET_SCHEDULER_add_continuation (&run_continuation, + crc, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } static void delete_value (void *cls, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, @@ -233,36 +239,23 @@ delete_value (void *cls, expiration, uint64_t uid) { struct CpsRunContext *crc = cls; - if (key == NULL) - { - if (crc->data == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Content %u not found!\n", - crc->i); - crc->phase = RP_ERROR; - } - else - { - crc->phase = RP_DO_DEL; - } - GNUNET_SCHEDULER_add_continuation (&run_continuation, - crc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return; - } + GNUNET_assert (crc->data == NULL); + GNUNET_assert (NULL != key); crc->size = size; crc->key = *key; crc->data = GNUNET_malloc (size); memcpy (crc->data, data, size); - GNUNET_DATASTORE_iterate_get_next (datastore); + crc->phase = RP_DO_DEL; + GNUNET_SCHEDULER_add_continuation (&run_continuation, + crc, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } static void check_nothing (void *cls, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, @@ -272,11 +265,10 @@ check_nothing (void *cls, expiration, uint64_t uid) { struct CpsRunContext *crc = cls; + GNUNET_assert (key == NULL); if (crc->i == 0) - { - crc->phase = RP_RESERVE; - } + crc->phase = RP_RESERVE; GNUNET_SCHEDULER_add_continuation (&run_continuation, crc, GNUNET_SCHEDULER_REASON_PREREQ_DONE); @@ -296,47 +288,28 @@ check_multiple (void *cls, { struct CpsRunContext *crc = cls; - if (key == NULL) - { - if (crc->phase != RP_GET_MULTIPLE_DONE) - { - fprintf (stderr, - "Wrong phase: %d\n", - crc->phase); - GNUNET_break (0); - crc->phase = RP_ERROR; - } - else - { - crc->phase = RP_UPDATE; - } - GNUNET_SCHEDULER_add_continuation (&run_continuation, - crc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return; - } + GNUNET_assert (key != NULL); switch (crc->phase) { case RP_GET_MULTIPLE: crc->phase = RP_GET_MULTIPLE_NEXT; + crc->first_uid = uid; + crc->offset++; break; case RP_GET_MULTIPLE_NEXT: - crc->phase = RP_GET_MULTIPLE_DONE; - break; - case RP_GET_MULTIPLE_DONE: - /* do not advance further */ + GNUNET_assert (uid != crc->first_uid); + crc->phase = RP_UPDATE; break; default: GNUNET_break (0); + crc->phase = RP_ERROR; break; } -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Test in phase %u\n", crc->phase); -#endif if (priority == get_priority (42)) crc->uid = uid; - GNUNET_DATASTORE_iterate_get_next (datastore); + GNUNET_SCHEDULER_add_continuation (&run_continuation, + crc, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } @@ -353,31 +326,19 @@ check_update (void *cls, { struct CpsRunContext *crc = cls; - if (key == NULL) - { - if (crc->phase != RP_UPDATE_DONE) - { - GNUNET_break (0); - crc->phase = RP_ERROR; - } - else - { - crc->phase = RP_DONE; - } - GNUNET_SCHEDULER_add_continuation (&run_continuation, - crc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return; - } + GNUNET_assert (key != NULL); if ( (anonymity == get_anonymity (42)) && (size == get_size (42)) && (priority == get_priority (42) + 100) ) + crc->phase = RP_DONE; + else { - crc->phase = RP_UPDATE_DONE; + GNUNET_assert (size == get_size (43)); + crc->offset++; } - else - GNUNET_assert (size == get_size (43)); - GNUNET_DATASTORE_iterate_get_next (datastore); + GNUNET_SCHEDULER_add_continuation (&run_continuation, + crc, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } @@ -427,12 +388,13 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_value, - crc); + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_value, + crc); break; case RP_DEL: crc->i--; @@ -444,12 +406,14 @@ run_continuation (void *cls, #endif crc->data = NULL; GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &delete_value, - crc); + GNUNET_assert (NULL != + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &delete_value, + crc)); break; case RP_DO_DEL: #if VERBOSE @@ -467,13 +431,14 @@ run_continuation (void *cls, { crc->phase = RP_DEL; } - GNUNET_DATASTORE_remove (datastore, - &crc->key, - crc->size, - crc->data, - 1, 1, TIMEOUT, - &check_success, - crc); + GNUNET_assert (NULL != + GNUNET_DATASTORE_remove (datastore, + &crc->key, + crc->size, + crc->data, + 1, 1, TIMEOUT, + &check_success, + crc)); break; case RP_DELVALIDATE: crc->i--; @@ -484,12 +449,14 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_nothing, - crc); + GNUNET_assert (NULL != + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_nothing, + crc)); break; case RP_RESERVE: crc->phase = RP_PUT_MULTIPLE; @@ -533,16 +500,24 @@ run_continuation (void *cls, crc); break; case RP_GET_MULTIPLE: - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (42), - 1, 1, TIMEOUT, - &check_multiple, - crc); + GNUNET_assert (NULL != + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (42), + 1, 1, TIMEOUT, + &check_multiple, + crc)); break; case RP_GET_MULTIPLE_NEXT: - case RP_GET_MULTIPLE_DONE: - GNUNET_assert (0); + GNUNET_assert (NULL != + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (42), + 1, 1, TIMEOUT, + &check_multiple, + crc)); break; case RP_UPDATE: GNUNET_assert (crc->uid > 0); @@ -556,15 +531,14 @@ run_continuation (void *cls, crc); break; case RP_UPDATE_VALIDATE: - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (42), - 1, 1, TIMEOUT, - &check_update, - crc); - break; - case RP_UPDATE_DONE: - GNUNET_assert (0); + GNUNET_assert (NULL != + GNUNET_DATASTORE_get_key (datastore, + crc->offset, + &crc->key, + get_type (42), + 1, 1, TIMEOUT, + &check_update, + crc)); break; case RP_DONE: #if VERBOSE @@ -681,6 +655,7 @@ check () argv, "test-datastore-api", "nohelp", options, &run, NULL); #if START_DATASTORE + sleep (1); /* give datastore chance to receive 'DROP' request */ if (0 != GNUNET_OS_process_kill (proc, SIGTERM)) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); diff --git a/src/datastore/test_datastore_api_data_sqlite.conf b/src/datastore/test_datastore_api_data_sqlite.conf index d7c01fe22..931572025 100644 --- a/src/datastore/test_datastore_api_data_sqlite.conf +++ b/src/datastore/test_datastore_api_data_sqlite.conf @@ -29,7 +29,7 @@ DATABASE = sqlite # REJECT_FROM = # REJECT_FROM6 = # PREFIX = -# DEBUG = YES +#DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-datastore diff --git a/src/datastore/test_datastore_api_management.c b/src/datastore/test_datastore_api_management.c index 5dfb5cea7..41aa7ae3e 100644 --- a/src/datastore/test_datastore_api_management.c +++ b/src/datastore/test_datastore_api_management.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -97,9 +97,9 @@ get_expiration (int i) enum RunPhase { - RP_DONE = 0, RP_PUT, RP_GET, + RP_DONE, RP_GET_FAIL }; @@ -112,6 +112,7 @@ struct CpsRunContext const struct GNUNET_CONFIGURATION_Handle *cfg; void *data; enum RunPhase phase; + uint64_t offset; }; @@ -146,42 +147,26 @@ check_value (void *cls, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { struct CpsRunContext *crc = cls; int i; - if (key == NULL) - { - crc->i--; - if (crc->found == GNUNET_YES) - { - crc->phase = RP_GET; - crc->found = GNUNET_NO; - } - else - { - fprintf (stderr, - "First not found was %u\n", crc->i); - crc->phase = RP_GET_FAIL; - } - if (0 == crc->i) - crc->phase = RP_DONE; - GNUNET_SCHEDULER_add_continuation (&run_continuation, - crc, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); - return; - } i = crc->i; - crc->found = GNUNET_YES; GNUNET_assert (size == get_size (i)); GNUNET_assert (0 == memcmp (data, get_data(i), size)); GNUNET_assert (type == get_type (i)); GNUNET_assert (priority == get_priority (i)); GNUNET_assert (anonymity == get_anonymity(i)); GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value); - GNUNET_DATASTORE_iterate_get_next (datastore); + crc->offset++; + crc->i--; + if (crc->i == 0) + crc->phase = RP_DONE; + GNUNET_SCHEDULER_add_continuation (&run_continuation, + crc, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } @@ -241,7 +226,7 @@ run_continuation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sleeping to give datastore time to clean up\n"); - sleep (5); + sleep (1); crc->phase = RP_GET; crc->i--; } @@ -254,12 +239,13 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_value, - crc); + GNUNET_DATASTORE_get_key (datastore, + crc->offset++, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_value, + crc); break; case RP_GET_FAIL: #if VERBOSE @@ -269,12 +255,13 @@ run_continuation (void *cls, crc->i); #endif GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); - GNUNET_DATASTORE_iterate_key (datastore, - &crc->key, - get_type (crc->i), - 1, 1, TIMEOUT, - &check_nothing, - crc); + GNUNET_DATASTORE_get_key (datastore, + crc->offset++, + &crc->key, + get_type (crc->i), + 1, 1, TIMEOUT, + &check_nothing, + crc); break; case RP_DONE: GNUNET_assert (0 == crc->i); @@ -372,6 +359,7 @@ check () GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv, "test-datastore-api", "nohelp", options, &run, NULL); + sleep (1); /* give datastore chance to process 'DROP' request */ if (0 != GNUNET_OS_process_kill (proc, SIGTERM)) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); diff --git a/src/datastore/test_plugin_datastore.c b/src/datastore/test_plugin_datastore.c index d38e908ac..f0961f51e 100644 --- a/src/datastore/test_plugin_datastore.c +++ b/src/datastore/test_plugin_datastore.c @@ -65,6 +65,7 @@ struct CpsRunContext enum RunPhase phase; unsigned int cnt; unsigned int i; + uint64_t offset; }; @@ -120,6 +121,11 @@ put_value (struct GNUNET_DATASTORE_PluginFunctions * api, value[0] = k; msg = NULL; prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100); +#if VERBOSE + fprintf (stderr, + "putting type %u, anon %u under key %s\n", + i+1, i, GNUNET_h2s (&key)); +#endif if (GNUNET_OK != api->put (api->cls, &key, size, @@ -149,9 +155,11 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static uint64_t guid; + + static int iterate_one_shot (void *cls, - void *next_cls, const GNUNET_HashCode * key, uint32_t size, const void *data, @@ -164,57 +172,21 @@ iterate_one_shot (void *cls, { struct CpsRunContext *crc = cls; - GNUNET_assert (NULL == next_cls); GNUNET_assert (key != NULL); + guid = uid; crc->phase++; #if VERBOSE fprintf (stderr, - "Found result type=%u, priority=%u, size=%u, expire=%llu\n", + "Found result type=%u, priority=%u, size=%u, expire=%llu, key %s\n", type, priority, size, - (unsigned long long) expiration.abs_value); + (unsigned long long) expiration.abs_value, + GNUNET_h2s (key)); #endif GNUNET_SCHEDULER_add_now (&test, crc); return GNUNET_OK; } -static uint64_t guid; - -static int -iterate_with_next (void *cls, - void *next_cls, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid) -{ - struct CpsRunContext *crc = cls; - - if (key == NULL) - { - crc->phase++; - GNUNET_SCHEDULER_add_now (&test, crc); - return GNUNET_OK; - } - guid = uid; -#if VERBOSE - fprintf (stderr, - "Found result type=%u, priority=%u, size=%u, expire=%llu\n", - type, priority, size, - (unsigned long long) expiration.abs_value); -#endif - crc->cnt++; - crc->api->next_request (next_cls, - GNUNET_NO); - return GNUNET_OK; -} - - /** * Function called when the service shuts * down. Unloads our datastore plugin. @@ -274,12 +246,19 @@ test (void *cls, if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Test aborted.\n"); crc->phase = RP_ERROR; - ok = 1; } +#if VERBOSE + fprintf (stderr, "In phase %d, iteration %u\n", + crc->phase, + crc->cnt); +#endif switch (crc->phase) { case RP_ERROR: + ok = 1; GNUNET_break (0); crc->api->drop (crc->api->cls); GNUNET_SCHEDULER_add_now (&cleaning_task, crc); @@ -289,7 +268,7 @@ test (void *cls, for (j=0;japi, j, crc->i); - cs = crc->api->get_size (crc->api->cls); + cs = crc->api->estimate_size (crc->api->cls); GNUNET_assert (os < cs); os = cs; } @@ -305,11 +284,12 @@ test (void *cls, break; } gen_key (5, &key); - crc->api->get (crc->api->cls, - &key, NULL, - GNUNET_BLOCK_TYPE_ANY, - &iterate_with_next, - crc); + crc->api->get_key (crc->api->cls, + crc->offset++, + &key, NULL, + GNUNET_BLOCK_TYPE_ANY, + &iterate_one_shot, + crc); break; case RP_UPDATE: GNUNET_assert (GNUNET_OK == @@ -329,18 +309,19 @@ test (void *cls, GNUNET_SCHEDULER_add_now (&test, crc); break; } - crc->api->iter_zero_anonymity (crc->api->cls, - 1, - &iterate_with_next, - crc); + crc->api->get_zero_anonymity (crc->api->cls, + 0, + 1, + &iterate_one_shot, + crc); break; case RP_REPL_GET: - crc->api->replication_get (crc->api->cls, + crc->api->get_replication (crc->api->cls, &iterate_one_shot, crc); break; case RP_EXPI_GET: - crc->api->expiration_get (crc->api->cls, + crc->api->get_expiration (crc->api->cls, &iterate_one_shot, crc); break; diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index f980f4206..20aa652ae 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -1,4 +1,3 @@ - INCLUDES = -I$(top_srcdir)/src/include if MINGW @@ -173,8 +172,7 @@ check_SCRIPTS = \ test_gnunet_fs_idx.py endif -#if !DISABLE_TEST_RUN -if 0 +if !DISABLE_TEST_RUN TESTS = \ test_fs_directory \ test_fs_download \ diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 8192b8c1f..8eb2b4331 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -756,10 +756,12 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc, child_block_size = GNUNET_FS_tree_compute_tree_size (drc->depth); GNUNET_assert (0 == (drc->offset - dr->offset) % child_block_size); chk_off = (drc->offset - dr->offset) / child_block_size; - GNUNET_assert (drc->state == BRS_INIT); - drc->state = BRS_CHK_SET; - drc->chk = chks[chk_off]; - try_top_down_reconstruction (dc, drc); + if (drc->state == BRS_INIT) + { + drc->state = BRS_CHK_SET; + drc->chk = chks[chk_off]; + try_top_down_reconstruction (dc, drc); + } if (drc->state != BRS_DOWNLOAD_UP) up_done = GNUNET_NO; /* children not all done */ } @@ -815,10 +817,11 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, dr->depth, GNUNET_h2s (&dr->chk.query)); #endif - GNUNET_assert (GNUNET_NO == - GNUNET_CONTAINER_multihashmap_contains_value (dc->active, - &dr->chk.query, - dr)); + if (GNUNET_NO != + GNUNET_CONTAINER_multihashmap_contains_value (dc->active, + &dr->chk.query, + dr)) + return; /* already active */ GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr, diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf index 68c5166b3..204bb90cf 100644 --- a/src/fs/fs_test_lib_data.conf +++ b/src/fs/fs_test_lib_data.conf @@ -43,7 +43,7 @@ HOSTNAME = localhost #TOTAL_QUOTA_OUT = 9321 TOTAL_QUOTA_IN = 3932160 TOTAL_QUOTA_OUT = 3932160 -DEBUG = YES +#DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-core @@ -53,8 +53,8 @@ HOSTNAME = localhost #OPTIONS = -L DEBUG CONTENT_CACHING = NO CONTENT_PUSHING = NO -DEBUG = YES -#PREFIX = valgrind --tool=memcheck --leak-check=yes +# DEBUG = YES +# PREFIX = valgrind --tool=memcheck --leak-check=yes --trace-children=yes #BINARY = /home/grothoff/gn9/bin/gnunet-service-fs #PREFIX = xterm -e gdb -x cmd --args diff --git a/src/fs/gnunet-pseudonym.c b/src/fs/gnunet-pseudonym.c index 769b4239d..68a760867 100644 --- a/src/fs/gnunet-pseudonym.c +++ b/src/fs/gnunet-pseudonym.c @@ -341,7 +341,7 @@ main (int argc, char *const *argv) 0, &GNUNET_GETOPT_set_one, &no_remote_printing}, {'r', "replication", "LEVEL", gettext_noop ("set the desired replication LEVEL"), - 0, &GNUNET_GETOPT_set_uint, &bo.replication_level}, + 1, &GNUNET_GETOPT_set_uint, &bo.replication_level}, {'R', "root", "ID", gettext_noop ("specify ID of the root of the namespace"), diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 2522cbe7b..acad54501 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -704,9 +704,9 @@ copy_reply (void *cls, /** - * Free the given client request. + * Free the given request. * - * @param cls the client request to free + * @param cls the request to free * @param tc task context */ static void @@ -1182,6 +1182,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, NULL, 0, /* replies_seen */ &handle_p2p_reply, peerreq); + GNUNET_assert (NULL != pr); peerreq->pr = pr; GNUNET_break (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (cp->request_map, @@ -1427,7 +1428,7 @@ cancel_pending_request (void *cls, const GNUNET_HashCode *query, void *value) { - struct PeerRequest *peerreq = cls; + struct PeerRequest *peerreq = value; struct GSF_PendingRequest *pr = peerreq->pr; GSF_pending_request_cancel_ (pr); diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c index cc99d3962..dc6b82952 100644 --- a/src/fs/gnunet-service-fs_indexing.c +++ b/src/fs/gnunet-service-fs_indexing.c @@ -566,7 +566,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid, - GNUNET_DATASTORE_Iterator cont, + GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls) { const struct OnDemandBlock *odb; diff --git a/src/fs/gnunet-service-fs_indexing.h b/src/fs/gnunet-service-fs_indexing.h index 6a2c3d4a0..e1154830b 100644 --- a/src/fs/gnunet-service-fs_indexing.h +++ b/src/fs/gnunet-service-fs_indexing.h @@ -63,7 +63,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid, - GNUNET_DATASTORE_Iterator cont, + GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls); /** diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index 28036150f..4dc9de1b8 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -158,7 +158,7 @@ plan (struct PeerPlan *pp, rp->transmission_counter); #endif - + GNUNET_assert (rp->hn == NULL); if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 7406bed0f..c1074e8bf 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -99,6 +99,20 @@ struct GSF_PendingRequest */ GNUNET_PEER_Id sender_pid; + /** + * Current offset for querying our local datastore for results. + * Starts at a random value, incremented until we get the same + * UID again (detected using 'first_uid'), which is then used + * to termiante the iteration. + */ + uint64_t local_result_offset; + + /** + * Unique ID of the first result from the local datastore; + * used to detect wrap-around of the offset. + */ + uint64_t first_uid; + /** * Number of valid entries in the 'replies_seen' array. */ @@ -113,7 +127,7 @@ struct GSF_PendingRequest * Mingle value we currently use for the bf. */ uint32_t mingle; - + }; @@ -273,6 +287,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, type); #endif pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); + pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT64_MAX); pr->public_data.query = *query; if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) { @@ -535,7 +551,20 @@ clean_request (void *cls, void *value) { struct GSF_PendingRequest *pr = value; - + GSF_LocalLookupContinuation cont; + +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cleaning up pending request for `%s'.\n", + GNUNET_h2s (key)); +#endif + if (NULL != (cont = pr->llc_cont)) + { + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, + pr, + pr->local_result); + } GSF_plan_notify_request_done_ (pr); GNUNET_free_non_null (pr->replies_seen); if (NULL != pr->bf) @@ -560,6 +589,7 @@ clean_request (void *cls, void GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) { + if (NULL == pr_map) return; /* already cleaned up! */ GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_remove (pr_map, &pr->public_data.query, @@ -1023,13 +1053,22 @@ process_local_reply (void *cls, GNUNET_HashCode query; unsigned int old_rf; + pr->qe = NULL; + if (0 == pr->replies_seen_count) + { + pr->first_uid = uid; + } + else + { + if (uid == pr->first_uid) + key = NULL; /* all replies seen! */ + } if (NULL == key) { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No further local responses available.\n"); #endif - pr->qe = NULL; if (NULL != (cont = pr->llc_cont)) { pr->llc_cont = NULL; @@ -1041,9 +1080,10 @@ process_local_reply (void *cls, } #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New local response to `%s' of type %u.\n", + "Received reply for `%s' of type %d with UID %llu from datastore.\n", GNUNET_h2s (key), - type); + type, + (unsigned long long) uid); #endif if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) { @@ -1061,8 +1101,22 @@ process_local_reply (void *cls, &process_local_reply, pr)) { - if (pr->qe != NULL) - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); + pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + pr->local_result_offset - 1, + &pr->public_data.query, + pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK + ? GNUNET_BLOCK_TYPE_ANY + : pr->public_data.type, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* queue priority */, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* max queue size */, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, + pr); + GNUNET_assert (NULL != pr->qe); } return; } @@ -1085,7 +1139,22 @@ process_local_reply (void *cls, -1, -1, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); + pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + pr->local_result_offset - 1, + &pr->public_data.query, + pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK + ? GNUNET_BLOCK_TYPE_ANY + : pr->public_data.type, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* queue priority */, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* max queue size */, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, + pr); + GNUNET_assert (NULL != pr->qe); return; } prq.type = type; @@ -1097,12 +1166,16 @@ process_local_reply (void *cls, GSF_update_datastore_delay_ (pr->public_data.start_time); process_reply (&prq, key, pr); pr->local_result = prq.eval; - if (pr->qe == NULL) + if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Request cancelled, not asking datastore for more\n"); -#endif + if (NULL != (cont = pr->llc_cont)) + { + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, + pr, + pr->local_result); + } + return; } if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || @@ -1116,8 +1189,6 @@ process_local_reply (void *cls, gettext_noop ("# processing result set cut short due to load"), 1, GNUNET_NO); - GNUNET_DATASTORE_cancel (pr->qe); - pr->qe = NULL; if (NULL != (cont = pr->llc_cont)) { pr->llc_cont = NULL; @@ -1127,7 +1198,22 @@ process_local_reply (void *cls, } return; } - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); + pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + pr->local_result_offset++, + &pr->public_data.query, + pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK + ? GNUNET_BLOCK_TYPE_ANY + : pr->public_data.type, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* queue priority */, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* max queue size */, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, + pr); + GNUNET_assert (NULL != pr->qe); } @@ -1147,20 +1233,21 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, GNUNET_assert (NULL == pr->llc_cont); pr->llc_cont = cont; pr->llc_cont_cls = cont_cls; - pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh, - &pr->public_data.query, - pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK - ? GNUNET_BLOCK_TYPE_ANY - : pr->public_data.type, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* queue priority */, - (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) - ? UINT_MAX - : 1 /* max queue size */, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_local_reply, - pr); + pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + pr->local_result_offset++, + &pr->public_data.query, + pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK + ? GNUNET_BLOCK_TYPE_ANY + : pr->public_data.type, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* queue priority */, + (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) + ? UINT_MAX + : 1 /* max queue size */, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, + pr); } diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index 121a90bcd..b15207ce8 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -35,25 +35,50 @@ /** - * Request to datastore for DHT PUTs (or NULL). + * Context for each zero-anonymity iterator. */ -static struct GNUNET_DATASTORE_QueueEntry *dht_qe; +struct PutOperator +{ -/** - * Type we will request for the next DHT PUT round from the datastore. - */ -static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; + /** + * Request to datastore for DHT PUTs (or NULL). + */ + struct GNUNET_DATASTORE_QueueEntry *dht_qe; + + /** + * Type we request from the datastore. + */ + enum GNUNET_BLOCK_Type dht_put_type; + + /** + * ID of task that collects blocks for DHT PUTs. + */ + GNUNET_SCHEDULER_TaskIdentifier dht_task; + + /** + * How many entires with zero anonymity of our type do we currently + * estimate to have in the database? + */ + uint64_t zero_anonymity_count_estimate; + + /** + * Current offset when iterating the database. + */ + uint64_t current_offset; +}; -/** - * ID of task that collects blocks for DHT PUTs. - */ -static GNUNET_SCHEDULER_TaskIdentifier dht_task; /** - * How many entires with zero anonymity do we currently estimate - * to have in the database? + * ANY-terminated list of our operators (one per type + * of block that we're putting into the DHT). */ -static unsigned int zero_anonymity_count_estimate; +static struct PutOperator operators[] = + { + { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 }, + { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 }, + { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 }, + { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 } + }; /** @@ -67,26 +92,26 @@ gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - /** - * If the DHT PUT gathering task is not currently running, consider - * (re)scheduling it with the appropriate delay. + * Task that is run periodically to obtain blocks for DHT PUTs. + * + * @param cls type of blocks to gather + * @param tc scheduler context (unused) */ static void -consider_dht_put_gathering (void *cls) +delay_dht_put_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct PutOperator *po = cls; struct GNUNET_TIME_Relative delay; - if (GSF_dsh == NULL) - return; - if (dht_qe != NULL) + po->dht_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (dht_task != GNUNET_SCHEDULER_NO_TASK) - return; - if (zero_anonymity_count_estimate > 0) + if (po->zero_anonymity_count_estimate > 0) { delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, - zero_anonymity_count_estimate); + po->zero_anonymity_count_estimate); delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ); } @@ -96,20 +121,9 @@ consider_dht_put_gathering (void *cls) (hopefully) appear */ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); } - dht_task = GNUNET_SCHEDULER_add_delayed (delay, - &gather_dht_put_blocks, - cls); -} - - -/** - * Function called upon completion of the DHT PUT operation. - */ -static void -dht_put_continuation (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); + po->dht_task = GNUNET_SCHEDULER_add_delayed (delay, + &gather_dht_put_blocks, + po); } @@ -138,31 +152,19 @@ process_dht_put_content (void *cls, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { - static unsigned int counter; - static GNUNET_HashCode last_vhash; - static GNUNET_HashCode vhash; + struct PutOperator *po = cls; + po->dht_qe = NULL; if (key == NULL) { - dht_qe = NULL; - consider_dht_put_gathering (cls); + po->zero_anonymity_count_estimate = po->current_offset - 1; + po->current_offset = 0; + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, + po); return; } - /* slightly funky code to estimate the total number of values with zero - anonymity from the maximum observed length of a monotonically increasing - sequence of hashes over the contents */ - GNUNET_CRYPTO_hash (data, size, &vhash); - if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0) - { - if (zero_anonymity_count_estimate > 0) - zero_anonymity_count_estimate /= 2; - counter = 0; - } - last_vhash = vhash; - if (counter < 31) - counter++; - if (zero_anonymity_count_estimate < (1 << counter)) - zero_anonymity_count_estimate = (1 << counter); + po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset, + po->zero_anonymity_count_estimate); #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Retrieved block `%s' of type %u for DHT PUT\n", @@ -178,8 +180,8 @@ process_dht_put_content (void *cls, data, expiration, GNUNET_TIME_UNIT_FOREVER_REL, - &dht_put_continuation, - cls); + &delay_dht_put_blocks, + po); } @@ -193,17 +195,20 @@ static void gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - dht_task = GNUNET_SCHEDULER_NO_TASK; - if (GSF_dsh == NULL) + struct PutOperator *po = cls; + + po->dht_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; - dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh, + po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, + po->current_offset++, 0, UINT_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - dht_put_type++, - &process_dht_put_content, NULL); - GNUNET_assert (dht_qe != NULL); + po->dht_put_type, + &process_dht_put_content, po); + if (NULL == po->dht_qe) + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, + po); } @@ -213,7 +218,14 @@ gather_dht_put_blocks (void *cls, void GSF_put_init_ () { - dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); + unsigned int i; + + i = 0; + while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY) + { + operators[i].dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]); + i++; + } } @@ -223,15 +235,23 @@ GSF_put_init_ () void GSF_put_done_ () { - if (GNUNET_SCHEDULER_NO_TASK != dht_task) - { - GNUNET_SCHEDULER_cancel (dht_task); - dht_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != dht_qe) + struct PutOperator *po; + unsigned int i; + + i = 0; + while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY) { - GNUNET_DATASTORE_cancel (dht_qe); - dht_qe = NULL; + if (GNUNET_SCHEDULER_NO_TASK != po->dht_task) + { + GNUNET_SCHEDULER_cancel (po->dht_task); + po->dht_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != po->dht_qe) + { + GNUNET_DATASTORE_cancel (po->dht_qe); + po->dht_qe = NULL; + } + i++; } } diff --git a/src/fs/test_fs_download_data.conf b/src/fs/test_fs_download_data.conf index 0a7eb311a..6bbae9dc9 100644 --- a/src/fs/test_fs_download_data.conf +++ b/src/fs/test_fs_download_data.conf @@ -36,7 +36,8 @@ HOSTNAME = localhost [fs] PORT = 42471 HOSTNAME = localhost -ACTIVEMIGRATION = NO +CONTENT_CACHING = NO +CONTENT_PUSHING = NO # DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-fs diff --git a/src/fs/test_gnunet_fs_idx.py.in b/src/fs/test_gnunet_fs_idx.py.in index 3bb3681c6..c97ffd883 100755 --- a/src/fs/test_gnunet_fs_idx.py.in +++ b/src/fs/test_gnunet_fs_idx.py.in @@ -31,7 +31,7 @@ try: pub.expect ("URI is `gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147'.\r") pub.expect (pexpect.EOF) - down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o \"COPYING\" gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147') + down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o COPYING gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147') down.expect (re.compile ("Downloading `COPYING\' done \(.*\).\r")); down.expect (pexpect.EOF); os.system ('rm COPYING'); diff --git a/src/fs/test_gnunet_fs_ns_data.conf b/src/fs/test_gnunet_fs_ns_data.conf index 65bac0a15..2086cd0fd 100644 --- a/src/fs/test_gnunet_fs_ns_data.conf +++ b/src/fs/test_gnunet_fs_ns_data.conf @@ -36,7 +36,7 @@ HOSTNAME = localhost [fs] PORT = 47471 HOSTNAME = localhost -#DEBUG = YES +DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-fs diff --git a/src/fs/test_gnunet_service_fs_migration_data.conf b/src/fs/test_gnunet_service_fs_migration_data.conf index a72a98e97..3ab61d76c 100644 --- a/src/fs/test_gnunet_service_fs_migration_data.conf +++ b/src/fs/test_gnunet_service_fs_migration_data.conf @@ -53,7 +53,7 @@ HOSTNAME = localhost ACTIVEMIGRATION = YES CONTENT_CACHING = YES CONTENT_PUSHING = YES -DEBUG = YES +#DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #PREFIX = xterm -e gdb -x cmd --args diff --git a/src/include/gnunet_datastore_plugin.h b/src/include/gnunet_datastore_plugin.h index a5c548146..4d717996d 100644 --- a/src/include/gnunet_datastore_plugin.h +++ b/src/include/gnunet_datastore_plugin.h @@ -78,26 +78,9 @@ struct GNUNET_DATASTORE_PluginEnvironment /** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -typedef void (*PluginNextRequest)(void *next_cls, - int end_it); - - -/** - * An iterator over a set of items stored in the datastore. + * An processor over a set of items stored in the datastore. * * @param cls closure - * @param next_cls closure to pass to the "next" function. * @param key key for the content * @param size number of bytes in data * @param data content stored @@ -105,24 +88,21 @@ typedef void (*PluginNextRequest)(void *next_cls, * @param priority priority of the content * @param anonymity anonymity-level for the content * @param expiration expiration time for the content - * @param uid unique identifier for the datum; - * maybe 0 if no unique identifier is available + * @param uid unique identifier for the datum * - * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue - * (continue on call to "next", of course), - * GNUNET_NO to delete the item and continue (if supported) + * @return GNUNET_OK to keep the item + * GNUNET_NO to delete the item */ -typedef int (*PluginIterator) (void *cls, - void *next_cls, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, - uint64_t uid); +typedef int (*PluginDatumProcessor) (void *cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid); /** * Get an estimate of how much space the database is @@ -131,7 +111,7 @@ typedef int (*PluginIterator) (void *cls, * @param cls closure * @return number of bytes used on disk */ -typedef unsigned long long (*PluginGetSize) (void *cls); +typedef unsigned long long (*PluginEstimateSize) (void *cls); /** @@ -165,10 +145,11 @@ typedef int (*PluginPut) (void *cls, /** - * Iterate over the results for a particular key - * in the datastore. + * Get one of the results for a particular key in the datastore. * * @param cls closure + * @param offset offset of the result (mod #num-results); + * specific ordering does not matter for the offset * @param key key to match, never NULL * @param vhash hash of the value, maybe NULL (to * match all values that have the right key). @@ -177,34 +158,31 @@ typedef int (*PluginPut) (void *cls, * there may be! * @param type entries of which type are relevant? * Use 0 for any type. - * @param iter function to call on each matching value; however, - * after the first call to "iter", the plugin must wait - * until "NextRequest" was called before giving the iterator - * the next item; finally, the "iter" should be called once - * once with a NULL value at the end ("next_cls" should be NULL - * for that last call) - * @param iter_cls closure for iter + * @param proc function to call on the matching value; + * proc should be called with NULL if there is no result + * @param proc_cls closure for proc */ -typedef void (*PluginGet) (void *cls, - const GNUNET_HashCode *key, - const GNUNET_HashCode *vhash, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, void *iter_cls); +typedef void (*PluginGetKey) (void *cls, + uint64_t offset, + const GNUNET_HashCode *key, + const GNUNET_HashCode *vhash, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, void *proc_cls); /** * Get a random item (additional constraints may apply depending on - * the specific implementation). Calls 'iter' with all values ZERO or - * NULL if no item applies, otherwise 'iter' is called once and only + * the specific implementation). Calls 'proc' with all values ZERO or + * NULL if no item applies, otherwise 'proc' is called once and only * once with an item, with the 'next_cls' argument being NULL. * * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param proc function to call the value (once only). + * @param proc_cls closure for proc */ -typedef void (*PluginRandomGet) (void *cls, - PluginIterator iter, void *iter_cls); +typedef void (*PluginGetRandom) (void *cls, + PluginDatumProcessor proc, void *proc_cls); /** @@ -238,26 +216,22 @@ typedef int (*PluginUpdate) (void *cls, /** - * Select a subset of the items in the datastore and call the given - * iterator for the first item; then allow getting more items by - * calling the 'next_request' callback with the given 'next_cls' - * argument passed to 'iter'. + * Select a single item from the datastore at the specified offset + * (among those applicable). * * @param cls closure + * @param offset offset of the result (mod #num-results); + * specific ordering does not matter for the offset * @param type entries of which type should be considered? - * Myst not be zero (ANY). - * @param iter function to call on each matching value; however, - * after the first call to "iter", the plugin must wait - * until "NextRequest" was called before giving the iterator - * the next item; finally, the "iter" should be called once - * once with a NULL value at the end ("next_cls" should be NULL - * for that last call) - * @param iter_cls closure for iter + * Must not be zero (ANY). + * @param proc function to call on the matching value + * @param proc_cls closure for proc */ -typedef void (*PluginSelector) (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls); +typedef void (*PluginGetType) (void *cls, + uint64_t offset, + enum GNUNET_BLOCK_Type type, + PluginDatumProcessor proc, + void *proc_cls); /** @@ -283,10 +257,10 @@ struct GNUNET_DATASTORE_PluginFunctions void *cls; /** - * Get the current on-disk size of the SQ store. Estimates are - * fine, if that's the only thing available. + * Calculate the current on-disk size of the SQ store. Estimates + * are fine, if that's the only thing available. */ - PluginGetSize get_size; + PluginEstimateSize estimate_size; /** * Function to store an item in the datastore. @@ -304,23 +278,14 @@ struct GNUNET_DATASTORE_PluginFunctions PluginUpdate update; /** - * Function called by iterators whenever they want the next value; - * note that unlike all of the other callbacks, this one does get a - * the "next_cls" closure which is usually different from the "cls" - * member of this struct! - */ - PluginNextRequest next_request; - - /** - * Function to iterate over the results for a particular key - * in the datastore. + * Get a particular datum matching a given hash from the datastore. */ - PluginGet get; + PluginGetKey get_key; /** - * Iterate over content with anonymity level zero. + * Get datum (of the specified type) with anonymity level zero. */ - PluginSelector iter_zero_anonymity; + PluginGetType get_zero_anonymity; /** * Function to get a random item with high replication score from @@ -329,13 +294,13 @@ struct GNUNET_DATASTORE_PluginFunctions * counters. The item's replication counter is decremented by one * IF it was positive before. */ - PluginRandomGet replication_get; + PluginGetRandom get_replication; /** * Function to get a random expired item or, if none are expired, one * with a low priority. */ - PluginRandomGet expiration_get; + PluginGetRandom get_expiration; /** * Delete the database. The next operation is diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h index 53d04e517..c563e5cc9 100644 --- a/src/include/gnunet_datastore_service.h +++ b/src/include/gnunet_datastore_service.h @@ -262,7 +262,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, /** - * An iterator over a set of items stored in the datastore. + * Process a datum that was stored in the datastore. * * @param cls closure * @param key key for the content @@ -275,87 +275,79 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available */ -typedef void (*GNUNET_DATASTORE_Iterator) (void *cls, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid); +typedef void (*GNUNET_DATASTORE_DatumProcessor) (void *cls, + const GNUNET_HashCode * key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, uint64_t uid); /** - * Iterate over the results for a particular key - * in the datastore. The iterator will only be called - * once initially; if the first call did contain a - * result, further results can be obtained by calling - * "GNUNET_DATASTORE_iterate_get_next" with the given argument. + * Get a result for a particular key from the datastore. The processor + * will only be called once. * * @param h handle to the datastore + * @param offset offset of the result (mod #num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param key maybe NULL (to match all entries) * @param type desired type, 0 for any * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param iter function to call on each matching value; + * @param proc function to call on each matching value; * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls); +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls); /** - * Get all zero-anonymity values from the datastore. + * Get a single zero-anonymity value from the datastore. * * @param h handle to the datastore + * @param offset offset of the result (mod #num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response * @param type allowed type for the operation (never zero) - * @param iter function to call on a random value; it + * @param proc function to call on a random value; it * will be called once with a value (if available) - * and always once with a value of NULL at the end. - * @param iter_cls closure for iter + * or with NULL if none value exists. + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_BLOCK_Type type, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls); - - -/** - * Function called to trigger obtaining the next result - * from the datastore. ONLY applies for 'GNUNET_DATASTORE_iterate_*' - * calls, not for 'get' calls. FIXME: how much mixing of iterate - * calls with other operations can we permit!? Should we pass - * the 'QueueEntry' instead of the datastore handle here instead? - * - * @param h handle to the datastore - */ -void -GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h); +GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls); /** @@ -370,21 +362,20 @@ GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h); * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param iter function to call on a random value; it + * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. - * @param iter_cls closure for iter + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls); + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls); -- 2.25.1