From 388921507b4f56b448ec05aca67d20128f348711 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 20 Oct 2010 21:04:54 +0000 Subject: [PATCH] stats and fixes --- src/datastore/Makefile.am | 1 + src/datastore/datastore_api.c | 161 +++++++++++++++++++++++++++++----- 2 files changed, 138 insertions(+), 24 deletions(-) diff --git a/src/datastore/Makefile.am b/src/datastore/Makefile.am index 55d972cd7..db382f1e4 100644 --- a/src/datastore/Makefile.am +++ b/src/datastore/Makefile.am @@ -18,6 +18,7 @@ lib_LTLIBRARIES = \ libgnunetdatastore_la_SOURCES = \ datastore_api.c datastore.h plugin_datastore.h libgnunetdatastore_la_LIBADD = \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la \ $(GN_LIBINTL) libgnunetdatastore_la_LDFLAGS = \ diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index abc8c7645..2837b0b8e 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -28,8 +28,16 @@ #include "gnunet_arm_service.h" #include "gnunet_constants.h" #include "gnunet_datastore_service.h" +#include "gnunet_statistics_service.h" #include "datastore.h" +/** + * If a client stopped asking for more results, how many more do + * we receive from the DB before killing the connection? Trade-off + * between re-doing TCP handshakes and (needlessly) receiving + * useless results. + */ +#define MAX_EXCESS_RESULTS 8 /** * Context for processing status messages. @@ -183,6 +191,11 @@ struct GNUNET_DATASTORE_Handle */ struct GNUNET_CLIENT_Connection *client; + /** + * Handle for statistics. + */ + struct GNUNET_STATISTICS_Handle *stats; + /** * Current transmit handle. */ @@ -214,6 +227,13 @@ struct GNUNET_DATASTORE_Handle */ unsigned int queue_size; + /** + * Number of results we're receiving for the current query + * after application stopped to care. Used to determine when + * to reset the connection. + */ + unsigned int result_count; + /** * Are we currently trying to receive from the service? */ @@ -249,6 +269,9 @@ GNUNET_DATASTORE_connect (const struct h->client = c; h->cfg = cfg; h->sched = sched; + h->stats = GNUNET_STATISTICS_create (sched, + "datastore-api", + cfg); return h; } @@ -330,6 +353,8 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } GNUNET_break (0); } + GNUNET_STATISTICS_destroy (h->stats, + GNUNET_NO); GNUNET_free (h); } @@ -346,6 +371,10 @@ timeout_queue_entry (void *cls, { struct GNUNET_DATASTORE_QueueEntry *qe = cls; + GNUNET_STATISTICS_update (qe->h->stats, + gettext_noop ("# queue entry timeouts"), + 1, + GNUNET_NO); qe->task = GNUNET_SCHEDULER_NO_TASK; GNUNET_assert (qe->was_transmitted == GNUNET_NO); qe->response_proc (qe, NULL); @@ -412,6 +441,10 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, pos = h->queue_head; } c++; + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue entries created"), + 1, + GNUNET_NO); GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, @@ -419,6 +452,10 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, h->queue_size++; if (c > max_queue_size) { + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue overflows"), + 1, + GNUNET_NO); response_proc (ret, NULL); return NULL; } @@ -477,6 +514,10 @@ try_reconnect (void *cls, "DATASTORE reconnect failed (fatally)\n"); return; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# datastore connections (re)created"), + 1, + GNUNET_NO); #if DEBUG_DATASTORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n"); @@ -541,6 +582,10 @@ transmit_request (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to transmit request to DATASTORE.\n")); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# transmission request failures"), + 1, + GNUNET_NO); do_disconnect (h); return 0; } @@ -564,6 +609,10 @@ transmit_request (void *cls, qe->response_proc, qe, GNUNET_TIME_absolute_get_remaining (qe->timeout)); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# bytes sent to datastore"), + 1, + GNUNET_NO); return msize; } @@ -731,6 +780,11 @@ process_status_message (void *cls, (int) status, emsg); #endif + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# status messages received"), + 1, + GNUNET_NO); + h->retry_time.value = 0; process_queue (h); if (rc.cont != NULL) rc.cont (rc.cont_cls, @@ -806,6 +860,10 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# PUT requests executed"), + 1, + GNUNET_NO); dm = (struct DataMessage* ) &qe[1]; dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); dm->header.size = htons(msize); @@ -877,6 +935,10 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# RESERVE requests executed"), + 1, + GNUNET_NO); rm = (struct ReserveMessage*) &qe[1]; rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); rm->header.size = htons(sizeof (struct ReserveMessage)); @@ -941,6 +1003,10 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# RELEASE RESERVE requests executed"), + 1, + GNUNET_NO); rrm = (struct ReleaseReserveMessage*) &qe[1]; rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); rrm->header.size = htons(sizeof (struct ReleaseReserveMessage)); @@ -1004,6 +1070,10 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# UPDATE requests executed"), + 1, + GNUNET_NO); um = (struct UpdateMessage*) &qe[1]; um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); um->header.size = htons(sizeof (struct UpdateMessage)); @@ -1068,7 +1138,17 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_status_message, &qc); if (qe == NULL) - return NULL; + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for REMOVE\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# REMOVE requests executed"), + 1, + GNUNET_NO); dm = (struct DataMessage*) &qe[1]; dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); dm->header.size = htons(msize); @@ -1141,6 +1221,8 @@ process_result_message (void *cls, rc.iter (rc.iter_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + h->retry_time.value = 0; + h->result_count = 0; process_queue (h); return; } @@ -1158,16 +1240,29 @@ process_result_message (void *cls, GNUNET_TIME_UNIT_ZERO_ABS, 0); return; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Results received"), + 1, + GNUNET_NO); if (rc.iter == NULL) { - /* abort iteration */ -#if DEBUG_DATASTORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aborting iteration via disconnect (client has cancelled)\n"); -#endif - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); + h->result_count++; + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Excess results received"), + 1, + GNUNET_NO); + if (h->result_count > MAX_EXCESS_RESULTS) + { + free_queue_entry (qe); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# Forced database connection resets"), + 1, + GNUNET_NO); + h->retry_time = GNUNET_TIME_UNIT_ZERO; + do_disconnect (h); + return; + } + GNUNET_DATASTORE_get_next (h, GNUNET_NO); return; } dm = (const struct DataMessage*) msg; @@ -1179,6 +1274,7 @@ process_result_message (void *cls, ntohl(dm->size), GNUNET_h2s(&dm->key)); #endif + h->retry_time.value = 0; rc.iter (rc.iter_cls, &dm->key, ntohl(dm->size), @@ -1230,7 +1326,17 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, &process_result_message, &qc); if (qe == NULL) - return NULL; + { +#if DEBUG_DATASTORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for GET RANDOM\n"); +#endif + return NULL; + } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# GET RANDOM requests executed"), + 1, + GNUNET_NO); m = (struct GNUNET_MessageHeader*) &qe[1]; m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); m->size = htons(sizeof (struct GNUNET_MessageHeader)); @@ -1287,6 +1393,10 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# GET ZERO ANONYMITY requests executed"), + 1, + GNUNET_NO); m = (struct GetZeroAnonymityMessage*) &qe[1]; m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); m->header.size = htons(sizeof (struct GetZeroAnonymityMessage)); @@ -1352,6 +1462,10 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, #endif return NULL; } + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# GET requests executed"), + 1, + GNUNET_NO); gm = (struct GetMessage*) &qe[1]; gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET); gm->type = htonl(type); @@ -1385,21 +1499,20 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, struct ResultContext rc = qe->qc.rc; GNUNET_assert (&process_result_message == qe->response_proc); - if (GNUNET_YES == more) - { - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, - qe->response_proc, - qe, - GNUNET_TIME_absolute_get_remaining (qe->timeout)); - return; + if (GNUNET_YES != more) + { + qe->qc.rc.iter = NULL; + qe->qc.rc.iter_cls = NULL; + if (rc.iter != NULL) + rc.iter (rc.iter_cls, + NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); } - free_queue_entry (qe); - h->retry_time = GNUNET_TIME_UNIT_ZERO; - do_disconnect (h); - rc.iter (rc.iter_cls, - NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); + h->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (h->client, + qe->response_proc, + qe, + GNUNET_TIME_absolute_get_remaining (qe->timeout)); } -- 2.25.1