X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fconsensus_api.c;h=30ab9f36128714601c5301e75e211d3616f51e49;hb=1cfcc6a13f75ff69ec6ab851a471f1939cb5f295;hp=7690dc059406f57e01fcf4fe4e789d373a1ae32b;hpb=6b8400966a5e6c2194785b3a33f91b748cfa7b7b;p=oweals%2Fgnunet.git diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 7690dc059..30ab9f361 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -20,7 +20,7 @@ /** * @file consensus/consensus_api.c - * @brief + * @brief * @author Florian Dold */ #include "platform.h" @@ -64,16 +64,6 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_HashCode session_id; - /** - * Number of peers in the consensus. Optionally includes the local peer. - */ - int num_peers; - - /** - * Peer identities of peers participating in the consensus, includes the local peer. - */ - struct GNUNET_PeerIdentity **peers; - /** * GNUNES_YES iff the join message has been sent to the service. */ @@ -113,7 +103,7 @@ struct InsertDoneInfo /** * Called when the server has sent is a new element - * + * * @param cls consensus handle * @param mh element message */ @@ -126,9 +116,10 @@ handle_new_element (void *cls, = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; struct GNUNET_SET_Element element; - LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received new element\n"); - element.type = msg->element_type; + element.element_type = msg->element_type; element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; @@ -139,7 +130,7 @@ handle_new_element (void *cls, /** * Called when the server has announced * that the conclusion is over. - * + * * @param cls consensus handle * @param msg conclude done message */ @@ -151,12 +142,35 @@ handle_conclude_done (void *cls, GNUNET_CONSENSUS_ConcludeCallback cc; + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; + + GNUNET_CLIENT_disconnect (consensus->client); + consensus->client = NULL; + + GNUNET_assert (NULL != (cc = consensus->conclude_cb)); consensus->conclude_cb = NULL; cc (consensus->conclude_cls); } +/** + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure, same closure as for the message handlers + * @param error error code + */ +static void +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) +{ + LOG (GNUNET_ERROR_TYPE_WARNING, "consensus service disconnected us\n"); +} + + /** * Create a consensus session. * @@ -166,6 +180,9 @@ handle_conclude_done (void *cls, * Inclusion of the local peer is optional. * @param session_id session identifier * Allows a group of peers to have more than consensus session. + * @param start start time of the consensus, conclude should be called before + * the start time. + * @param deadline time when the consensus should have concluded * @param new_element_cb callback, called when a new element is added to the set by * another peer * @param new_element_cls closure for new_element @@ -176,6 +193,8 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, unsigned int num_peers, const struct GNUNET_PeerIdentity *peers, const struct GNUNET_HashCode *session_id, + struct GNUNET_TIME_Absolute start, + struct GNUNET_TIME_Absolute deadline, GNUNET_CONSENSUS_ElementCallback new_element_cb, void *new_element_cls) { @@ -190,22 +209,14 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_MQ_HANDLERS_END }; - consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); + consensus = GNUNET_new (struct GNUNET_CONSENSUS_Handle); consensus->cfg = cfg; consensus->new_element_cb = new_element_cb; consensus->new_element_cls = new_element_cls; - consensus->num_peers = num_peers; consensus->session_id = *session_id; - - if (0 == num_peers) - consensus->peers = NULL; - else if (num_peers > 0) - consensus->peers = - GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); - consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, - mq_handlers, NULL, consensus); + mq_handlers, mq_error_handler, consensus); GNUNET_assert (consensus->client != NULL); @@ -214,10 +225,12 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); join_msg->session_id = consensus->session_id; - join_msg->num_peers = htonl (consensus->num_peers); + join_msg->start = GNUNET_TIME_absolute_hton (start); + join_msg->deadline = GNUNET_TIME_absolute_hton (deadline); + join_msg->num_peers = htonl (num_peers); memcpy(&join_msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); + peers, + num_peers * sizeof (struct GNUNET_PeerIdentity)); GNUNET_MQ_send (consensus->mq, ev); return consensus; @@ -238,7 +251,7 @@ idc_adapter (void *cls) * * @param consensus handle for the consensus session * @param element the element to be inserted - * @param idc function called when we are done with this element and it + * @param idc function called when we are done with this element and it * is thus allowed to call GNUNET_CONSENSUS_insert again * @param idc_cls closure for 'idc' */ @@ -258,7 +271,7 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); memcpy (&element_msg[1], element->data, element->size); - + if (NULL != idc) { i = GNUNET_new (struct InsertDoneInfo); @@ -266,6 +279,7 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, i->cls = idc_cls; GNUNET_MQ_notify_sent (ev, idc_adapter, i); } + GNUNET_MQ_send (consensus->mq, ev); } @@ -276,19 +290,17 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, * inserted by the client. * * @param consensus consensus session - * @param timeout timeout after which the conculde callback + * @param deadline deadline after which the conculde callback * must be called * @param conclude called when the conclusion was successful * @param conclude_cls closure for the conclude callback */ void GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_TIME_Relative timeout, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; GNUNET_assert (NULL != conclude); GNUNET_assert (NULL == consensus->conclude_cb); @@ -296,9 +308,7 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); - + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); GNUNET_MQ_send (consensus->mq, ev); } @@ -312,13 +322,16 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, void GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) { - if (consensus->client != NULL) + if (NULL != consensus->mq) + { + GNUNET_MQ_destroy (consensus->mq); + consensus->mq = NULL; + } + if (NULL != consensus->client) { GNUNET_CLIENT_disconnect (consensus->client); consensus->client = NULL; } - if (NULL != consensus->peers) - GNUNET_free (consensus->peers); GNUNET_free (consensus); }