X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdatastore%2Fdatastore_api.c;h=f8826ce667bd7762423e4b9c2fdcafe031460ee3;hb=0514c61f6b185d3a7ae98309eca53d83de685ebe;hp=07e48931e61c0c5a1bb375f9268c63ae65a6d635;hpb=c14b2dc0cb0c501851b8e0091b5114701df34089;p=oweals%2Fgnunet.git diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 07e48931e..f8826ce66 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -34,7 +34,7 @@ /** * If a client stopped asking for more results, how many more do * we receive from the DB before killing the connection? Trade-off - * between re-doing TCP handshakes and (needlessly) receiving + * between re-doing TCP handshakes and (needlessly) receiving * useless results. */ #define MAX_EXCESS_RESULTS 8 @@ -82,7 +82,7 @@ union QueueContext { struct StatusContext sc; - + struct ResultContext rc; }; @@ -121,7 +121,7 @@ struct GNUNET_DATASTORE_QueueEntry * Function to call after transmission of the request. */ GNUNET_DATASTORE_ContinuationWithStatus cont; - + /** * Closure for 'cont'. */ @@ -163,15 +163,15 @@ struct GNUNET_DATASTORE_QueueEntry /** * Has this message been transmitted to the service? * Only ever GNUNET_YES for the head of the queue. - * Note that the overall struct should end at a + * Note that the overall struct should end at a * multiple of 64 bits. */ int was_transmitted; - + }; /** - * Handle to the datastore service. + * Handle to the datastore service. */ struct GNUNET_DATASTORE_Handle { @@ -250,22 +250,19 @@ struct GNUNET_DATASTORE_Handle * @return handle to use to access the service */ struct GNUNET_DATASTORE_Handle * -GNUNET_DATASTORE_connect (const struct - GNUNET_CONFIGURATION_Handle - *cfg) +GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; - + c = GNUNET_CLIENT_connect ("datastore", cfg); if (c == NULL) - return NULL; /* oops */ - h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); + return NULL; /* oops */ + h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + + GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); h->client = c; h->cfg = cfg; - h->stats = GNUNET_STATISTICS_create ("datastore-api", - cfg); + h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); return h; } @@ -279,26 +276,24 @@ GNUNET_DATASTORE_connect (const struct * @return number of bytes written to buf */ static size_t -transmit_drop (void *cls, - size_t size, - void *buf) +transmit_drop (void *cls, size_t size, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_MessageHeader *hdr; - + if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to drop database.\n")); - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return 0; - } - GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader)); + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to transmit request to drop database.\n")); + GNUNET_DATASTORE_disconnect (h, GNUNET_NO); + return 0; + } + GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); hdr = buf; - hdr->size = htons(sizeof(struct GNUNET_MessageHeader)); - hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP); + hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); + hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); GNUNET_DATASTORE_disconnect (h, GNUNET_NO); - return sizeof(struct GNUNET_MessageHeader); + return sizeof (struct GNUNET_MessageHeader); } @@ -310,51 +305,51 @@ transmit_drop (void *cls, * @param drop set to GNUNET_YES to delete all data in datastore (!) */ void -GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, - int drop) +GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) { struct GNUNET_DATASTORE_QueueEntry *qe; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); +#endif + if (NULL != h->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); + h->th = NULL; + } if (h->client != NULL) - { - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; - } + { + GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->client = NULL; + } if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->reconnect_task); - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } + { + GNUNET_SCHEDULER_cancel (h->reconnect_task); + h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; + } while (NULL != (qe = h->queue_head)) + { + GNUNET_assert (NULL != qe->response_proc); + qe->response_proc (h, NULL); + } + if (GNUNET_YES == drop) + { + h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); + if (h->client != NULL) { - GNUNET_assert (NULL != qe->response_proc); - qe->response_proc (h, NULL); - } - if (GNUNET_YES == drop) - { - h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); - if (h->client != NULL) - { - if (NULL != - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof(struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_MINUTES, - GNUNET_YES, - &transmit_drop, - h)) - return; - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); - h->client = NULL; - } - GNUNET_break (0); + if (NULL != + GNUNET_CLIENT_notify_transmit_ready (h->client, + sizeof (struct + GNUNET_MessageHeader), + GNUNET_TIME_UNIT_MINUTES, + GNUNET_YES, &transmit_drop, h)) + return; + GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + h->client = NULL; } - GNUNET_STATISTICS_destroy (h->stats, - GNUNET_NO); + GNUNET_break (0); + } + GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); h->stats = NULL; GNUNET_free (h); } @@ -367,17 +362,19 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, * @param tc scheduler context */ static void -timeout_queue_entry (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_DATASTORE_QueueEntry *qe = cls; GNUNET_STATISTICS_update (qe->h->stats, - gettext_noop ("# queue entry timeouts"), - 1, - GNUNET_NO); + gettext_noop ("# queue entry timeouts"), 1, + GNUNET_NO); qe->task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (qe->was_transmitted == GNUNET_NO); +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout of request in datastore queue\n"); +#endif qe->response_proc (qe->h, NULL); } @@ -394,16 +391,14 @@ timeout_queue_entry (void *cls, * @param timeout timeout for the operation * @param response_proc function to call with replies (can be NULL) * @param qc client context (NOT a closure for response_proc) - * @return NULL if the queue is full + * @return NULL if the queue is full */ static struct GNUNET_DATASTORE_QueueEntry * -make_queue_entry (struct GNUNET_DATASTORE_Handle *h, - size_t msize, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_CLIENT_MessageHandler response_proc, - const union QueueContext *qc) +make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize, + unsigned int queue_priority, unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_CLIENT_MessageHandler response_proc, + const union QueueContext *qc) { struct GNUNET_DATASTORE_QueueEntry *ret; struct GNUNET_DATASTORE_QueueEntry *pos; @@ -411,21 +406,18 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, c = 0; pos = h->queue_head; - while ( (pos != NULL) && - (c < max_queue_size) && - (pos->priority >= queue_priority) ) - { - c++; - pos = pos->next; - } + while ((pos != NULL) && (c < max_queue_size) && + (pos->priority >= queue_priority)) + { + c++; + pos = pos->next; + } if (c >= max_queue_size) - { - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# queue overflows"), - 1, - GNUNET_NO); - return NULL; - } + { + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1, + GNUNET_NO); + return NULL; + } ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); ret->h = h; ret->response_proc = response_proc; @@ -436,51 +428,48 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, ret->message_size = msize; ret->was_transmitted = GNUNET_NO; if (pos == NULL) - { - /* append at the tail */ - pos = h->queue_tail; - } + { + /* append at the tail */ + pos = h->queue_tail; + } else - { - pos = pos->prev; - /* do not insert at HEAD if HEAD query was already - transmitted and we are still receiving replies! */ - if ( (pos == NULL) && - (h->queue_head->was_transmitted) ) - pos = h->queue_head; - } + { + pos = pos->prev; + /* do not insert at HEAD if HEAD query was already + * transmitted and we are still receiving replies! */ + if ((pos == NULL) && (h->queue_head->was_transmitted)) + pos = h->queue_head; + } c++; - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# queue entries created"), - 1, - GNUNET_NO); - GNUNET_CONTAINER_DLL_insert_after (h->queue_head, - h->queue_tail, - pos, - ret); + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), + 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); h->queue_size++; - ret->task = GNUNET_SCHEDULER_add_delayed (timeout, - &timeout_queue_entry, - ret); + ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); pos = ret->next; - while (pos != NULL) + while (pos != NULL) + { + if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) { - if (pos->max_queue < h->queue_size) - { - GNUNET_assert (pos->response_proc != NULL); - /* move 'pos' element to head so that it will be - killed on 'NULL' call below */ - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - pos); - GNUNET_CONTAINER_DLL_insert (h->queue_head, - h->queue_tail, - pos); - pos->response_proc (h, NULL); - break; - } - pos = pos->next; + GNUNET_assert (pos->response_proc != NULL); + /* move 'pos' element to head so that it will be + * killed on 'NULL' call below */ +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropping request from datastore queue\n"); +#endif + GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); + GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); + GNUNET_STATISTICS_update (h->stats, + gettext_noop + ("# Requests dropped from datastore queue"), 1, + GNUNET_NO); + GNUNET_assert (h->queue_head == pos); + pos->response_proc (h, NULL); + break; } + pos = pos->next; + } return ret; } @@ -488,7 +477,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, /** * Process entries in the queue (or do nothing if we are already * doing so). - * + * * @param h handle to the datastore */ static void @@ -502,8 +491,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h); * @param tc scheduler context */ static void -try_reconnect (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_DATASTORE_Handle *h = cls; @@ -516,18 +504,17 @@ try_reconnect (void *cls, h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; h->client = GNUNET_CLIENT_connect ("datastore", h->cfg); if (h->client == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "DATASTORE reconnect failed (fatally)\n"); - return; - } + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "DATASTORE reconnect failed (fatally)\n"); + return; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# datastore connections (re)created"), - 1, - GNUNET_NO); + gettext_noop + ("# datastore connections (re)created"), 1, + GNUNET_NO); #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Reconnected to DATASTORE\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); #endif process_queue (h); } @@ -543,25 +530,55 @@ static void do_disconnect (struct GNUNET_DATASTORE_Handle *h) { if (h->client == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "client NULL in disconnect, will not try to reconnect\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client NULL in disconnect, will not try to reconnect\n"); #endif - return; - } + return; + } #if 0 - GNUNET_STATISTICS_update (stats, - gettext_noop ("# reconnected to DATASTORE"), - 1, - GNUNET_NO); + GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"), + 1, GNUNET_NO); #endif GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); h->skip_next_messages = 0; h->client = NULL; - h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, - &try_reconnect, - h); + h->reconnect_task = + GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h); +} + + +/** + * Function called whenever we receive a message from + * the service. Calls the appropriate handler. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param msg the received message + */ +static void +receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + + h->in_receive = GNUNET_NO; +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); +#endif + if (h->skip_next_messages > 0) + { + h->skip_next_messages--; + process_queue (h); + return; + } + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + process_queue (h); + return; + } + qe->response_proc (h, msg); } @@ -574,9 +591,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) * @return number of bytes written to buf */ static size_t -transmit_request (void *cls, - size_t size, - void *buf) +transmit_request (void *cls, size_t size, void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; @@ -584,27 +599,25 @@ transmit_request (void *cls, h->th = NULL; if (NULL == (qe = h->queue_head)) - return 0; /* no entry in queue */ + return 0; /* no entry in queue */ if (buf == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to transmit request to DATASTORE.\n")); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, - GNUNET_NO); - do_disconnect (h); - return 0; - } + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to transmit request to DATASTORE.\n")); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# transmission request failures"), + 1, GNUNET_NO); + do_disconnect (h); + return 0; + } if (size < (msize = qe->message_size)) - { - process_queue (h); - return 0; - } - #if DEBUG_DATASTORE + { + process_queue (h); + return 0; + } +#if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u byte request to DATASTORE\n", - msize); + "Transmitting %u byte request to DATASTORE\n", msize); #endif memcpy (buf, &qe[1], msize); qe->was_transmitted = GNUNET_YES; @@ -612,14 +625,11 @@ transmit_request (void *cls, qe->task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (GNUNET_NO == h->in_receive); h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - h, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); + GNUNET_CLIENT_receive (h->client, &receive_cb, h, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# bytes sent to datastore"), - 1, - GNUNET_NO); + gettext_noop ("# bytes sent to datastore"), 1, + GNUNET_NO); return msize; } @@ -627,7 +637,7 @@ transmit_request (void *cls, /** * Process entries in the queue (or do nothing if we are already * doing so). - * + * * @param h handle to the datastore */ static void @@ -636,53 +646,47 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) struct GNUNET_DATASTORE_QueueEntry *qe; if (NULL == (qe = h->queue_head)) - { + { #if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue empty\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); #endif - return; /* no entry in queue */ - } + return; /* no entry in queue */ + } if (qe->was_transmitted == GNUNET_YES) - { + { #if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Head request already transmitted\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); #endif - return; /* waiting for replies */ - } + return; /* waiting for replies */ + } if (h->th != NULL) - { + { #if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pending transmission request\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); #endif - return; /* request pending */ - } + return; /* request pending */ + } if (h->client == NULL) - { + { #if DEBUG_DATASTORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not connected\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); #endif - return; /* waiting for reconnect */ - } + return; /* waiting for reconnect */ + } if (GNUNET_YES == h->in_receive) - { - /* wait for response to previous query */ - return; - } + { + /* wait for response to previous query */ + return; + } #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing %u byte request to DATASTORE\n", - qe->message_size); + "Queueing %u byte request to DATASTORE\n", qe->message_size); #endif - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - qe->message_size, - GNUNET_TIME_absolute_get_remaining (qe->timeout), - GNUNET_YES, - &transmit_request, - h); + h->th = + GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, + GNUNET_TIME_absolute_get_remaining + (qe->timeout), GNUNET_YES, + &transmit_request, h); GNUNET_assert (GNUNET_NO == h->in_receive); GNUNET_break (NULL != h->th); } @@ -692,7 +696,7 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) * Dummy continuation used to do nothing (but be non-zero). * * @param cls closure - * @param result result + * @param result result * @param emsg error message */ static void @@ -706,7 +710,7 @@ drop_status_cont (void *cls, int32_t result, const char *emsg) * Free a queue entry. Removes the given entry from the * queue and releases associated resources. Does NOT * call the callback. - * + * * @param qe entry to free. */ static void @@ -714,16 +718,14 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) { struct GNUNET_DATASTORE_Handle *h = qe->h; - GNUNET_CONTAINER_DLL_remove (h->queue_head, - h->queue_tail, - qe); + GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); if (qe->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (qe->task); - qe->task = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (qe->task); + qe->task = GNUNET_SCHEDULER_NO_TASK; + } h->queue_size--; - qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ + qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */ GNUNET_free (qe); } @@ -735,10 +737,8 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) * @param cls closure * @param msg message received, NULL on timeout or fatal error */ -static void -process_status_message (void *cls, - const struct - GNUNET_MessageHeader * msg) +static void +process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; @@ -748,84 +748,67 @@ process_status_message (void *cls, int32_t status; int was_transmitted; - h->in_receive = GNUNET_NO; - if (h->skip_next_messages > 0) - { - h->skip_next_messages--; - process_queue (h); - return; - } if (NULL == (qe = h->queue_head)) - { - GNUNET_break (0); - do_disconnect (h); - return; - } + { + GNUNET_break (0); + do_disconnect (h); + return; + } rc = qe->qc.sc; if (msg == NULL) - { - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (NULL == h->client) - return; /* forced disconnect */ - if (rc.cont != NULL) - rc.cont (rc.cont_cls, - GNUNET_SYSERR, - _("Failed to receive status response from database.")); - if (was_transmitted == GNUNET_YES) - do_disconnect (h); - else - process_queue (h); - return; - } + { + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + if (was_transmitted == GNUNET_YES) + do_disconnect (h); + else + process_queue (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, GNUNET_SYSERR, + _("Failed to receive status response from database.")); + return; + } GNUNET_assert (GNUNET_YES == qe->was_transmitted); free_queue_entry (qe); - if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) - { - GNUNET_break (0); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.cont != NULL) - rc.cont (rc.cont_cls, - GNUNET_SYSERR, - _("Error reading response from datastore service")); - return; - } - sm = (const struct StatusMessage*) msg; - status = ntohl(sm->status); + if ((ntohs (msg->size) < sizeof (struct StatusMessage)) || + (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS)) + { + GNUNET_break (0); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.cont != NULL) + rc.cont (rc.cont_cls, GNUNET_SYSERR, + _("Error reading response from datastore service")); + return; + } + sm = (const struct StatusMessage *) msg; + status = ntohl (sm->status); emsg = NULL; - if (ntohs(msg->size) > sizeof(struct StatusMessage)) - { - emsg = (const char*) &sm[1]; - if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') - { - GNUNET_break (0); - emsg = _("Invalid error message received from datastore service"); - } - } - if ( (status == GNUNET_SYSERR) && - (emsg == NULL) ) + if (ntohs (msg->size) > sizeof (struct StatusMessage)) + { + emsg = (const char *) &sm[1]; + if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0') { GNUNET_break (0); emsg = _("Invalid error message received from datastore service"); } + } + if ((status == GNUNET_SYSERR) && (emsg == NULL)) + { + GNUNET_break (0); + emsg = _("Invalid error message received from datastore service"); + } #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received status %d/%s\n", - (int) status, - emsg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, + emsg); #endif GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# status messages received"), - 1, - GNUNET_NO); + gettext_noop ("# status messages received"), 1, + GNUNET_NO); h->retry_time.rel_value = 0; process_queue (h); if (rc.cont != NULL) - rc.cont (rc.cont_cls, - status, - emsg); + rc.cont (rc.cont_cls, status, emsg); } @@ -856,21 +839,16 @@ process_status_message (void *cls, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, - uint32_t rid, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - uint32_t replication, +GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, + const GNUNET_HashCode * key, size_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + uint32_t replication, struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, + unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; @@ -879,42 +857,38 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s' for %llu ms\n", - size, - GNUNET_h2s (key), - GNUNET_TIME_absolute_get_remaining (expiration).rel_value); + "Asked to put %u bytes of data under key `%s' for %llu ms\n", + size, GNUNET_h2s (key), + GNUNET_TIME_absolute_get_remaining (expiration).rel_value); #endif - msize = sizeof(struct DataMessage) + size; + msize = sizeof (struct DataMessage) + size; GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, + &process_status_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for PUT\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for PUT\n"); #endif - return NULL; - } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# PUT requests executed"), - 1, - GNUNET_NO); - dm = (struct DataMessage* ) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); - dm->header.size = htons(msize); - dm->rid = htonl(rid); - dm->size = htonl( (uint32_t) size); - dm->type = htonl(type); - dm->priority = htonl(priority); - dm->anonymity = htonl(anonymity); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"), + 1, GNUNET_NO); + dm = (struct DataMessage *) &qe[1]; + dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); + dm->header.size = htons (msize); + dm->rid = htonl (rid); + dm->size = htonl ((uint32_t) size); + dm->type = htonl (type); + dm->priority = htonl (priority); + dm->anonymity = htonl (anonymity); dm->replication = htonl (replication); dm->reserved = htonl (0); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(expiration); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (expiration); dm->key = *key; memcpy (&dm[1], data, size); process_queue (h); @@ -942,14 +916,12 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, - uint64_t amount, - uint32_t entries, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) +GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, + uint32_t entries, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct ReserveMessage *rm; @@ -959,32 +931,29 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to reserve %llu bytes of data and %u entries'\n", - (unsigned long long) amount, - (unsigned int) entries); + "Asked to reserve %llu bytes of data and %u entries\n", + (unsigned long long) amount, (unsigned int) entries); #endif qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct ReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority, + max_queue_size, timeout, &process_status_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry to reserve\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to reserve\n"); #endif - return NULL; - } + return NULL; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# RESERVE requests executed"), - 1, - GNUNET_NO); - rm = (struct ReserveMessage*) &qe[1]; - rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); - rm->header.size = htons(sizeof (struct ReserveMessage)); - rm->entries = htonl(entries); - rm->amount = GNUNET_htonll(amount); + gettext_noop ("# RESERVE requests executed"), 1, + GNUNET_NO); + rm = (struct ReserveMessage *) &qe[1]; + rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); + rm->header.size = htons (sizeof (struct ReserveMessage)); + rm->entries = htonl (entries); + rm->amount = GNUNET_htonll (amount); process_queue (h); return qe; } @@ -1013,12 +982,11 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - uint32_t rid, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + uint32_t rid, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct ReleaseReserveMessage *rrm; @@ -1027,31 +995,29 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, if (cont == NULL) cont = &drop_status_cont; #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to release reserve %d\n", - rid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); #endif qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), + queue_priority, max_queue_size, timeout, + &process_status_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry to release reserve\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry to release reserve\n"); #endif - return NULL; - } + return NULL; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# RELEASE RESERVE requests executed"), - 1, - GNUNET_NO); - rrm = (struct ReleaseReserveMessage*) &qe[1]; - rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); - rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); - rrm->rid = htonl(rid); + gettext_noop + ("# RELEASE RESERVE requests executed"), 1, + GNUNET_NO); + rrm = (struct ReleaseReserveMessage *) &qe[1]; + rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); + rrm->header.size = htons (sizeof (struct ReleaseReserveMessage)); + rrm->rid = htonl (rid); process_queue (h); return qe; } @@ -1075,15 +1041,14 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, - uint64_t uid, - uint32_t priority, - struct GNUNET_TIME_Absolute expiration, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) +GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, + uint32_t priority, + struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct UpdateMessage *um; @@ -1093,34 +1058,31 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to update entry %llu raising priority by %u and expiration to %llu\n", - uid, - (unsigned int) priority, - (unsigned long long) expiration.abs_value); + "Asked to update entry %llu raising priority by %u and expiration to %llu\n", + uid, (unsigned int) priority, + (unsigned long long) expiration.abs_value); #endif qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof(struct UpdateMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority, + max_queue_size, timeout, &process_status_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for UPDATE\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for UPDATE\n"); #endif - return NULL; - } + return NULL; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# UPDATE requests executed"), - 1, - GNUNET_NO); - um = (struct UpdateMessage*) &qe[1]; - um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); - um->header.size = htons(sizeof (struct UpdateMessage)); - um->priority = htonl(priority); - um->expiration = GNUNET_TIME_absolute_hton(expiration); - um->uid = GNUNET_htonll(uid); + gettext_noop ("# UPDATE requests executed"), 1, + GNUNET_NO); + um = (struct UpdateMessage *) &qe[1]; + um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); + um->header.size = htons (sizeof (struct UpdateMessage)); + um->priority = htonl (priority); + um->expiration = GNUNET_TIME_absolute_hton (expiration); + um->uid = GNUNET_htonll (uid); process_queue (h); return qe; } @@ -1149,14 +1111,12 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const GNUNET_HashCode *key, - size_t size, - const void *data, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls) + const GNUNET_HashCode * key, size_t size, + const void *data, unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct DataMessage *dm; @@ -1167,39 +1127,36 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, cont = &drop_status_cont; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to remove %u bytes under key `%s'\n", - size, - GNUNET_h2s (key)); + "Asked to remove %u bytes under key `%s'\n", size, + GNUNET_h2s (key)); #endif qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - msize = sizeof(struct DataMessage) + size; + msize = sizeof (struct DataMessage) + size; GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, + &process_status_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for REMOVE\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for REMOVE\n"); #endif - return NULL; - } + return NULL; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# REMOVE requests executed"), - 1, - GNUNET_NO); - dm = (struct DataMessage*) &qe[1]; - dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); - dm->header.size = htons(msize); - dm->rid = htonl(0); - dm->size = htonl(size); - dm->type = htonl(0); - dm->priority = htonl(0); - dm->anonymity = htonl(0); - dm->uid = GNUNET_htonll(0); - dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS); + gettext_noop ("# REMOVE requests executed"), 1, + GNUNET_NO); + dm = (struct DataMessage *) &qe[1]; + dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); + dm->header.size = htons (msize); + dm->rid = htonl (0); + dm->size = htonl (size); + dm->type = htonl (0); + dm->priority = htonl (0); + dm->anonymity = htonl (0); + dm->uid = GNUNET_htonll (0); + dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS); dm->key = *key; memcpy (&dm[1], data, size); process_queue (h); @@ -1214,106 +1171,103 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, * @param cls closure * @param msg message received, NULL on timeout or fatal error */ -static void -process_result_message (void *cls, - const struct GNUNET_MessageHeader *msg) +static void +process_result_message (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_DATASTORE_QueueEntry *qe; struct ResultContext rc; const struct DataMessage *dm; + int was_transmitted; - h->in_receive = GNUNET_NO; - if (h->skip_next_messages > 0) - { - h->skip_next_messages--; - process_queue (h); - return; - } if (msg == NULL) + { + qe = h->queue_head; + GNUNET_assert (NULL != qe); + rc = qe->qc.rc; + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + if (was_transmitted == GNUNET_YES) { - qe = h->queue_head; - GNUNET_assert (NULL != qe); - if (qe->was_transmitted == GNUNET_YES) - { - rc = qe->qc.rc; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from database.\n")); - do_disconnect (h); - free_queue_entry (qe); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - } - else - process_queue (h); - return; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from database.\n")); + do_disconnect (h); } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + else { - GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); - qe = h->queue_head; - rc = qe->qc.rc; - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - free_queue_entry (qe); -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received end of result set, new queue size is %u\n", - h->queue_size); -#endif - if (rc.proc != NULL) - rc.proc (rc.proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - h->retry_time.rel_value = 0; - h->result_count = 0; process_queue (h); - return; } + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) + { + GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)); + qe = h->queue_head; + rc = qe->qc.rc; + GNUNET_assert (GNUNET_YES == qe->was_transmitted); + free_queue_entry (qe); +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received end of result set, new queue size is %u\n", + h->queue_size); +#endif + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + h->retry_time.rel_value = 0; + h->result_count = 0; + process_queue (h); + return; + } qe = h->queue_head; + GNUNET_assert (NULL != qe); rc = qe->qc.rc; - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || - (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || - (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) - { - GNUNET_break (0); - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - if (rc.proc != NULL) - rc.proc (rc.proc_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; - } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# Results received"), - 1, - GNUNET_NO); - dm = (const struct DataMessage*) msg; + if (GNUNET_YES != qe->was_transmitted) + { + GNUNET_break (0); + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + if ((ntohs (msg->size) < sizeof (struct DataMessage)) || + (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || + (ntohs (msg->size) != + sizeof (struct DataMessage) + + ntohl (((const struct DataMessage *) msg)->size))) + { + GNUNET_break (0); + free_queue_entry (qe); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + if (rc.proc != NULL) + rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, + 0); + return; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1, + GNUNET_NO); + dm = (const struct DataMessage *) msg; #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result %llu with type %u and size %u with key %s\n", - (unsigned long long) GNUNET_ntohll(dm->uid), - ntohl(dm->type), - ntohl(dm->size), - GNUNET_h2s(&dm->key)); + "Received result %llu with type %u and size %u with key %s\n", + (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type), + ntohl (dm->size), GNUNET_h2s (&dm->key)); #endif free_queue_entry (qe); h->retry_time.rel_value = 0; process_queue (h); if (rc.proc != NULL) - rc.proc (rc.proc_cls, - &dm->key, - ntohl(dm->size), - &dm[1], - ntohl(dm->type), - ntohl(dm->priority), - ntohl(dm->anonymity), - GNUNET_TIME_absolute_ntoh(dm->expiration), - GNUNET_ntohll(dm->uid)); + rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type), + ntohl (dm->priority), ntohl (dm->anonymity), + GNUNET_TIME_absolute_ntoh (dm->expiration), + GNUNET_ntohll (dm->uid)); } @@ -1338,11 +1292,11 @@ process_result_message (void *cls, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_DatumProcessor proc, - void *proc_cls) + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GNUNET_MessageHeader *m; @@ -1351,29 +1305,29 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, GNUNET_assert (NULL != proc); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get replication entry in %llu ms\n", - (unsigned long long) timeout.rel_value); + "Asked to get replication entry in %llu ms\n", + (unsigned long long) timeout.rel_value); #endif qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader), + queue_priority, max_queue_size, timeout, + &process_result_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for GET REPLICATION\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for GET REPLICATION\n"); #endif - return NULL; - } + return NULL; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# GET REPLICATION requests executed"), - 1, - GNUNET_NO); - m = (struct GNUNET_MessageHeader*) &qe[1]; - m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); - m->size = htons(sizeof (struct GNUNET_MessageHeader)); + gettext_noop + ("# GET REPLICATION requests executed"), 1, + GNUNET_NO); + m = (struct GNUNET_MessageHeader *) &qe[1]; + m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); + m->size = htons (sizeof (struct GNUNET_MessageHeader)); process_queue (h); return qe; } @@ -1383,7 +1337,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, * Get a single zero-anonymity value from the datastore. * * @param h handle to the datastore - * @param offset offset of the result (mod #num-results); set to + * @param offset offset of the result (modulo num-results); set to * a random 64-bit value initially; then increment by * one each time; detect that all results have been found by uid * being again the first uid ever returned. @@ -1401,13 +1355,13 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, - uint64_t offset, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_BLOCK_Type type, - GNUNET_DATASTORE_DatumProcessor proc, - void *proc_cls) + uint64_t offset, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + enum GNUNET_BLOCK_Type type, + GNUNET_DATASTORE_DatumProcessor proc, + void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GetZeroAnonymityMessage *m; @@ -1417,31 +1371,30 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", - (unsigned long long) offset, - type, - (unsigned long long) timeout.rel_value); + "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", + (unsigned long long) offset, type, + (unsigned long long) timeout.rel_value); #endif qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), + queue_priority, max_queue_size, timeout, + &process_result_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not create queue entry for zero-anonymity procation\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for zero-anonymity procation\n"); #endif - return NULL; - } + return NULL; + } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# GET ZERO ANONYMITY requests executed"), - 1, - GNUNET_NO); - m = (struct GetZeroAnonymityMessage*) &qe[1]; - m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); - m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); + gettext_noop + ("# GET ZERO ANONYMITY requests executed"), 1, + GNUNET_NO); + m = (struct GetZeroAnonymityMessage *) &qe[1]; + m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); + m->header.size = htons (sizeof (struct GetZeroAnonymityMessage)); m->type = htonl ((uint32_t) type); m->offset = GNUNET_htonll (offset); process_queue (h); @@ -1454,7 +1407,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * will only be called once. * * @param h handle to the datastore - * @param offset offset of the result (mod #num-results); set to + * @param offset offset of the result (modulo num-results); set to * a random 64-bit value initially; then increment by * one each time; detect that all results have been found by uid * being again the first uid ever returned. @@ -1471,15 +1424,13 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, * cancel */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, - uint64_t offset, - const GNUNET_HashCode * key, - enum GNUNET_BLOCK_Type type, - unsigned int queue_priority, - unsigned int max_queue_size, - struct GNUNET_TIME_Relative timeout, - GNUNET_DATASTORE_DatumProcessor proc, - void *proc_cls) +GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls) { struct GNUNET_DATASTORE_QueueEntry *qe; struct GetMessage *gm; @@ -1488,41 +1439,37 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, GNUNET_assert (NULL != proc); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asked to look for data of type %u under key `%s'\n", - (unsigned int) type, - GNUNET_h2s (key)); + "Asked to look for data of type %u under key `%s'\n", + (unsigned int) type, GNUNET_h2s (key)); #endif qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof(struct GetMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority, + max_queue_size, timeout, &process_result_message, &qc); if (qe == NULL) - { + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not queue request for `%s'\n", - GNUNET_h2s (key)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", + GNUNET_h2s (key)); #endif - return NULL; - } - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# GET requests executed"), - 1, - GNUNET_NO); - gm = (struct GetMessage*) &qe[1]; - gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); - gm->type = htonl(type); + return NULL; + } + GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"), + 1, GNUNET_NO); + gm = (struct GetMessage *) &qe[1]; + gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); + gm->type = htonl (type); gm->offset = GNUNET_htonll (offset); if (key != NULL) - { - gm->header.size = htons(sizeof (struct GetMessage)); - gm->key = *key; - } + { + gm->header.size = htons (sizeof (struct GetMessage)); + gm->key = *key; + } else - { - gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); - } + { + gm->header.size = + htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)); + } process_queue (h); return qe; } @@ -1531,7 +1478,7 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, /** * Cancel a datastore operation. The final callback from the * operation must not have been done yet. - * + * * @param qe operation to cancel */ void @@ -1542,18 +1489,16 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); h = qe->h; #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pending DATASTORE request %p cancelled (%d, %d)\n", - qe, - qe->was_transmitted, - h->queue_head == qe); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, + qe->was_transmitted, h->queue_head == qe); #endif - if (GNUNET_YES == qe->was_transmitted) - { - free_queue_entry (qe); - h->skip_next_messages++; - return; - } + if (GNUNET_YES == qe->was_transmitted) + { + free_queue_entry (qe); + h->skip_next_messages++; + return; + } free_queue_entry (qe); process_queue (h); }