* @author Florian Dold
*/
#include "platform.h"
+#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_client_lib.h"
#include "gnunet_consensus_service.h"
#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
+/**
+ * Actions that can be queued.
+ */
+struct QueuedMessage
+{
+ /**
+ * Queued messages are stored in a doubly linked list.
+ */
+ struct QueuedMessage *next;
+
+ /**
+ * Queued messages are stored in a doubly linked list.
+ */
+ struct QueuedMessage *prev;
+
+ /**
+ * The actual queued message.
+ */
+ struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Will be called after transmit, if not NULL
+ */
+ GNUNET_CONSENSUS_InsertDoneCallback idc;
+
+ /**
+ * Closure for idc
+ */
+ void *idc_cls;
+};
+
/**
* Handle for the service.
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Socket (if available).
+ * Client connected to the consensus service, may be NULL if not connected.
*/
struct GNUNET_CLIENT_Connection *client;
/**
* Callback for new elements. Not called for elements added locally.
*/
- GNUNET_CONSENSUS_NewElementCallback new_element_cb;
+ GNUNET_CONSENSUS_ElementCallback new_element_cb;
/**
* Closure for new_element_cb
void *new_element_cls;
/**
- * Session identifier for the consensus session.
+ * The (local) session identifier for the consensus session.
*/
struct GNUNET_HashCode session_id;
int num_peers;
/**
- * Peer identities of peers in the consensus. Optionally includes the local peer.
+ * Peer identities of peers participating in the consensus, includes the local peer.
*/
- struct GNUNET_PeerIdentity *peers;
+ struct GNUNET_PeerIdentity **peers;
/**
* Currently active transmit request.
*/
int joined;
- /**
- * Called when the current insertion operation finishes.
- * NULL if there is no insert operation active.
- */
- GNUNET_CONSENSUS_InsertDoneCallback idc;
-
/**
* Closure for the insert done callback.
*/
void *idc_cls;
- /**
- * An element that was requested to be inserted.
- */
- struct GNUNET_CONSENSUS_Element *insert_element;
-
/**
* Called when the conclude operation finishes or fails.
*/
* Deadline for the conclude operation.
*/
struct GNUNET_TIME_Absolute conclude_deadline;
+
+ unsigned int conclude_min_size;
+
+ struct QueuedMessage *messages_head;
+ struct QueuedMessage *messages_tail;
};
+
+/**
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
+ */
static void
-handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
+send_next (struct GNUNET_CONSENSUS_Handle *consensus);
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_queued (void *cls, size_t size,
+ void *buf)
+{
+ struct GNUNET_CONSENSUS_Handle *consensus;
+ struct QueuedMessage *qmsg;
+ size_t msg_size;
+
+ consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
+ consensus->th = NULL;
+
+ qmsg = consensus->messages_head;
+ GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
+
+ if (NULL == buf)
+ {
+ if (NULL != qmsg->idc)
+ {
+ qmsg->idc (qmsg->idc_cls, GNUNET_YES);
+ }
+ return 0;
+ }
+
+ msg_size = ntohs (qmsg->msg->size);
+
+ GNUNET_assert (size >= msg_size);
+
+ memcpy (buf, qmsg->msg, msg_size);
+ if (NULL != qmsg->idc)
+ {
+ qmsg->idc (qmsg->idc_cls, GNUNET_YES);
+ }
+
+ /* FIXME: free the messages */
+
+ send_next (consensus);
+
+ return msg_size;
+}
+
+
+/**
+ * Schedule transmitting the next message.
+ *
+ * @param consensus consensus handle
+ */
+static void
+send_next (struct GNUNET_CONSENSUS_Handle *consensus)
+{
+ if (NULL != consensus->th)
+ return;
+
+ if (NULL != consensus->messages_head)
+ {
+ consensus->th =
+ GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_NO, &transmit_queued, consensus);
+ }
+}
+
+static void
+queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
+{
+ struct QueuedMessage *qm;
+ qm = GNUNET_malloc (sizeof *qm);
+ qm->msg = msg;
+ GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
+}
+
+
+/**
+ * Called when the server has sent is a new element
+ *
+ * @param consensus consensus handle
+ * @param msg element message
+ */
+static void
+handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_CONSENSUS_ElementMessage *msg)
{
struct GNUNET_CONSENSUS_Element element;
+ struct GNUNET_CONSENSUS_AckMessage *ack_msg;
+ int ret;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
+
element.type = msg->element_type;
- element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+ element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
element.data = &msg[1];
- consensus->new_element_cb(consensus->new_element_cls, &element);
+
+ ret = consensus->new_element_cb (consensus->new_element_cls, &element);
+
+ ack_msg = GNUNET_malloc (sizeof *ack_msg);
+ ack_msg->header.size = htons (sizeof *ack_msg);
+ ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
+ ack_msg->keep = ret;
+
+ queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
+
+ send_next (consensus);
}
+
+/**
+ * Called when the server has announced
+ * that the conclusion is over.
+ *
+ * @param consensus consensus handle
+ * @param msg conclude done message
+ */
static void
-handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
+handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
{
GNUNET_assert (NULL != consensus->conclude_cb);
- consensus->conclude_cb(consensus->conclude_cls,
- msg->num_peers,
- (struct GNUNET_PeerIdentity *) &msg[1]);
+ consensus->conclude_cb (consensus->conclude_cls, NULL);
consensus->conclude_cb = NULL;
}
message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_CONSENSUS_Handle *consensus = cls;
- GNUNET_CONSENSUS_InsertDoneCallback idc;
- void *idc_cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
if (msg == NULL)
{
/* Error, timeout, death */
+ LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
GNUNET_CLIENT_disconnect (consensus->client);
consensus->client = NULL;
- consensus->new_element_cb(NULL, NULL);
- if (NULL != consensus->idc)
- {
- consensus->idc(consensus->idc_cls, GNUNET_NO);
- consensus->idc = NULL;
- consensus->idc_cls = NULL;
- }
+ consensus->new_element_cb (NULL, NULL);
return;
}
- switch (ntohs(msg->type))
+ switch (ntohs (msg->type))
{
- case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ACK:
- idc = consensus->idc;
- consensus->idc = NULL;
- idc_cls = consensus->idc_cls;
- consensus->idc_cls = NULL;
- idc(idc_cls, GNUNET_YES);
- break;
case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
- handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
+ handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
break;
case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
- handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
+ handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
break;
default:
- LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
+ GNUNET_break (0);
}
GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
GNUNET_TIME_UNIT_FOREVER_REL);
}
-
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_insert (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_ElementMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
-
- GNUNET_assert (NULL != consensus->insert_element);
-
- consensus->th = NULL;
-
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
- consensus->insert_element->size;
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- msg->header.size = htons (msize);
- memcpy(&msg[1],
- consensus->insert_element->data,
- consensus->insert_element->size);
-
- return msize;
-}
-
-
/**
* Function called to notify a client about the connection
* begin ready to queue more data. "buf" will be
struct GNUNET_CONSENSUS_Handle *consensus;
int msize;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
-
GNUNET_assert (NULL != buf);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
+
consensus = cls;
consensus->th = NULL;
consensus->joined = 1;
msg->header.size = htons (msize);
msg->session_id = consensus->session_id;
msg->num_peers = htons (consensus->num_peers);
- memcpy(&msg[1],
- consensus->peers,
- consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
-
- if (consensus->insert_element != NULL)
- {
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_insert, consensus);
- }
+ if (0 != msg->num_peers)
+ memcpy(&msg[1],
+ consensus->peers,
+ consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
+ send_next (consensus);
GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
GNUNET_TIME_UNIT_FOREVER_REL);
return msize;
}
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_conclude (void *cls, size_t size, void *buf)
-{
- struct GNUNET_CONSENSUS_ConcludeMessage *msg;
- struct GNUNET_CONSENSUS_Handle *consensus;
- int msize;
-
- GNUNET_assert (NULL != buf);
-
- consensus = cls;
- consensus->th = NULL;
-
- msg = buf;
-
- msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
-
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
- msg->header.size = htons (msize);
- msg->timeout =
- GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
-
- return msize;
-}
-
-
-
-
/**
* Create a consensus session.
*
- * @param cfg
- * @param num_peers
+ * @param cfg configuration to use for connecting to the consensus service
+ * @param num_peers number of peers in the peers array
* @param peers array of peers participating in this consensus session
* Inclusion of the local peer is optional.
* @param session_id session identifier
* Allows a group of peers to have more than consensus session.
- * @param num_initial_elements number of entries in the 'initial_elements' array
- * @param initial_elements our elements for the consensus (each of 'element_size'
- * @param new_element callback, called when a new element is added to the set by
+ * @param new_element_cb callback, called when a new element is added to the set by
* another peer
* @param new_element_cls closure for new_element
* @return handle to use, NULL on error
unsigned int num_peers,
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
- /*
- unsigned int num_initial_elements,
- const struct GNUNET_CONSENSUS_Element **initial_elements,
- */
- GNUNET_CONSENSUS_NewElementCallback new_element,
+ GNUNET_CONSENSUS_ElementCallback new_element_cb,
void *new_element_cls)
{
struct GNUNET_CONSENSUS_Handle *consensus;
size_t join_message_size;
-
consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
consensus->cfg = cfg;
- consensus->new_element_cb = new_element;
+ consensus->new_element_cb = new_element_cb;
consensus->new_element_cls = new_element_cls;
consensus->num_peers = num_peers;
consensus->session_id = *session_id;
-
-
if (0 == num_peers)
- {
consensus->peers = NULL;
- }
else if (num_peers > 0)
- {
-
- consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
- }
- else
- {
- GNUNET_break (0);
- }
-
+ consensus->peers =
+ GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
GNUNET_assert (consensus->th != NULL);
-
return consensus;
}
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls)
{
+ struct QueuedMessage *qmsg;
+ struct GNUNET_CONSENSUS_ElementMessage *element_msg;
+ size_t element_msg_size;
- GNUNET_assert (NULL == consensus->idc);
- GNUNET_assert (NULL == consensus->insert_element);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
- consensus->idc = idc;
- consensus->idc_cls = idc_cls;
- consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
+ element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
+ element->size);
- if (consensus->joined == 0)
- {
- GNUNET_assert (NULL != consensus->th);
- return;
- }
+ element_msg = GNUNET_malloc (element_msg_size);
+ element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
+ element_msg->header.size = htons (element_msg_size);
+ memcpy (&element_msg[1], element->data, element->size);
- GNUNET_assert (NULL == consensus->th);
+ qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+ qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
+ qmsg->idc = idc;
+ qmsg->idc_cls = idc_cls;
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO, &transmit_insert, consensus);
+ GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
+
+ send_next (consensus);
}
/**
- * We are finished inserting new elements into the consensus;
+ * We are done with inserting new elements into the consensus;
* try to conclude the consensus within a given time window.
+ * After conclude has been called, no further elements may be
+ * inserted by the client.
*
* @param consensus consensus session
* @param timeout timeout after which the conculde callback
void
GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_TIME_Relative timeout,
+ unsigned int min_group_size_in_consensus,
GNUNET_CONSENSUS_ConcludeCallback conclude,
void *conclude_cls)
{
- GNUNET_assert (NULL == consensus->th);
+ struct QueuedMessage *qmsg;
+ struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
+
+ GNUNET_assert (NULL != conclude);
GNUNET_assert (NULL == consensus->conclude_cb);
consensus->conclude_cls = conclude_cls;
consensus->conclude_cb = conclude;
- consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
- consensus->th =
- GNUNET_CLIENT_notify_transmit_ready (consensus->client,
- sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
- timeout,
- GNUNET_NO, &transmit_conclude, consensus);
- if (NULL == consensus->th)
- {
- conclude(conclude_cls, 0, NULL);
- }
+ conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
+ conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
+ conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
+ conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
+ conclude_msg->min_group_size = min_group_size_in_consensus;
+
+ qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
+ qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
+
+ GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
+
+ send_next (consensus);
}
GNUNET_CLIENT_disconnect (consensus->client);
consensus->client = NULL;
}
- GNUNET_free (consensus->peers);
+ if (NULL != consensus->peers)
+ GNUNET_free (consensus->peers);
GNUNET_free (consensus);
}