From f4c9fdda81d61cc8a8119faf7f624650137b43ee Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 12 May 2010 15:01:03 +0000 Subject: [PATCH] adding support for request queueing to datastore API --- src/datastore/datastore_api.c | 233 ++++++++++----------- src/datastore/plugin_datastore_sqlite.c | 21 +- src/datastore/test_datastore_api.c | 31 ++- src/datastore/test_datastore_api_data.conf | 1 + 4 files changed, 155 insertions(+), 131 deletions(-) diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 3e47f84d1..65a519e59 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -67,12 +67,12 @@ struct QueueEntry /** * Function to call after transmission of the request. */ - GNUNET_DATASTORE_ContinuationWithStatus cont; + GNUNET_DATASTORE_ContinuationWithStatus contX; /** * Closure for 'cont'. */ - void *cont_cls; + void *cont_clsX; /** * Task for timeout signalling. @@ -296,8 +296,6 @@ timeout_queue_entry (void *cls, GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); - if (qe->cont != NULL) - qe->cont (qe->cont_cls, GNUNET_NO, _("timeout")); if (qe->response_proc != NULL) qe->response_proc (qe, NULL); GNUNET_free (qe); @@ -314,8 +312,6 @@ timeout_queue_entry (void *cls, * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout timeout for the operation - * @param cont continuation to call when done with request transmission (can be NULL) - * @param cont_cls closure for cont * @param response_proc function to call with replies (can be NULL) * @param client_ctx client context (NOT a closure for response_proc) * @return NULL if the queue is full (and this entry was dropped) @@ -326,8 +322,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, GNUNET_CLIENT_MessageHandler response_proc, void *client_ctx) { @@ -368,8 +362,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, ret->h = h; ret->response_proc = response_proc; ret->client_ctx = client_ctx; - ret->cont = cont; - ret->cont_cls = cont_cls; ret->task = GNUNET_SCHEDULER_add_delayed (h->sched, timeout, &timeout_queue_entry, @@ -391,8 +383,6 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, pos); GNUNET_SCHEDULER_cancel (h->sched, pos->task); - if (pos->cont != NULL) - pos->cont (pos->cont_cls, GNUNET_NO, _("Message queue full")); if (pos->response_proc != NULL) pos->response_proc (pos, NULL); GNUNET_free (pos); @@ -494,26 +484,13 @@ transmit_request (void *cls, } memcpy (buf, &qe[1], msize); qe->was_transmitted = GNUNET_YES; - if (qe->cont != NULL) - qe->cont (qe->cont_cls, GNUNET_OK, NULL); GNUNET_SCHEDULER_cancel (h->sched, qe->task); qe->task = GNUNET_SCHEDULER_NO_TASK; - if (qe->response_proc == NULL) - { - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - GNUNET_free (qe); - process_queue (h); - } - else - { - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); - } + GNUNET_CLIENT_receive (h->client, + qe->response_proc, + qe, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); return msize; } @@ -546,75 +523,6 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) } -/** - * Store an item in the datastore. If the item is already present, - * the priorities are summed up and the higher expiration time and - * lower anonymity level is used. - * - * @param h handle to the datastore - * @param rid reservation ID to use (from "reserve"); use 0 if no - * prior reservation was made - * @param key key for the value - * @param size number of bytes in data - * @param data content stored - * @param type type of the content - * @param priority priority of the content - * @param anonymity anonymity-level for the content - * @param expiration expiration time for the content - * @param queue_priority ranking of this request in the priority queue - * @param max_queue_size at what queue size should this request be dropped - * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation - * @param cont continuation to call when done - * @param cont_cls closure for cont - */ -void -GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - int rid, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) -{ - struct QueueEntry *qe; - struct DataMessage *dm; - size_t msize; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s'\n", - size, - GNUNET_h2s (key)); -#endif - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - cont, cont_cls, NULL, NULL); - if (qe == NULL) - return; - dm = (struct DataMessage* ) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons(msize); - dm->rid = htonl(rid); - dm->size = htonl(size); - dm->type = htonl(type); - dm->priority = htonl(priority); - dm->anonymity = htonl(anonymity); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(expiration); - dm->key = *key; - memcpy (&dm[1], data, size); - process_queue (h); -} /** @@ -635,6 +543,20 @@ struct StatusContext }; +/** + * Dummy continuation used to do nothing (but be non-zero). + * + * @param cls closure + * @param result result + * @param emsg error message + */ +static void +drop_status_cont (void *cls, int result, const char *emsg) +{ + /* do nothing */ +} + + /** * Type of a function to call when we receive a message * from the service. @@ -710,6 +632,81 @@ process_status_message (void *cls, } +/** + * Store an item in the datastore. If the item is already present, + * the priorities are summed up and the higher expiration time and + * lower anonymity level is used. + * + * @param h handle to the datastore + * @param rid reservation ID to use (from "reserve"); use 0 if no + * prior reservation was made + * @param key key for the value + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout timeout for the operation + * @param cont continuation to call when done + * @param cont_cls closure for cont + */ +void +GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, + int rid, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) +{ + struct StatusContext *scont; + struct QueueEntry *qe; + struct DataMessage *dm; + size_t msize; + +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to put %u bytes of data under key `%s'\n", + size, + GNUNET_h2s (key)); +#endif + msize = sizeof(struct DataMessage) + size; + GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); + scont = GNUNET_malloc (sizeof (struct StatusContext)); + scont->cont = cont; + scont->cont_cls = cont_cls; + qe = make_queue_entry (h, msize, + queue_priority, max_queue_size, timeout, + &process_status_message, scont); + if (qe == NULL) + return; + dm = (struct DataMessage* ) &qe[1]; + dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); + dm->header.size = htons(msize); + dm->rid = htonl(rid); + dm->size = htonl(size); + dm->type = htonl(type); + dm->priority = htonl(priority); + dm->anonymity = htonl(anonymity); + dm->uid = GNUNET_htonll(0); + dm->expiration = GNUNET_TIME_absolute_hton(expiration); + dm->key = *key; + memcpy (&dm[1], data, size); + process_queue (h); +} + + /** * Reserve space in the datastore. This function should be used * to avoid "out of space" failures during a longer sequence of "put" @@ -740,6 +737,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, struct ReserveMessage *rm; struct StatusContext *scont; + if (cont == NULL) + cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to reserve %llu bytes of data and %u entries'\n", @@ -751,7 +750,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, scont->cont_cls = cont_cls; qe = make_queue_entry (h, sizeof(struct ReserveMessage), queue_priority, max_queue_size, timeout, - NULL, NULL, &process_status_message, scont); + &process_status_message, scont); if (qe == NULL) return; rm = (struct ReserveMessage*) &qe[1]; @@ -794,18 +793,19 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, struct ReleaseReserveMessage *rrm; struct StatusContext *scont; + if (cont == NULL) + cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to reserve %llu bytes of data and %u entries'\n", - (unsigned long long) amount, - (unsigned int) entries); + "Asked to release reserve %d\n", + rid); #endif scont = GNUNET_malloc (sizeof (struct StatusContext)); scont->cont = cont; scont->cont_cls = cont_cls; qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), queue_priority, max_queue_size, timeout, - NULL, NULL, &process_status_message, scont); + &process_status_message, scont); if (qe == NULL) return; rrm = (struct ReleaseReserveMessage*) &qe[1]; @@ -845,6 +845,8 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, struct UpdateMessage *um; struct StatusContext *scont; + if (cont == NULL) + cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to update entry %llu raising priority by %u and expiration to %llu\n", @@ -857,7 +859,7 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, scont->cont_cls = cont_cls; qe = make_queue_entry (h, sizeof(struct UpdateMessage), queue_priority, max_queue_size, timeout, - NULL, NULL, &process_status_message, scont); + &process_status_message, scont); if (qe == NULL) return; um = (struct UpdateMessage*) &qe[1]; @@ -904,6 +906,8 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, size_t msize; struct StatusContext *scont; + if (cont == NULL) + cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n", @@ -917,7 +921,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, - NULL, NULL, &process_status_message, scont); + &process_status_message, scont); if (qe == NULL) return; dm = (struct DataMessage*) &qe[1]; @@ -952,13 +956,6 @@ struct ResultContext */ void *iter_cls; - /** - * Automatically get the next result, or wait for a call to - * GNUNET_DATASTORE_get_next? GNUNET_YES means we automatically - * get the next one (if there are more). - */ - int get_next; - }; @@ -1048,11 +1045,6 @@ process_result_message (void *cls, ntohl(dm->anonymity), GNUNET_TIME_absolute_ntoh(dm->expiration), GNUNET_ntohll(dm->uid)); - if (rc->get_next == GNUNET_YES) - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); } @@ -1089,10 +1081,9 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, rcont = GNUNET_malloc (sizeof (struct ResultContext)); rcont->iter = iter; rcont->iter_cls = iter_cls; - rcont->get_next = GNUNET_YES; qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), queue_priority, max_queue_size, timeout, - NULL, NULL, &process_result_message, rcont); + &process_result_message, rcont); if (qe == NULL) return; m = (struct GNUNET_MessageHeader*) &qe[1]; @@ -1144,10 +1135,9 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, rcont = GNUNET_malloc (sizeof (struct ResultContext)); rcont->iter = iter; rcont->iter_cls = iter_cls; - rcont->get_next = GNUNET_NO; qe = make_queue_entry (h, sizeof(struct GetMessage), queue_priority, max_queue_size, timeout, - NULL, NULL, &process_result_message, rcont); + &process_result_message, rcont); if (qe == NULL) return; gm = (struct GetMessage*) &qe[1]; @@ -1183,7 +1173,6 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, GNUNET_assert (NULL != qe); GNUNET_assert (&process_result_message == qe->response_proc); - GNUNET_assert (rc->get_next == GNUNET_NO); if (GNUNET_YES == more) { GNUNET_CLIENT_receive (h->client, diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index 9a387bb14..1576cef00 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -258,11 +258,21 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, "datastore-sqlite"); return GNUNET_SYSERR; } - if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir)) + if (GNUNET_OK != GNUNET_DISK_file_test (afsdir)) { - GNUNET_break (0); - GNUNET_free (afsdir); - return GNUNET_SYSERR; + if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir)) + { + GNUNET_break (0); + GNUNET_free (afsdir); + return GNUNET_SYSERR; + } + /* database is new or got deleted, reset payload to zero! */ + if (plugin->stat_get != NULL) + { + GNUNET_STATISTICS_get_cancel (plugin->stat_get); + plugin->stat_get = NULL; + } + plugin->payload = 0; } plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir), #ifdef ENABLE_NLS @@ -779,6 +789,9 @@ sqlite_plugin_put (void *cls, LOG_SQLITE (plugin, msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); sqlite3_reset (stmt); + database_shutdown (plugin); + database_setup (plugin->env->cfg, + plugin); return GNUNET_SYSERR; } if (SQLITE_OK != sqlite3_reset (stmt)) diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c index 53e61d092..fc5b3f68e 100644 --- a/src/datastore/test_datastore_api.c +++ b/src/datastore/test_datastore_api.c @@ -109,7 +109,8 @@ enum RunPhase RP_GET_MULTIPLE_DONE, RP_UPDATE, RP_UPDATE_VALIDATE, - RP_UPDATE_DONE + RP_UPDATE_DONE, + RP_ERROR }; @@ -139,9 +140,13 @@ check_success (void *cls, { struct CpsRunContext *crc = cls; if (GNUNET_OK != success) - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%s\n", msg); - GNUNET_assert (GNUNET_OK == success); + { + ok = 42; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%s\n", msg); + GNUNET_SCHEDULER_shutdown (crc->sched); + return; + } GNUNET_free_non_null (crc->data); crc->data = NULL; GNUNET_SCHEDULER_add_continuation (crc->sched, @@ -221,7 +226,16 @@ delete_value (void *cls, struct CpsRunContext *crc = cls; if (key == NULL) { - crc->phase = RP_DO_DEL; + if (crc->data == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Content not found!\n"); + crc->phase = RP_ERROR; + } + else + { + crc->phase = RP_DO_DEL; + } GNUNET_SCHEDULER_add_continuation (crc->sched, &run_continuation, crc, @@ -386,6 +400,7 @@ run_continuation (void *cls, "DEL", crc->i); #endif + crc->data = NULL; GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key); GNUNET_DATASTORE_get (datastore, &crc->key, @@ -515,6 +530,12 @@ run_continuation (void *cls, GNUNET_DATASTORE_disconnect (datastore, GNUNET_YES); GNUNET_free (crc); ok = 0; + break; + case RP_ERROR: + GNUNET_DATASTORE_disconnect (datastore, GNUNET_YES); + GNUNET_free (crc); + ok = 43; + break; } } diff --git a/src/datastore/test_datastore_api_data.conf b/src/datastore/test_datastore_api_data.conf index bb72a40c4..59374dc40 100644 --- a/src/datastore/test_datastore_api_data.conf +++ b/src/datastore/test_datastore_api_data.conf @@ -30,6 +30,7 @@ DATABASE = sqlite # REJECT_FROM = # REJECT_FROM6 = # PREFIX = +# DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-datastore -- 2.25.1