#include "platform.h"
#include "gnunet_util_lib.h"
-#include "gnunet_arm_service.h"
#include "gnunet_protocols.h"
+#include "gnunet_statistics_service.h"
#include "plugin_datastore.h"
#include "datastore.h"
*/
struct GNUNET_SCHEDULER_Handle *sched;
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
+
/**
* Function called once the transmit operation has
* either failed or succeeded.
*/
static struct TransmitCallbackContext *tcc_tail;
+/**
+ * Have we already cleaned up the TCCs and are hence no longer
+ * willing (or able) to transmit anything to anyone?
+ */
+static int cleaning_done;
/**
* Task that is used to remove expired entries from
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
{
struct GNUNET_TIME_Absolute now;
- expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
if (key == NULL)
{
expired_kill_task
"Deleting content that expired %llu ms ago\n",
(unsigned long long) (now.value - expiration.value));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes expired"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
return GNUNET_NO; /* delete */
delete_expired (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
plugin->api->iter_ascending_expiration (plugin->api->cls,
0,
&expired_processor,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
*need);
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes purged (low-priority)"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
return GNUNET_NO;
if (tcc->tc != NULL)
tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
if (GNUNET_YES == tcc->end)
- {
- GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
- }
+ GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
+ GNUNET_SERVER_client_drop (tcc->client);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
return 0;
"Response transmitted, more pending!\n");
#endif
}
+ GNUNET_SERVER_client_drop (tcc->client);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
return msize;
{
struct TransmitCallbackContext *tcc;
+ if (GNUNET_YES == cleaning_done)
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Shutdown in progress, aborting transmission.\n");
+#endif
+ if (NULL != tc)
+ tc (tc_cls, GNUNET_SYSERR);
+ return;
+ }
tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
tcc->msg = msg;
tcc->client = client;
tcc->end = end;
if (NULL ==
(tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
- ntohs(msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_callback,
+ ntohs(msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_callback,
tcc)))
{
GNUNET_break (0);
GNUNET_free (tcc);
return;
}
+ GNUNET_SERVER_client_keep (client);
GNUNET_CONTAINER_DLL_insert (tcc_head,
tcc_tail,
tcc);
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
"Transmitting `%s' message\n",
"DATA");
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# results found"),
+ 1,
+ GNUNET_NO);
transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
return GNUNET_OK;
}
struct ReservationList *pos;
uint32_t size;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "PUT");
-#endif
- if (ntohl(dm->type) == 0)
- {
- GNUNET_break (0);
- dm = NULL;
- }
- if (dm == NULL)
+ if ( (dm == NULL) ||
+ (ntohl(dm->type) == 0) )
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request for `%s'\n",
+ "PUT",
+ GNUNET_h2s (&dm->key));
+#endif
rid = ntohl(dm->rid);
size = ntohl(dm->size);
if (rid > 0)
&msg);
if (GNUNET_OK == ret)
{
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes stored"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_add (filter,
&dm->key);
#if DEBUG_DATASTORE
*/
static void
handle_get (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
const struct GetMessage *msg;
uint16_t size;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "GET");
-#endif
size = ntohs(message->size);
if ( (size != sizeof(struct GetMessage)) &&
(size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- GNUNET_SERVER_client_keep (client);
msg = (const struct GetMessage*) message;
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request for `%s' of type %u\n",
+ "GET",
+ GNUNET_h2s (&msg->key),
+ ntohl (msg->type));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# GET requests received"),
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVER_client_keep (client);
if ( (size == sizeof(struct GetMessage)) &&
(GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
&msg->key)) )
"GET",
GNUNET_h2s (&msg->key));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests filtered by bloomfilter"),
+ 1,
+ GNUNET_NO);
transmit_item (client,
NULL, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
int ret;
char *emsg;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# UPDATE requests received"),
+ 1,
+ GNUNET_NO);
+ msg = (const struct UpdateMessage*) message;
+ emsg = NULL;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "UPDATE");
+ "Processing `%s' request for %llu\n",
+ "UPDATE",
+ (unsigned long long) GNUNET_ntohll (msg->uid));
#endif
- msg = (const struct UpdateMessage*) message;
- emsg = NULL;
ret = plugin->api->update (plugin->api->cls,
GNUNET_ntohll(msg->uid),
(int32_t) ntohl(msg->priority),
"Processing `%s' request\n",
"GET_RANDOM");
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# GET RANDOM requests received"),
+ 1,
+ GNUNET_NO);
GNUNET_SERVER_client_keep (client);
plugin->api->iter_migration_order (plugin->api->cls,
0,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
rc->found = GNUNET_YES;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Item %llu matches `%s' request.\n",
+ "Item %llu matches `%s' request for key `%s'.\n",
(unsigned long long) uid,
- "REMOVE");
+ "REMOVE",
+ GNUNET_h2s (key));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes removed (explicit request)"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
plugin->api->next_request (next_cls, GNUNET_YES);
GNUNET_HashCode vhash;
struct RemoveContext *rc;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "REMOVE");
-#endif
if (dm == NULL)
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request for `%s'\n",
+ "REMOVE",
+ GNUNET_h2s (&dm->key));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# REMOVE requests received"),
+ 1,
+ GNUNET_NO);
rc = GNUNET_malloc (sizeof(struct RemoveContext));
GNUNET_SERVER_client_keep (client);
rc->client = client;
}
+/**
+ * Final task run after shutdown. Unloads plugins and disconnects us from
+ * statistics.
+ */
+static void
+unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ unload_plugin (plugin);
+ plugin = NULL;
+ if (filter != NULL)
+ {
+ GNUNET_CONTAINER_bloomfilter_free (filter);
+ filter = NULL;
+ }
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
+}
+
+
/**
* Last task run during shutdown. Disconnects us from
* the transport and core.
{
struct TransmitCallbackContext *tcc;
+ cleaning_done = GNUNET_YES;
while (NULL != (tcc = tcc_head))
{
GNUNET_CONTAINER_DLL_remove (tcc_head,
tcc_tail,
tcc);
if (tcc->th != NULL)
- GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
- if (NULL != tcc->tc)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
+ GNUNET_SERVER_client_drop (tcc->client);
+ }
+ if (NULL != tcc->tc)
tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
expired_kill_task);
expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
}
- unload_plugin (plugin);
- plugin = NULL;
- if (filter != NULL)
- {
- GNUNET_CONTAINER_bloomfilter_free (filter);
- filter = NULL;
- }
- GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
+ GNUNET_SCHEDULER_add_continuation (sched,
+ &unload_task,
+ NULL,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
"DATASTORE");
return;
}
+ stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
cache_size = quota / 8; /* Or should we make this an option? */
bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
fn = NULL;
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to initialize bloomfilter.\n"));
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
return;
}
- GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
plugin = load_plugin ();
if (NULL == plugin)
{
GNUNET_CONTAINER_bloomfilter_free (filter);
filter = NULL;
- GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
return;
}
GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);