/*
This file is part of GNUnet.
- Copyright (C) 2012 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2012, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
- /**
- * Client connected to the consensus service, may be NULL if not connected.
- */
- struct GNUNET_CLIENT_Connection *client;
-
/**
* Callback for new elements. Not called for elements added locally.
*/
GNUNET_CONSENSUS_ElementCallback new_element_cb;
/**
- * Closure for new_element_cb
+ * Closure for @e new_element_cb
*/
void *new_element_cls;
struct GNUNET_HashCode session_id;
/**
- * GNUNES_YES iff the join message has been sent to the service.
+ * #GNUNET_YES iff the join message has been sent to the service.
*/
int joined;
GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
/**
- * Closure for the conclude callback.
+ * Closure for the @e conclude_cb callback.
*/
void *conclude_cls;
struct GNUNET_MQ_Handle *mq;
};
+
/**
* FIXME: this should not bee necessary when the API
* issue has been fixed
* Called when the server has sent is a new element
*
* @param cls consensus handle
- * @param mh element message
+ * @param msg element message
+ */
+static int
+check_new_element (void *cls,
+ const struct GNUNET_CONSENSUS_ElementMessage *msg)
+{
+ /* any size is fine, elements are variable-size */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Called when the server has sent is a new element
+ *
+ * @param cls consensus handle
+ * @param msg element message
*/
static void
handle_new_element (void *cls,
- const struct GNUNET_MessageHeader *mh)
+ const struct GNUNET_CONSENSUS_ElementMessage *msg)
{
struct GNUNET_CONSENSUS_Handle *consensus = cls;
- const struct GNUNET_CONSENSUS_ElementMessage *msg
- = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
struct GNUNET_SET_Element element;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"received new element\n");
-
element.element_type = msg->element_type;
element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
element.data = &msg[1];
-
- consensus->new_element_cb (consensus->new_element_cls, &element);
+ consensus->new_element_cb (consensus->new_element_cls,
+ &element);
}
const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_CONSENSUS_Handle *consensus = cls;
-
GNUNET_CONSENSUS_ConcludeCallback cc;
GNUNET_MQ_destroy (consensus->mq);
consensus->mq = NULL;
-
- GNUNET_CLIENT_disconnect (consensus->client);
- consensus->client = NULL;
-
-
GNUNET_assert (NULL != (cc = consensus->conclude_cb));
consensus->conclude_cb = NULL;
cc (consensus->conclude_cls);
* @param error error code
*/
static void
-mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
{
- LOG (GNUNET_ERROR_TYPE_WARNING, "consensus service disconnected us\n");
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "consensus service disconnected us\n");
}
GNUNET_CONSENSUS_ElementCallback new_element_cb,
void *new_element_cls)
{
- struct GNUNET_CONSENSUS_Handle *consensus;
+ struct GNUNET_CONSENSUS_Handle *consensus
+ = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ GNUNET_MQ_hd_var_size (new_element,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT,
+ struct GNUNET_CONSENSUS_ElementMessage,
+ consensus),
+ GNUNET_MQ_hd_fixed_size (conclude_done,
+ GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE,
+ struct GNUNET_MessageHeader,
+ consensus),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_CONSENSUS_JoinMessage *join_msg;
struct GNUNET_MQ_Envelope *ev;
- const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- {handle_new_element,
- GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
- {handle_conclude_done,
- GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
- GNUNET_MQ_HANDLERS_END
- };
- consensus = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
consensus->cfg = cfg;
consensus->new_element_cb = new_element_cb;
consensus->new_element_cls = new_element_cls;
consensus->session_id = *session_id;
- consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
- consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
- mq_handlers, mq_error_handler, consensus);
-
- GNUNET_assert (consensus->client != NULL);
-
+ consensus->mq = GNUNET_CLIENT_connect (cfg,
+ "consensus",
+ mq_handlers,
+ &mq_error_handler,
+ consensus);
+ if (NULL == consensus->mq)
+ {
+ GNUNET_free (consensus);
+ return NULL;
+ }
ev = GNUNET_MQ_msg_extra (join_msg,
(num_peers * sizeof (struct GNUNET_PeerIdentity)),
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
join_msg->start = GNUNET_TIME_absolute_hton (start);
join_msg->deadline = GNUNET_TIME_absolute_hton (deadline);
join_msg->num_peers = htonl (num_peers);
- memcpy(&join_msg[1],
+ GNUNET_memcpy(&join_msg[1],
peers,
num_peers * sizeof (struct GNUNET_PeerIdentity));
ev = GNUNET_MQ_msg_extra (element_msg, element->size,
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
- memcpy (&element_msg[1], element->data, element->size);
+ GNUNET_memcpy (&element_msg[1], element->data, element->size);
if (NULL != idc)
{
GNUNET_MQ_destroy (consensus->mq);
consensus->mq = NULL;
}
- if (NULL != consensus->client)
- {
- GNUNET_CLIENT_disconnect (consensus->client);
- consensus->client = NULL;
- }
GNUNET_free (consensus);
}
+/* end of consensus_api.c */