X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdatastore%2Fdatastore_api.c;h=916e6acaef930121af9a4bfa04f0be4c72437d35;hb=0945dcf2c250dea65d520ef26f9917e9be3ac4ac;hp=8a8f64eb392548210d7c37d7ac8846f063cfaada;hpb=16a6919a9f98ee9fa1fee9dd262906c321004a19;p=oweals%2Fgnunet.git diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 8a8f64eb3..916e6acae 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2004-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,14 +14,14 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file datastore/datastore_api.c * @brief Management for the datastore for files stored on a GNUnet node. Implements - * a priority queue for requests (with timeouts). + * a priority queue for requests * @author Christian Grothoff */ #include "platform.h" @@ -31,10 +31,17 @@ #include "gnunet_statistics_service.h" #include "datastore.h" +#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) + +/** + * Collect an instane number of statistics? May cause excessive IPC. + */ +#define INSANE_STATISTICS GNUNET_NO + /** * If a client stopped asking for more results, how many more do * we receive from the DB before killing the connection? Trade-off - * between re-doing TCP handshakes and (needlessly) receiving + * between re-doing TCP handshakes and (needlessly) receiving * useless results. */ #define MAX_EXCESS_RESULTS 8 @@ -50,7 +57,7 @@ struct StatusContext GNUNET_DATASTORE_ContinuationWithStatus cont; /** - * Closure for cont. + * Closure for @e cont. */ void *cont_cls; @@ -68,7 +75,7 @@ struct ResultContext GNUNET_DATASTORE_DatumProcessor proc; /** - * Closure for proc. + * Closure for @e proc. */ void *proc_cls; @@ -88,7 +95,6 @@ union QueueContext }; - /** * Entry in our priority queue. */ @@ -110,20 +116,13 @@ struct GNUNET_DATASTORE_QueueEntry */ struct GNUNET_DATASTORE_Handle *h; - /** - * Response processor (NULL if we are not waiting for a response). - * This struct should be used for the closure, function-specific - * arguments can be passed via 'qc'. - */ - GNUNET_CLIENT_MessageHandler response_proc; - /** * Function to call after transmission of the request. */ GNUNET_DATASTORE_ContinuationWithStatus cont; /** - * Closure for 'cont'. + * Closure for @e cont. */ void *cont_cls; @@ -133,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry union QueueContext qc; /** - * Task for timeout signalling. + * Envelope of the request to transmit, NULL after + * transmission. */ - GNUNET_SCHEDULER_TaskIdentifier task; - - /** - * Timeout for the current operation. - */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_MQ_Envelope *env; /** * Priority in the queue. @@ -154,24 +149,15 @@ struct GNUNET_DATASTORE_QueueEntry unsigned int max_queue; /** - * Number of bytes in the request message following - * this struct. 32-bit value for nicer memory - * access (and overall struct alignment). + * Expected response type. */ - uint32_t message_size; - - /** - * Has this message been transmitted to the service? - * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a - * multiple of 64 bits. - */ - int was_transmitted; + uint16_t response_type; }; + /** - * Handle to the datastore service. + * Handle to the datastore service. */ struct GNUNET_DATASTORE_Handle { @@ -184,18 +170,13 @@ struct GNUNET_DATASTORE_Handle /** * Current connection to the datastore service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Handle for statistics. */ struct GNUNET_STATISTICS_Handle *stats; - /** - * Current transmit handle. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * Current head of priority queue. */ @@ -209,7 +190,7 @@ struct GNUNET_DATASTORE_Handle /** * Task for trying to reconnect. */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * How quickly should we retry? Used for exponential back-off on @@ -229,11 +210,6 @@ struct GNUNET_DATASTORE_Handle */ unsigned int result_count; - /** - * Are we currently trying to receive from the service? - */ - int in_receive; - /** * We should ignore the next message(s) from the service. */ @@ -242,6 +218,111 @@ struct GNUNET_DATASTORE_Handle }; +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls); + + +/** + * Disconnect from the service and then try reconnecting to the datastore service + * after some delay. + * + * @param h handle to datastore to disconnect and reconnect + */ +static void +do_disconnect (struct GNUNET_DATASTORE_Handle *h) +{ + if (NULL == h->mq) + { + GNUNET_break (0); + return; + } + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + h->skip_next_messages = 0; + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); +} + + +/** + * Free a queue entry. Removes the given entry from the + * queue and releases associated resources. Does NOT + * call the callback. + * + * @param qe entry to free. + */ +static void +free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h = qe->h; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + h->queue_size--; + if (NULL != qe->env) + GNUNET_MQ_discard (qe->env); + GNUNET_free (qe); +} + + +/** + * Handle error in sending drop request to datastore. + * + * @param cls closure with the datastore handle + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "MQ error, reconnecting to DATASTORE\n"); + do_disconnect (h); + qe = h->queue_head; + if ( (NULL != qe) && + (NULL == qe->env) ) + { + union QueueContext qc = qe->qc; + uint16_t rt = qe->response_type; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to receive response from database.\n"); + free_queue_entry (qe); + switch (rt) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qc.sc.cont) + qc.sc.cont (qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("DATASTORE disconnected")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qc.rc.proc) + qc.rc.proc (qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + } +} + /** * Connect to the datastore service. @@ -252,48 +333,58 @@ struct GNUNET_DATASTORE_Handle struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - c = GNUNET_CLIENT_connect ("datastore", cfg); - if (c == NULL) - return NULL; /* oops */ - h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); - h->client = c; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Establishing DATASTORE connection!\n"); + h = GNUNET_new (struct GNUNET_DATASTORE_Handle); h->cfg = cfg; - h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); + try_reconnect (h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } + h->stats = GNUNET_STATISTICS_create ("datastore-api", + cfg); return h; } /** - * Transmit DROP message to datastore service. + * Task used by to disconnect from the datastore after + * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param size number of bytes that can be copied to buf - * @param buf where to copy the drop message - * @return number of bytes written to buf + * @param cls the datastore handle */ -static size_t -transmit_drop (void *cls, size_t size, void *buf) +static void +disconnect_after_drop (void *cls) { struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_MessageHeader *hdr; - if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return 0; - } - GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); - hdr = buf; - hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); - hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return sizeof (struct GNUNET_MessageHeader); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Drop sent, disconnecting\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); +} + + +/** + * Handle error in sending drop request to datastore. + * + * @param cls closure with the datastore handle + * @param error error code + */ +static void +disconnect_on_mq_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_ERROR, + "Failed to ask datastore to drop tables\n"); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); } @@ -302,132 +393,146 @@ transmit_drop (void *cls, size_t size, void *buf) * associated resources). * * @param h handle to the datastore - * @param drop set to GNUNET_YES to delete all data in datastore (!) + * @param drop set to #GNUNET_YES to delete all data in datastore (!) */ void -GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) +GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, + int drop) { struct GNUNET_DATASTORE_QueueEntry *qe; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); -#endif - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (h->client != NULL) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Datastore disconnect\n"); + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } - if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != h->reconnect_task) { GNUNET_SCHEDULER_cancel (h->reconnect_task); - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + h->reconnect_task = NULL; } while (NULL != (qe = h->queue_head)) { - GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (h, NULL); + switch (qe->response_type) + { + case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: + if (NULL != qe->qc.sc.cont) + qe->qc.sc.cont (qe->qc.sc.cont_cls, + GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("Disconnected from DATASTORE")); + break; + case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: + if (NULL != qe->qc.rc.proc) + qe->qc.rc.proc (qe->qc.rc.proc_cls, + NULL, + 0, + NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); + break; + default: + GNUNET_break (0); + } + free_queue_entry (qe); } if (GNUNET_YES == drop) { - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (h->client != NULL) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Re-connecting to issue DROP!\n"); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, + "datastore", + NULL, + &disconnect_on_mq_error, + h); + if (NULL != h->mq) { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof (struct - GNUNET_MessageHeader), - GNUNET_TIME_UNIT_MINUTES, - GNUNET_YES, &transmit_drop, h)) - return; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; + struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (hdr, + GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + GNUNET_MQ_notify_sent (env, + &disconnect_after_drop, + h); + GNUNET_MQ_send (h->mq, + env); + return; } GNUNET_break (0); } - GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); + GNUNET_STATISTICS_destroy (h->stats, + GNUNET_NO); h->stats = NULL; GNUNET_free (h); } -/** - * A request has timed out (before being transmitted to the service). - * - * @param cls the 'struct GNUNET_DATASTORE_QueueEntry' - * @param tc scheduler context - */ -static void -timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - - GNUNET_STATISTICS_update (qe->h->stats, - gettext_noop ("# queue entry timeouts"), 1, - GNUNET_NO); - qe->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (qe->was_transmitted == GNUNET_NO); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout of request in datastore queue\n"); -#endif - qe->response_proc (qe->h, NULL); -} - - /** * Create a new entry for our priority queue (and possibly discard other entires if * the queue is getting too long). * * @param h handle to the datastore - * @param msize size of the message to queue + * @param env envelope with the message to queue * @param queue_priority priority of the entry * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation - * @param response_proc function to call with replies (can be NULL) - * @param qc client context (NOT a closure for response_proc) - * @return NULL if the queue is full + * @param expected_type which type of response do we expect, + * #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA + * @param qc client context (NOT a closure for @a response_proc) + * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * -make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize, - unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_CLIENT_MessageHandler response_proc, +make_queue_entry (struct GNUNET_DATASTORE_Handle *h, + struct GNUNET_MQ_Envelope *env, + unsigned int queue_priority, + unsigned int max_queue_size, + uint16_t expected_type, const union QueueContext *qc) { - struct GNUNET_DATASTORE_QueueEntry *ret; + struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_DATASTORE_QueueEntry *pos; unsigned int c; - c = 0; - pos = h->queue_head; - while ((pos != NULL) && (c < max_queue_size) && - (pos->priority >= queue_priority)) + if ( (NULL != h->queue_tail) && + (h->queue_tail->priority >= queue_priority) ) + { + c = h->queue_size; + pos = NULL; + } + else + { + c = 0; + pos = h->queue_head; + } + while ( (NULL != pos) && + (c < max_queue_size) && + (pos->priority >= queue_priority) ) { c++; pos = pos->next; } if (c >= max_queue_size) { - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1, + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue overflows"), + 1, GNUNET_NO); + GNUNET_MQ_discard (env); return NULL; } - ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); - ret->h = h; - ret->response_proc = response_proc; - ret->qc = *qc; - ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); - ret->priority = queue_priority; - ret->max_queue = max_queue_size; - ret->message_size = msize; - ret->was_transmitted = GNUNET_NO; - if (pos == NULL) + qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); + qe->h = h; + qe->env = env; + qe->response_type = expected_type; + qe->qc = *qc; + qe->priority = queue_priority; + qe->max_queue = max_queue_size; + if (NULL == pos) { /* append at the tail */ pos = h->queue_tail; @@ -437,135 +542,115 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize, pos = pos->prev; /* do not insert at HEAD if HEAD query was already * transmitted and we are still receiving replies! */ - if ((pos == NULL) && (h->queue_head->was_transmitted)) + if ( (NULL == pos) && + (NULL == h->queue_head->env) ) pos = h->queue_head; } c++; - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), - 1, GNUNET_NO); - GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); - h->queue_size++; - ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); - pos = ret->next; - while (pos != NULL) - { - if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) - { - GNUNET_assert (pos->response_proc != NULL); - /* move 'pos' element to head so that it will be - * killed on 'NULL' call below */ -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping request from datastore queue\n"); +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue entries created"), + 1, + GNUNET_NO); #endif - GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); - GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); - GNUNET_STATISTICS_update (h->stats, - gettext_noop - ("# Requests dropped from datastore queue"), 1, - GNUNET_NO); - GNUNET_assert (h->queue_head == pos); - pos->response_proc (h, NULL); - break; - } - pos = pos->next; - } - return ret; + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, + h->queue_tail, + pos, + qe); + h->queue_size++; + return qe; } /** * Process entries in the queue (or do nothing if we are already * doing so). - * - * @param h handle to the datastore - */ -static void -process_queue (struct GNUNET_DATASTORE_Handle *h); - - -/** - * Try reconnecting to the datastore service. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param tc scheduler context + * @param h handle to the datastore */ static void -try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +process_queue (struct GNUNET_DATASTORE_Handle *h) { - struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; - if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value) - h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY; - else - h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2); - if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value) - h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT; - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (h->client == NULL) + if (NULL == (qe = h->queue_head)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "DATASTORE reconnect failed (fatally)\n"); + /* no entry in queue */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty\n"); return; } - GNUNET_STATISTICS_update (h->stats, - gettext_noop - ("# datastore connections (re)created"), 1, - GNUNET_NO); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); -#endif - process_queue (h); + if (NULL == qe->env) + { + /* waiting for replies */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Head request already transmitted\n"); + return; + } + if (NULL == h->mq) + { + /* waiting for reconnect */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Not connected\n"); + return; + } + GNUNET_MQ_send (h->mq, + qe->env); + qe->env = NULL; } + + /** - * Disconnect from the service and then try reconnecting to the datastore service - * after some delay. + * Function called to check status message from the service. * - * @param h handle to datastore to disconnect and reconnect + * @param cls closure + * @param sm status message received + * @return #GNUNET_OK if the message is well-formed */ -static void -do_disconnect (struct GNUNET_DATASTORE_Handle *h) +static int +check_status (void *cls, + const struct StatusMessage *sm) { - if (h->client == NULL) + uint16_t msize = ntohs (sm->header.size) - sizeof (*sm); + int32_t status = ntohl (sm->status); + + if (msize > 0) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "client NULL in disconnect, will not try to reconnect\n"); -#endif - return; + const char *emsg = (const char *) &sm[1]; + + if ('\0' != emsg[msize - 1]) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } } -#if 0 - GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"), - 1, GNUNET_NO); -#endif - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->skip_next_messages = 0; - h->client = NULL; - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h); + else if (GNUNET_SYSERR == status) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Function called whenever we receive a message from - * the service. Calls the appropriate handler. + * Function called to handle status message from the service. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param msg the received message + * @param cls closure + * @param sm status message received */ static void -receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) +handle_status (void *cls, + const struct StatusMessage *sm) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; + const char *emsg; + int32_t status = ntohl (sm->status); - h->in_receive = GNUNET_NO; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); -#endif if (h->skip_next_messages > 0) { h->skip_next_messages--; @@ -575,240 +660,255 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) if (NULL == (qe = h->queue_head)) { GNUNET_break (0); - process_queue (h); + do_disconnect (h); + return; + } + if (NULL != qe->env) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type) + { + GNUNET_break (0); + do_disconnect (h); return; } - qe->response_proc (h, msg); + rc = qe->qc.sc; + free_queue_entry (qe); + if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) + emsg = (const char *) &sm[1]; + else + emsg = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received status %d/%s\n", + (int) status, + emsg); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), + 1, + GNUNET_NO); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.cont) + rc.cont (rc.cont_cls, + status, + GNUNET_TIME_absolute_ntoh (sm->min_expiration), + emsg); } /** - * Transmit request from queue to datastore service. + * Check data message we received from the service. * - * @param cls the 'struct GNUNET_DATASTORE_Handle' - * @param size number of bytes that can be copied to buf - * @param buf where to copy the drop message - * @return number of bytes written to buf + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ -static size_t -transmit_request (void *cls, size_t size, void *buf) +static int +check_data (void *cls, + const struct DataMessage *dm) { - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - size_t msize; + uint16_t msize = ntohs (dm->header.size) - sizeof (*dm); - h->th = NULL; - if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ - if (buf == NULL) + if (msize != ntohl (dm->size)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to DATASTORE.\n")); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - do_disconnect (h); - return 0; - } - if (size < (msize = qe->message_size)) - { - process_queue (h); - return 0; + GNUNET_break (0); + return GNUNET_SYSERR; } -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte request to DATASTORE\n", msize); -#endif - memcpy (buf, &qe[1], msize); - qe->was_transmitted = GNUNET_YES; - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (GNUNET_NO == h->in_receive); - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, &receive_cb, h, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# bytes sent to datastore"), 1, - GNUNET_NO); - return msize; + return GNUNET_OK; } /** - * Process entries in the queue (or do nothing if we are already - * doing so). - * - * @param h handle to the datastore + * Handle data message we got from the service. + * + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param dm message received */ static void -process_queue (struct GNUNET_DATASTORE_Handle *h) +handle_data (void *cls, + const struct DataMessage *dm) { + struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; - if (NULL == (qe = h->queue_head)) - { -#if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); -#endif - return; /* no entry in queue */ - } - if (qe->was_transmitted == GNUNET_YES) + if (h->skip_next_messages > 0) { -#if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); -#endif - return; /* waiting for replies */ + process_queue (h); + return; } - if (h->th != NULL) + qe = h->queue_head; + if (NULL == qe) { -#if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); -#endif - return; /* request pending */ + GNUNET_break (0); + do_disconnect (h); + return; } - if (h->client == NULL) + if (NULL != qe->env) { -#if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); -#endif - return; /* waiting for reconnect */ + GNUNET_break (0); + do_disconnect (h); + return; } - if (GNUNET_YES == h->in_receive) + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) { - /* wait for response to previous query */ + GNUNET_break (0); + do_disconnect (h); return; } -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing %u byte request to DATASTORE\n", qe->message_size); +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Results received"), + 1, + GNUNET_NO); #endif - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, - GNUNET_TIME_absolute_get_remaining - (qe->timeout), GNUNET_YES, - &transmit_request, h); - GNUNET_assert (GNUNET_NO == h->in_receive); - GNUNET_break (NULL != h->th); -} - - -/** - * Dummy continuation used to do nothing (but be non-zero). - * - * @param cls closure - * @param result result - * @param emsg error message - */ -static void -drop_status_cont (void *cls, int32_t result, const char *emsg) -{ - /* do nothing */ -} - - -/** - * Free a queue entry. Removes the given entry from the - * queue and releases associated resources. Does NOT - * call the callback. - * - * @param qe entry to free. - */ -static void -free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) -{ - struct GNUNET_DATASTORE_Handle *h = qe->h; - - GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); - if (qe->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - } - h->queue_size--; - qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ - GNUNET_free (qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), + ntohl (dm->type), + ntohl (dm->size), + GNUNET_h2s (&dm->key)); + rc = qe->qc.rc; + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + process_queue (h); + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + &dm->key, + ntohl (dm->size), + &dm[1], + ntohl (dm->type), + ntohl (dm->priority), + ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); } /** - * Type of a function to call when we receive a message - * from the service. + * Type of a function to call when we receive a + * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service. * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls closure with the `struct GNUNET_DATASTORE_Handle *` + * @param msg message received */ static void -process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) +handle_data_end (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; - struct StatusContext rc; - const struct StatusMessage *sm; - const char *emsg; - int32_t status; - int was_transmitted; + struct ResultContext rc; - if (NULL == (qe = h->queue_head)) + if (h->skip_next_messages > 0) { - GNUNET_break (0); - do_disconnect (h); + h->skip_next_messages--; + process_queue (h); return; } - rc = qe->qc.sc; - if (msg == NULL) + qe = h->queue_head; + if (NULL == qe) { - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (was_transmitted == GNUNET_YES) - do_disconnect (h); - else - process_queue (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, GNUNET_SYSERR, - _("Failed to receive status response from database.")); + GNUNET_break (0); + do_disconnect (h); return; } - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - free_queue_entry (qe); - if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) + if (NULL != qe->env) { GNUNET_break (0); - h->retry_time = GNUNET_TIME_UNIT_ZERO; do_disconnect (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, GNUNET_SYSERR, - _("Error reading response from datastore service")); return; } - sm = (const struct StatusMessage *) msg; - status = ntohl (sm->status); - emsg = NULL; - if (ntohs (msg->size) > sizeof (struct StatusMessage)) - { - emsg = (const char *) &sm[1]; - if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } - } - if ((status == GNUNET_SYSERR) && (emsg == NULL)) + if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) { GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); + do_disconnect (h); + return; } -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, - emsg); -#endif + rc = qe->qc.rc; + free_queue_entry (qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", + h->queue_size); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + h->result_count = 0; + process_queue (h); + /* signal end of iteration */ + if (NULL != rc.proc) + rc.proc (rc.proc_cls, + NULL, + 0, + NULL, + 0, + 0, + 0, + GNUNET_TIME_UNIT_ZERO_ABS, + 0); +} + + +/** + * Try reconnecting to the datastore service. + * + * @param cls the `struct GNUNET_DATASTORE_Handle` + */ +static void +try_reconnect (void *cls) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (status, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + struct StatusMessage, + h), + GNUNET_MQ_hd_var_size (data, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + struct DataMessage, + h), + GNUNET_MQ_hd_fixed_size (data_end, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, + struct GNUNET_MessageHeader, + h), + GNUNET_MQ_handler_end () + }; + + h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); + h->reconnect_task = NULL; + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, + "datastore", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# status messages received"), 1, + gettext_noop ("# datastore connections (re)created"), + 1, GNUNET_NO); - h->retry_time.rel_value = 0; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Reconnected to DATASTORE\n"); process_queue (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, status, emsg); +} + + +/** + * Dummy continuation used to do nothing (but be non-zero). + * + * @param cls closure + * @param result result + * @param min_expiration expiration time + * @param emsg error message + */ +static void +drop_status_cont (void *cls, + int32_t result, + struct GNUNET_TIME_Absolute min_expiration, + const char *emsg) +{ + /* do nothing */ } @@ -831,55 +931,48 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout timeout for the operation * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, - const GNUNET_HashCode * key, size_t size, - const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, uint32_t anonymity, +GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, + uint32_t rid, + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, + unsigned int queue_priority, + unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct DataMessage *dm; - size_t msize; union QueueContext qc; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s' for %llu ms\n", - size, GNUNET_h2s (key), - GNUNET_TIME_absolute_get_remaining (expiration).rel_value); -#endif - msize = sizeof (struct DataMessage) + size; - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - qc.sc.cont = cont; - qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, - &process_status_message, &qc); - if (qe == NULL) + if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for PUT\n"); -#endif + GNUNET_break (0); return NULL; } - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"), - 1, GNUNET_NO); - dm = (struct DataMessage *) &qe[1]; - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons (msize); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to put %u bytes of data under key `%s' for %s\n", + size, + GNUNET_h2s (key), + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), + GNUNET_YES)); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_PUT); dm->rid = htonl (rid); dm->size = htonl ((uint32_t) size); dm->type = htonl (type); @@ -890,7 +983,27 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, dm->uid = GNUNET_htonll (0); dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->key = *key; - memcpy (&dm[1], data, size); + GNUNET_memcpy (&dm[1], + data, + size); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for PUT\n"); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# PUT requests executed"), + 1, + GNUNET_NO); process_queue (h); return qe; } @@ -904,56 +1017,54 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, * @param h handle to the datastore * @param amount how much space (in bytes) should be reserved (for content only) * @param entries how many entries will be created (to calculate per-entry overhead) - * @param queue_priority ranking of this request in the priority queue - * @param max_queue_size at what queue size should this request be dropped - * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response (or before dying in queue) * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, - uint32_t entries, unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, +GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, + uint64_t amount, + uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReserveMessage *rm; union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to reserve %llu bytes of data and %u entries\n", - (unsigned long long) amount, (unsigned int) entries); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to reserve %llu bytes of data and %u entries\n", + (unsigned long long) amount, + (unsigned int) entries); + env = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); + qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority, - max_queue_size, timeout, &process_status_message, &qc); - if (qe == NULL) + qe = make_queue_entry (h, + env, + UINT_MAX, + UINT_MAX, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry to reserve\n"); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to reserve\n"); return NULL; } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# RESERVE requests executed"), 1, + gettext_noop ("# RESERVE requests executed"), + 1, GNUNET_NO); - rm = (struct ReserveMessage *) &qe[1]; - rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons (sizeof (struct ReserveMessage)); - rm->entries = htonl (entries); - rm->amount = GNUNET_htonll (amount); process_queue (h); return qe; } @@ -973,116 +1084,51 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - uint32_t rid, unsigned int queue_priority, + uint32_t rid, + unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct ReleaseReserveMessage *rrm; union QueueContext qc; - if (cont == NULL) + if (NULL == cont) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to release reserve %d\n", + rid); + env = GNUNET_MQ_msg (rrm, + GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->rid = htonl (rid); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); - if (qe == NULL) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry to release reserve\n"); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to release reserve\n"); return NULL; } GNUNET_STATISTICS_update (h->stats, gettext_noop ("# RELEASE RESERVE requests executed"), 1, GNUNET_NO); - rrm = (struct ReleaseReserveMessage *) &qe[1]; - rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl (rid); - process_queue (h); - return qe; -} - - -/** - * Update a value in the datastore. - * - * @param h handle to the datastore - * @param uid identifier for the value - * @param priority how much to increase the priority of the value - * @param expiration new expiration value should be MAX of existing and this argument - * @param queue_priority ranking of this request in the priority queue - * @param max_queue_size at what queue size should this request be dropped - * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response - * @param cont continuation to call when done - * @param cont_cls closure for cont - * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) - */ -struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, - uint32_t priority, - struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) -{ - struct GNUNET_DATASTORE_QueueEntry *qe; - struct UpdateMessage *um; - union QueueContext qc; - - if (cont == NULL) - cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to update entry %llu raising priority by %u and expiration to %llu\n", - uid, (unsigned int) priority, - (unsigned long long) expiration.abs_value); -#endif - qc.sc.cont = cont; - qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, - max_queue_size, timeout, &process_status_message, &qc); - if (qe == NULL) - { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for UPDATE\n"); -#endif - return NULL; - } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# UPDATE requests executed"), 1, - GNUNET_NO); - um = (struct UpdateMessage *) &qe[1]; - um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons (sizeof (struct UpdateMessage)); - um->priority = htonl (priority); - um->expiration = GNUNET_TIME_absolute_hton (expiration); - um->uid = GNUNET_htonll (uid); process_queue (h); return qe; } @@ -1090,9 +1136,9 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, /** * Explicitly remove some content from the database. - * The "cont"inuation will be called with status - * "GNUNET_OK" if content was removed, "GNUNET_NO" - * if no matching entry was found and "GNUNET_SYSERR" + * The @a cont continuation will be called with `status` + * #GNUNET_OK" if content was removed, #GNUNET_NO + * if no matching entry was found and #GNUNET_SYSERR * on all other types of errors. * * @param h handle to the datastore @@ -1102,54 +1148,41 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param cont continuation to call when done - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel; note that even if NULL is returned, the callback will be invoked * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, size_t size, - const void *data, unsigned int queue_priority, + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; - size_t msize; + struct GNUNET_MQ_Envelope *env; union QueueContext qc; - if (cont == NULL) - cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to remove %u bytes under key `%s'\n", size, - GNUNET_h2s (key)); -#endif - qc.sc.cont = cont; - qc.sc.cont_cls = cont_cls; - msize = sizeof (struct DataMessage) + size; - GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, - &process_status_message, &qc); - if (qe == NULL) + if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for REMOVE\n"); -#endif + GNUNET_break (0); return NULL; } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# REMOVE requests executed"), 1, - GNUNET_NO); - dm = (struct DataMessage *) &qe[1]; - dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons (msize); + if (NULL == cont) + cont = &drop_status_cont; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to remove %u bytes under key `%s'\n", + size, + GNUNET_h2s (key)); + env = GNUNET_MQ_msg_extra (dm, + size, + GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); dm->rid = htonl (0); dm->size = htonl (size); dm->type = htonl (0); @@ -1158,119 +1191,35 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, dm->uid = GNUNET_htonll (0); dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); dm->key = *key; - memcpy (&dm[1], data, size); - process_queue (h); - return qe; -} - + GNUNET_memcpy (&dm[1], + data, + size); -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_DATASTORE_Handle *h = cls; - struct GNUNET_DATASTORE_QueueEntry *qe; - struct ResultContext rc; - const struct DataMessage *dm; - int was_transmitted; + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; - if (msg == NULL) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, + &qc); + if (NULL == qe) { - qe = h->queue_head; - GNUNET_assert (NULL != qe); - rc = qe->qc.rc; - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (was_transmitted == GNUNET_YES) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from database.\n")); - do_disconnect (h); - } - else - { - process_queue (h); - } - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) - { - GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); - qe = h->queue_head; - rc = qe->qc.rc; - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - free_queue_entry (qe); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set, new queue size is %u\n", - h->queue_size); -#endif - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - h->retry_time.rel_value = 0; - h->result_count = 0; - process_queue (h); - return; - } - qe = h->queue_head; - GNUNET_assert (NULL != qe); - rc = qe->qc.rc; - if (GNUNET_YES != qe->was_transmitted) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; - } - if ((ntohs (msg->size) < sizeof (struct DataMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || - (ntohs (msg->size) != - sizeof (struct DataMessage) + - ntohl (((const struct DataMessage *) msg)->size))) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, - 0); - return; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for REMOVE\n"); + return NULL; } - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# REMOVE requests executed"), + 1, GNUNET_NO); - dm = (const struct DataMessage *) msg; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), - ntohl (dm->size), GNUNET_h2s (&dm->key)); -#endif - free_queue_entry (qe); - h->retry_time.rel_value = 0; process_queue (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), - ntohl (dm->priority), ntohl (dm->anonymity), - GNUNET_TIME_absolute_ntoh (dm->expiration), - GNUNET_ntohll (dm->uid)); + return qe; } + /** * Get a random value from the datastore for content replication. * Returns a single, random value among those with the highest @@ -1282,11 +1231,10 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel */ @@ -1294,40 +1242,37 @@ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct GNUNET_MessageHeader *m; union QueueContext qc; GNUNET_assert (NULL != proc); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get replication entry in %llu ms\n", - (unsigned long long) timeout.rel_value); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to get replication entry\n"); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); - if (qe == NULL) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); + if (NULL == qe) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for GET REPLICATION\n"); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for GET REPLICATION\n"); return NULL; } GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET REPLICATION requests executed"), 1, GNUNET_NO); - m = (struct GNUNET_MessageHeader *) &qe[1]; - m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); - m->size = htons (sizeof (struct GNUNET_MessageHeader)); process_queue (h); return qe; } @@ -1344,12 +1289,11 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param type allowed type for the operation (never zero) * @param proc function to call on a random value; it * will be called once with a value (if available) * or with NULL if none value exists. - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel */ @@ -1358,45 +1302,43 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; struct GetZeroAnonymityMessage *m; union QueueContext qc; GNUNET_assert (NULL != proc); GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", - (unsigned long long) offset, type, - (unsigned long long) timeout.rel_value); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to get %llu-th zero-anonymity entry of type %d\n", + (unsigned long long) offset, + type); + env = GNUNET_MQ_msg (m, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); - if (qe == NULL) + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); + if (NULL == qe) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for zero-anonymity procation\n"); -#endif + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for zero-anonymity procation\n"); return NULL; } GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET ZERO ANONYMITY requests executed"), 1, GNUNET_NO); - m = (struct GetZeroAnonymityMessage *) &qe[1]; - m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); - m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); - m->type = htonl ((uint32_t) type); - m->offset = GNUNET_htonll (offset); process_queue (h); return qe; } @@ -1416,60 +1358,69 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) - * @param timeout how long to wait at most for a response * @param proc function to call on each matching value; * will be called once with a NULL value at the end - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @return NULL if the entry was not queued, otherwise a handle that can be used to * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, - const GNUNET_HashCode * key, +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MQ_Envelope *env; + struct GetKeyMessage *gkm; struct GetMessage *gm; union QueueContext qc; GNUNET_assert (NULL != proc); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to look for data of type %u under key `%s'\n", - (unsigned int) type, GNUNET_h2s (key)); -#endif - qc.rc.proc = proc; - qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority, - max_queue_size, timeout, &process_result_message, &qc); - if (qe == NULL) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to look for data of type %u under key `%s'\n", + (unsigned int) type, + GNUNET_h2s (key)); + if (NULL == key) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", - GNUNET_h2s (key)); -#endif - return NULL; + env = GNUNET_MQ_msg (gm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); + gm->offset = GNUNET_htonll (offset); } - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"), - 1, GNUNET_NO); - gm = (struct GetMessage *) &qe[1]; - gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl (type); - gm->offset = GNUNET_htonll (offset); - if (key != NULL) + else { - gm->header.size = htons (sizeof (struct GetMessage)); - gm->key = *key; + env = GNUNET_MQ_msg (gkm, + GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); + gkm->type = htonl (type); + gkm->offset = GNUNET_htonll (offset); + gkm->key = *key; } - else + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, + env, + queue_priority, + max_queue_size, + GNUNET_MESSAGE_TYPE_DATASTORE_DATA, + &qc); + if (NULL == qe) { - gm->header.size = - htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not queue request for `%s'\n", + GNUNET_h2s (key)); + return NULL; } +#if INSANE_STATISTICS + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# GET requests executed"), + 1, + GNUNET_NO); +#endif process_queue (h); return qe; } @@ -1478,22 +1429,20 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, /** * Cancel a datastore operation. The final callback from the * operation must not have been done yet. - * + * * @param qe operation to cancel */ void GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { - struct GNUNET_DATASTORE_Handle *h; + struct GNUNET_DATASTORE_Handle *h = qe->h; - GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); - h = qe->h; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, - qe->was_transmitted, h->queue_head == qe); -#endif - if (GNUNET_YES == qe->was_transmitted) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Pending DATASTORE request %p cancelled (%d, %d)\n", + qe, + NULL == qe->env, + h->queue_head == qe); + if (NULL == qe->env) { free_queue_entry (qe); h->skip_next_messages++;