X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fconsensus_api.c;h=30ab9f36128714601c5301e75e211d3616f51e49;hb=1cfcc6a13f75ff69ec6ab851a471f1939cb5f295;hp=ba0e69e48d89f46689c1f6510ae08967c6aa305b;hpb=f7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1;p=oweals%2Fgnunet.git diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index ba0e69e48..30ab9f361 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -20,7 +20,7 @@ /** * @file consensus/consensus_api.c - * @brief + * @brief * @author Florian Dold */ #include "platform.h" @@ -33,37 +33,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) -/** - * Actions that can be queued. - */ -struct QueuedMessage -{ - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; - - /** - * The actual queued message. - */ - struct GNUNET_MessageHeader *msg; - - /** - * Will be called after transmit, if not NULL - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - - /** - * Closure for idc - */ - void *idc_cls; -}; - /** * Handle for the service. @@ -95,31 +64,11 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_HashCode session_id; - /** - * Number of peers in the consensus. Optionally includes the local peer. - */ - int num_peers; - - /** - * Peer identities of peers participating in the consensus, includes the local peer. - */ - struct GNUNET_PeerIdentity **peers; - - /** - * Currently active transmit request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * GNUNES_YES iff the join message has been sent to the service. */ int joined; - /** - * Closure for the insert done callback. - */ - void *idc_cls; - /** * Called when the conclude operation finishes or fails. */ @@ -135,243 +84,93 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_TIME_Absolute conclude_deadline; - unsigned int conclude_min_size; - - struct QueuedMessage *messages_head; - struct QueuedMessage *messages_tail; + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle *mq; }; - - -/** - * Schedule transmitting the next message. - * - * @param consensus consensus handle - */ -static void -send_next (struct GNUNET_CONSENSUS_Handle *consensus); - - /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * FIXME: this should not bee necessary when the API + * issue has been fixed */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) +struct InsertDoneInfo { - struct GNUNET_CONSENSUS_Handle *consensus; - struct QueuedMessage *qmsg; - size_t msg_size; - - consensus = (struct GNUNET_CONSENSUS_Handle *) cls; - consensus->th = NULL; - - qmsg = consensus->messages_head; - GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); - - if (NULL == buf) - { - if (NULL != qmsg->idc) - { - qmsg->idc (qmsg->idc_cls, GNUNET_YES); - } - return 0; - } - - msg_size = ntohs (qmsg->msg->size); - - GNUNET_assert (size >= msg_size); - - memcpy (buf, qmsg->msg, msg_size); - if (NULL != qmsg->idc) - { - qmsg->idc (qmsg->idc_cls, GNUNET_YES); - } - - /* FIXME: free the messages */ - - send_next (consensus); - - return msg_size; -} - - -/** - * Schedule transmitting the next message. - * - * @param consensus consensus handle - */ -static void -send_next (struct GNUNET_CONSENSUS_Handle *consensus) -{ - if (NULL != consensus->th) - return; - - if (NULL != consensus->messages_head) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_queued, consensus); - } -} - -static void -queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); -} + GNUNET_CONSENSUS_InsertDoneCallback idc; + void *cls; +}; /** * Called when the server has sent is a new element - * - * @param consensus consensus handle - * @param msg element message + * + * @param cls consensus handle + * @param mh element message */ static void -handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) +handle_new_element (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct GNUNET_CONSENSUS_Element element; - struct GNUNET_CONSENSUS_AckMessage *ack_msg; - int ret; + struct GNUNET_CONSENSUS_Handle *consensus = cls; + const struct GNUNET_CONSENSUS_ElementMessage *msg + = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; + struct GNUNET_SET_Element element; - LOG (GNUNET_ERROR_TYPE_INFO, "received new element\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received new element\n"); - element.type = msg->element_type; + element.element_type = msg->element_type; element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; - ret = consensus->new_element_cb (consensus->new_element_cls, &element); - - ack_msg = GNUNET_malloc (sizeof *ack_msg); - ack_msg->header.size = htons (sizeof *ack_msg); - ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); - ack_msg->keep = ret; - - queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg); - - send_next (consensus); + consensus->new_element_cb (consensus->new_element_cls, &element); } /** * Called when the server has announced * that the conclusion is over. - * - * @param consensus consensus handle + * + * @param cls consensus handle * @param msg conclude done message */ static void -handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) +handle_conclude_done (void *cls, + const struct GNUNET_MessageHeader *msg) { - GNUNET_assert (NULL != consensus->conclude_cb); - consensus->conclude_cb (consensus->conclude_cls, NULL); - consensus->conclude_cb = NULL; -} - + struct GNUNET_CONSENSUS_Handle *consensus = cls; + GNUNET_CONSENSUS_ConcludeCallback cc; -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_CONSENSUS_Handle *consensus = cls; + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; - LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n"); + GNUNET_CLIENT_disconnect (consensus->client); + consensus->client = NULL; - if (msg == NULL) - { - /* Error, timeout, death */ - LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); - GNUNET_CLIENT_disconnect (consensus->client); - consensus->client = NULL; - consensus->new_element_cb (NULL, NULL); - return; - } - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: - handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: - handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); - break; - default: - GNUNET_break (0); - } - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_assert (NULL != (cc = consensus->conclude_cb)); + consensus->conclude_cb = NULL; + cc (consensus->conclude_cls); } + /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls closure, same closure as for the message handlers + * @param error error code */ -static size_t -transmit_join (void *cls, size_t size, void *buf) +static void +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { - struct GNUNET_CONSENSUS_JoinMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n"); - - consensus = cls; - consensus->th = NULL; - consensus->joined = 1; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - msg->header.size = htons (msize); - msg->session_id = consensus->session_id; - msg->num_peers = htons (consensus->num_peers); - if (0 != msg->num_peers) - memcpy(&msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - - send_next (consensus); - - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - - return msize; + LOG (GNUNET_ERROR_TYPE_WARNING, "consensus service disconnected us\n"); } + /** * Create a consensus session. * @@ -381,6 +180,9 @@ transmit_join (void *cls, size_t size, void *buf) * Inclusion of the local peer is optional. * @param session_id session identifier * Allows a group of peers to have more than consensus session. + * @param start start time of the consensus, conclude should be called before + * the start time. + * @param deadline time when the consensus should have concluded * @param new_element_cb callback, called when a new element is added to the set by * another peer * @param new_element_cls closure for new_element @@ -391,44 +193,57 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, + struct GNUNET_TIME_Absolute start, + struct GNUNET_TIME_Absolute deadline, GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; - size_t join_message_size; - - consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); + struct GNUNET_CONSENSUS_JoinMessage *join_msg; + struct GNUNET_MQ_Envelope *ev; + const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_new_element, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, + {handle_conclude_done, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, + GNUNET_MQ_HANDLERS_END + }; + + consensus = GNUNET_new (struct GNUNET_CONSENSUS_Handle); consensus->cfg = cfg; consensus->new_element_cb = new_element_cb; consensus->new_element_cls = new_element_cls; - consensus->num_peers = num_peers; consensus->session_id = *session_id; - - if (0 == num_peers) - consensus->peers = NULL; - else if (num_peers > 0) - consensus->peers = - GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, + mq_handlers, mq_error_handler, consensus); GNUNET_assert (consensus->client != NULL); - join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + - (num_peers * sizeof (struct GNUNET_PeerIdentity)); + ev = GNUNET_MQ_msg_extra (join_msg, + (num_peers * sizeof (struct GNUNET_PeerIdentity)), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - join_message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_join, consensus); + join_msg->session_id = consensus->session_id; + join_msg->start = GNUNET_TIME_absolute_hton (start); + join_msg->deadline = GNUNET_TIME_absolute_hton (deadline); + join_msg->num_peers = htonl (num_peers); + memcpy(&join_msg[1], + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); - - GNUNET_assert (consensus->th != NULL); + GNUNET_MQ_send (consensus->mq, ev); return consensus; } +static void +idc_adapter (void *cls) +{ + struct InsertDoneInfo *i = cls; + i->idc (i->cls, GNUNET_OK); + GNUNET_free (i); +} /** * Insert an element in the set being reconsiled. Must not be called after @@ -436,38 +251,35 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, * * @param consensus handle for the consensus session * @param element the element to be inserted - * @param idc function called when we are done with this element and it + * @param idc function called when we are done with this element and it * is thus allowed to call GNUNET_CONSENSUS_insert again * @param idc_cls closure for 'idc' */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { - struct QueuedMessage *qmsg; struct GNUNET_CONSENSUS_ElementMessage *element_msg; - size_t element_msg_size; + struct GNUNET_MQ_Envelope *ev; + struct InsertDoneInfo *i; - LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); - element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - element->size); + ev = GNUNET_MQ_msg_extra (element_msg, element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - element_msg = GNUNET_malloc (element_msg_size); - element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - element_msg->header.size = htons (element_msg_size); memcpy (&element_msg[1], element->data, element->size); - qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); - qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; - qmsg->idc = idc; - qmsg->idc_cls = idc_cls; - - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - - send_next (consensus); + if (NULL != idc) + { + i = GNUNET_new (struct InsertDoneInfo); + i->idc = idc; + i->cls = idc_cls; + GNUNET_MQ_notify_sent (ev, idc_adapter, i); + } + GNUNET_MQ_send (consensus->mq, ev); } @@ -478,20 +290,17 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, * inserted by the client. * * @param consensus consensus session - * @param timeout timeout after which the conculde callback + * @param deadline deadline after which the conculde callback * must be called * @param conclude called when the conclusion was successful * @param conclude_cls closure for the conclude callback */ void GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_TIME_Relative timeout, - unsigned int min_group_size_in_consensus, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { - struct QueuedMessage *qmsg; - struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; + struct GNUNET_MQ_Envelope *ev; GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); @@ -499,18 +308,8 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); - conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); - conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); - conclude_msg->min_group_size = min_group_size_in_consensus; - - qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); - qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; - - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - - send_next (consensus); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + GNUNET_MQ_send (consensus->mq, ev); } @@ -523,13 +322,16 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, void GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) { - if (consensus->client != NULL) + if (NULL != consensus->mq) + { + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; + } + if (NULL != consensus->client) { GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; } - if (NULL != consensus->peers) - GNUNET_free (consensus->peers); GNUNET_free (consensus); }