+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = op_create (h, h->op, result_cb, cls);
+ op->fragment_cb = fragment_cb;
+ op->cls = cls;
+ return op_send (h, op, env, &req->op_id);
+}
+
+
+/**
+ * Retrieve latest message fragments.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the fragment. If not NULL, a membership test is
+ * performed first and the fragment is only returned if the slave has
+ * access to it.
+ * @param first_fragment_id
+ * First fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param last_fragment_id
+ * Last consecutive fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param fragment_limit
+ * Maximum number of fragments to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t fragment_limit,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
+ void *cls)
+{
+ struct FragmentGetRequest *req;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
+ req->channel_key = *channel_key;
+ req->fragment_limit = GNUNET_ntohll (fragment_limit);
+ if (NULL != slave_key)
+ {
+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = op_create (h, h->op, result_cb, cls);
+ op->fragment_cb = fragment_cb;
+ op->cls = cls;
+ return op_send (h, op, env, &req->op_id);
+}
+
+
+/**
+ * Retrieve all fragments of messages in a message ID range.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the message.
+ * If not NULL, a membership test is performed first
+ * and the message is only returned if the slave has access to it.
+ * @param first_message_id
+ * First message ID to retrieve.
+ * @param last_message_id
+ * Last consecutive message ID to retrieve.
+ * @param fragment_limit
+ * Maximum number of fragments to retrieve.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @todo Implement method_prefix query.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t first_message_id,
+ uint64_t last_message_id,
+ uint64_t fragment_limit,
+ const char *method_prefix,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
+ void *cls)
+{
+ struct MessageGetRequest *req;
+ if (NULL == method_prefix)
+ method_prefix = "";
+ uint16_t method_size = strnlen (method_prefix,
+ GNUNET_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (req, method_size,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
+ req->channel_key = *channel_key;
+ req->first_message_id = GNUNET_htonll (first_message_id);
+ req->last_message_id = GNUNET_htonll (last_message_id);
+ req->fragment_limit = GNUNET_htonll (fragment_limit);
+ if (NULL != slave_key)
+ {
+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+ GNUNET_memcpy (&req[1], method_prefix, method_size);
+ ((char *) &req[1])[method_size - 1] = '\0';
+
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = op_create (h, h->op, result_cb, cls);
+ op->fragment_cb = fragment_cb;
+ op->cls = cls;
+ return op_send (h, op, env, &req->op_id);
+}
+
+
+/**
+ * Retrieve all fragments of the latest messages.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the message.
+ * If not NULL, a membership test is performed first
+ * and the message is only returned if the slave has access to it.
+ * @param message_limit
+ * Maximum number of messages to retrieve.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @todo Implement method_prefix query.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t message_limit,
+ const char *method_prefix,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
+ void *cls)
+{
+ struct MessageGetRequest *req;
+
+ if (NULL == method_prefix)
+ method_prefix = "";
+ uint16_t method_size = strnlen (method_prefix,
+ GNUNET_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+ GNUNET_assert ('\0' == method_prefix[method_size - 1]);
+
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (req, method_size,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
+ req->channel_key = *channel_key;
+ req->message_limit = GNUNET_ntohll (message_limit);
+ if (NULL != slave_key)
+ {
+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+ GNUNET_memcpy (&req[1], method_prefix, method_size);
+
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = op_create (h, h->op, result_cb, cls);
+ op->fragment_cb = fragment_cb;
+ op->cls = cls;
+ return op_send (h, op, env, &req->op_id);
+}
+
+
+/**
+ * Retrieve a fragment of message specified by its message ID and fragment
+ * offset.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the message fragment. If not NULL, a membership
+ * test is performed first and the message fragment is only returned
+ * if the slave has access to it.
+ * @param message_id
+ * Message ID to retrieve. Use 0 to get the latest message.
+ * @param fragment_offset
+ * Offset of the fragment to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
+ void *cls)
+{
+ struct MessageGetFragmentRequest *req;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
+
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->fragment_offset = GNUNET_htonll (fragment_offset);
+ if (NULL != slave_key)
+ {
+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = op_create (h, h->op, result_cb, cls);
+ op->fragment_cb = fragment_cb;
+ op->cls = cls;
+ return op_send (h, op, env, &req->op_id);
+}
+
+
+/**
+ * Retrieve latest values of counters for a channel master.
+ *
+ * The current value of counters are needed when a channel master is restarted,
+ * so that it can continue incrementing the counters from their last value.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * Public key that identifies the channel.
+ * @param ccb
+ * Callback to call with the result.
+ * @param ccb_cls
+ * Closure for the @a ccb callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
+ struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ GNUNET_PSYCSTORE_CountersCallback counters_cb,
+ void *cls)
+{
+ struct OperationRequest *req;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
+ req->channel_key = *channel_key;
+
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = op_create (h, h->op, NULL, NULL);
+ op->counters_cb = counters_cb;
+ op->cls = cls;
+ return op_send (h, op, env, &req->op_id);
+}
+
+
+/**
+ * Apply modifiers of a message to the current channel state.
+ *
+ * An error is returned if there are missing messages containing state
+ * operations before the current one.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param message_id
+ * ID of the message that contains the @a modifiers.
+ * @param state_delta
+ * Value of the _state_delta PSYC header variable of the message.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for @a result_cb.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ uint64_t message_id,
+ uint64_t state_delta,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
+ void *cls)
+{
+ struct StateModifyRequest *req;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->state_delta = GNUNET_htonll (state_delta);
+
+ return op_send (h, op_create (h, h->op, result_cb, cls),
+ env, &req->op_id);
+}
+
+
+struct StateSyncClosure
+{
+ GNUNET_PSYCSTORE_ResultCallback result_cb;
+ void *cls;
+ uint8_t last;
+};
+
+
+static void
+state_sync_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
+{
+ struct StateSyncClosure *ssc = cls;
+ if (GNUNET_OK != result || ssc->last)
+ ssc->result_cb (ssc->cls, result, err_msg, err_msg_size);
+ GNUNET_free (ssc);
+}
+
+
+/**
+ * Store synchronized state.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param max_state_message_id
+ * ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
+ * ID of the message that contains the state_hash PSYC header variable.
+ * @param modifier_count
+ * Number of elements in the @a modifiers array.
+ * @param modifiers
+ * Full state to store.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ uint64_t max_state_message_id,
+ uint64_t state_hash_message_id,
+ size_t modifier_count,
+ const struct GNUNET_PSYC_Modifier *modifiers,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
+ void *cls)
+{
+ struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
+ size_t i;
+
+ for (i = 0; i < modifier_count; i++) {
+ struct StateSyncRequest *req;
+ uint16_t name_size = strlen (modifiers[i].name) + 1;
+
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (req,
+ sizeof (*req) + name_size + modifiers[i].value_size,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
+
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
+ req->header.size = htons (sizeof (*req) + name_size
+ + modifiers[i].value_size);
+ req->channel_key = *channel_key;
+ req->max_state_message_id = GNUNET_htonll (max_state_message_id);
+ req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
+ req->name_size = htons (name_size);
+ req->flags
+ = (0 == i)
+ ? STATE_OP_FIRST
+ : (modifier_count - 1 == i)
+ ? STATE_OP_LAST
+ : 0;
+
+ GNUNET_memcpy (&req[1], modifiers[i].name, name_size);
+ GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
+
+ struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc));
+ ssc->last = (req->flags & STATE_OP_LAST);
+ ssc->result_cb = result_cb;
+ ssc->cls = cls;
+
+ op_send (h, op_create (h, h->op, state_sync_result, ssc),
+ env, &req->op_id);