*/
#include "platform.h"
-#include "plugin_datastore.h"
+#include "gnunet_datastore_plugin.h"
#include <postgresql/libpq-fe.h>
#define DEBUG_POSTGRES GNUNET_NO
#define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
+/**
+ * Closure for 'postgres_next_request_cont'.
+ */
struct NextRequestClosure
{
+ /**
+ * Global plugin data.
+ */
struct Plugin *plugin;
+
+ /**
+ * Function to call for each matching entry.
+ */
PluginIterator iter;
+
+ /**
+ * Closure for 'iter'.
+ */
void *iter_cls;
+
+ /**
+ * Parameters for the prepared statement.
+ */
const char *paramValues[5];
+
+ /**
+ * Name of the prepared statement to run.
+ */
const char *pname;
+
+ /**
+ * Size of values pointed to by paramValues.
+ */
int paramLengths[5];
+
+ /**
+ * Number of paramters in paramValues/paramLengths.
+ */
int nparams;
+
+ /**
+ * Current time (possible parameter), big-endian.
+ */
uint64_t bnow;
+
+ /**
+ * Key (possible parameter)
+ */
GNUNET_HashCode key;
+
+ /**
+ * Hash of value (possible parameter)
+ */
GNUNET_HashCode vhash;
+
+ /**
+ * Number of entries found so far
+ */
long long count;
+
+ /**
+ * Offset this iteration starts at.
+ */
uint64_t off;
+
+ /**
+ * Current offset to use in query, big-endian.
+ */
uint64_t blimit_off;
+
+ /**
+ * Overall number of matching entries.
+ */
unsigned long long total;
+
+ /**
+ * Expiration value of previous result (possible parameter), big-endian.
+ */
uint64_t blast_expire;
+
+ /**
+ * Row ID of last result (possible paramter), big-endian.
+ */
uint32_t blast_rowid;
+
+ /**
+ * Priority of last result (possible parameter), big-endian.
+ */
uint32_t blast_prio;
+
+ /**
+ * Type of block (possible paramter), big-endian.
+ */
uint32_t btype;
+
+ /**
+ * Flag set to GNUNET_YES to stop iteration.
+ */
int end_it;
};
*/
GNUNET_SCHEDULER_TaskIdentifier next_task;
- unsigned long long payload;
-
- unsigned int lastSync;
-
};
* the desired status code. If not, log an error, clear the
* result and return GNUNET_SYSERR.
*
+ * @param plugin global context
+ * @param ret result to check
+ * @param expected_status expected return value
+ * @param command name of SQL command that was run
+ * @param args arguments to SQL command
+ * @param line line number for error reporting
* @return GNUNET_OK if the result is acceptable
*/
static int
/**
* Run simple SQL statement (without results).
+ *
+ * @param plugin global context
+ * @param sql statement to run
+ * @param line code line for error reporting
*/
static int
pq_exec (struct Plugin *plugin,
/**
* Prepare SQL statement.
+ *
+ * @param plugin global context
+ * @param name name for the prepared SQL statement
+ * @param sql SQL code to prepare
+ * @param nparams number of parameters in sql
+ * @param line code line for error reporting
+ * @return GNUNET_OK on success
*/
static int
pq_prepare (struct Plugin *plugin,
- const char *name, const char *sql, int nparms, int line)
+ const char *name, const char *sql, int nparams, int line)
{
PGresult *ret;
- ret = PQprepare (plugin->dbh, name, sql, nparms, NULL);
+ ret = PQprepare (plugin->dbh, name, sql, nparams, NULL);
if (GNUNET_OK !=
check_result (plugin,
ret, PGRES_COMMAND_OK, "PQprepare", sql, line))
/**
* @brief Get a database handle
+ *
+ * @param plugin global context
* @return GNUNET_OK on success, GNUNET_SYSERR on error
*/
static int
"CONFIG",
&conninfo);
plugin->dbh = PQconnectdb (conninfo == NULL ? "" : conninfo);
- GNUNET_free_non_null (conninfo);
if (NULL == plugin->dbh)
{
/* FIXME: warn about out-of-memory? */
+ GNUNET_free_non_null (conninfo);
return GNUNET_SYSERR;
}
if (PQstatus (plugin->dbh) != CONNECTION_OK)
{
GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
- "postgres",
- _("Unable to initialize Postgres: %s"),
+ "datastore-postgres",
+ _("Unable to initialize Postgres with configuration `%s': %s"),
+ conninfo,
PQerrorMessage (plugin->dbh));
PQfinish (plugin->dbh);
plugin->dbh = NULL;
+ GNUNET_free_non_null (conninfo);
return GNUNET_SYSERR;
}
+ GNUNET_free_non_null (conninfo);
ret = PQexec (plugin->dbh,
"CREATE TABLE gn090 ("
" type INTEGER NOT NULL DEFAULT 0,"
* Delete the row identified by the given rowid (qid
* in postgres).
*
+ * @param plugin global context
+ * @param rowid which row to delete
* @return GNUNET_OK on success
*/
static int
postgres_plugin_get_size (void *cls)
{
struct Plugin *plugin = cls;
- double ret;
+ unsigned long long total;
+ PGresult *ret;
- ret = plugin->payload;
- return (unsigned long long) (ret * 1.00);
- /* benchmarking shows XX% overhead */
+ ret = PQexecParams (plugin->dbh,
+ "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090",
+ 0, NULL, NULL, NULL, NULL, 1);
+ if (GNUNET_OK != check_result (plugin,
+ ret,
+ PGRES_TUPLES_OK,
+ "PQexecParams",
+ "get_size",
+ __LINE__))
+ {
+ return 0;
+ }
+ if ((PQntuples (ret) != 1) ||
+ (PQnfields (ret) != 1) ||
+ (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
+ {
+ GNUNET_break (0);
+ PQclear (ret);
+ return 0;
+ }
+ total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
+ PQclear (ret);
+ return total;
}
uint32_t btype = htonl (type);
uint32_t bprio = htonl (priority);
uint32_t banon = htonl (anonymity);
- uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).value__;
+ uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
const char *paramValues[] = {
(const char *) &btype,
(const char *) &bprio,
"PQexecPrepared", "put", __LINE__))
return GNUNET_SYSERR;
PQclear (ret);
- plugin->payload += size;
+ plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
- "Stored %u bytes in database, new payload is %llu\n",
- (unsigned int) size,
- (unsigned long long) plugin->payload);
+ "datastore-postgres",
+ "Stored %u bytes in database\n",
+ (unsigned int) size);
#endif
return GNUNET_OK;
}
* asking the database plugin to call the iterator
* with the next item.
*
- * @param cls the 'struct NextRequestClosure'
+ * @param next_cls the 'struct NextRequestClosure'
* @param tc scheduler context
*/
static void
{
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
+ "datastore-postgres",
"Ending iteration (%s)\n",
(GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
#endif
{
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
+ "datastore-postgres",
"Ending iteration (postgres error)\n");
#endif
nrc->iter (nrc->iter_cls,
/* no result */
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
+ "datastore-postgres",
"Ending iteration (no more results)\n");
#endif
nrc->iter (nrc->iter_cls,
type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
- expiration_time.value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
+ expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
size = PQgetlength (res, 0, 5);
nrc->blast_prio = htonl (priority);
- nrc->blast_expire = GNUNET_htonll (expiration_time.value);
+ nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
nrc->blast_rowid = htonl (rowid);
nrc->count++;
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
+ "datastore-postgres",
"Found result of size %u bytes and type %u in database\n",
(unsigned int) size,
(unsigned int) type);
{
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
+ "datastore-postgres",
"Ending iteration (client error)\n");
#endif
return;
{
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
- "Deleting %u bytes from database, current payload is %llu\n",
- (unsigned int) size,
- (unsigned long long) plugin->payload);
+ "datastore-postgres",
+ "Deleting %u bytes from database\n",
+ (unsigned int) size);
#endif
- GNUNET_assert (plugin->payload >= size);
- plugin->payload -= size;
+ plugin->env->duc (plugin->env->cls,
+ - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "postgres",
- "Deleted %u bytes from database, new payload is %llu\n",
- (unsigned int) size,
- (unsigned long long) plugin->payload);
+ "datastore-postgres",
+ "Deleted %u bytes from database\n",
+ (unsigned int) size);
#endif
}
}
if (GNUNET_YES == end_it)
nrc->end_it = GNUNET_YES;
nrc->plugin->next_task_nc = nrc;
- nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (nrc->plugin->env->sched,
- &postgres_next_request_cont,
+ nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
nrc);
}
PGresult *ret;
int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
uint32_t boid = htonl ( (uint32_t) uid);
- uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).value__;
+ uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
const char *paramValues[] = {
(const char *) &bdelta,
(const char *) &bexpire,
* Call a method for each key in the database and
* call the callback method on it.
*
+ * @param plugin global context
* @param type entries of which type should be considered?
+ * @param is_asc ascending or descending iteration?
+ * @param iter_select which SELECT method should be used?
* @param iter maybe NULL (to just count); iter
* should return GNUNET_SYSERR to abort the
* iteration, GNUNET_NO to delete the entry and
* continue and GNUNET_OK to continue iterating
+ * @param iter_cls closure for 'iter'
*/
static void
postgres_iterate (struct Plugin *plugin,
struct NextRequestClosure *nrc;
nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->count = UINT32_MAX;
nrc->plugin = plugin;
nrc->iter = iter;
nrc->iter_cls = iter_cls;
iter (iter_cls,
NULL, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ GNUNET_free (nrc);
return;
}
- nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).value__;
+ nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
postgres_plugin_next_request (nrc,
GNUNET_NO);
}
iter (iter_cls,
NULL, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ GNUNET_free (nrc);
return;
}
if ((PQntuples (ret) != 1) ||
iter (iter_cls,
NULL, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ GNUNET_free (nrc);
return;
}
nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
iter (iter_cls,
NULL, NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ GNUNET_free (nrc);
return;
}
nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
}
-
/**
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
}
-
/**
* Select a subset of the items in the datastore and call
* the given iterator for each of them.
api->iter_all_now = &postgres_plugin_iter_all_now;
api->drop = &postgres_plugin_drop;
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
- "postgres", _("Postgres database running\n"));
+ "datastore-postgres",
+ _("Postgres database running\n"));
return api;
}
if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
{
- GNUNET_SCHEDULER_cancel (plugin->env->sched,
- plugin->next_task);
+ GNUNET_SCHEDULER_cancel (plugin->next_task);
plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_free (plugin->next_task_nc);
plugin->next_task_nc = NULL;