X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fconsensus_api.c;h=d8e65c52fd202ecd150737219c9189183b605cb8;hb=9a50e92d4a43f917b0cd1ed34d4932470cb3a3da;hp=90b0fdf16ddbea86acd9c0ecd95cce309fcc272f;hpb=12dc2e752c7faf28d63049b2b308ce5316df8dc3;p=oweals%2Fgnunet.git diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 90b0fdf16..d8e65c52f 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -20,10 +20,11 @@ /** * @file consensus/consensus_api.c - * @brief + * @brief * @author Florian Dold */ #include "platform.h" +#include "gnunet_util_lib.h" #include "gnunet_protocols.h" #include "gnunet_client_lib.h" #include "gnunet_consensus_service.h" @@ -44,14 +45,14 @@ struct GNUNET_CONSENSUS_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connected to the consensus service, may be NULL if not connected. */ struct GNUNET_CLIENT_Connection *client; /** * Callback for new elements. Not called for elements added locally. */ - GNUNET_CONSENSUS_NewElementCallback new_element_cb; + GNUNET_CONSENSUS_ElementCallback new_element_cb; /** * Closure for new_element_cb @@ -59,46 +60,15 @@ struct GNUNET_CONSENSUS_Handle void *new_element_cls; /** - * Session identifier for the consensus session. + * The (local) session identifier for the consensus session. */ struct GNUNET_HashCode session_id; - /** - * Number of peers in the consensus. Optionally includes the local peer. - */ - int num_peers; - - /** - * Peer identities of peers in the consensus. Optionally includes the local peer. - */ - struct GNUNET_PeerIdentity *peers; - - /** - * Currently active transmit request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * GNUNES_YES iff the join message has been sent to the service. */ int joined; - /** - * Called when the current insertion operation finishes. - * NULL if there is no insert operation active. - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - - /** - * Closure for the insert done callback. - */ - void *idc_cls; - - /** - * An element that was requested to be inserted. - */ - struct GNUNET_CONSENSUS_Element *insert_element; - /** * Called when the conclude operation finishes or fails. */ @@ -113,261 +83,75 @@ struct GNUNET_CONSENSUS_Handle * Deadline for the conclude operation. */ struct GNUNET_TIME_Absolute conclude_deadline; -}; - - -static void -handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) -{ - struct GNUNET_CONSENSUS_Element element; - element.type = msg->element_type; - element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); - element.data = &msg[1]; - consensus->new_element_cb (consensus->new_element_cls, &element); -} - -static void -handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) -{ - GNUNET_assert (NULL != consensus->conclude_cb); - consensus->conclude_cb(consensus->conclude_cls, - msg->num_peers, - (struct GNUNET_PeerIdentity *) &msg[1]); - consensus->conclude_cb = NULL; -} - - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_CONSENSUS_Handle *consensus = cls; - - LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n"); - - if (msg == NULL) - { - /* Error, timeout, death */ - LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); - GNUNET_CLIENT_disconnect (consensus->client); - consensus->client = NULL; - consensus->new_element_cb (NULL, NULL); - if (NULL != consensus->idc) - { - consensus->idc(consensus->idc_cls, GNUNET_NO); - consensus->idc = NULL; - consensus->idc_cls = NULL; - } - return; - } - - switch (ntohs(msg->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: - handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: - handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); - break; - default: - LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); - } - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); -} - - + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle *mq; +}; /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * FIXME: this should not bee necessary when the API + * issue has been fixed */ -static size_t -transmit_insert (void *cls, size_t size, void *buf) +struct InsertDoneInfo { - struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; GNUNET_CONSENSUS_InsertDoneCallback idc; - int msize; - void *idc_cls; - - GNUNET_assert (NULL != buf); - - consensus = cls; - - GNUNET_assert (NULL != consensus->insert_element); - - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - consensus->insert_element->size; - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - msg->header.size = htons (msize); - memcpy (&msg[1], - consensus->insert_element->data, - consensus->insert_element->size); - - - idc = consensus->idc; - consensus->idc = NULL; - idc_cls = consensus->idc_cls; - consensus->idc_cls = NULL; - idc (idc_cls, GNUNET_YES); - - return msize; -} - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_join (void *cls, size_t size, void *buf) -{ - struct GNUNET_CONSENSUS_JoinMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); - - consensus = cls; - consensus->th = NULL; - consensus->joined = 1; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - msg->header.size = htons (msize); - msg->session_id = consensus->session_id; - msg->num_peers = htons (consensus->num_peers); - memcpy(&msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - - if (consensus->insert_element != NULL) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); - } - - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - - return msize; -} + void *cls; +}; /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Called when the server has sent is a new element * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls consensus handle + * @param mh element message */ -static size_t -transmit_conclude (void *cls, size_t size, void *buf) +static void +handle_new_element (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct GNUNET_CONSENSUS_ConcludeMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - - msg = buf; + struct GNUNET_CONSENSUS_Handle *consensus = cls; + const struct GNUNET_CONSENSUS_ElementMessage *msg + = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; + struct GNUNET_SET_Element element; - msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); + LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - msg->header.size = htons (msize); - msg->timeout = - GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); + element.type = msg->element_type; + element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element.data = &msg[1]; - return msize; + consensus->new_element_cb (consensus->new_element_cls, &element); } /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Called when the server has announced + * that the conclusion is over. * - * @param cls the consensus handle - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls consensus handle + * @param msg conclude done message */ -static size_t -transmit_begin (void *cls, size_t size, void *buf) +static void +handle_conclude_done (void *cls, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_MessageHeader *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_MessageHeader); + struct GNUNET_CONSENSUS_Handle *consensus = cls; - msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); - msg->size = htons (msize); + GNUNET_CONSENSUS_ConcludeCallback cc; - return msize; + GNUNET_assert (NULL != (cc = consensus->conclude_cb)); + consensus->conclude_cb = NULL; + cc (consensus->conclude_cls); } /** * Create a consensus session. * - * @param cfg - * @param num_peers + * @param cfg configuration to use for connecting to the consensus service + * @param num_peers number of peers in the peers array * @param peers array of peers participating in this consensus session * Inclusion of the local peer is optional. * @param session_id session identifier @@ -382,51 +166,53 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, - GNUNET_CONSENSUS_NewElementCallback new_element_cb, + GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; - size_t join_message_size; + struct GNUNET_CONSENSUS_JoinMessage *join_msg; + struct GNUNET_MQ_Envelope *ev; + const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_new_element, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, + {handle_conclude_done, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, + GNUNET_MQ_HANDLERS_END + }; consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); consensus->cfg = cfg; consensus->new_element_cb = new_element_cb; consensus->new_element_cls = new_element_cls; - consensus->num_peers = num_peers; consensus->session_id = *session_id; - - if (0 == num_peers) - { - consensus->peers = NULL; - } - else if (num_peers > 0) - { - consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - } - else - { - GNUNET_break (0); - } - consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, + mq_handlers, NULL, consensus); GNUNET_assert (consensus->client != NULL); - join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + - (num_peers * sizeof (struct GNUNET_PeerIdentity)); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - join_message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_join, consensus); + ev = GNUNET_MQ_msg_extra (join_msg, + (num_peers * sizeof (struct GNUNET_PeerIdentity)), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - GNUNET_assert (consensus->th != NULL); + join_msg->session_id = consensus->session_id; + join_msg->num_peers = htonl (num_peers); + memcpy(&join_msg[1], + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (consensus->mq, ev); return consensus; } +static void +idc_adapter (void *cls) +{ + struct InsertDoneInfo *i = cls; + i->idc (i->cls, GNUNET_OK); + GNUNET_free (i); +} /** * Insert an element in the set being reconsiled. Must not be called after @@ -434,62 +220,43 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, * * @param consensus handle for the consensus session * @param element the element to be inserted - * @param idc function called when we are done with this element and it + * @param idc function called when we are done with this element and it * is thus allowed to call GNUNET_CONSENSUS_insert again * @param idc_cls closure for 'idc' */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { + struct GNUNET_CONSENSUS_ElementMessage *element_msg; + struct GNUNET_MQ_Envelope *ev; + struct InsertDoneInfo *i; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); + ev = GNUNET_MQ_msg_extra (element_msg, element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - consensus->idc = idc; - consensus->idc_cls = idc_cls; - consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size); + memcpy (&element_msg[1], element->data, element->size); - if (consensus->joined == 0) + if (NULL != idc) { - GNUNET_assert (NULL != consensus->th); - return; + i = GNUNET_new (struct InsertDoneInfo); + i->idc = idc; + i->cls = idc_cls; + GNUNET_MQ_notify_sent (ev, idc_adapter, i); } - - GNUNET_assert (NULL == consensus->th); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); -} - - -/** - * Begin reconciling elements with other peers. - * - * @param consensus handle for the consensus session - */ -void -GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) -{ - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_begin, consensus); + GNUNET_MQ_send (consensus->mq, ev); } /** - * We are finished inserting new elements into the consensus; + * We are done with inserting new elements into the consensus; * try to conclude the consensus within a given time window. + * After conclude has been called, no further elements may be + * inserted by the client. * * @param consensus consensus session * @param timeout timeout after which the conculde callback @@ -503,22 +270,19 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { - GNUNET_assert (NULL == consensus->th); + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; + + GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), - timeout, - GNUNET_NO, &transmit_conclude, consensus); - if (NULL == consensus->th) - { - conclude(conclude_cls, 0, NULL); - } + + ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); + + GNUNET_MQ_send (consensus->mq, ev); } @@ -536,7 +300,6 @@ GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; } - GNUNET_free (consensus->peers); GNUNET_free (consensus); }