2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
23 * @file consensus/gnunet-service-consensus.c
25 * @author Florian Dold
29 #include "gnunet_common.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_consensus_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_stream_lib.h"
36 #include "consensus_protocol.h"
38 #include "consensus.h"
42 * Number of IBFs in a strata estimator.
44 #define STRATA_COUNT 32
46 * Number of buckets per IBF.
48 #define STRATA_IBF_BUCKETS 80
50 * hash num parameter of the IBF
52 #define STRATA_HASH_NUM 3
54 * Number of strata that can be transmitted in one message.
56 #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
60 /* forward declarations */
62 struct ConsensusSession;
63 struct IncomingSocket;
66 send_next (struct ConsensusSession *session);
69 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
72 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
/**
 * An element that is waiting to be transmitted to a client.
 */
struct PendingElement
{
  /**
   * Next entry; pending elements are kept in a DLL.
   */
  struct PendingElement *next;

  /**
   * Previous entry; pending elements are kept in a DLL.
   */
  struct PendingElement *prev;

  /**
   * The element that is pending.
   */
  struct GNUNET_CONSENSUS_Element *element;
};
/**
 * State kept for one peer of a consensus session.
 */
struct ConsensusPeerInformation
{
  /**
   * Stream socket connected to the peer.
   */
  struct GNUNET_STREAM_Socket *socket;

  /**
   * Is socket's connection established, i.e. can we write to it?
   * Only relevant for outgoing peers.
   */
  int is_connected;

  /**
   * Type of the peer in the all-to-all rounds,
   * GNUNET_YES if we initiate reconciliation.
   */
  int is_outgoing;

  /**
   * Did we receive/send a consensus hello?
   */
  int hello;

  /**
   * Handle for currently active read
   */
  struct GNUNET_STREAM_ReadHandle *rh;

  /**
   * Handle for currently active write
   */
  struct GNUNET_STREAM_WriteHandle *wh;

  /**
   * How many of the strata in the ibf were
   * sent or received in this round?
   */
  int strata_counter;

  /* NOTE(review): presumably our own IBF for this peer and the number of
   * its buckets already transmitted -- confirm once the IBF exchange exists. */
  struct InvertibleBloomFilter *my_ibf;

  int my_ibf_bucket_counter;

  /* NOTE(review): presumably the IBF received from the peer and the number
   * of its buckets already received -- confirm. */
  struct InvertibleBloomFilter *peer_ibf;

  int peer_ibf_bucket_counter;

  /**
   * Strata estimator of the peer, NULL if our peer
   * initiated the reconciliation.
   */
  struct InvertibleBloomFilter **strata;

  /**
   * Message stream tokenizer used for this peer's socket.
   */
  struct GNUNET_SERVER_MessageStreamTokenizer *mst;

  /**
   * Back-reference to the session this entry belongs to, so that the
   * entry can be used as a closure.
   */
  struct ConsensusSession *session;
};
/**
 * A message that is queued for transmission to the local client.
 */
struct QueuedMessage
{
  /**
   * The message itself; its header carries the size in network byte order.
   */
  struct GNUNET_MessageHeader *msg;

  /**
   * Next entry; queued messages are stored in a doubly linked list.
   */
  struct QueuedMessage *next;

