2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/gnunet-service-consensus.c
24 * @author Florian Dold
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_consensus_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_stream_lib.h"
35 #include "consensus_protocol.h"
37 #include "consensus.h"
41 * Number of IBFs in a strata estimator.
43 #define STRATA_COUNT 32
45 * Number of buckets per IBF.
47 #define STRATA_IBF_BUCKETS 80
49 * hash num parameter for the difference digests and strata estimators
51 #define STRATA_HASH_NUM 3
54 * Number of buckets that can be transmitted in one message.
56 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values.
63 #define MAX_IBF_ORDER (32)
66 /* forward declarations */
68 struct ConsensusSession;
69 struct IncomingSocket;
70 struct ConsensusPeerInformation;
73 send_next (struct ConsensusSession *session);
76 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
79 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
82 write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size);
85 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
89 * An element that is waiting to be transmitted.
94 * Pending elements are kept in a DLL.
96 struct PendingElement *next;
99 * Pending elements are kept in a DLL.
101 struct PendingElement *prev;
106 struct GNUNET_CONSENSUS_Element *element;
108 /* peer this element is coming from */
109 struct ConsensusPeerInformation *cpi;
113 * Information about a peer that is in a consensus session.
115 struct ConsensusPeerInformation
117 struct GNUNET_STREAM_Socket *socket;
120 * Is socket's connection established, i.e. can we write to it?
121 * Only relevent on outgoing cpi.
126 * Type of the peer in the all-to-all rounds,
127 * GNUNET_YES if we initiate reconciliation.
132 * if the peer did something wrong, and was disconnected,
133 * never interact with this peer again.
138 * Did we receive/send a consensus hello?
143 * Handle for currently active read
145 struct GNUNET_STREAM_ReadHandle *rh;
148 * Handle for currently active read
150 struct GNUNET_STREAM_WriteHandle *wh;
155 IBF_STATE_TRANSMITTING,
160 * What is the order (=log2 size) of the ibf
161 * we're currently dealing with?
166 * The current IBF for this peer,
167 * purpose dependent on ibf_state
169 struct InvertibleBloomFilter *ibf;
172 * How many buckets have we transmitted/received (depending on state)?
174 int ibf_bucket_counter;
177 * Strata estimator of the peer, NULL if our peer
178 * initiated the reconciliation.
180 struct InvertibleBloomFilter **strata;
183 * difference estimated with the current strata estimator
187 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
190 * Back-reference to the consensus session,
191 * to that ConsensusPeerInformation can be used as a closure
193 struct ConsensusSession *session;
195 struct PendingElement *send_pending_head;
196 struct PendingElement *send_pending_tail;
201 struct GNUNET_MessageHeader *msg;
204 * Queued messages are stored in a doubly linked list.
206 struct QueuedMessage *next;
209 * Queued messages are stored in a doubly linked list.
211 struct QueuedMessage *prev;
217 * distribution of information with the exponential scheme
219 CONSENSUS_ROUND_EXP_EXCHANGE,
221 * All-to-all, exchange missing values
223 CONSENSUS_ROUND_A2A_EXCHANGE,
225 * All-to-all, check what values are missing, don't exchange anything
227 CONSENSUS_ROUND_A2A_INVENTORY
230 a round to exchange the information for fraud-detection
231 CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT
237 * A consensus session consists of one local client and the remote authorities.
240 struct ConsensusSession
243 * Consensus sessions are kept in a DLL.
245 struct ConsensusSession *next;
248 * Consensus sessions are kept in a DLL.
250 struct ConsensusSession *prev;
253 * Join message. Used to initialize the session later,
254 * if the identity of the local peer is not yet known.
255 * NULL if the session has been fully initialized.
257 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
260 * Global consensus identification, computed
261 * from the local id and participating authorities.
263 struct GNUNET_HashCode global_id;
266 * Local client in this consensus session.
267 * There is only one client per consensus session.
269 struct GNUNET_SERVER_Client *client;
272 * Values in the consensus set of this session,
273 * all of them either have been sent by or approved by the client.
274 * Contains GNUNET_CONSENSUS_Element.
276 struct GNUNET_CONTAINER_MultiHashMap *values;
279 * Elements that have not been approved (or rejected) by the client yet.
281 struct PendingElement *approval_pending_head;
284 * Elements that have not been approved (or rejected) by the client yet.
286 struct PendingElement *approval_pending_tail;
289 * Messages to be sent to the local client that owns this session
291 struct QueuedMessage *client_messages_head;
294 * Messages to be sent to the local client that owns this session
296 struct QueuedMessage *client_messages_tail;
299 * Currently active transmit handle for sending to the client
301 struct GNUNET_SERVER_TransmitHandle *th;
304 * Once conclude_requested is GNUNET_YES, the client may not
305 * insert any more values.
307 int conclude_requested;
310 * Minimum number of peers to form a consensus group
312 int conclude_group_min;
315 * Number of other peers in the consensus
317 unsigned int num_peers;
320 * Information about the other peers,
323 struct ConsensusPeerInformation *info;
326 * Sorted array of peer identities in this consensus session,
327 * includes the local peer.
329 struct GNUNET_PeerIdentity *peers;
332 * Index of the local peer in the peers array
337 * Strata estimator, computed online
339 struct InvertibleBloomFilter **strata;
344 struct InvertibleBloomFilter **ibfs;
349 enum ConsensusRound current_round;
354 * Sockets from other peers who want to communicate with us.
355 * It may not be known yet which consensus session they belong to.
357 struct IncomingSocket
360 * Incoming sockets are kept in a double linked list.
362 struct IncomingSocket *next;
365 * Incoming sockets are kept in a double linked list.
367 struct IncomingSocket *prev;
372 struct GNUNET_STREAM_Socket *socket;
375 * Handle for currently active read
377 struct GNUNET_STREAM_ReadHandle *rh;
380 * Peer that connected to us with the socket.
382 struct GNUNET_PeerIdentity *peer;
385 * Message stream tokenizer for this socket.
387 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
390 * Peer-in-session this socket belongs to, once known, otherwise NULL.
392 struct ConsensusPeerInformation *cpi;
395 * Set to the global session id, if the peer sent us a hello-message,
396 * but the session does not exist yet.
398 * FIXME: not implemented yet
400 struct GNUNET_HashCode *requested_gid;
403 static struct IncomingSocket *incoming_sockets_head;
404 static struct IncomingSocket *incoming_sockets_tail;
407 * Linked list of sesstions this peer participates in.
409 static struct ConsensusSession *sessions_head;
412 * Linked list of sesstions this peer participates in.
414 static struct ConsensusSession *sessions_tail;
417 * Configuration of the consensus service.
419 static const struct GNUNET_CONFIGURATION_Handle *cfg;
422 * Handle to the server for this service.
424 static struct GNUNET_SERVER_Handle *srv;
427 * Peer that runs this service.
429 static struct GNUNET_PeerIdentity *my_peer;
432 * Handle to the core service. Only used during service startup, will be NULL after that.
434 static struct GNUNET_CORE_Handle *core;
437 * Listener for sockets from peers that want to reconcile with us.
439 static struct GNUNET_STREAM_ListenSocket *listener;
443 * Queue a message to be sent to the inhabiting client of a sessino
445 * @param session session
446 * @param msg message we want to queue
449 queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
451 struct QueuedMessage *qm;
452 qm = GNUNET_malloc (sizeof *qm);
454 GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
458 * Get peer index associated with the peer information,
459 * unique for every session among all peers.
462 get_cpi_index (struct ConsensusPeerInformation *cpi)
464 return cpi - cpi->session->info;
468 * Mark the peer as bad, free as state we don't need anymore.
471 mark_peer_bad (struct ConsensusPeerInformation *cpi)
473 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi));
474 cpi->is_bad = GNUNET_YES;
475 /* FIXME: free ibfs etc. */
480 * Estimate set difference with two strata estimators,
481 * i.e. arrays of IBFs.
484 estimate_difference (struct InvertibleBloomFilter** strata1,
485 struct InvertibleBloomFilter** strata2)
490 for (i = STRATA_COUNT - 1; i >= 0; i--)
492 struct InvertibleBloomFilter *diff;
496 diff = ibf_dup (strata1[i]);
497 ibf_subtract (diff, strata2[i]);
500 more = ibf_decode (diff, NULL, NULL);
501 if (GNUNET_NO == more)
506 if (GNUNET_SYSERR == more)
509 return count * (1 << (i + 1));
520 * Called when receiving data from a peer that is member of
521 * an inhabited consensus session.
523 * @param cls the closure from GNUNET_STREAM_read
524 * @param status the status of the stream at the time this function is called
525 * @param data traffic from the other side
526 * @param size the number of bytes available in data read; will be 0 on timeout
527 * @return number of bytes of processed from 'data' (any data remaining should be
528 * given to the next time the read processor is called).
531 session_stream_data_processor (void *cls,
532 enum GNUNET_STREAM_Status status,
536 struct ConsensusPeerInformation *cpi;
539 GNUNET_assert (GNUNET_STREAM_OK == status);
543 GNUNET_assert (NULL != cpi->mst);
545 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
546 if (GNUNET_SYSERR == ret)
548 /* FIXME: handle this correctly */
553 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
554 &session_stream_data_processor, cpi);
556 /* we always read all data */
561 * Called when we receive data from a peer that is not member of
562 * a session yet, or the session is not yet inhabited.
564 * @param cls the closure from GNUNET_STREAM_read
565 * @param status the status of the stream at the time this function is called
566 * @param data traffic from the other side
567 * @param size the number of bytes available in data read; will be 0 on timeout
568 * @return number of bytes of processed from 'data' (any data remaining should be
569 * given to the next time the read processor is called).
572 incoming_stream_data_processor (void *cls,
573 enum GNUNET_STREAM_Status status,
577 struct IncomingSocket *incoming;
580 GNUNET_assert (GNUNET_STREAM_OK == status);
584 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
585 if (GNUNET_SYSERR == ret)
587 /* FIXME: handle this correctly */
592 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
593 &incoming_stream_data_processor, incoming);
595 /* we always read all data */
601 * Iterator to insert values into an ibf.
604 * @param key current key code
605 * @param value value in the hash map
606 * @return GNUNET_YES if we should continue to
611 ibf_values_iterator (void *cls,
612 const struct GNUNET_HashCode *key,
615 struct ConsensusPeerInformation *cpi;
617 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key));
622 prepare_ibf (struct ConsensusPeerInformation *cpi)
624 if (NULL == cpi->session->ibfs[cpi->ibf_order])
626 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
627 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
633 * Called when a peer sends us its strata estimator.
634 * In response, we sent out IBF of appropriate size back.
637 * @param strata_msg message
640 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
647 GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
649 if (NULL == cpi->strata)
651 cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
652 for (i = 0; i < STRATA_COUNT; i++)
653 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
656 /* for correct message alignment, copy bucket types seperately */
657 key_src = (uint64_t *) &strata_msg[1];
659 for (i = 0; i < STRATA_COUNT; i++)
661 memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src);
662 key_src += STRATA_IBF_BUCKETS;
665 hash_src = (uint32_t *) key_src;
667 for (i = 0; i < STRATA_COUNT; i++)
669 memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
670 hash_src += STRATA_IBF_BUCKETS;
673 count_src = (uint8_t *) hash_src;
675 for (i = 0; i < STRATA_COUNT; i++)
677 memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS);
678 count_src += STRATA_IBF_BUCKETS;
681 cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
682 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
684 /* send IBF of the right size */
686 while ((1 << cpi->ibf_order) < cpi->diff)
688 if (cpi->ibf_order > MAX_IBF_ORDER)
689 cpi->ibf_order = MAX_IBF_ORDER;
691 /* create ibf if not already pre-computed */
693 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
694 cpi->ibf_state = IBF_STATE_TRANSMITTING;
695 write_ibf (cpi, GNUNET_STREAM_OK, 0);
702 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
709 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
711 if (IBF_STATE_NONE == cpi->ibf_state)
713 cpi->ibf_state = IBF_STATE_RECEIVING;
714 cpi->ibf_order = digest->order;
715 cpi->ibf_bucket_counter = 0;
718 if ( (IBF_STATE_RECEIVING != cpi->ibf_state) ||
719 (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) )
726 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
728 if (NULL == cpi->ibf)
729 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
731 key_src = (uint64_t *) &digest[1];
733 memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src);
734 hash_src += num_buckets;
736 hash_src = (uint32_t *) key_src;
738 memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
739 hash_src += num_buckets;
741 count_src = (uint8_t *) hash_src;
743 memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src);
745 cpi->ibf_bucket_counter += num_buckets;
747 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
749 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
750 GNUNET_assert (NULL != cpi->wh);
751 cpi->ibf_state = IBF_STATE_DECODING;
753 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
754 write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0);
761 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
763 struct PendingElement *pending_element;
764 struct GNUNET_CONSENSUS_Element *element;
765 struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
768 size = ntohs (element_msg->size) - sizeof *element_msg;
770 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
772 element = GNUNET_malloc (size + sizeof *element);
773 element->size = size;
774 memcpy (&element[1], &element_msg[1], size);
775 element->data = &element[1];
777 pending_element = GNUNET_malloc (sizeof *pending_element);
778 pending_element->element = element;
779 GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element);
781 client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
782 client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
783 client_element_msg->header.size = htons (size + sizeof *client_element_msg);
784 memcpy (&client_element_msg[1], &element[1], size);
786 queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
788 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
790 send_next (cpi->session);
797 * Handle a request for elements.
798 * Only allowed in exchange-rounds.
803 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
805 /* FIXME: implement */
811 * Handle a HELLO-message, send when another peer wants to join a session where
812 * our peer is a member. The session may or may not be inhabited yet.
815 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
817 /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
818 struct ConsensusSession *session;
819 session = sessions_head;
820 while (NULL != session)
822 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
825 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
826 idx = get_peer_idx (inc->peer, session);
827 GNUNET_assert (-1 != idx);
828 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
829 inc->cpi = &session->info[idx];
830 GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
831 inc->cpi->mst = inc->mst;
832 inc->cpi->hello = GNUNET_YES;
833 inc->cpi->socket = inc->socket;
836 session = session->next;
838 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n");
845 * Functions with this signature are called whenever a
846 * complete message is received by the tokenizer.
848 * Do not call GNUNET_SERVER_mst_destroy in callback
851 * @param client identification of the client
852 * @param message the actual message
854 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
857 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
859 struct ConsensusPeerInformation *cpi;
861 switch (ntohs (message->type))
863 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
864 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
865 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
866 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
867 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
868 return handle_p2p_element (cpi, message);
869 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
870 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
872 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type));
873 /* FIXME: handle correctly */
881 * Handle tokenized messages from stream sockets.
882 * Delegate them if the socket belongs to a session,
883 * handle hello messages otherwise.
885 * Do not call GNUNET_SERVER_mst_destroy in callback
887 * @param cls closure, unused
888 * @param client incoming socket this message comes from
889 * @param message the actual message
891 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
894 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
896 struct IncomingSocket *inc;
897 inc = (struct IncomingSocket *) client;
898 switch (ntohs( message->type))
900 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
901 return handle_p2p_hello (inc, (struct ConsensusHello *) message);
903 if (NULL != inc->cpi)
904 return mst_session_callback (inc->cpi, client, message);
905 /* FIXME: disconnect peer properly */
913 * Functions of this type are called upon new stream connection from other peers
914 * or upon binding error which happen when the app_port given in
915 * GNUNET_STREAM_listen() is already taken.
917 * @param cls the closure from GNUNET_STREAM_listen
918 * @param socket the socket representing the stream; NULL on binding error
919 * @param initiator the identity of the peer who wants to establish a stream
920 * with us; NULL on binding error
921 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
922 * stream (the socket will be invalid after the call)
925 listen_cb (void *cls,
926 struct GNUNET_STREAM_Socket *socket,
927 const struct GNUNET_PeerIdentity *initiator)
929 struct IncomingSocket *incoming;
931 GNUNET_assert (NULL != socket);
933 incoming = GNUNET_malloc (sizeof *incoming);
935 incoming->socket = socket;
936 incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
938 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
939 &incoming_stream_data_processor, incoming);
942 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
944 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
951 destroy_session (struct ConsensusSession *session)
953 /* FIXME: more stuff to free! */
954 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
955 GNUNET_SERVER_client_drop (session->client);
956 GNUNET_free (session);
961 * Disconnect a client, and destroy all sessions associated with it.
963 * @param client the client to disconnect
966 disconnect_client (struct GNUNET_SERVER_Client *client)
968 struct ConsensusSession *session;
969 GNUNET_SERVER_client_disconnect (client);
971 /* if the client owns a session, remove it */
972 session = sessions_head;
973 while (NULL != session)
975 if (client == session->client)
977 destroy_session (session);
980 session = session->next;
986 * Compute a global, (hopefully) unique consensus session id,
987 * from the local id of the consensus session, and the identities of all participants.
988 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
989 * exactly the same peers, the global id will be different.
991 * @param local_id local id of the consensus session
992 * @param peers array of all peers participating in the consensus session
993 * @param num_peers number of elements in the peers array
994 * @param dst where the result is stored, may not be NULL
997 compute_global_id (const struct GNUNET_HashCode *local_id,
998 const struct GNUNET_PeerIdentity *peers, int num_peers,
999 struct GNUNET_HashCode *dst)
1002 struct GNUNET_HashCode tmp;
1005 for (i = 0; i < num_peers; ++i)
1007 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
1009 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
1016 * Function called to notify a client about the connection
1017 * begin ready to queue more data. "buf" will be
1018 * NULL and "size" zero if the connection was closed for
1019 * writing in the meantime.
1021 * @param cls consensus session
1022 * @param size number of bytes available in buf
1023 * @param buf where the callee should write the message
1024 * @return number of bytes written to buf
1027 transmit_queued (void *cls, size_t size,
1030 struct ConsensusSession *session;
1031 struct QueuedMessage *qmsg;
1038 qmsg = session->client_messages_head;
1039 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1040 GNUNET_assert (qmsg);
1044 destroy_session (session);
1048 msg_size = ntohs (qmsg->msg->size);
1050 GNUNET_assert (size >= msg_size);
1052 memcpy (buf, qmsg->msg, msg_size);
1053 GNUNET_free (qmsg->msg);
1056 send_next (session);
1063 * Schedule sending the next message (if there is any) to a client.
1065 * @param cli the client to send the next message to
1068 send_next (struct ConsensusSession *session)
1071 GNUNET_assert (NULL != session);
1073 if (NULL != session->th)
1076 if (NULL != session->client_messages_head)
1079 msize = ntohs (session->client_messages_head->msg->size);
1080 session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
1081 GNUNET_TIME_UNIT_FOREVER_REL,
1082 &transmit_queued, session);
1088 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
1089 * the correct signature to be used with e.g. qsort.
1090 * We use this function instead.
1092 * @param h1 some hash code
1093 * @param h2 some hash code
1094 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
1097 hash_cmp (const void *a, const void *b)
1099 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
1104 * Search peer in the list of peers in session.
1106 * @param peer peer to find
1107 * @param session session with peer
1108 * @return index of peer, -1 if peer is not in session
1111 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1113 const struct GNUNET_PeerIdentity *needle;
1114 needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1117 return needle - session->peers;
1123 * Called when stream has finishes writing the hello message
1126 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1128 struct ConsensusPeerInformation *cpi;
1131 cpi->hello = GNUNET_YES;
1133 GNUNET_assert (GNUNET_STREAM_OK == status);
1135 if (cpi->session->conclude_requested)
1137 write_strata (cpi, GNUNET_STREAM_OK, 0);
1143 * Functions of this type will be called when a stream is established
1145 * @param cls the closure from GNUNET_STREAM_open
1146 * @param socket socket to use to communicate with the other side (read/write)
1149 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1151 struct ConsensusPeerInformation *cpi;
1152 struct ConsensusHello *hello;
1156 cpi->is_connected = GNUNET_YES;
1158 hello = GNUNET_malloc (sizeof *hello);
1159 hello->header.size = htons (sizeof *hello);
1160 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1161 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1164 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1166 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1167 &session_stream_data_processor, cpi);
1173 initialize_session_info (struct ConsensusSession *session)
1178 for (i = 0; i < session->num_peers; ++i)
1180 /* initialize back-references, so consensus peer information can
1181 * be used as closure */
1182 session->info[i].session = session;
1185 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
1187 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1188 i = (session->local_peer_idx + 1) % session->num_peers;
1191 session->info[i].is_outgoing = GNUNET_YES;
1192 session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
1193 open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
1194 session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
1195 i = (i + 1) % session->num_peers;
1197 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
1199 // tie-breaker for even number of peers
1200 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1202 session->info[last].is_outgoing = GNUNET_YES;
1203 session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
1204 open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
1205 session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
1207 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
1213 * Create the sorted list of peers for the session,
1214 * add the local peer if not in the join message.
1217 initialize_session_peer_list (struct ConsensusSession *session)
1219 int local_peer_in_list;
1221 const struct GNUNET_PeerIdentity *msg_peers;
1224 GNUNET_assert (NULL != session->join_msg);
1226 /* peers in the join message, may or may not include the local peer */
1227 listed_peers = ntohs (session->join_msg->num_peers);
1229 session->num_peers = listed_peers;
1231 msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
1233 local_peer_in_list = GNUNET_NO;
1234 for (i = 0; i < listed_peers; i++)
1236 if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
1238 local_peer_in_list = GNUNET_YES;
1243 if (GNUNET_NO == local_peer_in_list)
1244 session->num_peers++;
1246 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
1248 if (GNUNET_NO == local_peer_in_list)
1249 session->peers[session->num_peers - 1] = *my_peer;
1251 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
1252 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1257 strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
1262 /* count trailing '1'-bits of v */
1263 for (i = 0; v & 1; v>>=1, i++)
1265 ibf_insert (strata[i], ibf_key_from_hashcode (key));
1270 * Initialize the session, continue receiving messages from the owning client
1272 * @param session the session to initialize
1275 initialize_session (struct ConsensusSession *session)
1277 const struct ConsensusSession *other_session;
1280 GNUNET_assert (NULL != session->join_msg);
1282 initialize_session_peer_list (session);
1284 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1286 compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
1288 /* Check if some local client already owns the session. */
1289 other_session = sessions_head;
1290 while (NULL != other_session)
1292 if ((other_session != session) &&
1293 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1295 /* session already owned by another client */
1297 disconnect_client (session->client);
1300 other_session = other_session->next;
1303 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
1305 session->local_peer_idx = get_peer_idx (my_peer, session);
1306 GNUNET_assert (-1 != session->local_peer_idx);
1308 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
1310 session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1311 for (i = 0; i < STRATA_COUNT; i++)
1312 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1314 session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *));
1316 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1317 initialize_session_info (session);
1319 GNUNET_free (session->join_msg);
1320 session->join_msg = NULL;
1322 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1323 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1328 * Called when a client wants to join a consensus session.
1331 * @param client client that sent the message
1332 * @param m message sent by the client
1335 client_join (void *cls,
1336 struct GNUNET_SERVER_Client *client,
1337 const struct GNUNET_MessageHeader *m)
1339 struct ConsensusSession *session;
1341 // make sure the client has not already joined a session
1342 session = sessions_head;
1343 while (NULL != session)
1345 if (session->client == client)
1348 disconnect_client (client);
1351 session = session->next;
1354 session = GNUNET_malloc (sizeof (struct ConsensusSession));
1355 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
1356 session->client = client;
1357 GNUNET_SERVER_client_keep (client);
1359 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1361 // Initialize session later if local peer identity is not known yet.
1362 if (NULL == my_peer)
1364 GNUNET_SERVER_disable_receive_done_warning (client);
1368 initialize_session (session);
1373 * Called when a client performs an insert operation.
1375 * @param cls (unused)
1376 * @param client client handle
1377 * @param message message sent by the client
1380 client_insert (void *cls,
1381 struct GNUNET_SERVER_Client *client,
1382 const struct GNUNET_MessageHeader *m)
1384 struct ConsensusSession *session;
1385 struct GNUNET_CONSENSUS_ElementMessage *msg;
1386 struct GNUNET_CONSENSUS_Element *element;
1387 struct GNUNET_HashCode key;
1390 session = sessions_head;
1391 while (NULL != session)
1393 if (session->client == client)
1397 if (NULL == session)
1399 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
1400 GNUNET_SERVER_client_disconnect (client);
1404 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1405 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1407 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
1409 element->type = msg->element_type;
1410 element->size = element_size;
1411 memcpy (&element[1], &msg[1], element_size);
1412 element->data = &element[1];
1414 GNUNET_assert (NULL != element->data);
1416 GNUNET_CRYPTO_hash (element, element_size, &key);
1418 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1419 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1421 strata_insert (session->strata, &key);
1423 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1425 send_next (session);
1432 * Functions of this signature are called whenever writing operations
1433 * on a stream are executed
1435 * @param cls the closure from GNUNET_STREAM_write
1436 * @param status the status of the stream at the time this function is called;
1437 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1438 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1439 * (this doesn't mean that the data is never sent, the receiver may
1440 * have read the data but its ACKs may have been lost);
1441 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1442 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1444 * @param size the number of bytes written
1447 write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1449 GNUNET_assert (GNUNET_STREAM_OK == status);
1450 /* just wait for the ibf */
1454 * Functions of this signature are called whenever writing operations
1455 * on a stream are executed
1457 * @param cls the closure from GNUNET_STREAM_write
1458 * @param status the status of the stream at the time this function is called;
1459 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1460 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1461 * (this doesn't mean that the data is never sent, the receiver may
1462 * have read the data but its ACKs may have been lost);
1463 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1464 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1466 * @param size the number of bytes written
1469 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1471 struct ConsensusPeerInformation *cpi;
1472 struct StrataMessage *strata_msg;
1482 GNUNET_assert (GNUNET_STREAM_OK == status);
1484 GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1486 /* FIXME: handle this */
1487 GNUNET_assert (GNUNET_STREAM_OK == status);
1489 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1491 strata_msg = GNUNET_malloc (msize);
1492 strata_msg->header.size = htons (msize);
1493 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1495 /* for correct message alignment, copy bucket types seperately */
1496 key_dst = (uint64_t *) &strata_msg[1];
1498 for (i = 0; i < STRATA_COUNT; i++)
1500 memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst);
1501 key_dst += STRATA_IBF_BUCKETS;
1504 hash_dst = (uint32_t *) key_dst;
1506 for (i = 0; i < STRATA_COUNT; i++)
1508 memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1509 hash_dst += STRATA_IBF_BUCKETS;
1512 count_dst = (uint8_t *) hash_dst;
1514 for (i = 0; i < STRATA_COUNT; i++)
1516 memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS);
1517 count_dst += STRATA_IBF_BUCKETS;
1520 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1521 write_strata_done, cpi);
1523 GNUNET_assert (NULL != cpi->wh);
1528 * Functions of this signature are called whenever writing operations
1529 * on a stream are executed
1531 * @param cls the closure from GNUNET_STREAM_write
1532 * @param status the status of the stream at the time this function is called;
1533 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1534 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1535 * (this doesn't mean that the data is never sent, the receiver may
1536 * have read the data but its ACKs may have been lost);
1537 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1538 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1540 * @param size the number of bytes written
1543 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1545 struct ConsensusPeerInformation *cpi;
1546 struct DifferenceDigest *digest;
1556 GNUNET_assert (GNUNET_STREAM_OK == status);
1558 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
1560 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1562 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1563 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1567 /* remaining buckets */
1568 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1570 /* limit to maximum */
1571 if (num_buckets > BUCKETS_PER_MESSAGE)
1572 num_buckets = BUCKETS_PER_MESSAGE;
1574 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order));
1576 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1578 digest = GNUNET_malloc (msize);
1579 digest->header.size = htons (msize);
1580 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1581 digest->order = cpi->ibf_order;
1583 key_dst = (uint64_t *) &digest[1];
1585 memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst);
1586 key_dst += num_buckets;
1588 hash_dst = (uint32_t *) key_dst;
1590 memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst);
1591 hash_dst += num_buckets;
1593 count_dst = (uint8_t *) hash_dst;
1595 memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst);
1597 cpi->ibf_bucket_counter += num_buckets;
1599 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1602 GNUNET_assert (NULL != cpi->wh);
1607 * Functions of this signature are called whenever writing operations
1608 * on a stream are executed
1610 * @param cls the closure from GNUNET_STREAM_write
1611 * @param status the status of the stream at the time this function is called;
1612 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1613 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1614 * (this doesn't mean that the data is never sent, the receiver may
1615 * have read the data but its ACKs may have been lost);
1616 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1617 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1619 * @param size the number of bytes written
1622 write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1624 struct ConsensusPeerInformation *cpi;
1626 struct GNUNET_HashCode hashcode;
1630 GNUNET_assert (GNUNET_STREAM_OK == status);
1632 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n");
1637 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state);
1642 res = ibf_decode (cpi->ibf, &side, &key);
1643 if (GNUNET_SYSERR == res)
1647 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1648 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1649 write_ibf (cls, status, size);
1652 if (GNUNET_NO == res)
1654 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
1659 struct GNUNET_CONSENSUS_Element *element;
1660 struct GNUNET_MessageHeader *element_msg;
1661 ibf_hashcode_from_key (key, &hashcode);
1662 /* FIXME: this only transmits one element stored with the key */
1663 element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1664 if (NULL == element)
1666 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
1667 element_msg = GNUNET_malloc (msize);
1668 element_msg->size = htons (msize);
1669 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
1670 GNUNET_assert (NULL != element->data);
1671 memcpy (&element_msg[1], element->data, element->size);
1672 cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1673 write_requests_and_elements, cpi);
1674 GNUNET_free (element_msg);
1675 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
1677 GNUNET_assert (NULL != cpi->wh);
1682 struct ElementRequest *msg;
1686 msize = (sizeof *msg) + sizeof (uint64_t);
1687 msg = GNUNET_malloc (msize);
1688 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1689 msg->header.size = htons (msize);
1690 p = (uint64_t *) &msg[1];
1693 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1694 write_requests_and_elements, cpi);
1695 GNUNET_assert (NULL != cpi->wh);
1707 select_best_group (struct ConsensusSession *session)
1714 * Called when a client performs the conclude operation.
1716 * @param cls (unused)
1717 * @param client client handle
1718 * @param message message sent by the client
1721 client_conclude (void *cls,
1722 struct GNUNET_SERVER_Client *client,
1723 const struct GNUNET_MessageHeader *message)
1725 struct ConsensusSession *session;
1728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
1730 session = sessions_head;
1731 while ((session != NULL) && (session->client != client))
1732 session = session->next;
1733 if (NULL == session)
1735 /* client not found */
1737 GNUNET_SERVER_client_disconnect (client);
1741 if (GNUNET_YES == session->conclude_requested)
1743 /* client requested conclude twice */
1745 disconnect_client (client);
1749 session->conclude_requested = GNUNET_YES;
1751 for (i = 0; i < session->num_peers; i++)
1753 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1754 (GNUNET_YES == session->info[i].hello) )
1756 /* kick off transmitting strata by calling the write continuation */
1757 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1761 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1762 send_next (session);
1767 * Called when a client sends an ack
1769 * @param cls (unused)
1770 * @param client client handle
1771 * @param message message sent by the client
1774 client_ack (void *cls,
1775 struct GNUNET_SERVER_Client *client,
1776 const struct GNUNET_MessageHeader *message)
1778 struct ConsensusSession *session;
1779 struct GNUNET_CONSENSUS_AckMessage *msg;
1780 struct PendingElement *pending;
1781 struct GNUNET_CONSENSUS_Element *element;
1782 struct GNUNET_HashCode key;
1784 session = sessions_head;
1785 while (NULL != session)
1787 if (session->client == client)
1791 if (NULL == session)
1793 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
1794 GNUNET_SERVER_client_disconnect (client);
1798 pending = session->approval_pending_head;
1800 GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending);
1802 msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
1806 element = pending->element;
1807 GNUNET_CRYPTO_hash (element, element->size, &key);
1809 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1810 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1811 strata_insert (session->strata, &key);
1814 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1818 * Task that disconnects from core.
1820 * @param cls core handle
1821 * @param tc context information (why was this task triggered now)
1824 disconnect_core (void *cls,
1825 const struct GNUNET_SCHEDULER_TaskContext *tc)
1827 GNUNET_CORE_disconnect (core);
1829 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
1834 core_startup (void *cls,
1835 struct GNUNET_CORE_Handle *core,
1836 const struct GNUNET_PeerIdentity *peer)
1838 struct ConsensusSession *session;
1840 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
1841 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
1842 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
1843 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
1845 session = sessions_head;
1846 while (NULL != session)
1848 if (NULL != session->join_msg)
1849 initialize_session (session);
1850 session = session->next;
1856 * Called to clean up, after a shutdown has been requested.
1858 * @param cls closure
1859 * @param tc context information (why was this task triggered now)
1862 shutdown_task (void *cls,
1863 const struct GNUNET_SCHEDULER_TaskContext *tc)
1866 /* FIXME: complete; write separate destructors for different data types */
1868 while (NULL != incoming_sockets_head)
1870 struct IncomingSocket *socket;
1871 socket = incoming_sockets_head;
1872 if (NULL == socket->cpi)
1874 GNUNET_STREAM_close (socket->socket);
1876 incoming_sockets_head = incoming_sockets_head->next;
1877 GNUNET_free (socket);
1880 while (NULL != sessions_head)
1882 struct ConsensusSession *session;
1885 session = sessions_head;
1887 for (i = 0; session->num_peers; i++)
1889 struct ConsensusPeerInformation *cpi;
1890 cpi = &session->info[i];
1891 if ((NULL != cpi) && (NULL != cpi->socket))
1893 GNUNET_STREAM_close (cpi->socket);
1897 if (NULL != session->client)
1898 GNUNET_SERVER_client_disconnect (session->client);
1900 sessions_head = sessions_head->next;
1901 GNUNET_free (session);
1906 GNUNET_CORE_disconnect (core);
1910 if (NULL != listener)
1912 GNUNET_STREAM_listen_close (listener);
1916 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
1921 * Start processing consensus requests.
1923 * @param cls closure
1924 * @param server the initialized server
1925 * @param c configuration to use
1928 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
1930 /* core is only used to retrieve the peer identity */
1931 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
1934 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1935 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1936 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1937 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1938 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1939 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
1940 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
1947 GNUNET_SERVER_add_handlers (server, server_handlers);
1949 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1951 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1953 GNUNET_STREAM_OPTION_END);
1956 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1957 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1958 GNUNET_assert (NULL != core);
1960 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1965 * The main function for the consensus service.
1967 * @param argc number of arguments from the command line
1968 * @param argv command line arguments
1969 * @return 0 ok, 1 on error
1972 main (int argc, char *const *argv)
1975 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1976 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
1977 return (GNUNET_OK == ret) ? 0 : 1;