GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE TABLE IF NOT EXISTS channels (\n"
" id SERIAL,\n"
- " pub_key BYTEA,\n"
- " max_state_message_id INT,\n"
- " state_hash_message_id INT,\n"
+ " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
+ " max_state_message_id BIGINT,\n"
+ " state_hash_message_id BIGINT,\n"
" PRIMARY KEY(id)\n"
")" "WITH OIDS")) ||
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n"
- " ON channels (substring(pub_key from 1 for 5)\n"
- ")")) ||
+ " ON channels (pub_key)")) ||
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE TABLE IF NOT EXISTS slaves (\n"
" id SERIAL,\n"
- " pub_key BYTEA,\n"
+ " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n"
" PRIMARY KEY(id)\n"
")" "WITH OIDS")) ||
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n"
- " ON slaves (substring(pub_key from 1 for 5)\n"
- ")")) ||
+ " ON slaves (pub_key)")) ||
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE TABLE IF NOT EXISTS membership (\n"
- " channel_id INT NOT NULL REFERENCES channels(id),\n"
- " slave_id INT NOT NULL REFERENCES slaves(id),\n"
+ " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
+ " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n"
" did_join INT NOT NULL,\n"
" announced_at BIGINT NOT NULL,\n"
" effective_since BIGINT NOT NULL,\n"
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE TABLE IF NOT EXISTS messages (\n"
- " channel_id INT NOT NULL REFERENCES channels(id),\n"
- " hop_counter BIGINT NOT NULL,\n"
- " signature BYTEA,\n"
- " purpose BYTEA,\n"
+ " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
+ " hop_counter INT NOT NULL,\n"
+ " signature BYTEA CHECK (LENGTH(signature)=64),\n"
+ " purpose BYTEA CHECK (LENGTH(purpose)=8),\n"
" fragment_id BIGINT NOT NULL,\n"
" fragment_offset BIGINT NOT NULL,\n"
" message_id BIGINT NOT NULL,\n"
" group_generation BIGINT NOT NULL,\n"
- " multicast_flags BIGINT NOT NULL,\n"
- " psycstore_flags BIGINT NOT NULL,\n"
+ " multicast_flags INT NOT NULL,\n"
+ " psycstore_flags INT NOT NULL,\n"
" data BYTEA,\n"
" PRIMARY KEY (channel_id, fragment_id),\n"
" UNIQUE (channel_id, message_id, fragment_offset)\n"
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE TABLE IF NOT EXISTS state (\n"
- " channel_id INT NOT NULL REFERENCES channels(id),\n"
+ " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
" name TEXT NOT NULL,\n"
" value_current BYTEA,\n"
- " value_signed BYTEA\n"
+ " value_signed BYTEA,\n"
+ " PRIMARY KEY (channel_id, name)\n"
")" "WITH OIDS")) ||
-
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec(plugin->dbh,
- "CREATE UNIQUE INDEX IF NOT EXISTS state_uniq_idx \n"
- " ON state (channel_id, substring(name from 1 for 5)\n"
- ")")) ||
-
(GNUNET_OK !=
GNUNET_POSTGRES_exec(plugin->dbh,
"CREATE TABLE IF NOT EXISTS state_sync (\n"
- " channel_id INT NOT NULL REFERENCES channels(id),\n"
+ " channel_id BIGINT NOT NULL REFERENCES channels(id),\n"
" name TEXT NOT NULL,\n"
" value BYTEA,\n"
- " PRIMARY KEY (channel_id)\n"
- ")" "WITH OIDS")) ||
-
- (GNUNET_OK !=
- GNUNET_POSTGRES_exec(plugin->dbh,
- "CREATE UNIQUE INDEX IF NOT EXISTS state_sync_name_idx \n"
- " ON state_sync (substring(name from 1 for 5)\n"
- ")")))
+ " PRIMARY KEY (channel_id, name)\n"
+ ")" "WITH OIDS")))
{
PQfinish (plugin->dbh);
plugin->dbh = NULL;
/** @todo select_messages: add method_prefix filter */
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_messages",
- "SELECT hop_counter, signature, purpose, fragment_id,\n"
- " fragment_offset, message_id, group_generation,\n"
- " multicast_flags, psycstore_flags, data\n"
- "FROM messages\n"
- "WHERE channel_id = get_chan_id($1) \n"
- " AND $2 <= message_id AND message_id <= $3"
- "LIMIT $4;", 4)) ||
+ "select_messages",
+ "SELECT hop_counter, signature, purpose, fragment_id,\n"
+ " fragment_offset, message_id, group_generation,\n"
+ " multicast_flags, psycstore_flags, data\n"
+ "FROM messages\n"
+ "WHERE channel_id = get_chan_id($1) \n"
+ " AND $2 <= message_id AND message_id <= $3\n"
+ "LIMIT $4;", 4)) ||
/** @todo select_latest_messages: add method_prefix filter */
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_latest_fragments",
- "SELECT rev.hop_counter AS hop_counter,\n"
- " rev.signature AS signature,\n"
- " rev.purpose AS purpose,\n"
- " rev.fragment_id AS fragment_id,\n"
- " rev.fragment_offset AS fragment_offset,\n"
- " rev.message_id AS message_id,\n"
- " rev.group_generation AS group_generation,\n"
- " rev.multicast_flags AS multicast_flags,\n"
- " rev.psycstore_flags AS psycstore_flags,\n"
- " rev.data AS data\n"
- " FROM\n"
- " (SELECT hop_counter, signature, purpose, fragment_id,\n"
- " fragment_offset, message_id, group_generation,\n"
- " multicast_flags, psycstore_flags, data \n"
- " FROM messages\n"
- " WHERE channel_id = get_chan_id($1) \n"
- " ORDER BY fragment_id DESC\n"
- " LIMIT $2) AS rev\n"
- " ORDER BY rev.fragment_id;", 2)) ||
+ "select_latest_fragments",
+ "SELECT rev.hop_counter AS hop_counter,\n"
+ " rev.signature AS signature,\n"
+ " rev.purpose AS purpose,\n"
+ " rev.fragment_id AS fragment_id,\n"
+ " rev.fragment_offset AS fragment_offset,\n"
+ " rev.message_id AS message_id,\n"
+ " rev.group_generation AS group_generation,\n"
+ " rev.multicast_flags AS multicast_flags,\n"
+ " rev.psycstore_flags AS psycstore_flags,\n"
+ " rev.data AS data\n"
+ " FROM\n"
+ " (SELECT hop_counter, signature, purpose, fragment_id,\n"
+ " fragment_offset, message_id, group_generation,\n"
+ " multicast_flags, psycstore_flags, data \n"
+ " FROM messages\n"
+ " WHERE channel_id = get_chan_id($1) \n"
+ " ORDER BY fragment_id DESC\n"
+ " LIMIT $2) AS rev\n"
+ " ORDER BY rev.fragment_id;", 2)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_latest_messages",
- "SELECT hop_counter, signature, purpose, fragment_id,\n"
- " fragment_offset, message_id, group_generation,\n"
- " multicast_flags, psycstore_flags, data\n"
- "FROM messages\n"
- "WHERE channel_id = get_chan_id($1)\n"
- " AND message_id IN\n"
- " (SELECT message_id\n"
- " FROM messages\n"
- " WHERE channel_id = get_chan_id($2) \n"
- " GROUP BY message_id\n"
- " ORDER BY message_id\n"
- " DESC LIMIT $3)\n"
- "ORDER BY fragment_id", 3)) ||
+ "select_latest_messages",
+ "SELECT hop_counter, signature, purpose, fragment_id,\n"
+ " fragment_offset, message_id, group_generation,\n"
+ " multicast_flags, psycstore_flags, data\n"
+ "FROM messages\n"
+ "WHERE channel_id = get_chan_id($1)\n"
+ " AND message_id IN\n"
+ " (SELECT message_id\n"
+ " FROM messages\n"
+ " WHERE channel_id = get_chan_id($2) \n"
+ " GROUP BY message_id\n"
+ " ORDER BY message_id\n"
+ " DESC LIMIT $3)\n"
+ "ORDER BY fragment_id", 3)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_message_fragment",
- "SELECT hop_counter, signature, purpose, fragment_id,\n"
- " fragment_offset, message_id, group_generation,\n"
- " multicast_flags, psycstore_flags, data\n"
- "FROM messages\n"
- "WHERE channel_id = get_chan_id($1) \n"
- " AND message_id = $2 AND fragment_offset = $3", 3)) ||
+ "select_message_fragment",
+ "SELECT hop_counter, signature, purpose, fragment_id,\n"
+ " fragment_offset, message_id, group_generation,\n"
+ " multicast_flags, psycstore_flags, data\n"
+ "FROM messages\n"
+ "WHERE channel_id = get_chan_id($1) \n"
+ " AND message_id = $2 AND fragment_offset = $3", 3)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_counters_message",
- "SELECT fragment_id, message_id, group_generation\n"
- "FROM messages\n"
- "WHERE channel_id = get_chan_id($1)\n"
- "ORDER BY fragment_id DESC LIMIT 1", 1)) ||
+ "select_counters_message",
+ "SELECT fragment_id, message_id, group_generation\n"
+ "FROM messages\n"
+ "WHERE channel_id = get_chan_id($1)\n"
+ "ORDER BY fragment_id DESC LIMIT 1", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_counters_state",
- "SELECT max_state_message_id\n"
- "FROM channels\n"
- "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) ||
+ "select_counters_state",
+ "SELECT max_state_message_id\n"
+ "FROM channels\n"
+ "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "update_max_state_message_id",
- "UPDATE channels\n"
- "SET max_state_message_id = $1\n"
- "WHERE pub_key = $2", 2)) ||
+ "update_max_state_message_id",
+ "UPDATE channels\n"
+ "SET max_state_message_id = $1\n"
+ "WHERE pub_key = $2", 2)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "update_state_hash_message_id",
- "UPDATE channels\n"
- "SET state_hash_message_id = $1\n"
- "WHERE pub_key = $2", 2)) ||
+ "update_state_hash_message_id",
+ "UPDATE channels\n"
+ "SET state_hash_message_id = $1\n"
+ "WHERE pub_key = $2", 2)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "insert_state_current",
- "INSERT INTO state\n"
- " (channel_id, name, value_current, value_signed)\n"
- "SELECT new.channel_id, new.name,\n"
- " new.value_current, old.value_signed\n"
- "FROM (SELECT get_chan_id($1) AS channel_id,\n"
- " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
- "LEFT JOIN (SELECT channel_id, name, value_signed\n"
- " FROM state) AS old\n"
- "ON new.channel_id = old.channel_id AND new.name = old.name\n"
- "ON CONFLICT ( channel_id, substring(name from 1 for 5) )\n"
- " DO UPDATE SET value_current = EXCLUDED.value_current,\n"
- " value_signed = EXCLUDED.value_signed", 3)) ||
+ "insert_state_current",
+ "INSERT INTO state\n"
+ " (channel_id, name, value_current, value_signed)\n"
+ "SELECT new.channel_id, new.name,\n"
+ " new.value_current, old.value_signed\n"
+ "FROM (SELECT get_chan_id($1) AS channel_id,\n"
+ " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n"
+ "LEFT JOIN (SELECT channel_id, name, value_signed\n"
+ " FROM state) AS old\n"
+ "ON new.channel_id = old.channel_id AND new.name = old.name\n"
+ "ON CONFLICT (channel_id, name)\n"
+ " DO UPDATE SET value_current = EXCLUDED.value_current,\n"
+ " value_signed = EXCLUDED.value_signed", 3)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "delete_state_empty",
- "DELETE FROM state\n"
- "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
- " AND (value_current IS NULL OR length(value_current) = 0)\n"
- " AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) ||
+ "delete_state_empty",
+ "DELETE FROM state\n"
+ "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n"
+ " AND (value_current IS NULL OR length(value_current) = 0)\n"
+ " AND (value_signed IS NULL OR length(value_signed) = 0)", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "update_state_signed",
- "UPDATE state\n"
- "SET value_signed = value_current\n"
- "WHERE channel_id = get_chan_id($1) ", 1)) ||
+ "update_state_signed",
+ "UPDATE state\n"
+ "SET value_signed = value_current\n"
+ "WHERE channel_id = get_chan_id($1) ", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "delete_state",
- "DELETE FROM state\n"
- "WHERE channel_id = get_chan_id($1) ", 1)) ||
+ "delete_state",
+ "DELETE FROM state\n"
+ "WHERE channel_id = get_chan_id($1) ", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "insert_state_sync",
- "INSERT INTO state_sync (channel_id, name, value)\n"
- "VALUES (get_chan_id($1), $2, $3)", 3)) ||
+ "insert_state_sync",
+ "INSERT INTO state_sync (channel_id, name, value)\n"
+ "VALUES (get_chan_id($1), $2, $3)", 3)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "insert_state_from_sync",
- "INSERT INTO state\n"
- " (channel_id, name, value_current, value_signed)\n"
- "SELECT channel_id, name, value, value\n"
- "FROM state_sync\n"
- "WHERE channel_id = get_chan_id($1)", 1)) ||
+ "insert_state_from_sync",
+ "INSERT INTO state\n"
+ " (channel_id, name, value_current, value_signed)\n"
+ "SELECT channel_id, name, value, value\n"
+ "FROM state_sync\n"
+ "WHERE channel_id = get_chan_id($1)", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "delete_state_sync",
- "DELETE FROM state_sync\n"
- "WHERE channel_id = get_chan_id($1)", 1)) ||
+ "delete_state_sync",
+ "DELETE FROM state_sync\n"
+ "WHERE channel_id = get_chan_id($1)", 1)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_state_one",
- "SELECT value_current\n"
- "FROM state\n"
- "WHERE channel_id = get_chan_id($1)\n"
- " AND name = $2", 2)) ||
+ "select_state_one",
+ "SELECT value_current\n"
+ "FROM state\n"
+ "WHERE channel_id = get_chan_id($1)\n"
+ " AND name = $2", 2)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_state_prefix",
- "SELECT name, value_current\n"
- "FROM state\n"
- "WHERE channel_id = get_chan_id($1)\n"
- " AND (name = $2 OR substr(name, 1, $3) = $4 || '_')", 4)) ||
+ "select_state_prefix",
+ "SELECT name, value_current\n"
+ "FROM state\n"
+ "WHERE channel_id = get_chan_id($1)\n"
+ " AND (name = $2 OR substr(name, 1, $3) = $4)", 4)) ||
(GNUNET_OK != GNUNET_POSTGRES_prepare (plugin->dbh,
- "select_state_signed",
- "SELECT name, value_signed\n"
- "FROM state\n"
- "WHERE channel_id = get_chan_id($1)\n"
- " AND value_signed IS NOT NULL", 1)))
+ "select_state_signed",
+ "SELECT name, value_signed\n"
+ "FROM state\n"
+ "WHERE channel_id = get_chan_id($1)\n"
+ " AND value_signed IS NOT NULL", 1)))
{
PQfinish (plugin->dbh);
plugin->dbh = NULL;
GNUNET_PQ_query_param_end
};
- ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_slave_key", params);
+ ret = GNUNET_PQ_exec_prepared (plugin->dbh, "insert_membership", params);
if (GNUNET_OK !=
GNUNET_POSTGRES_check_result (plugin->dbh,
ret,
PGRES_COMMAND_OK,
- "PQexecPrepared", "insert_slave_key"))
+ "PQexecPrepared", "insert_membership"))
{
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
GNUNET_POSTGRES_check_result (plugin->dbh,
res,
- PGRES_COMMAND_OK,
+ PGRES_TUPLES_OK,
"PQexecPrepared", "select_membership"))
{
return GNUNET_SYSERR;
GNUNET_PQ_result_spec_end
};
- switch(GNUNET_PQ_extract_result (res, results_select, 0))
+ switch (GNUNET_PQ_extract_result (res, results_select, 0))
{
case GNUNET_OK:
ret = GNUNET_YES;
break;
+
default:
ret = GNUNET_NO;
break;
uint64_t message_id = GNUNET_ntohll (msg->message_id);
uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
- uint64_t hop_counter = ntohl(msg->hop_counter);
- uint64_t flags = ntohl(msg->flags);
+ uint32_t hop_counter = ntohl(msg->hop_counter);
+ uint32_t flags = ntohl(msg->flags);
if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
message_id > INT64_MAX || group_generation > INT64_MAX)
struct GNUNET_PQ_QueryParam params_insert[] = {
GNUNET_PQ_query_param_auto_from_type (channel_key),
- GNUNET_PQ_query_param_uint64 (&hop_counter),
+ GNUNET_PQ_query_param_uint32 (&hop_counter),
GNUNET_PQ_query_param_auto_from_type (&msg->signature),
GNUNET_PQ_query_param_auto_from_type (&msg->purpose),
GNUNET_PQ_query_param_uint64 (&fragment_id),
GNUNET_PQ_query_param_uint64 (&fragment_offset),
GNUNET_PQ_query_param_uint64 (&message_id),
GNUNET_PQ_query_param_uint64 (&group_generation),
- GNUNET_PQ_query_param_uint64 (&flags),
+ GNUNET_PQ_query_param_uint32 (&flags),
GNUNET_PQ_query_param_uint32 (&psycstore_flags),
- GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size)
- - sizeof (*msg)),
+ GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)),
GNUNET_PQ_query_param_end
};
message_add_flags (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
uint64_t message_id,
- uint64_t psycstore_flags)
+ uint32_t psycstore_flags)
{
PGresult *res;
struct Plugin *plugin = cls;
- int ret = GNUNET_SYSERR;
-
struct GNUNET_PQ_QueryParam params_update[] = {
- GNUNET_PQ_query_param_uint64 (&psycstore_flags),
+ GNUNET_PQ_query_param_uint32 (&psycstore_flags),
GNUNET_PQ_query_param_auto_from_type (channel_key),
GNUNET_PQ_query_param_uint64 (&message_id),
GNUNET_PQ_query_param_end
res = GNUNET_PQ_exec_prepared (plugin->dbh, "update_message_flags", params_update);
if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", "update_message_flags"))
- return ret;
+ res,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared","update_message_flags"))
+ return GNUNET_SYSERR;
PQclear (res);
- return ret;
+ return GNUNET_OK;
}
const char *stmt,
PGresult *res,
GNUNET_PSYCSTORE_FragmentCallback cb,
- void *cb_cls)
+ void *cb_cls,
+ uint64_t *returned_fragments)
{
uint32_t hop_counter;
void *signature = NULL;
uint64_t fragment_offset;
uint64_t message_id;
uint64_t group_generation;
- uint64_t flags;
+ uint32_t flags;
void *buf;
size_t buf_size;
int ret = GNUNET_SYSERR;
struct GNUNET_MULTICAST_MessageHeader *mp;
- uint64_t msg_flags;
- unsigned int cnt;
+ uint32_t msg_flags;
struct GNUNET_PQ_ResultSpec results[] = {
GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
- GNUNET_PQ_result_spec_uint64 ("msg_flags", &msg_flags),
- GNUNET_PQ_result_spec_uint64 ("flags", &flags),
- GNUNET_PQ_result_spec_variable_size ("data", &buf,
- &buf_size),
+ GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
+ GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
+ GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
GNUNET_PQ_result_spec_end
};
return GNUNET_SYSERR;
}
- cnt = PQntuples (res);
- if (cnt == 0)
+ int nrows = PQntuples (res);
+ for (int row = 0; row < nrows; row++)
{
- ret = GNUNET_NO;
- }
- else
- {
- if (GNUNET_OK != GNUNET_PQ_extract_result(res, results, 0)) {
- PQclear (res);
- return GNUNET_SYSERR;
+ if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
+ {
+ break;
}
mp = GNUNET_malloc (sizeof (*mp) + buf_size);
buf_size);
GNUNET_PQ_cleanup_result(results);
ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
+ if (NULL != returned_fragments)
+ (*returned_fragments)++;
}
- PQclear (res);
return ret;
}
static int
-fragment_select (struct Plugin *plugin, const char *stmt,
+fragment_select (struct Plugin *plugin,
+ const char *stmt,
struct GNUNET_PQ_QueryParam *params,
uint64_t *returned_fragments,
- GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
+ GNUNET_PSYCSTORE_FragmentCallback cb,
+ void *cb_cls)
{
PGresult *res;
int ret = GNUNET_SYSERR;
- // FIXME
- if (NULL == plugin->dbh || NULL == stmt || NULL == params)
- {
- fprintf(stderr, "%p %p %p\n", plugin->dbh, stmt, params);
- return GNUNET_SYSERR;
- }
-
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
if (GNUNET_YES ==
GNUNET_POSTGRES_check_result (plugin->dbh,
res,
- PGRES_COMMAND_OK,
+ PGRES_TUPLES_OK,
"PQexecPrepared", stmt))
{
if (PQntuples (res) == 0)
ret = GNUNET_NO;
else
{
- ret = fragment_row (plugin, stmt, res, cb, cb_cls);
- (*returned_fragments)++;
+ ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
}
PQclear (res);
}
struct Plugin *plugin = cls;
*returned_fragments = 0;
+ if (0 == fragment_limit)
+ fragment_limit = INT64_MAX;
+
struct GNUNET_PQ_QueryParam params_select[] = {
GNUNET_PQ_query_param_auto_from_type (channel_key),
GNUNET_PQ_query_param_uint64 (&first_message_id),
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
+ res,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared", stmt))
{
if (PQntuples (res) == 0)
ret = GNUNET_NO;
else
- ret = fragment_row (plugin, stmt, res, cb, cb_cls);
+ ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
PQclear (res);
}
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
+ res,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared", stmt))
{
return GNUNET_SYSERR;
}
struct GNUNET_PQ_ResultSpec results_select[] = {
- GNUNET_PQ_result_spec_uint64 ("max_fragment_id", max_fragment_id),
- GNUNET_PQ_result_spec_uint64 ("max_message_id", max_message_id),
- GNUNET_PQ_result_spec_uint64 ("max_group_generation", max_group_generation),
+ GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id),
+ GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id),
+ GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation),
GNUNET_PQ_result_spec_end
};
- if (GNUNET_OK != GNUNET_PQ_extract_result (res,
- results_select, 0))
+ if (GNUNET_OK != GNUNET_PQ_extract_result (res, results_select, 0))
{
PQclear (res);
return GNUNET_SYSERR;
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
+ res,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared", stmt))
{
return GNUNET_SYSERR;
}
GNUNET_PQ_result_spec_end
};
- ret = GNUNET_PQ_extract_result (res,
- results_select, 0);
+ ret = GNUNET_PQ_extract_result (res, results_select, 0);
if (GNUNET_OK != ret)
{
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (channel_key),
GNUNET_PQ_query_param_string (name),
- GNUNET_PQ_query_param_auto_from_type (value),
+ GNUNET_PQ_query_param_fixed_size (value, value_size),
GNUNET_PQ_query_param_end
};
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
+ res,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared", stmt))
{
return GNUNET_SYSERR;
}
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params);
if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
+ res,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared", stmt))
{
return GNUNET_SYSERR;
}
case GNUNET_NO: // no state yet
ret = GNUNET_OK;
break;
+
default:
return ret;
}
switch (op)
{
case GNUNET_PSYC_OP_ASSIGN:
- return state_assign (plugin, "insert_state_current", channel_key,
- name, value, value_size);
+ return state_assign (plugin, "insert_state_current",
+ channel_key, name, value, value_size);
default: /** @todo implement more state operations */
GNUNET_break (0);
const char *name, const void *value, size_t value_size)
{
struct Plugin *plugin = cls;
- return state_assign (plugin, "insert_state_sync", channel_key,
- name, value, value_size);
+ return state_assign (plugin, "insert_state_sync",
+ channel_key, name, value, value_size);
}
res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
+ res,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared", stmt))
{
return GNUNET_SYSERR;
}
ret = GNUNET_NO;
}
- ret = GNUNET_PQ_extract_result (res,
- results, 0);
+ ret = GNUNET_PQ_extract_result (res, results, 0);
if (GNUNET_OK != ret)
{
{
PGresult *res;
struct Plugin *plugin = cls;
- int ret = GNUNET_SYSERR;
+ int ret = GNUNET_NO;
const char *stmt = "select_state_prefix";
GNUNET_PQ_result_spec_end
};
- do
+ res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
+ if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
+ res,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared", stmt))
{
- res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
- if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
- {
- break;
- }
-
- if (PQntuples (res) == 0)
- {
- PQclear (res);
- ret = GNUNET_NO;
- break;
- }
+ return GNUNET_SYSERR;
+ }
- if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0))
+ int nrows = PQntuples (res);
+ for (int row = 0; row < nrows; row++)
+ {
+ if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
{
- PQclear (res);
break;
}
value_size);
GNUNET_PQ_cleanup_result(results);
}
- while (ret == GNUNET_YES);
PQclear (res);
{
PGresult *res;
struct Plugin *plugin = cls;
- int ret = GNUNET_SYSERR;
+ int ret = GNUNET_NO;
const char *stmt = "select_state_signed";
GNUNET_PQ_result_spec_end
};
- do
+ res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
+ if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
+ res,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared", stmt))
{
- res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
- if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
- res,
- PGRES_COMMAND_OK,
- "PQexecPrepared", stmt))
- {
- break;
- }
-
- if (PQntuples (res) == 0)
- {
- PQclear (res);
- ret = GNUNET_NO;
- break;
- }
+ return GNUNET_SYSERR;
+ }
- if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, 0))
+ int nrows = PQntuples (res);
+ for (int row = 0; row < nrows; row++)
+ {
+ if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
{
- PQclear (res);
break;
}
value_signed,
value_size);
- GNUNET_PQ_cleanup_result(results);
+ GNUNET_PQ_cleanup_result (results);
}
- while (ret == GNUNET_YES);
PQclear (res);