  /**
   * Previous entry; queued messages are stored in a doubly linked list.
   */
  struct QueuedMessage *prev;
};
169 * A consensus session consists of one local client and the remote authorities.
171 struct ConsensusSession
174 * Consensus sessions are kept in a DLL.
176 struct ConsensusSession *next;
179 * Consensus sessions are kept in a DLL.
181 struct ConsensusSession *prev;
184 * Join message. Used to initialize the session later,
185 * if the identity of the local peer is not yet known.
186 * NULL if the session has been fully initialized.
188 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
191 * Global consensus identification, computed
192 * from the local id and participating authorities.
194 struct GNUNET_HashCode global_id;
197 * Local client in this consensus session.
198 * There is only one client per consensus session.
200 struct GNUNET_SERVER_Client *client;
203 * Values in the consensus set of this session,
204 * all of them either have been sent by or approved by the client.
206 struct GNUNET_CONTAINER_MultiHashMap *values;
209 * Elements that have not been sent to the client yet.
211 struct PendingElement *transmit_pending_head;
214 * Elements that have not been sent to the client yet.
216 struct PendingElement *transmit_pending_tail;
219 * Elements that have not been approved (or rejected) by the client yet.
221 struct PendingElement *approval_pending_head;
224 * Elements that have not been approved (or rejected) by the client yet.
226 struct PendingElement *approval_pending_tail;
228 struct QueuedMessage *client_messages_head;
230 struct QueuedMessage *client_messages_tail;
233 * Currently active transmit handle for sending to the client
235 struct GNUNET_SERVER_TransmitHandle *th;
238 * Once conclude_requested is GNUNET_YES, the client may not
239 * insert any more values.
241 int conclude_requested;
244 * Minimum number of peers to form a consensus group
246 int conclude_group_min;
249 * Current round of the conclusion
254 * Soft deadline for conclude.
255 * The consensus is sped up, at the cost of consensus quality, as
256 * the time approaches or crosses the deadline.
258 struct GNUNET_TIME_Absolute conclude_deadline;
261 * Number of peers in the consensus session, including the local peer
263 unsigned int num_peers;
265 struct ConsensusPeerInformation *info;
268 * Sorted array of peer identities in this consensus session,
269 * includes the local peer.
271 struct GNUNET_PeerIdentity *peers;
274 * Index of the local peer in the peers array
279 * Task identifier for the round timeout task
281 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
283 struct InvertibleBloomFilter **strata;
/**
 * Sockets from other peers who want to communicate with us.
 * It may not be known yet which consensus session they belong to.
 */
struct IncomingSocket
{
  /**
   * Next entry; incoming sockets are kept in a doubly linked list.
   */
  struct IncomingSocket *next;

  /**
   * Previous entry; incoming sockets are kept in a doubly linked list.
   */
  struct IncomingSocket *prev;

  /**
   * The stream socket handed to us by the listener.
   */
  struct GNUNET_STREAM_Socket *socket;

  /**
   * Handle for currently active read
   */
  struct GNUNET_STREAM_ReadHandle *rh;

  /**
   * Peer that connected to us with the socket.
   */
  struct GNUNET_PeerIdentity *peer;

  /**
   * Message stream tokenizer for this socket.
   */
  struct GNUNET_SERVER_MessageStreamTokenizer *mst;

