2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @file consensus/gnunet-service-consensus.c
25 * @author Florian Dold
29 #include "gnunet_common.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_consensus_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_mesh_service.h"
36 #include "consensus.h"
/* Forward declarations: the session type and send_next() are both used
 * before their full definitions appear further down in this file. */
39 struct ConsensusSession;
42 send_next (struct ConsensusSession *session);
/* List node wrapping one consensus element. ConsensusSession keeps head and
 * tail pointers of this type for its transmit and approval queues. */
46 * An element that is waiting to be transmitted to a client.
51 * Pending elements are kept in a DLL.
53 struct PendingElement *next;
56 * Pending elements are kept in a DLL.
58 struct PendingElement *prev;
/* Payload of the node; transmit_pending() reads its type, size and data. */
63 struct GNUNET_CONSENSUS_Element *element;
/* Per-peer record; ConsensusSession.peers points at an array of these. */
68 * A peer that is also in a consensus session.
69 * Note that 'this' peer is not in the list.
/* Held by pointer, so it has to reference valid storage before anything is
 * written through it. */
73 struct GNUNET_PeerIdentity *peer_id;
76 * Incoming tunnel from the peer.
78 struct GNUNET_MESH_Tunnel *incoming_tunnel;
/* NOTE(review): presumably the last invertible Bloom filter exchanged with
 * this peer; no use of the field is visible in this file view -- confirm. */
80 struct InvertibleBloomFilter *last_ibf;
86 * A consensus session consists of one local client and the remote authorities.
/* State of one consensus session. Instances are allocated in client_join()
 * and linked into the sessions_head / sessions_tail list. */
88 struct ConsensusSession
91 * Consensus sessions are kept in a DLL.
93 struct ConsensusSession *next;
96 * Consensus sessions are kept in a DLL.
98 struct ConsensusSession *prev;
101 * Local consensus identification, chosen by clients.
/* Heap copy created with GNUNET_memdup in client_join(). */
103 struct GNUNET_HashCode *local_id;
106 * Global consensus identification, computed
107 * from the local id and participating authorities.
/* Heap copy created with GNUNET_memdup in client_join(); compared there to
 * detect a session that already exists. */
109 struct GNUNET_HashCode *global_id;
112 * Local client in this consensus session.
113 * There is only one client per consensus session.
/* The client handlers find the session by comparing against this pointer. */
115 struct GNUNET_SERVER_Client *client;
118 * Values in the consensus set of this session,
119 * all of them either have been sent by or approved by the client.
/* Created in client_join(); client_insert() stores elements here under a
 * hash key. */
121 struct GNUNET_CONTAINER_MultiHashMap *values;
124 * Elements that have not been sent to the client yet.
126 struct PendingElement *transmit_pending_head;
129 * Elements that have not been sent to the client yet.
131 struct PendingElement *transmit_pending_tail;
134 * Elements that have not been approved (or rejected) by the client yet.
136 struct PendingElement *approval_pending_head;
139 * Elements that have not been approved (or rejected) by the client yet.
141 struct PendingElement *approval_pending_tail;
144 * Currently active transmit handle for sending to the client
/* send_next() returns without scheduling anything while this is non-NULL. */
146 struct GNUNET_SERVER_TransmitHandle *th;
149 * Once conclude_requested is GNUNET_YES, the client may not
150 * insert any more values.
/* Set to GNUNET_YES by client_conclude(). */
152 int conclude_requested;
/* NOTE(review): the declaration this description belongs to is not visible
 * in this view; send_next() reads and writes session->conclude_sent. */
155 * Client has been informed about the conclusion.
160 * Minimum number of peers to form a consensus group
162 int conclude_group_min;
/* NOTE(review): the declaration this description belongs to is not visible
 * in this view. */
165 * Current round of the conclusion
170 * Soft deadline for conclude.
171 * Speed up the speed of the consensus at the cost of consensus quality, as
172 * the time approached or crosses the deadline.
174 struct GNUNET_TIME_Absolute conclude_deadline;
177 * Number of other peers in the consensus
/* Counted in client_join(): entries of the join message that differ from
 * the local peer identity. */
