#include "gnunet_arm_service.h"
#include "gnunet_constants.h"
#include "gnunet_datastore_service.h"
+#include "gnunet_statistics_service.h"
#include "datastore.h"
+/**
+ * If a client stopped asking for more results, how many more do
+ * we receive from the DB before killing the connection? Trade-off
+ * between re-doing TCP handshakes and (needlessly) receiving
+ * useless results.
+ */
+#define MAX_EXCESS_RESULTS 8
/**
* Context for processing status messages.
* multiple of 64 bits.
*/
int32_t was_transmitted;
-
+
};
/**
*/
struct GNUNET_CLIENT_Connection *client;
+ /**
+ * Handle for statistics.
+ */
+ struct GNUNET_STATISTICS_Handle *stats;
+
/**
* Current transmit handle.
*/
*/
unsigned int queue_size;
+ /**
+ * Number of results we're receiving for the current query
+ * after application stopped to care. Used to determine when
+ * to reset the connection.
+ */
+ unsigned int result_count;
+
/**
* Are we currently trying to receive from the service?
*/
h->client = c;
h->cfg = cfg;
h->sched = sched;
+ h->stats = GNUNET_STATISTICS_create (sched,
+ "datastore-api",
+ cfg);
return h;
}
}
GNUNET_break (0);
}
+ GNUNET_STATISTICS_destroy (h->stats,
+ GNUNET_NO);
GNUNET_free (h);
}
{
struct GNUNET_DATASTORE_QueueEntry *qe = cls;
+ GNUNET_STATISTICS_update (qe->h->stats,
+ gettext_noop ("# queue entry timeouts"),
+ 1,
+ GNUNET_NO);
qe->task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (qe->was_transmitted == GNUNET_NO);
qe->response_proc (qe, NULL);
pos = h->queue_head;
}
c++;
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# queue entries created"),
+ 1,
+ GNUNET_NO);
GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
h->queue_tail,
pos,
h->queue_size++;
if (c > max_queue_size)
{
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# queue overflows"),
+ 1,
+ GNUNET_NO);
response_proc (ret, NULL);
return NULL;
}
{
struct GNUNET_DATASTORE_Handle *h = cls;
- if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
+ if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
else
h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
- if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
+ if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
"DATASTORE reconnect failed (fatally)\n");
return;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# datastore connections (re)created"),
+ 1,
+ GNUNET_NO);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Reconnected to DATASTORE\n");
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Failed to transmit request to DATASTORE.\n"));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# transmission request failures"),
+ 1,
+ GNUNET_NO);
do_disconnect (h);
return 0;
}
qe->response_proc,
qe,
GNUNET_TIME_absolute_get_remaining (qe->timeout));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# bytes sent to datastore"),
+ 1,
+ GNUNET_NO);
return msize;
}
free_queue_entry (qe);
if (NULL == h->client)
return; /* forced disconnect */
- rc.cont (rc.cont_cls,
- GNUNET_SYSERR,
- _("Failed to receive response from database."));
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls,
+ GNUNET_SYSERR,
+ _("Failed to receive status response from database."));
if (was_transmitted == GNUNET_YES)
do_disconnect (h);
return;
GNUNET_break (0);
h->retry_time = GNUNET_TIME_UNIT_ZERO;
do_disconnect (h);
- rc.cont (rc.cont_cls,
- GNUNET_SYSERR,
- _("Error reading response from datastore service"));
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls,
+ GNUNET_SYSERR,
+ _("Error reading response from datastore service"));
return;
}
sm = (const struct StatusMessage*) msg;
(int) status,
emsg);
#endif
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# status messages received"),
+ 1,
+ GNUNET_NO);
+ h->retry_time.rel_value = 0;
process_queue (h);
- rc.cont (rc.cont_cls,
- status,
- emsg);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls,
+ status,
+ emsg);
}
queue_priority, max_queue_size, timeout,
&process_status_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for PUT\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# PUT requests executed"),
+ 1,
+ GNUNET_NO);
dm = (struct DataMessage* ) &qe[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
dm->header.size = htons(msize);
queue_priority, max_queue_size, timeout,
&process_status_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry to reserve\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# RESERVE requests executed"),
+ 1,
+ GNUNET_NO);
rm = (struct ReserveMessage*) &qe[1];
rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
rm->header.size = htons(sizeof (struct ReserveMessage));
queue_priority, max_queue_size, timeout,
&process_status_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry to release reserve\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# RELEASE RESERVE requests executed"),
+ 1,
+ GNUNET_NO);
rrm = (struct ReleaseReserveMessage*) &qe[1];
rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
"Asked to update entry %llu raising priority by %u and expiration to %llu\n",
uid,
(unsigned int) priority,
- (unsigned long long) expiration.value);
+ (unsigned long long) expiration.abs_value);
#endif
qc.sc.cont = cont;
qc.sc.cont_cls = cont_cls;
queue_priority, max_queue_size, timeout,
&process_status_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for UPDATE\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# UPDATE requests executed"),
+ 1,
+ GNUNET_NO);
um = (struct UpdateMessage*) &qe[1];
um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
um->header.size = htons(sizeof (struct UpdateMessage));
queue_priority, max_queue_size, timeout,
&process_status_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for REMOVE\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# REMOVE requests executed"),
+ 1,
+ GNUNET_NO);
dm = (struct DataMessage*) &qe[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
dm->header.size = htons(msize);
_("Failed to receive response from database.\n"));
do_disconnect (h);
}
+ else
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Request dropped due to finite datastore queue length.\n");
+#endif
+ }
if (rc.iter != NULL)
rc.iter (rc.iter_cls,
NULL, 0, NULL, 0, 0, 0,
rc.iter (rc.iter_cls,
NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ h->retry_time.rel_value = 0;
+ h->result_count = 0;
process_queue (h);
return;
}
GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# Results received"),
+ 1,
+ GNUNET_NO);
if (rc.iter == NULL)
{
- /* abort iteration */
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aborting iteration via disconnect (client has cancelled)\n");
-#endif
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
+ h->result_count++;
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# Excess results received"),
+ 1,
+ GNUNET_NO);
+ if (h->result_count > MAX_EXCESS_RESULTS)
+ {
+ free_queue_entry (qe);
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# Forced database connection resets"),
+ 1,
+ GNUNET_NO);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ return;
+ }
+ GNUNET_DATASTORE_get_next (h, GNUNET_NO);
return;
}
dm = (const struct DataMessage*) msg;
ntohl(dm->size),
GNUNET_h2s(&dm->key));
#endif
+ h->retry_time.rel_value = 0;
rc.iter (rc.iter_cls,
&dm->key,
ntohl(dm->size),
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to get random entry in %llu ms\n",
- (unsigned long long) timeout.value);
+ (unsigned long long) timeout.abs_value);
#endif
qc.rc.iter = iter;
qc.rc.iter_cls = iter_cls;
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for GET RANDOM\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# GET RANDOM requests executed"),
+ 1,
+ GNUNET_NO);
m = (struct GNUNET_MessageHeader*) &qe[1];
m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
m->size = htons(sizeof (struct GNUNET_MessageHeader));
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asked to get zero-anonymity entry in %llu ms\n",
- (unsigned long long) timeout.value);
+ (unsigned long long) timeout.abs_value);
#endif
qc.rc.iter = iter;
qc.rc.iter_cls = iter_cls;
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for zero-anonymity iteration\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# GET ZERO ANONYMITY requests executed"),
+ 1,
+ GNUNET_NO);
m = (struct GetZeroAnonymityMessage*) &qe[1];
- m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
+ m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
m->type = htonl ((uint32_t) type);
process_queue (h);
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not queue request for `%s'\n",
+ GNUNET_h2s (key));
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# GET requests executed"),
+ 1,
+ GNUNET_NO);
gm = (struct GetMessage*) &qe[1];
gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
gm->type = htonl(type);
struct ResultContext rc = qe->qc.rc;
GNUNET_assert (&process_result_message == qe->response_proc);
- if (GNUNET_YES == more)
- {
- h->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (h->client,
- qe->response_proc,
- qe,
- GNUNET_TIME_absolute_get_remaining (qe->timeout));
- return;
+ if (GNUNET_YES != more)
+ {
+ qe->qc.rc.iter = NULL;
+ qe->qc.rc.iter_cls = NULL;
+ if (rc.iter != NULL)
+ rc.iter (rc.iter_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- rc.iter (rc.iter_cls,
- NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ h->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (h->client,
+ qe->response_proc,
+ qe,
+ GNUNET_TIME_absolute_get_remaining (qe->timeout));
}
qe->qc.rc.iter = NULL;
if (GNUNET_YES != h->in_receive)
GNUNET_DATASTORE_get_next (h, GNUNET_YES);
- return;
}
- reconnect = GNUNET_YES;
+ else
+ {
+ qe->qc.sc.cont = NULL;
+ }
+ return;
}
free_queue_entry (qe);
- if (reconnect)
- {
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- }
- else
- {
- process_queue (h);
- }
+ process_queue (h);
}