X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdatastore%2Fdatastore_api.c;h=916e6acaef930121af9a4bfa04f0be4c72437d35;hb=0945dcf2c250dea65d520ef26f9917e9be3ac4ac;hp=12ec8bf0535bc2bfe7bf6c76f62a9005965f2e79;hpb=3a52873d301e7f458dd9622e3632d4056551d7b0;p=oweals%2Fgnunet.git diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 12ec8bf05..916e6acae 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,10 +1,10 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors) + Copyright (C) 2004-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -14,22 +14,87 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file datastore/datastore_api.c * @brief Management for the datastore for files stored on a GNUnet node. Implements - * a priority queue for requests (with timeouts). + * a priority queue for requests * @author Christian Grothoff */ #include "platform.h" #include "gnunet_arm_service.h" #include "gnunet_constants.h" #include "gnunet_datastore_service.h" +#include "gnunet_statistics_service.h" #include "datastore.h" +#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) + +/** + * Collect an instane number of statistics? May cause excessive IPC. + */ +#define INSANE_STATISTICS GNUNET_NO + +/** + * If a client stopped asking for more results, how many more do + * we receive from the DB before killing the connection? Trade-off + * between re-doing TCP handshakes and (needlessly) receiving + * useless results. + */ +#define MAX_EXCESS_RESULTS 8 + +/** + * Context for processing status messages. + */ +struct StatusContext +{ + /** + * Continuation to call with the status. + */ + GNUNET_DATASTORE_ContinuationWithStatus cont; + + /** + * Closure for @e cont. + */ + void *cont_cls; + +}; + + +/** + * Context for processing result messages. + */ +struct ResultContext +{ + /** + * Function to call with the result. + */ + GNUNET_DATASTORE_DatumProcessor proc; + + /** + * Closure for @e proc. + */ + void *proc_cls; + +}; + + +/** + * Context for a queue operation. + */ +union QueueContext +{ + + struct StatusContext sc; + + struct ResultContext rc; + +}; + + /** * Entry in our priority queue. */ @@ -51,38 +116,26 @@ struct GNUNET_DATASTORE_QueueEntry */ struct GNUNET_DATASTORE_Handle *h; - /** - * Response processor (NULL if we are not waiting for a response). - * This struct should be used for the closure, function-specific - * arguments can be passed via 'client_ctx'. - */ - GNUNET_CLIENT_MessageHandler response_proc; - - /** - * Specific context (variable argument that - * can be used by the response processor). - */ - void *client_ctx; - /** * Function to call after transmission of the request. */ GNUNET_DATASTORE_ContinuationWithStatus cont; - + /** - * Closure for 'cont'. + * Closure for @e cont. */ void *cont_cls; /** - * Task for timeout signalling. + * Context for the operation. */ - GNUNET_SCHEDULER_TaskIdentifier task; + union QueueContext qc; /** - * Timeout for the current operation. + * Envelope of the request to transmit, NULL after + * transmission. */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_MQ_Envelope *env; /** * Priority in the queue. @@ -96,24 +149,15 @@ struct GNUNET_DATASTORE_QueueEntry unsigned int max_queue; /** - * Number of bytes in the request message following - * this struct. 32-bit value for nicer memory - * access (and overall struct alignment). + * Expected response type. */ - uint32_t message_size; - - /** - * Has this message been transmitted to the service? - * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a - * multiple of 64 bits. - */ - int32_t was_transmitted; + uint16_t response_type; }; + /** - * Handle to the datastore service. + * Handle to the datastore service. */ struct GNUNET_DATASTORE_Handle { @@ -123,20 +167,15 @@ struct GNUNET_DATASTORE_Handle */ const struct GNUNET_CONFIGURATION_Handle *cfg; - /** - * Our scheduler. - */ - struct GNUNET_SCHEDULER_Handle *sched; - /** * Current connection to the datastore service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** - * Current transmit handle. + * Handle for statistics. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_STATISTICS_Handle *stats; /** * Current head of priority queue. @@ -151,7 +190,7 @@ struct GNUNET_DATASTORE_Handle /** * Task for trying to reconnect. */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * How quickly should we retry? Used for exponential back-off on @@ -164,69 +203,188 @@ struct GNUNET_DATASTORE_Handle */ unsigned int queue_size; + /** + * Number of results we're receiving for the current query + * after application stopped to care. Used to determine when + * to reset the connection. + */ + unsigned int result_count; + + /** + * We should ignore the next message(s) from the service. + */ + unsigned int skip_next_messages; + }; +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls); + + +/** + * Disconnect from the service and then try reconnecting to the datastore service + * after some delay. + * + * @param h handle to datastore to disconnect and reconnect + */ +static void +do_disconnect (struct GNUNET_DATASTORE_Handle *h) +{ + if (NULL == h->mq) + { + GNUNET_break (0); + return; + } + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + h->skip_next_messages = 0; + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); +} + + +/** + * Free a queue entry. Removes the given entry from the + * queue and releases associated resources. Does NOT + * call the callback. + * + * @param qe entry to free. + */ +static void +free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h = qe->h; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + h->queue_size--; + if (NULL != qe->env) + GNUNET_MQ_discard (qe->env); + GNUNET_free (qe); +} + + +/** + * Handle error in sending drop request to datastore. + * + * @param cls closure with the datastore handle + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ error, reconnecting to DATASTORE\n"); + do_disconnect (h); + qe = h->queue_head; + if ( (NULL != qe) && + (NULL == qe->env) ) + { + union QueueContext qc = qe->qc; + uint16_t rt = qe->response_type; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to receive response from database.\n"); + free_queue_entry (qe); + switch (rt) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qc.sc.cont) + qc.sc.cont (qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("DATASTORE disconnected")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qc.rc.proc) + qc.rc.proc (qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + } +} + /** * Connect to the datastore service. * * @param cfg configuration to use - * @param sched scheduler to use * @return handle to use to access the service */ struct GNUNET_DATASTORE_Handle * -GNUNET_DATASTORE_connect (const struct - GNUNET_CONFIGURATION_Handle - *cfg, - struct - GNUNET_SCHEDULER_Handle - *sched) +GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - - c = GNUNET_CLIENT_connect (sched, "datastore", cfg); - if (c == NULL) - return NULL; /* oops */ - h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE); - h->client = c; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Establishing DATASTORE connection!\n"); + h = GNUNET_new (struct GNUNET_DATASTORE_Handle); h->cfg = cfg; - h->sched = sched; + try_reconnect (h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } + h->stats = GNUNET_STATISTICS_create ("datastore-api", + cfg); return h; } /** - * Transmit DROP message to datastore service. + * Task used by to disconnect from the datastore after + * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param size number of bytes that can be copied to buf - * @param buf where to copy the drop message - * @return number of bytes written to buf + * @param cls the datastore handle */ -static size_t -transmit_drop (void *cls, - size_t size, - void *buf) +static void +disconnect_after_drop (void *cls) { struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_MessageHeader *hdr; - - if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return 0; - } - GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader)); - hdr = buf; - hdr->size = htons(sizeof(struct GNUNET_MessageHeader)); - hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return sizeof(struct GNUNET_MessageHeader); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Drop sent, disconnecting\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); +} + + +/** + * Handle error in sending drop request to datastore. + * + * @param cls closure with the datastore handle + * @param error error code + */ +static void +disconnect_on_mq_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_ERROR, + "Failed to ask datastore to drop tables\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); } @@ -235,410 +393,522 @@ transmit_drop (void *cls, * associated resources). * * @param h handle to the datastore - * @param drop set to GNUNET_YES to delete all data in datastore (!) + * @param drop set to #GNUNET_YES to delete all data in datastore (!) */ -void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, - int drop) +void +GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, + int drop) { struct GNUNET_DATASTORE_QueueEntry *qe; - if (h->client != NULL) - { - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; - } - if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, - h->reconnect_task); - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Datastore disconnect\n"); + if (NULL != h->mq) + { + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + } + if (NULL != h->reconnect_task) + { + GNUNET_SCHEDULER_cancel (h->reconnect_task); + h->reconnect_task = NULL; + } while (NULL != (qe = h->queue_head)) + { + switch (qe->response_type) { - GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (qe, NULL); + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qe->qc.sc.cont) + qe->qc.sc.cont (qe->qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("Disconnected from DATASTORE")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qe->qc.rc.proc) + qe->qc.rc.proc (qe->qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); } - if (GNUNET_YES == drop) + free_queue_entry (qe); + } + if (GNUNET_YES == drop) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Re-connecting to issue DROP!\n"); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, + "datastore", + NULL, + &disconnect_on_mq_error, + h); + if (NULL != h->mq) { - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - if (h->client != NULL) - { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof(struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_MINUTES, - GNUNET_YES, - &transmit_drop, - h)) - return; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - } - GNUNET_break (0); + struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (hdr, + GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + GNUNET_MQ_notify_sent (env, + &disconnect_after_drop, + h); + GNUNET_MQ_send (h->mq, + env); + return; } + GNUNET_break (0); + } + GNUNET_STATISTICS_destroy (h->stats, + GNUNET_NO); + h->stats = NULL; GNUNET_free (h); } -/** - * A request has timed out (before being transmitted to the service). - * - * @param cls the 'struct GNUNET_DATASTORE_QueueEntry' - * @param tc scheduler context - */ -static void -timeout_queue_entry (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - - qe->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (qe->was_transmitted == GNUNET_NO); - qe->response_proc (qe, NULL); -} - - /** * Create a new entry for our priority queue (and possibly discard other entires if * the queue is getting too long). * * @param h handle to the datastore - * @param msize size of the message to queue + * @param env envelope with the message to queue * @param queue_priority priority of the entry * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation - * @param response_proc function to call with replies (can be NULL) - * @param client_ctx client context (NOT a closure for response_proc) - * @return NULL if the queue is full (and this entry was dropped) + * @param expected_type which type of response do we expect, + * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA + * @param qc client context (NOT a closure for @a response_proc) + * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry (struct GNUNET_DATASTORE_Handle *h, - size_t msize, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_CLIENT_MessageHandler response_proc, - void *client_ctx) + struct GNUNET_MQ_Envelope *env, + unsigned int queue_priority, + unsigned int max_queue_size, + uint16_t expected_type, + const union QueueContext *qc) { - struct GNUNET_DATASTORE_QueueEntry *ret; + struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_DATASTORE_QueueEntry *pos; unsigned int c; - c = 0; - pos = h->queue_head; - while ( (pos != NULL) && - (c < max_queue_size) && - (pos->priority >= queue_priority) ) - { - c++; - pos = pos->next; - } - ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); - ret->h = h; - ret->response_proc = response_proc; - ret->client_ctx = client_ctx; - ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); - ret->priority = queue_priority; - ret->max_queue = max_queue_size; - ret->message_size = msize; - ret->was_transmitted = GNUNET_NO; - if (pos == NULL) - { - /* append at the tail */ - pos = h->queue_tail; - } + if ( (NULL != h->queue_tail) && + (h->queue_tail->priority >= queue_priority) ) + { + c = h->queue_size; + pos = NULL; + } else - { - pos = pos->prev; - /* do not insert at HEAD if HEAD query was already - transmitted and we are still receiving replies! */ - if ( (pos == NULL) && - (h->queue_head->was_transmitted) ) - pos = h->queue_head; - } + { + c = 0; + pos = h->queue_head; + } + while ( (NULL != pos) && + (c < max_queue_size) && + (pos->priority >= queue_priority) ) + { + c++; + pos = pos->next; + } + if (c >= max_queue_size) + { + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue overflows"), + 1, + GNUNET_NO); + GNUNET_MQ_discard (env); + return NULL; + } + qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); + qe->h = h; + qe->env = env; + qe->response_type = expected_type; + qe->qc = *qc; + qe->priority = queue_priority; + qe->max_queue = max_queue_size; + if (NULL == pos) + { + /* append at the tail */ + pos = h->queue_tail; + } + else + { + pos = pos->prev; + /* do not insert at HEAD if HEAD query was already + * transmitted and we are still receiving replies! */ + if ( (NULL == pos) && + (NULL == h->queue_head->env) ) + pos = h->queue_head; + } c++; +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue entries created"), + 1, + GNUNET_NO); +#endif GNUNET_CONTAINER_DLL_insert_after (h->queue_head, - h->queue_tail, - pos, - ret); + h->queue_tail, + pos, + qe); h->queue_size++; - if (c > max_queue_size) - { - response_proc (ret, NULL); - GNUNET_free (ret); - return NULL; - } - ret->task = GNUNET_SCHEDULER_add_delayed (h->sched, - timeout, - &timeout_queue_entry, - ret); - pos = ret->next; - while (pos != NULL) - { - if (pos->max_queue < h->queue_size) - { - GNUNET_assert (pos->response_proc != NULL); - pos->response_proc (pos, NULL); - break; - } - pos = pos->next; - } - return ret; + return qe; } /** * Process entries in the queue (or do nothing if we are already * doing so). - * + * * @param h handle to the datastore */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h); +process_queue (struct GNUNET_DATASTORE_Handle *h) +{ + struct GNUNET_DATASTORE_QueueEntry *qe; + + if (NULL == (qe = h->queue_head)) + { + /* no entry in queue */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty\n"); + return; + } + if (NULL == qe->env) + { + /* waiting for replies */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Head request already transmitted\n"); + return; + } + if (NULL == h->mq) + { + /* waiting for reconnect */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Not connected\n"); + return; + } + GNUNET_MQ_send (h->mq, + qe->env); + qe->env = NULL; +} + + /** - * Try reconnecting to the datastore service. + * Function called to check status message from the service. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param tc scheduler context + * @param cls closure + * @param sm status message received + * @return #GNUNET_OK if the message is well-formed */ -static void -try_reconnect (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +static int +check_status (void *cls, + const struct StatusMessage *sm) { - struct GNUNET_DATASTORE_Handle *h = cls; + uint16_t msize = ntohs (sm->header.size) - sizeof (*sm); + int32_t status = ntohl (sm->status); - if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value) - h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY; - else - h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2); - if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value) - h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT; - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - if (h->client == NULL) - return; - process_queue (h); + if (msize > 0) + { + const char *emsg = (const char *) &sm[1]; + + if ('\0' != emsg[msize - 1]) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + } + else if (GNUNET_SYSERR == status) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Disconnect from the service and then try reconnecting to the datastore service - * after some delay. + * Function called to handle status message from the service. * - * @param h handle to datastore to disconnect and reconnect + * @param cls closure + * @param sm status message received */ static void -do_disconnect (struct GNUNET_DATASTORE_Handle *h) +handle_status (void *cls, + const struct StatusMessage *sm) { - if (h->client == NULL) + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; + const char *emsg; + int32_t status = ntohl (sm->status); + + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + do_disconnect (h); return; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; - h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched, - h->retry_time, - &try_reconnect, - h); + } + if (NULL != qe->env) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + rc = qe->qc.sc; + free_queue_entry (qe); + if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) + emsg = (const char *) &sm[1]; + else + emsg = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received status %d/%s\n", + (int) status, + emsg); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), + 1, + GNUNET_NO); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.cont) + rc.cont (rc.cont_cls, + status, + GNUNET_TIME_absolute_ntoh (sm->min_expiration), + emsg); } /** - * Transmit request from queue to datastore service. + * Check data message we received from the service. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param size number of bytes that can be copied to buf - * @param buf where to copy the drop message - * @return number of bytes written to buf + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ -static size_t -transmit_request (void *cls, - size_t size, - void *buf) +static int +check_data (void *cls, + const struct DataMessage *dm) { - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - size_t msize; - - h->th = NULL; - if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ - if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to database.\n")); - do_disconnect (h); - return 0; - } - if (size < (msize = qe->message_size)) - { - process_queue (h); - return 0; - } - memcpy (buf, &qe[1], msize); - qe->was_transmitted = GNUNET_YES; - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); - return msize; + uint16_t msize = ntohs (dm->header.size) - sizeof (*dm); + + if (msize != ntohl (dm->size)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Process entries in the queue (or do nothing if we are already - * doing so). - * - * @param h handle to the datastore + * Handle data message we got from the service. + * + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h) +handle_data (void *cls, + const struct DataMessage *dm) { + struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; - if (NULL == (qe = h->queue_head)) - return; /* no entry in queue */ - if (qe->was_transmitted == GNUNET_YES) - return; /* waiting for replies */ - if (h->th != NULL) - return; /* request pending */ - if (h->client == NULL) - return; /* waiting for reconnect */ - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - qe->message_size, - GNUNET_TIME_absolute_get_remaining (qe->timeout), - GNUNET_YES, - &transmit_request, - h); + if (h->skip_next_messages > 0) + { + process_queue (h); + return; + } + qe = h->queue_head; + if (NULL == qe) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (NULL != qe->env) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) + { + GNUNET_break (0); + do_disconnect (h); + return; + } +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Results received"), + 1, + GNUNET_NO); +#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), + ntohl (dm->type), + ntohl (dm->size), + GNUNET_h2s (&dm->key)); + rc = qe->qc.rc; + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + &dm->key, + ntohl (dm->size), + &dm[1], + ntohl (dm->type), + ntohl (dm->priority), + ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); } - - /** - * Context for processing status messages. + * Type of a function to call when we receive a + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service. + * + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param msg message received */ -struct StatusContext +static void +handle_data_end (void *cls, + const struct GNUNET_MessageHeader *msg) { - /** - * Continuation to call with the status. - */ - GNUNET_DATASTORE_ContinuationWithStatus cont; - - /** - * Closure for cont. - */ - void *cont_cls; + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; -}; + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } + qe = h->queue_head; + if (NULL == qe) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (NULL != qe->env) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + rc = qe->qc.rc; + free_queue_entry (qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", + h->queue_size); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + h->result_count = 0; + process_queue (h); + /* signal end of iteration */ + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + NULL, + 0, + NULL, + 0, + 0, + 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); +} /** - * Dummy continuation used to do nothing (but be non-zero). + * Try reconnecting to the datastore service. * - * @param cls closure - * @param result result - * @param emsg error message + * @param cls the `struct GNUNET_DATASTORE_Handle` */ static void -drop_status_cont (void *cls, int result, const char *emsg) +try_reconnect (void *cls) { - /* do nothing */ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (status, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + struct StatusMessage, + h), + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + struct DataMessage, + h), + GNUNET_MQ_hd_fixed_size (data_end, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, + struct GNUNET_MessageHeader, + h), + GNUNET_MQ_handler_end () + }; + + h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); + h->reconnect_task = NULL; + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, + "datastore", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# datastore connections (re)created"), + 1, + GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Reconnected to DATASTORE\n"); + process_queue (h); } -static void -free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) -{ - struct GNUNET_DATASTORE_Handle *h = qe->h; - - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); - if (qe->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - } - h->queue_size--; - GNUNET_free (qe); -} - /** - * Type of a function to call when we receive a message - * from the service. + * Dummy continuation used to do nothing (but be non-zero). * * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param result result + * @param min_expiration expiration time + * @param emsg error message */ -static void -process_status_message (void *cls, - const struct - GNUNET_MessageHeader * msg) +static void +drop_status_cont (void *cls, + int32_t result, + struct GNUNET_TIME_Absolute min_expiration, + const char *emsg) { - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - struct StatusContext *rc = qe->client_ctx; - const struct StatusMessage *sm; - const char *emsg; - int32_t status; - - free_queue_entry (qe); - if (msg == NULL) - { - if (NULL == h->client) - return; /* forced disconnect */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from database.\n")); - do_disconnect (h); - return; - } - - if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) - { - GNUNET_break (0); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - rc->cont (rc->cont_cls, - GNUNET_SYSERR, - _("Error reading response from datastore service")); - GNUNET_free (rc); - return; - } - sm = (const struct StatusMessage*) msg; - status = ntohl(sm->status); - emsg = NULL; - if (ntohs(msg->size) > sizeof(struct StatusMessage)) - { - emsg = (const char*) &sm[1]; - if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } - } - if ( (status == GNUNET_SYSERR) && - (emsg == NULL) ) - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received status %d/%s\n", - (int) status, - emsg); -#endif - rc->cont (rc->cont_cls, - status, - emsg); - GNUNET_free (rc); - process_queue (h); + /* do nothing */ } @@ -656,66 +926,84 @@ process_status_message (void *cls, * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content + * @param replication how often should the content be replicated to other peers? * @param expiration expiration time for the content * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - int rid, - const GNUNET_HashCode * key, - uint32_t size, + uint32_t rid, + const struct GNUNET_HashCode *key, + size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, + uint32_t replication, struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + unsigned int queue_priority, + unsigned int max_queue_size, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { - struct StatusContext *scont; struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct DataMessage *dm; - size_t msize; + union QueueContext qc; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s'\n", - size, - GNUNET_h2s (key)); -#endif - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - &process_status_message, scont); - if (qe == NULL) + if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); return NULL; - dm = (struct DataMessage* ) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons(msize); - dm->rid = htonl(rid); - dm->size = htonl(size); - dm->type = htonl(type); - dm->priority = htonl(priority); - dm->anonymity = htonl(anonymity); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(expiration); + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to put %u bytes of data under key `%s' for %s\n", + size, + GNUNET_h2s (key), + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), + GNUNET_YES)); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_PUT); + dm->rid = htonl (rid); + dm->size = htonl ((uint32_t) size); + dm->type = htonl (type); + dm->priority = htonl (priority); + dm->anonymity = htonl (anonymity); + dm->replication = htonl (replication); + dm->reserved = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->key = *key; - memcpy (&dm[1], data, size); + GNUNET_memcpy (&dm[1], + data, + size); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for PUT\n"); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# PUT requests executed"), + 1, + GNUNET_NO); process_queue (h); return qe; } @@ -729,52 +1017,54 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore * @param amount how much space (in bytes) should be reserved (for content only) * @param entries how many entries will be created (to calculate per-entry overhead) - * @param queue_priority ranking of this request in the priority queue - * @param max_queue_size at what queue size should this request be dropped - * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response (or before dying in queue) * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, - uint64_t amount, - uint32_t entries, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + uint64_t amount, + uint32_t entries, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReserveMessage *rm; - struct StatusContext *scont; + union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to reserve %llu bytes of data and %u entries'\n", - (unsigned long long) amount, - (unsigned int) entries); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct ReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, scont); - if (qe == NULL) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to reserve %llu bytes of data and %u entries\n", + (unsigned long long) amount, + (unsigned int) entries); + env = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); + + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, + env, + UINT_MAX, + UINT_MAX, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to reserve\n"); return NULL; - rm = (struct ReserveMessage*) &qe[1]; - rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons(sizeof (struct ReserveMessage)); - rm->entries = htonl(entries); - rm->amount = GNUNET_htonll(amount); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# RESERVE requests executed"), + 1, + GNUNET_NO); process_queue (h); return qe; } @@ -794,454 +1084,372 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - int rid, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + uint32_t rid, + unsigned int queue_priority, + unsigned int max_queue_size, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReleaseReserveMessage *rrm; - struct StatusContext *scont; + union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to release reserve %d\n", - rid); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, scont); - if (qe == NULL) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to release reserve %d\n", + rid); + env = GNUNET_MQ_msg (rrm, + GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->rid = htonl (rid); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to release reserve\n"); return NULL; - rrm = (struct ReleaseReserveMessage*) &qe[1]; - rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl(rid); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# RELEASE RESERVE requests executed"), 1, + GNUNET_NO); process_queue (h); return qe; } /** - * Update a value in the datastore. + * Explicitly remove some content from the database. + * The @a cont continuation will be called with `status` + * #GNUNET_OK" if content was removed, #GNUNET_NO + * if no matching entry was found and #GNUNET_SYSERR + * on all other types of errors. * * @param h handle to the datastore - * @param uid identifier for the value - * @param priority how much to increase the priority of the value - * @param expiration new expiration value should be MAX of existing and this argument + * @param key key for the value + * @param size number of bytes in data + * @param data content stored * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, - unsigned long long uid, - uint32_t priority, - struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) +GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + unsigned int queue_priority, + unsigned int max_queue_size, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; - struct UpdateMessage *um; - struct StatusContext *scont; + struct DataMessage *dm; + struct GNUNET_MQ_Envelope *env; + union QueueContext qc; - if (cont == NULL) + if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return NULL; + } + if (NULL == cont) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to update entry %llu raising priority by %u and expiration to %llu\n", - uid, - (unsigned int) priority, - (unsigned long long) expiration.value); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct UpdateMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, scont); - if (qe == NULL) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to remove %u bytes under key `%s'\n", + size, + GNUNET_h2s (key)); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); + dm->rid = htonl (0); + dm->size = htonl (size); + dm->type = htonl (0); + dm->priority = htonl (0); + dm->anonymity = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); + dm->key = *key; + GNUNET_memcpy (&dm[1], + data, + size); + + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for REMOVE\n"); return NULL; - um = (struct UpdateMessage*) &qe[1]; - um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons(sizeof (struct UpdateMessage)); - um->priority = htonl(priority); - um->expiration = GNUNET_TIME_absolute_hton(expiration); - um->uid = GNUNET_htonll(uid); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# REMOVE requests executed"), + 1, + GNUNET_NO); process_queue (h); return qe; } + /** - * Explicitly remove some content from the database. - * The "cont"inuation will be called with status - * "GNUNET_OK" if content was removed, "GNUNET_NO" - * if no matching entry was found and "GNUNET_SYSERR" - * on all other types of errors. + * Get a random value from the datastore for content replication. + * Returns a single, random value among those with the highest + * replication score, lowering positive replication scores by one for + * the chosen value (if only content with a replication score exists, + * a random value is returned and replication scores are not changed). * * @param h handle to the datastore - * @param key key for the value - * @param size number of bytes in data - * @param data content stored * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response - * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param proc function to call on a random value; it + * will be called once with a value (if available) + * and always once with a value of NULL. + * @param proc_cls closure for @a proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode *key, - uint32_t size, - const void *data, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) +GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, + unsigned int queue_priority, + unsigned int max_queue_size, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; - struct DataMessage *dm; - size_t msize; - struct StatusContext *scont; - - if (cont == NULL) - cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to remove %u bytes under key `%s'\n", - size, - GNUNET_h2s (key)); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - &process_status_message, scont); - if (qe == NULL) + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m; + union QueueContext qc; + + GNUNET_assert (NULL != proc); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to get replication entry\n"); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for GET REPLICATION\n"); return NULL; - dm = (struct DataMessage*) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons(msize); - dm->rid = htonl(0); - dm->size = htonl(size); - dm->type = htonl(0); - dm->priority = htonl(0); - dm->anonymity = htonl(0); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS); - dm->key = *key; - memcpy (&dm[1], data, size); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# GET REPLICATION requests executed"), 1, + GNUNET_NO); process_queue (h); return qe; } - -/** - * Context for processing result messages. - */ -struct ResultContext -{ - /** - * Iterator to call with the result. - */ - GNUNET_DATASTORE_Iterator iter; - - /** - * Closure for iter. - */ - void *iter_cls; - -}; - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -process_result_message (void *cls, - const struct GNUNET_MessageHeader * msg) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - struct ResultContext *rc = qe->client_ctx; - const struct DataMessage *dm; - - GNUNET_assert (h->queue_head == qe); - if (msg == NULL) - { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from datastore\n")); -#endif - free_queue_entry (qe); - do_disconnect (h); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); - return; - } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) - { - GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set\n"); -#endif - free_queue_entry (qe); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); - process_queue (h); - return; - } - if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || - (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); - return; - } - dm = (const struct DataMessage*) msg; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll(dm->uid), - ntohl(dm->type), - ntohl(dm->size), - GNUNET_h2s(&dm->key)); -#endif - rc->iter (rc->iter_cls, - &dm->key, - ntohl(dm->size), - &dm[1], - ntohl(dm->type), - ntohl(dm->priority), - ntohl(dm->anonymity), - GNUNET_TIME_absolute_ntoh(dm->expiration), - GNUNET_ntohll(dm->uid)); -} - - /** - * Get a random value from the datastore. + * Get a single zero-anonymity value from the datastore. * * @param h handle to the datastore + * @param offset offset of the result (modulo num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response - * @param iter function to call on a random value; it + * @param type allowed type for the operation (never zero) + * @param proc function to call on a random value; it * will be called once with a value (if available) - * and always once with a value of NULL. - * @param iter_cls closure for iter + * or with NULL if none value exists. + * @param proc_cls closure for @a proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + unsigned int queue_priority, + unsigned int max_queue_size, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; - struct GNUNET_MessageHeader *m; - struct ResultContext *rcont; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get random entry in %llu ms\n", - (unsigned long long) timeout.value); -#endif - rcont = GNUNET_malloc (sizeof (struct ResultContext)); - rcont->iter = iter; - rcont->iter_cls = iter_cls; - qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), - queue_priority, max_queue_size, timeout, - &process_result_message, rcont); - if (qe == NULL) - return NULL; - m = (struct GNUNET_MessageHeader*) &qe[1]; - m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); - m->size = htons(sizeof (struct GNUNET_MessageHeader)); + struct GNUNET_MQ_Envelope *env; + struct GetZeroAnonymityMessage *m; + union QueueContext qc; + + GNUNET_assert (NULL != proc); + GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to get %llu-th zero-anonymity entry of type %d\n", + (unsigned long long) offset, + type); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for zero-anonymity procation\n"); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# GET ZERO ANONYMITY requests executed"), 1, + GNUNET_NO); process_queue (h); return qe; } - /** - * Iterate over the results for a particular key - * in the datastore. The iterator will only be called - * once initially; if the first call did contain a - * result, further results can be obtained by calling - * "GNUNET_DATASTORE_get_next" with the given argument. + * Get a result for a particular key from the datastore. The processor + * will only be called once. * * @param h handle to the datastore + * @param offset offset of the result (modulo num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param key maybe NULL (to match all entries) * @param type desired type, 0 for any * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response - * @param iter function to call on each matching value; + * @param proc function to call on each matching value; * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param proc_cls closure for @a proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + const struct GNUNET_HashCode *key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; + struct GetKeyMessage *gkm; struct GetMessage *gm; - struct ResultContext *rcont; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to look for data of type %u under key `%s'\n", - (unsigned int) type, - GNUNET_h2s (key)); -#endif - rcont = GNUNET_malloc (sizeof (struct ResultContext)); - rcont->iter = iter; - rcont->iter_cls = iter_cls; - qe = make_queue_entry (h, sizeof(struct GetMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, rcont); - if (qe == NULL) - return NULL; - gm = (struct GetMessage*) &qe[1]; - gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl(type); - if (key != NULL) - { - gm->header.size = htons(sizeof (struct GetMessage)); - gm->key = *key; - } + union QueueContext qc; + + GNUNET_assert (NULL != proc); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to look for data of type %u under key `%s'\n", + (unsigned int) type, + GNUNET_h2s (key)); + if (NULL == key) + { + env = GNUNET_MQ_msg (gm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); + gm->offset = GNUNET_htonll (offset); + } else - { - gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); - } + { + env = GNUNET_MQ_msg (gkm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); + gkm->type = htonl (type); + gkm->offset = GNUNET_htonll (offset); + gkm->key = *key; + } + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not queue request for `%s'\n", + GNUNET_h2s (key)); + return NULL; + } +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# GET requests executed"), + 1, + GNUNET_NO); +#endif process_queue (h); return qe; } -/** - * Function called to trigger obtaining the next result - * from the datastore. - * - * @param h handle to the datastore - * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort - * iteration (with a final call to "iter" with key/data == NULL). - */ -void -GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, - int more) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; - struct ResultContext *rc = qe->client_ctx; - - GNUNET_assert (NULL != qe); - GNUNET_assert (&process_result_message == qe->response_proc); - if (GNUNET_YES == more) - { - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); - return; - } - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); -} - - /** * Cancel a datastore operation. The final callback from the * operation must not have been done yet. - * + * * @param qe operation to cancel */ void GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { - struct GNUNET_DATASTORE_Handle *h; - int reconnect; + struct GNUNET_DATASTORE_Handle *h = qe->h; - h = qe->h; - reconnect = qe->was_transmitted; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Pending DATASTORE request %p cancelled (%d, %d)\n", + qe, + NULL == qe->env, + h->queue_head == qe); + if (NULL == qe->env) + { + free_queue_entry (qe); + h->skip_next_messages++; + return; + } free_queue_entry (qe); - h->queue_size--; - if (reconnect) - { - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - } + process_queue (h); }