179 unsigned int num_peers;
182 * Other peers in the consensus, array of ConsensusPeer
/* Allocated in client_join() with room for num_peers entries. */
184 struct ConsensusPeer *peers;
187 * Tunnel for broadcasting to all other authorities
/* Created in client_join(), destroyed in shutdown_task(). */
189 struct GNUNET_MESH_Tunnel *broadcast_tunnel;
192 * Time limit for one round of pairwise exchange.
193 * FIXME: should not actually be a constant
/* client_join() currently sets this to five seconds. */
195 struct GNUNET_TIME_Relative round_time;
198 * Task identifier for the round timeout task
200 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
205 * Linked list of sessions this peer participates in.
/* First entry of the session list; the client handlers start their lookups
 * here. */
207 static struct ConsensusSession *sessions_head;
210 * Linked list of sessions this peer participates in.
/* Last entry of the session list; updated together with sessions_head by
 * GNUNET_CONTAINER_DLL_insert in client_join(). */
212 static struct ConsensusSession *sessions_tail;
215 * Configuration of the consensus service.
/* Passed to GNUNET_MESH_connect() in run().
 * NOTE(review): the assignment of this variable is not visible in this view. */
217 static const struct GNUNET_CONFIGURATION_Handle *cfg;
220 * Handle to the server for this service.
/* The client message handlers are registered on this handle in
 * core_startup(). */
222 static struct GNUNET_SERVER_Handle *srv;
225 * Peer that runs this service
/* Heap copy of the local identity, created in core_startup(). */
227 static struct GNUNET_PeerIdentity *my_peer;
230 * Handle to the mesh service.
/* Assigned in run(), disconnected in shutdown_task(). */
232 static struct GNUNET_MESH_Handle *mesh;
235 * Handle to the core service. Only used during service startup, will be NULL after that.
/* Assigned in run(); disconnect_core() and shutdown_task() both call
 * GNUNET_CORE_disconnect() on it. */
237 static struct GNUNET_CORE_Handle *core;
/*
 * Disconnect a client from the server. Session state that belongs to the
 * client is not released here yet (see the FIXME below).
 *
 * @param client client to disconnect
 */
240 disconnect_client (struct GNUNET_SERVER_Client *client)
242 GNUNET_SERVER_client_disconnect (client);
243 /* FIXME: free data structures that this client owns */
247 compute_global_id (struct GNUNET_HashCode *dst,
248 const struct GNUNET_HashCode *local_id,
249 const struct GNUNET_PeerIdentity *peers,
253 struct GNUNET_HashCode tmp;
256 for (i = 0; i < num_peers; ++i)
258 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
260 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
267 transmit_pending (void *cls, size_t size, void *buf)
269 struct GNUNET_CONSENSUS_Element *element;
270 struct GNUNET_CONSENSUS_ElementMessage *msg;
271 struct ConsensusSession *session;
273 session = (struct ConsensusSession *) cls;
274 msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
275 element = session->transmit_pending_head->element;
277 GNUNET_assert (NULL != element);
281 msg->element_type = element->type;
282 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
283 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
284 memcpy (&msg[1], element->data, element->size);
286 session->transmit_pending_head = session->transmit_pending_head->next;
290 return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
/*
 * Transmit callback that writes a conclude-done message into the buffer.
 *
 * @param cls closure (not used in the visible lines)
 * @param size bytes available in buf (not checked in the visible lines)
 * @param buf buffer the message is written into
 * @return size of the message written
 */
295 transmit_conclude_done (void *cls, size_t size, void *buf)
297 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
299 msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
300 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
301 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
/* The peer count is always reported as zero. */
302 msg->num_peers = htons (0);
304 return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
309 * Schedule sending the next message (if there is any) to a client.
311 * @param cli the client to send the next message to
314 send_next (struct ConsensusSession *session)
318 GNUNET_assert (NULL != session);
320 if (NULL != session->th)
325 if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO))
328 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
330 GNUNET_SERVER_notify_transmit_ready (session->client, msize,
331 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session);
332 session->conclude_sent = GNUNET_YES;
334 else if (NULL != session->transmit_pending_head)
336 msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage);
338 GNUNET_SERVER_notify_transmit_ready (session->client, msize,
339 GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session);
340 /* TODO: insert into ack pending */
346 * Method called whenever a peer has disconnected from the tunnel.
347 * Implementations of this callback must NOT call
348 * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
349 * to run in some other task later. However, calling
350 * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
353 * @param peer peer identity the tunnel stopped working with
/* Passed to GNUNET_MESH_tunnel_create() in client_join(). No handling is
 * implemented in the visible lines; see the FIXME. */
