/*
     This file is part of GNUnet.
     (C) 2012 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 3, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING.  If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
*/
/**
 * @file consensus/consensus_api.c
 * @brief client-side API for talking to the consensus service
 * @author Florian Dold
 */
27 #include "gnunet_protocols.h"
28 #include "gnunet_client_lib.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus.h"
/**
 * Log a message through GNUNET_log_from, tagged with the
 * "consensus-api" component name.
 */
#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
37 * Handle for the service.
39 struct GNUNET_CONSENSUS_Handle
42 * Configuration to use.
44 const struct GNUNET_CONFIGURATION_Handle *cfg;
47 * Socket (if available).
49 struct GNUNET_CLIENT_Connection *client;
52 * Callback for new elements. Not called for elements added locally.
54 GNUNET_CONSENSUS_NewElementCallback new_element_cb;
57 * Closure for new_element_cb
59 void *new_element_cls;
62 * Session identifier for the consensus session.
64 struct GNUNET_HashCode session_id;
67 * Number of peers in the consensus. Optionally includes the local peer.
72 * Peer identities of peers in the consensus. Optionally includes the local peer.
74 struct GNUNET_PeerIdentity *peers;
77 * Currently active transmit request.
79 struct GNUNET_CLIENT_TransmitHandle *th;
82 * GNUNES_YES iff the join message has been sent to the service.
87 * Called when the current insertion operation finishes.
88 * NULL if there is no insert operation active.
90 GNUNET_CONSENSUS_InsertDoneCallback idc;
93 * Closure for the insert done callback.
98 * An element that was requested to be inserted.
100 struct GNUNET_CONSENSUS_Element *insert_element;
103 * Called when the conclude operation finishes or fails.
105 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
108 * Closure for the conclude callback.
113 * Deadline for the conclude operation.
115 struct GNUNET_TIME_Absolute conclude_deadline;
120 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121 struct GNUNET_CONSENSUS_ElementMessage *msg)
123 struct GNUNET_CONSENSUS_Element element;
124 element.type = msg->element_type;
125 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126 element.data = &msg[1];
127 consensus->new_element_cb(consensus->new_element_cls, &element);
131 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
134 GNUNET_assert (NULL != consensus->conclude_cb);
135 consensus->conclude_cb(consensus->conclude_cls,
137 (struct GNUNET_PeerIdentity *) &msg[1]);
138 consensus->conclude_cb = NULL;
144 * Type of a function to call when we receive a message
148 * @param msg message received, NULL on timeout or fatal error
151 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
153 struct GNUNET_CONSENSUS_Handle *consensus = cls;
157 /* Error, timeout, death */
158 GNUNET_CLIENT_disconnect (consensus->client);
159 consensus->client = NULL;
160 consensus->new_element_cb(NULL, NULL);
161 if (NULL != consensus->idc)
163 consensus->idc(consensus->idc_cls, GNUNET_NO);
164 consensus->idc = NULL;
165 consensus->idc_cls = NULL;
170 switch (ntohs(msg->type))
172 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
173 handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
175 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
176 handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
179 LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
181 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
182 GNUNET_TIME_UNIT_FOREVER_REL);
189 * Function called to notify a client about the connection
190 * begin ready to queue more data. "buf" will be
191 * NULL and "size" zero if the connection was closed for
192 * writing in the meantime.
195 * @param size number of bytes available in buf
196 * @param buf where the callee should write the message
197 * @return number of bytes written to buf
200 transmit_insert (void *cls, size_t size, void *buf)
202 struct GNUNET_CONSENSUS_ElementMessage *msg;
203 struct GNUNET_CONSENSUS_Handle *consensus;
204 GNUNET_CONSENSUS_InsertDoneCallback idc;
208 GNUNET_assert (NULL != buf);
212 GNUNET_assert (NULL != consensus->insert_element);
214 consensus->th = NULL;
218 msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
219 consensus->insert_element->size;
221 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
222 msg->header.size = htons (msize);
224 consensus->insert_element->data,
225 consensus->insert_element->size);
228 idc = consensus->idc;
229 consensus->idc = NULL;
230 idc_cls = consensus->idc_cls;
231 consensus->idc_cls = NULL;
232 idc(idc_cls, GNUNET_YES);
239 * Function called to notify a client about the connection
240 * begin ready to queue more data. "buf" will be
241 * NULL and "size" zero if the connection was closed for
242 * writing in the meantime.
245 * @param size number of bytes available in buf
246 * @param buf where the callee should write the message
247 * @return number of bytes written to buf
250 transmit_join (void *cls, size_t size, void *buf)
252 struct GNUNET_CONSENSUS_JoinMessage *msg;
253 struct GNUNET_CONSENSUS_Handle *consensus;
256 LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
258 GNUNET_assert (NULL != buf);
261 consensus->th = NULL;
262 consensus->joined = 1;
266 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
267 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
269 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
270 msg->header.size = htons (msize);
271 msg->session_id = consensus->session_id;
272 msg->num_peers = htons (consensus->num_peers);
275 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
277 if (consensus->insert_element != NULL)
280 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
282 GNUNET_TIME_UNIT_FOREVER_REL,
283 GNUNET_NO, &transmit_insert, consensus);
287 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
288 GNUNET_TIME_UNIT_FOREVER_REL);
295 * Function called to notify a client about the connection
296 * begin ready to queue more data. "buf" will be
297 * NULL and "size" zero if the connection was closed for
298 * writing in the meantime.
301 * @param size number of bytes available in buf
302 * @param buf where the callee should write the message
303 * @return number of bytes written to buf
306 transmit_conclude (void *cls, size_t size, void *buf)
308 struct GNUNET_CONSENSUS_ConcludeMessage *msg;
309 struct GNUNET_CONSENSUS_Handle *consensus;
312 GNUNET_assert (NULL != buf);
315 consensus->th = NULL;
319 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
321 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
322 msg->header.size = htons (msize);
324 GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
331 * Function called to notify a client about the connection
332 * begin ready to queue more data. "buf" will be
333 * NULL and "size" zero if the connection was closed for
334 * writing in the meantime.
336 * @param cls the consensus handle
337 * @param size number of bytes available in buf
338 * @param buf where the callee should write the message
339 * @return number of bytes written to buf
342 transmit_begin (void *cls, size_t size, void *buf)
344 struct GNUNET_MessageHeader *msg;
345 struct GNUNET_CONSENSUS_Handle *consensus;
348 GNUNET_assert (NULL != buf);
351 consensus->th = NULL;
355 msize = sizeof (struct GNUNET_MessageHeader);
357 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
358 msg->size = htons (msize);
365 * Create a consensus session.
369 * @param peers array of peers participating in this consensus session
370 * Inclusion of the local peer is optional.
371 * @param session_id session identifier
372 * Allows a group of peers to have more than consensus session.
373 * @param new_element_cb callback, called when a new element is added to the set by
375 * @param new_element_cls closure for new_element
376 * @return handle to use, NULL on error
378 struct GNUNET_CONSENSUS_Handle *
379 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
380 unsigned int num_peers,
381 const struct GNUNET_PeerIdentity *peers,
382 const struct GNUNET_HashCode *session_id,
383 GNUNET_CONSENSUS_NewElementCallback new_element_cb,
384 void *new_element_cls)
386 struct GNUNET_CONSENSUS_Handle *consensus;
387 size_t join_message_size;
390 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
391 consensus->cfg = cfg;
392 consensus->new_element_cb = new_element_cb;
393 consensus->new_element_cls = new_element_cls;
394 consensus->num_peers = num_peers;
395 consensus->session_id = *session_id;
401 consensus->peers = NULL;
403 else if (num_peers > 0)
406 consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
414 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
416 GNUNET_assert (consensus->client != NULL);
418 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
419 (num_peers * sizeof (struct GNUNET_PeerIdentity));
422 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
424 GNUNET_TIME_UNIT_FOREVER_REL,
425 GNUNET_NO, &transmit_join, consensus);
428 GNUNET_assert (consensus->th != NULL);
436 * Insert an element in the set being reconsiled. Must not be called after
437 * "GNUNET_CONSENSUS_conclude".
439 * @param consensus handle for the consensus session
440 * @param element the element to be inserted
441 * @param idc function called when we are done with this element and it
442 * is thus allowed to call GNUNET_CONSENSUS_insert again
443 * @param idc_cls closure for 'idc'
446 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
447 const struct GNUNET_CONSENSUS_Element *element,
448 GNUNET_CONSENSUS_InsertDoneCallback idc,
452 GNUNET_assert (NULL == consensus->idc);
453 GNUNET_assert (NULL == consensus->insert_element);
455 consensus->idc = idc;
456 consensus->idc_cls = idc_cls;
457 consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
459 if (consensus->joined == 0)
461 GNUNET_assert (NULL != consensus->th);
465 GNUNET_assert (NULL == consensus->th);
468 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
469 element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
470 GNUNET_TIME_UNIT_FOREVER_REL,
471 GNUNET_NO, &transmit_insert, consensus);
476 * Begin reconciling elements with other peers.
478 * @param consensus handle for the consensus session
481 GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
483 GNUNET_assert (NULL == consensus->idc);
484 GNUNET_assert (NULL == consensus->insert_element);
487 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
488 sizeof (struct GNUNET_MessageHeader),
489 GNUNET_TIME_UNIT_FOREVER_REL,
490 GNUNET_NO, &transmit_begin, consensus);
495 * We are finished inserting new elements into the consensus;
496 * try to conclude the consensus within a given time window.
498 * @param consensus consensus session
499 * @param timeout timeout after which the conculde callback
501 * @param conclude called when the conclusion was successful
502 * @param conclude_cls closure for the conclude callback
505 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
506 struct GNUNET_TIME_Relative timeout,
507 GNUNET_CONSENSUS_ConcludeCallback conclude,
510 GNUNET_assert (NULL == consensus->th);
511 GNUNET_assert (NULL == consensus->conclude_cb);
513 consensus->conclude_cls = conclude_cls;
514 consensus->conclude_cb = conclude;
515 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
518 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
519 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
521 GNUNET_NO, &transmit_conclude, consensus);
522 if (NULL == consensus->th)
524 conclude(conclude_cls, 0, NULL);
530 * Destroy a consensus handle (free all state associated with
531 * it, no longer call any of the callbacks).
533 * @param consensus handle to destroy
536 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
538 if (consensus->client != NULL)
540 GNUNET_CLIENT_disconnect (consensus->client);
541 consensus->client = NULL;
543 GNUNET_free (consensus->peers);
544 GNUNET_free (consensus);