  /**
   * Peer-in-session this socket belongs to, once known, otherwise NULL.
   */
  struct ConsensusPeerInformation *cpi;
};
329 static struct IncomingSocket *incoming_sockets_head;
330 static struct IncomingSocket *incoming_sockets_tail;
333 * Linked list of sessions this peer participates in.
335 static struct ConsensusSession *sessions_head;
338 * Linked list of sessions this peer participates in.
340 static struct ConsensusSession *sessions_tail;
343 * Configuration of the consensus service.
345 static const struct GNUNET_CONFIGURATION_Handle *cfg;
348 * Handle to the server for this service.
350 static struct GNUNET_SERVER_Handle *srv;
353 * Peer that runs this service.
355 static struct GNUNET_PeerIdentity *my_peer;
358 * Handle to the core service. Only used during service startup, will be NULL after that.
360 static struct GNUNET_CORE_Handle *core;
363 * Listener for sockets from peers that want to reconcile with us.
365 static struct GNUNET_STREAM_ListenSocket *listener;
369 estimate_difference (struct InvertibleBloomFilter** strata1,
370 struct InvertibleBloomFilter** strata2)
375 for (i = STRATA_COUNT - 1; i >= 0; i--)
377 struct InvertibleBloomFilter *diff;
381 diff = ibf_dup (strata1[i]);
382 ibf_subtract (diff, strata2[i]);
385 more = ibf_decode (diff, NULL, NULL);
386 if (GNUNET_NO == more)
391 if (GNUNET_SYSERR == more)
393 return count * (1 << (i + 1));
404 * Functions of this signature are called whenever data is available from the
407 * @param cls the closure from GNUNET_STREAM_read
408 * @param status the status of the stream at the time this function is called
409 * @param data traffic from the other side
410 * @param size the number of bytes available in data read; will be 0 on timeout
411 * @return number of bytes of processed from 'data' (any data remaining should be
412 * given to the next time the read processor is called).
415 stream_data_processor (void *cls,
416 enum GNUNET_STREAM_Status status,
420 struct IncomingSocket *incoming;
423 GNUNET_assert (GNUNET_STREAM_OK == status);
425 incoming = (struct IncomingSocket *) cls;
427 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO);
428 if (GNUNET_SYSERR == ret)
430 /* FIXME: handle this correctly */
435 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
436 &stream_data_processor, incoming);
438 /* we always read all data */
443 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
447 struct GNUNET_HashCode *hash_src;
450 GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
452 if (NULL == cpi->strata)
454 cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
455 for (i = 0; i < STRATA_COUNT; i++)
456 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
459 num_strata = ntohs (strata_msg->num_strata);
461 /* for correct message alignment, copy bucket types seperately */
462 hash_src = (struct GNUNET_HashCode *) &strata_msg[1];
464 for (i = 0; i < num_strata; i++)
466 memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
467 hash_src += STRATA_IBF_BUCKETS;
470 for (i = 0; i < num_strata; i++)
472 memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
473 hash_src += STRATA_IBF_BUCKETS;
476 count_src = (uint8_t *) hash_src;
478 for (i = 0; i < num_strata; i++)
480 uint8_t zero[STRATA_IBF_BUCKETS];
481 memset (zero, 0, STRATA_IBF_BUCKETS);
482 memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS);
483 count_src += STRATA_IBF_BUCKETS;
486 GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE));
488 cpi->strata_counter += num_strata;
490 if (STRATA_COUNT == cpi->strata_counter)
493 diff = estimate_difference (cpi->session->strata, cpi->strata);
494 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff);
502 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata)
509 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata)
516 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
518 struct ConsensusSession *session;
519 session = sessions_head;
520 while (NULL != session)
522 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
525 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
526 idx = get_peer_idx (inc->peer, session);
527 GNUNET_assert (-1 != idx);
528 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
529 inc->cpi = &session->info[idx];
530 GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
531 inc->cpi->mst = inc->mst;
532 inc->cpi->hello = GNUNET_YES;
533 inc->cpi->socket = inc->socket;
536 session = session->next;
544 * Functions with this signature are called whenever a
545 * complete message is received by the tokenizer.
547 * Do not call GNUNET_SERVER_mst_destroy in callback
550 * @param client identification of the client
551 * @param message the actual message
553 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
556 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
558 struct ConsensusPeerInformation *cpi;
559 cpi = (struct ConsensusPeerInformation *) cls;
560 switch (ntohs( message->type))
562 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
563 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
564 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
565 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
566 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
567 return handle_p2p_element (cpi, (struct Element *) message);
569 /* FIXME: handle correctly */
577 * Handle tokenized messages from stream sockets.
578 * Delegate them if the socket belongs to a session,
579 * handle hello messages otherwise.
581 * Do not call GNUNET_SERVER_mst_destroy in callback
583 * @param cls closure, unused
584 * @param client incoming socket this message comes from
585 * @param message the actual message
587 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
590 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
592 struct IncomingSocket *inc;
593 inc = (struct IncomingSocket *) client;
594 switch (ntohs( message->type))
596 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
597 return handle_p2p_hello (inc, (struct ConsensusHello *) message);
599 if (NULL != inc->cpi)
600 return mst_session_callback (inc->cpi, client, message);
601 /* FIXME: disconnect peer properly */
609 * Functions of this type are called upon new stream connection from other peers
610 * or upon binding error which happen when the app_port given in
611 * GNUNET_STREAM_listen() is already taken.
613 * @param cls the closure from GNUNET_STREAM_listen
614 * @param socket the socket representing the stream; NULL on binding error
615 * @param initiator the identity of the peer who wants to establish a stream
616 * with us; NULL on binding error
617 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
618 * stream (the socket will be invalid after the call)
621 listen_cb (void *cls,
622 struct GNUNET_STREAM_Socket *socket,
623 const struct GNUNET_PeerIdentity *initiator)
625 struct IncomingSocket *incoming;
627 GNUNET_assert (NULL != socket);
629 incoming = GNUNET_malloc (sizeof *incoming);
631 incoming->socket = socket;
632 incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
634 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
635 &stream_data_processor, incoming);
638 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
640 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
647 destroy_session (struct ConsensusSession *session)
649 /* FIXME: more stuff to free! */
650 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
651 GNUNET_SERVER_client_drop (session->client);
652 GNUNET_free (session);
657 * Disconnect a client, and destroy all sessions associated with it.
659 * @param client the client to disconnect
662 disconnect_client (struct GNUNET_SERVER_Client *client)
664 struct ConsensusSession *session;
665 GNUNET_SERVER_client_disconnect (client);
667 /* if the client owns a session, remove it */
668 session = sessions_head;
669 while (NULL != session)
671 if (client == session->client)
673 destroy_session (session);
676 session = session->next;
682 * Compute a global, (hopefully) unique consensus session id,
683 * from the local id of the consensus session, and the identities of all participants.
684 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
685 * exactly the same peers, the global id will be different.
687 * @param local_id local id of the consensus session
688 * @param peers array of all peers participating in the consensus session
689 * @param num_peers number of elements in the peers array
690 * @param dst where the result is stored, may not be NULL
693 compute_global_id (const struct GNUNET_HashCode *local_id,
694 const struct GNUNET_PeerIdentity *peers, int num_peers,
695 struct GNUNET_HashCode *dst)
698 struct GNUNET_HashCode tmp;
701 for (i = 0; i < num_peers; ++i)
703 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
705 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
712 * Function called to notify a client about the connection
713 * begin ready to queue more data. "buf" will be
714 * NULL and "size" zero if the connection was closed for
715 * writing in the meantime.
717 * @param cls consensus session
718 * @param size number of bytes available in buf
719 * @param buf where the callee should write the message
720 * @return number of bytes written to buf
723 transmit_queued (void *cls, size_t size,
726 struct ConsensusSession *session;
727 struct QueuedMessage *qmsg;
730 session = (struct ConsensusSession *) cls;
734 qmsg = session->client_messages_head;
735 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
736 GNUNET_assert (qmsg);
740 destroy_session (session);
744 msg_size = ntohs (qmsg->msg->size);
746 GNUNET_assert (size >= msg_size);
748 memcpy (buf, qmsg->msg, msg_size);
749 GNUNET_free (qmsg->msg);
759 * Schedule sending the next message (if there is any) to a client.
761 * @param cli the client to send the next message to
764 send_next (struct ConsensusSession *session)
767 GNUNET_assert (NULL != session);
769 if (NULL != session->th)
772 if (NULL != session->client_messages_head)
775 msize = ntohs (session->client_messages_head->msg->size);
776 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
777 session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
778 GNUNET_TIME_UNIT_FOREVER_REL,
779 &transmit_queued, session);
/**
 * Although GNUNET_CRYPTO_hash_cmp exists, it does not have
 * the correct signature to be used with e.g. qsort.
 * We use this function instead.
 *
 * NOTE(review): in this file the function is applied to arrays of
 * struct GNUNET_PeerIdentity; that presumably relies on the hash being the
 * first member of the identity -- confirm.
 *
 * @param a some hash code
 * @param b some hash code
 * @return 1 if a > b, -1 if a < b and 0 if a == b.
 */
