From bbf3c4e04289295df264539e1017f3778f45f97f Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 21 Apr 2011 15:14:03 +0000 Subject: [PATCH] fixes --- src/datastore/datastore.h | 2 +- src/datastore/datastore_api.c | 96 +++++++++++++++++++++++------------ 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h index 8fa0ca044..55ca7c8e5 100644 --- a/src/datastore/datastore.h +++ b/src/datastore/datastore.h @@ -27,7 +27,7 @@ #ifndef DATASTORE_H #define DATASTORE_H -#define DEBUG_DATASTORE GNUNET_NO +#define DEBUG_DATASTORE GNUNET_YES #include "gnunet_util_lib.h" diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index ef3736950..2bba2e8ee 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -187,7 +187,6 @@ struct GNUNET_DATASTORE_Handle */ const struct GNUNET_CONFIGURATION_Handle *cfg; - /** * Current connection to the datastore service. */ @@ -241,6 +240,12 @@ struct GNUNET_DATASTORE_Handle */ int in_receive; + /** + * We should either receive (and ignore) an 'END' message or force a + * disconnect for the next message from the service. + */ + unsigned int expect_end_or_disconnect; + }; @@ -600,7 +605,7 @@ transmit_request (void *cls, h->in_receive = GNUNET_YES; GNUNET_CLIENT_receive (h->client, qe->response_proc, - qe, + h, GNUNET_TIME_absolute_get_remaining (qe->timeout)); GNUNET_STATISTICS_update (h->stats, gettext_noop ("# bytes sent to datastore"), @@ -710,16 +715,23 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader * msg) { - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - struct StatusContext rc = qe->qc.sc; + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct StatusContext rc; const struct StatusMessage *sm; const char *emsg; int32_t status; int was_transmitted; h->in_receive = GNUNET_NO; + if (NULL == (qe = h->queue_head)) + { + GNUNET_break (0); + do_disconnect (h); + return; + } was_transmitted = qe->was_transmitted; + rc = qe->qc.sc; if (msg == NULL) { free_queue_entry (qe); @@ -734,7 +746,6 @@ process_status_message (void *cls, return; } GNUNET_assert (GNUNET_YES == qe->was_transmitted); - GNUNET_assert (h->queue_head == qe); free_queue_entry (qe); if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) @@ -1169,43 +1180,54 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, */ static void process_result_message (void *cls, - const struct GNUNET_MessageHeader * msg) + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_DATASTORE_QueueEntry *qe = cls; - struct GNUNET_DATASTORE_Handle *h = qe->h; - struct ResultContext rc = qe->qc.rc; + struct GNUNET_DATASTORE_Handle *h = cls; + struct GNUNET_DATASTORE_QueueEntry *qe; + struct ResultContext rc; const struct DataMessage *dm; int was_transmitted; h->in_receive = GNUNET_NO; if (msg == NULL) - { - was_transmitted = qe->was_transmitted; - free_queue_entry (qe); - if (was_transmitted == GNUNET_YES) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive response from database.\n")); - do_disconnect (h); - } - else + { + if (NULL != (qe = h->queue_head)) { + was_transmitted = qe->was_transmitted; + free_queue_entry (qe); + rc = qe->qc.rc; + if (was_transmitted == GNUNET_YES) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from database.\n")); + do_disconnect (h); + } + else + { #if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Request dropped due to finite datastore queue length.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Request dropped due to finite datastore queue length.\n"); #endif + } + if (rc.iter != NULL) + rc.iter (rc.iter_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); } - if (rc.iter != NULL) - rc.iter (rc.iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } - GNUNET_assert (GNUNET_YES == qe->was_transmitted); - GNUNET_assert (h->queue_head == qe); if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) { GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); + if (h->expect_end_or_disconnect > 0) + { + h->expect_end_or_disconnect--; + process_queue (h); + return; + } + qe = h->queue_head; + rc = qe->qc.rc; + GNUNET_assert (GNUNET_YES == qe->was_transmitted); free_queue_entry (qe); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1221,6 +1243,16 @@ process_result_message (void *cls, process_queue (h); return; } + if (h->expect_end_or_disconnect > 0) + { + /* only 'END' allowed, must reconnect */ + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + return; + } + qe = h->queue_head; + rc = qe->qc.rc; + GNUNET_assert (GNUNET_YES == qe->was_transmitted); if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) @@ -1500,11 +1532,10 @@ GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h) { struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; - GNUNET_assert (&process_result_message == qe->response_proc); h->in_receive = GNUNET_YES; GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, + &process_result_message, + h, GNUNET_TIME_absolute_get_remaining (qe->timeout)); } @@ -1531,8 +1562,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) if (GNUNET_YES == qe->was_transmitted) { free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); + h->expect_end_or_disconnect++; return; } free_queue_entry (qe); -- 2.25.1