X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fconsensus_api.c;h=6988c7b6d3425ff2cdeeae95530a2be3c1c73b2f;hb=c274294a31620fcbd8658ac047ff762a593b28fa;hp=25ace3a4dcbf3a6d5b888e4726e4e6a7b4802142;hpb=4616bf9d016bcfb319954e52b5d24fc93c3cf1b2;p=oweals%2Fgnunet.git diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 25ace3a4d..6988c7b6d 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -20,7 +20,7 @@ /** * @file consensus/consensus_api.c - * @brief + * @brief * @author Florian Dold */ #include "platform.h" @@ -33,37 +33,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) -/** - * Actions that can be queued. - */ -struct QueuedMessage -{ - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; - - /** - * The actual queued message. - */ - struct GNUNET_MessageHeader *msg; - - /** - * Will be called after transmit, if not NULL - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - - /** - * Closure for idc - */ - void *idc_cls; -}; - /** * Handle for the service. @@ -95,31 +64,11 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_HashCode session_id; - /** - * Number of peers in the consensus. Optionally includes the local peer. - */ - int num_peers; - - /** - * Peer identities of peers participating in the consensus, includes the local peer. - */ - struct GNUNET_PeerIdentity **peers; - - /** - * Currently active transmit request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * GNUNES_YES iff the join message has been sent to the service. */ int joined; - /** - * Closure for the insert done callback. - */ - void *idc_cls; - /** * Called when the conclude operation finishes or fails. */ @@ -135,125 +84,37 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_TIME_Absolute conclude_deadline; - unsigned int conclude_min_size; - - struct QueuedMessage *messages_head; - struct QueuedMessage *messages_tail; - /** - * GNUNET_YES when currently in a section where destroy may not be - * called. + * Message queue for the client. */ - int may_not_destroy; + struct GNUNET_MQ_Handle *mq; }; - - /** - * Schedule transmitting the next message. - * - * @param consensus consensus handle + * FIXME: this should not bee necessary when the API + * issue has been fixed */ -static void -send_next (struct GNUNET_CONSENSUS_Handle *consensus); - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) +struct InsertDoneInfo { - struct GNUNET_CONSENSUS_Handle *consensus; - struct QueuedMessage *qmsg; - size_t msg_size; - - consensus = (struct GNUNET_CONSENSUS_Handle *) cls; - consensus->th = NULL; - - qmsg = consensus->messages_head; - GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); - - if (NULL == buf) - { - if (NULL != qmsg->idc) - { - qmsg->idc (qmsg->idc_cls, GNUNET_YES); - } - return 0; - } - - msg_size = ntohs (qmsg->msg->size); - - GNUNET_assert (size >= msg_size); - - memcpy (buf, qmsg->msg, msg_size); - if (NULL != qmsg->idc) - { - qmsg->idc (qmsg->idc_cls, GNUNET_YES); - } - GNUNET_free (qmsg->msg); - GNUNET_free (qmsg); - /* FIXME: free the messages */ - - send_next (consensus); - - return msg_size; -} - - -/** - * Schedule transmitting the next message. - * - * @param consensus consensus handle - */ -static void -send_next (struct GNUNET_CONSENSUS_Handle *consensus) -{ - if (NULL != consensus->th) - return; - - if (NULL != consensus->messages_head) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_queued, consensus); - } -} - -static void -queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); -} + GNUNET_CONSENSUS_InsertDoneCallback idc; + void *cls; +}; /** * Called when the server has sent is a new element - * - * @param consensus consensus handle - * @param msg element message + * + * @param cls consensus handle + * @param mh element message */ static void -handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) +handle_new_element (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct GNUNET_CONSENSUS_Element element; - struct GNUNET_CONSENSUS_AckMessage *ack_msg; - int ret; + struct GNUNET_CONSENSUS_Handle *consensus = cls; + const struct GNUNET_CONSENSUS_ElementMessage *msg + = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; + struct GNUNET_SET_Element element; LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); @@ -261,123 +122,54 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; - ret = consensus->new_element_cb (consensus->new_element_cls, &element); - - ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage); - ack_msg->header.size = htons (sizeof *ack_msg); - ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); - ack_msg->keep = ret; - - queue_message (consensus, &ack_msg->header); - - send_next (consensus); + consensus->new_element_cb (consensus->new_element_cls, &element); } /** * Called when the server has announced * that the conclusion is over. - * - * @param consensus consensus handle + * + * @param cls consensus handle * @param msg conclude done message */ static void -handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_MessageHeader *msg) +handle_conclude_done (void *cls, + const struct GNUNET_MessageHeader *msg) { - GNUNET_assert (NULL != consensus->conclude_cb); - consensus->may_not_destroy = GNUNET_YES; - consensus->conclude_cb (consensus->conclude_cls); - consensus->may_not_destroy = GNUNET_NO; - consensus->conclude_cb = NULL; -} + struct GNUNET_CONSENSUS_Handle *consensus = cls; + GNUNET_CONSENSUS_ConcludeCallback cc; + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_CONSENSUS_Handle *consensus = cls; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n"); + GNUNET_CLIENT_disconnect (consensus->client); + consensus->client = NULL; - if (msg == NULL) - { - /* Error, timeout, death */ - LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); - GNUNET_CLIENT_disconnect (consensus->client); - consensus->client = NULL; - consensus->new_element_cb (consensus->new_element_cls, NULL); - return; - } - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: - handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: - handle_conclude_done (consensus, msg); - break; - default: - GNUNET_break (0); - } - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_assert (NULL != (cc = consensus->conclude_cb)); + consensus->conclude_cb = NULL; + cc (consensus->conclude_cls); } + /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls closure, same closure as for the message handlers + * @param error error code */ -static size_t -transmit_join (void *cls, size_t size, void *buf) +static void +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { - struct GNUNET_CONSENSUS_JoinMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); - - consensus = cls; - consensus->th = NULL; - consensus->joined = 1; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - msg->header.size = htons (msize); - msg->session_id = consensus->session_id; - msg->num_peers = htonl (consensus->num_peers); - memcpy(&msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - send_next (consensus); - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - - return msize; + LOG (GNUNET_ERROR_TYPE_WARNING, "consensus service disconnected us\n"); } + /** * Create a consensus session. * @@ -387,6 +179,9 @@ transmit_join (void *cls, size_t size, void *buf) * Inclusion of the local peer is optional. * @param session_id session identifier * Allows a group of peers to have more than consensus session. + * @param start start time of the consensus, conclude should be called before + * the start time. + * @param deadline time when the consensus should have concluded * @param new_element_cb callback, called when a new element is added to the set by * another peer * @param new_element_cls closure for new_element @@ -397,44 +192,57 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, + struct GNUNET_TIME_Absolute start, + struct GNUNET_TIME_Absolute deadline, GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; - size_t join_message_size; - - consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); + struct GNUNET_CONSENSUS_JoinMessage *join_msg; + struct GNUNET_MQ_Envelope *ev; + const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_new_element, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, + {handle_conclude_done, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, + GNUNET_MQ_HANDLERS_END + }; + + consensus = GNUNET_new (struct GNUNET_CONSENSUS_Handle); consensus->cfg = cfg; consensus->new_element_cb = new_element_cb; consensus->new_element_cls = new_element_cls; - consensus->num_peers = num_peers; consensus->session_id = *session_id; - - if (0 == num_peers) - consensus->peers = NULL; - else if (num_peers > 0) - consensus->peers = - GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, + mq_handlers, mq_error_handler, consensus); GNUNET_assert (consensus->client != NULL); - join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + - (num_peers * sizeof (struct GNUNET_PeerIdentity)); + ev = GNUNET_MQ_msg_extra (join_msg, + (num_peers * sizeof (struct GNUNET_PeerIdentity)), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - join_message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_join, consensus); + join_msg->session_id = consensus->session_id; + join_msg->start = GNUNET_TIME_absolute_hton (start); + join_msg->deadline = GNUNET_TIME_absolute_hton (deadline); + join_msg->num_peers = htonl (num_peers); + memcpy(&join_msg[1], + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); - - GNUNET_assert (consensus->th != NULL); + GNUNET_MQ_send (consensus->mq, ev); return consensus; } +static void +idc_adapter (void *cls) +{ + struct InsertDoneInfo *i = cls; + i->idc (i->cls, GNUNET_OK); + GNUNET_free (i); +} /** * Insert an element in the set being reconsiled. Must not be called after @@ -442,38 +250,35 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, * * @param consensus handle for the consensus session * @param element the element to be inserted - * @param idc function called when we are done with this element and it + * @param idc function called when we are done with this element and it * is thus allowed to call GNUNET_CONSENSUS_insert again * @param idc_cls closure for 'idc' */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { - struct QueuedMessage *qmsg; struct GNUNET_CONSENSUS_ElementMessage *element_msg; - size_t element_msg_size; + struct GNUNET_MQ_Envelope *ev; + struct InsertDoneInfo *i; LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); - element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - element->size); + ev = GNUNET_MQ_msg_extra (element_msg, element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - element_msg = GNUNET_malloc (element_msg_size); - element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - element_msg->header.size = htons (element_msg_size); memcpy (&element_msg[1], element->data, element->size); - qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); - qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; - qmsg->idc = idc; - qmsg->idc_cls = idc_cls; - - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - - send_next (consensus); + if (NULL != idc) + { + i = GNUNET_new (struct InsertDoneInfo); + i->idc = idc; + i->cls = idc_cls; + GNUNET_MQ_notify_sent (ev, idc_adapter, i); + } + GNUNET_MQ_send (consensus->mq, ev); } @@ -484,19 +289,17 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, * inserted by the client. * * @param consensus consensus session - * @param timeout timeout after which the conculde callback + * @param deadline deadline after which the conculde callback * must be called * @param conclude called when the conclusion was successful * @param conclude_cls closure for the conclude callback */ void GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_TIME_Relative timeout, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { - struct QueuedMessage *qmsg; - struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; + struct GNUNET_MQ_Envelope *ev; GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); @@ -504,17 +307,8 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); - conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); - conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); - - qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); - qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; - - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - - send_next (consensus); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + GNUNET_MQ_send (consensus->mq, ev); } @@ -527,18 +321,16 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, void GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) { - if (GNUNET_YES == consensus->may_not_destroy) + if (NULL != consensus->mq) { - LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n"); - GNUNET_assert (0); + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; } - if (consensus->client != NULL) + if (NULL != consensus->client) { GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; } - if (NULL != consensus->peers) - GNUNET_free (consensus->peers); GNUNET_free (consensus); }