*/
#define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
+#define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
+
+/**
+ * After how many payload-changing operations
+ * do we sync our statistics?
+ */
+#define MAX_STAT_SYNC_LAG 50
/**
};
+
/**
* Our datastore plugin (NULL if not available).
*/
* How much space have we currently reserved?
*/
static unsigned long long reserved;
+
+/**
+ * How much data are we currently storing
+ * in the database?
+ */
+static unsigned long long payload;
+
+/**
+ * Number of updates that were made to the
+ * payload value since we last synchronized
+ * it with the statistics service.
+ */
+static unsigned int lastSync;
+
+/**
+ * Did we get an answer from statistics?
+ */
+static int stats_worked;
/**
* Identity of the task that is used to delete
static struct GNUNET_STATISTICS_Handle *stats;
+/**
+ * Synchronize our utilization statistics with the
+ * statistics service.
+ */
+static void
+sync_stats ()
+{
+ GNUNET_STATISTICS_set (stats,
+ QUOTA_STAT_NAME,
+ payload,
+ GNUNET_YES);
+ lastSync = 0;
+}
+
+
+
+
/**
* Function called once the transmit operation has
* either failed or succeeded.
*/
static int cleaning_done;
+/**
+ * Handle for pending get request.
+ */
+static struct GNUNET_STATISTICS_GetHandle *stat_get;
+
+
/**
* Task that is used to remove expired entries from
* the datastore. This task will schedule itself
#endif
amount = GNUNET_ntohll(msg->amount);
entries = ntohl(msg->entries);
- used = plugin->api->get_size (plugin->api->cls) + reserved;
+ used = payload + reserved;
req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
if (used + req > quota)
{
}
+/**
+ * Context for a put request used to see if the content is
+ * already present.
+ */
+struct PutContext
+{
+ /**
+ * Client to notify on completion.
+ */
+ struct GNUNET_SERVER_Client *client;
+
+ /**
+ * Did we find the data already in the database?
+ */
+ int is_present;
+
+ /* followed by the 'struct DataMessage' */
+};
+
+
+/**
+ * Actually put the data message.
+ */
+static void
+execute_put (struct GNUNET_SERVER_Client *client,
+ const struct DataMessage *dm)
+{
+ uint32_t size;
+ char *msg;
+ int ret;
+
+ size = ntohl(dm->size);
+ msg = NULL;
+ ret = plugin->api->put (plugin->api->cls,
+ &dm->key,
+ size,
+ &dm[1],
+ ntohl(dm->type),
+ ntohl(dm->priority),
+ ntohl(dm->anonymity),
+ GNUNET_TIME_absolute_ntoh(dm->expiration),
+ &msg);
+ if (GNUNET_OK == ret)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes stored"),
+ size,
+ GNUNET_YES);
+ GNUNET_CONTAINER_bloomfilter_add (filter,
+ &dm->key);
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Successfully stored %u bytes of type %u under key `%s'\n",
+ size,
+ ntohl(dm->type),
+ GNUNET_h2s (&dm->key));
+#endif
+ }
+ transmit_status (client,
+ (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK,
+ msg);
+ GNUNET_free_non_null (msg);
+ if (quota - reserved - cache_size < payload)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
+ (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
+ (unsigned long long) (quota - reserved - cache_size),
+ (unsigned long long) payload);
+ manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+ }
+}
+
+
+
+/**
+ * Function that will check if the given datastore entry
+ * matches the put and if none match executes the put.
+ *
+ * @param cls closure, pointer to the client (of type 'struct PutContext').
+ * @param next_cls closure to use to ask for the next item
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
+ * GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+check_present (void *cls,
+ void *next_cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct PutContext *pc = cls;
+ const struct DataMessage *dm;
+
+ dm = (const struct DataMessage*) &pc[1];
+ if (key == NULL)
+ {
+ if (pc->is_present == GNUNET_YES)
+ transmit_status (pc->client, GNUNET_OK, NULL);
+ else
+ execute_put (pc->client, dm);
+ GNUNET_SERVER_client_drop (pc->client);
+ GNUNET_free (pc);
+ return GNUNET_SYSERR;
+ }
+ if ( (size == ntohl(dm->size)) &&
+ (0 == memcmp (&dm[1],
+ data,
+ size)) )
+ {
+ pc->is_present = GNUNET_YES;
+ plugin->api->next_request (next_cls, GNUNET_YES);
+ }
+ else
+ {
+ plugin->api->next_request (next_cls, GNUNET_NO);
+ }
+ return GNUNET_OK;
+}
+
+
/**
* Handle PUT-message.
*
const struct GNUNET_MessageHeader *message)
{
const struct DataMessage *dm = check_data (message);
- char *msg;
- int ret;
int rid;
struct ReservationList *pos;
+ struct PutContext *pc;
uint32_t size;
if ( (dm == NULL) ||
if (NULL != pos)
{
GNUNET_break (pos->entries > 0);
- GNUNET_break (pos->amount > size);
+ GNUNET_break (pos->amount >= size);
pos->entries--;
pos->amount -= size;
reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
GNUNET_NO);
}
}
- msg = NULL;
- ret = plugin->api->put (plugin->api->cls,
- &dm->key,
- size,
- &dm[1],
- ntohl(dm->type),
- ntohl(dm->priority),
- ntohl(dm->anonymity),
- GNUNET_TIME_absolute_ntoh(dm->expiration),
- &msg);
- if (GNUNET_OK == ret)
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# bytes stored"),
- size,
- GNUNET_YES);
- GNUNET_CONTAINER_bloomfilter_add (filter,
- &dm->key);
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully stored %u bytes of type %u under key `%s'\n",
- size,
- ntohl(dm->type),
- GNUNET_h2s (&dm->key));
-#endif
- }
- transmit_status (client,
- (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK,
- msg);
- GNUNET_free_non_null (msg);
- if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
+ if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
+ &dm->key))
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
- (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
- (unsigned long long) (quota - reserved - cache_size),
- (unsigned long long) plugin->api->get_size (plugin->api->cls));
- manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
+ pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
+ pc->client = client;
+ GNUNET_SERVER_client_keep (client);
+ memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
+ plugin->api->get (plugin->api->cls,
+ &dm->key,
+ NULL,
+ ntohl (dm->type),
+ &check_present,
+ pc);
+ return;
}
+ execute_put (client, dm);
}
}
+/**
+ * Function called by plugins to notify us about a
+ * change in their disk utilization.
+ *
+ * @param cls closure (NULL)
+ * @param delta change in disk utilization,
+ * 0 for "reset to empty"
+ */
+static void
+disk_utilization_change_cb (void *cls,
+ int delta)
+{
+ if ( (delta < 0) &&
+ (payload < -delta) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"),
+ (long long) payload,
+ (long long) -delta);
+ payload = plugin->api->get_size (plugin->api->cls);
+ sync_stats ();
+ return;
+ }
+ payload += delta;
+ lastSync++;
+ if (lastSync >= MAX_STAT_SYNC_LAG)
+ sync_stats ();
+}
+
+
+/**
+ * Callback function to process statistic values.
+ *
+ * @param cls closure (struct Plugin*)
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
+ * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
+ */
+static int
+process_stat_in (void *cls,
+ const char *subsystem,
+ const char *name,
+ uint64_t value,
+ int is_persistent)
+{
+ GNUNET_assert (stats_worked == GNUNET_NO);
+ stats_worked = GNUNET_YES;
+ payload += value;
+#if DEBUG_SQLITE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notification from statistics about existing payload (%llu), new payload is %llu\n",
+ value,
+ payload);
+#endif
+ return GNUNET_OK;
+}
+
+
+static void
+process_stat_done (void *cls,
+ int success)
+{
+ struct DatastorePlugin *plugin = cls;
+
+ stat_get = NULL;
+ if (stats_worked == GNUNET_NO)
+ payload = plugin->api->get_size (plugin->api->cls);
+}
+
+
/**
* Load the datastore plugin.
*/
ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
ret->env.cfg = cfg;
ret->env.sched = sched;
+ ret->env.duc = &disk_utilization_change_cb;
+ ret->env.cls = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Loading `%s' datastore plugin\n"), name);
GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
GNUNET_CONTAINER_bloomfilter_free (filter);
filter = NULL;
}
+ if (lastSync > 0)
+ sync_stats ();
+ if (stat_get != NULL)
+ {
+ GNUNET_STATISTICS_get_cancel (stat_get);
+ stat_get = NULL;
+ }
if (stats != NULL)
{
GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
}
return;
}
+ stat_get = GNUNET_STATISTICS_get (stats,
+ "datastore",
+ QUOTA_STAT_NAME,
+ GNUNET_TIME_UNIT_SECONDS,
+ &process_stat_done,
+ &process_stat_in,
+ plugin);
GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
GNUNET_SERVER_add_handlers (server, handlers);
expired_kill_task
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
&cleaning_task, NULL);
-
}