X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fpsycstore%2Fpsycstore_api.c;h=d79daa35774906a65f2fd6b3a4a1ca27b3586180;hb=b14f2e69adb39f4eaaaf3e26a847f29d256a1c68;hp=88ae1185b91021d47a59a1f3b75ff50459459a34;hpb=383a9603f7310b3156331a1ed9cc97cc6ed4d3a9;p=oweals%2Fgnunet.git diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 88ae1185b..d79daa357 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c @@ -1,6 +1,6 @@ /* * This file is part of GNUnet - * (C) 2013 Christian Grothoff (and other contributing authors) + * Copyright (C) 2013 GNUnet e.V. * * GNUnet is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ * * You should have received a copy of the GNU General Public License * along with GNUnet; see the file COPYING. If not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** @@ -25,6 +25,8 @@ * @author Christian Grothoff */ +#include + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_constants.h" @@ -35,8 +37,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) -typedef void (*DataCallback) (); - /** * Handle for an operation with the PSYCstore service. */ @@ -49,40 +49,28 @@ struct GNUNET_PSYCSTORE_OperationHandle struct GNUNET_PSYCSTORE_Handle *h; /** - * We keep operations in a DLL. - */ - struct GNUNET_PSYCSTORE_OperationHandle *next; - - /** - * We keep operations in a DLL. - */ - struct GNUNET_PSYCSTORE_OperationHandle *prev; - - /** - * Continuation to invoke with the result of an operation. - */ - GNUNET_PSYCSTORE_ResultCallback res_cb; - - /** - * Continuation to invoke with the result of an operation returning data. + * Data callbacks. */ - DataCallback data_cb; + union { + GNUNET_PSYCSTORE_FragmentCallback fragment_cb; + GNUNET_PSYCSTORE_CountersCallback counters_cb; + GNUNET_PSYCSTORE_StateCallback state_cb; + }; /** - * Closure for the callbacks. + * Closure for callbacks. */ void *cls; /** - * Operation ID. + * Message envelope. */ - uint32_t op_id; + struct GNUNET_MQ_Envelope *env; /** - * Message to send to the PSYCstore service. - * Allocated at the end of this struct. + * Operation ID. */ - const struct GNUNET_MessageHeader *msg; + uint64_t op_id; }; @@ -97,437 +85,272 @@ struct GNUNET_PSYCSTORE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Head of operations to transmit. - */ - struct GNUNET_PSYCSTORE_OperationHandle *transmit_head; - - /** - * Tail of operations to transmit. - */ - struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail; - - /** - * Head of active operations waiting for response. + * Client connection. */ - struct GNUNET_PSYCSTORE_OperationHandle *op_head; + struct GNUNET_MQ_Handle *mq; /** - * Tail of active operations waiting for response. + * Async operations. */ - struct GNUNET_PSYCSTORE_OperationHandle *op_tail; - - /** - * Currently pending transmission request, or NULL for none. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_OP_Handle *op; /** * Task doing exponential back-off trying to reconnect. */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** - * Time for next connect retry. + * Delay for next connect retry. */ struct GNUNET_TIME_Relative reconnect_delay; - /** - * Are we polling for incoming messages right now? - */ - int in_receive; + GNUNET_PSYCSTORE_FragmentCallback *fragment_cb; + + GNUNET_PSYCSTORE_CountersCallback *counters_cb; + + GNUNET_PSYCSTORE_StateCallback *state_cb; /** - * The last operation id used for a PSYCstore operation. + * Closure for callbacks. */ - uint32_t last_op_id_used; - + void *cb_cls; }; -/** - * Get a fresh operation ID to distinguish between PSYCstore requests. - * - * @param h Handle to the PSYCstore service. - * @return next operation id to use - */ -static uint32_t -get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) +static int +check_result_code (void *cls, const struct OperationResult *opres) { - return h->last_op_id_used++; -} - - -/** - * Find operation by ID. - * - * @return OperationHandle if found, or NULL otherwise. - */ -static struct GNUNET_PSYCSTORE_OperationHandle * -find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id) -{ - struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; - while (NULL != op) + uint16_t size = ntohs (opres->header.size); + const char *str = (const char *) &opres[1]; + if ( (sizeof (*opres) < size) && + ('\0' != str[size - sizeof (*opres) - 1]) ) { - if (op->op_id == op_id) - return op; - op = op->next; + GNUNET_break (0); + return GNUNET_SYSERR; } - return NULL; -} - -/** - * Try again to connect to the PSYCstore service. - * - * @param cls handle to the PSYCstore service. - * @param tc scheduler context - */ -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + return GNUNET_OK; +} -/** - * Reschedule a connect attempt to the service. - * - * @param h transport service to reconnect - */ static void -reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) +handle_result_code (void *cls, const struct OperationResult *opres) { - GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + uint16_t size = ntohs (opres->header.size); + + const char * + str = (sizeof (*opres) < size) ? (const char *) &opres[1] : ""; - if (NULL != h->th) + if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id), + GNUNET_ntohll (opres->result_code) + INT64_MIN, + str, size - sizeof (*opres), (void **) &op)) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: Received result message with operation ID: %" PRIu64 "\n", + GNUNET_ntohll (opres->op_id)); + GNUNET_free (op); } - if (NULL != h->client) + else { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (opres->op_id)); } - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYCstore service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); - h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } -/** - * Schedule transmission of the next message from our queue. - * - * @param h PSYCstore handle - */ static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h); - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +handle_result_counters (void *cls, const struct CountersResult *cres) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op; - const struct OperationResult *opres; - const struct CountersResult *cres; - const struct FragmentResult *fres; - const struct StateResult *sres; - const char *str; - - if (NULL == msg) - { - reschedule_connect (h); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d from PSYCstore service.\n", - ntohs (msg->type)); - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: - if (size < sizeof (struct OperationResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct OperationResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - - opres = (const struct OperationResult *) msg; - str = (const char *) &opres[1]; - if ( (size > sizeof (struct OperationResult)) && - ('\0' != str[size - sizeof (struct OperationResult) - 1]) ) - { - GNUNET_break (0); - reschedule_connect (h); - return; - } - if (size == sizeof (struct OperationResult)) - str = NULL; - - op = find_op_by_id (h, ntohl (opres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (opres->op_id)); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received result message (type %d) with operation ID: %ld\n", - type, op->op_id); - - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); - if (NULL != op->res_cb) - { - const struct StateModifyRequest *smreq; - const struct StateSyncRequest *ssreq; - switch (ntohs (op->msg->type)) - { - case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY: - smreq = (const struct StateModifyRequest *) op->msg; - if (!(smreq->flags & STATE_OP_LAST - || GNUNET_OK != ntohl (opres->result_code))) - op->res_cb = NULL; - break; - case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: - ssreq = (const struct StateSyncRequest *) op->msg; - if (!(ssreq->flags & STATE_OP_LAST - || GNUNET_OK != ntohl (opres->result_code))) - op->res_cb = NULL; - break; - } - } - if (NULL != op->res_cb) - op->res_cb (op->cls, ntohl (opres->result_code), str); - GNUNET_free (op); - } - break; - - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS: - if (size != sizeof (struct CountersResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected %lu\n", - type, size, sizeof (struct CountersResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - - cres = (const struct CountersResult *) msg; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - op = find_op_by_id (h, ntohl (cres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (cres->op_id)); - } - else + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->counters_cb) { - GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_CountersCallback) - op->data_cb) (op->cls, ntohl (cres->result_code), + op->counters_cb (op->cls, + ntohl (cres->result_code), GNUNET_ntohll (cres->max_fragment_id), GNUNET_ntohll (cres->max_message_id), GNUNET_ntohll (cres->max_group_generation), GNUNET_ntohll (cres->max_state_message_id)); - GNUNET_free (op); } - break; + GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id)); + GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (cres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT: - if (size < sizeof (struct FragmentResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct FragmentResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } - fres = (const struct FragmentResult *) msg; - struct GNUNET_MULTICAST_MessageHeader *mmsg = - (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; - if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected = %lu\n", - type, size, - sizeof (struct FragmentResult) + ntohs (mmsg->header.size)); - GNUNET_break (0); - reschedule_connect (h); - return; - } +static int +check_result_fragment (void *cls, const struct FragmentResult *fres) +{ + uint16_t size = ntohs (fres->header.size); + struct GNUNET_MULTICAST_MessageHeader *mmsg = + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; + if (sizeof (*fres) + sizeof (*mmsg) < size + && sizeof (*fres) + ntohs (mmsg->header.size) != size) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_fragment: Received message with invalid length %lu bytes.\n", + size, sizeof (*fres)); + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} - op = find_op_by_id (h, ntohl (fres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (fres->op_id)); - } - else - { - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_FragmentCallback) - op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags)); - } - break; - case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE: - if (size < sizeof (struct StateResult)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received message of type %d with length %lu bytes. " - "Expected >= %lu\n", - type, size, sizeof (struct StateResult)); - GNUNET_break (0); - reschedule_connect (h); - return; - } +static void +handle_result_fragment (void *cls, const struct FragmentResult *fres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - sres = (const struct StateResult *) msg; - const char *name = (const char *) &sres[1]; - uint16_t name_size = ntohs (sres->name_size); + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->fragment_cb) + op->fragment_cb (op->cls, + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1], + ntohl (fres->psycstore_flags)); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id)); + //GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (fres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - if (name_size <= 2 || '\0' != name[name_size - 1]) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Received state result message (type %d) with invalid name.\n", - type); - GNUNET_break (0); - reschedule_connect (h); - return; - } - op = find_op_by_id (h, ntohl (sres->op_id)); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (sres->op_id)); - } - else - { - if (NULL != op->data_cb) - ((GNUNET_PSYCSTORE_StateCallback) - op->data_cb) (op->cls, name, (char *) &sres[1] + name_size, - ntohs (sres->header.size) - sizeof (*sres) - name_size); - } - break; +static int +check_result_state (void *cls, const struct StateResult *sres) +{ + const char *name = (const char *) &sres[1]; + uint16_t size = ntohs (sres->header.size); + uint16_t name_size = ntohs (sres->name_size); - default: + if (name_size <= 2 + || size - sizeof (*sres) < name_size + || '\0' != name[name_size - 1]) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_state: Received state result message with invalid name.\n"); GNUNET_break (0); - reschedule_connect (h); - return; + return GNUNET_SYSERR; } - - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); + return GNUNET_OK; } -/** - * Transmit next message to service. - * - * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. - * @param size Number of bytes available in buf. - * @param buf Where to copy the message. - * @return Number of bytes copied to buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) +static void +handle_result_state (void *cls, const struct StateResult *sres) { struct GNUNET_PSYCSTORE_Handle *h = cls; - struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; - size_t ret; - - h->th = NULL; - if (NULL == op) - return 0; - ret = ntohs (op->msg->size); - if (ret > size) - { - reschedule_connect (h); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYCstore service. ID: %u\n", - ntohs (op->msg->type), op->op_id); - memcpy (buf, op->msg, ret); + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); + const char *name = (const char *) &sres[1]; + uint16_t name_size = ntohs (sres->name_size); - if (NULL == op->res_cb && NULL == op->data_cb) + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id), + NULL, NULL, (void **) &op)) { - GNUNET_free (op); + GNUNET_assert (NULL != op); + if (NULL != op->state_cb) + op->state_cb (op->cls, name, (char *) &sres[1] + name_size, + ntohs (sres->header.size) - sizeof (*sres) - name_size); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id)); + //GNUNET_free (op); } else { - GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n", + GNUNET_ntohll (sres->op_id)); } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - if (NULL != h->transmit_head) - transmit_next (h); - if (GNUNET_NO == h->in_receive) - { - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; -} +static void +reconnect (void *cls); /** - * Schedule transmission of the next message from our queue. + * Client disconnected from service. * - * @param h PSYCstore handle. + * Reconnect after backoff period.= */ static void -transmit_next (struct GNUNET_PSYCSTORE_Handle *h) +disconnected (void *cls, enum GNUNET_MQ_Error error) { - if (NULL != h->th || NULL == h->client) - return; - - struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; - if (NULL == op) - return; - - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - ntohs (op->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - h); + struct GNUNET_PSYCSTORE_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != h->mq) + { + GNUNET_MQ_destroy (h->mq); + GNUNET_OP_destroy (h->op); + h->mq = NULL; + h->op = NULL; + } + + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, + &reconnect, h); + h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); +} + + +static void +do_connect (struct GNUNET_PSYCSTORE_Handle *h) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to PSYCstore service.\n"); + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (result_code, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE, + struct OperationResult, + h), + GNUNET_MQ_hd_fixed_size (result_counters, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS, + struct CountersResult, + h), + GNUNET_MQ_hd_var_size (result_fragment, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT, + struct FragmentResult, + h), + GNUNET_MQ_hd_var_size (result_state, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE, + struct StateResult, + h), + GNUNET_MQ_handler_end () + }; + + h->op = GNUNET_OP_create (); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, "psycstore", + handlers, disconnected, h); + GNUNET_assert (NULL != h->mq); } @@ -535,20 +358,11 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h) * Try again to connect to the PSYCstore service. * * @param cls Handle to the PSYCstore service. - * @param tc Scheduler context. */ static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +reconnect (void *cls) { - struct GNUNET_PSYCSTORE_Handle *h = cls; - - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to PSYCstore service.\n"); - GNUNET_assert (NULL == h->client); - h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg); - GNUNET_assert (NULL != h->client); - transmit_next (h); + do_connect (cls); } @@ -564,8 +378,8 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) struct GNUNET_PSYCSTORE_Handle *h = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); h->cfg = cfg; - h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + do_connect (h); return h; } @@ -579,25 +393,84 @@ void GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) { GNUNET_assert (NULL != h); - if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + if (h->reconnect_task != NULL) { GNUNET_SCHEDULER_cancel (h->reconnect_task); - h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; + h->reconnect_task = NULL; } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + // FIXME: free data structures for pending operations + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } GNUNET_free (h); } +/** + * Message sent notification. + * + * Remove invalidated envelope pointer. + */ +static void +message_sent (void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = cls; + op->env = NULL; +} + + +/** + * Create a new operation. + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_create (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_OP_Handle *hop, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op)); + op->h = h; + op->op_id = GNUNET_OP_add (hop, + (GNUNET_ResultCallback) result_cb, + cls, op); + return op; +} + + +/** + * Send a message associated with an operation. + * + * @param h + * PSYCstore handle. + * @param op + * Operation handle. + * @param env + * Message envelope to send. + * @param[out] op_id + * Operation ID to write in network byte order. NULL if not needed. + * + * @return Operation handle. + * + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_send (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_PSYCSTORE_OperationHandle *op, + struct GNUNET_MQ_Envelope *env, + uint64_t *op_id) +{ + op->env = env; + if (NULL != op_id) + *op_id = GNUNET_htonll (op->op_id); + + GNUNET_MQ_notify_sent (env, message_sent, op); + GNUNET_MQ_send (h->mq, env); + return op; +} + + /** * Cancel a PSYCstore operation. Note that the operation MAY still * be executed; this merely cancels the continuation; if the request @@ -605,32 +478,26 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) * the operation. * * @param op Operation to cancel. + * + * @return #GNUNET_YES if message was not sent yet and got discarded, + * #GNUNET_NO if it was already sent, and only the callbacks got cancelled. */ -void +int GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) { struct GNUNET_PSYCSTORE_Handle *h = op->h; + int ret = GNUNET_NO; - if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client)) + if (NULL != op->env) { - /* request not active, can simply remove */ - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); - GNUNET_free (op); - return; + GNUNET_MQ_send_cancel (op->env); + ret = GNUNET_YES; } - if (NULL != h->th) - { - /* request active but not yet with service, can still abort */ - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); - GNUNET_free (op); - transmit_next (h); - return; - } - /* request active with service, simply ensure continuations are not called */ - op->res_cb = NULL; - op->data_cb = NULL; + + GNUNET_OP_remove (h->op, op->op_id); + GNUNET_free (op); + + return ret; } @@ -638,64 +505,62 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) * Store join/leave events for a PSYC channel in order to be able to answer * membership test queries later. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel where the event happened. - * @param slave_key Public key of joining/leaving slave. - * @param did_join #GNUNET_YES on join, #GNUNET_NO on part. - * @param announced_at ID of the message that announced the membership change. - * @param effective_since Message ID this membership change is in effect since. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel where the event happened. + * @param slave_key + * Public key of joining/leaving slave. + * @param did_join + * #GNUNET_YES on join, #GNUNET_NO on part. + * @param announced_at + * ID of the message that announced the membership change. + * @param effective_since + * Message ID this membership change is in effect since. * For joins it is <= announced_at, for parts it is always 0. - * @param group_generation In case of a part, the last group generation the - * slave has access to. It has relevance when a larger message have - * fragments with different group generations. - * @param rcb Callback to call with the result of the storage operation. - * @param rcb_cls Closure for the callback. + * @param group_generation + * In case of a part, the last group generation the slave has access to. + * It has relevance when a larger message have fragments with different + * group generations. + * @param result_cb + * Callback to call with the result of the storage operation. + * @param cls + * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, int did_join, uint64_t announced_at, uint64_t effective_since, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { GNUNET_assert (NULL != h); GNUNET_assert (NULL != channel_key); GNUNET_assert (NULL != slave_key); + GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join); GNUNET_assert (did_join ? effective_since <= announced_at : effective_since == 0); struct MembershipStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct MembershipStoreRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); req->channel_key = *channel_key; req->slave_key = *slave_key; - req->did_join = htonl (did_join); + req->did_join = did_join; req->announced_at = GNUNET_htonll (announced_at); req->effective_since = GNUNET_htonll (effective_since); req->group_generation = GNUNET_htonll (group_generation); - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -707,50 +572,45 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, * is also used when handling join requests to determine whether the slave is * currently admitted to the channel. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param slave_key Public key of slave whose membership to check. - * @param message_id Message ID for which to do the membership test. - * @param group_generation Group generation of the fragment of the message to - * test. It has relevance if the message consists of multiple fragments - * with different group generations. - * @param rcb Callback to call with the test result. - * @param rcb_cls Closure for the callback. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * Public key of slave whose membership to check. + * @param message_id + * Message ID for which to do the membership test. + * @param group_generation + * Group generation of the fragment of the message to test. + * It has relevance if the message consists of multiple fragments with + * different group generations. + * @param result_cb + * Callback to call with the test result. + * @param cls + * Closure for the callback. * * @return Operation handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t group_generation, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct MembershipTestRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct MembershipTestRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); req->channel_key = *channel_key; req->slave_key = *slave_key; req->message_id = GNUNET_htonll (message_id); req->group_generation = GNUNET_htonll (group_generation); - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } @@ -762,132 +622,280 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, * @param message Message to store. * @param psycstore_flags Flags indicating whether the PSYC message contains * state modifiers. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. + * @param result_cb Callback to call with the result of the operation. + * @param cls Closure for the callback. * * @return Handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - const struct GNUNET_MULTICAST_MessageHeader *message, - uint32_t psycstore_flags, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + const struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { - uint16_t size = ntohs (message->header.size); + uint16_t size = ntohs (msg->header.size); struct FragmentStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct FragmentStoreRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); - req->header.size = htons (sizeof (*req) + size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); req->channel_key = *channel_key; req->psycstore_flags = htonl (psycstore_flags); - memcpy (&req[1], message, size); - - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + GNUNET_memcpy (&req[1], msg, size); - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } /** - * Retrieve a message fragment by fragment ID. + * Retrieve message fragments by fragment ID range. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param fragment_id Fragment ID to check. Use 0 to get the latest message fragment. - * @param fcb Callback to call with the retrieved fragments. - * @param rcb Callback to call with the result of the operation. - * @param cls Closure for the callbacks. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the fragment. If not NULL, a membership test is + * performed first and the fragment is only returned if the slave has + * access to it. + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. * * @return Handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t fragment_id, - GNUNET_PSYCSTORE_FragmentCallback fcb, - GNUNET_PSYCSTORE_ResultCallback rcb, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct FragmentGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fcb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct FragmentGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); req->channel_key = *channel_key; - req->fragment_id = GNUNET_htonll (fragment_id); + req->first_fragment_id = GNUNET_htonll (first_fragment_id); + req->last_fragment_id = GNUNET_htonll (last_fragment_id); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - return op; +/** + * Retrieve latest message fragments. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the fragment. If not NULL, a membership test is + * performed first and the fragment is only returned if the slave has + * access to it. + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t fragment_limit, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct FragmentGetRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); + req->channel_key = *channel_key; + req->fragment_limit = GNUNET_ntohll (fragment_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } /** - * Retrieve all fragments of a message. + * Retrieve all fragments of messages in a message ID range. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param message_id Message ID to check. Use 0 to get the latest message. - * @param fcb Callback to call with the retrieved fragments. - * @param rcb Callback to call with the result of the operation. - * @param cls Closure for the callbacks. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message. + * If not NULL, a membership test is performed first + * and the message is only returned if the slave has access to it. + * @param first_message_id + * First message ID to retrieve. + * @param last_message_id + * Last consecutive message ID to retrieve. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param method_prefix + * Retrieve only messages with a matching method prefix. + * @todo Implement method_prefix query. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. * * @return Handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t message_id, - GNUNET_PSYCSTORE_FragmentCallback fcb, - GNUNET_PSYCSTORE_ResultCallback rcb, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t first_message_id, + uint64_t last_message_id, + uint64_t fragment_limit, + const char *method_prefix, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fcb; - op->res_cb = rcb; + if (NULL == method_prefix) + method_prefix = ""; + uint16_t method_size = strnlen (method_prefix, + GNUNET_MAX_MESSAGE_SIZE + - sizeof (*req)) + 1; + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); + req->channel_key = *channel_key; + req->first_message_id = GNUNET_htonll (first_message_id); + req->last_message_id = GNUNET_htonll (last_message_id); + req->fragment_limit = GNUNET_htonll (fragment_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + GNUNET_memcpy (&req[1], method_prefix, method_size); + ((char *) &req[1])[method_size - 1] = '\0'; + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; op->cls = cls; + return op_send (h, op, env, &req->op_id); +} - req = (struct MessageGetRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); - req->header.size = htons (sizeof (*req)); - req->channel_key = *channel_key; - req->message_id = GNUNET_htonll (message_id); - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); +/** + * Retrieve all fragments of the latest messages. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message. + * If not NULL, a membership test is performed first + * and the message is only returned if the slave has access to it. + * @param message_limit + * Maximum number of messages to retrieve. + * @param method_prefix + * Retrieve only messages with a matching method prefix. + * @todo Implement method_prefix query. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_limit, + const char *method_prefix, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct MessageGetRequest *req; + + if (NULL == method_prefix) + method_prefix = ""; + uint16_t method_size = strnlen (method_prefix, + GNUNET_MAX_MESSAGE_SIZE + - sizeof (*req)) + 1; + GNUNET_assert ('\0' == method_prefix[method_size - 1]); - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); + req->channel_key = *channel_key; + req->message_limit = GNUNET_ntohll (message_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + GNUNET_memcpy (&req[1], method_prefix, method_size); - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -895,48 +903,55 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, * Retrieve a fragment of message specified by its message ID and fragment * offset. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param message_id Message ID to check. Use 0 to get the latest message. - * @param fragment_offset Offset of the fragment to retrieve. - * @param fcb Callback to call with the retrieved fragments. - * @param rcb Callback to call with the result of the operation. - * @param cls Closure for the callbacks. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message fragment. If not NULL, a membership + * test is performed first and the message fragment is only returned + * if the slave has access to it. + * @param message_id + * Message ID to retrieve. Use 0 to get the latest message. + * @param fragment_offset + * Offset of the fragment to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. * * @return Handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t fragment_offset, - GNUNET_PSYCSTORE_FragmentCallback fcb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { struct MessageGetFragmentRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = (DataCallback) fcb; - op->res_cb = rcb; - op->cls = cls; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); - req = (struct MessageGetFragmentRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); - req->header.size = htons (sizeof (*req)); req->channel_key = *channel_key; req->message_id = GNUNET_htonll (message_id); req->fragment_offset = GNUNET_htonll (fragment_offset); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -946,39 +961,33 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, * The current value of counters are needed when a channel master is restarted, * so that it can continue incrementing the counters from their last value. * - * @param h Handle for the PSYCstore. - * @param channel_key Public key that identifies the channel. - * @param ccb Callback to call with the result. - * @param ccb_cls Closure for the @a ccb callback. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * Public key that identifies the channel. + * @param ccb + * Callback to call with the result. + * @param ccb_cls + * Closure for the @a ccb callback. * * @return Handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_CountersCallback ccb, - void *ccb_cls) + GNUNET_PSYCSTORE_CountersCallback counters_cb, + void *cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->data_cb = ccb; - op->cls = ccb_cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); req->channel_key = *channel_key; - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, NULL, NULL); + op->counters_cb = counters_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } @@ -988,14 +997,18 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, * An error is returned if there are missing messages containing state * operations before the current one. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param message_id ID of the message that contains the @a modifiers. - * @param state_delta Value of the _state_delta PSYC header variable of the message. - * @param modifier_count Number of elements in the @a modifiers array. - * @param modifiers List of modifiers to apply. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the @a rcb callback. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param message_id + * ID of the message that contains the @a modifiers. + * @param state_delta + * Value of the _state_delta PSYC header variable of the message. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for @a result_cb. * * @return Handle that can be used to cancel the operation. */ @@ -1004,78 +1017,71 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, uint64_t state_delta, - size_t modifier_count, - const struct GNUNET_ENV_Modifier *modifiers, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { - struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; - size_t i; - - for (i = 0; i < modifier_count; i++) { - struct StateModifyRequest *req; - uint16_t name_size = strlen (modifiers[i].name) + 1; + struct StateModifyRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + req->state_delta = GNUNET_htonll (state_delta); - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + - modifiers[i].value_size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; + return op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} - req = (struct StateModifyRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); - req->header.size = htons (sizeof (*req) + name_size - + modifiers[i].value_size); - req->channel_key = *channel_key; - req->message_id = GNUNET_htonll (message_id); - req->state_delta = GNUNET_htonll (state_delta); - req->oper = modifiers[i].oper; - req->name_size = htons (name_size); - req->flags - = 0 == i - ? STATE_OP_FIRST - : modifier_count - 1 == i - ? STATE_OP_LAST - : 0; - memcpy (&req[1], modifiers[i].name, name_size); - memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); +struct StateSyncClosure +{ + GNUNET_PSYCSTORE_ResultCallback result_cb; + void *cls; + uint8_t last; +}; - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - } - return op; - /* FIXME: only the last operation is returned, - * operation_cancel() should be able to cancel all of them. - */ +static void +state_sync_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateSyncClosure *ssc = cls; + if (GNUNET_OK != result || ssc->last) + ssc->result_cb (ssc->cls, result, err_msg, err_msg_size); + GNUNET_free (ssc); } /** * Store synchronized state. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param message_id ID of the message that contains the state_hash PSYC header variable. - * @param modifier_count Number of elements in the @a modifiers array. - * @param modifiers Full state to store. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param max_state_message_id + * ID of the last stateful message before @a state_hash_message_id. + * @param state_hash_message_id + * ID of the message that contains the state_hash PSYC header variable. + * @param modifier_count + * Number of elements in the @a modifiers array. + * @param modifiers + * Full state to store. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callback. * * @return Handle that can be used to cancel the operation. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t message_id, + uint64_t max_state_message_id, + uint64_t state_hash_message_id, size_t modifier_count, - const struct GNUNET_ENV_Modifier *modifiers, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + const struct GNUNET_PSYC_Modifier *modifiers, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; size_t i; @@ -1084,36 +1090,38 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, struct StateSyncRequest *req; uint16_t name_size = strlen (modifiers[i].name) + 1; - op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + - modifiers[i].value_size); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, + sizeof (*req) + name_size + modifiers[i].value_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); - req = (struct StateSyncRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); req->header.size = htons (sizeof (*req) + name_size + modifiers[i].value_size); req->channel_key = *channel_key; - req->message_id = GNUNET_htonll (message_id); + req->max_state_message_id = GNUNET_htonll (max_state_message_id); + req->state_hash_message_id = GNUNET_htonll (state_hash_message_id); req->name_size = htons (name_size); req->flags - = 0 == i + = (0 == i) ? STATE_OP_FIRST - : modifier_count - 1 == i + : (modifier_count - 1 == i) ? STATE_OP_LAST : 0; - memcpy (&req[1], modifiers[i].name, name_size); - memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); + GNUNET_memcpy (&req[1], modifiers[i].name, name_size); + GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc)); + ssc->last = (req->flags & STATE_OP_LAST); + ssc->result_cb = result_cb; + ssc->cls = cls; - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); + op_send (h, op_create (h, h->op, state_sync_result, ssc), + env, &req->op_id); } + // FIXME: only one operation is returned, + // add pointers to other operations and make all cancellable. return op; } @@ -1123,10 +1131,14 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, * * Delete all state variables stored for the given channel. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callback. * * @return Handle that can be used to cancel the operation. */ @@ -1134,85 +1146,71 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); req->channel_key = *channel_key; - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } - /** * Update signed values of state variables in the state store. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param message_id Message ID that contained the state @a hash. - * @param hash Hash of the serialized full state. - * @param rcb Callback to call with the result of the operation. - * @param rcb_cls Closure for the callback. - * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param message_id + * Message ID that contained the state @a hash. + * @param hash + * Hash of the serialized full state. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callback. */ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, uint64_t message_id, const struct GNUNET_HashCode *hash, - GNUNET_PSYCSTORE_ResultCallback rcb, - void *rcb_cls) + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) { struct StateHashUpdateRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); - op->h = h; - op->res_cb = rcb; - op->cls = rcb_cls; - - req = (struct StateHashUpdateRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); - req->header.size = htons (sizeof (*req)); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE); req->channel_key = *channel_key; req->hash = *hash; - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); } /** * Retrieve the best matching state variable. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param name Name of variable to match, the returned variable might be less specific. - * @param scb Callback to return the matching state variable. - * @param rcb Callback to call with the result of the operation. - * @param cls Closure for the callbacks. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param name + * Name of variable to match, the returned variable might be less specific. + * @param state_cb + * Callback to return the matching state variable. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. * * @return Handle that can be used to cancel the operation. */ @@ -1220,46 +1218,41 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { size_t name_size = strlen (name) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); - op->h = h; - op->data_cb = (DataCallback) scb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); - req->header.size = htons (sizeof (*req) + name_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); req->channel_key = *channel_key; - memcpy (&req[1], name, name_size); - - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + GNUNET_memcpy (&req[1], name, name_size); - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); - - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } - /** * Retrieve all state variables for a channel with the given prefix. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param name_prefix Prefix of state variable names to match. - * @param scb Callback to return matching state variables. - * @param rcb Callback to call with the result of the operation. - * @param cls Closure for the callbacks. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param name_prefix + * Prefix of state variable names to match. + * @param state_cb + * Callback to return matching state variables. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. * * @return Handle that can be used to cancel the operation. */ @@ -1267,33 +1260,23 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const char *name_prefix, - GNUNET_PSYCSTORE_StateCallback scb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls) { size_t name_size = strlen (name_prefix) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); - op->h = h; - op->data_cb = (DataCallback) scb; - op->res_cb = rcb; - op->cls = cls; - - req = (struct OperationRequest *) &op[1]; - op->msg = (struct GNUNET_MessageHeader *) req; - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); - req->header.size = htons (sizeof (*req) + name_size); + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); req->channel_key = *channel_key; - memcpy (&req[1], name_prefix, name_size); - - op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); - - GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); - transmit_next (h); + GNUNET_memcpy (&req[1], name_prefix, name_size); - return op; + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); } /* end of psycstore_api.c */