X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fpsycstore%2Fpsycstore_api.c;h=d79daa35774906a65f2fd6b3a4a1ca27b3586180;hb=b14f2e69adb39f4eaaaf3e26a847f29d256a1c68;hp=b729d7440a4d894552d77e8bf50be3ecf25df758;hpb=bb5fe91d23b0938baa3c4f0e92a83df659df216a;p=oweals%2Fgnunet.git diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index b729d7440..d79daa357 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c @@ -37,8 +37,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) -typedef void (*DataCallback) (); - /** * Handle for an operation with the PSYCstore service. */ @@ -51,40 +49,28 @@ struct GNUNET_PSYCSTORE_OperationHandle struct GNUNET_PSYCSTORE_Handle *h; /** - * We keep operations in a DLL. - */ - struct GNUNET_PSYCSTORE_OperationHandle *next; - - /** - * We keep operations in a DLL. + * Data callbacks. */ - struct GNUNET_PSYCSTORE_OperationHandle *prev; + union { + GNUNET_PSYCSTORE_FragmentCallback fragment_cb; + GNUNET_PSYCSTORE_CountersCallback counters_cb; + GNUNET_PSYCSTORE_StateCallback state_cb; + }; /** - * Continuation to invoke with the result of an operation. + * Closure for callbacks. */ - GNUNET_PSYCSTORE_ResultCallback res_cb; - - /** - * Continuation to invoke with the result of an operation returning data. - */ - DataCallback data_cb; + void *cls; /** - * Closure for the callbacks. + * Message envelope. */ - void *cls; + struct GNUNET_MQ_Envelope *env; /** * Operation ID. */ uint64_t op_id; - - /** - * Message to send to the PSYCstore service. - * Allocated at the end of this struct. - */ - const struct GNUNET_MessageHeader *msg; }; @@ -99,431 +85,272 @@ struct GNUNET_PSYCSTORE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Head of operations to transmit. + * Client connection. */ - struct GNUNET_PSYCSTORE_OperationHandle *transmit_head; + struct GNUNET_MQ_Handle *mq; /** - * Tail of operations to transmit. + * Async operations. */ - struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail; - - /** - * Head of active operations waiting for response. - */ - struct GNUNET_PSYCSTORE_OperationHandle *op_head; - - /** - * Tail of active operations waiting for response. - */ - struct GNUNET_PSYCSTORE_OperationHandle *op_tail; - - /** - * Currently pending transmission request, or NULL for none. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_OP_Handle *op; /** * Task doing exponential back-off trying to reconnect. */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** - * Time for next connect retry. + * Delay for next connect retry. */ struct GNUNET_TIME_Relative reconnect_delay; - /** - * Last operation ID used. - */ - uint64_t last_op_id; + GNUNET_PSYCSTORE_FragmentCallback *fragment_cb; + + GNUNET_PSYCSTORE_CountersCallback *counters_cb; + + GNUNET_PSYCSTORE_StateCallback *state_cb; /** - * Are we polling for incoming messages right now? + * Closure for callbacks. */ - uint8_t in_receive; + void *cb_cls; }; -/** - * Get a fresh operation ID to distinguish between PSYCstore requests. - * - * @param h Handle to the PSYCstore service. - * @return next operation id to use - */ -static uint64_t -get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) +static int +check_result_code (void *cls, const struct OperationResult *opres) { - return h->last_op_id++; -} - - -/** - * Find operation by ID. - * - * @return OperationHandle if found, or NULL otherwise. - */ -static struct GNUNET_PSYCSTORE_OperationHandle * -find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id) -{ - struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; - while (NULL != op) + uint16_t size = ntohs (opres->header.size); + const char *str = (const char *) &opres[1]; + if ( (sizeof (*opres) < size) && + ('\0' != str[size - sizeof (*opres) - 1]) ) { - if (op->op_id == op_id) - return op; - op = op->next; + GNUNET_break (0); + return GNUNET_SYSERR; } - return NULL; -} - -/** - * Try again to connect to the PSYCstore service. - * - * @param cls handle to the PSYCstore service. - * @param tc scheduler context - */ -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + return GNUNET_OK; +} -/** - * Reschedule a connect attempt to the service. - * - * @param h transport service to reconnect - */ static void -reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) +handle_result_code (void *cls, const struct OperationResult *opres) { - GNUNET_assert (h->reconnect_task == NULL); + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + uint16_t size = ntohs (opres->header.size); + + const char * + str = (sizeof (*opres) < size) ? (const char *) &opres[1] : ""; - if (NULL != h->th) + if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id), + GNUNET_ntohll (opres->result_code) + INT64_MIN, + str, size - sizeof (*opres), (void **) &op)) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: Received result message with operation ID: %" PRIu64 "\n", + GNUNET_ntohll (opres->op_id)); + GNUNET_free (op); } - if (NULL != h->client) + else { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (opres->op_id)); } - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYCstore service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); - h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } -/** - * Schedule transmission of the next message from our queue. - * - * @param h PSYCstore handle - */ static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h); - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +handle_result_counters (void *cls, const struct CountersResult *cres) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op; - const struct OperationResult *opres; - const struct CountersResult *cres; - const struct FragmentResult *fres; - const struct StateResult *sres; - const char *str; - - if (NULL == msg) - { - reschedule_connect (h); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d from PSYCstore service.\n", - ntohs (msg->type)); - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: - if (size < sizeof (struct OperationResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct OperationResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - - opres = (const struct OperationResult *) msg; - str = (const char *) &opres[1]; - if ( (size > sizeof (struct OperationResult)) && - ('\0' != str[size - sizeof (struct OperationResult) - 1]) ) - { - GNUNET_break (0); - reschedule_connect (h); - return; - } - if (size == sizeof (struct OperationResult)) - str = ""; - - op = find_op_by_id (h, GNUNET_ntohll (opres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (opres->op_id)); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received result message (type %d) with operation ID: %" PRIu64 "\n", - type, op->op_id); - - int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN; - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); - if (NULL != op->res_cb) - { - const struct StateSyncRequest *ssreq; - switch (ntohs (op->msg->type)) - { - case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: - ssreq = (const struct StateSyncRequest *) op->msg; - if (!(ssreq->flags & STATE_OP_LAST - || GNUNET_OK != result_code)) - op->res_cb = NULL; - break; - } - } - if (NULL != op->res_cb) - op->res_cb (op->cls, result_code, str, size - sizeof (*opres)); - GNUNET_free (op); - } - break; - - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS: - if (size != sizeof (struct CountersResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected %lu\n", - type, size, sizeof (struct CountersResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - - cres = (const struct CountersResult *) msg; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - op = find_op_by_id (h, GNUNET_ntohll (cres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (cres->op_id)); - } - else + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->counters_cb) { - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_CountersCallback) - op->data_cb) (op->cls, + op->counters_cb (op->cls, ntohl (cres->result_code), GNUNET_ntohll (cres->max_fragment_id), GNUNET_ntohll (cres->max_message_id), GNUNET_ntohll (cres->max_group_generation), GNUNET_ntohll (cres->max_state_message_id)); - GNUNET_free (op); } - break; + GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id)); + GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (cres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT: - if (size < sizeof (struct FragmentResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct FragmentResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - fres = (const struct FragmentResult *) msg; - struct GNUNET_MULTICAST_MessageHeader *mmsg = - (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; - if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected = %lu\n", - type, size, - sizeof (struct FragmentResult) + ntohs (mmsg->header.size)); - GNUNET_break (0); - reschedule_connect (h); - return; - } +static int +check_result_fragment (void *cls, const struct FragmentResult *fres) +{ + uint16_t size = ntohs (fres->header.size); + struct GNUNET_MULTICAST_MessageHeader *mmsg = + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; + if (sizeof (*fres) + sizeof (*mmsg) < size + && sizeof (*fres) + ntohs (mmsg->header.size) != size) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_fragment: Received message with invalid length %lu bytes.\n", + size, sizeof (*fres)); + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} - op = find_op_by_id (h, GNUNET_ntohll (fres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (fres->op_id)); - } - else - { - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_FragmentCallback) - op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags)); - } - break; - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE: - if (size < sizeof (struct StateResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct StateResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } +static void +handle_result_fragment (void *cls, const struct FragmentResult *fres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - sres = (const struct StateResult *) msg; - const char *name = (const char *) &sres[1]; - uint16_t name_size = ntohs (sres->name_size); + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->fragment_cb) + op->fragment_cb (op->cls, + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1], + ntohl (fres->psycstore_flags)); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id)); + //GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (fres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - if (name_size <= 2 || '\0' != name[name_size - 1]) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received state result message (type %d) with invalid name.\n", - type); - GNUNET_break (0); - reschedule_connect (h); - return; - } - op = find_op_by_id (h, GNUNET_ntohll (sres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %" PRIu64 ".\n", - type, GNUNET_ntohll (sres->op_id)); - } - else - { - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_StateCallback) - op->data_cb) (op->cls, name, (char *) &sres[1] + name_size, - ntohs (sres->header.size) - sizeof (*sres) - name_size); - } - break; +static int +check_result_state (void *cls, const struct StateResult *sres) +{ + const char *name = (const char *) &sres[1]; + uint16_t size = ntohs (sres->header.size); + uint16_t name_size = ntohs (sres->name_size); - default: + if (name_size <= 2 + || size - sizeof (*sres) < name_size + || '\0' != name[name_size - 1]) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_state: Received state result message with invalid name.\n"); GNUNET_break (0); - reschedule_connect (h); - return; + return GNUNET_SYSERR; } - - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); + return GNUNET_OK; } -/** - * Transmit next message to service. - * - * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. - * @param size Number of bytes available in buf. - * @param buf Where to copy the message. - * @return Number of bytes copied to buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) +static void +handle_result_state (void *cls, const struct StateResult *sres) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; - size_t ret; - - h->th = NULL; - if (NULL == op) - return 0; - ret = ntohs (op->msg->size); - if (ret > size) - { - reschedule_connect (h); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n", - ntohs (op->msg->type), op->op_id); - memcpy (buf, op->msg, ret); + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); + const char *name = (const char *) &sres[1]; + uint16_t name_size = ntohs (sres->name_size); - if (NULL == op->res_cb && NULL == op->data_cb) + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id), + NULL, NULL, (void **) &op)) { - GNUNET_free (op); + GNUNET_assert (NULL != op); + if (NULL != op->state_cb) + op->state_cb (op->cls, name, (char *) &sres[1] + name_size, + ntohs (sres->header.size) - sizeof (*sres) - name_size); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id)); + //GNUNET_free (op); } else { - GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (sres->op_id)); } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - if (NULL != h->transmit_head) - transmit_next (h); - if (GNUNET_NO == h->in_receive) - { - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; -} +static void +reconnect (void *cls); /** - * Schedule transmission of the next message from our queue. + * Client disconnected from service. * - * @param h PSYCstore handle. + * Reconnect after backoff period.= */ static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h) +disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != h->mq) + { + GNUNET_MQ_destroy (h->mq); + GNUNET_OP_destroy (h->op); + h->mq = NULL; + h->op = NULL; + } + + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, + &reconnect, h); + h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); +} + + +static void +do_connect (struct GNUNET_PSYCSTORE_Handle *h) { - if (NULL != h->th || NULL == h->client) - return; - - struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; - if (NULL == op) - return; - - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - ntohs (op->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - h); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to PSYCstore service.\n"); + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (result_code, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE, + struct OperationResult, + h), + GNUNET_MQ_hd_fixed_size (result_counters, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS, + struct CountersResult, + h), + GNUNET_MQ_hd_var_size (result_fragment, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT, + struct FragmentResult, + h), + GNUNET_MQ_hd_var_size (result_state, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE, + struct StateResult, + h), + GNUNET_MQ_handler_end () + }; + + h->op = GNUNET_OP_create (); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, "psycstore", + handlers, disconnected, h); + GNUNET_assert (NULL != h->mq); } @@ -531,20 +358,11 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h) * Try again to connect to the PSYCstore service. * * @param cls Handle to the PSYCstore service. - * @param tc Scheduler context. */ static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +reconnect (void *cls) { - struct GNUNET_PSYCSTORE_Handle *h = cls; - - h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to PSYCstore service.\n"); - GNUNET_assert (NULL == h->client); - h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg); - GNUNET_assert (NULL != h->client); - transmit_next (h); + do_connect (cls); } @@ -560,8 +378,8 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) struct GNUNET_PSYCSTORE_Handle *h = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); h->cfg = cfg; - h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + do_connect (h); return h; } @@ -580,20 +398,79 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) GNUNET_SCHEDULER_cancel (h->reconnect_task); h->reconnect_task = NULL; } - if (NULL != h->th) + if (NULL != h->mq) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (NULL != h->client) - { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + // FIXME: free data structures for pending operations + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } GNUNET_free (h); } +/** + * Message sent notification. + * + * Remove invalidated envelope pointer. + */ +static void +message_sent (void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = cls; + op->env = NULL; +} + + +/** + * Create a new operation. + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_create (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_OP_Handle *hop, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op)); + op->h = h; + op->op_id = GNUNET_OP_add (hop, + (GNUNET_ResultCallback) result_cb, + cls, op); + return op; +} + + +/** + * Send a message associated with an operation. + * + * @param h + * PSYCstore handle. + * @param op + * Operation handle. + * @param env + * Message envelope to send. + * @param[out] op_id + * Operation ID to write in network byte order. NULL if not needed. + * + * @return Operation handle. + * + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_send (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_PSYCSTORE_OperationHandle *op, + struct GNUNET_MQ_Envelope *env, + uint64_t *op_id) +{ + op->env = env; + if (NULL != op_id) + *op_id = GNUNET_htonll (op->op_id); + + GNUNET_MQ_notify_sent (env, message_sent, op); + GNUNET_MQ_send (h->mq, env); + return op; +} + + /** * Cancel a PSYCstore operation. Note that the operation MAY still * be executed; this merely cancels the continuation; if the request @@ -601,32 +478,26 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) * the operation. * * @param op Operation to cancel. + * + * @return #GNUNET_YES if message was not sent yet and got discarded, + * #GNUNET_NO if it was already sent, and only the callbacks got cancelled. */ -void +int GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) { struct GNUNET_PSYCSTORE_Handle *h = op->h; + int ret = GNUNET_NO; - if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client)) + if (NULL != op->env) { - /* request not active, can simply remove */ - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); - GNUNET_free (op); - return; + GNUNET_MQ_send_cancel (op->env); + ret = GNUNET_YES; } - if (NULL != h->th) - { - /* request active but not yet with service, can still abort */ - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); - GNUNET_free (op); - transmit_next (h); - return; - } - /* request active with service, simply ensure continuations are not called */ - op->res_cb = NULL; - op->data_cb = NULL; + + GNUNET_OP_remove (h->op, op->op_id); + GNUNET_free (op); + + return ret; } @@ -651,9 +522,9 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) * In case of a part, the last group generation the slave has access to. * It has relevance when a larger message have fragments with different * group generations. - * @param rcb + * @param result_cb * Callback to call with the result of the storage operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. @@ -666,8 +537,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, uint64_t announced_at, uint64_t effective_since, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { GNUNET_assert (NULL != h); GNUNET_assert (NULL != channel_key); @@ -678,16 +549,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, : effective_since == 0); struct MembershipStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct MembershipStoreRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); req->channel_key = *channel_key; req->slave_key = *slave_key; req->did_join = did_join; @@ -695,13 +558,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, req->effective_since = GNUNET_htonll (effective_since); req->group_generation = GNUNET_htonll (group_generation); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -725,9 +584,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, * Group generation of the fragment of the message to test. * It has relevance if the message consists of multiple fragments with * different group generations. - * @param rcb + * @param result_cb * Callback to call with the test result. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. @@ -738,32 +597,20 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct MembershipTestRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct MembershipTestRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); req->channel_key = *channel_key; req->slave_key = *slave_key; req->message_id = GNUNET_htonll (message_id); req->group_generation = GNUNET_htonll (group_generation); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -775,8 +622,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, * @param message Message to store. * @param psycstore_flags Flags indicating whether the PSYC message contains * state modifiers. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. + * @param result_cb Callback to call with the result of the operation. + * @param cls Closure for the callback. * * @return Handle that can be used to cancel the operation. */ @@ -785,32 +632,21 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_MULTICAST_MessageHeader *msg, enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { uint16_t size = ntohs (msg->header.size); struct FragmentStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct FragmentStoreRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); - req->header.size = htons (sizeof (*req) + size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); req->channel_key = *channel_key; req->psycstore_flags = htonl (psycstore_flags); - memcpy (&req[1], msg, size); + GNUNET_memcpy (&req[1], msg, size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -835,7 +671,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, * Maximum number of fragments to retrieve. * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -849,21 +685,12 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, uint64_t first_fragment_id, uint64_t last_fragment_id, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct FragmentGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct FragmentGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); req->channel_key = *channel_key; req->first_fragment_id = GNUNET_htonll (first_fragment_id); req->last_fragment_id = GNUNET_htonll (last_fragment_id); @@ -873,13 +700,11 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, req->do_membership_test = GNUNET_YES; } - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -904,7 +729,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, * Maximum number of fragments to retrieve. * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -917,21 +742,12 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t fragment_limit, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct FragmentGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct FragmentGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); req->channel_key = *channel_key; req->fragment_limit = GNUNET_ntohll (fragment_limit); if (NULL != slave_key) @@ -940,13 +756,11 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, req->do_membership_test = GNUNET_YES; } - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -988,27 +802,19 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, uint64_t fragment_limit, const char *method_prefix, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetRequest *req; if (NULL == method_prefix) method_prefix = ""; uint16_t method_size = strnlen (method_prefix, - GNUNET_SERVER_MAX_MESSAGE_SIZE + GNUNET_MAX_MESSAGE_SIZE - sizeof (*req)) + 1; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct MessageGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); - req->header.size = htons (sizeof (*req) + method_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); req->channel_key = *channel_key; req->first_message_id = GNUNET_htonll (first_message_id); req->last_message_id = GNUNET_htonll (last_message_id); @@ -1018,16 +824,14 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, req->slave_key = *slave_key; req->do_membership_test = GNUNET_YES; } - memcpy (&req[1], method_prefix, method_size); + GNUNET_memcpy (&req[1], method_prefix, method_size); ((char *) &req[1])[method_size - 1] = '\0'; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1063,7 +867,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, uint64_t message_limit, const char *method_prefix, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetRequest *req; @@ -1071,21 +875,13 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, if (NULL == method_prefix) method_prefix = ""; uint16_t method_size = strnlen (method_prefix, - GNUNET_SERVER_MAX_MESSAGE_SIZE + GNUNET_MAX_MESSAGE_SIZE - sizeof (*req)) + 1; GNUNET_assert ('\0' == method_prefix[method_size - 1]); - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct MessageGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); - req->header.size = htons (sizeof (*req) + method_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); req->channel_key = *channel_key; req->message_limit = GNUNET_ntohll (message_limit); if (NULL != slave_key) @@ -1093,15 +889,13 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, req->slave_key = *slave_key; req->do_membership_test = GNUNET_YES; } + GNUNET_memcpy (&req[1], method_prefix, method_size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - memcpy (&req[1], method_prefix, method_size); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1137,21 +931,13 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, uint64_t message_id, uint64_t fragment_offset, GNUNET_PSYCSTORE_FragmentCallback fragment_cb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetFragmentRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fragment_cb; - op->res_cb = rcb; - op->cls = cls; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); - req = (struct MessageGetFragmentRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); - req->header.size = htons (sizeof (*req)); req->channel_key = *channel_key; req->message_id = GNUNET_htonll (message_id); req->fragment_offset = GNUNET_htonll (fragment_offset); @@ -1161,13 +947,11 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, req->do_membership_test = GNUNET_YES; } - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1191,29 +975,19 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_CountersCallback ccb, - void *ccb_cls) + GNUNET_PSYCSTORE_CountersCallback counters_cb, + void *cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = ccb; - op->cls = ccb_cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); req->channel_key = *channel_key; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, NULL, NULL); + op->counters_cb = counters_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -1231,10 +1005,10 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, * ID of the message that contains the @a modifiers. * @param state_delta * Value of the _state_delta PSYC header variable of the message. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls - * Closure for the @a rcb callback. + * @param cls + * Closure for @a result_cb. * * @return Handle that can be used to cancel the operation. */ @@ -1243,35 +1017,37 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, uint64_t state_delta, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { - struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; struct StateModifyRequest *req; - - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct StateModifyRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); req->channel_key = *channel_key; req->message_id = GNUNET_htonll (message_id); req->state_delta = GNUNET_htonll (state_delta); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); + return op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - return op; - /* FIXME: only the last operation is returned, - * operation_cancel() should be able to cancel all of them. - */ +struct StateSyncClosure +{ + GNUNET_PSYCSTORE_ResultCallback result_cb; + void *cls; + uint8_t last; +}; + + +static void +state_sync_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateSyncClosure *ssc = cls; + if (GNUNET_OK != result || ssc->last) + ssc->result_cb (ssc->cls, result, err_msg, err_msg_size); + GNUNET_free (ssc); } @@ -1290,9 +1066,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, * Number of elements in the @a modifiers array. * @param modifiers * Full state to store. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Handle that can be used to cancel the operation. @@ -1304,8 +1080,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, uint64_t state_hash_message_id, size_t modifier_count, const struct GNUNET_PSYC_Modifier *modifiers, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; size_t i; @@ -1314,14 +1090,11 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, struct StateSyncRequest *req; uint16_t name_size = strlen (modifiers[i].name) + 1; - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + - modifiers[i].value_size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, + sizeof (*req) + name_size + modifiers[i].value_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); - req = (struct StateSyncRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); req->header.size = htons (sizeof (*req) + name_size + modifiers[i].value_size); @@ -1336,15 +1109,19 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, ? STATE_OP_LAST : 0; - memcpy (&req[1], modifiers[i].name, name_size); - memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); + GNUNET_memcpy (&req[1], modifiers[i].name, name_size); + GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); + struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc)); + ssc->last = (req->flags & STATE_OP_LAST); + ssc->result_cb = result_cb; + ssc->cls = cls; - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); + op_send (h, op_create (h, h->op, state_sync_result, ssc), + env, &req->op_id); } + // FIXME: only one operation is returned, + // add pointers to other operations and make all cancellable. return op; } @@ -1358,9 +1135,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, * Handle for the PSYCstore. * @param channel_key * The channel we are interested in. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. * * @return Handle that can be used to cancel the operation. @@ -1369,33 +1146,20 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); req->channel_key = *channel_key; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } - /** * Update signed values of state variables in the state store. * @@ -1407,9 +1171,9 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, * Message ID that contained the state @a hash. * @param hash * Hash of the serialized full state. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. - * @param rcb_cls + * @param cls * Closure for the callback. */ struct GNUNET_PSYCSTORE_OperationHandle * @@ -1417,30 +1181,18 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, const struct GNUNET_HashCode *hash, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct StateHashUpdateRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct StateHashUpdateRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE); req->channel_key = *channel_key; req->hash = *hash; - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -1453,9 +1205,9 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, * The channel we are interested in. * @param name * Name of variable to match, the returned variable might be less specific. - * @param scb + * @param state_cb * Callback to return the matching state variable. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -1466,37 +1218,26 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { size_t name_size = strlen (name) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); - op->h = h; - op->data_cb = (DataCallback) scb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); - req->header.size = htons (sizeof (*req) + name_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); req->channel_key = *channel_key; - memcpy (&req[1], name, name_size); - - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); + GNUNET_memcpy (&req[1], name, name_size); - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } - /** * Retrieve all state variables for a channel with the given prefix. * @@ -1506,9 +1247,9 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, * The channel we are interested in. * @param name_prefix * Prefix of state variable names to match. - * @param scb + * @param state_cb * Callback to return matching state variables. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -1519,33 +1260,23 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name_prefix, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { size_t name_size = strlen (name_prefix) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle * - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); - op->h = h; - op->data_cb = (DataCallback) scb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); - req->header.size = htons (sizeof (*req) + name_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); req->channel_key = *channel_key; - memcpy (&req[1], name_prefix, name_size); + GNUNET_memcpy (&req[1], name_prefix, name_size); - op->op_id = get_next_op_id (h); - req->op_id = GNUNET_htonll (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } /* end of psycstore_api.c */