356 disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
358 /* FIXME: how do we handle this */
363 * Method called whenever a peer has connected to the tunnel.
366 * @param peer peer identity the tunnel was created to, NULL on timeout
367 * @param atsi performance data for the connection
/* Passed to GNUNET_MESH_tunnel_create() in client_join(). The visible body
 * consists of the explanatory comment only. */
370 connect_handler (void *cls,
371 const struct GNUNET_PeerIdentity *peer,
372 const struct GNUNET_ATS_Information *atsi)
374 /* not much we can do here, now we know the other peer has been added to our broadcast tunnel */
379 * Called when a client wants to join a consensus session.
382 * @param client client that sent the message
383 * @param m message sent by the client
386 client_join (void *cls,
387 struct GNUNET_SERVER_Client *client,
388 const struct GNUNET_MessageHeader *m)
390 struct GNUNET_HashCode global_id;
391 const struct GNUNET_CONSENSUS_JoinMessage *msg;
392 struct ConsensusSession *session;
395 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
397 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
399 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s (&msg->session_id));
401 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s (&global_id));
405 session = sessions_head;
406 while (NULL != session)
408 if (client == session->client)
411 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
412 disconnect_client (client);
415 if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode)))
417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n");
418 disconnect_client (client);
421 session = session->next;
424 GNUNET_SERVER_client_keep (client);
426 /* session does not exist yet, create it */
427 session = GNUNET_malloc (sizeof (struct ConsensusSession));
428 session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode));
429 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
430 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
431 session->client = client;
432 /* FIXME: should not be a constant, but chosen adaptively */
433 session->round_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
435 session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, connect_handler, disconnect_handler, session);
437 session->num_peers = 0;
439 /* count the peers that are not the local peer */
440 for (i = 0; i < msg->num_peers; i++)
442 struct GNUNET_PeerIdentity *peers;
443 peers = (struct GNUNET_PeerIdentity *) &msg[1];
444 if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
445 session->num_peers++;
448 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeer));
450 /* copy the peer identities and add peers to broadcast tunnel */
451 for (i = 0; i < msg->num_peers; i++)
453 struct GNUNET_PeerIdentity *peers;
454 peers = (struct GNUNET_PeerIdentity *) &msg[1];
455 if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
457 *session->peers->peer_id = peers[i];
458 GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, &peers[i]);
462 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
464 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
466 GNUNET_SERVER_receive_done (client, GNUNET_OK);
471 * Called when a client performs an insert operation.
474 client_insert (void *cls,
475 struct GNUNET_SERVER_Client *client,
476 const struct GNUNET_MessageHeader *m)
478 struct ConsensusSession *session;
479 struct GNUNET_CONSENSUS_ElementMessage *msg;
480 struct GNUNET_CONSENSUS_Element *element;
481 struct GNUNET_HashCode key;
484 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
486 session = sessions_head;
487 while (NULL != session)
489 if (session->client == client)
495 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
496 GNUNET_SERVER_client_disconnect (client);
500 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
501 element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
503 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
505 element->type = msg->element_type;
506 element->size = element_size;
507 memcpy (&element[1], &msg[1], element_size);
508 element->data = &element[1];
510 GNUNET_CRYPTO_hash (element, element_size, &key);
512 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
513 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
515 GNUNET_SERVER_receive_done (client, GNUNET_OK);
522 * Do one round of the conclusion.
523 * Start by broadcasting the set difference estimator (IBF strata).
/* Called from client_conclude() once a conclusion has been requested.
 * NOTE(review): no body lines of this function are visible in this view. */
527 conclude_do_round (struct ConsensusSession *session)
534 * Cancel the current round if necessary, decide to run another round or
/* NOTE(review): neither the body nor any caller of this function is visible
 * in this view. */
538 conclude_round_done (struct ConsensusSession *session)
545 * Called when a client performs the conclude operation.
/*
 * Handler for the client's conclude request: looks up the session owned by
 * the client, marks the conclusion as requested and starts a round.
 *
 * @param cls closure (not used in the visible lines)
 * @param client client that sent the message
 * @param message the conclude message (not inspected in the visible lines)
 */
548 client_conclude (void *cls,
549 struct GNUNET_SERVER_Client *client,
550 const struct GNUNET_MessageHeader *message)
552 struct ConsensusSession *session;
554 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
/* Walk the session list until the entry owned by this client is found or
 * the list ends. */
