2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/consensus_api.c
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
38 struct ElementAck *next;
39 struct ElementAck *prev;
41 struct GNUNET_CONSENSUS_Element *element;
45 * Handle for the service.
47 struct GNUNET_CONSENSUS_Handle
50 * Configuration to use.
52 const struct GNUNET_CONFIGURATION_Handle *cfg;
55 * Socket (if available).
57 struct GNUNET_CLIENT_Connection *client;
60 * Callback for new elements. Not called for elements added locally.
62 GNUNET_CONSENSUS_NewElementCallback new_element_cb;
65 * Closure for new_element_cb
67 void *new_element_cls;
70 * Session identifier for the consensus session.
72 struct GNUNET_HashCode session_id;
75 * Number of peers in the consensus. Optionally includes the local peer.
80 * Peer identities of peers in the consensus. Optionally includes the local peer.
82 struct GNUNET_PeerIdentity *peers;
85 * Currently active transmit request.
87 struct GNUNET_CLIENT_TransmitHandle *th;
90 * GNUNES_YES iff the join message has been sent to the service.
95 * Called when the current insertion operation finishes.
96 * NULL if there is no insert operation active.
98 GNUNET_CONSENSUS_InsertDoneCallback idc;
101 * Closure for the insert done callback.
106 * An element that was requested to be inserted.
108 struct GNUNET_CONSENSUS_Element *insert_element;
111 * Called when the conclude operation finishes or fails.
113 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
116 * Closure for the conclude callback.
121 * Deadline for the conclude operation.
123 struct GNUNET_TIME_Absolute conclude_deadline;
125 struct ElementAck *ack_head;
126 struct ElementAck *ack_tail;
129 * Set to GNUNET_YES if the begin message has been transmitted to the service
134 * Set to GNUNET_YES it the begin message should be transmitted to the service
141 transmit_ack (void *cls, size_t size, void *buf);
144 transmit_insert (void *cls, size_t size, void *buf);
147 transmit_conclude (void *cls, size_t size, void *buf);
150 transmit_begin (void *cls, size_t size, void *buf);
154 * Call notify_transmit_ready for ack if necessary and possible.
157 ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
159 if ((NULL == consensus->th) && (NULL != consensus->ack_head))
162 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
163 sizeof (struct GNUNET_CONSENSUS_AckMessage),
164 GNUNET_TIME_UNIT_FOREVER_REL,
165 GNUNET_NO, &transmit_ack, consensus);
171 * Call notify_transmit_ready for ack if necessary and possible.
174 ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
176 if ((NULL == consensus->th) && (NULL != consensus->insert_element))
179 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
180 sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
181 consensus->insert_element->size,
182 GNUNET_TIME_UNIT_FOREVER_REL,
183 GNUNET_NO, &transmit_insert, consensus);
189 * Call notify_transmit_ready for ack if necessary and possible.
192 ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
194 if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
197 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
198 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
199 GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
200 GNUNET_NO, &transmit_conclude, consensus);
206 * Call notify_transmit_ready for ack if necessary and possible.
209 ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
211 if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
212 (GNUNET_NO == consensus->begin_sent))
215 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
216 sizeof (struct GNUNET_MessageHeader),
217 GNUNET_TIME_UNIT_FOREVER_REL,
218 GNUNET_NO, &transmit_begin, consensus);
223 * Called when the server has sent is a new element
225 * @param consensus consensus handle
226 * @param msg element message
229 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
230 struct GNUNET_CONSENSUS_ElementMessage *msg)
232 struct GNUNET_CONSENSUS_Element element;
233 struct ElementAck *ack;
236 element.type = msg->element_type;
237 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
238 element.data = &msg[1];
240 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
241 ack = GNUNET_malloc (sizeof (struct ElementAck));
243 GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
250 * Called when the server has announced
251 * that the conclusion is over.
253 * @param consensus consensus handle
254 * @param msg conclude done message
257 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
258 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
260 GNUNET_assert (NULL != consensus->conclude_cb);
261 consensus->conclude_cb(consensus->conclude_cls,
263 (struct GNUNET_PeerIdentity *) &msg[1]);
264 consensus->conclude_cb = NULL;
270 * Type of a function to call when we receive a message
274 * @param msg message received, NULL on timeout or fatal error
277 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
279 struct GNUNET_CONSENSUS_Handle *consensus = cls;
281 LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n");
285 /* Error, timeout, death */
286 LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
287 GNUNET_CLIENT_disconnect (consensus->client);
288 consensus->client = NULL;
289 consensus->new_element_cb (NULL, NULL);
290 if (NULL != consensus->idc)
292 consensus->idc(consensus->idc_cls, GNUNET_NO);
293 consensus->idc = NULL;
294 consensus->idc_cls = NULL;
299 switch (ntohs (msg->type))
301 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
302 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
304 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
305 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
308 LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
310 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
311 GNUNET_TIME_UNIT_FOREVER_REL);
318 * Function called to notify a client about the connection
319 * begin ready to queue more data. "buf" will be
320 * NULL and "size" zero if the connection was closed for
321 * writing in the meantime.
324 * @param size number of bytes available in buf
325 * @param buf where the callee should write the message
326 * @return number of bytes written to buf
329 transmit_ack (void *cls, size_t size, void *buf)
331 struct GNUNET_CONSENSUS_AckMessage *msg;
332 struct GNUNET_CONSENSUS_Handle *consensus;
334 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
336 GNUNET_assert (NULL != consensus->ack_head);
338 msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
339 msg->keep = consensus->ack_head->keep;
340 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
341 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
343 consensus->ack_head = consensus->ack_head->next;
345 consensus->th = NULL;
347 ntr_insert (consensus);
349 ntr_conclude (consensus);
351 return sizeof (struct GNUNET_CONSENSUS_AckMessage);
355 * Function called to notify a client about the connection
356 * begin ready to queue more data. "buf" will be
357 * NULL and "size" zero if the connection was closed for
358 * writing in the meantime.
361 * @param size number of bytes available in buf
362 * @param buf where the callee should write the message
363 * @return number of bytes written to buf
366 transmit_insert (void *cls, size_t size, void *buf)
368 struct GNUNET_CONSENSUS_ElementMessage *msg;
369 struct GNUNET_CONSENSUS_Handle *consensus;
370 GNUNET_CONSENSUS_InsertDoneCallback idc;
374 GNUNET_assert (NULL != buf);
378 GNUNET_assert (NULL != consensus->insert_element);
380 consensus->th = NULL;
384 msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
385 consensus->insert_element->size;
387 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
388 msg->header.size = htons (msize);
390 consensus->insert_element->data,
391 consensus->insert_element->size);
393 consensus->insert_element = NULL;
395 idc = consensus->idc;
396 consensus->idc = NULL;
397 idc_cls = consensus->idc_cls;
398 consensus->idc_cls = NULL;
399 idc (idc_cls, GNUNET_YES);
403 ntr_insert (consensus);
404 ntr_conclude (consensus);
411 * Function called to notify a client about the connection
412 * begin ready to queue more data. "buf" will be
413 * NULL and "size" zero if the connection was closed for
414 * writing in the meantime.
417 * @param size number of bytes available in buf
418 * @param buf where the callee should write the message
419 * @return number of bytes written to buf
422 transmit_join (void *cls, size_t size, void *buf)
424 struct GNUNET_CONSENSUS_JoinMessage *msg;
425 struct GNUNET_CONSENSUS_Handle *consensus;
428 GNUNET_assert (NULL != buf);
430 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
433 consensus->th = NULL;
434 consensus->joined = 1;
438 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
439 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
441 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
442 msg->header.size = htons (msize);
443 msg->session_id = consensus->session_id;
444 msg->num_peers = htons (consensus->num_peers);
445 if (0 != msg->num_peers)
448 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
450 ntr_insert (consensus);
451 ntr_begin (consensus);
452 ntr_conclude (consensus);
454 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
455 GNUNET_TIME_UNIT_FOREVER_REL);
462 * Function called to notify a client about the connection
463 * begin ready to queue more data. "buf" will be
464 * NULL and "size" zero if the connection was closed for
465 * writing in the meantime.
468 * @param size number of bytes available in buf
469 * @param buf where the callee should write the message
470 * @return number of bytes written to buf
473 transmit_conclude (void *cls, size_t size, void *buf)
475 struct GNUNET_CONSENSUS_ConcludeMessage *msg;
476 struct GNUNET_CONSENSUS_Handle *consensus;
479 GNUNET_assert (NULL != buf);
482 consensus->th = NULL;
486 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
488 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
489 msg->header.size = htons (msize);
491 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
500 * Function called to notify a client about the connection
501 * begin ready to queue more data. "buf" will be
502 * NULL and "size" zero if the connection was closed for
503 * writing in the meantime.
505 * @param cls the consensus handle
506 * @param size number of bytes available in buf
507 * @param buf where the callee should write the message
508 * @return number of bytes written to buf
511 transmit_begin (void *cls, size_t size, void *buf)
513 struct GNUNET_MessageHeader *msg;
514 struct GNUNET_CONSENSUS_Handle *consensus;
517 GNUNET_assert (NULL != buf);
520 consensus->th = NULL;
524 msize = sizeof (struct GNUNET_MessageHeader);
526 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
527 msg->size = htons (msize);
530 ntr_insert (consensus);
531 ntr_conclude (consensus);
538 * Create a consensus session.
542 * @param peers array of peers participating in this consensus session
543 * Inclusion of the local peer is optional.
544 * @param session_id session identifier
545 * Allows a group of peers to have more than consensus session.
546 * @param new_element_cb callback, called when a new element is added to the set by
548 * @param new_element_cls closure for new_element
549 * @return handle to use, NULL on error
551 struct GNUNET_CONSENSUS_Handle *
552 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
553 unsigned int num_peers,
554 const struct GNUNET_PeerIdentity *peers,
555 const struct GNUNET_HashCode *session_id,
556 GNUNET_CONSENSUS_NewElementCallback new_element_cb,
557 void *new_element_cls)
559 struct GNUNET_CONSENSUS_Handle *consensus;
560 size_t join_message_size;
562 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
563 consensus->cfg = cfg;
564 consensus->new_element_cb = new_element_cb;
565 consensus->new_element_cls = new_element_cls;
566 consensus->num_peers = num_peers;
567 consensus->session_id = *session_id;
571 consensus->peers = NULL;
573 else if (num_peers > 0)
575 consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
582 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
584 GNUNET_assert (consensus->client != NULL);
586 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
587 (num_peers * sizeof (struct GNUNET_PeerIdentity));
590 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
592 GNUNET_TIME_UNIT_FOREVER_REL,
593 GNUNET_NO, &transmit_join, consensus);
596 GNUNET_assert (consensus->th != NULL);
603 * Insert an element in the set being reconsiled. Must not be called after
604 * "GNUNET_CONSENSUS_conclude".
606 * @param consensus handle for the consensus session
607 * @param element the element to be inserted
608 * @param idc function called when we are done with this element and it
609 * is thus allowed to call GNUNET_CONSENSUS_insert again
610 * @param idc_cls closure for 'idc'
613 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
614 const struct GNUNET_CONSENSUS_Element *element,
615 GNUNET_CONSENSUS_InsertDoneCallback idc,
618 GNUNET_assert (NULL == consensus->idc);
619 GNUNET_assert (NULL == consensus->insert_element);
620 GNUNET_assert (NULL == consensus->conclude_cb);
622 consensus->idc = idc;
623 consensus->idc_cls = idc_cls;
624 consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
626 if (consensus->joined == 0)
631 ntr_insert (consensus);
636 * Begin reconciling elements with other peers.
638 * @param consensus handle for the consensus session
641 GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
643 GNUNET_assert (NULL == consensus->idc);
644 GNUNET_assert (NULL == consensus->insert_element);
645 GNUNET_assert (GNUNET_NO == consensus->begin_requested);
646 GNUNET_assert (GNUNET_NO == consensus->begin_sent);
648 consensus->begin_requested = GNUNET_YES;
650 ntr_begin (consensus);
655 * We are finished inserting new elements into the consensus;
656 * try to conclude the consensus within a given time window.
658 * @param consensus consensus session
659 * @param timeout timeout after which the conculde callback
661 * @param conclude called when the conclusion was successful
662 * @param conclude_cls closure for the conclude callback
665 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
666 struct GNUNET_TIME_Relative timeout,
667 GNUNET_CONSENSUS_ConcludeCallback conclude,
670 GNUNET_assert (NULL != conclude);
671 GNUNET_assert (NULL == consensus->conclude_cb);
673 consensus->conclude_cls = conclude_cls;
674 consensus->conclude_cb = conclude;
675 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
678 /* if transmitting the conclude message is not possible right now, transmit_join
679 * or transmit_ack will handle it */
680 ntr_conclude (consensus);
685 * Destroy a consensus handle (free all state associated with
686 * it, no longer call any of the callbacks).
688 * @param consensus handle to destroy
691 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
693 if (consensus->client != NULL)
695 GNUNET_CLIENT_disconnect (consensus->client);
696 consensus->client = NULL;
698 if (NULL != consensus->peers)
699 GNUNET_free (consensus->peers);
700 GNUNET_free (consensus);