message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_CONSENSUS_Handle *consensus = cls;
- GNUNET_CONSENSUS_InsertDoneCallback idc;
- void *idc_cls;
if (msg == NULL)
{
switch (ntohs(msg->type))
{
- case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ACK:
- idc = consensus->idc;
- consensus->idc = NULL;
- idc_cls = consensus->idc_cls;
- consensus->idc_cls = NULL;
- idc(idc_cls, GNUNET_YES);
- break;
case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
break;
{
struct GNUNET_CONSENSUS_ElementMessage *msg;
struct GNUNET_CONSENSUS_Handle *consensus;
+ GNUNET_CONSENSUS_InsertDoneCallback idc;
int msize;
+ void *idc_cls;
GNUNET_assert (NULL != buf);
consensus->th = NULL;
-
msg = buf;
msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
consensus->insert_element->data,
consensus->insert_element->size);
+
+ idc = consensus->idc;
+ consensus->idc = NULL;
+ idc_cls = consensus->idc_cls;
+ consensus->idc_cls = NULL;
+ idc(idc_cls, GNUNET_YES);
+
return msize;
}
}
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls the consensus handle
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_begin (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_MessageHeader *msg;
+ struct GNUNET_CONSENSUS_Handle *consensus;
+ int msize;
+
+ GNUNET_assert (NULL != buf);
+
+ consensus = cls;
+ consensus->th = NULL;
+
+ msg = buf;
+
+ msize = sizeof (struct GNUNET_MessageHeader);
+
+ msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
+ msg->size = htons (msize);
+
+ return msize;
+}
/**
* Inclusion of the local peer is optional.
* @param session_id session identifier
* Allows a group of peers to have more than consensus session.
- * @param num_initial_elements number of entries in the 'initial_elements' array
- * @param initial_elements our elements for the consensus (each of 'element_size'
- * @param new_element callback, called when a new element is added to the set by
+ * @param new_element_cb callback, called when a new element is added to the set by
* another peer
* @param new_element_cls closure for new_element
* @return handle to use, NULL on error
unsigned int num_peers,
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
- /*
- unsigned int num_initial_elements,
- const struct GNUNET_CONSENSUS_Element **initial_elements,
- */
- GNUNET_CONSENSUS_NewElementCallback new_element,
+ GNUNET_CONSENSUS_NewElementCallback new_element_cb,
void *new_element_cls)
{
struct GNUNET_CONSENSUS_Handle *consensus;
consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
consensus->cfg = cfg;
- consensus->new_element_cb = new_element;
+ consensus->new_element_cb = new_element_cb;
consensus->new_element_cls = new_element_cls;
consensus->num_peers = num_peers;
consensus->session_id = *session_id;
}
+/**
+ * Begin reconciling elements with other peers.
+ *
+ * @param consensus handle for the consensus session
+ */
+void
+GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
+{
+ GNUNET_assert (NULL == consensus->idc);
+ GNUNET_assert (NULL == consensus->insert_element);
+
+ consensus->th =
+ GNUNET_CLIENT_notify_transmit_ready (consensus->client,
+ sizeof (struct GNUNET_MessageHeader),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_NO, &transmit_begin, consensus);
+}
+
+
/**
* We are finished inserting new elements into the consensus;
* try to conclude the consensus within a given time window.
/**
* Create a consensus session.
+ * The set being reconciled is initially empty. Only reconcile with other peers
+ * after GNUNET_CONSENSUS_reconcile has been called.
*
* @param cfg
* @param num_peers
* Inclusion of the local peer is optional.
* @param session_id session identifier
* Allows a group of peers to have more than consensus session.
- * @param num_initial_elements number of entries in the 'initial_elements' array
- * @param initial_elements our elements for the consensus (each of 'element_size'
- * @param new_element callback, called when a new element is added to the set by
+ * @param new_element_cb callback, called when a new element is added to the set by
* another peer
* @param new_element_cls closure for new_element
* @return handle to use, NULL on error
unsigned int num_peers,
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
- /*
- unsigned int num_initial_elements,
- const struct GNUNET_CONSENSUS_Element **initial_elements,
- */
- GNUNET_CONSENSUS_NewElementCallback new_element,
+ GNUNET_CONSENSUS_NewElementCallback new_element_cb,
void *new_element_cls);
/**
- * Insert an element in the set being reconsiled. Must not be called after
- * "GNUNET_CONSENSUS_conclude".
+ * Insert an element in the set being reconsiled. Only transmit changes to
+ * other peers if "GNUNET_CONSENSUS_begin" has been called.
+ * Must not be called after "GNUNET_CONSENSUS_conclude".
*
* @param consensus handle for the consensus session
* @param element the element to be inserted
void *idc_cls);
+/**
+ * Begin reconciling elements with other peers.
+ * May not be called if an insert operation has not yet finished.
+ *
+ * @param consensus handle for the consensus session
+ */
+void
+GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus);
+
+
+
+
/**
* Called when a conclusion was successful.
*