X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdatastore%2Fdatastore_api.c;h=3bf8ecd94f26cbee17fc407b21110d034d858bdf;hb=6c471eeb15e27f8226492b4860a3c2acb94c5f25;hp=12ec8bf0535bc2bfe7bf6c76f62a9005965f2e79;hpb=3a52873d301e7f458dd9622e3632d4056551d7b0;p=oweals%2Fgnunet.git diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 12ec8bf05..3bf8ecd94 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,10 +1,10 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -28,8 +28,69 @@ #include "gnunet_arm_service.h" #include "gnunet_constants.h" #include "gnunet_datastore_service.h" +#include "gnunet_statistics_service.h" #include "datastore.h" +#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__) + +/** + * If a client stopped asking for more results, how many more do + * we receive from the DB before killing the connection? Trade-off + * between re-doing TCP handshakes and (needlessly) receiving + * useless results. + */ +#define MAX_EXCESS_RESULTS 8 + +/** + * Context for processing status messages. + */ +struct StatusContext +{ + /** + * Continuation to call with the status. + */ + GNUNET_DATASTORE_ContinuationWithStatus cont; + + /** + * Closure for cont. + */ + void *cont_cls; + +}; + + +/** + * Context for processing result messages. + */ +struct ResultContext +{ + /** + * Function to call with the result. + */ + GNUNET_DATASTORE_DatumProcessor proc; + + /** + * Closure for proc. + */ + void *proc_cls; + +}; + + +/** + * Context for a queue operation. + */ +union QueueContext +{ + + struct StatusContext sc; + + struct ResultContext rc; + +}; + + + /** * Entry in our priority queue. */ @@ -54,26 +115,25 @@ struct GNUNET_DATASTORE_QueueEntry /** * Response processor (NULL if we are not waiting for a response). * This struct should be used for the closure, function-specific - * arguments can be passed via 'client_ctx'. + * arguments can be passed via 'qc'. */ GNUNET_CLIENT_MessageHandler response_proc; - - /** - * Specific context (variable argument that - * can be used by the response processor). - */ - void *client_ctx; /** * Function to call after transmission of the request. */ GNUNET_DATASTORE_ContinuationWithStatus cont; - + /** * Closure for 'cont'. */ void *cont_cls; + /** + * Context for the operation. + */ + union QueueContext qc; + /** * Task for timeout signalling. */ @@ -105,15 +165,15 @@ struct GNUNET_DATASTORE_QueueEntry /** * Has this message been transmitted to the service? * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a + * Note that the overall struct should end at a * multiple of 64 bits. */ - int32_t was_transmitted; + int was_transmitted; }; /** - * Handle to the datastore service. + * Handle to the datastore service. */ struct GNUNET_DATASTORE_Handle { @@ -124,14 +184,14 @@ struct GNUNET_DATASTORE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Our scheduler. + * Current connection to the datastore service. */ - struct GNUNET_SCHEDULER_Handle *sched; + struct GNUNET_CLIENT_Connection *client; /** - * Current connection to the datastore service. + * Handle for statistics. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_STATISTICS_Handle *stats; /** * Current transmit handle. @@ -164,6 +224,23 @@ struct GNUNET_DATASTORE_Handle */ unsigned int queue_size; + /** + * Number of results we're receiving for the current query + * after application stopped to care. Used to determine when + * to reset the connection. + */ + unsigned int result_count; + + /** + * Are we currently trying to receive from the service? + */ + int in_receive; + + /** + * We should ignore the next message(s) from the service. + */ + unsigned int skip_next_messages; + }; @@ -172,32 +249,42 @@ struct GNUNET_DATASTORE_Handle * Connect to the datastore service. * * @param cfg configuration to use - * @param sched scheduler to use * @return handle to use to access the service */ struct GNUNET_DATASTORE_Handle * -GNUNET_DATASTORE_connect (const struct - GNUNET_CONFIGURATION_Handle - *cfg, - struct - GNUNET_SCHEDULER_Handle - *sched) +GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - - c = GNUNET_CLIENT_connect (sched, "datastore", cfg); + + c = GNUNET_CLIENT_connect ("datastore", cfg); if (c == NULL) - return NULL; /* oops */ - h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE); + return NULL; /* oops */ + h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + + GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); h->client = c; h->cfg = cfg; - h->sched = sched; + h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); return h; } +/** + * Task used by 'transmit_drop' to disconnect the datastore. + * + * @param cls the datastore handle + * @param tc scheduler context + */ +static void +disconnect_after_drop (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + + GNUNET_DATASTORE_disconnect (h, GNUNET_NO); +} + + /** * Transmit DROP message to datastore service. * @@ -207,26 +294,26 @@ GNUNET_DATASTORE_connect (const struct * @return number of bytes written to buf */ static size_t -transmit_drop (void *cls, - size_t size, - void *buf) +transmit_drop (void *cls, size_t size, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_MessageHeader *hdr; - + if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return 0; - } - GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader)); + { + LOG (GNUNET_ERROR_TYPE_WARNING, + _("Failed to transmit request to drop database.\n")); + GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + return 0; + } + GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); hdr = buf; - hdr->size = htons(sizeof(struct GNUNET_MessageHeader)); - hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return sizeof(struct GNUNET_MessageHeader); + hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); + hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + return sizeof (struct GNUNET_MessageHeader); } @@ -237,44 +324,51 @@ transmit_drop (void *cls, * @param h handle to the datastore * @param drop set to GNUNET_YES to delete all data in datastore (!) */ -void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, - int drop) +void +GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) { struct GNUNET_DATASTORE_QueueEntry *qe; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); + if (NULL != h->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); + h->th = NULL; + } if (h->client != NULL) - { - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; - } + { + GNUNET_CLIENT_disconnect (h->client); + h->client = NULL; + } if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, - h->reconnect_task); - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (h->reconnect_task); + h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + } while (NULL != (qe = h->queue_head)) + { + GNUNET_assert (NULL != qe->response_proc); + qe->response_proc (h, NULL); + } + if (GNUNET_YES == drop) + { + h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); + if (h->client != NULL) { - GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (qe, NULL); - } - if (GNUNET_YES == drop) - { - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); - if (h->client != NULL) - { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof(struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_MINUTES, - GNUNET_YES, - &transmit_drop, - h)) - return; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - } - GNUNET_break (0); + if (NULL != + GNUNET_CLIENT_notify_transmit_ready (h->client, + sizeof (struct + GNUNET_MessageHeader), + GNUNET_TIME_UNIT_MINUTES, + GNUNET_YES, &transmit_drop, h)) + return; + GNUNET_CLIENT_disconnect (h->client); + h->client = NULL; } + GNUNET_break (0); + } + GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); + h->stats = NULL; GNUNET_free (h); } @@ -286,14 +380,17 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, * @param tc scheduler context */ static void -timeout_queue_entry (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_DATASTORE_QueueEntry *qe = cls; + GNUNET_STATISTICS_update (qe->h->stats, + gettext_noop ("# queue entry timeouts"), 1, + GNUNET_NO); qe->task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (qe->was_transmitted == GNUNET_NO); - qe->response_proc (qe, NULL); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n"); + qe->response_proc (qe->h, NULL); } @@ -308,17 +405,15 @@ timeout_queue_entry (void *cls, * (if other requests of higher priority are in the queue) * @param timeout timeout for the operation * @param response_proc function to call with replies (can be NULL) - * @param client_ctx client context (NOT a closure for response_proc) - * @return NULL if the queue is full (and this entry was dropped) + * @param qc client context (NOT a closure for response_proc) + * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * -make_queue_entry (struct GNUNET_DATASTORE_Handle *h, - size_t msize, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_CLIENT_MessageHandler response_proc, - void *client_ctx) +make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize, + unsigned int queue_priority, unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_CLIENT_MessageHandler response_proc, + const union QueueContext *qc) { struct GNUNET_DATASTORE_QueueEntry *ret; struct GNUNET_DATASTORE_QueueEntry *pos; @@ -326,63 +421,67 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, c = 0; pos = h->queue_head; - while ( (pos != NULL) && - (c < max_queue_size) && - (pos->priority >= queue_priority) ) - { - c++; - pos = pos->next; - } + while ((pos != NULL) && (c < max_queue_size) && + (pos->priority >= queue_priority)) + { + c++; + pos = pos->next; + } + if (c >= max_queue_size) + { + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1, + GNUNET_NO); + return NULL; + } ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); ret->h = h; ret->response_proc = response_proc; - ret->client_ctx = client_ctx; + ret->qc = *qc; ret->timeout = GNUNET_TIME_relative_to_absolute (timeout); ret->priority = queue_priority; ret->max_queue = max_queue_size; ret->message_size = msize; ret->was_transmitted = GNUNET_NO; if (pos == NULL) - { - /* append at the tail */ - pos = h->queue_tail; - } + { + /* append at the tail */ + pos = h->queue_tail; + } else - { - pos = pos->prev; - /* do not insert at HEAD if HEAD query was already - transmitted and we are still receiving replies! */ - if ( (pos == NULL) && - (h->queue_head->was_transmitted) ) - pos = h->queue_head; - } + { + pos = pos->prev; + /* do not insert at HEAD if HEAD query was already + * transmitted and we are still receiving replies! */ + if ((pos == NULL) && (h->queue_head->was_transmitted)) + pos = h->queue_head; + } c++; - GNUNET_CONTAINER_DLL_insert_after (h->queue_head, - h->queue_tail, - pos, - ret); + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), + 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); h->queue_size++; - if (c > max_queue_size) - { - response_proc (ret, NULL); - GNUNET_free (ret); - return NULL; - } - ret->task = GNUNET_SCHEDULER_add_delayed (h->sched, - timeout, - &timeout_queue_entry, - ret); + ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); pos = ret->next; - while (pos != NULL) + while (pos != NULL) + { + if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) { - if (pos->max_queue < h->queue_size) - { - GNUNET_assert (pos->response_proc != NULL); - pos->response_proc (pos, NULL); - break; - } - pos = pos->next; + GNUNET_assert (pos->response_proc != NULL); + /* move 'pos' element to head so that it will be + * killed on 'NULL' call below */ + LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n"); + GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); + GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# Requests dropped from datastore queue"), 1, + GNUNET_NO); + GNUNET_assert (h->queue_head == pos); + pos->response_proc (h, NULL); + break; } + pos = pos->next; + } return ret; } @@ -390,7 +489,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, /** * Process entries in the queue (or do nothing if we are already * doing so). - * + * * @param h handle to the datastore */ static void @@ -404,21 +503,28 @@ process_queue (struct GNUNET_DATASTORE_Handle *h); * @param tc scheduler context */ static void -try_reconnect (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_DATASTORE_Handle *h = cls; - if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value) + if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value) h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY; else h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2); - if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value) + if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value) h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT; h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); + h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); if (h->client == NULL) + { + LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n"); return; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# datastore connections (re)created"), 1, + GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); process_queue (h); } @@ -433,13 +539,51 @@ static void do_disconnect (struct GNUNET_DATASTORE_Handle *h) { if (h->client == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "client NULL in disconnect, will not try to reconnect\n"); return; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + } +#if 0 + GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"), + 1, GNUNET_NO); +#endif + GNUNET_CLIENT_disconnect (h->client); + h->skip_next_messages = 0; h->client = NULL; - h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched, - h->retry_time, - &try_reconnect, - h); + h->reconnect_task = + GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h); +} + + +/** + * Function called whenever we receive a message from + * the service. Calls the appropriate handler. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param msg the received message + */ +static void +receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + h->in_receive = GNUNET_NO; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + process_queue (h); + return; + } + qe->response_proc (h, msg); } @@ -452,9 +596,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) * @return number of bytes written to buf */ static size_t -transmit_request (void *cls, - size_t size, - void *buf) +transmit_request (void *cls, size_t size, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; @@ -462,28 +604,34 @@ transmit_request (void *cls, h->th = NULL; if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ + return 0; /* no entry in queue */ if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to database.\n")); - do_disconnect (h); - return 0; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n"); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# transmission request failures"), + 1, GNUNET_NO); + do_disconnect (h); + return 0; + } if (size < (msize = qe->message_size)) - { - process_queue (h); - return 0; - } + { + process_queue (h); + return 0; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n", + msize); memcpy (buf, &qe[1], msize); qe->was_transmitted = GNUNET_YES; - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); + GNUNET_SCHEDULER_cancel (qe->task); qe->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); + GNUNET_assert (GNUNET_NO == h->in_receive); + h->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (h->client, &receive_cb, h, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# bytes sent to datastore"), 1, + GNUNET_NO); return msize; } @@ -491,7 +639,7 @@ transmit_request (void *cls, /** * Process entries in the queue (or do nothing if we are already * doing so). - * + * * @param h handle to the datastore */ static void @@ -500,74 +648,83 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) struct GNUNET_DATASTORE_QueueEntry *qe; if (NULL == (qe = h->queue_head)) - return; /* no entry in queue */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); + return; /* no entry in queue */ + } if (qe->was_transmitted == GNUNET_YES) - return; /* waiting for replies */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); + return; /* waiting for replies */ + } if (h->th != NULL) - return; /* request pending */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); + return; /* request pending */ + } if (h->client == NULL) - return; /* waiting for reconnect */ - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - qe->message_size, - GNUNET_TIME_absolute_get_remaining (qe->timeout), - GNUNET_YES, - &transmit_request, - h); + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); + return; /* waiting for reconnect */ + } + if (GNUNET_YES == h->in_receive) + { + /* wait for response to previous query */ + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n", + qe->message_size); + h->th = + GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, + GNUNET_TIME_absolute_get_remaining + (qe->timeout), GNUNET_YES, + &transmit_request, h); + GNUNET_assert (GNUNET_NO == h->in_receive); + GNUNET_break (NULL != h->th); } - - -/** - * Context for processing status messages. - */ -struct StatusContext -{ - /** - * Continuation to call with the status. - */ - GNUNET_DATASTORE_ContinuationWithStatus cont; - - /** - * Closure for cont. - */ - void *cont_cls; - -}; - - /** * Dummy continuation used to do nothing (but be non-zero). * * @param cls closure - * @param result result + * @param result result + * @param min_expiration expiration time * @param emsg error message */ static void -drop_status_cont (void *cls, int result, const char *emsg) +drop_status_cont (void *cls, int32_t result, + struct GNUNET_TIME_Absolute min_expiration, + const char *emsg) { /* do nothing */ } +/** + * Free a queue entry. Removes the given entry from the + * queue and releases associated resources. Does NOT + * call the callback. + * + * @param qe entry to free. + */ static void free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) { struct GNUNET_DATASTORE_Handle *h = qe->h; - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); + GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); if (qe->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, - qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (qe->task); + qe->task = GNUNET_SCHEDULER_NO_TASK; + } h->queue_size--; + qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ GNUNET_free (qe); } + /** * Type of a function to call when we receive a message * from the service. @@ -575,70 +732,79 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) * @param cls closure * @param msg message received, NULL on timeout or fatal error */ -static void -process_status_message (void *cls, - const struct - GNUNET_MessageHeader * msg) +static void +process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - struct StatusContext *rc = qe->client_ctx; + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; const struct StatusMessage *sm; const char *emsg; int32_t status; + int was_transmitted; - free_queue_entry (qe); + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + do_disconnect (h); + return; + } + rc = qe->qc.sc; if (msg == NULL) - { - if (NULL == h->client) - return; /* forced disconnect */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from database.\n")); - do_disconnect (h); - return; - } - - if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) - { - GNUNET_break (0); - h->retry_time = GNUNET_TIME_UNIT_ZERO; + { + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + if (was_transmitted == GNUNET_YES) do_disconnect (h); - rc->cont (rc->cont_cls, - GNUNET_SYSERR, - _("Error reading response from datastore service")); - GNUNET_free (rc); - return; - } - sm = (const struct StatusMessage*) msg; - status = ntohl(sm->status); + else + process_queue (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("Failed to receive status response from database.")); + return; + } + GNUNET_assert (GNUNET_YES == qe->was_transmitted); + free_queue_entry (qe); + if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || + (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) + { + GNUNET_break (0); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, GNUNET_SYSERR, + GNUNET_TIME_UNIT_ZERO_ABS, + _("Error reading response from datastore service")); + return; + } + sm = (const struct StatusMessage *) msg; + status = ntohl (sm->status); emsg = NULL; - if (ntohs(msg->size) > sizeof(struct StatusMessage)) - { - emsg = (const char*) &sm[1]; - if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } - } - if ( (status == GNUNET_SYSERR) && - (emsg == NULL) ) + if (ntohs (msg->size) > sizeof (struct StatusMessage)) + { + emsg = (const char *) &sm[1]; + if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') { GNUNET_break (0); emsg = _("Invalid error message received from datastore service"); } -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received status %d/%s\n", - (int) status, - emsg); -#endif - rc->cont (rc->cont_cls, - status, - emsg); - GNUNET_free (rc); + } + if ((status == GNUNET_SYSERR) && (emsg == NULL)) + { + GNUNET_break (0); + emsg = _("Invalid error message received from datastore service"); + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), 1, + GNUNET_NO); + h->retry_time.rel_value = 0; process_queue (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, status, + GNUNET_TIME_absolute_ntoh (sm->min_expiration), + emsg); } @@ -656,6 +822,7 @@ process_status_message (void *cls, * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content + * @param replication how often should the content be replicated to other peers? * @param expiration expiration time for the content * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped @@ -668,52 +835,51 @@ process_status_message (void *cls, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - int rid, - const GNUNET_HashCode * key, - uint32_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, +GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, + const struct GNUNET_HashCode * key, size_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + uint32_t replication, struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, + unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { - struct StatusContext *scont; struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; size_t msize; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s'\n", - size, - GNUNET_h2s (key)); -#endif - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - &process_status_message, scont); + union QueueContext qc; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to put %u bytes of data under key `%s' for %llu ms\n", size, + GNUNET_h2s (key), + GNUNET_TIME_absolute_get_remaining (expiration).rel_value); + msize = sizeof (struct DataMessage) + size; + GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, + &process_status_message, &qc); if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n"); return NULL; - dm = (struct DataMessage* ) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons(msize); - dm->rid = htonl(rid); - dm->size = htonl(size); - dm->type = htonl(type); - dm->priority = htonl(priority); - dm->anonymity = htonl(anonymity); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(expiration); + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"), + 1, GNUNET_NO); + dm = (struct DataMessage *) &qe[1]; + dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); + dm->header.size = htons (msize); + dm->rid = htonl (rid); + dm->size = htonl ((uint32_t) size); + dm->type = htonl (type); + dm->priority = htonl (priority); + dm->anonymity = htonl (anonymity); + dm->replication = htonl (replication); + dm->reserved = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->key = *key; memcpy (&dm[1], data, size); process_queue (h); @@ -741,40 +907,39 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, - uint64_t amount, - uint32_t entries, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) +GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, + uint32_t entries, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct ReserveMessage *rm; - struct StatusContext *scont; + union QueueContext qc; if (cont == NULL) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to reserve %llu bytes of data and %u entries'\n", - (unsigned long long) amount, - (unsigned int) entries); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct ReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, scont); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to reserve %llu bytes of data and %u entries\n", + (unsigned long long) amount, (unsigned int) entries); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority, + max_queue_size, timeout, &process_status_message, &qc); if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n"); return NULL; - rm = (struct ReserveMessage*) &qe[1]; - rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons(sizeof (struct ReserveMessage)); - rm->entries = htonl(entries); - rm->amount = GNUNET_htonll(amount); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# RESERVE requests executed"), 1, + GNUNET_NO); + rm = (struct ReserveMessage *) &qe[1]; + rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->header.size = htons (sizeof (struct ReserveMessage)); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); process_queue (h); return qe; } @@ -803,36 +968,38 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - int rid, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + uint32_t rid, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct ReleaseReserveMessage *rrm; - struct StatusContext *scont; + union QueueContext qc; if (cont == NULL) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to release reserve %d\n", - rid); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, scont); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), + queue_priority, max_queue_size, timeout, + &process_status_message, &qc); if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to release reserve\n"); return NULL; - rrm = (struct ReleaseReserveMessage*) &qe[1]; - rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl(rid); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# RELEASE RESERVE requests executed"), 1, + GNUNET_NO); + rrm = (struct ReleaseReserveMessage *) &qe[1]; + rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); + rrm->rid = htonl (rid); process_queue (h); return qe; } @@ -856,43 +1023,42 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, - unsigned long long uid, - uint32_t priority, - struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) +GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, + uint32_t priority, + struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct UpdateMessage *um; - struct StatusContext *scont; + union QueueContext qc; if (cont == NULL) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to update entry %llu raising priority by %u and expiration to %llu\n", - uid, - (unsigned int) priority, - (unsigned long long) expiration.value); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct UpdateMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, scont); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to update entry %llu raising priority by %u and expiration to %llu\n", + uid, (unsigned int) priority, (unsigned long long) expiration.abs_value); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, + max_queue_size, timeout, &process_status_message, &qc); if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n"); return NULL; - um = (struct UpdateMessage*) &qe[1]; - um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons(sizeof (struct UpdateMessage)); - um->priority = htonl(priority); - um->expiration = GNUNET_TIME_absolute_hton(expiration); - um->uid = GNUNET_htonll(uid); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# UPDATE requests executed"), 1, + GNUNET_NO); + um = (struct UpdateMessage *) &qe[1]; + um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); + um->header.size = htons (sizeof (struct UpdateMessage)); + um->priority = htonl (priority); + um->expiration = GNUNET_TIME_absolute_hton (expiration); + um->uid = GNUNET_htonll (uid); process_queue (h); return qe; } @@ -921,48 +1087,46 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode *key, - uint32_t size, - const void *data, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + const struct GNUNET_HashCode * key, size_t size, + const void *data, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; size_t msize; - struct StatusContext *scont; + union QueueContext qc; if (cont == NULL) cont = &drop_status_cont; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to remove %u bytes under key `%s'\n", - size, - GNUNET_h2s (key)); -#endif - scont = GNUNET_malloc (sizeof (struct StatusContext)); - scont->cont = cont; - scont->cont_cls = cont_cls; - msize = sizeof(struct DataMessage) + size; - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - &process_status_message, scont); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n", + size, GNUNET_h2s (key)); + qc.sc.cont = cont; + qc.sc.cont_cls = cont_cls; + msize = sizeof (struct DataMessage) + size; + GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, + &process_status_message, &qc); if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n"); return NULL; - dm = (struct DataMessage*) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons(msize); - dm->rid = htonl(0); - dm->size = htonl(size); - dm->type = htonl(0); - dm->priority = htonl(0); - dm->anonymity = htonl(0); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS); + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# REMOVE requests executed"), 1, + GNUNET_NO); + dm = (struct DataMessage *) &qe[1]; + dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); + dm->header.size = htons (msize); + dm->rid = htonl (0); + dm->size = htonl (size); + dm->type = htonl (0); + dm->priority = htonl (0); + dm->anonymity = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); dm->key = *key; memcpy (&dm[1], data, size); process_queue (h); @@ -970,25 +1134,6 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, } - -/** - * Context for processing result messages. - */ -struct ResultContext -{ - /** - * Iterator to call with the result. - */ - GNUNET_DATASTORE_Iterator iter; - - /** - * Closure for iter. - */ - void *iter_cls; - -}; - - /** * Type of a function to call when we receive a message * from the service. @@ -996,252 +1141,316 @@ struct ResultContext * @param cls closure * @param msg message received, NULL on timeout or fatal error */ -static void -process_result_message (void *cls, - const struct GNUNET_MessageHeader * msg) +static void +process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - struct ResultContext *rc = qe->client_ctx; + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; const struct DataMessage *dm; + int was_transmitted; - GNUNET_assert (h->queue_head == qe); if (msg == NULL) + { + qe = h->queue_head; + GNUNET_assert (NULL != qe); + rc = qe->qc.rc; + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + if (was_transmitted == GNUNET_YES) { -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from datastore\n")); -#endif - free_queue_entry (qe); + LOG (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from database.\n")); do_disconnect (h); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); - return; } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + else { - GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set\n"); -#endif - free_queue_entry (qe); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); process_queue (h); - return; } - if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || - (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); - return; - } - dm = (const struct DataMessage*) msg; -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll(dm->uid), - ntohl(dm->type), - ntohl(dm->size), - GNUNET_h2s(&dm->key)); -#endif - rc->iter (rc->iter_cls, - &dm->key, - ntohl(dm->size), - &dm[1], - ntohl(dm->type), - ntohl(dm->priority), - ntohl(dm->anonymity), - GNUNET_TIME_absolute_ntoh(dm->expiration), - GNUNET_ntohll(dm->uid)); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + { + GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); + qe = h->queue_head; + rc = qe->qc.rc; + GNUNET_assert (GNUNET_YES == qe->was_transmitted); + free_queue_entry (qe); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", h->queue_size); + h->retry_time.rel_value = 0; + h->result_count = 0; + process_queue (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + qe = h->queue_head; + GNUNET_assert (NULL != qe); + rc = qe->qc.rc; + if (GNUNET_YES != qe->was_transmitted) + { + GNUNET_break (0); + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + if ((ntohs (msg->size) < sizeof (struct DataMessage)) || + (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || + (ntohs (msg->size) != + sizeof (struct DataMessage) + + ntohl (((const struct DataMessage *) msg)->size))) + { + GNUNET_break (0); + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, + GNUNET_NO); + dm = (const struct DataMessage *) msg; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), + ntohl (dm->size), GNUNET_h2s (&dm->key)); + free_queue_entry (qe); + h->retry_time.rel_value = 0; + process_queue (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), + ntohl (dm->priority), ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); } /** - * Get a random value from the datastore. + * Get a random value from the datastore for content replication. + * Returns a single, random value among those with the highest + * replication score, lowering positive replication scores by one for + * the chosen value (if only content with a replication score exists, + * a random value is returned and replication scores are not changed). * * @param h handle to the datastore * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param iter function to call on a random value; it + * @param proc function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. - * @param iter_cls closure for iter + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_MessageHeader *m; - struct ResultContext *rcont; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get random entry in %llu ms\n", - (unsigned long long) timeout.value); -#endif - rcont = GNUNET_malloc (sizeof (struct ResultContext)); - rcont->iter = iter; - rcont->iter_cls = iter_cls; - qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), - queue_priority, max_queue_size, timeout, - &process_result_message, rcont); + union QueueContext qc; + + GNUNET_assert (NULL != proc); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n", + (unsigned long long) timeout.rel_value); + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), + queue_priority, max_queue_size, timeout, + &process_result_message, &qc); if (qe == NULL) - return NULL; - m = (struct GNUNET_MessageHeader*) &qe[1]; - m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); - m->size = htons(sizeof (struct GNUNET_MessageHeader)); + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for GET REPLICATION\n"); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# GET REPLICATION requests executed"), 1, + GNUNET_NO); + m = (struct GNUNET_MessageHeader *) &qe[1]; + m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); + m->size = htons (sizeof (struct GNUNET_MessageHeader)); process_queue (h); return qe; } - /** - * Iterate over the results for a particular key - * in the datastore. The iterator will only be called - * once initially; if the first call did contain a - * result, further results can be obtained by calling - * "GNUNET_DATASTORE_get_next" with the given argument. + * Get a single zero-anonymity value from the datastore. * * @param h handle to the datastore - * @param key maybe NULL (to match all entries) - * @param type desired type, 0 for any + * @param offset offset of the result (modulo num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. * @param queue_priority ranking of this request in the priority queue * @param max_queue_size at what queue size should this request be dropped * (if other requests of higher priority are in the queue) * @param timeout how long to wait at most for a response - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter + * @param type allowed type for the operation (never zero) + * @param proc function to call on a random value; it + * will be called once with a value (if available) + * or with NULL if none value exists. + * @param proc_cls closure for proc * @return NULL if the entry was not queued, otherwise a handle that can be used to - * cancel; note that even if NULL is returned, the callback will be invoked - * (or rather, will already have been invoked) + * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_Iterator iter, - void *iter_cls) +GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, + uint64_t offset, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; - struct GetMessage *gm; - struct ResultContext *rcont; - -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to look for data of type %u under key `%s'\n", - (unsigned int) type, - GNUNET_h2s (key)); -#endif - rcont = GNUNET_malloc (sizeof (struct ResultContext)); - rcont->iter = iter; - rcont->iter_cls = iter_cls; - qe = make_queue_entry (h, sizeof(struct GetMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, rcont); + struct GetZeroAnonymityMessage *m; + union QueueContext qc; + + GNUNET_assert (NULL != proc); + GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", + (unsigned long long) offset, type, + (unsigned long long) timeout.rel_value); + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), + queue_priority, max_queue_size, timeout, + &process_result_message, &qc); if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for zero-anonymity procation\n"); return NULL; - gm = (struct GetMessage*) &qe[1]; - gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl(type); - if (key != NULL) - { - gm->header.size = htons(sizeof (struct GetMessage)); - gm->key = *key; - } - else - { - gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); - } + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# GET ZERO ANONYMITY requests executed"), 1, + GNUNET_NO); + m = (struct GetZeroAnonymityMessage *) &qe[1]; + m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); + m->type = htonl ((uint32_t) type); + m->offset = GNUNET_htonll (offset); process_queue (h); return qe; } /** - * Function called to trigger obtaining the next result - * from the datastore. - * + * Get a result for a particular key from the datastore. The processor + * will only be called once. + * * @param h handle to the datastore - * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort - * iteration (with a final call to "iter" with key/data == NULL). + * @param offset offset of the result (modulo num-results); set to + * a random 64-bit value initially; then increment by + * one each time; detect that all results have been found by uid + * being again the first uid ever returned. + * @param key maybe NULL (to match all entries) + * @param type desired type, 0 for any + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response + * @param proc function to call on each matching value; + * will be called once with a NULL value at the end + * @param proc_cls closure for proc + * @return NULL if the entry was not queued, otherwise a handle that can be used to + * cancel */ -void -GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, - int more) +struct GNUNET_DATASTORE_QueueEntry * +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, + const struct GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { - struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; - struct ResultContext *rc = qe->client_ctx; - - GNUNET_assert (NULL != qe); - GNUNET_assert (&process_result_message == qe->response_proc); - if (GNUNET_YES == more) - { - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); - return; - } - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - rc->iter (rc->iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_free (rc); + struct GNUNET_DATASTORE_QueueEntry *qe; + struct GetMessage *gm; + union QueueContext qc; + + GNUNET_assert (NULL != proc); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to look for data of type %u under key `%s'\n", + (unsigned int) type, GNUNET_h2s (key)); + qc.rc.proc = proc; + qc.rc.proc_cls = proc_cls; + qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority, + max_queue_size, timeout, &process_result_message, &qc); + if (qe == NULL) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", + GNUNET_h2s (key)); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"), + 1, GNUNET_NO); + gm = (struct GetMessage *) &qe[1]; + gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); + gm->offset = GNUNET_htonll (offset); + if (key != NULL) + { + gm->header.size = htons (sizeof (struct GetMessage)); + gm->key = *key; + } + else + { + gm->header.size = + htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)); + } + process_queue (h); + return qe; } /** * Cancel a datastore operation. The final callback from the * operation must not have been done yet. - * + * * @param qe operation to cancel */ void GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) { struct GNUNET_DATASTORE_Handle *h; - int reconnect; + GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); h = qe->h; - reconnect = qe->was_transmitted; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, + qe->was_transmitted, h->queue_head == qe); + if (GNUNET_YES == qe->was_transmitted) + { + free_queue_entry (qe); + h->skip_next_messages++; + return; + } free_queue_entry (qe); - h->queue_size--; - if (reconnect) - { - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - } + process_queue (h); }