2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/consensus_api.c
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
37 * Actions that can be queued.
42 * Queued messages are stored in a doubly linked list.
44 struct QueuedMessage *next;
47 * Queued messages are stored in a doubly linked list.
49 struct QueuedMessage *prev;
52 * The actual queued message.
54 struct GNUNET_MessageHeader *msg;
57 * Will be called after transmit, if not NULL
59 GNUNET_CONSENSUS_InsertDoneCallback idc;
69 * Handle for the service.
71 struct GNUNET_CONSENSUS_Handle
74 * Configuration to use.
76 const struct GNUNET_CONFIGURATION_Handle *cfg;
79 * Client connected to the consensus service, may be NULL if not connected.
81 struct GNUNET_CLIENT_Connection *client;
84 * Callback for new elements. Not called for elements added locally.
86 GNUNET_CONSENSUS_ElementCallback new_element_cb;
89 * Closure for new_element_cb
91 void *new_element_cls;
94 * The (local) session identifier for the consensus session.
96 struct GNUNET_HashCode session_id;
99 * Number of peers in the consensus. Optionally includes the local peer.
104 * Peer identities of peers participating in the consensus, includes the local peer.
106 struct GNUNET_PeerIdentity **peers;
109 * Currently active transmit request.
111 struct GNUNET_CLIENT_TransmitHandle *th;
114 * GNUNES_YES iff the join message has been sent to the service.
119 * Closure for the insert done callback.
124 * Called when the conclude operation finishes or fails.
126 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
129 * Closure for the conclude callback.
134 * Deadline for the conclude operation.
136 struct GNUNET_TIME_Absolute conclude_deadline;
138 unsigned int conclude_min_size;
140 struct QueuedMessage *messages_head;
141 struct QueuedMessage *messages_tail;
144 * GNUNET_YES when currently in a section where destroy may not be
153 * Schedule transmitting the next message.
155 * @param consensus consensus handle
158 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
162 * Function called to notify a client about the connection
163 * begin ready to queue more data. "buf" will be
164 * NULL and "size" zero if the connection was closed for
165 * writing in the meantime.
168 * @param size number of bytes available in buf
169 * @param buf where the callee should write the message
170 * @return number of bytes written to buf
173 transmit_queued (void *cls, size_t size,
176 struct GNUNET_CONSENSUS_Handle *consensus;
177 struct QueuedMessage *qmsg;
180 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
181 consensus->th = NULL;
183 qmsg = consensus->messages_head;
184 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
188 if (NULL != qmsg->idc)
190 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
195 msg_size = ntohs (qmsg->msg->size);
197 GNUNET_assert (size >= msg_size);
199 memcpy (buf, qmsg->msg, msg_size);
200 if (NULL != qmsg->idc)
202 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
205 /* FIXME: free the messages */
207 send_next (consensus);
214 * Schedule transmitting the next message.
216 * @param consensus consensus handle
219 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
221 if (NULL != consensus->th)
224 if (NULL != consensus->messages_head)
227 GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
228 GNUNET_TIME_UNIT_FOREVER_REL,
229 GNUNET_NO, &transmit_queued, consensus);
234 queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
236 struct QueuedMessage *qm;
237 qm = GNUNET_malloc (sizeof *qm);
239 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
244 * Called when the server has sent is a new element
246 * @param consensus consensus handle
247 * @param msg element message
250 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
251 struct GNUNET_CONSENSUS_ElementMessage *msg)
253 struct GNUNET_CONSENSUS_Element element;
254 struct GNUNET_CONSENSUS_AckMessage *ack_msg;
257 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
259 element.type = msg->element_type;
260 element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
261 element.data = &msg[1];
263 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
265 ack_msg = GNUNET_malloc (sizeof *ack_msg);
266 ack_msg->header.size = htons (sizeof *ack_msg);
267 ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
270 queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
272 send_next (consensus);
277 * Called when the server has announced
278 * that the conclusion is over.
280 * @param consensus consensus handle
281 * @param msg conclude done message
284 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
285 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
287 GNUNET_assert (NULL != consensus->conclude_cb);
288 consensus->may_not_destroy = GNUNET_YES;
289 consensus->conclude_cb (consensus->conclude_cls, NULL);
290 consensus->may_not_destroy = GNUNET_NO;
291 consensus->conclude_cb = NULL;
297 * Type of a function to call when we receive a message
301 * @param msg message received, NULL on timeout or fatal error
304 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
306 struct GNUNET_CONSENSUS_Handle *consensus = cls;
308 LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
312 /* Error, timeout, death */
313 LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
314 GNUNET_CLIENT_disconnect (consensus->client);
315 consensus->client = NULL;
316 consensus->new_element_cb (NULL, NULL);
320 switch (ntohs (msg->type))
322 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
323 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
325 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
326 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
331 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
332 GNUNET_TIME_UNIT_FOREVER_REL);
336 * Function called to notify a client about the connection
337 * begin ready to queue more data. "buf" will be
338 * NULL and "size" zero if the connection was closed for
339 * writing in the meantime.
342 * @param size number of bytes available in buf
343 * @param buf where the callee should write the message
344 * @return number of bytes written to buf
347 transmit_join (void *cls, size_t size, void *buf)
349 struct GNUNET_CONSENSUS_JoinMessage *msg;
350 struct GNUNET_CONSENSUS_Handle *consensus;
353 GNUNET_assert (NULL != buf);
355 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
358 consensus->th = NULL;
359 consensus->joined = 1;
363 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
364 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
366 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
367 msg->header.size = htons (msize);
368 msg->session_id = consensus->session_id;
369 msg->num_peers = htons (consensus->num_peers);
370 if (0 != msg->num_peers)
373 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
375 send_next (consensus);
377 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
378 GNUNET_TIME_UNIT_FOREVER_REL);
384 * Create a consensus session.
386 * @param cfg configuration to use for connecting to the consensus service
387 * @param num_peers number of peers in the peers array
388 * @param peers array of peers participating in this consensus session
389 * Inclusion of the local peer is optional.
390 * @param session_id session identifier
391 * Allows a group of peers to have more than consensus session.
392 * @param new_element_cb callback, called when a new element is added to the set by
394 * @param new_element_cls closure for new_element
395 * @return handle to use, NULL on error
397 struct GNUNET_CONSENSUS_Handle *
398 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
399 unsigned int num_peers,
400 const struct GNUNET_PeerIdentity *peers,
401 const struct GNUNET_HashCode *session_id,
402 GNUNET_CONSENSUS_ElementCallback new_element_cb,
403 void *new_element_cls)
405 struct GNUNET_CONSENSUS_Handle *consensus;
406 size_t join_message_size;
408 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
409 consensus->cfg = cfg;
410 consensus->new_element_cb = new_element_cb;
411 consensus->new_element_cls = new_element_cls;
412 consensus->num_peers = num_peers;
413 consensus->session_id = *session_id;
416 consensus->peers = NULL;
417 else if (num_peers > 0)
419 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
421 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
423 GNUNET_assert (consensus->client != NULL);
425 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
426 (num_peers * sizeof (struct GNUNET_PeerIdentity));
429 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
431 GNUNET_TIME_UNIT_FOREVER_REL,
432 GNUNET_NO, &transmit_join, consensus);
435 GNUNET_assert (consensus->th != NULL);
442 * Insert an element in the set being reconsiled. Must not be called after
443 * "GNUNET_CONSENSUS_conclude".
445 * @param consensus handle for the consensus session
446 * @param element the element to be inserted
447 * @param idc function called when we are done with this element and it
448 * is thus allowed to call GNUNET_CONSENSUS_insert again
449 * @param idc_cls closure for 'idc'
452 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
453 const struct GNUNET_CONSENSUS_Element *element,
454 GNUNET_CONSENSUS_InsertDoneCallback idc,
457 struct QueuedMessage *qmsg;
458 struct GNUNET_CONSENSUS_ElementMessage *element_msg;
459 size_t element_msg_size;
461 LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
463 element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
466 element_msg = GNUNET_malloc (element_msg_size);
467 element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
468 element_msg->header.size = htons (element_msg_size);
469 memcpy (&element_msg[1], element->data, element->size);
471 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
472 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
474 qmsg->idc_cls = idc_cls;
476 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
478 send_next (consensus);
483 * We are done with inserting new elements into the consensus;
484 * try to conclude the consensus within a given time window.
485 * After conclude has been called, no further elements may be
486 * inserted by the client.
488 * @param consensus consensus session
489 * @param timeout timeout after which the conculde callback
491 * @param conclude called when the conclusion was successful
492 * @param conclude_cls closure for the conclude callback
495 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
496 struct GNUNET_TIME_Relative timeout,
497 unsigned int min_group_size_in_consensus,
498 GNUNET_CONSENSUS_ConcludeCallback conclude,
501 struct QueuedMessage *qmsg;
502 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
504 GNUNET_assert (NULL != conclude);
505 GNUNET_assert (NULL == consensus->conclude_cb);
507 consensus->conclude_cls = conclude_cls;
508 consensus->conclude_cb = conclude;
510 conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
511 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
512 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
513 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
514 conclude_msg->min_group_size = min_group_size_in_consensus;
516 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
517 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
519 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
521 send_next (consensus);
526 * Destroy a consensus handle (free all state associated with
527 * it, no longer call any of the callbacks).
529 * @param consensus handle to destroy
532 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
534 if (GNUNET_YES == consensus->may_not_destroy)
536 LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n");
539 if (consensus->client != NULL)
541 GNUNET_CLIENT_disconnect (consensus->client);
542 consensus->client = NULL;
544 if (NULL != consensus->peers)
545 GNUNET_free (consensus->peers);
546 GNUNET_free (consensus);