From cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Thu, 4 Aug 2016 20:10:17 +0000 Subject: [PATCH] psycstore: switch to MQ --- src/include/gnunet_psycstore_service.h | 90 +- src/psycstore/gnunet-service-psycstore.c | 5 + src/psycstore/psycstore_api.c | 1159 +++++++++------------- 3 files changed, 497 insertions(+), 757 deletions(-) diff --git a/src/include/gnunet_psycstore_service.h b/src/include/gnunet_psycstore_service.h index f6c6bff03..b1be5246e 100644 --- a/src/include/gnunet_psycstore_service.h +++ b/src/include/gnunet_psycstore_service.h @@ -149,9 +149,9 @@ typedef void * In case of a part, the last group generation the slave has access to. * It has relevance when a larger message have fragments with different * group generations. - * @param rcb + * @param result_cb * Callback to call with the result of the storage operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. @@ -164,8 +164,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, uint64_t announced_at, uint64_t effective_since, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); /** @@ -188,9 +188,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, * Group generation of the fragment of the message to test. * It has relevance if the message consists of multiple fragments with * different group generations. - * @param rcb + * @param result_cb * Callback to call with the test result. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. @@ -201,8 +201,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); /** @@ -213,8 +213,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, * @param msg Message to store. * @param psycstore_flags Flags indicating whether the PSYC message contains * state modifiers. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. + * @param result_cb Callback to call with the result of the operation. + * @param cls Closure for the callback. * * @return Handle that can be used to cancel the operation. */ @@ -223,8 +223,8 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_MULTICAST_MessageHeader *msg, enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); /** @@ -303,7 +303,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, * Maximum number of fragments to retrieve. * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -316,7 +316,7 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t fragment_limit, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); @@ -392,7 +392,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, uint64_t message_limit, const char *method_prefix, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); @@ -474,9 +474,9 @@ typedef void * Handle for the PSYCstore. * @param channel_key * Public key that identifies the channel. - * @param ccb + * @param counters_cb * Callback to call with the result. - * @param ccb_cls + * @param cls * Closure for the @a ccb callback. * * @return Handle that can be used to cancel the operation. @@ -484,8 +484,8 @@ typedef void struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_CountersCallback ccb, - void *ccb_cls); + GNUNET_PSYCSTORE_CountersCallback counters_cb, + void *cls); /** @@ -502,10 +502,10 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, * ID of the message that contains the @a modifiers. * @param state_delta * Value of the @e state_delta PSYC header variable of the message. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls - * Closure for the @a rcb callback. + * @param cls + * Closure for the @a result_cb callback. * * @return Handle that can be used to cancel the operation. */ @@ -514,8 +514,8 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, uint64_t state_delta, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); /** @@ -533,9 +533,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, * Number of elements in the @a modifiers array. * @param modifiers * Full state to store. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Handle that can be used to cancel the operation. @@ -547,8 +547,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, uint64_t state_hash_message_id, size_t modifier_count, const struct GNUNET_PSYC_Modifier *modifiers, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); @@ -561,9 +561,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, * Handle for the PSYCstore. * @param channel_key * The channel we are interested in. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Handle that can be used to cancel the operation. @@ -572,8 +572,8 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); /** @@ -587,9 +587,9 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, * Message ID that contained the state @a hash. * @param hash * Hash of the serialized full state. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * */ @@ -598,8 +598,8 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, const struct GNUNET_HashCode *hash, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls); + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls); /** @@ -632,9 +632,9 @@ typedef int * The channel we are interested in. * @param name * Name of variable to match, the returned variable might be less specific. - * @param scb + * @param state_cb * Callback to return the matching state variable. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -645,8 +645,8 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); @@ -659,9 +659,9 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, * The channel we are interested in. * @param name_prefix * Prefix of state variable names to match. - * @param scb + * @param state_cb * Callback to return matching state variables. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -672,8 +672,8 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name_prefix, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); @@ -682,7 +682,7 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, * * @param op Handle for the operation to cancel. */ -void +int GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op); diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 33e894b5e..c9a6f22b8 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c @@ -520,6 +520,11 @@ recv_state_message_part (void *cls, struct StateModifyClosure *scls = cls; uint16_t psize; + if (NULL == msg) + { // FIXME: error on unknown message + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "recv_state_message_part() message_id: %" PRIu64 ", fragment_offset: %" PRIu64 ", flags: %u\n", diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 379483e80..94b7ff9f5 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c @@ -37,8 +37,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) -typedef void (*DataCallback) (); - /** * Handle for an operation with the PSYCstore service. */ @@ -51,40 +49,28 @@ struct GNUNET_PSYCSTORE_OperationHandle struct GNUNET_PSYCSTORE_Handle *h; /** - * We keep operations in a DLL. - */ - struct GNUNET_PSYCSTORE_OperationHandle *next; - - /** - * We keep operations in a DLL. + * Data callbacks. */ - struct GNUNET_PSYCSTORE_OperationHandle *prev; + union { + GNUNET_PSYCSTORE_FragmentCallback fragment_cb; + GNUNET_PSYCSTORE_CountersCallback counters_cb; + GNUNET_PSYCSTORE_StateCallback state_cb; + }; /** - * Continuation to invoke with the result of an operation. + * Closure for callbacks. */ - GNUNET_PSYCSTORE_ResultCallback res_cb; - - /** - * Continuation to invoke with the result of an operation returning data. - */ - DataCallback data_cb; + void *cls; /** - * Closure for the callbacks. + * Message envelope. */ - void *cls; + struct GNUNET_MQ_Envelope *env; /** * Operation ID. */ uint64_t op_id; - - /** - * Message to send to the PSYCstore service. - * Allocated at the end of this struct. - */ - const struct GNUNET_MessageHeader *msg; }; @@ -99,34 +85,15 @@ struct GNUNET_PSYCSTORE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Head of operations to transmit. + * Client connection. */ - struct GNUNET_PSYCSTORE_OperationHandle *transmit_head; + struct GNUNET_MQ_Handle *mq; - /** - * Tail of operations to transmit. - */ - struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail; /** - * Head of active operations waiting for response. + * Async operations. */ - struct GNUNET_PSYCSTORE_OperationHandle *op_head; - - /** - * Tail of active operations waiting for response. - */ - struct GNUNET_PSYCSTORE_OperationHandle *op_tail; - - /** - * Currently pending transmission request, or NULL for none. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_OP_Handle *op; /** * Task doing exponential back-off trying to reconnect. @@ -134,395 +101,258 @@ struct GNUNET_PSYCSTORE_Handle struct GNUNET_SCHEDULER_Task *reconnect_task; /** - * Time for next connect retry. + * Delay for next connect retry. */ struct GNUNET_TIME_Relative reconnect_delay; - /** - * Last operation ID used. - */ - uint64_t last_op_id; + GNUNET_PSYCSTORE_FragmentCallback *fragment_cb; + + GNUNET_PSYCSTORE_CountersCallback *counters_cb; + + GNUNET_PSYCSTORE_StateCallback *state_cb; /** - * Are we polling for incoming messages right now? + * Closure for callbacks. */ - uint8_t in_receive; + void *cb_cls; }; -/** - * Get a fresh operation ID to distinguish between PSYCstore requests. - * - * @param h Handle to the PSYCstore service. - * @return next operation id to use - */ -static uint64_t -get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) +static int +check_result_code (void *cls, const struct OperationResult *opres) { - return h->last_op_id++; -} - - -/** - * Find operation by ID. - * - * @return OperationHandle if found, or NULL otherwise. - */ -static struct GNUNET_PSYCSTORE_OperationHandle * -find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id) -{ - struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; - while (NULL != op) + uint16_t size = ntohs (opres->header.size); + const char *str = (const char *) &opres[1]; + if ( (sizeof (struct OperationResult) < size) && + ('\0' != str[size - sizeof (*opres) - 1]) ) { - if (op->op_id == op_id) - return op; - op = op->next; + GNUNET_break (0); + return GNUNET_SYSERR; } - return NULL; -} - -/** - * Try again to connect to the PSYCstore service. - * - * @param cls handle to the PSYCstore service. - */ -static void -reconnect (void *cls); + return GNUNET_OK; +} -/** - * Reschedule a connect attempt to the service. - * - * @param h transport service to reconnect - */ static void -reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) +handle_result_code (void *cls, const struct OperationResult *opres) { - GNUNET_assert (h->reconnect_task == NULL); + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + uint16_t size = ntohs (opres->header.size); + + const char * + str = (sizeof (*opres) < size) ? (const char *) &opres[1] : ""; - if (NULL != h->th) + if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id), + GNUNET_ntohll (opres->result_code) + INT64_MIN, + str, size - sizeof (*opres), (void **) &op)) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: Received result message with operation ID: %" PRIu64 "\n", + GNUNET_ntohll (opres->op_id)); + GNUNET_free (op); } - if (NULL != h->client) + else { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (opres->op_id)); } - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYCstore service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); - h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } -/** - * Schedule transmission of the next message from our queue. - * - * @param h PSYCstore handle - */ static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h); - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +handle_result_counters (void *cls, const struct CountersResult *cres) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op; - const struct OperationResult *opres; - const struct CountersResult *cres; - const struct FragmentResult *fres; - const struct StateResult *sres; - const char *str; - - if (NULL == msg) - { - reschedule_connect (h); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d from PSYCstore service.\n", - ntohs (msg->type)); - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: - if (size < sizeof (struct OperationResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct OperationResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - - opres = (const struct OperationResult *) msg; - str = (const char *) &opres[1]; - if ( (size > sizeof (struct OperationResult)) && - ('\0' != str[size - sizeof (struct OperationResult) - 1]) ) - { - GNUNET_break (0); - reschedule_connect (h); - return; - } - if (size == sizeof (struct OperationResult)) - str = ""; - - op = find_op_by_id (h, GNUNET_ntohll (opres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (opres->op_id)); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received result message (type %d) with operation ID: %" PRIu64 "\n", - type, op->op_id); - - int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN; - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); - if (NULL != op->res_cb) - { - const struct StateSyncRequest *ssreq; - switch (ntohs (op->msg->type)) - { - case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: - ssreq = (const struct StateSyncRequest *) op->msg; - if (!(ssreq->flags & STATE_OP_LAST - || GNUNET_OK != result_code)) - op->res_cb = NULL; - break; - } - } - if (NULL != op->res_cb) - op->res_cb (op->cls, result_code, str, size - sizeof (*opres)); - GNUNET_free (op); - } - break; - - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS: - if (size != sizeof (struct CountersResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected %lu\n", - type, size, sizeof (struct CountersResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - - cres = (const struct CountersResult *) msg; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - op = find_op_by_id (h, GNUNET_ntohll (cres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (cres->op_id)); - } - else + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->counters_cb) { - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_CountersCallback) - op->data_cb) (op->cls, + op->counters_cb (op->cls, ntohl (cres->result_code), GNUNET_ntohll (cres->max_fragment_id), GNUNET_ntohll (cres->max_message_id), GNUNET_ntohll (cres->max_group_generation), GNUNET_ntohll (cres->max_state_message_id)); - GNUNET_free (op); } - break; + GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id)); + GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (cres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT: - if (size < sizeof (struct FragmentResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct FragmentResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - fres = (const struct FragmentResult *) msg; - struct GNUNET_MULTICAST_MessageHeader *mmsg = - (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; - if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected = %lu\n", - type, size, - sizeof (struct FragmentResult) + ntohs (mmsg->header.size)); - GNUNET_break (0); - reschedule_connect (h); - return; - } +static int +check_result_fragment (void *cls, const struct FragmentResult *fres) +{ + uint16_t size = ntohs (fres->header.size); + struct GNUNET_MULTICAST_MessageHeader *mmsg = + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; + if (sizeof (*fres) + sizeof (*mmsg) < size + && sizeof (*fres) + ntohs (mmsg->header.size) != size) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_fragment: Received message with invalid length %lu bytes.\n", + size, sizeof (*fres)); + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} - op = find_op_by_id (h, GNUNET_ntohll (fres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (fres->op_id)); - } - else - { - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_FragmentCallback) - op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags)); - } - break; - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE: - if (size < sizeof (struct StateResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct StateResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } +static void +handle_result_fragment (void *cls, const struct FragmentResult *fres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - sres = (const struct StateResult *) msg; - const char *name = (const char *) &sres[1]; - uint16_t name_size = ntohs (sres->name_size); + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->fragment_cb) + op->fragment_cb (op->cls, + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1], + ntohl (fres->psycstore_flags)); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id)); + //GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (fres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - if (name_size <= 2 || '\0' != name[name_size - 1]) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received state result message (type %d) with invalid name.\n", - type); - GNUNET_break (0); - reschedule_connect (h); - return; - } - op = find_op_by_id (h, GNUNET_ntohll (sres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (sres->op_id)); - } - else - { - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_StateCallback) - op->data_cb) (op->cls, name, (char *) &sres[1] + name_size, - ntohs (sres->header.size) - sizeof (*sres) - name_size); - } - break; +static int +check_result_state (void *cls, const struct StateResult *sres) +{ + const char *name = (const char *) &sres[1]; + uint16_t name_size = ntohs (sres->name_size); - default: + if (name_size <= 2 || '\0' != name[name_size - 1]) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_state: Received state result message with invalid name.\n"); GNUNET_break (0); - reschedule_connect (h); - return; + return GNUNET_SYSERR; } - - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); + return GNUNET_OK; } -/** - * Transmit next message to service. - * - * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. - * @param size Number of bytes available in buf. - * @param buf Where to copy the message. - * @return Number of bytes copied to buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) +static void +handle_result_state (void *cls, const struct StateResult *sres) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; - size_t ret; - - h->th = NULL; - if (NULL == op) - return 0; - ret = ntohs (op->msg->size); - if (ret > size) - { - reschedule_connect (h); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n", - ntohs (op->msg->type), op->op_id); - GNUNET_memcpy (buf, op->msg, ret); + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); + const char *name = (const char *) &sres[1]; + uint16_t name_size = ntohs (sres->name_size); - if (NULL == op->res_cb && NULL == op->data_cb) + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id), + NULL, NULL, (void **) &op)) { - GNUNET_free (op); + GNUNET_assert (NULL != op); + if (NULL != op->state_cb) + op->state_cb (op->cls, name, (char *) &sres[1] + name_size, + ntohs (sres->header.size) - sizeof (*sres) - name_size); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id)); + //GNUNET_free (op); } else { - GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (sres->op_id)); } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - if (NULL != h->transmit_head) - transmit_next (h); - if (GNUNET_NO == h->in_receive) - { - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; -} +static void +reconnect (void *cls); /** - * Schedule transmission of the next message from our queue. + * Client disconnected from service. * - * @param h PSYCstore handle. + * Reconnect after backoff period.= */ static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h) +disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != h->mq) + { + GNUNET_MQ_destroy (h->mq); + GNUNET_OP_destroy (h->op); + h->mq = NULL; + h->op = NULL; + } + + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, + &reconnect, h); + h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); +} + + +static void +do_connect (struct GNUNET_PSYCSTORE_Handle *h) { - if (NULL != h->th || NULL == h->client) - return; - - struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; - if (NULL == op) - return; - - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - ntohs (op->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - h); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to PSYCstore service.\n"); + + GNUNET_MQ_hd_var_size (result_code, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE, + struct OperationResult); + + GNUNET_MQ_hd_fixed_size (result_counters, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS, + struct CountersResult); + + GNUNET_MQ_hd_var_size (result_fragment, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT, + struct FragmentResult); + + GNUNET_MQ_hd_var_size (result_state, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE, + struct StateResult); + + struct GNUNET_MQ_MessageHandler handlers[] = { + make_result_code_handler (h), + make_result_counters_handler (h), + make_result_fragment_handler (h), + make_result_state_handler (h), + GNUNET_MQ_handler_end () + }; + + h->op = GNUNET_OP_create (); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connecT (h->cfg, "psycstore", + handlers, disconnected, h); + GNUNET_assert (NULL != h->mq); } @@ -534,15 +364,7 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h) static void reconnect (void *cls) { - struct GNUNET_PSYCSTORE_Handle *h = cls; - - h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to PSYCstore service.\n"); - GNUNET_assert (NULL == h->client); - h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg); - GNUNET_assert (NULL != h->client); - transmit_next (h); + do_connect (cls); } @@ -558,8 +380,8 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) struct GNUNET_PSYCSTORE_Handle *h = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); h->cfg = cfg; - h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + do_connect (h); return h; } @@ -578,20 +400,79 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) GNUNET_SCHEDULER_cancel (h->reconnect_task); h->reconnect_task = NULL; } - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + // FIXME: free data structures for pending operations + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } GNUNET_free (h); } +/** + * Message sent notification. + * + * Remove invalidated envelope pointer. + */ +static void +message_sent (void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = cls; + op->env = NULL; +} + + +/** + * Create a new operation. + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_create (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_OP_Handle *hop, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op)); + op->h = h; + op->op_id = GNUNET_OP_add (hop, + (GNUNET_ResultCallback) result_cb, + cls, op); + return op; +} + + +/** + * Send a message associated with an operation. + * + * @param h + * PSYCstore handle. + * @param op + * Operation handle. + * @param env + * Message envelope to send. + * @param[out] op_id + * Operation ID to write in network byte order. NULL if not needed. + * + * @return Operation handle. + * + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_send (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_PSYCSTORE_OperationHandle *op, + struct GNUNET_MQ_Envelope *env, + uint64_t *op_id) +{ + op->env = env; + if (NULL != op_id) + *op_id = GNUNET_htonll (op->op_id); + + GNUNET_MQ_notify_sent (env, message_sent, op); + GNUNET_MQ_send (h->mq, env); + return op; +} + + /** * Cancel a PSYCstore operation. Note that the operation MAY still * be executed; this merely cancels the continuation; if the request @@ -599,32 +480,26 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) * the operation. * * @param op Operation to cancel. + * + * @return #GNUNET_YES if message was not sent yet and got discarded, + * #GNUNET_NO if it was already sent, and only the callbacks got cancelled. */ -void +int GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) { struct GNUNET_PSYCSTORE_Handle *h = op->h; + int ret = GNUNET_NO; - if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client)) + if (NULL != op->env) { - /* request not active, can simply remove */ - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); - GNUNET_free (op); - return; + GNUNET_MQ_send_cancel (op->env); + ret = GNUNET_YES; } - if (NULL != h->th) - { - /* request active but not yet with service, can still abort */ - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); - GNUNET_free (op); - transmit_next (h); - return; - } - /* request active with service, simply ensure continuations are not called */ - op->res_cb = NULL; - op->data_cb = NULL; + + GNUNET_OP_remove (h->op, op->op_id); + GNUNET_free (op); + + return ret; } @@ -649,9 +524,9 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) * In case of a part, the last group generation the slave has access to. * It has relevance when a larger message have fragments with different * group generations. - * @param rcb + * @param result_cb * Callback to call with the result of the storage operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. @@ -664,8 +539,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, uint64_t announced_at, uint64_t effective_since, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { GNUNET_assert (NULL != h); GNUNET_assert (NULL != channel_key); @@ -676,16 +551,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, : effective_since == 0); struct MembershipStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct MembershipStoreRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); req->channel_key = *channel_key; req->slave_key = *slave_key; req->did_join = did_join; @@ -693,13 +560,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, req->effective_since = GNUNET_htonll (effective_since); req->group_generation = GNUNET_htonll (group_generation); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -723,9 +586,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, * Group generation of the fragment of the message to test. * It has relevance if the message consists of multiple fragments with * different group generations. - * @param rcb + * @param result_cb * Callback to call with the test result. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. @@ -736,32 +599,20 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct MembershipTestRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct MembershipTestRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); req->channel_key = *channel_key; req->slave_key = *slave_key; req->message_id = GNUNET_htonll (message_id); req->group_generation = GNUNET_htonll (group_generation); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -773,8 +624,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, * @param message Message to store. * @param psycstore_flags Flags indicating whether the PSYC message contains * state modifiers. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. + * @param result_cb Callback to call with the result of the operation. + * @param cls Closure for the callback. * * @return Handle that can be used to cancel the operation. */ @@ -783,32 +634,21 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_MULTICAST_MessageHeader *msg, enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { uint16_t size = ntohs (msg->header.size); struct FragmentStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct FragmentStoreRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); - req->header.size = htons (sizeof (*req) + size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); req->channel_key = *channel_key; req->psycstore_flags = htonl (psycstore_flags); GNUNET_memcpy (&req[1], msg, size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -833,7 +673,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, * Maximum number of fragments to retrieve. * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -847,21 +687,12 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, uint64_t first_fragment_id, uint64_t last_fragment_id, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct FragmentGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct FragmentGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); req->channel_key = *channel_key; req->first_fragment_id = GNUNET_htonll (first_fragment_id); req->last_fragment_id = GNUNET_htonll (last_fragment_id); @@ -871,13 +702,11 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, req->do_membership_test = GNUNET_YES; } - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -902,7 +731,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, * Maximum number of fragments to retrieve. * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -915,21 +744,12 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t fragment_limit, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct FragmentGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct FragmentGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); req->channel_key = *channel_key; req->fragment_limit = GNUNET_ntohll (fragment_limit); if (NULL != slave_key) @@ -938,13 +758,11 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, req->do_membership_test = GNUNET_YES; } - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -986,7 +804,7 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, uint64_t fragment_limit, const char *method_prefix, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetRequest *req; @@ -996,17 +814,9 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*req)) + 1; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct MessageGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); - req->header.size = htons (sizeof (*req) + method_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); req->channel_key = *channel_key; req->first_message_id = GNUNET_htonll (first_message_id); req->last_message_id = GNUNET_htonll (last_message_id); @@ -1019,13 +829,11 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, GNUNET_memcpy (&req[1], method_prefix, method_size); ((char *) &req[1])[method_size - 1] = '\0'; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1061,7 +869,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, uint64_t message_limit, const char *method_prefix, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetRequest *req; @@ -1073,17 +881,9 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, - sizeof (*req)) + 1; GNUNET_assert ('\0' == method_prefix[method_size - 1]); - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct MessageGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); - req->header.size = htons (sizeof (*req) + method_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); req->channel_key = *channel_key; req->message_limit = GNUNET_ntohll (message_limit); if (NULL != slave_key) @@ -1091,15 +891,13 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, req->slave_key = *slave_key; req->do_membership_test = GNUNET_YES; } - - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); GNUNET_memcpy (&req[1], method_prefix, method_size); - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1135,21 +933,13 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, uint64_t message_id, uint64_t fragment_offset, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetFragmentRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); - req = (struct MessageGetFragmentRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); - req->header.size = htons (sizeof (*req)); req->channel_key = *channel_key; req->message_id = GNUNET_htonll (message_id); req->fragment_offset = GNUNET_htonll (fragment_offset); @@ -1159,13 +949,11 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, req->do_membership_test = GNUNET_YES; } - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1189,29 +977,19 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_CountersCallback ccb, - void *ccb_cls) + GNUNET_PSYCSTORE_CountersCallback counters_cb, + void *cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = ccb; - op->cls = ccb_cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); req->channel_key = *channel_key; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, NULL, NULL); + op->counters_cb = counters_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1229,10 +1007,10 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, * ID of the message that contains the @a modifiers. * @param state_delta * Value of the _state_delta PSYC header variable of the message. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls - * Closure for the @a rcb callback. + * @param cls + * Closure for @a result_cb. * * @return Handle that can be used to cancel the operation. */ @@ -1241,35 +1019,37 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, uint64_t state_delta, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { - struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; struct StateModifyRequest *req; - - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct StateModifyRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); req->channel_key = *channel_key; req->message_id = GNUNET_htonll (message_id); req->state_delta = GNUNET_htonll (state_delta); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); + return op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - return op; - /* FIXME: only the last operation is returned, - * operation_cancel() should be able to cancel all of them. - */ +struct StateSyncClosure +{ + GNUNET_PSYCSTORE_ResultCallback result_cb; + void *cls; + uint8_t last; +}; + + +static void +state_sync_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateSyncClosure *ssc = cls; + if (GNUNET_OK != result || ssc->last) + ssc->result_cb (ssc->cls, result, err_msg, err_msg_size); + GNUNET_free (ssc); } @@ -1288,9 +1068,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, * Number of elements in the @a modifiers array. * @param modifiers * Full state to store. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Handle that can be used to cancel the operation. @@ -1302,8 +1082,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, uint64_t state_hash_message_id, size_t modifier_count, const struct GNUNET_PSYC_Modifier *modifiers, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; size_t i; @@ -1312,14 +1092,11 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, struct StateSyncRequest *req; uint16_t name_size = strlen (modifiers[i].name) + 1; - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + - modifiers[i].value_size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, + sizeof (*req) + name_size + modifiers[i].value_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); - req = (struct StateSyncRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); req->header.size = htons (sizeof (*req) + name_size + modifiers[i].value_size); @@ -1337,12 +1114,16 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, GNUNET_memcpy (&req[1], modifiers[i].name, name_size); GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); + struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc)); + ssc->last = (req->flags & STATE_OP_LAST); + ssc->result_cb = result_cb; + ssc->cls = cls; - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); + op_send (h, op_create (h, h->op, state_sync_result, ssc), + env, &req->op_id); } + // FIXME: only one operation is returned, + // add pointers to other operations and make all cancellable. return op; } @@ -1356,9 +1137,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, * Handle for the PSYCstore. * @param channel_key * The channel we are interested in. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Handle that can be used to cancel the operation. @@ -1367,33 +1148,20 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); req->channel_key = *channel_key; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } - /** * Update signed values of state variables in the state store. * @@ -1405,9 +1173,9 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, * Message ID that contained the state @a hash. * @param hash * Hash of the serialized full state. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. */ struct GNUNET_PSYCSTORE_OperationHandle * @@ -1415,30 +1183,18 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, const struct GNUNET_HashCode *hash, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct StateHashUpdateRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct StateHashUpdateRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE); req->channel_key = *channel_key; req->hash = *hash; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -1451,9 +1207,9 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, * The channel we are interested in. * @param name * Name of variable to match, the returned variable might be less specific. - * @param scb + * @param state_cb * Callback to return the matching state variable. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -1464,37 +1220,26 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { size_t name_size = strlen (name) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); - op->h = h; - op->data_cb = (DataCallback) scb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); - req->header.size = htons (sizeof (*req) + name_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); req->channel_key = *channel_key; GNUNET_memcpy (&req[1], name, name_size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } - /** * Retrieve all state variables for a channel with the given prefix. * @@ -1504,9 +1249,9 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, * The channel we are interested in. * @param name_prefix * Prefix of state variable names to match. - * @param scb + * @param state_cb * Callback to return matching state variables. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -1517,33 +1262,23 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name_prefix, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { size_t name_size = strlen (name_prefix) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); - op->h = h; - op->data_cb = (DataCallback) scb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); - req->header.size = htons (sizeof (*req) + name_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); req->channel_key = *channel_key; GNUNET_memcpy (&req[1], name_prefix, name_size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } /* end of psycstore_api.c */ -- 2.25.1