556 session = sessions_head;
557 while ((session != NULL) && (session->client != client))
559 session = session->next;
/* NOTE(review): presumably guarded by a check for session being NULL; the
 * condition line is not visible in this view. */
563 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
564 GNUNET_SERVER_client_disconnect (client);
/* A second conclude request for the same session is treated as an error. */
568 if (GNUNET_YES == session->conclude_requested)
570 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude twice\n");
571 GNUNET_SERVER_client_disconnect (client);
575 session->conclude_requested = GNUNET_YES;
577 conclude_do_round (session);
579 GNUNET_SERVER_receive_done (client, GNUNET_OK);
586 * Called when a client sends an ack
589 client_ack (void *cls,
590 struct GNUNET_SERVER_Client *client,
591 const struct GNUNET_MessageHeader *message)
593 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
597 * Task that disconnects from core.
599 * @param cls core handle
600 * @param tc context information (why was this task triggered now)
/* Scheduled from core_startup(). The call below uses the file-level core
 * handle; the cls argument is not read in the visible lines. */
603 disconnect_core (void *cls,
604 const struct GNUNET_SCHEDULER_TaskContext *tc)
606 GNUNET_CORE_disconnect (core);
608 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
/*
 * Callback handed to GNUNET_CORE_connect() in run(). Registers the client
 * message handlers, stores a copy of the local peer identity and schedules
 * the task that disconnects from core again.
 *
 * @param cls closure (not used in the visible lines)
 * @param core core handle; this parameter shadows the file-level variable
 *        of the same name inside this function
 * @param peer identity of the local peer
 */
613 core_startup (void *cls,
614 struct GNUNET_CORE_Handle *core,
615 const struct GNUNET_PeerIdentity *peer)
/* Join and insert are registered with an expected size of 0; conclude and
 * ack are registered with the fixed size of their message structures. */
617 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
618 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
619 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
620 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
621 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
622 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
623 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
627 GNUNET_SERVER_add_handlers (srv, handlers);
/* client_join() compares the peers of a join request against this copy. */
628 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
629 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
/* The parameter is passed as closure here; disconnect_core() itself reads
 * the file-level handle. */
