X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fconsensus_api.c;h=30ab9f36128714601c5301e75e211d3616f51e49;hb=1cfcc6a13f75ff69ec6ab851a471f1939cb5f295;hp=b1de10eddacb1f9bc8640333f5fc5d38901645c9;hpb=691d973ea5dc7e297f598c7261fc2704b381972f;p=oweals%2Fgnunet.git diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index b1de10edd..30ab9f361 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -20,10 +20,11 @@ /** * @file consensus/consensus_api.c - * @brief + * @brief * @author Florian Dold */ #include "platform.h" +#include "gnunet_util_lib.h" #include "gnunet_protocols.h" #include "gnunet_client_lib.h" #include "gnunet_consensus_service.h" @@ -44,14 +45,14 @@ struct GNUNET_CONSENSUS_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connected to the consensus service, may be NULL if not connected. */ struct GNUNET_CLIENT_Connection *client; /** * Callback for new elements. Not called for elements added locally. */ - GNUNET_CONSENSUS_NewElementCallback new_element_cb; + GNUNET_CONSENSUS_ElementCallback new_element_cb; /** * Closure for new_element_cb @@ -59,46 +60,15 @@ struct GNUNET_CONSENSUS_Handle void *new_element_cls; /** - * Session identifier for the consensus session. + * The (local) session identifier for the consensus session. */ struct GNUNET_HashCode session_id; - /** - * Number of peers in the consensus. Optionally includes the local peer. - */ - int num_peers; - - /** - * Peer identities of peers in the consensus. Optionally includes the local peer. - */ - struct GNUNET_PeerIdentity *peers; - - /** - * Currently active transmit request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * GNUNES_YES iff the join message has been sent to the service. */ int joined; - /** - * Called when the current insertion operation finishes. - * NULL if there is no insert operation active. - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - - /** - * Closure for the insert done callback. - */ - void *idc_cls; - - /** - * An element that was requested to be inserted. - */ - struct GNUNET_CONSENSUS_Element *insert_element; - /** * Called when the conclude operation finishes or fails. */ @@ -113,263 +83,106 @@ struct GNUNET_CONSENSUS_Handle * Deadline for the conclude operation. */ struct GNUNET_TIME_Absolute conclude_deadline; -}; - - -static void -handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) -{ - struct GNUNET_CONSENSUS_Element element; - element.type = msg->element_type; - element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); - element.data = &msg[1]; - consensus->new_element_cb(consensus->new_element_cls, &element); -} - -static void -handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) -{ - GNUNET_assert (NULL != consensus->conclude_cb); - consensus->conclude_cb(consensus->conclude_cls, - msg->num_peers, - (struct GNUNET_PeerIdentity *) &msg[1]); - consensus->conclude_cb = NULL; -} - - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_CONSENSUS_Handle *consensus = cls; - - if (msg == NULL) - { - /* Error, timeout, death */ - GNUNET_CLIENT_disconnect (consensus->client); - consensus->client = NULL; - consensus->new_element_cb(NULL, NULL); - if (NULL != consensus->idc) - { - consensus->idc(consensus->idc_cls, GNUNET_NO); - consensus->idc = NULL; - consensus->idc_cls = NULL; - } - return; - } - - switch (ntohs(msg->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: - handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: - handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); - break; - default: - LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); - } - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); -} - - + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle *mq; +}; /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * FIXME: this should not bee necessary when the API + * issue has been fixed */ -static size_t -transmit_insert (void *cls, size_t size, void *buf) +struct InsertDoneInfo { - struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; GNUNET_CONSENSUS_InsertDoneCallback idc; - int msize; - void *idc_cls; - - GNUNET_assert (NULL != buf); - - consensus = cls; - - GNUNET_assert (NULL != consensus->insert_element); - - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - consensus->insert_element->size; - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - msg->header.size = htons (msize); - memcpy(&msg[1], - consensus->insert_element->data, - consensus->insert_element->size); - - - idc = consensus->idc; - consensus->idc = NULL; - idc_cls = consensus->idc_cls; - consensus->idc_cls = NULL; - idc(idc_cls, GNUNET_YES); - - return msize; -} + void *cls; +}; /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Called when the server has sent is a new element * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls consensus handle + * @param mh element message */ -static size_t -transmit_join (void *cls, size_t size, void *buf) +static void +handle_new_element (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct GNUNET_CONSENSUS_JoinMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n"); - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - consensus->joined = 1; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - msg->header.size = htons (msize); - msg->session_id = consensus->session_id; - msg->num_peers = htons (consensus->num_peers); - memcpy(&msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); + struct GNUNET_CONSENSUS_Handle *consensus = cls; + const struct GNUNET_CONSENSUS_ElementMessage *msg + = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; + struct GNUNET_SET_Element element; - if (consensus->insert_element != NULL) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received new element\n"); + element.element_type = msg->element_type; + element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element.data = &msg[1]; - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - - return msize; + consensus->new_element_cb (consensus->new_element_cls, &element); } /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Called when the server has announced + * that the conclusion is over. * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls consensus handle + * @param msg conclude done message */ -static size_t -transmit_conclude (void *cls, size_t size, void *buf) +static void +handle_conclude_done (void *cls, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_CONSENSUS_ConcludeMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); + struct GNUNET_CONSENSUS_Handle *consensus = cls; - consensus = cls; - consensus->th = NULL; + GNUNET_CONSENSUS_ConcludeCallback cc; - msg = buf; + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; - msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); + GNUNET_CLIENT_disconnect (consensus->client); + consensus->client = NULL; - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - msg->header.size = htons (msize); - msg->timeout = - GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); - return msize; + GNUNET_assert (NULL != (cc = consensus->conclude_cb)); + consensus->conclude_cb = NULL; + cc (consensus->conclude_cls); } /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls the consensus handle - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls closure, same closure as for the message handlers + * @param error error code */ -static size_t -transmit_begin (void *cls, size_t size, void *buf) +static void +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { - struct GNUNET_MessageHeader *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_MessageHeader); - - msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); - msg->size = htons (msize); - - return msize; + LOG (GNUNET_ERROR_TYPE_WARNING, "consensus service disconnected us\n"); } /** * Create a consensus session. * - * @param cfg - * @param num_peers + * @param cfg configuration to use for connecting to the consensus service + * @param num_peers number of peers in the peers array * @param peers array of peers participating in this consensus session * Inclusion of the local peer is optional. * @param session_id session identifier * Allows a group of peers to have more than consensus session. + * @param start start time of the consensus, conclude should be called before + * the start time. + * @param deadline time when the consensus should have concluded * @param new_element_cb callback, called when a new element is added to the set by * another peer * @param new_element_cls closure for new_element @@ -380,57 +193,57 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, - GNUNET_CONSENSUS_NewElementCallback new_element_cb, + struct GNUNET_TIME_Absolute start, + struct GNUNET_TIME_Absolute deadline, + GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; - size_t join_message_size; - - - consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); + struct GNUNET_CONSENSUS_JoinMessage *join_msg; + struct GNUNET_MQ_Envelope *ev; + const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_new_element, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, + {handle_conclude_done, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, + GNUNET_MQ_HANDLERS_END + }; + + consensus = GNUNET_new (struct GNUNET_CONSENSUS_Handle); consensus->cfg = cfg; consensus->new_element_cb = new_element_cb; consensus->new_element_cls = new_element_cls; - consensus->num_peers = num_peers; consensus->session_id = *session_id; - - - - if (0 == num_peers) - { - consensus->peers = NULL; - } - else if (num_peers > 0) - { - - consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - } - else - { - GNUNET_break (0); - } - - consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, + mq_handlers, mq_error_handler, consensus); GNUNET_assert (consensus->client != NULL); - join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + - (num_peers * sizeof (struct GNUNET_PeerIdentity)); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - join_message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_join, consensus); + ev = GNUNET_MQ_msg_extra (join_msg, + (num_peers * sizeof (struct GNUNET_PeerIdentity)), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); + join_msg->session_id = consensus->session_id; + join_msg->start = GNUNET_TIME_absolute_hton (start); + join_msg->deadline = GNUNET_TIME_absolute_hton (deadline); + join_msg->num_peers = htonl (num_peers); + memcpy(&join_msg[1], + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_assert (consensus->th != NULL); - + GNUNET_MQ_send (consensus->mq, ev); return consensus; } +static void +idc_adapter (void *cls) +{ + struct InsertDoneInfo *i = cls; + i->idc (i->cls, GNUNET_OK); + GNUNET_free (i); +} /** * Insert an element in the set being reconsiled. Must not be called after @@ -438,91 +251,65 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, * * @param consensus handle for the consensus session * @param element the element to be inserted - * @param idc function called when we are done with this element and it + * @param idc function called when we are done with this element and it * is thus allowed to call GNUNET_CONSENSUS_insert again * @param idc_cls closure for 'idc' */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { + struct GNUNET_CONSENSUS_ElementMessage *element_msg; + struct GNUNET_MQ_Envelope *ev; + struct InsertDoneInfo *i; - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); + LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); - consensus->idc = idc; - consensus->idc_cls = idc_cls; - consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size); + ev = GNUNET_MQ_msg_extra (element_msg, element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - if (consensus->joined == 0) + memcpy (&element_msg[1], element->data, element->size); + + if (NULL != idc) { - GNUNET_assert (NULL != consensus->th); - return; + i = GNUNET_new (struct InsertDoneInfo); + i->idc = idc; + i->cls = idc_cls; + GNUNET_MQ_notify_sent (ev, idc_adapter, i); } - - GNUNET_assert (NULL == consensus->th); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); -} - - -/** - * Begin reconciling elements with other peers. - * - * @param consensus handle for the consensus session - */ -void -GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) -{ - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_begin, consensus); + GNUNET_MQ_send (consensus->mq, ev); } /** - * We are finished inserting new elements into the consensus; + * We are done with inserting new elements into the consensus; * try to conclude the consensus within a given time window. + * After conclude has been called, no further elements may be + * inserted by the client. * * @param consensus consensus session - * @param timeout timeout after which the conculde callback + * @param deadline deadline after which the conculde callback * must be called * @param conclude called when the conclusion was successful * @param conclude_cls closure for the conclude callback */ void GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_TIME_Relative timeout, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { - GNUNET_assert (NULL == consensus->th); + struct GNUNET_MQ_Envelope *ev; + + GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), - timeout, - GNUNET_NO, &transmit_conclude, consensus); - if (NULL == consensus->th) - { - conclude(conclude_cls, 0, NULL); - } + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + GNUNET_MQ_send (consensus->mq, ev); } @@ -535,12 +322,16 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, void GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) { - if (consensus->client != NULL) + if (NULL != consensus->mq) + { + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; + } + if (NULL != consensus->client) { GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; } - GNUNET_free (consensus->peers); GNUNET_free (consensus); }