/*
This file is part of GNUnet.
- (C) 2012 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2012, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
* @file consensus/consensus_api.c
- * @brief
+ * @brief
* @author Florian Dold
*/
#include "platform.h"
+#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_client_lib.h"
#include "gnunet_consensus_service.h"
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
- /**
- * Socket (if available).
- */
- struct GNUNET_CLIENT_Connection *client;
-
/**
* Callback for new elements. Not called for elements added locally.
*/
- GNUNET_CONSENSUS_NewElementCallback new_element_cb;
+ GNUNET_CONSENSUS_ElementCallback new_element_cb;
/**
- * Closure for new_element_cb
+ * Closure for @e new_element_cb
*/
void *new_element_cls;
/**
- * Session identifier for the consensus session.
+ * The (local) session identifier for the consensus session.
*/
struct GNUNET_HashCode session_id;
/**
- * Number of peers in the consensus. Optionally includes the local peer.
- */
- int num_peers;
-
- /**
- * Peer identities of peers in the consensus. Optionally includes the local peer.
- */
- struct GNUNET_PeerIdentity *peers;
-
- /**
- * Currently active transmit request.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
- * GNUNES_YES iff the join message has been sent to the service.
+ * #GNUNET_YES iff the join message has been sent to the service.
*/
int joined;
- /**
- * Called when the current insertion operation finishes.
- * NULL if there is no insert operation active.
- */
- GNUNET_CONSENSUS_InsertDoneCallback idc;
-
- /**
- * Closure for the insert done callback.
- */
- void *idc_cls;
-
- /**
- * An element that was requested to be inserted.
- */
- struct GNUNET_CONSENSUS_Element *insert_element;
-
/**
* Called when the conclude operation finishes or fails.
*/
GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
/**
- * Closure for the conclude callback.
+ * Closure for the @e conclude_cb callback.
*/
void *conclude_cls;
* Deadline for the conclude operation.
*/
struct GNUNET_TIME_Absolute conclude_deadline;
-};
-
-
-static void
-handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_CONSENSUS_ElementMessage *msg)
-{
- struct GNUNET_CONSENSUS_Element element;
- element.type = msg->element_type;
- element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
- element.data = &msg[1];
- consensus->new_element_cb(consensus->new_element_cls, &element);
-}
-
-static void
-handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
-{
- GNUNET_assert (NULL != consensus->conclude_cb);
- consensus->conclude_cb(consensus->conclude_cls,
- msg->num_peers,
- (struct GNUNET_PeerIdentity *) &msg[1]);
- consensus->conclude_cb = NULL;
-}
+ /**
+ * Message queue for the client.
+ */
+ struct GNUNET_MQ_Handle *mq;
+};
/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * FIXME: this should not bee necessary when the API
+ * issue has been fixed
*/
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+struct InsertDoneInfo
{
- struct GNUNET_CONSENSUS_Handle *consensus = cls;
-
- if (msg == NULL)
- {
- /* Error, timeout, death */
- GNUNET_CLIENT_disconnect (consensus->client);
- consensus->client = NULL;
- consensus->new_element_cb(NULL, NULL);
- if (NULL != consensus->idc)
- {
- consensus->idc(consensus->idc_cls, GNUNET_NO);
- consensus->idc = NULL;
- consensus->idc_cls = NULL;
- }
- return;
- }
-
- switch (ntohs(msg->type))
- {
- case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
- handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
- handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
- break;
- default:
- LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
- }
- GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
- GNUNET_TIME_UNIT_FOREVER_REL);
-}
-
-
+ GNUNET_CONSENSUS_InsertDoneCallback idc;
+ void *cls;
+};
/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
+ * Called when the server has sent is a new element
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls consensus handle
+ * @param msg element message
*/
-static size_t
-transmit_insert (void *cls, size_t size, void *buf)
+static int
+check_new_element (void *cls,
+ const struct GNUNET_CONSENSUS_ElementMessage *msg)
{
- struct GNUNET_CONSENSUS_ElementMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- GNUNET_CONSENSUS_InsertDoneCallback idc;
- int msize;
- void *idc_cls;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
-
- GNUNET_assert (NULL != consensus->insert_element);
-
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
- consensus->insert_element->size;
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- msg->header.size = htons (msize);
- memcpy(&msg[1],
- consensus->insert_element->data,
- consensus->insert_element->size);
-
-
- idc = consensus->idc;
- consensus->idc = NULL;
- idc_cls = consensus->idc_cls;
- consensus->idc_cls = NULL;
- idc(idc_cls, GNUNET_YES);
-
- return msize;
+ /* any size is fine, elements are variable-size */
+ return GNUNET_OK;
}
/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
+ * Called when the server has sent is a new element
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls consensus handle
+ * @param msg element message
*/
-static size_t
-transmit_join (void *cls, size_t size, void *buf)
+static void
+handle_new_element (void *cls,
+ const struct GNUNET_CONSENSUS_ElementMessage *msg)
{
- struct GNUNET_CONSENSUS_JoinMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
- consensus->th = NULL;
- consensus->joined = 1;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
- consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
- msg->header.size = htons (msize);
- msg->session_id = consensus->session_id;
- msg->num_peers = htons (consensus->num_peers);
- memcpy(&msg[1],
- consensus->peers,
- consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
-
- if (consensus->insert_element != NULL)
- {
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_insert, consensus);
- }
-
+ struct GNUNET_CONSENSUS_Handle *consensus = cls;
+ struct GNUNET_SET_Element element;
- GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
- GNUNET_TIME_UNIT_FOREVER_REL);
-
- return msize;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "received new element\n");
+ element.element_type = msg->element_type;
+ element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+ element.data = &msg[1];
+ consensus->new_element_cb (consensus->new_element_cls,
+ &element);
}
/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
+ * Called when the server has announced
+ * that the conclusion is over.
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls consensus handle
+ * @param msg conclude done message
*/
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf)
+static void
+handle_conclude_done (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_CONSENSUS_ConcludeMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
- msg->header.size = htons (msize);
- msg->timeout =
- GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
+ struct GNUNET_CONSENSUS_Handle *consensus = cls;
+ GNUNET_CONSENSUS_ConcludeCallback cc;
- return msize;
+ GNUNET_MQ_destroy (consensus->mq);
+ consensus->mq = NULL;
+ GNUNET_assert (NULL != (cc = consensus->conclude_cb));
+ consensus->conclude_cb = NULL;
+ cc (consensus->conclude_cls);
}
/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param cls the consensus handle
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls closure, same closure as for the message handlers
+ * @param error error code
*/
-static size_t
-transmit_begin (void *cls, size_t size, void *buf)
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
{
- struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_MessageHeader);
-
- msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
- msg->size = htons (msize);
-
- return msize;
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "consensus service disconnected us\n");
}
/**
* Create a consensus session.
*
- * @param cfg
- * @param num_peers
+ * @param cfg configuration to use for connecting to the consensus service
+ * @param num_peers number of peers in the peers array
* @param peers array of peers participating in this consensus session
* Inclusion of the local peer is optional.
* @param session_id session identifier
* Allows a group of peers to have more than consensus session.
+ * @param start start time of the consensus, conclude should be called before
+ * the start time.
+ * @param deadline time when the consensus should have concluded
* @param new_element_cb callback, called when a new element is added to the set by
* another peer
* @param new_element_cls closure for new_element
unsigned int num_peers,
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
- GNUNET_CONSENSUS_NewElementCallback new_element_cb,
+ struct GNUNET_TIME_Absolute start,
+ struct GNUNET_TIME_Absolute deadline,
+ GNUNET_CONSENSUS_ElementCallback new_element_cb,
void *new_element_cls)
{
- struct GNUNET_CONSENSUS_Handle *consensus;
- size_t join_message_size;
-
+ GNUNET_MQ_hd_var_size (new_element,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT,
+ struct GNUNET_CONSENSUS_ElementMessage);
+ GNUNET_MQ_hd_fixed_size (conclude_done,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE,
+ struct GNUNET_MessageHeader);
+ struct GNUNET_CONSENSUS_Handle *consensus
+ = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ make_new_element_handler (consensus),
+ make_conclude_done_handler (consensus),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_CONSENSUS_JoinMessage *join_msg;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_CLIENT_Connection *client;
- consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
consensus->cfg = cfg;
consensus->new_element_cb = new_element_cb;
consensus->new_element_cls = new_element_cls;
- consensus->num_peers = num_peers;
consensus->session_id = *session_id;
-
-
-
- if (0 == num_peers)
+ client = GNUNET_CLIENT_connect ("consensus", cfg);
+ if (NULL == client)
{
- consensus->peers = NULL;
+ GNUNET_free (consensus);
+ return NULL;
}
- else if (num_peers > 0)
- {
-
- consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
- }
- else
- {
- GNUNET_break (0);
- }
-
-
- consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
-
- GNUNET_assert (consensus->client != NULL);
-
- join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
- (num_peers * sizeof (struct GNUNET_PeerIdentity));
-
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- join_message_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_join, consensus);
-
-
- GNUNET_assert (consensus->th != NULL);
-
+ consensus->mq = GNUNET_MQ_queue_for_connection_client (client,
+ mq_handlers,
+ &mq_error_handler,
+ consensus);
+ ev = GNUNET_MQ_msg_extra (join_msg,
+ (num_peers * sizeof (struct GNUNET_PeerIdentity)),
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
+
+ join_msg->session_id = consensus->session_id;
+ join_msg->start = GNUNET_TIME_absolute_hton (start);
+ join_msg->deadline = GNUNET_TIME_absolute_hton (deadline);
+ join_msg->num_peers = htonl (num_peers);
+ memcpy(&join_msg[1],
+ peers,
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
+
+ GNUNET_MQ_send (consensus->mq, ev);
return consensus;
}
+static void
+idc_adapter (void *cls)
+{
+ struct InsertDoneInfo *i = cls;
+ i->idc (i->cls, GNUNET_OK);
+ GNUNET_free (i);
+}
/**
* Insert an element in the set being reconsiled. Must not be called after
*
* @param consensus handle for the consensus session
* @param element the element to be inserted
- * @param idc function called when we are done with this element and it
+ * @param idc function called when we are done with this element and it
* is thus allowed to call GNUNET_CONSENSUS_insert again
* @param idc_cls closure for 'idc'
*/
void
GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
- const struct GNUNET_CONSENSUS_Element *element,
+ const struct GNUNET_SET_Element *element,
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls)
{
+ struct GNUNET_CONSENSUS_ElementMessage *element_msg;
+ struct GNUNET_MQ_Envelope *ev;
+ struct InsertDoneInfo *i;
- GNUNET_assert (NULL == consensus->idc);
- GNUNET_assert (NULL == consensus->insert_element);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
- consensus->idc = idc;
- consensus->idc_cls = idc_cls;
- consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
+ ev = GNUNET_MQ_msg_extra (element_msg, element->size,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- if (consensus->joined == 0)
+ memcpy (&element_msg[1], element->data, element->size);
+
+ if (NULL != idc)
{
- GNUNET_assert (NULL != consensus->th);
- return;
+ i = GNUNET_new (struct InsertDoneInfo);
+ i->idc = idc;
+ i->cls = idc_cls;
+ GNUNET_MQ_notify_sent (ev, idc_adapter, i);
}
-
- GNUNET_assert (NULL == consensus->th);
-
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_insert, consensus);
-}
-
-
-/**
- * Begin reconciling elements with other peers.
- *
- * @param consensus handle for the consensus session
- */
-void
-GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
-{
- GNUNET_assert (NULL == consensus->idc);
- GNUNET_assert (NULL == consensus->insert_element);
-
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_begin, consensus);
+ GNUNET_MQ_send (consensus->mq, ev);
}
/**
- * We are finished inserting new elements into the consensus;
+ * We are done with inserting new elements into the consensus;
* try to conclude the consensus within a given time window.
+ * After conclude has been called, no further elements may be
+ * inserted by the client.
*
* @param consensus consensus session
- * @param timeout timeout after which the conculde callback
+ * @param deadline deadline after which the conculde callback
* must be called
* @param conclude called when the conclusion was successful
* @param conclude_cls closure for the conclude callback
*/
void
GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_TIME_Relative timeout,
GNUNET_CONSENSUS_ConcludeCallback conclude,
void *conclude_cls)
{
- GNUNET_assert (NULL == consensus->th);
+ struct GNUNET_MQ_Envelope *ev;
+
+ GNUNET_assert (NULL != conclude);
GNUNET_assert (NULL == consensus->conclude_cb);
consensus->conclude_cls = conclude_cls;
consensus->conclude_cb = conclude;
- consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
-
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
- timeout,
- GNUNET_NO, &transmit_conclude, consensus);
- if (NULL == consensus->th)
- {
- conclude(conclude_cls, 0, NULL);
- }
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
+ GNUNET_MQ_send (consensus->mq, ev);
}
void
GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
{
- if (consensus->client != NULL)
+ if (NULL != consensus->mq)
{
- GNUNET_CLIENT_disconnect (consensus->client);
- consensus->client = NULL;
+ GNUNET_MQ_destroy (consensus->mq);
+ consensus->mq = NULL;
}
- GNUNET_free (consensus->peers);
GNUNET_free (consensus);
}
+/* end of consensus_api.c */