630 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
631 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
637 * Method called whenever another peer has added us to a tunnel
638 * the other peer initiated.
639 * Only called (once) upon reception of data with a message type which was
640 * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
641 * causes the tunnel to be ignored and no further notifications are sent about
645 * @param tunnel new handle to the tunnel
646 * @param initiator peer that started the tunnel
647 * @param atsi performance information for the tunnel
648 * @return initial tunnel context for the tunnel
649 * (can be NULL -- that's not an error)
/* Passed to GNUNET_MESH_connect() in run() as the inbound-tunnel callback.
 * The visible body consists of the explanatory comment only.
 * NOTE(review): the return statement is not visible in this view. */
652 new_tunnel (void *cls,
653 struct GNUNET_MESH_Tunnel *tunnel,
654 const struct GNUNET_PeerIdentity *initiator,
655 const struct GNUNET_ATS_Information *atsi)
657 /* there's nothing we can do here, as we don't have the global consensus id yet */
663 * Function called whenever an inbound tunnel is destroyed. Should clean up
664 * any associated state. This function is NOT called if the client has
665 * explicitly asked for the tunnel to be destroyed using
666 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
669 * @param cls closure (set from GNUNET_MESH_connect)
670 * @param tunnel connection to the other end (henceforth invalid)
671 * @param tunnel_ctx place where local state associated
672 * with the tunnel is stored
/* Passed to GNUNET_MESH_connect() in run() as the tunnel cleanup callback.
 * No cleanup is implemented in the visible lines; see the FIXME. */
675 cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
677 /* FIXME: what to do here? */
683 * Called to clean up, after a shutdown has been requested.
686 * @param tc context information (why was this task triggered now)
689 shutdown_task (void *cls,
690 const struct GNUNET_SCHEDULER_TaskContext *tc)
692 /* mesh requires all the tunnels to be destroyed manually */
693 while (NULL != sessions_head)
695 struct ConsensusSession *session;
696 session = sessions_head;
697 GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
698 sessions_head = sessions_head->next;
699 GNUNET_free (session);
704 GNUNET_MESH_disconnect (mesh);
709 GNUNET_CORE_disconnect (core);
717 * Functions with this signature are called whenever a message is
720 * @param cls closure (set from GNUNET_MESH_connect)
721 * @param tunnel connection to the other end
722 * @param tunnel_ctx place to store local state associated with the tunnel
723 * @param sender who sent the message
724 * @param message the actual message
725 * @param atsi performance data for the connection
726 * @return GNUNET_OK to keep the connection open,
727 * GNUNET_SYSERR to close it (signal serious error)
/* Registered in run() as the mesh handler for
 * GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE.
 * NOTE(review): no body lines of this function are visible in this view. */
730 p2p_delta_estimate (void *cls,
731 struct GNUNET_MESH_Tunnel * tunnel,
733 const struct GNUNET_PeerIdentity *sender,
734 const struct GNUNET_MessageHeader *message,
735 const struct GNUNET_ATS_Information *atsi)
743 * Functions with this signature are called whenever a message is
746 * @param cls closure (set from GNUNET_MESH_connect)
747 * @param tunnel connection to the other end
748 * @param tunnel_ctx place to store local state associated with the tunnel
749 * @param sender who sent the message
750 * @param message the actual message
751 * @param atsi performance data for the connection
752 * @return GNUNET_OK to keep the connection open,
753 * GNUNET_SYSERR to close it (signal serious error)
/* Registered in run() as the mesh handler for
 * GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST.
 * NOTE(review): no body lines of this function are visible in this view. */
756 p2p_difference_digest (void *cls,
757 struct GNUNET_MESH_Tunnel * tunnel,
759 const struct GNUNET_PeerIdentity *sender,
760 const struct GNUNET_MessageHeader *message,
761 const struct GNUNET_ATS_Information *atsi)
769 * Functions with this signature are called whenever a message is
772 * @param cls closure (set from GNUNET_MESH_connect)
773 * @param tunnel connection to the other end
774 * @param tunnel_ctx place to store local state associated with the tunnel
775 * @param sender who sent the message
776 * @param message the actual message
777 * @param atsi performance data for the connection
778 * @return GNUNET_OK to keep the connection open,
779 * GNUNET_SYSERR to close it (signal serious error)
/* Registered in run() as the mesh handler for
 * GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS.
 * NOTE(review): no body lines of this function are visible in this view. */
782 p2p_elements_and_requests (void *cls,
783 struct GNUNET_MESH_Tunnel * tunnel,
785 const struct GNUNET_PeerIdentity *sender,
786 const struct GNUNET_MessageHeader *message,
787 const struct GNUNET_ATS_Information *atsi)
795 * Start processing consensus requests.
798 * @param server the initialized server
799 * @param c configuration to use
/*
 * Service main callback handed to GNUNET_SERVICE_run() by main(). Schedules
 * the shutdown task and connects to the mesh and core services; the client
 * handlers are registered later, in core_startup().
 *
 * @param cls closure (not used in the visible lines)
 * @param server the initialized server
 * @param c configuration to use
 */
802 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
/* NOTE(review): no entries of the core handler table are visible in this
 * view. */
804 static const struct GNUNET_CORE_MessageHandler handlers[] = {
/* One mesh handler per peer-to-peer message type, each registered with an
 * expected size of 0. */
807 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
808 {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
809 {p2p_difference_digest, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
810 {p2p_elements_and_requests, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
813 static const GNUNET_MESH_ApplicationType app_types[] = {
814 GNUNET_APPLICATION_TYPE_CONSENSUS,
815 GNUNET_APPLICATION_TYPE_END
818 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
/* shutdown_task is registered with a "forever" delay. */
823 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
/* NOTE(review): this call uses the file-level cfg while the core connect
 * below uses the parameter c; the assignment of cfg is not visible here. */
825 mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, app_types);
826 GNUNET_assert (NULL != mesh);
828 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
829 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers);
830 GNUNET_assert (NULL != core);
835 * The main function for the consensus service.
837 * @param argc number of arguments from the command line
838 * @param argv command line arguments
839 * @return 0 ok, 1 on error
/* Entry point: hands control to GNUNET_SERVICE_run() with run() as the
 * service callback and maps the result to the process exit status. */
842 main (int argc, char *const *argv)
845 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
/* Exit status 0 only when the service reported GNUNET_OK. */
846 return (GNUNET_OK == ret) ? 0 : 1;