/**
* @file consensus/consensus_api.c
- * @brief
+ * @brief
* @author Florian Dold
*/
#include "platform.h"
*/
struct GNUNET_HashCode session_id;
- /**
- * Number of peers in the consensus. Optionally includes the local peer.
- */
- int num_peers;
-
- /**
- * Peer identities of peers participating in the consensus, includes the local peer.
- */
- struct GNUNET_PeerIdentity **peers;
-
/**
* GNUNES_YES iff the join message has been sent to the service.
*/
/**
* Called when the server has sent is a new element
- *
+ *
* @param cls consensus handle
* @param mh element message
*/
/**
* Called when the server has announced
* that the conclusion is over.
- *
+ *
* @param cls consensus handle
* @param msg conclude done message
*/
GNUNET_CONSENSUS_ConcludeCallback cc;
+ GNUNET_MQ_destroy (consensus->mq);
+ consensus->mq = NULL;
+
+ GNUNET_CLIENT_disconnect (consensus->client);
+ consensus->client = NULL;
+
+
GNUNET_assert (NULL != (cc = consensus->conclude_cb));
consensus->conclude_cb = NULL;
cc (consensus->conclude_cls);
}
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure, same closure as for the message handlers
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
+{
+ LOG (GNUNET_ERROR_TYPE_WARNING, "consensus service disconnected us\n");
+}
+
+
/**
* Create a consensus session.
*
* Inclusion of the local peer is optional.
* @param session_id session identifier
* Allows a group of peers to have more than consensus session.
+ * @param start start time of the consensus, conclude should be called before
+ * the start time.
+ * @param deadline time when the consensus should have concluded
* @param new_element_cb callback, called when a new element is added to the set by
* another peer
* @param new_element_cls closure for new_element
unsigned int num_peers,
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
+ struct GNUNET_TIME_Absolute start,
+ struct GNUNET_TIME_Absolute deadline,
GNUNET_CONSENSUS_ElementCallback new_element_cb,
void *new_element_cls)
{
GNUNET_MQ_HANDLERS_END
};
- consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
+ consensus = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
consensus->cfg = cfg;
consensus->new_element_cb = new_element_cb;
consensus->new_element_cls = new_element_cls;
- consensus->num_peers = num_peers;
consensus->session_id = *session_id;
-
- if (0 == num_peers)
- consensus->peers = NULL;
- else if (num_peers > 0)
- consensus->peers =
- GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
-
consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
- mq_handlers, consensus);
+ mq_handlers, mq_error_handler, consensus);
GNUNET_assert (consensus->client != NULL);
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
join_msg->session_id = consensus->session_id;
- join_msg->num_peers = htonl (consensus->num_peers);
+ join_msg->start = GNUNET_TIME_absolute_hton (start);
+ join_msg->deadline = GNUNET_TIME_absolute_hton (deadline);
+ join_msg->num_peers = htonl (num_peers);
memcpy(&join_msg[1],
- consensus->peers,
- consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
+ peers,
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
GNUNET_MQ_send (consensus->mq, ev);
return consensus;
*
* @param consensus handle for the consensus session
* @param element the element to be inserted
- * @param idc function called when we are done with this element and it
+ * @param idc function called when we are done with this element and it
* is thus allowed to call GNUNET_CONSENSUS_insert again
* @param idc_cls closure for 'idc'
*/
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
memcpy (&element_msg[1], element->data, element->size);
-
+
if (NULL != idc)
{
i = GNUNET_new (struct InsertDoneInfo);
i->cls = idc_cls;
GNUNET_MQ_notify_sent (ev, idc_adapter, i);
}
+ GNUNET_MQ_send (consensus->mq, ev);
}
* inserted by the client.
*
* @param consensus consensus session
- * @param timeout timeout after which the conculde callback
+ * @param deadline deadline after which the conculde callback
* must be called
* @param conclude called when the conclusion was successful
* @param conclude_cls closure for the conclude callback
*/
void
GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_TIME_Relative timeout,
GNUNET_CONSENSUS_ConcludeCallback conclude,
void *conclude_cls)
{
struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
GNUNET_assert (NULL != conclude);
GNUNET_assert (NULL == consensus->conclude_cb);
consensus->conclude_cls = conclude_cls;
consensus->conclude_cb = conclude;
- ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
- conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
-
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
GNUNET_MQ_send (consensus->mq, ev);
}
void
GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
{
- if (consensus->client != NULL)
+ if (NULL != consensus->mq)
+ {
+ GNUNET_MQ_destroy (consensus->mq);
+ consensus->mq = NULL;
+ }
+ if (NULL != consensus->client)
{
GNUNET_CLIENT_disconnect (consensus->client);
consensus->client = NULL;
}
- if (NULL != consensus->peers)
- GNUNET_free (consensus->peers);
GNUNET_free (consensus);
}