X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fconsensus_api.c;h=d8e65c52fd202ecd150737219c9189183b605cb8;hb=9a50e92d4a43f917b0cd1ed34d4932470cb3a3da;hp=0d7e6c8e46b515cf8054ab2de0cf66f6405f05f9;hpb=6b1559589726297aa372048b4e388d2e1473a6f6;p=oweals%2Fgnunet.git diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 0d7e6c8e4..d8e65c52f 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -20,10 +20,11 @@ /** * @file consensus/consensus_api.c - * @brief + * @brief * @author Florian Dold */ #include "platform.h" +#include "gnunet_util_lib.h" #include "gnunet_protocols.h" #include "gnunet_client_lib.h" #include "gnunet_consensus_service.h" @@ -44,14 +45,14 @@ struct GNUNET_CONSENSUS_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connected to the consensus service, may be NULL if not connected. */ struct GNUNET_CLIENT_Connection *client; /** * Callback for new elements. Not called for elements added locally. */ - GNUNET_CONSENSUS_NewElementCallback new_element_cb; + GNUNET_CONSENSUS_ElementCallback new_element_cb; /** * Closure for new_element_cb @@ -59,46 +60,15 @@ struct GNUNET_CONSENSUS_Handle void *new_element_cls; /** - * Session identifier for the consensus session. + * The (local) session identifier for the consensus session. */ struct GNUNET_HashCode session_id; - /** - * Number of peers in the consensus. Optionally includes the local peer. - */ - int num_peers; - - /** - * Peer identities of peers in the consensus. Optionally includes the local peer. - */ - struct GNUNET_PeerIdentity *peers; - - /** - * Currently active transmit request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * GNUNES_YES iff the join message has been sent to the service. */ int joined; - /** - * Called when the current insertion operation finishes. - * NULL if there is no insert operation active. - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - - /** - * Closure for the insert done callback. - */ - void *idc_cls; - - /** - * An element that was requested to be inserted. - */ - struct GNUNET_CONSENSUS_Element *insert_element; - /** * Called when the conclude operation finishes or fails. */ @@ -113,235 +83,80 @@ struct GNUNET_CONSENSUS_Handle * Deadline for the conclude operation. */ struct GNUNET_TIME_Absolute conclude_deadline; -}; - - -static void -handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) -{ - struct GNUNET_CONSENSUS_Element element; - element.type = msg->element_type; - element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); - element.data = &msg[1]; - consensus->new_element_cb(consensus->new_element_cls, &element); -} - -static void -handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) -{ - GNUNET_assert (NULL != consensus->conclude_cb); - consensus->conclude_cb(consensus->conclude_cls, - msg->num_peers, - (struct GNUNET_PeerIdentity *) &msg[1]); - consensus->conclude_cb = NULL; -} - + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle *mq; +}; /** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * FIXME: this should not bee necessary when the API + * issue has been fixed */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +struct InsertDoneInfo { - struct GNUNET_CONSENSUS_Handle *consensus = cls; GNUNET_CONSENSUS_InsertDoneCallback idc; - void *idc_cls; - - if (msg == NULL) - { - /* Error, timeout, death */ - GNUNET_CLIENT_disconnect (consensus->client); - consensus->client = NULL; - consensus->new_element_cb(NULL, NULL); - if (NULL != consensus->idc) - { - consensus->idc(consensus->idc_cls, GNUNET_NO); - consensus->idc = NULL; - consensus->idc_cls = NULL; - } - return; - } - - switch (ntohs(msg->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ACK: - idc = consensus->idc; - consensus->idc = NULL; - idc_cls = consensus->idc_cls; - consensus->idc_cls = NULL; - idc(idc_cls, GNUNET_YES); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: - handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: - handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); - break; - default: - LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); - } - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); -} - - - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_insert (void *cls, size_t size, void *buf) -{ - struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - - GNUNET_assert (NULL != consensus->insert_element); - - consensus->th = NULL; - - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - consensus->insert_element->size; - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - msg->header.size = htons (msize); - memcpy(&msg[1], - consensus->insert_element->data, - consensus->insert_element->size); - - return msize; -} + void *cls; +}; /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Called when the server has sent is a new element * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls consensus handle + * @param mh element message */ -static size_t -transmit_join (void *cls, size_t size, void *buf) +static void +handle_new_element (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct GNUNET_CONSENSUS_JoinMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n"); - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - consensus->joined = 1; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); + struct GNUNET_CONSENSUS_Handle *consensus = cls; + const struct GNUNET_CONSENSUS_ElementMessage *msg + = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; + struct GNUNET_SET_Element element; - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - msg->header.size = htons (msize); - msg->session_id = consensus->session_id; - msg->num_peers = htons (consensus->num_peers); - memcpy(&msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - - if (consensus->insert_element != NULL) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); + element.type = msg->element_type; + element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element.data = &msg[1]; - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - - return msize; + consensus->new_element_cb (consensus->new_element_cls, &element); } /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Called when the server has announced + * that the conclusion is over. * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls consensus handle + * @param msg conclude done message */ -static size_t -transmit_conclude (void *cls, size_t size, void *buf) +static void +handle_conclude_done (void *cls, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_CONSENSUS_ConcludeMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - consensus = cls; - consensus->th = NULL; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); + struct GNUNET_CONSENSUS_Handle *consensus = cls; - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - msg->header.size = htons (msize); - msg->timeout = - GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); + GNUNET_CONSENSUS_ConcludeCallback cc; - return msize; + GNUNET_assert (NULL != (cc = consensus->conclude_cb)); + consensus->conclude_cb = NULL; + cc (consensus->conclude_cls); } - - /** * Create a consensus session. * - * @param cfg - * @param num_peers + * @param cfg configuration to use for connecting to the consensus service + * @param num_peers number of peers in the peers array * @param peers array of peers participating in this consensus session * Inclusion of the local peer is optional. * @param session_id session identifier * Allows a group of peers to have more than consensus session. - * @param num_initial_elements number of entries in the 'initial_elements' array - * @param initial_elements our elements for the consensus (each of 'element_size' - * @param new_element callback, called when a new element is added to the set by + * @param new_element_cb callback, called when a new element is added to the set by * another peer * @param new_element_cls closure for new_element * @return handle to use, NULL on error @@ -351,61 +166,53 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, - /* - unsigned int num_initial_elements, - const struct GNUNET_CONSENSUS_Element **initial_elements, - */ - GNUNET_CONSENSUS_NewElementCallback new_element, + GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; - size_t join_message_size; - + struct GNUNET_CONSENSUS_JoinMessage *join_msg; + struct GNUNET_MQ_Envelope *ev; + const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_new_element, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, + {handle_conclude_done, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, + GNUNET_MQ_HANDLERS_END + }; consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); consensus->cfg = cfg; - consensus->new_element_cb = new_element; + consensus->new_element_cb = new_element_cb; consensus->new_element_cls = new_element_cls; - consensus->num_peers = num_peers; consensus->session_id = *session_id; - - - - if (0 == num_peers) - { - consensus->peers = NULL; - } - else if (num_peers > 0) - { - - consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - } - else - { - GNUNET_break (0); - } - - consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, + mq_handlers, NULL, consensus); GNUNET_assert (consensus->client != NULL); - join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + - (num_peers * sizeof (struct GNUNET_PeerIdentity)); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - join_message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_join, consensus); + ev = GNUNET_MQ_msg_extra (join_msg, + (num_peers * sizeof (struct GNUNET_PeerIdentity)), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); + join_msg->session_id = consensus->session_id; + join_msg->num_peers = htonl (num_peers); + memcpy(&join_msg[1], + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_assert (consensus->th != NULL); - + GNUNET_MQ_send (consensus->mq, ev); return consensus; } +static void +idc_adapter (void *cls) +{ + struct InsertDoneInfo *i = cls; + i->idc (i->cls, GNUNET_OK); + GNUNET_free (i); +} /** * Insert an element in the set being reconsiled. Must not be called after @@ -413,43 +220,43 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, * * @param consensus handle for the consensus session * @param element the element to be inserted - * @param idc function called when we are done with this element and it + * @param idc function called when we are done with this element and it * is thus allowed to call GNUNET_CONSENSUS_insert again * @param idc_cls closure for 'idc' */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { + struct GNUNET_CONSENSUS_ElementMessage *element_msg; + struct GNUNET_MQ_Envelope *ev; + struct InsertDoneInfo *i; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); - GNUNET_assert (NULL == consensus->idc); - GNUNET_assert (NULL == consensus->insert_element); + ev = GNUNET_MQ_msg_extra (element_msg, element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - consensus->idc = idc; - consensus->idc_cls = idc_cls; - consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size); + memcpy (&element_msg[1], element->data, element->size); - if (consensus->joined == 0) + if (NULL != idc) { - GNUNET_assert (NULL != consensus->th); - return; + i = GNUNET_new (struct InsertDoneInfo); + i->idc = idc; + i->cls = idc_cls; + GNUNET_MQ_notify_sent (ev, idc_adapter, i); } - - GNUNET_assert (NULL == consensus->th); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_insert, consensus); + GNUNET_MQ_send (consensus->mq, ev); } /** - * We are finished inserting new elements into the consensus; + * We are done with inserting new elements into the consensus; * try to conclude the consensus within a given time window. + * After conclude has been called, no further elements may be + * inserted by the client. * * @param consensus consensus session * @param timeout timeout after which the conculde callback @@ -463,22 +270,19 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { - GNUNET_assert (NULL == consensus->th); + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; + + GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), - timeout, - GNUNET_NO, &transmit_conclude, consensus); - if (NULL == consensus->th) - { - conclude(conclude_cls, 0, NULL); - } + + ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); + + GNUNET_MQ_send (consensus->mq, ev); } @@ -496,7 +300,6 @@ GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; } - GNUNET_free (consensus->peers); GNUNET_free (consensus); }