2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/consensus_api.c
24 * @author Florian Dold
27 #include "gnunet_protocols.h"
28 #include "gnunet_client_lib.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus.h"
33 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
37 * Handle for the service.
39 struct GNUNET_CONSENSUS_Handle
42 * Configuration to use.
44 const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Socket (if available).
49 struct GNUNET_CLIENT_Connection *client;
52 * Callback for new elements. Not called for elements added locally.
54 GNUNET_CONSENSUS_NewElementCallback new_element_cb;
57 * Closure for new_element_cb
59 void *new_element_cls;
62 * Session identifier for the consensus session.
64 struct GNUNET_HashCode session_id;
67 * Number of peers in the consensus. Optionally includes the local peer.
72 * Peer identities of peers in the consensus. Optionally includes the local peer.
74 struct GNUNET_PeerIdentity *peers;
77 * Currently active transmit request.
79 struct GNUNET_CLIENT_TransmitHandle *th;
82 * GNUNES_YES iff the join message has been sent to the service.
87 * Called when the current insertion operation finishes.
88 * NULL if there is no insert operation active.
90 GNUNET_CONSENSUS_InsertDoneCallback idc;
93 * Closure for the insert done callback.
98 * An element that was requested to be inserted.
100 struct GNUNET_CONSENSUS_Element *insert_element;
103 * Called when the conclude operation finishes or fails.
105 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
108 * Closure for the conclude callback.
113 * Deadline for the conclude operation.
115 struct GNUNET_TIME_Absolute conclude_deadline;
120 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121 struct GNUNET_CONSENSUS_ElementMessage *msg)
123 struct GNUNET_CONSENSUS_Element element;
124 element.type = msg->element_type;
125 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126 element.data = &msg[1];
127 consensus->new_element_cb(consensus->new_element_cls, &element);
131 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
134 GNUNET_assert (NULL != consensus->conclude_cb);
135 consensus->conclude_cb(consensus->conclude_cls,
137 (struct GNUNET_PeerIdentity *) &msg[1]);
138 consensus->conclude_cb = NULL;
144 * Type of a function to call when we receive a message
148 * @param msg message received, NULL on timeout or fatal error
151 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
153 struct GNUNET_CONSENSUS_Handle *consensus = cls;
154 GNUNET_CONSENSUS_InsertDoneCallback idc;
159 /* Error, timeout, death */
160 GNUNET_CLIENT_disconnect (consensus->client);
161 consensus->client = NULL;
162 consensus->new_element_cb(NULL, NULL);
163 if (NULL != consensus->idc)
165 consensus->idc(consensus->idc_cls, GNUNET_NO);
166 consensus->idc = NULL;
167 consensus->idc_cls = NULL;
172 switch (ntohs(msg->type))
174 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ACK:
175 idc = consensus->idc;
176 consensus->idc = NULL;
177 idc_cls = consensus->idc_cls;
178 consensus->idc_cls = NULL;
179 idc(idc_cls, GNUNET_YES);
181 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
182 handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
184 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
185 handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
188 LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
190 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
191 GNUNET_TIME_UNIT_FOREVER_REL);
198 * Function called to notify a client about the connection
199 * begin ready to queue more data. "buf" will be
200 * NULL and "size" zero if the connection was closed for
201 * writing in the meantime.
204 * @param size number of bytes available in buf
205 * @param buf where the callee should write the message
206 * @return number of bytes written to buf
209 transmit_insert (void *cls, size_t size, void *buf)
211 struct GNUNET_CONSENSUS_ElementMessage *msg;
212 struct GNUNET_CONSENSUS_Handle *consensus;
215 GNUNET_assert (NULL != buf);
219 GNUNET_assert (NULL != consensus->insert_element);
221 consensus->th = NULL;
226 msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
227 consensus->insert_element->size;
229 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
230 msg->header.size = htons (msize);
232 consensus->insert_element->data,
233 consensus->insert_element->size);
240 * Function called to notify a client about the connection
241 * begin ready to queue more data. "buf" will be
242 * NULL and "size" zero if the connection was closed for
243 * writing in the meantime.
246 * @param size number of bytes available in buf
247 * @param buf where the callee should write the message
248 * @return number of bytes written to buf
251 transmit_join (void *cls, size_t size, void *buf)
253 struct GNUNET_CONSENSUS_JoinMessage *msg;
254 struct GNUNET_CONSENSUS_Handle *consensus;
257 LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
259 GNUNET_assert (NULL != buf);
262 consensus->th = NULL;
263 consensus->joined = 1;
267 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
268 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
270 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
271 msg->header.size = htons (msize);
272 msg->session_id = consensus->session_id;
273 msg->num_peers = htons (consensus->num_peers);
276 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
278 if (consensus->insert_element != NULL)
281 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
283 GNUNET_TIME_UNIT_FOREVER_REL,
284 GNUNET_NO, &transmit_insert, consensus);
288 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
289 GNUNET_TIME_UNIT_FOREVER_REL);
296 * Function called to notify a client about the connection
297 * begin ready to queue more data. "buf" will be
298 * NULL and "size" zero if the connection was closed for
299 * writing in the meantime.
302 * @param size number of bytes available in buf
303 * @param buf where the callee should write the message
304 * @return number of bytes written to buf
307 transmit_conclude (void *cls, size_t size, void *buf)
309 struct GNUNET_CONSENSUS_ConcludeMessage *msg;
310 struct GNUNET_CONSENSUS_Handle *consensus;
313 GNUNET_assert (NULL != buf);
316 consensus->th = NULL;
320 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
322 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
323 msg->header.size = htons (msize);
325 GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
334 * Create a consensus session.
338 * @param peers array of peers participating in this consensus session
339 * Inclusion of the local peer is optional.
340 * @param session_id session identifier
341 * Allows a group of peers to have more than consensus session.
342 * @param num_initial_elements number of entries in the 'initial_elements' array
343 * @param initial_elements our elements for the consensus (each of 'element_size'
344 * @param new_element callback, called when a new element is added to the set by
346 * @param new_element_cls closure for new_element
347 * @return handle to use, NULL on error
349 struct GNUNET_CONSENSUS_Handle *
350 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
351 unsigned int num_peers,
352 const struct GNUNET_PeerIdentity *peers,
353 const struct GNUNET_HashCode *session_id,
355 unsigned int num_initial_elements,
356 const struct GNUNET_CONSENSUS_Element **initial_elements,
358 GNUNET_CONSENSUS_NewElementCallback new_element,
359 void *new_element_cls)
361 struct GNUNET_CONSENSUS_Handle *consensus;
362 size_t join_message_size;
365 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
366 consensus->cfg = cfg;
367 consensus->new_element_cb = new_element;
368 consensus->new_element_cls = new_element_cls;
369 consensus->num_peers = num_peers;
370 consensus->session_id = *session_id;
376 consensus->peers = NULL;
378 else if (num_peers > 0)
381 consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
389 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
391 GNUNET_assert (consensus->client != NULL);
393 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
394 (num_peers * sizeof (struct GNUNET_PeerIdentity));
397 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
399 GNUNET_TIME_UNIT_FOREVER_REL,
400 GNUNET_NO, &transmit_join, consensus);
403 GNUNET_assert (consensus->th != NULL);
411 * Insert an element in the set being reconsiled. Must not be called after
412 * "GNUNET_CONSENSUS_conclude".
414 * @param consensus handle for the consensus session
415 * @param element the element to be inserted
416 * @param idc function called when we are done with this element and it
417 * is thus allowed to call GNUNET_CONSENSUS_insert again
418 * @param idc_cls closure for 'idc'
421 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
422 const struct GNUNET_CONSENSUS_Element *element,
423 GNUNET_CONSENSUS_InsertDoneCallback idc,
427 GNUNET_assert (NULL == consensus->idc);
428 GNUNET_assert (NULL == consensus->insert_element);
430 consensus->idc = idc;
431 consensus->idc_cls = idc_cls;
432 consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
434 if (consensus->joined == 0)
436 GNUNET_assert (NULL != consensus->th);
440 GNUNET_assert (NULL == consensus->th);
443 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
444 element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
445 GNUNET_TIME_UNIT_FOREVER_REL,
446 GNUNET_NO, &transmit_insert, consensus);
451 * We are finished inserting new elements into the consensus;
452 * try to conclude the consensus within a given time window.
454 * @param consensus consensus session
455 * @param timeout timeout after which the conculde callback
457 * @param conclude called when the conclusion was successful
458 * @param conclude_cls closure for the conclude callback
461 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
462 struct GNUNET_TIME_Relative timeout,
463 GNUNET_CONSENSUS_ConcludeCallback conclude,
466 GNUNET_assert (NULL == consensus->th);
467 GNUNET_assert (NULL == consensus->conclude_cb);
469 consensus->conclude_cls = conclude_cls;
470 consensus->conclude_cb = conclude;
471 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
474 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
475 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
477 GNUNET_NO, &transmit_conclude, consensus);
478 if (NULL == consensus->th)
480 conclude(conclude_cls, 0, NULL);
486 * Destroy a consensus handle (free all state associated with
487 * it, no longer call any of the callbacks).
489 * @param consensus handle to destroy
492 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
494 if (consensus->client != NULL)
496 GNUNET_CLIENT_disconnect (consensus->client);
497 consensus->client = NULL;
499 GNUNET_free (consensus->peers);
500 GNUNET_free (consensus);