#include "gnunet_arm_service.h"
#include "gnunet_constants.h"
#include "gnunet_datastore_service.h"
+#include "gnunet_statistics_service.h"
#include "datastore.h"
+/**
+ * If a client stopped asking for more results, how many more do
+ * we receive from the DB before killing the connection? Trade-off
+ * between re-doing TCP handshakes and (needlessly) receiving
+ * useless results.
+ */
+#define MAX_EXCESS_RESULTS 8
/**
* Context for processing status messages.
*/
struct GNUNET_CLIENT_Connection *client;
+ /**
+ * Handle for statistics.
+ */
+ struct GNUNET_STATISTICS_Handle *stats;
+
/**
* Current transmit handle.
*/
*/
unsigned int queue_size;
+ /**
+ * Number of results we're receiving for the current query
+ * after application stopped to care. Used to determine when
+ * to reset the connection.
+ */
+ unsigned int result_count;
+
/**
* Are we currently trying to receive from the service?
*/
h->client = c;
h->cfg = cfg;
h->sched = sched;
+ h->stats = GNUNET_STATISTICS_create (sched,
+ "datastore-api",
+ cfg);
return h;
}
}
GNUNET_break (0);
}
+ GNUNET_STATISTICS_destroy (h->stats,
+ GNUNET_NO);
GNUNET_free (h);
}
{
struct GNUNET_DATASTORE_QueueEntry *qe = cls;
+ GNUNET_STATISTICS_update (qe->h->stats,
+ gettext_noop ("# queue entry timeouts"),
+ 1,
+ GNUNET_NO);
qe->task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (qe->was_transmitted == GNUNET_NO);
qe->response_proc (qe, NULL);
pos = h->queue_head;
}
c++;
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# queue entries created"),
+ 1,
+ GNUNET_NO);
GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
h->queue_tail,
pos,
h->queue_size++;
if (c > max_queue_size)
{
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# queue overflows"),
+ 1,
+ GNUNET_NO);
response_proc (ret, NULL);
return NULL;
}
"DATASTORE reconnect failed (fatally)\n");
return;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# datastore connections (re)created"),
+ 1,
+ GNUNET_NO);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Reconnected to DATASTORE\n");
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Failed to transmit request to DATASTORE.\n"));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# transmission request failures"),
+ 1,
+ GNUNET_NO);
do_disconnect (h);
return 0;
}
qe->response_proc,
qe,
GNUNET_TIME_absolute_get_remaining (qe->timeout));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# bytes sent to datastore"),
+ 1,
+ GNUNET_NO);
return msize;
}
(int) status,
emsg);
#endif
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# status messages received"),
+ 1,
+ GNUNET_NO);
+ h->retry_time.value = 0;
process_queue (h);
if (rc.cont != NULL)
rc.cont (rc.cont_cls,
#endif
return NULL;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# PUT requests executed"),
+ 1,
+ GNUNET_NO);
dm = (struct DataMessage* ) &qe[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
dm->header.size = htons(msize);
#endif
return NULL;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# RESERVE requests executed"),
+ 1,
+ GNUNET_NO);
rm = (struct ReserveMessage*) &qe[1];
rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
rm->header.size = htons(sizeof (struct ReserveMessage));
#endif
return NULL;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# RELEASE RESERVE requests executed"),
+ 1,
+ GNUNET_NO);
rrm = (struct ReleaseReserveMessage*) &qe[1];
rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
#endif
return NULL;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# UPDATE requests executed"),
+ 1,
+ GNUNET_NO);
um = (struct UpdateMessage*) &qe[1];
um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
um->header.size = htons(sizeof (struct UpdateMessage));
queue_priority, max_queue_size, timeout,
&process_status_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for REMOVE\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# REMOVE requests executed"),
+ 1,
+ GNUNET_NO);
dm = (struct DataMessage*) &qe[1];
dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
dm->header.size = htons(msize);
rc.iter (rc.iter_cls,
NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ h->retry_time.value = 0;
+ h->result_count = 0;
process_queue (h);
return;
}
GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# Results received"),
+ 1,
+ GNUNET_NO);
if (rc.iter == NULL)
{
- /* abort iteration */
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aborting iteration via disconnect (client has cancelled)\n");
-#endif
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
+ h->result_count++;
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# Excess results received"),
+ 1,
+ GNUNET_NO);
+ if (h->result_count > MAX_EXCESS_RESULTS)
+ {
+ free_queue_entry (qe);
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# Forced database connection resets"),
+ 1,
+ GNUNET_NO);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ return;
+ }
+ GNUNET_DATASTORE_get_next (h, GNUNET_NO);
return;
}
dm = (const struct DataMessage*) msg;
ntohl(dm->size),
GNUNET_h2s(&dm->key));
#endif
+ h->retry_time.value = 0;
rc.iter (rc.iter_cls,
&dm->key,
ntohl(dm->size),
queue_priority, max_queue_size, timeout,
&process_result_message, &qc);
if (qe == NULL)
- return NULL;
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Could not create queue entry for GET RANDOM\n");
+#endif
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# GET RANDOM requests executed"),
+ 1,
+ GNUNET_NO);
m = (struct GNUNET_MessageHeader*) &qe[1];
m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
m->size = htons(sizeof (struct GNUNET_MessageHeader));
#endif
return NULL;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# GET ZERO ANONYMITY requests executed"),
+ 1,
+ GNUNET_NO);
m = (struct GetZeroAnonymityMessage*) &qe[1];
m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
#endif
return NULL;
}
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# GET requests executed"),
+ 1,
+ GNUNET_NO);
gm = (struct GetMessage*) &qe[1];
gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
gm->type = htonl(type);
struct ResultContext rc = qe->qc.rc;
GNUNET_assert (&process_result_message == qe->response_proc);
- if (GNUNET_YES == more)
- {
- h->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (h->client,
- qe->response_proc,
- qe,
- GNUNET_TIME_absolute_get_remaining (qe->timeout));
- return;
+ if (GNUNET_YES != more)
+ {
+ qe->qc.rc.iter = NULL;
+ qe->qc.rc.iter_cls = NULL;
+ if (rc.iter != NULL)
+ rc.iter (rc.iter_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- rc.iter (rc.iter_cls,
- NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ h->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (h->client,
+ qe->response_proc,
+ qe,
+ GNUNET_TIME_absolute_get_remaining (qe->timeout));
}