This resolves issue #4965.
}
-/**
- * Context for a PUT request used to see if the content is
- * already present.
- */
-struct PutContext
-{
- /**
- * Client to notify on completion.
- */
- struct GNUNET_SERVICE_Client *client;
-
-#if ! HAVE_UNALIGNED_64_ACCESS
- void *reserved;
-#endif
-
- /* followed by the 'struct DataMessage' */
-};
-
-
/**
* Put continuation.
*
* @param cls closure
* @param key key for the item stored
* @param size size of the item stored
- * @param status #GNUNET_OK or #GNUNET_SYSERROR
+ * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
+ * or #GNUNET_SYSERROR if error
* @param msg error message on error
*/
static void
put_continuation (void *cls,
- const struct GNUNET_HashCode *key,
- uint32_t size,
+ const struct GNUNET_HashCode *key,
+ uint32_t size,
int status,
- const char *msg)
+ const char *msg)
{
- struct PutContext *pc = cls;
+ struct GNUNET_SERVICE_Client *client = cls;
if (GNUNET_OK == status)
{
size,
GNUNET_h2s (key));
}
- transmit_status (pc->client,
- status,
+ transmit_status (client,
+ GNUNET_SYSERR == status ? GNUNET_SYSERR : GNUNET_OK,
msg);
- GNUNET_free (pc);
if (quota - reserved - cache_size < payload)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
}
-/**
- * Actually put the data message.
- *
- * @param pc put context
- */
-static void
-execute_put (struct PutContext *pc)
-{
- const struct DataMessage *dm;
-
- dm = (const struct DataMessage *) &pc[1];
- plugin->api->put (plugin->api->cls,
- &dm->key,
- ntohl (dm->size),
- &dm[1],
- ntohl (dm->type),
- ntohl (dm->priority),
- ntohl (dm->anonymity),
- ntohl (dm->replication),
- GNUNET_TIME_absolute_ntoh (dm->expiration),
- &put_continuation,
- pc);
-}
-
-
-/**
- *
- * @param cls closure
- * @param status #GNUNET_OK or #GNUNET_SYSERR
- * @param msg error message on error
- */
-static void
-check_present_continuation (void *cls,
- int status,
- const char *msg)
-{
- struct GNUNET_SERVICE_Client *client = cls;
-
- transmit_status (client,
- GNUNET_NO,
- NULL);
-}
-
-
-/**
- * Function that will check if the given datastore entry
- * matches the put and if none match executes the put.
- *
- * @param cls closure, pointer to the client (of type `struct PutContext`).
- * @param key key for the content
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param replication replication-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- * maybe 0 if no unique identifier is available
- * @return #GNUNET_OK usually
- * #GNUNET_NO to delete the item
- */
-static int
-check_present (void *cls,
- const struct GNUNET_HashCode *key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expiration,
- uint64_t uid)
-{
- struct PutContext *pc = cls;
- const struct DataMessage *dm;
-
- dm = (const struct DataMessage *) &pc[1];
- if (key == NULL)
- {
- execute_put (pc);
- return GNUNET_OK;
- }
- if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
- (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
- ( (size == ntohl (dm->size)) &&
- (0 == memcmp (&dm[1],
- data,
- size)) ) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Result already present in datastore\n");
- if ( (ntohl (dm->priority) > 0) ||
- (ntohl (dm->replication) > 0) ||
- (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value_us >
- expiration.abs_value_us) )
- plugin->api->update (plugin->api->cls,
- uid,
- ntohl (dm->priority),
- ntohl (dm->replication),
- GNUNET_TIME_absolute_ntoh (dm->expiration),
- &check_present_continuation,
- pc->client);
- else
- {
- transmit_status (pc->client,
- GNUNET_NO,
- NULL);
- }
- GNUNET_free (pc);
- }
- else
- {
- execute_put (pc);
- }
- return GNUNET_OK;
-}
-
-
/**
* Verify PUT-message.
*
struct GNUNET_SERVICE_Client *client = cls;
int rid;
struct ReservationList *pos;
- struct PutContext *pc;
- struct GNUNET_HashCode vhash;
uint32_t size;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_NO);
}
}
- pc = GNUNET_malloc (sizeof (struct PutContext) + size +
- sizeof (struct DataMessage));
- pc->client = client;
- GNUNET_memcpy (&pc[1],
- dm,
- size + sizeof (struct DataMessage));
- if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
- &dm->key))
- {
- GNUNET_CRYPTO_hash (&dm[1],
- size,
- &vhash);
- plugin->api->get_key (plugin->api->cls,
- 0,
- false,
- &dm->key,
- &vhash,
- ntohl (dm->type),
- &check_present,
- pc);
- GNUNET_SERVICE_client_continue (client);
- return;
- }
- execute_put (pc);
+ bool absent = GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (filter,
+ &dm->key);
+ plugin->api->put (plugin->api->cls,
+ &dm->key,
+ absent,
+ ntohl (dm->size),
+ &dm[1],
+ ntohl (dm->type),
+ ntohl (dm->priority),
+ ntohl (dm->anonymity),
+ ntohl (dm->replication),
+ GNUNET_TIME_absolute_ntoh (dm->expiration),
+ &put_continuation,
+ client);
GNUNET_SERVICE_client_continue (client);
}
value[0] = crc->i;
GNUNET_memcpy (&value[4], &i, sizeof (i));
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
- crc->api->put (crc->api->cls, &key, size, value, 1 + i % 4 /* type */ ,
- prio, i % 4 /* anonymity */ ,
+ crc->api->put (crc->api->cls,
+ &key,
+ false /* absent */,
+ size,
+ value,
+ 1 + i % 4 /* type */ ,
+ prio,
+ i % 4 /* anonymity */ ,
0 /* replication */ ,
GNUNET_TIME_relative_to_absolute
(GNUNET_TIME_relative_multiply
60 * 60 * 60 * 1000 +
GNUNET_CRYPTO_random_u32
(GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
- put_continuation, crc);
+ put_continuation,
+ crc);
i++;
}
}
+/**
+ * Closure for iterator for updating.
+ */
+struct UpdateContext
+{
+ /**
+ * Number of bytes in 'data'.
+ */
+ uint32_t size;
+
+ /**
+ * Pointer to the data.
+ */
+ const void *data;
+
+ /**
+ * Priority of the value.
+ */
+ uint32_t priority;
+
+ /**
+ * Replication level for the value.
+ */
+ uint32_t replication;
+
+ /**
+ * Expiration time for this value.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * True if the value was found and updated.
+ */
+ bool updated;
+};
+
+
+/**
+ * Update the matching value.
+ *
+ * @param cls the 'struct UpdateContext'
+ * @param key unused
+ * @param val the 'struct Value'
+ * @return GNUNET_YES (continue iteration), GNUNET_NO if value was found
+ */
+static int
+update_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *val)
+{
+ struct UpdateContext *uc = cls;
+ struct Value *value = val;
+
+ if (value->size != uc->size)
+ return GNUNET_YES;
+ if (0 != memcmp (value->data, uc->data, uc->size))
+ return GNUNET_YES;
+ uc->expiration = GNUNET_TIME_absolute_max (value->expiration,
+ uc->expiration);
+ if (value->expiration.abs_value_us != uc->expiration.abs_value_us)
+ {
+ value->expiration = uc->expiration;
+ GNUNET_CONTAINER_heap_update_cost (value->expire_heap,
+ value->expiration.abs_value_us);
+ }
+ /* Saturating adds, don't overflow */
+ if (value->priority > UINT32_MAX - uc->priority)
+ value->priority = UINT32_MAX;
+ else
+ value->priority += uc->priority;
+ if (value->replication > UINT32_MAX - uc->replication)
+ value->replication = UINT32_MAX;
+ else
+ value->replication += uc->replication;
+ uc->updated = true;
+ return GNUNET_NO;
+}
+
/**
* Store an item in the datastore.
*
* @param cls closure
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in data
* @param data content stored
* @param type type of the content
*/
static void
heap_plugin_put (void *cls,
- const struct GNUNET_HashCode * key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expiration,
- PluginPutCont cont,
- void *cont_cls)
+ const struct GNUNET_HashCode *key,
+ bool absent,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ PluginPutCont cont,
+ void *cont_cls)
{
struct Plugin *plugin = cls;
struct Value *value;
+ if (!absent) {
+ struct UpdateContext uc;
+
+ uc.size = size;
+ uc.data = data;
+ uc.priority = priority;
+ uc.replication = replication;
+ uc.expiration = expiration;
+ uc.updated = false;
+ GNUNET_CONTAINER_multihashmap_get_multiple (plugin->keyvalue,
+ key,
+ &update_iterator,
+ &uc);
+ if (uc.updated)
+ {
+ cont (cont_cls, key, size, GNUNET_NO, NULL);
+ return;
+ }
+ }
value = GNUNET_malloc (sizeof (struct Value) + size);
value->key = *key;
value->data = &value[1];
}
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore. If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept. The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our `struct Plugin *`
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- * change?
- * @param replication by how much should the replication
- * change?
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-heap_plugin_update (void *cls,
- uint64_t uid,
- uint32_t priority,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expire,
- PluginUpdateCont cont,
- void *cont_cls)
-{
- struct Value *value;
-
- value = (struct Value*) (intptr_t) uid;
- GNUNET_assert (NULL != value);
- if (value->expiration.abs_value_us != expire.abs_value_us)
- {
- value->expiration = expire;
- GNUNET_CONTAINER_heap_update_cost (value->expire_heap,
- expire.abs_value_us);
- }
- /* Saturating adds, don't overflow */
- if (value->priority > UINT32_MAX - priority)
- value->priority = UINT32_MAX;
- else
- value->priority += priority;
- if (value->replication > UINT32_MAX - replication)
- value->replication = UINT32_MAX;
- else
- value->replication += replication;
- cont (cont_cls, GNUNET_OK, NULL);
-}
-
-
/**
* Call the given processor on an item with zero anonymity.
*
api->cls = plugin;
api->estimate_size = &heap_plugin_estimate_size;
api->put = &heap_plugin_put;
- api->update = &heap_plugin_update;
api->get_key = &heap_plugin_get_key;
api->get_replication = &heap_plugin_get_replication;
api->get_expiration = &heap_plugin_get_expiration;
#define UPDATE_ENTRY "UPDATE gn090 SET "\
"prio = prio + ?, "\
"repl = repl + ?, "\
- "expire = IF(expire >= ?, expire, ?) "\
- "WHERE uid = ?"
+ "expire = GREATEST(expire, ?) "\
+ "WHERE hash = ? AND vhash = ?"
struct GNUNET_MYSQL_StatementHandle *update_entry;
#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (1, repl) - 1 WHERE uid=?"
*
* @param cls closure
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in @a data
* @param data content stored
* @param type type of the content
static void
mysql_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
+ bool absent,
uint32_t size,
const void *data,
enum GNUNET_BLOCK_Type type,
{
struct Plugin *plugin = cls;
uint64_t lexpiration = expiration.abs_value_us;
+ struct GNUNET_HashCode vhash;
+
+ GNUNET_CRYPTO_hash (data,
+ size,
+ &vhash);
+ if (!absent)
+ {
+ struct GNUNET_MY_QueryParam params_update[] = {
+ GNUNET_MY_query_param_uint32 (&priority),
+ GNUNET_MY_query_param_uint32 (&replication),
+ GNUNET_MY_query_param_uint64 (&lexpiration),
+ GNUNET_MY_query_param_auto_from_type (key),
+ GNUNET_MY_query_param_auto_from_type (&vhash),
+ GNUNET_MY_query_param_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_MY_exec_prepared (plugin->mc,
+ plugin->update_entry,
+ params_update))
+ {
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ _("MySQL statement run failure"));
+ return;
+ }
+
+ MYSQL_STMT *stmt = GNUNET_MYSQL_statement_get_stmt (plugin->update_entry);
+ my_ulonglong rows = mysql_stmt_affected_rows (stmt);
+
+ GNUNET_break (GNUNET_NO ==
+ GNUNET_MY_extract_result (plugin->update_entry,
+ NULL));
+ if (0 != rows)
+ {
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_NO,
+ NULL);
+ return;
+ }
+ }
+
uint64_t lrvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
UINT64_MAX);
- struct GNUNET_HashCode vhash;
struct GNUNET_MY_QueryParam params_insert[] = {
GNUNET_MY_query_param_uint32 (&replication),
GNUNET_MY_query_param_uint32 (&type),
cont (cont_cls, key, size, GNUNET_SYSERR, _("Data too large"));
return;
}
- GNUNET_CRYPTO_hash (data,
- size,
- &vhash);
if (GNUNET_OK !=
GNUNET_MY_exec_prepared (plugin->mc,
}
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore. If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept. The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- * change?
- * @param replication by how much should the replication
- * change?
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-mysql_plugin_update (void *cls,
- uint64_t uid,
- uint32_t priority,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expire,
- PluginUpdateCont cont,
- void *cont_cls)
-{
- struct Plugin *plugin = cls;
- uint64_t lexpire = expire.abs_value_us;
- int ret;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Updating value %llu adding %d to priority %d to replication and maxing exp at %s\n",
- (unsigned long long) uid,
- priority,
- replication,
- GNUNET_STRINGS_absolute_time_to_string (expire));
-
- struct GNUNET_MY_QueryParam params_update[] = {
- GNUNET_MY_query_param_uint32 (&priority),
- GNUNET_MY_query_param_uint32 (&replication),
- GNUNET_MY_query_param_uint64 (&lexpire),
- GNUNET_MY_query_param_uint64 (&lexpire),
- GNUNET_MY_query_param_uint64 (&uid),
- GNUNET_MY_query_param_end
- };
-
- ret = GNUNET_MY_exec_prepared (plugin->mc,
- plugin->update_entry,
- params_update);
-
- if (GNUNET_OK != ret)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to update value %llu\n",
- (unsigned long long) uid);
- }
- else
- {
- GNUNET_break (GNUNET_NO ==
- GNUNET_MY_extract_result (plugin->update_entry,
- NULL));
- }
- cont (cont_cls,
- ret,
- NULL);
-}
-
-
/**
* Run the given select statement and call 'proc' on the resulting
* values (which must be in particular positions).
api->cls = plugin;
api->estimate_size = &mysql_plugin_estimate_size;
api->put = &mysql_plugin_put;
- api->update = &mysql_plugin_update;
api->get_key = &mysql_plugin_get_key;
api->get_replication = &mysql_plugin_get_replication;
api->get_expiration = &mysql_plugin_get_expiration;
"UPDATE gn090 "
"SET prio = prio + $1, "
"repl = repl + $2, "
- "expire = CASE WHEN expire < $3 THEN $3 ELSE expire END "
- "WHERE oid = $4", 4)) ||
+ "expire = GREATEST(expire, $3) "
+ "WHERE hash = $4 AND vhash = $5", 5)) ||
(GNUNET_OK !=
GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
"UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
*
* @param cls closure with the `struct Plugin`
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in data
* @param data content stored
* @param type type of the content
*/
static void
postgres_plugin_put (void *cls,
- const struct GNUNET_HashCode *key,
- uint32_t size,
+ const struct GNUNET_HashCode *key,
+ bool absent,
+ uint32_t size,
const void *data,
- enum GNUNET_BLOCK_Type type,
+ enum GNUNET_BLOCK_Type type,
uint32_t priority,
- uint32_t anonymity,
+ uint32_t anonymity,
uint32_t replication,
struct GNUNET_TIME_Absolute expiration,
- PluginPutCont cont,
+ PluginPutCont cont,
void *cont_cls)
{
struct Plugin *plugin = cls;
- uint32_t utype = type;
struct GNUNET_HashCode vhash;
+ PGresult *ret;
+
+ GNUNET_CRYPTO_hash (data,
+ size,
+ &vhash);
+
+ if (!absent)
+ {
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint32 (&priority),
+ GNUNET_PQ_query_param_uint32 (&replication),
+ GNUNET_PQ_query_param_absolute_time (&expiration),
+ GNUNET_PQ_query_param_auto_from_type (key),
+ GNUNET_PQ_query_param_auto_from_type (&vhash),
+ GNUNET_PQ_query_param_end
+ };
+ ret = GNUNET_PQ_exec_prepared (plugin->dbh,
+ "update",
+ params);
+ if (GNUNET_OK !=
+ GNUNET_POSTGRES_check_result (plugin->dbh,
+ ret,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared",
+ "update"))
+ {
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ _("Postgress exec failure"));
+ return;
+ }
+ /* What an awful API, this function really does return a string */
+ bool affected = 0 != strcmp ("0", PQcmdTuples (ret));
+ PQclear (ret);
+ if (affected)
+ {
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_NO,
+ NULL);
+ return;
+ }
+ }
+
+ uint32_t utype = type;
uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
UINT64_MAX);
- PGresult *ret;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint32 (&replication),
GNUNET_PQ_query_param_uint32 (&utype),
GNUNET_PQ_query_param_end
};
- GNUNET_CRYPTO_hash (data, size, &vhash);
ret = GNUNET_PQ_exec_prepared (plugin->dbh,
"put",
params);
}
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore. If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept. The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our `struct Plugin *`
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- * change?
- * @param replication by how much should the replication
- * change?
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-postgres_plugin_update (void *cls,
- uint64_t uid,
- uint32_t priority,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expire,
- PluginUpdateCont cont,
- void *cont_cls)
-{
- struct Plugin *plugin = cls;
- uint32_t oid = (uint32_t) uid;
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_uint32 (&priority),
- GNUNET_PQ_query_param_uint32 (&replication),
- GNUNET_PQ_query_param_absolute_time (&expire),
- GNUNET_PQ_query_param_uint32 (&oid),
- GNUNET_PQ_query_param_end
- };
- PGresult *ret;
-
- ret = GNUNET_PQ_exec_prepared (plugin->dbh,
- "update",
- params);
- if (GNUNET_OK !=
- GNUNET_POSTGRES_check_result (plugin->dbh,
- ret,
- PGRES_COMMAND_OK,
- "PQexecPrepared",
- "update"))
- {
- cont (cont_cls,
- GNUNET_SYSERR,
- NULL);
- return;
- }
- PQclear (ret);
- cont (cont_cls,
- GNUNET_OK,
- NULL);
-}
-
-
/**
* Get all of the keys in the datastore.
*
api->cls = plugin;
api->estimate_size = &postgres_plugin_estimate_size;
api->put = &postgres_plugin_put;
- api->update = &postgres_plugin_update;
api->get_key = &postgres_plugin_get_key;
api->get_replication = &postgres_plugin_get_replication;
api->get_expiration = &postgres_plugin_get_expiration;
/**
* Precompiled SQL for update.
*/
- sqlite3_stmt *updPrio;
+ sqlite3_stmt *update;
/**
* Get maximum repl value in database.
"SET prio = prio + ?, "
"repl = repl + ?, "
"expire = MAX(expire, ?) "
- "WHERE _ROWID_ = ?",
- &plugin->updPrio)) ||
+ "WHERE hash = ? AND vhash = ?",
+ &plugin->update)) ||
(SQLITE_OK !=
sq_prepare (plugin->dbh,
"UPDATE gn090 " "SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?",
if (NULL != plugin->delRow)
sqlite3_finalize (plugin->delRow);
- if (NULL != plugin->updPrio)
- sqlite3_finalize (plugin->updPrio);
+ if (NULL != plugin->update)
+ sqlite3_finalize (plugin->update);
if (NULL != plugin->updRepl)
sqlite3_finalize (plugin->updRepl);
if (NULL != plugin->selRepl)
*
* @param cls closure
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in @a data
* @param data content stored
* @param type type of the content
static void
sqlite_plugin_put (void *cls,
const struct GNUNET_HashCode *key,
+ bool absent,
uint32_t size,
const void *data,
enum GNUNET_BLOCK_Type type,
PluginPutCont cont,
void *cont_cls)
{
- uint64_t rvalue;
+ struct Plugin *plugin = cls;
struct GNUNET_HashCode vhash;
+ char *msg = NULL;
+
+ GNUNET_CRYPTO_hash (data,
+ size,
+ &vhash);
+
+ if (!absent)
+ {
+ struct GNUNET_SQ_QueryParam params[] = {
+ GNUNET_SQ_query_param_uint32 (&priority),
+ GNUNET_SQ_query_param_uint32 (&replication),
+ GNUNET_SQ_query_param_absolute_time (&expiration),
+ GNUNET_SQ_query_param_auto_from_type (key),
+ GNUNET_SQ_query_param_auto_from_type (&vhash),
+ GNUNET_SQ_query_param_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_SQ_bind (plugin->update,
+ params))
+ {
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ _("sqlite bind failure"));
+ return;
+ }
+ if (SQLITE_DONE != sqlite3_step (plugin->update))
+ {
+ LOG_SQLITE_MSG (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_SYSERR,
+ msg);
+ GNUNET_free_non_null (msg);
+ return;
+ }
+ int changes = sqlite3_changes (plugin->dbh);
+ GNUNET_SQ_reset (plugin->dbh,
+ plugin->update);
+ if (0 != changes)
+ {
+ cont (cont_cls,
+ key,
+ size,
+ GNUNET_NO,
+ NULL);
+ return;
+ }
+ }
+
+ uint64_t rvalue;
uint32_t type32 = (uint32_t) type;
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_uint32 (&replication),
GNUNET_SQ_query_param_fixed_size (data, size),
GNUNET_SQ_query_param_end
};
- struct Plugin *plugin = cls;
int n;
int ret;
sqlite3_stmt *stmt;
- char *msg = NULL;
if (size > MAX_ITEM_SIZE)
{
GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
GNUNET_YES),
GNUNET_STRINGS_absolute_time_to_string (expiration));
- GNUNET_CRYPTO_hash (data, size, &vhash);
stmt = plugin->insertContent;
rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
if (GNUNET_OK !=
GNUNET_SQ_bind (stmt,
params))
{
- cont (cont_cls, key, size, GNUNET_SYSERR, msg);
- GNUNET_free_non_null(msg);
+ cont (cont_cls, key, size, GNUNET_SYSERR, NULL);
return;
}
n = sqlite3_step (stmt);
}
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore. If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept. The specified priority and replication is added
- * to the existing value.
- *
- * @param cls the plugin context (state for this module)
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- * change?
- * @param replication by how much should the replication
- * change?
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param cont continuation called with success or failure status
- * @param cons_cls closure for @a cont
- */
-static void
-sqlite_plugin_update (void *cls,
- uint64_t uid,
- uint32_t priority,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expire,
- PluginUpdateCont cont,
- void *cont_cls)
-{
- struct Plugin *plugin = cls;
- struct GNUNET_SQ_QueryParam params[] = {
- GNUNET_SQ_query_param_uint32 (&priority),
- GNUNET_SQ_query_param_uint32 (&replication),
- GNUNET_SQ_query_param_absolute_time (&expire),
- GNUNET_SQ_query_param_uint64 (&uid),
- GNUNET_SQ_query_param_end
- };
- int n;
- char *msg = NULL;
-
- if (GNUNET_OK !=
- GNUNET_SQ_bind (plugin->updPrio,
- params))
- {
- cont (cont_cls, GNUNET_SYSERR, msg);
- GNUNET_free_non_null(msg);
- return;
- }
- n = sqlite3_step (plugin->updPrio);
- GNUNET_SQ_reset (plugin->dbh,
- plugin->updPrio);
- switch (n)
- {
- case SQLITE_DONE:
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", "Block updated\n");
- cont (cont_cls, GNUNET_OK, NULL);
- return;
- case SQLITE_BUSY:
- LOG_SQLITE_MSG (plugin, &msg,
- GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- cont (cont_cls, GNUNET_NO, msg);
- GNUNET_free_non_null(msg);
- return;
- default:
- LOG_SQLITE_MSG (plugin, &msg, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- cont (cont_cls, GNUNET_SYSERR, msg);
- GNUNET_free_non_null(msg);
- return;
- }
-}
-
-
/**
* Execute statement that gets a row and call the callback
* with the result. Resets the statement afterwards.
api->cls = &plugin;
api->estimate_size = &sqlite_plugin_estimate_size;
api->put = &sqlite_plugin_put;
- api->update = &sqlite_plugin_update;
api->get_key = &sqlite_plugin_get_key;
api->get_replication = &sqlite_plugin_get_replication;
api->get_expiration = &sqlite_plugin_get_expiration;
*
* @param cls closure
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in data
* @param data content stored
* @param type type of the content
* @param cont_cls continuation closure
*/
static void
-template_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
- const void *data, enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
+template_plugin_put (void *cls,
+ const struct GNUNET_HashCode *key,
+ bool absent,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
uint32_t replication,
- struct GNUNET_TIME_Absolute expiration, PluginPutCont cont,
+ struct GNUNET_TIME_Absolute expiration,
+ PluginPutCont cont,
void *cont_cls)
{
GNUNET_break (0);
}
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore. If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept. The specified priority and replication is added
- * to the existing value.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- * change?
- * @param replication by how much should the replication
- * change?
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-static void
-template_plugin_update (void *cls,
- uint64_t uid,
- uint32_t priority,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expire,
- PluginUpdateCont cont,
- void *cont_cls)
-{
- GNUNET_break (0);
- cont (cont_cls, GNUNET_SYSERR, "not implemented");
-}
-
-
/**
* Call the given processor on an item with zero anonymity.
*
api->cls = plugin;
api->estimate_size = &template_plugin_estimate_size;
api->put = &template_plugin_put;
- api->update = &template_plugin_update;
api->get_key = &template_plugin_get_key;
api->get_replication = &template_plugin_get_replication;
api->get_expiration = &template_plugin_get_expiration;
RP_ERROR = 0,
RP_PUT,
RP_GET,
- RP_UPDATE,
RP_ITER_ZERO,
RP_REPL_GET,
RP_EXPI_GET,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"putting type %u, anon %u under key %s\n", i + 1, i,
GNUNET_h2s (&key));
- crc->api->put (crc->api->cls, &key, size, value, i + 1 /* type */ ,
- prio, i /* anonymity */ ,
+ crc->api->put (crc->api->cls,
+ &key,
+ false /* absent */,
+ size,
+ value, i + 1 /* type */ ,
+ prio,
+ i /* anonymity */ ,
0 /* replication */ ,
GNUNET_TIME_relative_to_absolute
(GNUNET_TIME_relative_multiply
60 * 60 * 60 * 1000 +
GNUNET_CRYPTO_random_u32
(GNUNET_CRYPTO_QUALITY_WEAK, 1000))),
- put_continuation, crc);
+ put_continuation,
+ crc);
i++;
}
}
-static void
-update_continuation (void *cls,
- int status,
- const char *msg)
-{
- struct CpsRunContext *crc = cls;
-
- GNUNET_assert (GNUNET_OK == status);
- crc->phase++;
- GNUNET_SCHEDULER_add_now (&test, crc);
-}
-
-
static void
test (void *cls)
{
&iterate_one_shot,
crc);
break;
- case RP_UPDATE:
- crc->api->update (crc->api->cls,
- guid,
- 1,
- 1,
- GNUNET_TIME_UNIT_ZERO_ABS,
- &update_continuation,
- crc);
- break;
-
case RP_ITER_ZERO:
if (crc->cnt == 1)
{
* @param cls closure
* @param key key for the item stored
* @param size size of the item stored
- * @param status #GNUNET_OK or #GNUNET_SYSERROR
+ * @param status #GNUNET_OK if inserted, #GNUNET_NO if updated,
+ * or #GNUNET_SYSERROR if error
* @param msg error message on error
*/
typedef void
*
* @param cls closure
* @param key key for the item
+ * @param absent true if the key was not found in the bloom filter
* @param size number of bytes in @a data
* @param data content stored
* @param type type of the content
typedef void
(*PluginPut) (void *cls,
const struct GNUNET_HashCode *key,
- uint32_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expiration,
- PluginPutCont cont,
- void *cont_cls);
+ bool absent,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ PluginPutCont cont,
+ void *cont_cls);
/**
void *proc_cls);
-/**
- * Update continuation.
- *
- * @param cls closure
- * @param status #GNUNET_OK or #GNUNET_SYSERR
- * @param msg error message on error
- */
-typedef void
-(*PluginUpdateCont) (void *cls,
- int status,
- const char *msg);
-
-
-/**
- * Update the priority, replication and expiration for a particular
- * unique ID in the datastore. If the expiration time in value is
- * different than the time found in the datastore, the higher value
- * should be kept. The specified priority and replication is added
- * to the existing value.
- *
- * @param cls closure
- * @param uid unique identifier of the datum
- * @param priority by how much should the priority
- * change?
- * @param replication by how much should the replication
- * change?
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param cont continuation called with success or failure status
- * @param cons_cls continuation closure
- */
-typedef void
-(*PluginUpdate) (void *cls,
- uint64_t uid,
- uint32_t priority,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expire,
- PluginUpdateCont cont,
- void *cont_cls);
-
-
/**
* Select a single item from the datastore (among those applicable).
*
*/
PluginPut put;
- /**
- * Update the priority for a particular key in the datastore. If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept. For the
- * anonymity level, the lower value is to be used. The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- */
- PluginUpdate update;
-
/**
* Get a particular datum matching a given hash from the datastore.
*/