static int
hash_cmp (const void *a, const void *b)
{
  const struct GNUNET_HashCode *lhs = a;
  const struct GNUNET_HashCode *rhs = b;

  return GNUNET_CRYPTO_hash_cmp (lhs, rhs);
}
801 * Search peer in the list of peers in session.
803 * @param peer peer to find
804 * @param session session with peer
805 * @return index of peer, -1 if peer is not in session
808 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
810 const struct GNUNET_PeerIdentity *needle;
811 needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
814 return needle - session->peers;
820 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
822 struct ConsensusPeerInformation *cpi;
824 cpi = (struct ConsensusPeerInformation *) cls;
825 cpi->hello = GNUNET_YES;
827 GNUNET_assert (GNUNET_STREAM_OK == status);
829 cpi = (struct ConsensusPeerInformation *) cls;
831 if (cpi->session->conclude_requested)
833 write_strata (cpi, GNUNET_STREAM_OK, 0);
839 * Functions of this type will be called when a stream is established
841 * @param cls the closure from GNUNET_STREAM_open
842 * @param socket socket to use to communicate with the other side (read/write)
845 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
847 struct ConsensusPeerInformation *cpi;
848 struct ConsensusHello *hello;
851 cpi = (struct ConsensusPeerInformation *) cls;
852 cpi->is_connected = GNUNET_YES;
854 hello = GNUNET_malloc (sizeof *hello);
855 hello->header.size = htons (sizeof *hello);
856 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
857 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
861 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
867 initialize_session_info (struct ConsensusSession *session)
872 for (i = 0; i < session->num_peers; ++i)
874 /* initialize back-references, so consensus peer information can
875 * be used as closure */
876 session->info[i].session = session;
880 last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers;
881 i = (session->local_peer_idx + 1) % session->num_peers;
884 session->info[i].is_outgoing = GNUNET_YES;
885 session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
886 open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
887 session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session);
888 i = (i + 1) % session->num_peers;
890 // tie-breaker for even number of peers
891 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
893 session->info[last].is_outgoing = GNUNET_YES;
894 session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
895 open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
901 * Create the sorted list of peers for the session,
902 * add the local peer if not in the join message.
905 initialize_session_peer_list (struct ConsensusSession *session)
907 int local_peer_in_list;
909 const struct GNUNET_PeerIdentity *msg_peers;
912 GNUNET_assert (NULL != session->join_msg);
914 /* peers in the join message, may or may not include the local peer */
915 listed_peers = ntohs (session->join_msg->num_peers);
917 session->num_peers = listed_peers;
919 msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
921 local_peer_in_list = GNUNET_NO;
922 for (i = 0; i < listed_peers; i++)
924 if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
926 local_peer_in_list = GNUNET_YES;
931 if (GNUNET_NO == local_peer_in_list)
932 session->num_peers++;
934 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
936 if (GNUNET_NO == local_peer_in_list)
937 session->peers[session->num_peers - 1] = *my_peer;
939 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
940 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
945 strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
950 /* count trailing '1'-bits of v */
951 for (i = 0; v & 1; v>>=1, i++);
953 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
955 ibf_insert (strata[i], key);
960 * Initialize the session, continue receiving messages from the owning client
962 * @param session the session to initialize
965 initialize_session (struct ConsensusSession *session)
967 const struct ConsensusSession *other_session;
970 GNUNET_assert (NULL != session->join_msg);
972 initialize_session_peer_list (session);
974 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
976 compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
978 /* Check if some local client already owns the session. */
979 other_session = sessions_head;
980 while (NULL != other_session)
982 if ((other_session != session) &&
983 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
985 /* session already owned by another client */
987 disconnect_client (session->client);
990 other_session = other_session->next;
993 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
995 session->local_peer_idx = get_peer_idx (my_peer, session);
996 GNUNET_assert (-1 != session->local_peer_idx);
998 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
1000 session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1001 for (i = 0; i < STRATA_COUNT; i++)
1002 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1004 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1006 initialize_session_info (session);
1008 GNUNET_free (session->join_msg);
1009 session->join_msg = NULL;
1011 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1012 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1017 * Called when a client wants to join a consensus session.
1020 * @param client client that sent the message
1021 * @param m message sent by the client
1024 client_join (void *cls,
1025 struct GNUNET_SERVER_Client *client,
1026 const struct GNUNET_MessageHeader *m)
1028 struct ConsensusSession *session;
1030 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join received\n");
1032 // make sure the client has not already joined a session
1033 session = sessions_head;
1034 while (NULL != session)
1036 if (session->client == client)
1039 disconnect_client (client);
1042 session = session->next;
1045 session = GNUNET_malloc (sizeof (struct ConsensusSession));
1046 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
1047 session->client = client;
1048 GNUNET_SERVER_client_keep (client);
1050 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1052 // Initialize session later if local peer identity is not known yet.
1053 if (NULL == my_peer)
1055 GNUNET_SERVER_disable_receive_done_warning (client);
1056 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
1060 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
1061 initialize_session (session);
1066 * Called when a client performs an insert operation.
1068 * @param cls (unused)
1069 * @param client client handle
1070 * @param message message sent by the client
1073 client_insert (void *cls,
1074 struct GNUNET_SERVER_Client *client,
1075 const struct GNUNET_MessageHeader *m)
1077 struct ConsensusSession *session;
1078 struct GNUNET_CONSENSUS_ElementMessage *msg;
1079 struct GNUNET_CONSENSUS_Element *element;
1080 struct GNUNET_HashCode key;
1083 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
1085 session = sessions_head;
1086 while (NULL != session)
1088 if (session->client == client)
1092 if (NULL == session)
1094 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
1095 GNUNET_SERVER_client_disconnect (client);
1099 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1100 element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1102 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
1104 element->type = msg->element_type;
1105 element->size = element_size;
1106 memcpy (&element[1], &msg[1], element_size);
1107 element->data = &element[1];
1109 GNUNET_CRYPTO_hash (element, element_size, &key);
1111 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1112 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1114 strata_insert (session->strata, &key);
1116 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1118 send_next (session);
1124 * Functions of this signature are called whenever writing operations
1125 * on a stream are executed
1127 * @param cls the closure from GNUNET_STREAM_write
1128 * @param status the status of the stream at the time this function is called;
1129 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1130 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1131 * (this doesn't mean that the data is never sent, the receiver may
1132 * have read the data but its ACKs may have been lost);
1133 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1134 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1136 * @param size the number of bytes written
1139 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1141 struct ConsensusPeerInformation *cpi;
1142 struct StrataMessage *strata_msg;
1145 struct GNUNET_HashCode *hash_dst;
1149 cpi = (struct ConsensusPeerInformation *) cls;
1151 GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1153 /* FIXME: handle this */
1154 GNUNET_assert (GNUNET_STREAM_OK == status);
1156 if (STRATA_COUNT == cpi->strata_counter)
1158 /* strata have been written, wait for other side's IBF */
1162 if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
1163 num_strata = (STRATA_COUNT - cpi->strata_counter);
1165 num_strata = STRATA_PER_MESSAGE;
1168 msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1170 strata_msg = GNUNET_malloc (msize);
1171 strata_msg->header.size = htons (msize);
1172 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1173 strata_msg->num_strata = htons (num_strata);
1175 /* for correct message alignment, copy bucket types seperately */
1176 hash_dst = (struct GNUNET_HashCode *) &strata_msg[1];
1178 for (i = 0; i < num_strata; i++)
1180 memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1181 hash_dst += STRATA_IBF_BUCKETS;
1184 for (i = 0; i < num_strata; i++)
1186 memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1187 hash_dst += STRATA_IBF_BUCKETS;
1190 count_dst = (uint8_t *) hash_dst;
1192 for (i = 0; i < num_strata; i++)
1194 memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS);
1195 count_dst += STRATA_IBF_BUCKETS;
1198 cpi->strata_counter += num_strata;
1200 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1203 GNUNET_assert (NULL != cpi->wh);
1208 * Functions of this signature are called whenever writing operations
1209 * on a stream are executed
1211 * @param cls the closure from GNUNET_STREAM_write
1212 * @param status the status of the stream at the time this function is called;
1213 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1214 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1215 * (this doesn't mean that the data is never sent, the receiver may
1216 * have read the data but its ACKs may have been lost);
1217 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1218 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1220 * @param size the number of bytes written
1223 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1225 struct ConsensusPeerInformation *cpi;
1227 cpi = (struct ConsensusPeerInformation *) cls;
1232 * Functions of this signature are called whenever writing operations
1233 * on a stream are executed
1235 * @param cls the closure from GNUNET_STREAM_write
1236 * @param status the status of the stream at the time this function is called;
1237 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1238 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1239 * (this doesn't mean that the data is never sent, the receiver may
1240 * have read the data but its ACKs may have been lost);
1241 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1242 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1244 * @param size the number of bytes written
1247 write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1249 struct ConsensusPeerInformation *cpi;
1251 cpi = (struct ConsensusPeerInformation *) cls;
1256 * Called when a client performs the conclude operation.
1258 * @param cls (unused)
1259 * @param client client handle
1260 * @param message message sent by the client
1263 client_conclude (void *cls,
1264 struct GNUNET_SERVER_Client *client,
1265 const struct GNUNET_MessageHeader *message)
1267 struct ConsensusSession *session;
1270 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
1272 session = sessions_head;
1273 while ((session != NULL) && (session->client != client))
1274 session = session->next;
1275 if (NULL == session)
1277 /* client not found */
1279 GNUNET_SERVER_client_disconnect (client);
1283 if (GNUNET_YES == session->conclude_requested)
1285 /* client requested conclude twice */
1287 disconnect_client (client);
1291 session->conclude_requested = GNUNET_YES;
1293 /* FIXME: write to already connected sockets */
1295 for (i = 0; i < session->num_peers; i++)
1297 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1298 (GNUNET_YES == session->info[i].hello) )
1300 /* kick off transmitting strata by calling the write continuation */
1301 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1306 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1307 send_next (session);
1312 * Called when a client sends an ack
1314 * @param cls (unused)
1315 * @param client client handle
1316 * @param message message sent by the client
1319 client_ack (void *cls,
1320 struct GNUNET_SERVER_Client *client,
1321 const struct GNUNET_MessageHeader *message)
1323 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
1327 * Task that disconnects from core.
1329 * @param cls core handle
1330 * @param tc context information (why was this task triggered now)
1333 disconnect_core (void *cls,
1334 const struct GNUNET_SCHEDULER_TaskContext *tc)
1336 GNUNET_CORE_disconnect (core);
1338 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
1343 core_startup (void *cls,
1344 struct GNUNET_CORE_Handle *core,
1345 const struct GNUNET_PeerIdentity *peer)
1347 struct ConsensusSession *session;
1349 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
1350 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
1351 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
1352 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
1354 session = sessions_head;
1355 while (NULL != session)
1357 if (NULL != session->join_msg)
1358 initialize_session (session);
1359 session = session->next;
1365 * Called to clean up, after a shutdown has been requested.
1367 * @param cls closure
1368 * @param tc context information (why was this task triggered now)
1371 shutdown_task (void *cls,
1372 const struct GNUNET_SCHEDULER_TaskContext *tc)
1374 while (NULL != sessions_head)
1376 struct ConsensusSession *session;
1377 session = sessions_head;
1378 sessions_head = sessions_head->next;
1379 GNUNET_free (session);
1384 GNUNET_CORE_disconnect (core);
1388 if (NULL != listener)
1390 GNUNET_STREAM_listen_close (listener);
1394 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
1399 * Start processing consensus requests.
1401 * @param cls closure
1402 * @param server the initialized server
1403 * @param c configuration to use
1406 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
1408 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
1411 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1412 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1413 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1414 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1415 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1416 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
1417 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
1424 GNUNET_SERVER_add_handlers (server, server_handlers);
1426 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1429 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1431 GNUNET_STREAM_OPTION_END);
1434 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1435 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1436 GNUNET_assert (NULL != core);
1438 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1439 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE);
1444 * The main function for the consensus service.
1446 * @param argc number of arguments from the command line
1447 * @param argv command line arguments
1448 * @return 0 ok, 1 on error
1451 main (int argc, char *const *argv)
1454 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1455 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
1456 return (GNUNET_OK == ret) ? 0 : 1;