#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
+#include "gnunet_statistics_service.h"
#include "plugin_datastore.h"
#include "datastore.h"
static unsigned long long quota;
/**
- * How much space are we using for the cache?
- * (space available for insertions that will be
- * instantly reclaimed by discarding less
- * important content --- or possibly whatever
- * we just inserted into the "cache").
+ * How much space are we using for the cache? (space available for
+ * insertions that will be instantly reclaimed by discarding less
+ * important content --- or possibly whatever we just inserted into
+ * the "cache").
*/
static unsigned long long cache_size;
*/
struct GNUNET_SCHEDULER_Handle *sched;
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
+
/**
* Function called once the transmit operation has
* either failed or succeeded.
int status);
+/**
+ * Context for transmitting replies to clients.
+ */
struct TransmitCallbackContext
{
+
+ /**
+ * We keep these in a doubly-linked list (for cleanup).
+ */
+ struct TransmitCallbackContext *next;
+
+ /**
+ * We keep these in a doubly-linked list (for cleanup).
+ */
+ struct TransmitCallbackContext *prev;
+
/**
* The message that we're asked to transmit.
*/
struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Handle for the transmission request.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
/**
* Client that we are transmitting to.
int end;
};
+
+/**
+ * Head of the doubly-linked list (for cleanup).
+ */
+static struct TransmitCallbackContext *tcc_head;
+
+/**
+ * Tail of the doubly-linked list (for cleanup).
+ */
+static struct TransmitCallbackContext *tcc_tail;
+
+/**
+ * Have we already cleaned up the TCCs and are hence no longer
+ * willing (or able) to transmit anything to anyone?
+ */
+static int cleaning_done;
/**
* Task that is used to remove expired entries from
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
{
struct GNUNET_TIME_Absolute now;
- expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
if (key == NULL)
{
expired_kill_task
= GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- GNUNET_SCHEDULER_NO_TASK,
MAX_EXPIRE_DELAY,
&delete_expired,
NULL);
"Deleting content that expired %llu ms ago\n",
(unsigned long long) (now.value - expiration.value));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes expired"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
return GNUNET_NO; /* delete */
delete_expired (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
plugin->api->iter_ascending_expiration (plugin->api->cls,
0,
&expired_processor,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
(0 == *need) ? GNUNET_YES : GNUNET_NO);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Deleting %llu bytes of low-priority content (still trying to recover %llu bytes)\n",
+ "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
*need);
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes purged (low-priority)"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
return GNUNET_NO;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to recover %llu bytes of cache space\n",
+ "Asked to free up %llu bytes of cache space\n",
need);
#endif
n = GNUNET_malloc (sizeof(unsigned long long));
struct TransmitCallbackContext *tcc = cls;
size_t msize;
+ tcc->th = NULL;
+ GNUNET_CONTAINER_DLL_remove (tcc_head,
+ tcc_tail,
+ tcc);
msize = ntohs(tcc->msg->size);
if (size == 0)
{
if (tcc->tc != NULL)
tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
if (GNUNET_YES == tcc->end)
- {
- GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
- }
+ GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
+ GNUNET_SERVER_client_drop (tcc->client);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
return 0;
"Response transmitted, more pending!\n");
#endif
}
+ GNUNET_SERVER_client_drop (tcc->client);
GNUNET_free (tcc->msg);
GNUNET_free (tcc);
return msize;
*
* @param client target of the message
* @param msg message to transmit, will be freed!
+ * @param tc function to call afterwards
+ * @param tc_cls closure for tc
* @param end is this the last response (and we should
* signal the server completion accodingly after
* transmitting this message)?
{
struct TransmitCallbackContext *tcc;
+ if (GNUNET_YES == cleaning_done)
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Shutdown in progress, aborting transmission.\n");
+#endif
+ if (NULL != tc)
+ tc (tc_cls, GNUNET_SYSERR);
+ return;
+ }
tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
tcc->msg = msg;
tcc->client = client;
tcc->tc = tc;
tcc->tc_cls = tc_cls;
tcc->end = end;
-
if (NULL ==
- GNUNET_SERVER_notify_transmit_ready (client,
- ntohs(msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_callback,
- tcc))
+ (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
+ ntohs(msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_callback,
+ tcc)))
{
GNUNET_break (0);
if (GNUNET_YES == end)
tc (tc_cls, GNUNET_SYSERR);
GNUNET_free (msg);
GNUNET_free (tcc);
+ return;
}
+ GNUNET_SERVER_client_keep (client);
+ GNUNET_CONTAINER_DLL_insert (tcc_head,
+ tcc_tail,
+ tcc);
}
* Function called once the transmit operation has
* either failed or succeeded.
*
- * @param cls closure
+ * @param next_cls closure for calling "next_request" callback
* @param status GNUNET_OK on success, GNUNET_SYSERR on error
*/
static void
if (status != GNUNET_OK)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Failed to transmit an item to the client; aborting iteration.\n"));
- plugin->api->next_request (next_cls, GNUNET_YES);
+ _("Failed to transmit an item to the client; aborting iteration.\n"));
+ if (plugin != NULL)
+ plugin->api->next_request (next_cls, GNUNET_YES);
return;
}
plugin->api->next_request (next_cls, GNUNET_NO);
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
"Transmitting `%s' message\n",
"DATA");
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# results found"),
+ 1,
+ GNUNET_NO);
transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
return GNUNET_OK;
}
return;
}
prev = pos;
- pos = next;
}
GNUNET_break (0);
transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
struct ReservationList *pos;
uint32_t size;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "PUT");
-#endif
- if (ntohl(dm->type) == 0)
- {
- GNUNET_break (0);
- dm = NULL;
- }
- if (dm == NULL)
+ if ( (dm == NULL) ||
+ (ntohl(dm->type) == 0) )
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request for `%s'\n",
+ "PUT",
+ GNUNET_h2s (&dm->key));
+#endif
rid = ntohl(dm->rid);
size = ntohl(dm->size);
if (rid > 0)
GNUNET_TIME_absolute_ntoh(dm->expiration),
&msg);
if (GNUNET_OK == ret)
- GNUNET_CONTAINER_bloomfilter_add (filter,
- &dm->key);
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes stored"),
+ size,
+ GNUNET_YES);
+ GNUNET_CONTAINER_bloomfilter_add (filter,
+ &dm->key);
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Successfully stored %u bytes under key `%s'\n",
+ size,
+ GNUNET_h2s (&dm->key));
+#endif
+ }
transmit_status (client,
(GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK,
msg);
*/
static void
handle_get (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
- static struct GNUNET_TIME_Absolute zero;
const struct GetMessage *msg;
uint16_t size;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "GET");
-#endif
size = ntohs(message->size);
if ( (size != sizeof(struct GetMessage)) &&
(size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
return;
}
msg = (const struct GetMessage*) message;
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request for `%s' of type %u\n",
+ "GET",
+ GNUNET_h2s (&msg->key),
+ ntohl (msg->type));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# GET requests received"),
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVER_client_keep (client);
if ( (size == sizeof(struct GetMessage)) &&
(GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
&msg->key)) )
/* don't bother database... */
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Empty result set for `%s' request.\n",
- "GET");
+ "Empty result set for `%s' request for `%s'.\n",
+ "GET",
+ GNUNET_h2s (&msg->key));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests filtered by bloomfilter"),
+ 1,
+ GNUNET_NO);
transmit_item (client,
- NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
+ NULL, NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- GNUNET_SERVER_client_keep (client);
plugin->api->get (plugin->api->cls,
((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
NULL,
int ret;
char *emsg;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# UPDATE requests received"),
+ 1,
+ GNUNET_NO);
+ msg = (const struct UpdateMessage*) message;
+ emsg = NULL;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "UPDATE");
+ "Processing `%s' request for %llu\n",
+ "UPDATE",
+ (unsigned long long) GNUNET_ntohll (msg->uid));
#endif
- msg = (const struct UpdateMessage*) message;
- emsg = NULL;
ret = plugin->api->update (plugin->api->cls,
GNUNET_ntohll(msg->uid),
(int32_t) ntohl(msg->priority),
"Processing `%s' request\n",
"GET_RANDOM");
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# GET RANDOM requests received"),
+ 1,
+ GNUNET_NO);
GNUNET_SERVER_client_keep (client);
plugin->api->iter_migration_order (plugin->api->cls,
0,
const GNUNET_HashCode * key,
uint32_t size,
const void *data,
- uint32_t type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute
if (GNUNET_YES == rc->found)
transmit_status (rc->client, GNUNET_OK, NULL);
else
- transmit_status (rc->client, GNUNET_SYSERR, _("Content not found"));
+ transmit_status (rc->client, GNUNET_NO, _("Content not found"));
GNUNET_SERVER_client_drop (rc->client);
GNUNET_free (rc);
return GNUNET_OK; /* last item */
rc->found = GNUNET_YES;
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Item %llu matches `%s' request.\n",
+ "Item %llu matches `%s' request for key `%s'.\n",
(unsigned long long) uid,
- "REMOVE");
+ "REMOVE",
+ GNUNET_h2s (key));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes removed (explicit request)"),
+ size,
+ GNUNET_YES);
GNUNET_CONTAINER_bloomfilter_remove (filter,
key);
plugin->api->next_request (next_cls, GNUNET_YES);
GNUNET_HashCode vhash;
struct RemoveContext *rc;
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing `%s' request\n",
- "REMOVE");
-#endif
if (dm == NULL)
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request for `%s'\n",
+ "REMOVE",
+ GNUNET_h2s (&dm->key));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# REMOVE requests received"),
+ 1,
+ GNUNET_NO);
rc = GNUNET_malloc (sizeof(struct RemoveContext));
GNUNET_SERVER_client_keep (client);
rc->client = client;
GNUNET_CRYPTO_hash (&dm[1],
ntohl(dm->size),
&vhash);
- GNUNET_SERVER_client_keep (client);
plugin->api->get (plugin->api->cls,
&dm->key,
&vhash,
}
+/**
+ * Final task run after shutdown. Unloads plugins and disconnects us from
+ * statistics.
+ */
+static void
+unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ unload_plugin (plugin);
+ plugin = NULL;
+ if (filter != NULL)
+ {
+ GNUNET_CONTAINER_bloomfilter_free (filter);
+ filter = NULL;
+ }
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
+}
+
+
/**
* Last task run during shutdown. Disconnects us from
* the transport and core.
static void
cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- unload_plugin (plugin);
- plugin = NULL;
+ struct TransmitCallbackContext *tcc;
+
+ cleaning_done = GNUNET_YES;
+ while (NULL != (tcc = tcc_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (tcc_head,
+ tcc_tail,
+ tcc);
+ if (tcc->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
+ GNUNET_SERVER_client_drop (tcc->client);
+ }
+ if (NULL != tcc->tc)
+ tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
+ GNUNET_free (tcc->msg);
+ GNUNET_free (tcc);
+ }
+ if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ expired_kill_task);
+ expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_SCHEDULER_add_continuation (sched,
+ &unload_task,
+ NULL,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
struct ReservationList *prev;
struct ReservationList *next;
+ if (client == NULL)
+ return;
prev = NULL;
pos = reservations;
while (NULL != pos)
"DATASTORE");
return;
}
+ stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
cache_size = quota / 8; /* Or should we make this an option? */
bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
fn = NULL;
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to initialize bloomfilter.\n"));
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
return;
}
plugin = load_plugin ();
if (NULL == plugin)
{
GNUNET_CONTAINER_bloomfilter_free (filter);
+ filter = NULL;
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
return;
}
GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
GNUNET_SERVER_add_handlers (server, handlers);
expired_kill_task
- = GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_NO,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- GNUNET_SCHEDULER_NO_TASK,
- GNUNET_TIME_UNIT_ZERO,
- &delete_expired, NULL);
+ = GNUNET_SCHEDULER_add_with_priority (sched,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ &delete_expired, NULL);
GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_YES,
- GNUNET_SCHEDULER_PRIORITY_IDLE,
- GNUNET_SCHEDULER_NO_TASK,
GNUNET_TIME_UNIT_FOREVER_REL,
&cleaning_task, NULL);
ret = (GNUNET_OK ==
GNUNET_SERVICE_run (argc,
argv,
- "datastore", &run, NULL, NULL, NULL)) ? 0 : 1;
+ "datastore",
+ GNUNET_SERVICE_OPTION_NONE,
+ &run, NULL)) ? 0 : 1;
return ret;
}