X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdatastore%2Fdatastore_api.c;h=ff49c106e5a944c5548d9b7753e1c67ed574e9b6;hb=5746309cb4be2073d550ad7a6885e918631dbc38;hp=73783cfb8bdc744e7ee4b2fbb68eb39a1a2cf187;hpb=7bc466bbdb8b64cac68c5ee59eb7ab6b9d85c420;p=oweals%2Fgnunet.git diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 73783cfb8..ff49c106e 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,10 +1,10 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -20,16 +20,158 @@ /** * @file datastore/datastore_api.c - * @brief Management for the datastore for files stored on a GNUnet node + * @brief Management for the datastore for files stored on a GNUnet node. Implements + * a priority queue for requests (with timeouts). * @author Christian Grothoff */ #include "platform.h" +#include "gnunet_arm_service.h" +#include "gnunet_constants.h" #include "gnunet_datastore_service.h" +#include "gnunet_statistics_service.h" #include "datastore.h" /** - * Handle to the datastore service. Followed - * by 65536 bytes used for storing messages. + * If a client stopped asking for more results, how many more do + * we receive from the DB before killing the connection? Trade-off + * between re-doing TCP handshakes and (needlessly) receiving + * useless results. + */ +#define MAX_EXCESS_RESULTS 8 + +/** + * Context for processing status messages. + */ +struct StatusContext +{ + /** + * Continuation to call with the status. + */ + GNUNET_DATASTORE_ContinuationWithStatus cont; + + /** + * Closure for cont. + */ + void *cont_cls; + +}; + + +/** + * Context for processing result messages. + */ +struct ResultContext +{ + /** + * Function to call with the result. + */ + GNUNET_DATASTORE_DatumProcessor proc; + + /** + * Closure for proc. + */ + void *proc_cls; + +}; + + +/** + * Context for a queue operation. + */ +union QueueContext +{ + + struct StatusContext sc; + + struct ResultContext rc; + +}; + + + +/** + * Entry in our priority queue. + */ +struct GNUNET_DATASTORE_QueueEntry +{ + + /** + * This is a linked list. + */ + struct GNUNET_DATASTORE_QueueEntry *next; + + /** + * This is a linked list. + */ + struct GNUNET_DATASTORE_QueueEntry *prev; + + /** + * Handle to the master context. + */ + struct GNUNET_DATASTORE_Handle *h; + + /** + * Response processor (NULL if we are not waiting for a response). + * This struct should be used for the closure, function-specific + * arguments can be passed via 'qc'. + */ + GNUNET_CLIENT_MessageHandler response_proc; + + /** + * Function to call after transmission of the request. + */ + GNUNET_DATASTORE_ContinuationWithStatus cont; + + /** + * Closure for 'cont'. + */ + void *cont_cls; + + /** + * Context for the operation. + */ + union QueueContext qc; + + /** + * Task for timeout signalling. + */ + GNUNET_SCHEDULER_TaskIdentifier task; + + /** + * Timeout for the current operation. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Priority in the queue. + */ + unsigned int priority; + + /** + * Maximum allowed length of queue (otherwise + * this request should be discarded). + */ + unsigned int max_queue; + + /** + * Number of bytes in the request message following + * this struct. 32-bit value for nicer memory + * access (and overall struct alignment). + */ + uint32_t message_size; + + /** + * Has this message been transmitted to the service? + * Only ever GNUNET_YES for the head of the queue. + * Note that the overall struct should end at a + * multiple of 64 bits. + */ + int was_transmitted; + +}; + +/** + * Handle to the datastore service. */ struct GNUNET_DATASTORE_Handle { @@ -40,93 +182,118 @@ struct GNUNET_DATASTORE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Our scheduler. + * Current connection to the datastore service. */ - struct GNUNET_SCHEDULER_Handle *sched; + struct GNUNET_CLIENT_Connection *client; /** - * Current connection to the datastore service. + * Handle for statistics. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_STATISTICS_Handle *stats; /** - * Current response processor (NULL if we are not waiting for a - * response). The specific type depends on the kind of message we - * just transmitted. + * Current transmit handle. */ - void *response_proc; - + struct GNUNET_CLIENT_TransmitHandle *th; + /** - * Closure for response_proc. + * Current head of priority queue. */ - void *response_proc_cls; + struct GNUNET_DATASTORE_QueueEntry *queue_head; /** - * Timeout for the current operation. + * Current tail of priority queue. */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_DATASTORE_QueueEntry *queue_tail; + + /** + * Task for trying to reconnect. + */ + GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + + /** + * How quickly should we retry? Used for exponential back-off on + * connect-errors. + */ + struct GNUNET_TIME_Relative retry_time; + + /** + * Number of entries in the queue. + */ + unsigned int queue_size; + + /** + * Number of results we're receiving for the current query + * after application stopped to care. Used to determine when + * to reset the connection. + */ + unsigned int result_count; + + /** + * Are we currently trying to receive from the service? + */ + int in_receive; /** - * Number of bytes in the message following - * this struct, 0 if we have no request pending. + * We should ignore the next message(s) from the service. */ - size_t message_size; + unsigned int skip_next_messages; }; + /** * Connect to the datastore service. * * @param cfg configuration to use - * @param sched scheduler to use * @return handle to use to access the service */ -struct GNUNET_DATASTORE_Handle *GNUNET_DATASTORE_connect (const struct - GNUNET_CONFIGURATION_Handle - *cfg, - struct - GNUNET_SCHEDULER_Handle - *sched) +struct GNUNET_DATASTORE_Handle * +GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - - c = GNUNET_CLIENT_connect (sched, "datastore", cfg); + + c = GNUNET_CLIENT_connect ("datastore", cfg); if (c == NULL) - return NULL; /* oops */ - h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE); + return NULL; /* oops */ + h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + + GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); h->client = c; h->cfg = cfg; - h->sched = sched; + h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); return h; } /** * Transmit DROP message to datastore service. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param size number of bytes that can be copied to buf + * @param buf where to copy the drop message + * @return number of bytes written to buf */ static size_t -transmit_drop (void *cls, - size_t size, void *buf) +transmit_drop (void *cls, size_t size, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_MessageHeader *hdr; - + if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return 0; - } - GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader)); + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to transmit request to drop database.\n")); + GNUNET_DATASTORE_disconnect (h, GNUNET_NO); + return 0; + } + GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); hdr = buf; - hdr->size = htons(sizeof(struct GNUNET_MessageHeader)); - hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); + hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return sizeof(struct GNUNET_MessageHeader); + return sizeof (struct GNUNET_MessageHeader); } @@ -137,195 +304,510 @@ transmit_drop (void *cls, * @param h handle to the datastore * @param drop set to GNUNET_YES to delete all data in datastore (!) */ -void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, - int drop) +void +GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) { - GNUNET_assert (0 == h->message_size); - GNUNET_assert (NULL == h->response_proc); - if ( (GNUNET_YES == drop) && - (h->client != NULL) ) + struct GNUNET_DATASTORE_QueueEntry *qe; + +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); +#endif + if (NULL != h->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); + h->th = NULL; + } + if (h->client != NULL) + { + GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->client = NULL; + } + if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->reconnect_task); + h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + } + while (NULL != (qe = h->queue_head)) + { + GNUNET_assert (NULL != qe->response_proc); + qe->response_proc (h, NULL); + } + if (GNUNET_YES == drop) + { + h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); + if (h->client != NULL) { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof(struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_MINUTES, - &transmit_drop, - h)) - return; - GNUNET_break (0); + if (NULL != + GNUNET_CLIENT_notify_transmit_ready (h->client, + sizeof (struct + GNUNET_MessageHeader), + GNUNET_TIME_UNIT_MINUTES, + GNUNET_YES, &transmit_drop, h)) + return; + GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->client = NULL; } - if (h->client != NULL) - GNUNET_CLIENT_disconnect (h->client); + GNUNET_break (0); + } + GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); + h->stats = NULL; GNUNET_free (h); } /** - * Type of a function to call when we receive a message - * from the service. This specific function is used - * to handle messages of type "struct StatusMessage". + * A request has timed out (before being transmitted to the service). * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls the 'struct GNUNET_DATASTORE_QueueEntry' + * @param tc scheduler context */ -static void -with_status_response_handler (void *cls, - const struct - GNUNET_MessageHeader * msg) +static void +timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc; - const struct StatusMessage *sm; - const char *emsg; - int status; + struct GNUNET_DATASTORE_QueueEntry *qe = cls; - if (msg == NULL) - { - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - cont (h->response_proc_cls, - GNUNET_SYSERR, - _("Timeout trying to read response from datastore service\n")); - return; - } - if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) - { - GNUNET_break (0); - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - cont (h->response_proc_cls, - GNUNET_SYSERR, - _("Error reading response from datastore service\n")); - return; - } - sm = (const struct StatusMessage*) msg; - status = ntohl(sm->status); - emsg = NULL; - if (ntohs(msg->size) > sizeof(struct StatusMessage)) - { - emsg = (const char*) &sm[1]; - if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } - } - if ( (status == GNUNET_SYSERR) && - (emsg == NULL) ) + GNUNET_STATISTICS_update (qe->h->stats, + gettext_noop ("# queue entry timeouts"), 1, + GNUNET_NO); + qe->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (qe->was_transmitted == GNUNET_NO); +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout of request in datastore queue\n"); +#endif + qe->response_proc (qe->h, NULL); +} + + +/** + * Create a new entry for our priority queue (and possibly discard other entires if + * the queue is getting too long). + * + * @param h handle to the datastore + * @param msize size of the message to queue + * @param queue_priority priority of the entry + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout timeout for the operation + * @param response_proc function to call with replies (can be NULL) + * @param qc client context (NOT a closure for response_proc) + * @return NULL if the queue is full + */ +static struct GNUNET_DATASTORE_QueueEntry * +make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize, + unsigned int queue_priority, unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_CLIENT_MessageHandler response_proc, + const union QueueContext *qc) +{ + struct GNUNET_DATASTORE_QueueEntry *ret; + struct GNUNET_DATASTORE_QueueEntry *pos; + unsigned int c; + + c = 0; + pos = h->queue_head; + while ((pos != NULL) && (c < max_queue_size) && + (pos->priority >= queue_priority)) + { + c++; + pos = pos->next; + } + if (c >= max_queue_size) + { + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1, + GNUNET_NO); + return NULL; + } + ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); + ret->h = h; + ret->response_proc = response_proc; + ret->qc = *qc; + ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); + ret->priority = queue_priority; + ret->max_queue = max_queue_size; + ret->message_size = msize; + ret->was_transmitted = GNUNET_NO; + if (pos == NULL) + { + /* append at the tail */ + pos = h->queue_tail; + } + else + { + pos = pos->prev; + /* do not insert at HEAD if HEAD query was already + * transmitted and we are still receiving replies! */ + if ((pos == NULL) && (h->queue_head->was_transmitted)) + pos = h->queue_head; + } + c++; + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), + 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); + h->queue_size++; + ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); + pos = ret->next; + while (pos != NULL) + { + if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); + GNUNET_assert (pos->response_proc != NULL); + /* move 'pos' element to head so that it will be + * killed on 'NULL' call below */ +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping request from datastore queue\n"); +#endif + GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); + GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# Requests dropped from datastore queue"), 1, + GNUNET_NO); + GNUNET_assert (h->queue_head == pos); + pos->response_proc (h, NULL); + break; } - h->response_proc = NULL; + pos = pos->next; + } + return ret; +} + + +/** + * Process entries in the queue (or do nothing if we are already + * doing so). + * + * @param h handle to the datastore + */ +static void process_queue (struct GNUNET_DATASTORE_Handle *h); + + +/** + * Try reconnecting to the datastore service. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param tc scheduler context + */ +static void +try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + + if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value) + h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY; + else + h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2); + if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value) + h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT; + h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); + if (h->client == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "DATASTORE reconnect failed (fatally)\n"); + return; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# datastore connections (re)created"), 1, + GNUNET_NO); #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received status %d/%s\n", - status, - emsg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); #endif - cont (h->response_proc_cls, - status, - emsg); + process_queue (h); } /** - * Transmit message to datastore service and then - * read a status message. + * Disconnect from the service and then try reconnecting to the datastore service + * after some delay. * - * @param cls closure with handle to datastore - * @param size number of bytes we can transmit at most - * @param buf where to write transmission, NULL on - * timeout - * @return number of bytes copied to buf + * @param h handle to datastore to disconnect and reconnect + */ +static void +do_disconnect (struct GNUNET_DATASTORE_Handle *h) +{ + if (h->client == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client NULL in disconnect, will not try to reconnect\n"); +#endif + return; + } +#if 0 + GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"), + 1, GNUNET_NO); +#endif + GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->skip_next_messages = 0; + h->client = NULL; + h->reconnect_task = + GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h); +} + + +/** + * Function called whenever we receive a message from + * the service. Calls the appropriate handler. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param msg the received message + */ +static void +receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + h->in_receive = GNUNET_NO; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); +#endif + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + process_queue (h); + return; + } + qe->response_proc (h, msg); +} + + +/** + * Transmit request from queue to datastore service. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param size number of bytes that can be copied to buf + * @param buf where to copy the drop message + * @return number of bytes written to buf */ static size_t -transmit_get_status (void *cls, - size_t size, - void *buf) +transmit_request (void *cls, size_t size, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc; - uint16_t msize; + struct GNUNET_DATASTORE_QueueEntry *qe; + size_t msize; + h->th = NULL; + if (NULL == (qe = h->queue_head)) + return 0; /* no entry in queue */ if (buf == NULL) - { - h->message_size = 0; - h->response_proc = NULL; - cont (h->response_proc_cls, - GNUNET_SYSERR, - gettext_noop ("Error transmitting message to datastore service.\n")); - return 0; - } - msize = h->message_size; - GNUNET_assert (msize <= size); - memcpy (buf, &h[1], msize); + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to transmit request to DATASTORE.\n")); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# transmission request failures"), + 1, GNUNET_NO); + do_disconnect (h); + return 0; + } + if (size < (msize = qe->message_size)) + { + process_queue (h); + return 0; + } #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitted %u byte message to datastore service, now waiting for status.\n", - msize); -#endif - h->message_size = 0; - GNUNET_CLIENT_receive (h->client, - &with_status_response_handler, - h, - GNUNET_TIME_absolute_get_remaining (h->timeout)); + "Transmitting %u byte request to DATASTORE\n", msize); +#endif + memcpy (buf, &qe[1], msize); + qe->was_transmitted = GNUNET_YES; + GNUNET_SCHEDULER_cancel (qe->task); + qe->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (GNUNET_NO == h->in_receive); + h->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (h->client, &receive_cb, h, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# bytes sent to datastore"), 1, + GNUNET_NO); return msize; } /** - * Helper function that will initiate the - * transmission of a message to the datastore - * service. The message must already be prepared - * and stored in the buffer at the end of the - * handle. The message must be of a type that - * expects a "StatusMessage" in response. - * - * @param h handle to the service with prepared message - * @param cont function to call with result - * @param cont_cls closure - * @param timeout timeout for the operation + * Process entries in the queue (or do nothing if we are already + * doing so). + * + * @param h handle to the datastore */ static void -transmit_for_status (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +process_queue (struct GNUNET_DATASTORE_Handle *h) { - const struct GNUNET_MessageHeader *hdr; - uint16_t msize; + struct GNUNET_DATASTORE_QueueEntry *qe; - GNUNET_assert (cont != NULL); - hdr = (const struct GNUNET_MessageHeader*) &h[1]; - msize = ntohs(hdr->size); + if (NULL == (qe = h->queue_head)) + { +#if DEBUG_DATASTORE > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); +#endif + return; /* no entry in queue */ + } + if (qe->was_transmitted == GNUNET_YES) + { +#if DEBUG_DATASTORE > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); +#endif + return; /* waiting for replies */ + } + if (h->th != NULL) + { +#if DEBUG_DATASTORE > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); +#endif + return; /* request pending */ + } + if (h->client == NULL) + { +#if DEBUG_DATASTORE > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); +#endif + return; /* waiting for reconnect */ + } + if (GNUNET_YES == h->in_receive) + { + /* wait for response to previous query */ + return; + } #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte message of type %u to datastore service\n", - msize, - ntohs(hdr->type)); -#endif - GNUNET_assert (h->response_proc == NULL); - h->response_proc = cont; - h->response_proc_cls = cont_cls; - h->timeout = GNUNET_TIME_relative_to_absolute (timeout); - h->message_size = msize; - if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client, - msize, - timeout, - &transmit_get_status, - h)) + "Queueing %u byte request to DATASTORE\n", qe->message_size); +#endif + h->th = + GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, + GNUNET_TIME_absolute_get_remaining + (qe->timeout), GNUNET_YES, + &transmit_request, h); + GNUNET_assert (GNUNET_NO == h->in_receive); + GNUNET_break (NULL != h->th); +} + + +/** + * Dummy continuation used to do nothing (but be non-zero). + * + * @param cls closure + * @param result result + * @param emsg error message + */ +static void +drop_status_cont (void *cls, int32_t result, const char *emsg) +{ + /* do nothing */ +} + + +/** + * Free a queue entry. Removes the given entry from the + * queue and releases associated resources. Does NOT + * call the callback. + * + * @param qe entry to free. + */ +static void +free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) +{ + struct GNUNET_DATASTORE_Handle *h = qe->h; + + GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); + if (qe->task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (qe->task); + qe->task = GNUNET_SCHEDULER_NO_TASK; + } + h->queue_size--; + qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ + GNUNET_free (qe); +} + + +/** + * Type of a function to call when we receive a message + * from the service. + * + * @param cls closure + * @param msg message received, NULL on timeout or fatal error + */ +static void +process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; + const struct StatusMessage *sm; + const char *emsg; + int32_t status; + int was_transmitted; + + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + rc = qe->qc.sc; + if (msg == NULL) + { + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + if (was_transmitted == GNUNET_YES) + do_disconnect (h); + else + process_queue (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, GNUNET_SYSERR, + _("Failed to receive status response from database.")); + return; + } + GNUNET_assert (GNUNET_YES == qe->was_transmitted); + free_queue_entry (qe); + if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || + (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) + { + GNUNET_break (0); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, GNUNET_SYSERR, + _("Error reading response from datastore service")); + return; + } + sm = (const struct StatusMessage *) msg; + status = ntohl (sm->status); + emsg = NULL; + if (ntohs (msg->size) > sizeof (struct StatusMessage)) + { + emsg = (const char *) &sm[1]; + if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') { GNUNET_break (0); - h->response_proc = NULL; - h->message_size = 0; - cont (cont_cls, - GNUNET_SYSERR, - gettext_noop ("Not ready to transmit request to datastore service")); + emsg = _("Invalid error message received from datastore service"); } + } + if ((status == GNUNET_SYSERR) && (emsg == NULL)) + { + GNUNET_break (0); + emsg = _("Invalid error message received from datastore service"); + } +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, + emsg); +#endif + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), 1, + GNUNET_NO); + h->retry_time.rel_value = 0; + process_queue (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, status, emsg); } @@ -335,54 +817,81 @@ transmit_for_status (struct GNUNET_DATASTORE_Handle *h, * lower anonymity level is used. * * @param h handle to the datastore + * @param rid reservation ID to use (from "reserve"); use 0 if no + * prior reservation was made * @param key key for the value * @param size number of bytes in data * @param data content stored * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content + * @param replication how often should the content be replicated to other peers? * @param expiration expiration time for the content + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void -GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - int rid, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - uint32_t type, - uint32_t priority, - uint32_t anonymity, +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, + const GNUNET_HashCode * key, size_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + uint32_t replication, struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { + struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; size_t msize; + union QueueContext qc; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data\n", - size); -#endif - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - dm = (struct DataMessage*) &h[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons(msize); - dm->rid = htonl(rid); - dm->size = htonl(size); - dm->type = htonl(type); - dm->priority = htonl(priority); - dm->anonymity = htonl(anonymity); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(expiration); + "Asked to put %u bytes of data under key `%s' for %llu ms\n", + size, GNUNET_h2s (key), + GNUNET_TIME_absolute_get_remaining (expiration).rel_value); +#endif + msize = sizeof (struct DataMessage) + size; + GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, + &process_status_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for PUT\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"), + 1, GNUNET_NO); + dm = (struct DataMessage *) &qe[1]; + dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); + dm->header.size = htons (msize); + dm->rid = htonl (rid); + dm->size = htonl ((uint32_t) size); + dm->type = htonl (type); + dm->priority = htonl (priority); + dm->anonymity = htonl (anonymity); + dm->replication = htonl (replication); + dm->reserved = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->key = *key; memcpy (&dm[1], data, size); - transmit_for_status (h, cont, cont_cls, timeout); + process_queue (h); + return qe; } @@ -394,27 +903,58 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore * @param amount how much space (in bytes) should be reserved (for content only) * @param entries how many entries will be created (to calculate per-entry overhead) + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response (or before dying in queue) * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void -GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, - uint64_t amount, - uint32_t entries, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, + uint32_t entries, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { + struct GNUNET_DATASTORE_QueueEntry *qe; struct ReserveMessage *rm; + union QueueContext qc; - rm = (struct ReserveMessage*) &h[1]; - rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons(sizeof (struct ReserveMessage)); - rm->entries = htonl(entries); - rm->amount = GNUNET_htonll(amount); - transmit_for_status (h, cont, cont_cls, timeout); + if (cont == NULL) + cont = &drop_status_cont; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to reserve %llu bytes of data and %u entries\n", + (unsigned long long) amount, (unsigned int) entries); +#endif + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority, + max_queue_size, timeout, &process_status_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to reserve\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# RESERVE requests executed"), 1, + GNUNET_NO); + rm = (struct ReserveMessage *) &qe[1]; + rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->header.size = htons (sizeof (struct ReserveMessage)); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); + process_queue (h); + return qe; } @@ -426,24 +966,59 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore * @param rid reservation ID (value of "success" in original continuation * from the "reserve" function). + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void +struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - int rid, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) + uint32_t rid, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { + struct GNUNET_DATASTORE_QueueEntry *qe; struct ReleaseReserveMessage *rrm; + union QueueContext qc; - rrm = (struct ReleaseReserveMessage*) &h[1]; - rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl(rid); - transmit_for_status (h, cont, cont_cls, timeout); + if (cont == NULL) + cont = &drop_status_cont; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); +#endif + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), + queue_priority, max_queue_size, timeout, + &process_status_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to release reserve\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# RELEASE RESERVE requests executed"), 1, + GNUNET_NO); + rrm = (struct ReleaseReserveMessage *) &qe[1]; + rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); + rrm->rid = htonl (rid); + process_queue (h); + return qe; } @@ -454,315 +1029,477 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param uid identifier for the value * @param priority how much to increase the priority of the value * @param expiration new expiration value should be MAX of existing and this argument + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) */ -void -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, - unsigned long long uid, - uint32_t priority, - struct GNUNET_TIME_Absolute expiration, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, + uint32_t priority, + struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { + struct GNUNET_DATASTORE_QueueEntry *qe; struct UpdateMessage *um; + union QueueContext qc; - um = (struct UpdateMessage*) &h[1]; - um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons(sizeof (struct UpdateMessage)); - um->priority = htonl(priority); - um->expiration = GNUNET_TIME_absolute_hton(expiration); - um->uid = GNUNET_htonll(uid); - transmit_for_status (h, cont, cont_cls, timeout); + if (cont == NULL) + cont = &drop_status_cont; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to update entry %llu raising priority by %u and expiration to %llu\n", + uid, (unsigned int) priority, + (unsigned long long) expiration.abs_value); +#endif + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, + max_queue_size, timeout, &process_status_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for UPDATE\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# UPDATE requests executed"), 1, + GNUNET_NO); + um = (struct UpdateMessage *) &qe[1]; + um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); + um->header.size = htons (sizeof (struct UpdateMessage)); + um->priority = htonl (priority); + um->expiration = GNUNET_TIME_absolute_hton (expiration); + um->uid = GNUNET_htonll (uid); + process_queue (h); + return qe; } +/** + * Explicitly remove some content from the database. + * The "cont"inuation will be called with status + * "GNUNET_OK" if content was removed, "GNUNET_NO" + * if no matching entry was found and "GNUNET_SYSERR" + * on all other types of errors. + * + * @param h handle to the datastore + * @param key key for the value + * @param size number of bytes in data + * @param data content stored + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param cont continuation to call when done + * @param cont_cls closure for cont + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel; note that even if NULL is returned, the callback will be invoked + * (or rather, will already have been invoked) + */ +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, + const GNUNET_HashCode * key, size_t size, + const void *data, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) +{ + struct GNUNET_DATASTORE_QueueEntry *qe; + struct DataMessage *dm; + size_t msize; + union QueueContext qc; + + if (cont == NULL) + cont = &drop_status_cont; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to remove %u bytes under key `%s'\n", size, + GNUNET_h2s (key)); +#endif + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + msize = sizeof (struct DataMessage) + size; + GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, + &process_status_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for REMOVE\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# REMOVE requests executed"), 1, + GNUNET_NO); + dm = (struct DataMessage *) &qe[1]; + dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); + dm->header.size = htons (msize); + dm->rid = htonl (0); + dm->size = htonl (size); + dm->type = htonl (0); + dm->priority = htonl (0); + dm->anonymity = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); + dm->key = *key; + memcpy (&dm[1], data, size); + process_queue (h); + return qe; +} /** * Type of a function to call when we receive a message - * from the service. This specific function is used - * to handle messages of type "struct DataMessage". + * from the service. * * @param cls closure * @param msg message received, NULL on timeout or fatal error */ -static void -with_result_response_handler (void *cls, - const struct - GNUNET_MessageHeader * msg) +static void +process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) { - static struct GNUNET_TIME_Absolute zero; struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_Iterator cont = h->response_proc; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; const struct DataMessage *dm; - size_t msize; + int was_transmitted; if (msg == NULL) + { + qe = h->queue_head; + GNUNET_assert (NULL != qe); + rc = qe->qc.rc; + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + if (was_transmitted == GNUNET_YES) { - h->response_proc = NULL; - GNUNET_CLIENT_disconnect (h->client); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, zero, 0); - return; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from database.\n")); + do_disconnect (h); } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + else { - GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); - h->response_proc = NULL; + process_queue (h); + } + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + { + GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); + qe = h->queue_head; + rc = qe->qc.rc; + GNUNET_assert (GNUNET_YES == qe->was_transmitted); + free_queue_entry (qe); #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", + h->queue_size); #endif - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, zero, 0); - return; - } - if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ) - { - GNUNET_break (0); - GNUNET_CLIENT_disconnect (h->client); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - h->response_proc = NULL; - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, zero, 0); - return; - } - dm = (const struct DataMessage*) msg; - msize = ntohl(dm->size); - if (ntohs(msg->size) != msize + sizeof(struct DataMessage)) - { - GNUNET_break (0); - GNUNET_CLIENT_disconnect (h->client); - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - h->response_proc = NULL; - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, zero, 0); - return; - } + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + h->retry_time.rel_value = 0; + h->result_count = 0; + process_queue (h); + return; + } + qe = h->queue_head; + GNUNET_assert (NULL != qe); + rc = qe->qc.rc; + if (GNUNET_YES != qe->was_transmitted) + { + GNUNET_break (0); + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + if ((ntohs (msg->size) < sizeof (struct DataMessage)) || + (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || + (ntohs (msg->size) != + sizeof (struct DataMessage) + + ntohl (((const struct DataMessage *) msg)->size))) + { + GNUNET_break (0); + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, + GNUNET_NO); + dm = (const struct DataMessage *) msg; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll(dm->uid), - ntohl(dm->type), - msize, - GNUNET_h2s(&dm->key)); -#endif - cont (h->response_proc_cls, - &dm->key, - msize, - &dm[1], - ntohl(dm->type), - ntohl(dm->priority), - ntohl(dm->anonymity), - GNUNET_TIME_absolute_ntoh(dm->expiration), - GNUNET_ntohll(dm->uid)); - GNUNET_CLIENT_receive (h->client, - &with_result_response_handler, - h, - GNUNET_TIME_absolute_get_remaining (h->timeout)); + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), + ntohl (dm->size), GNUNET_h2s (&dm->key)); +#endif + free_queue_entry (qe); + h->retry_time.rel_value = 0; + process_queue (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), + ntohl (dm->priority), ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); } /** - * Transmit message to datastore service and then - * read a result message. + * Get a random value from the datastore for content replication. + * Returns a single, random value among those with the highest + * replication score, lowering positive replication scores by one for + * the chosen value (if only content with a replication score exists, + * a random value is returned and replication scores are not changed). * - * @param cls closure with handle to datastore - * @param size number of bytes we can transmit at most - * @param buf where to write transmission, NULL on - * timeout - * @return number of bytes copied to buf + * @param h handle to the datastore + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param proc function to call on a random value; it + * will be called once with a value (if available) + * and always once with a value of NULL. + * @param proc_cls closure for proc + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel */ -static size_t -transmit_get_result (void *cls, - size_t size, - void *buf) +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { - struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc; - uint16_t msize; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct GNUNET_MessageHeader *m; + union QueueContext qc; - if (buf == NULL) - { - h->response_proc = NULL; - h->message_size = 0; - cont (h->response_proc_cls, - GNUNET_SYSERR, - gettext_noop ("Error transmitting message to datastore service.\n")); - return 0; - } - msize = h->message_size; - GNUNET_assert (msize <= size); - memcpy (buf, &h[1], msize); + GNUNET_assert (NULL != proc); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitted %u byte message to datastore service, now waiting for result.\n", - msize); -#endif - h->message_size = 0; - GNUNET_CLIENT_receive (h->client, - &with_result_response_handler, - h, - GNUNET_TIME_absolute_get_remaining (h->timeout)); - return msize; + "Asked to get replication entry in %llu ms\n", + (unsigned long long) timeout.rel_value); +#endif + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), + queue_priority, max_queue_size, timeout, + &process_result_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for GET REPLICATION\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# GET REPLICATION requests executed"), 1, + GNUNET_NO); + m = (struct GNUNET_MessageHeader *) &qe[1]; + m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); + m->size = htons (sizeof (struct GNUNET_MessageHeader)); + process_queue (h); + return qe; } /** - * Helper function that will initiate the - * transmission of a message to the datastore - * service. The message must already be prepared - * and stored in the buffer at the end of the - * handle. The message must be of a type that - * expects a "DataMessage" in response. + * Get a single zero-anonymity value from the datastore. * - * @param h handle to the service with prepared message - * @param cont function to call with result - * @param cont_cls closure - * @param timeout timeout for the operation + * @param h handle to the datastore + * @param offset offset of the result (modulo num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param type allowed type for the operation (never zero) + * @param proc function to call on a random value; it + * will be called once with a value (if available) + * or with NULL if none value exists. + * @param proc_cls closure for proc + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel */ -static void -transmit_for_result (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_Iterator cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { - static struct GNUNET_TIME_Absolute zero; - const struct GNUNET_MessageHeader *hdr; - uint16_t msize; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct GetZeroAnonymityMessage *m; + union QueueContext qc; - GNUNET_assert (cont != NULL); - hdr = (const struct GNUNET_MessageHeader*) &h[1]; - msize = ntohs(hdr->size); + GNUNET_assert (NULL != proc); + GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte message of type %u to datastore service\n", - msize, - ntohs(hdr->type)); -#endif - GNUNET_assert (h->response_proc == NULL); - h->response_proc = cont; - h->response_proc_cls = cont_cls; - h->timeout = GNUNET_TIME_relative_to_absolute (timeout); - h->message_size = msize; - if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client, - msize, - timeout, - &transmit_get_result, - h)) - { - GNUNET_break (0); - h->response_proc = NULL; - h->message_size = 0; - cont (h->response_proc_cls, - NULL, 0, NULL, 0, 0, 0, zero, 0); - } + "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", + (unsigned long long) offset, type, + (unsigned long long) timeout.rel_value); +#endif + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), + queue_priority, max_queue_size, timeout, + &process_result_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for zero-anonymity procation\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# GET ZERO ANONYMITY requests executed"), 1, + GNUNET_NO); + m = (struct GetZeroAnonymityMessage *) &qe[1]; + m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); + m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); + process_queue (h); + return qe; } /** - * Iterate over the results for a particular key - * in the datastore. + * Get a result for a particular key from the datastore. The processor + * will only be called once. * * @param h handle to the datastore + * @param offset offset of the result (modulo num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param key maybe NULL (to match all entries) * @param type desired type, 0 for any - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response + * @param proc function to call on each matching value; + * will be called once with a NULL value at the end + * @param proc_cls closure for proc + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel */ -void -GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - uint32_t type, - GNUNET_DATASTORE_Iterator iter, void *iter_cls, - struct GNUNET_TIME_Relative timeout) +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { + struct GNUNET_DATASTORE_QueueEntry *qe; struct GetMessage *gm; + union QueueContext qc; - gm = (struct GetMessage*) &h[1]; - gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl(type); + GNUNET_assert (NULL != proc); +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to look for data of type %u under key `%s'\n", + (unsigned int) type, GNUNET_h2s (key)); +#endif + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority, + max_queue_size, timeout, &process_result_message, &qc); + if (qe == NULL) + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", + GNUNET_h2s (key)); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"), + 1, GNUNET_NO); + gm = (struct GetMessage *) &qe[1]; + gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); + gm->offset = GNUNET_htonll (offset); if (key != NULL) - { - gm->header.size = htons(sizeof (struct GetMessage)); - gm->key = *key; - } + { + gm->header.size = htons (sizeof (struct GetMessage)); + gm->key = *key; + } else - { - gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); - } - transmit_for_result (h, iter, iter_cls, timeout); -} - - -/** - * Get a random value from the datastore. - * - * @param h handle to the datastore - * @param iter function to call on a random value; it - * will be called exactly once; if no values - * are available, the value will be NULL. - * @param iter_cls closure for iter - * @param timeout how long to wait at most for a response - */ -void -GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_Iterator iter, void *iter_cls, - struct GNUNET_TIME_Relative timeout) -{ - struct GNUNET_MessageHeader *m; - - m = (struct GNUNET_MessageHeader*) &h[1]; - m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); - m->size = htons(sizeof (struct GNUNET_MessageHeader)); - transmit_for_result (h, iter, iter_cls, timeout); + { + gm->header.size = + htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)); + } + process_queue (h); + return qe; } /** - * Explicitly remove some content from the database. - * - * @param h handle to the datastore - * @param key key for the value - * @param size number of bytes in data - * @param data content stored - * @param cont continuation to call when done - * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response + * Cancel a datastore operation. The final callback from the + * operation must not have been done yet. + * + * @param qe operation to cancel */ void -GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - uint32_t size, const void *data, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout) +GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { - struct DataMessage *dm; - size_t msize; + struct GNUNET_DATASTORE_Handle *h; - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - dm = (struct DataMessage*) &h[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons(msize); - dm->rid = htonl(0); - dm->size = htonl(size); - dm->type = htonl(0); - dm->priority = htonl(0); - dm->anonymity = htonl(0); - dm->uid = GNUNET_htonll(0); - dm->expiration.value = 0; - dm->key = *key; - memcpy (&dm[1], data, size); - transmit_for_status (h, cont, cont_cls, timeout); + GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); + h = qe->h; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, + qe->was_transmitted, h->queue_head == qe); +#endif + if (GNUNET_YES == qe->was_transmitted) + { + free_queue_entry (qe); + h->skip_next_messages++; + return; + } + free_queue_entry (qe); + process_queue (h); }