-simplify
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_consensus_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_stream_lib.h"
35
36 #include "consensus_protocol.h"
37 #include "consensus.h"
38 #include "ibf.h"
39 #include "strata_estimator.h"
40
41
42 /*
43  * Log macro that prefixes the local peer and the peer we are in contact with.
44  */
45 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
46    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
47
48
49 /**
50  * Number of IBFs in a strata estimator.
51  */
52 #define SE_STRATA_COUNT 32
53 /**
54  * Size of the IBFs in the strata estimator.
55  */
56 #define SE_IBF_SIZE 80
57 /**
58  * hash num parameter for the difference digests and strata estimators
59  */
60 #define SE_IBF_HASH_NUM 3
61
62 /**
63  * Number of buckets that can be transmitted in one message.
64  */
65 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
66
67 /**
68  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
69  * Choose this value so that computing the IBF is still cheaper
70  * than transmitting all values.
71  */
72 #define MAX_IBF_ORDER (16)
73
74 /**
75  * Number of exponential rounds, used in the inventory and completion round.
76  */
77 #define NUM_EXP_ROUNDS (4)
78
79
80 /* forward declarations */
81
82 /* mutual recursion with struct ConsensusSession */
83 struct ConsensusPeerInformation;
84
85 struct MessageQueue;
86
87 /* mutual recursion with round_over */
88 static void
89 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
90
91 /* mutial recursion with transmit_queued */
92 static void
93 client_send_next (struct MessageQueue *mq);
94
95 /* mutual recursion with mst_session_callback */
96 static void
97 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket);
98
99 static int
100 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message);
101
102
103 /**
104  * Additional information about a consensus element.
105  */
106 struct ElementInfo
107 {
108   /**
109    * The element itself.
110    */
111   struct GNUNET_CONSENSUS_Element *element;
112   /**
113    * Hash of the element
114    */
115   struct GNUNET_HashCode *element_hash;
116   /**
117    * Number of other peers that have the element in the inventory.
118    */
119   unsigned int inventory_count;
120   /**
121    * Bitmap of peers that have this element in their inventory
122    */
123   uint8_t *inventory_bitmap;
124 };
125
126
127 /**
128  * Describes the current round a consensus session is in.
129  */
130 enum ConsensusRound
131 {
132   /**
133    * Not started the protocol yet.
134    */
135   CONSENSUS_ROUND_BEGIN=0,
136   /**
137    * Distribution of elements with the exponential scheme.
138    */
139   CONSENSUS_ROUND_EXCHANGE,
140   /**
141    * Exchange which elements each peer has, but not the elements.
142    * This round uses the all-to-all scheme.
143    */
144   CONSENSUS_ROUND_INVENTORY,
145   /**
146    * Collect and distribute missing values with the exponential scheme.
147    */
148   CONSENSUS_ROUND_COMPLETION,
149   /**
150    * Consensus concluded. After timeout and finished communication with client,
151    * consensus session will be destroyed.
152    */
153   CONSENSUS_ROUND_FINISH
154 };
155
156 /* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */
157
158 /**
159  * State of another peer with respect to the
160  * current ibf.
161  */
162 enum ConsensusIBFState {
163   /**
164    * There is nothing going on with the IBF.
165    */
166   IBF_STATE_NONE=0,
167   /**
168    * We currently receive an ibf.
169    */
170   IBF_STATE_RECEIVING,
171   /*
172    * we decode a received ibf
173   */
174   IBF_STATE_DECODING,
175   /**
176    *  wait for elements and element requests
177    */
178   IBF_STATE_ANTICIPATE_DIFF
179 };
180
181
182 typedef void (*AddCallback) (struct MessageQueue *mq);
183 typedef void (*MessageSentCallback) (void *cls);
184
185
186 /**
187  * Collection of the state necessary to read and write gnunet messages 
188  * to a stream socket. Should be used as closure for stream_data_processor.
189  */
190 struct MessageStreamState
191 {
192   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
193   struct MessageQueue *mq;
194   void *mst_cls;
195   struct GNUNET_STREAM_Socket *socket;
196   struct GNUNET_STREAM_ReadHandle *rh;
197   struct GNUNET_STREAM_WriteHandle *wh;
198 };
199
200
201 struct ServerClientSocketState
202 {
203   struct GNUNET_SERVER_Client *client;
204   struct GNUNET_SERVER_TransmitHandle* th;
205 };
206
207
208 /**
209  * Generic message queue, for queueing outgoing messages.
210  */
211 struct MessageQueue
212 {
213   void *state;
214   AddCallback add_cb;
215   struct PendingMessage *pending_head;
216   struct PendingMessage *pending_tail;
217   struct PendingMessage *current_pm;
218 };
219
220
221 struct PendingMessage
222 {
223   struct GNUNET_MessageHeader *msg;
224   struct MessageQueue *parent_queue;
225   struct PendingMessage *next;
226   struct PendingMessage *prev;
227   MessageSentCallback sent_cb;
228   void *sent_cb_cls;
229 };
230
231
232 /**
233  * A consensus session consists of one local client and the remote authorities.
234  */
235 struct ConsensusSession
236 {
237   /**
238    * Consensus sessions are kept in a DLL.
239    */
240   struct ConsensusSession *next;
241
242   /**
243    * Consensus sessions are kept in a DLL.
244    */
245   struct ConsensusSession *prev;
246
247   /**
248    * Join message. Used to initialize the session later,
249    * if the identity of the local peer is not yet known.
250    * NULL if the session has been fully initialized.
251    */
252   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
253
254   /**
255   * Global consensus identification, computed
256   * from the session id and participating authorities.
257   */
258   struct GNUNET_HashCode global_id;
259
260   /**
261    * The server's client and associated local state
262    */
263   struct ServerClientSocketState scss;
264
265   /**
266    * Queued messages to the client.
267    */
268   struct MessageQueue *client_mq;
269
270   /**
271    * IBF_Key -> 2^(HashCode*)
272    * FIXME:
273    * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *.
274    */
275   struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map;
276
277   /**
278    * Maps HashCodes to ElementInfos
279    */
280   struct GNUNET_CONTAINER_MultiHashMap *values;
281
282   /**
283    * Currently active transmit handle for sending to the client
284    */
285   struct GNUNET_SERVER_TransmitHandle *client_th;
286
287   /**
288    * Timeout for all rounds together, single rounds will schedule a timeout task
289    * with a fraction of the conclude timeout.
290    */
291   struct GNUNET_TIME_Relative conclude_timeout;
292   
293   /**
294    * Timeout task identifier for the current round
295    */
296   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
297
298   /**
299    * Number of other peers in the consensus
300    */
301   unsigned int num_peers;
302
303   /**
304    * Information about the other peers,
305    * their state, etc.
306    */
307   struct ConsensusPeerInformation *info;
308
309   /**
310    * GNUNET_YES if the client has called conclude.
311    * */
312   int conclude;
313
314   /**
315    * Index of the local peer in the peers array
316    */
317   unsigned int local_peer_idx;
318
319   /**
320    * Strata estimator, computed online
321    */
322   struct StrataEstimator *se;
323
324   /**
325    * Pre-computed IBFs
326    */
327   struct InvertibleBloomFilter **ibfs;
328
329   /**
330    * Current round
331    */
332   enum ConsensusRound current_round;
333
334   /**
335    * Permutation of peers for the current round,
336    * maps logical index (for current round) to physical index (location in info array)
337    */
338   int *shuffle;
339
340   int exp_round;
341
342   int exp_subround;
343
344   /**
345    * The partner for the current exp-round
346    */
347   struct ConsensusPeerInformation* partner_outgoing;
348
349   /**
350    * The partner for the current exp-round
351    */
352   struct ConsensusPeerInformation* partner_incoming;
353 };
354
355
356 /**
357  * Information about a peer that is in a consensus session.
358  */
359 struct ConsensusPeerInformation
360 {
361   /**
362    * Peer identitty of the peer in the consensus session
363    */
364   struct GNUNET_PeerIdentity peer_id;
365
366   /**
367    * Do we connect to the peer, or does the peer connect to us?
368    * Only valid for all-to-all phases
369    */
370   int is_outgoing;
371
372   /**
373    * Did we receive/send a consensus hello?
374    */
375   int hello;
376
377   /*
378    * FIXME
379    */
380   struct MessageStreamState mss;
381
382   /**
383    * Current state
384    */
385   enum ConsensusIBFState ibf_state;
386
387   /**
388    * What is the order (=log2 size) of the ibf
389    * we're currently dealing with?
390    * Interpretation depends on ibf_state.
391    */
392   int ibf_order;
393
394   /**
395    * The current IBF for this peer,
396    * purpose dependent on ibf_state
397    */
398   struct InvertibleBloomFilter *ibf;
399
400   /**
401    * How many buckets have we transmitted/received? 
402    * Interpretatin depends on ibf_state
403    */
404   int ibf_bucket_counter;
405
406   /**
407    * Strata estimator of the peer, NULL if our peer
408    * initiated the reconciliation.
409    */
410   struct StrataEstimator *se;
411
412   /**
413    * Back-reference to the consensus session,
414    * to that ConsensusPeerInformation can be used as a closure
415    */
416   struct ConsensusSession *session;
417
418   /**
419    * True if we are actually replaying the strata message,
420    * e.g. currently handling the premature_strata_message.
421    */
422   int replaying_strata_message;
423
424   /**
425    * A strata message that is not actually for the current round,
426    * used in the exp-scheme.
427    */
428   struct StrataMessage *premature_strata_message;
429
430   /**
431    * We have finishes the exp-subround with the peer.
432    */
433   int exp_subround_finished;
434
435   /**
436    * GNUNET_YES if we synced inventory with this peer;
437    * GNUNET_NO otherwise.
438    */
439   int inventory_synced;
440
441   /**
442    * Round this peer seems to be in, according to the last SE we got.
443    * Necessary to store this, as we sometimes need to respond to a request from an
444    * older round, while we are already in the next round.
445    */
446   enum ConsensusRound apparent_round;
447 };
448
449
450 /**
451  * Sockets from other peers who want to communicate with us.
452  * It may not be known yet which consensus session they belong to, we have to wait for the
453  * peer's hello.
454  * Also, the session might not exist yet locally, we have to wait for a local client to connect.
455  */
456 struct IncomingSocket
457 {
458   /**
459    * Incoming sockets are kept in a double linked list.
460    */
461   struct IncomingSocket *next;
462
463   /**
464    * Incoming sockets are kept in a double linked list.
465    */
466   struct IncomingSocket *prev;
467
468   /**
469    * Peer that connected to us with the socket.
470    */
471   struct GNUNET_PeerIdentity peer_id;
472
473   /**
474    * Peer-in-session this socket belongs to, once known, otherwise NULL.
475    */
476   struct ConsensusPeerInformation *cpi;
477
478   /**
479    * Set to the global session id, if the peer sent us a hello-message,
480    * but the session does not exist yet.
481    */
482   struct GNUNET_HashCode *requested_gid;
483
484   /*
485    * Timeout, will disconnect the socket if not yet in a session.
486    * FIXME: implement
487    */
488   GNUNET_SCHEDULER_TaskIdentifier timeout;
489
490   /* FIXME */
491   struct MessageStreamState mss;
492 };
493
494
495 /**
496  * Linked list of incoming sockets.
497  */
498 static struct IncomingSocket *incoming_sockets_head;
499
500 /**
501  * Linked list of incoming sockets.
502  */
503 static struct IncomingSocket *incoming_sockets_tail;
504
505 /**
506  * Linked list of sessions this peer participates in.
507  */
508 static struct ConsensusSession *sessions_head;
509
510 /**
511  * Linked list of sessions this peer participates in.
512  */
513 static struct ConsensusSession *sessions_tail;
514
515 /**
516  * Configuration of the consensus service.
517  */
518 static const struct GNUNET_CONFIGURATION_Handle *cfg;
519
520 /**
521  * Handle to the server for this service.
522  */
523 static struct GNUNET_SERVER_Handle *srv;
524
525 /**
526  * Peer that runs this service.
527  */
528 static struct GNUNET_PeerIdentity *my_peer;
529
530 /**
531  * Handle to the core service. Only used during service startup, will be NULL after that.
532  */
533 static struct GNUNET_CORE_Handle *core;
534
535 /**
536  * Listener for sockets from peers that want to reconcile with us.
537  */
538 static struct GNUNET_STREAM_ListenSocket *listener;
539
540
541 /**
542  * Transmit a queued message to the session's client.
543  *
544  * @param cls consensus session
545  * @param size number of bytes available in buf
546  * @param buf where the callee should write the message
547  * @return number of bytes written to buf
548  */
549 static size_t
550 transmit_queued (void *cls, size_t size,
551                  void *buf)
552 {
553   struct MessageQueue *mq = cls;
554   struct PendingMessage *pm = mq->pending_head;
555   struct ServerClientSocketState *state = mq->state;
556   size_t msg_size;
557
558   GNUNET_assert (NULL != pm);
559   GNUNET_assert (NULL != buf);
560   msg_size = ntohs (pm->msg->size);
561   GNUNET_assert (size >= msg_size);
562   memcpy (buf, pm->msg, msg_size);
563   GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
564   state->th = NULL;
565   client_send_next (cls);
566   GNUNET_free (pm);
567   return msg_size;
568 }
569
570
571 static void
572 client_send_next (struct MessageQueue *mq)
573 {
574   struct ServerClientSocketState *state = mq->state;
575   int msize;
576
577   GNUNET_assert (NULL != state);
578
579   if ( (NULL != state->th) ||
580        (NULL == mq->pending_head) )
581     return;
582   msize = ntohs (mq->pending_head->msg->size);
583   state->th = 
584       GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
585                                            GNUNET_TIME_UNIT_FOREVER_REL,
586                                            &transmit_queued, mq);
587 }
588
589
590 struct MessageQueue *
591 create_message_queue_for_server_client (struct ServerClientSocketState *scss)
592 {
593   struct MessageQueue *mq;
594   mq = GNUNET_new (struct MessageQueue);
595   mq->add_cb = client_send_next;
596   mq->state = scss;
597   return mq;
598 }
599
600
601 /**
602  * Functions of this signature are called whenever writing operations
603  * on a stream are executed
604  *
605  * @param cls the closure from GNUNET_STREAM_write
606  * @param status the status of the stream at the time this function is called;
607  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
608  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
609  *          (this doesn't mean that the data is never sent, the receiver may
610  *          have read the data but its ACKs may have been lost);
611  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
612  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
613  *          be processed.
614  * @param size the number of bytes written
615  */
616 static void 
617 write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
618 {
619   struct MessageQueue *mq = cls;
620   struct MessageStreamState *mss = mq->state;
621   struct PendingMessage *pm;
622
623   GNUNET_assert (GNUNET_STREAM_OK == status);
624   
625   /* call cb for message we finished sending */
626   pm = mq->current_pm;
627   if (NULL != pm)
628   {
629     if (NULL != pm->sent_cb)
630       pm->sent_cb (pm->sent_cb_cls);
631     GNUNET_free (pm);
632   }
633
634   mss->wh = NULL;
635
636   pm = mq->pending_head;
637   mq->current_pm = pm;
638   if (NULL == pm)
639     return;
640   GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
641   mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size),
642                                  GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls);
643   GNUNET_assert (NULL != mss->wh);
644 }
645
646
647 static void
648 stream_socket_add_cb (struct MessageQueue *mq)
649 {
650   if (NULL != mq->current_pm)
651     return;
652   write_queued (mq, GNUNET_STREAM_OK, 0);
653 }
654
655
656 struct MessageQueue *
657 create_message_queue_for_stream_socket (struct MessageStreamState *mss)
658 {
659   struct MessageQueue *mq;
660   mq = GNUNET_new (struct MessageQueue);
661   mq->state = mss;
662   mq->add_cb = stream_socket_add_cb;
663   return mq;
664 }
665
666
667 struct PendingMessage *
668 new_pending_message (uint16_t size, uint16_t type)
669 {
670   struct PendingMessage *pm;
671   pm = GNUNET_malloc (sizeof *pm + size);
672   pm->msg = (void *) &pm[1];
673   pm->msg->size = htons (size);
674   pm->msg->type = htons (type);
675   return pm;
676 }
677
678
679 /**
680  * Queue a message in a message queue.
681  *
682  * @param queue the message queue
683  * @param pending message, message with additional information
684  */
685 void
686 message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg)
687 {
688   GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg);
689   queue->add_cb (queue);
690 }
691
692
693 /**
694  * Called when we receive data from a peer via stream.
695  *
696  * @param cls the closure from GNUNET_STREAM_read
697  * @param status the status of the stream at the time this function is called
698  * @param data traffic from the other side
699  * @param size the number of bytes available in data read; will be 0 on timeout 
700  * @return number of bytes of processed from 'data' (any data remaining should be
701  *         given to the next time the read processor is called).
702  */
703 static size_t
704 stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size)
705 {
706   struct MessageStreamState *mss = cls;
707   int ret;
708
709   mss->rh = NULL;
710
711   if (GNUNET_STREAM_OK != status)
712   {
713     /* FIXME: handle this correctly */
714     GNUNET_break (0);
715     return 0;
716   }
717   GNUNET_assert (NULL != mss->mst);
718   ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES);
719   if (GNUNET_SYSERR == ret)
720   {
721     /* FIXME: handle this correctly */
722     GNUNET_break (0);
723     return 0;
724   }
725   /* read again */
726   mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss);
727   /* we always read all data */
728   return size;
729 }
730
731
732 /**
733  * Send element or element report to the peer specified in cpi.
734  *
735  * @param cpi peer to send the elements to
736  * @param head head of the element list
737  */
738 static void
739 send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e)
740 {
741   struct PendingMessage *pm;
742
743   switch (cpi->apparent_round)
744   {
745     case CONSENSUS_ROUND_COMPLETION:
746     case CONSENSUS_ROUND_EXCHANGE:
747       pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size,
748                                 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
749       memcpy (&pm->msg[1], e->element->data, e->element->size);
750       message_queue_add (cpi->mss.mq, pm);
751       break;
752     case CONSENSUS_ROUND_INVENTORY:
753       pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode),
754                                 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
755       memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode));
756       message_queue_add (cpi->mss.mq, pm);
757       break;
758     default:
759       GNUNET_break (0);
760   }
761 }
762
763
764 /**
765  * Iterator to insert values into an ibf.
766  *
767  * @param cls closure
768  * @param key current key code
769  * @param value value in the hash map
770  * @return GNUNET_YES if we should continue to
771  *         iterate,
772  *         GNUNET_NO if not.
773  */
774 static int
775 ibf_values_iterator (void *cls,
776                      const struct GNUNET_HashCode *key,
777                      void *value)
778 {
779   struct ConsensusPeerInformation *cpi = cls;
780   struct ElementInfo *e = value;
781   struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash);
782
783   GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
784   ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
785   return GNUNET_YES;
786 }
787
788 /**
789  * Create and populate an IBF for the specified peer,
790  * if it does not already exist.
791  *
792  * @param cpi peer to create the ibf for
793  */
794 static void
795 prepare_ibf (struct ConsensusPeerInformation *cpi)
796 {
797   if (NULL != cpi->session->ibfs[cpi->ibf_order])
798     return;
799   cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
800   GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
801 }
802
803
804 /**
805  * Called when a remote peer wants to inform the local peer
806  * that the remote peer misses elements.
807  * Elements are not reconciled.
808  *
809  * @param cpi session
810  * @param msg message
811  */
812 static int
813 handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
814 {
815   GNUNET_assert (0);
816 }
817
818
819 static int
820 exp_subround_finished (const struct ConsensusSession *session)
821 {
822   int not_finished;
823   not_finished = 0;
824   if ( (NULL != session->partner_outgoing) && 
825        (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
826     not_finished++;
827   if ( (NULL != session->partner_incoming) &&
828        (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
829     not_finished++;
830   if (0 == not_finished)
831     return GNUNET_YES;
832   return GNUNET_NO;
833 }
834
835
836 static int
837 inventory_round_finished (struct ConsensusSession *session)
838 {
839   int i;
840   int finished;
841   finished = 0;
842   for (i = 0; i < session->num_peers; i++)
843     if (GNUNET_YES == session->info[i].inventory_synced)
844       finished++;
845   if (finished >= (session->num_peers / 2))
846     return GNUNET_YES;
847   return GNUNET_NO;
848 }
849
850
851 static void
852 clear_message_stream_state (struct MessageStreamState *mss)
853 {
854   if (NULL != mss->mst)
855   {
856     GNUNET_SERVER_mst_destroy (mss->mst);
857     mss->mst = NULL;
858   }
859   if (NULL != mss->rh)
860   {
861     GNUNET_STREAM_read_cancel (mss->rh);
862     mss->rh = NULL;
863   } 
864   if (NULL != mss->wh)
865   {
866     GNUNET_STREAM_write_cancel (mss->wh);
867     mss->wh = NULL;
868   } 
869   if (NULL != mss->socket)
870   {
871     GNUNET_STREAM_close (mss->socket);
872     mss->socket = NULL;
873   }
874   if (NULL != mss->mq)
875   {
876     GNUNET_free (mss->mq);
877     mss->mq = NULL;
878   }
879 }
880
881
882 /**
883  * Iterator over hash map entries.
884  *
885  * @param cls closure
886  * @param key current key code
887  * @param value value in the hash map
888  * @return GNUNET_YES if we should continue to
889  *         iterate,
890  *         GNUNET_NO if not.
891  */
892 static int
893 destroy_element_info_iter (void *cls,
894                            const struct GNUNET_HashCode * key,
895                            void *value)
896 {
897   struct ElementInfo *ei = value;
898   GNUNET_free (ei->element);
899   GNUNET_free (ei->element_hash);
900   GNUNET_free (ei);
901   return GNUNET_YES;
902 }
903
904
905 /**
906  * Destroy a session, free all resources associated with it.
907  * 
908  * @param session the session to destroy
909  */
910 static void
911 destroy_session (struct ConsensusSession *session)
912 {
913   int i;
914
915   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
916   GNUNET_SERVER_client_drop (session->scss.client);
917   session->scss.client = NULL;
918   if (NULL != session->client_mq)
919   {
920     GNUNET_free (session->client_mq);
921     session->client_mq = NULL;
922   }
923   if (NULL != session->shuffle)
924   {
925     GNUNET_free (session->shuffle);
926     session->shuffle = NULL;
927   }
928   if (NULL != session->se)
929   {
930     strata_estimator_destroy (session->se);
931     session->se = NULL;
932   }
933   if (NULL != session->info)
934   {
935     for (i = 0; i < session->num_peers; i++)
936     {
937       struct ConsensusPeerInformation *cpi;
938       cpi = &session->info[i];
939       clear_message_stream_state (&cpi->mss);
940       if (NULL != cpi->se)
941       {
942         strata_estimator_destroy (cpi->se);
943         cpi->se = NULL;
944       }
945       if (NULL != cpi->ibf)
946       {
947         ibf_destroy (cpi->ibf);
948         cpi->ibf = NULL;
949       }
950     }
951     GNUNET_free (session->info);
952     session->info = NULL;
953   }
954   if (NULL != session->ibfs)
955   {
956     for (i = 0; i <= MAX_IBF_ORDER; i++)
957     {
958       if (NULL != session->ibfs[i])
959       {
960         ibf_destroy (session->ibfs[i]);
961         session->ibfs[i] = NULL;
962       }
963     }
964     GNUNET_free (session->ibfs);
965     session->ibfs = NULL;
966   }
967   if (NULL != session->values)
968   {
969     GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL);
970     GNUNET_CONTAINER_multihashmap_destroy (session->values);
971     session->values = NULL;
972   }
973
974   if (NULL != session->ibf_key_map)
975   {
976     GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map);
977     session->ibf_key_map = NULL;
978   }
979   GNUNET_free (session);
980 }
981
982
983 static void
984 send_client_conclude_done (struct ConsensusSession *session)
985 {
986   struct PendingMessage *pm;
987
988   /* check if client is even there anymore */
989   if (NULL == session->scss.client)
990     return;
991   pm = new_pending_message (sizeof (struct GNUNET_MessageHeader),
992                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
993   message_queue_add (session->client_mq, pm);
994 }
995
996
997 /**
998  * Check if a strata message is for the current round or not
999  *
1000  * @param session session we are in
1001  * @param strata_msg the strata message to check
1002  * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise
1003  */
1004 static int
1005 is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
1006 {
1007   switch (strata_msg->round)
1008   {
1009     case CONSENSUS_ROUND_COMPLETION:
1010     case CONSENSUS_ROUND_EXCHANGE:
1011       /* here, we also have to compare subrounds */
1012       if ( (strata_msg->round != session->current_round) ||
1013            (strata_msg->exp_round != session->exp_round) ||
1014            (strata_msg->exp_subround != session->exp_subround) )
1015         return GNUNET_YES;
1016       break;
1017     default:
1018       if (session->current_round != strata_msg->round)
1019         return GNUNET_YES;
1020     break;
1021   }
1022   return GNUNET_NO;
1023 }
1024
1025
1026 /**
1027  * Send a strata estimator.
1028  *
1029  * @param cpi the peer
1030  */
1031 static void
1032 send_strata_estimator (struct ConsensusPeerInformation *cpi)
1033 {
1034   struct PendingMessage *pm;
1035   struct StrataMessage *strata_msg;
1036
1037   /* FIXME: why is this correct? */
1038   cpi->apparent_round = cpi->session->current_round;
1039   cpi->ibf_state = IBF_STATE_NONE;
1040   cpi->ibf_bucket_counter = 0;
1041
1042   LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round);
1043
1044   pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE),
1045                             GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1046   strata_msg = (struct StrataMessage *) pm->msg;
1047   strata_msg->round = cpi->session->current_round;
1048   strata_msg->exp_round = cpi->session->exp_round;
1049   strata_msg->exp_subround = cpi->session->exp_subround;
1050   strata_estimator_write (cpi->session->se, &strata_msg[1]);
1051   message_queue_add (cpi->mss.mq, pm);
1052 }
1053
1054
1055 /**
1056  * Send an IBF of the order specified in cpi.
1057  *
1058  * @param cpi the peer
1059  */
1060 static void
1061 send_ibf (struct ConsensusPeerInformation *cpi)
1062 {
1063   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1064               cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1065
1066   cpi->ibf_bucket_counter = 0;
1067   while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1068   {
1069     unsigned int num_buckets;
1070     struct PendingMessage *pm;
1071     struct DifferenceDigest *digest;
1072
1073     num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1074     /* limit to maximum */
1075     if (num_buckets > BUCKETS_PER_MESSAGE)
1076       num_buckets = BUCKETS_PER_MESSAGE;
1077
1078     pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE),
1079                               GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1080     digest = (struct DifferenceDigest *) pm->msg;
1081     digest->order = cpi->ibf_order;
1082     digest->round = cpi->apparent_round;
1083     ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]);
1084     cpi->ibf_bucket_counter += num_buckets;
1085     message_queue_add (cpi->mss.mq, pm);
1086   }
1087   cpi->ibf_bucket_counter = 0;
1088   cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1089 }
1090
1091
1092 /**
1093  * Called when a peer sends us its strata estimator.
1094  * In response, we sent out IBF of appropriate size back.
1095  *
1096  * @param cpi session
1097  * @param strata_msg message
1098  */
1099 static int
1100 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
1101 {
1102   unsigned int diff;
1103
1104   if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) &&
1105        (strata_msg->round == CONSENSUS_ROUND_INVENTORY) )
1106   {
1107     /* we still have to handle this request appropriately */
1108     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
1109                 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1110   }
1111   else if (is_premature_strata_message (cpi->session, strata_msg))
1112   {
1113     if (GNUNET_NO == cpi->replaying_strata_message)
1114     {
1115       LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n",
1116               strata_msg->exp_round, strata_msg->exp_subround);
1117       cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header);
1118     }
1119     return GNUNET_YES;
1120   }
1121
1122   if (NULL == cpi->se)
1123     cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
1124
1125   cpi->apparent_round = strata_msg->round;
1126
1127   if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
1128   {
1129     LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n");
1130     return GNUNET_NO;
1131   }
1132   strata_estimator_read (&strata_msg[1], cpi->se);
1133   GNUNET_assert (NULL != cpi->session->se);
1134   diff = strata_estimator_difference (cpi->session->se, cpi->se);
1135
1136   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
1137               cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
1138
1139   switch (cpi->session->current_round)
1140   {
1141     case CONSENSUS_ROUND_EXCHANGE:
1142     case CONSENSUS_ROUND_INVENTORY:
1143     case CONSENSUS_ROUND_COMPLETION:
1144       /* send IBF of the right size */
1145       cpi->ibf_order = 0;
1146       while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) )
1147         cpi->ibf_order++;
1148       if (cpi->ibf_order > MAX_IBF_ORDER)
1149         cpi->ibf_order = MAX_IBF_ORDER;
1150       cpi->ibf_order += 1;
1151       /* create ibf if not already pre-computed */
1152       prepare_ibf (cpi);
1153       if (NULL != cpi->ibf)
1154         ibf_destroy (cpi->ibf);
1155       cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1156       cpi->ibf_bucket_counter = 0;
1157       send_ibf (cpi);
1158       break;
1159     default:
1160       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
1161                   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1162       break;
1163   }
1164   return GNUNET_YES;
1165 }
1166
1167
1168
1169 static int
1170 send_elements_iterator (void *cls,
1171                         const struct GNUNET_HashCode * key,
1172                         void *value)
1173 {
1174   struct ConsensusPeerInformation *cpi = cls;
1175   struct ElementInfo *ei;
1176   ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value);
1177   if (NULL == ei)
1178   {
1179     LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n",
1180             GNUNET_h2s((struct GNUNET_HashCode *) value));
1181     return GNUNET_YES;
1182   }
1183   LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n");
1184   send_element_or_report (cpi, ei);
1185   return GNUNET_YES;
1186 }
1187
1188
1189 /**
1190  * Decode the current diff ibf, and send elements/requests/reports/
1191  *
1192  * @param cpi partner peer
1193  */
1194 static void
1195 decode (struct ConsensusPeerInformation *cpi)
1196 {
1197   struct IBF_Key key;
1198   int side;
1199
1200   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1201
1202   while (1)
1203   {
1204     int res;
1205
1206     res = ibf_decode (cpi->ibf, &side, &key);
1207     if (GNUNET_SYSERR == res)
1208     {
1209       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1210       /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1211       cpi->ibf_order++;
1212       prepare_ibf (cpi);
1213       cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1214       cpi->ibf_bucket_counter = 0;
1215       send_ibf (cpi);
1216       return;
1217     }
1218     if (GNUNET_NO == res)
1219     {
1220       struct PendingMessage *pm;
1221       struct ConsensusRoundMessage *rmsg;
1222       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1223
1224       pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1225       rmsg = (struct ConsensusRoundMessage *) pm->msg;
1226       rmsg->round = cpi->apparent_round;
1227       message_queue_add (cpi->mss.mq, pm);
1228       return;
1229     }
1230     if (-1 == side)
1231     {
1232       struct GNUNET_HashCode hashcode;
1233       /* we have the element(s), send it to the other peer */
1234       ibf_hashcode_from_key (key, &hashcode);
1235       GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1236     }
1237     else
1238     {
1239       struct PendingMessage *pm;
1240       uint16_t type;
1241
1242       switch (cpi->apparent_round)
1243       {
1244         case CONSENSUS_ROUND_COMPLETION:
1245           /* FIXME: check if we really want to request the element */
1246         case CONSENSUS_ROUND_EXCHANGE:
1247         case CONSENSUS_ROUND_INVENTORY:
1248           type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST;
1249           break;
1250         default:
1251           GNUNET_assert (0);
1252       }
1253       pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key),
1254                                 type);
1255       *(struct IBF_Key *) &pm->msg[1] = key;
1256       message_queue_add (cpi->mss.mq, pm);
1257     }
1258   }
1259 }
1260
1261
1262 static int
1263 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
1264 {
1265   int num_buckets;
1266
1267   /* FIXME: find out if we're still expecting the same ibf! */
1268
1269   cpi->apparent_round = cpi->session->current_round;
1270   // FIXME: check header.size >= sizeof (DD)
1271   num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
1272   switch (cpi->ibf_state)
1273   {
1274     case IBF_STATE_NONE:
1275       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1276       cpi->ibf_state = IBF_STATE_RECEIVING;
1277       cpi->ibf_order = digest->order;
1278       cpi->ibf_bucket_counter = 0;
1279       if (NULL != cpi->ibf)
1280       {
1281         ibf_destroy (cpi->ibf);
1282         cpi->ibf = NULL;
1283       }
1284       break;
1285     case IBF_STATE_ANTICIPATE_DIFF:
1286       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
1287                   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1288       cpi->ibf_state = IBF_STATE_RECEIVING;
1289       cpi->ibf_order = digest->order;
1290       cpi->ibf_bucket_counter = 0;
1291       if (NULL != cpi->ibf)
1292       {
1293         ibf_destroy (cpi->ibf);
1294         cpi->ibf = NULL;
1295       }
1296       break;
1297     case IBF_STATE_RECEIVING:
1298       break;
1299     default:
1300       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1301       return GNUNET_YES;
1302   }
1303
1304   if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
1305   {
1306     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1307     return GNUNET_YES;
1308   }
1309
1310   if (NULL == cpi->ibf)
1311     cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
1312
1313   ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1314   cpi->ibf_bucket_counter += num_buckets;
1315
1316   if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1317   {
1318     cpi->ibf_state = IBF_STATE_DECODING;
1319     cpi->ibf_bucket_counter = 0;
1320     prepare_ibf (cpi);
1321     ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
1322     decode (cpi);
1323   }
1324   return GNUNET_YES;
1325 }
1326
1327
1328 /**
1329  * Insert an element into the consensus set of the specified session.
1330  * The element will not be copied, and freed when destroying the session.
1331  *
1332  * @param session session for new element
1333  * @param element element to insert
1334  */
1335 static void
1336 insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
1337 {
1338   struct GNUNET_HashCode hash;
1339   struct ElementInfo *e;
1340   struct IBF_Key ibf_key;
1341   int i;
1342
1343   e = GNUNET_new (struct ElementInfo);
1344   e->element = element;
1345   e->element_hash = GNUNET_new (struct GNUNET_HashCode);
1346   GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash);
1347   ibf_key = ibf_key_from_hashcode (e->element_hash);
1348   ibf_hashcode_from_key (ibf_key, &hash);
1349   strata_estimator_insert (session->se, &hash);
1350   GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e,
1351                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1352   GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash,
1353                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1354
1355   for (i = 0; i <= MAX_IBF_ORDER; i++)
1356   {
1357     if (NULL == session->ibfs[i])
1358       continue;
1359     ibf_insert (session->ibfs[i], ibf_key);
1360   }
1361 }
1362
1363
1364 /**
1365  * Handle an element that another peer sent us
1366  */
1367 static int
1368 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
1369 {
1370   struct GNUNET_CONSENSUS_Element *element;
1371   size_t size;
1372
1373   switch (cpi->session->current_round)
1374   {
1375     case CONSENSUS_ROUND_COMPLETION:
1376       /* FIXME: check if we really expect the element */
1377     case CONSENSUS_ROUND_EXCHANGE:
1378       break;
1379     default:
1380       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
1381       return GNUNET_YES;
1382   }
1383
1384   size = ntohs (element_msg->size) - sizeof *element_msg;
1385
1386   element = GNUNET_malloc (size + sizeof *element);
1387   element->size = size;
1388   memcpy (&element[1], &element_msg[1], size);
1389   element->data = &element[1];
1390
1391   LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n");
1392
1393   insert_element (cpi->session, element);
1394
1395   return GNUNET_YES;
1396 }
1397
1398
1399 /**
1400  * Handle a request for elements.
1401  * 
1402  * @param cpi peer that is requesting the element
1403  * @param msg the element request message
1404  */
1405 static int
1406 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
1407 {
1408   struct GNUNET_HashCode hashcode;
1409   struct IBF_Key *ibf_key;
1410   unsigned int num;
1411
1412   /* element requests are allowed in every round */
1413
1414   num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1415   
1416   ibf_key = (struct IBF_Key *) &msg[1];
1417   while (num--)
1418   {
1419     ibf_hashcode_from_key (*ibf_key, &hashcode);
1420     GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1421     ibf_key++;
1422   }
1423   return GNUNET_YES;
1424 }
1425
1426 static int
1427 is_peer_connected (struct ConsensusPeerInformation *cpi)
1428 {
1429   if (NULL == cpi->mss.socket)
1430     return GNUNET_NO;
1431   return GNUNET_YES;
1432 }
1433
1434
1435 static void
1436 ensure_peer_connected (struct ConsensusPeerInformation *cpi)
1437 {
1438   if (NULL != cpi->mss.socket)
1439     return;
1440   cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
1441                                         open_cb, cpi, GNUNET_STREAM_OPTION_END);
1442 }
1443
1444
1445 /**
1446  * If necessary, send a message to the peer, depending on the current
1447  * round.
1448  */
1449 static void
1450 embrace_peer (struct ConsensusPeerInformation *cpi)
1451 {
1452   if (GNUNET_NO == is_peer_connected (cpi))
1453   {
1454     ensure_peer_connected (cpi);
1455     return;
1456   }
1457   if (GNUNET_NO == cpi->hello)
1458     return;
1459   /* FIXME: correctness of switch */
1460   switch (cpi->session->current_round)
1461   {
1462     case CONSENSUS_ROUND_EXCHANGE:
1463     case CONSENSUS_ROUND_INVENTORY:
1464       if (cpi->session->partner_outgoing != cpi)
1465         break;
1466       /* fallthrough */
1467     case CONSENSUS_ROUND_COMPLETION:
1468         send_strata_estimator (cpi);
1469     default:
1470       break;
1471   }
1472 }
1473
1474
1475 /**
1476  * Called when stream has finishes writing the hello message
1477  */
1478 static void
1479 hello_cont (void *cls)
1480 {
1481   struct ConsensusPeerInformation *cpi = cls;
1482
1483   cpi->hello = GNUNET_YES;
1484   embrace_peer (cpi);
1485 }
1486
1487
1488 /**
1489  * Called when we established a stream connection to another peer
1490  *
1491  * @param cls cpi of the peer we just connected to
1492  * @param socket socket to use to communicate with the other side (read/write)
1493  */
1494 static void
1495 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1496 {
1497   struct ConsensusPeerInformation *cpi = cls;
1498   struct PendingMessage *pm;
1499   struct ConsensusHello *hello;
1500
1501   GNUNET_assert (NULL == cpi->mss.mst);
1502   GNUNET_assert (NULL == cpi->mss.mq);
1503
1504   cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
1505   cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1506   cpi->mss.mst_cls = cpi;
1507
1508   pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1509   hello = (struct ConsensusHello *) pm->msg;
1510   memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1511   pm->sent_cb = hello_cont;
1512   pm->sent_cb_cls = cpi;
1513   message_queue_add (cpi->mss.mq, pm);
1514   cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1515                                 &stream_data_processor, &cpi->mss);
1516 }
1517
1518
1519 static void
1520 replay_premature_message (struct ConsensusPeerInformation *cpi)
1521 {
1522   if (NULL != cpi->premature_strata_message)
1523   {
1524     struct StrataMessage *sm;
1525
1526     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
1527     sm = cpi->premature_strata_message;
1528     cpi->premature_strata_message = NULL;
1529
1530     cpi->replaying_strata_message = GNUNET_YES;
1531     handle_p2p_strata (cpi, sm);
1532     cpi->replaying_strata_message = GNUNET_NO;
1533
1534     GNUNET_free (sm);
1535   }
1536 }
1537
1538
1539 /**
1540  * Start the inventory round, contact all peers we are supposed to contact.
1541  *
1542  * @param session the current session
1543  */
1544 static void
1545 start_inventory (struct ConsensusSession *session)
1546 {
1547   int i;
1548   int last;
1549
1550   for (i = 0; i < session->num_peers; i++)
1551   {
1552     session->info[i].ibf_bucket_counter = 0;
1553     session->info[i].ibf_state = IBF_STATE_NONE;
1554     session->info[i].is_outgoing = GNUNET_NO;
1555   }
1556
1557   last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1558   i = (session->local_peer_idx + 1) % session->num_peers;
1559   while (i != last)
1560   {
1561     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
1562     session->info[i].is_outgoing = GNUNET_YES;
1563     embrace_peer (&session->info[i]);
1564     i = (i + 1) % session->num_peers;
1565   }
1566   // tie-breaker for even number of peers
1567   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1568   {
1569     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
1570     session->info[last].is_outgoing = GNUNET_YES;
1571     embrace_peer (&session->info[last]);
1572   }
1573
1574   for (i = 0; i < session->num_peers; i++)
1575   {
1576     if (GNUNET_NO == session->info[i].is_outgoing)
1577       replay_premature_message (&session->info[i]);
1578   }
1579 }
1580
1581
1582 /**
1583  * Iterator over hash map entries.
1584  *
1585  * @param cls closure
1586  * @param key current key code
1587  * @param value value in the hash map
1588  * @return GNUNET_YES if we should continue to
1589  *         iterate,
1590  *         GNUNET_NO if not.
1591  */
1592 static int
1593 send_client_elements_iter (void *cls,
1594                            const struct GNUNET_HashCode * key,
1595                            void *value)
1596 {
1597   struct ConsensusSession *session = cls;
1598   struct ElementInfo *ei = value;
1599   struct PendingMessage *pm;
1600
1601   /* is the client still there? */
1602   if (NULL == session->scss.client)
1603     return GNUNET_NO;
1604
1605   pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size,
1606                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1607   message_queue_add (session->client_mq, pm);
1608   return GNUNET_YES;
1609 }
1610
1611
1612
1613 /**
1614  * Start the next round.
1615  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1616  *
1617  * @param cls the session
1618  * @param tc task context, for when this task is invoked by the scheduler,
1619  *           NULL if invoked for another reason
1620  */
1621 static void 
1622 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1623 {
1624   struct ConsensusSession *session;
1625
1626   /* don't kick off next round if we're shutting down */
1627   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1628     return;
1629
1630   session = cls;
1631   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
1632
1633   if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1634   {
1635     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1636     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1637   }
1638
1639   switch (session->current_round)
1640   {
1641     case CONSENSUS_ROUND_BEGIN:
1642       session->current_round = CONSENSUS_ROUND_EXCHANGE;
1643       session->exp_round = 0;
1644       subround_over (session, NULL);
1645       break;
1646     case CONSENSUS_ROUND_EXCHANGE:
1647       /* handle two peers specially */
1648       if (session->num_peers <= 2)
1649       {
1650         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
1651         GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
1652         send_client_conclude_done (session);
1653         session->current_round = CONSENSUS_ROUND_FINISH;
1654         return;
1655       }
1656       session->current_round = CONSENSUS_ROUND_INVENTORY;
1657       start_inventory (session);
1658       break;
1659     case CONSENSUS_ROUND_INVENTORY:
1660       session->current_round = CONSENSUS_ROUND_COMPLETION;
1661       session->exp_round = 0;
1662       subround_over (session, NULL);
1663       break;
1664     case CONSENSUS_ROUND_COMPLETION:
1665       session->current_round = CONSENSUS_ROUND_FINISH;
1666       send_client_conclude_done (session);
1667       break;
1668     default:
1669       GNUNET_assert (0);
1670   }
1671 }
1672
1673
1674 static void
1675 fin_sent_cb (void *cls)
1676 {
1677   struct ConsensusPeerInformation *cpi;
1678   cpi = cls;
1679   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
1680   switch (cpi->session->current_round)
1681   {
1682     case CONSENSUS_ROUND_EXCHANGE:
1683     case CONSENSUS_ROUND_COMPLETION:
1684       if (cpi->session->current_round != cpi->apparent_round)
1685       {
1686         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
1687         break;
1688       }
1689       cpi->exp_subround_finished = GNUNET_YES;
1690       /* the subround is only really over if *both* partners are done */
1691       if (GNUNET_YES == exp_subround_finished (cpi->session))
1692         subround_over (cpi->session, NULL);
1693       else
1694         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
1695       break;
1696     case CONSENSUS_ROUND_INVENTORY:
1697       cpi->inventory_synced = GNUNET_YES;
1698       if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
1699         round_over (cpi->session, NULL);
1700       /* FIXME: maybe go to next round */
1701       break;
1702     default:
1703       GNUNET_break (0);
1704   }
1705 }
1706
1707
1708 /**
1709  * The other peer wants us to inform that he sent us all the elements we requested.
1710  */
1711 static int
1712 handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1713 {
1714   struct ConsensusRoundMessage *round_msg;
1715   round_msg = (struct ConsensusRoundMessage *) msg;
1716   /* FIXME: only call subround_over if round is the current one! */
1717   switch (cpi->session->current_round)
1718   {
1719     case CONSENSUS_ROUND_EXCHANGE:
1720     case CONSENSUS_ROUND_COMPLETION:
1721       if (cpi->session->current_round != round_msg->round)
1722       {
1723         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1724         cpi->ibf_state = IBF_STATE_NONE;
1725         cpi->ibf_bucket_counter = 0;
1726         break;
1727       }
1728       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1729       cpi->exp_subround_finished = GNUNET_YES;
1730       if (GNUNET_YES == exp_subround_finished (cpi->session))
1731         subround_over (cpi->session, NULL);
1732       else
1733         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
1734     break;
1735     case CONSENSUS_ROUND_INVENTORY:
1736       cpi->inventory_synced = GNUNET_YES;
1737       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1738       if (inventory_round_finished (cpi->session))
1739         round_over (cpi->session, NULL);
1740       break;
1741     default:
1742       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
1743       break;
1744   }
1745   return GNUNET_YES;
1746 }
1747
1748
1749 /**
1750  * Gets called when the other peer wants us to inform that
1751  * it has decoded our ibf and sent us all elements / requests
1752  */
1753 static int
1754 handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1755 {
1756   struct PendingMessage *pm;
1757   struct ConsensusRoundMessage *fin_msg;
1758
1759   /* FIXME: why handle current round?? */
1760   switch (cpi->session->current_round)
1761   {
1762     case CONSENSUS_ROUND_INVENTORY:
1763       cpi->inventory_synced = GNUNET_YES;
1764     case CONSENSUS_ROUND_COMPLETION:
1765     case CONSENSUS_ROUND_EXCHANGE:
1766       LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n");
1767       pm = new_pending_message (sizeof *fin_msg,
1768                                 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
1769       fin_msg = (struct ConsensusRoundMessage *) pm->msg;
1770       fin_msg->round = cpi->apparent_round;
1771       /* the subround is over once we kicked off sending the fin msg */
1772       /* FIXME: assert we are talking to the right peer! */
1773       /* FIXME: mark peer as synced */
1774       pm->sent_cb = fin_sent_cb;
1775       pm->sent_cb_cls = cpi;
1776       message_queue_add (cpi->mss.mq, pm);
1777       break;
1778     default:
1779       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
1780       break;
1781   }
1782   return GNUNET_YES;
1783 }
1784
1785
1786 /**
1787  * Functions with this signature are called whenever a
1788  * complete message is received by the tokenizer.
1789  *
1790  * Do not call GNUNET_SERVER_mst_destroy in callback
1791  *
1792  * @param cls closure
1793  * @param client identification of the client
1794  * @param message the actual message
1795  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1796  */
1797 static int
1798 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1799 {
1800   struct ConsensusPeerInformation *cpi = cls;
1801   GNUNET_assert (NULL == client);
1802   GNUNET_assert (NULL != cls);
1803   switch (ntohs (message->type))
1804   {
1805     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
1806       return handle_p2p_strata (cpi, (struct StrataMessage *) message);
1807     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
1808       return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
1809     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
1810       return handle_p2p_element (cpi, message);
1811     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
1812       return handle_p2p_element_report (cpi, message);
1813     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
1814       return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
1815     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
1816       return handle_p2p_synced (cpi, message);
1817     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
1818       return handle_p2p_fin (cpi, message);
1819     default:
1820       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
1821                   ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
1822   }
1823   return GNUNET_OK;
1824 }
1825
1826
1827 static void
1828 shuffle (struct ConsensusSession *session)
1829 {
1830   /* adapted from random_permute in util/crypto_random.c */
1831   /* FIXME
1832   unsigned int *ret;
1833   unsigned int i;
1834   unsigned int tmp;
1835   uint32_t x;
1836
1837   GNUNET_assert (n > 0);
1838   ret = GNUNET_malloc (n * sizeof (unsigned int));
1839   for (i = 0; i < n; i++)
1840     ret[i] = i;
1841   for (i = n - 1; i > 0; i--)
1842   {
1843     x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
1844     tmp = ret[x];
1845     ret[x] = ret[i];
1846     ret[i] = tmp;
1847   }
1848   */
1849 }
1850
1851
1852 /**
1853  * Find and set the partner_incoming and partner_outgoing of our peer,
1854  * one of them may not exist in most cases.
1855  *
1856  * @param session the consensus session
1857  */
1858 static void
1859 find_partners (struct ConsensusSession *session)
1860 {
1861   int mark[session->num_peers];
1862   int i;
1863   memset (mark, 0, session->num_peers * sizeof (int));
1864   session->partner_incoming = session->partner_outgoing = NULL;
1865   for (i = 0; i < session->num_peers; i++)
1866   {
1867     int arc;
1868     if (0 != mark[i])
1869       continue;
1870     arc = (i + (1 << session->exp_subround)) % session->num_peers;
1871     mark[i] = mark[arc] = 1;
1872     GNUNET_assert (i != arc);
1873     if (i == session->local_peer_idx)
1874     {
1875       GNUNET_assert (NULL == session->partner_outgoing);
1876       session->partner_outgoing = &session->info[session->shuffle[arc]];
1877       session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1878     }
1879     if (arc == session->local_peer_idx)
1880     {
1881       GNUNET_assert (NULL == session->partner_incoming);
1882       session->partner_incoming = &session->info[session->shuffle[i]];
1883       session->partner_incoming->exp_subround_finished = GNUNET_NO;
1884     }
1885   }
1886 }
1887
1888
1889 /**
1890  * Do the next subround in the exp-scheme.
1891  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1892  *
1893  * @param cls the session
1894  * @param tc task context, for when this task is invoked by the scheduler,
1895  *           NULL if invoked for another reason
1896  */
1897 static void
1898 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1899 {
1900   struct ConsensusSession *session;
1901   int i;
1902
1903   /* don't kick off next subround if we're shutting down */
1904   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1905     return;
1906   session = cls;
1907   /* cancel timeout */
1908   if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1909     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1910   session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1911   /* check if we are done with the log phase, 2-peer consensus only does one log round */
1912   if ( (session->exp_round == NUM_EXP_ROUNDS) ||
1913        ((session->num_peers == 2) && (session->exp_round == 1)))
1914   {
1915     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
1916     round_over (session, NULL);
1917     return;
1918   }
1919   if (session->exp_round == 0)
1920   {
1921     /* initialize everything for the log-rounds */
1922     session->exp_round = 1;
1923     session->exp_subround = 0;
1924     if (NULL == session->shuffle)
1925       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
1926     for (i = 0; i < session->num_peers; i++)
1927       session->shuffle[i] = i;
1928   }
1929   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
1930   {
1931     /* subrounds done, start new log-round */
1932     session->exp_round++;
1933     session->exp_subround = 0;
1934     shuffle (session);
1935   }
1936   else 
1937   {
1938     session->exp_subround++;
1939   }
1940
1941   find_partners (session);
1942
1943 #ifdef GNUNET_EXTRA_LOGGING
1944   {
1945     int in;
1946     int out;
1947     if (session->partner_outgoing == NULL)
1948       out = -1;
1949     else
1950       out = (int) (session->partner_outgoing - session->info);
1951     if (session->partner_incoming == NULL)
1952       in = -1;
1953     else
1954       in = (int) (session->partner_incoming - session->info);
1955     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
1956                 session->exp_round, session->exp_subround, in, out);
1957   }
1958 #endif /* GNUNET_EXTRA_LOGGING */
1959
1960   if (NULL != session->partner_incoming)
1961   {
1962     session->partner_incoming->ibf_state = IBF_STATE_NONE;
1963     session->partner_incoming->exp_subround_finished = GNUNET_NO;
1964     session->partner_incoming->ibf_bucket_counter = 0;
1965
1966     /* maybe there's an early strata estimator? */
1967     replay_premature_message (session->partner_incoming);
1968   }
1969
1970   if (NULL != session->partner_outgoing)
1971   {
1972     session->partner_outgoing->ibf_state = IBF_STATE_NONE;
1973     session->partner_outgoing->ibf_bucket_counter = 0;
1974     session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1975     /* make sure peer is connected and send the SE */
1976     embrace_peer (session->partner_outgoing);
1977   }
1978
1979   /*
1980   session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
1981                                                                    subround_over, session);
1982   */
1983 }
1984
1985
1986 /**
1987  * Search peer in the list of peers in session.
1988  *
1989  * @param peer peer to find
1990  * @param session session with peer
1991  * @return index of peer, -1 if peer is not in session
1992  */
1993 static int
1994 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1995 {
1996   int i;
1997   for (i = 0; i < session->num_peers; i++)
1998     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
1999       return i;
2000   return -1;
2001 }
2002
2003
2004 /**
2005  * Handle a HELLO-message, send when another peer wants to join a session where
2006  * our peer is a member. The session may or may not be inhabited yet.
2007  */
2008 static int
2009 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
2010 {
2011   struct ConsensusSession *session;
2012
2013   if (NULL != inc->requested_gid)
2014   {
2015     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n");
2016     return GNUNET_YES;
2017   }
2018   if (NULL != inc->cpi)
2019   {
2020     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n");
2021     return GNUNET_YES;
2022   }
2023
2024   for (session = sessions_head; NULL != session; session = session->next)
2025   {
2026     int idx;
2027     struct ConsensusPeerInformation *cpi;
2028     if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
2029       continue;
2030     idx = get_peer_idx (&inc->peer_id, session);
2031     GNUNET_assert (-1 != idx);
2032     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
2033     cpi = &session->info[idx];
2034     inc->cpi = cpi;
2035     cpi->mss = inc->mss;
2036     cpi = &session->info[idx];
2037     cpi->hello = GNUNET_YES;
2038     cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
2039     embrace_peer (cpi);
2040     return GNUNET_YES;        
2041   }
2042   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
2043   inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode));
2044   return GNUNET_YES;
2045 }
2046
2047
2048
2049 /**
2050  * Handle tokenized messages from stream sockets.
2051  * Delegate them if the socket belongs to a session,
2052  * handle hello messages otherwise.
2053  *
2054  * Do not call GNUNET_SERVER_mst_destroy in callback
2055  *
2056  * @param cls closure, unused
2057  * @param client incoming socket this message comes from
2058  * @param message the actual message
2059  *
2060  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
2061  */
2062 static int
2063 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
2064 {
2065   struct IncomingSocket *inc;
2066   GNUNET_assert (NULL == client);
2067   GNUNET_assert (NULL != cls);
2068   inc = (struct IncomingSocket *) cls;
2069   switch (ntohs( message->type))
2070   {
2071     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
2072       return handle_p2p_hello (inc, (struct ConsensusHello *) message);
2073     default:
2074       if (NULL != inc->cpi)
2075         return mst_session_callback (inc->cpi, client, message);
2076       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
2077                   ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
2078   }
2079   return GNUNET_OK;
2080 }
2081
2082
2083 /**
2084  * Functions of this type are called upon new stream connection from other peers
2085  * or upon binding error which happen when the app_port given in
2086  * GNUNET_STREAM_listen() is already taken.
2087  *
2088  * @param cls the closure from GNUNET_STREAM_listen
2089  * @param socket the socket representing the stream; NULL on binding error
2090  * @param initiator the identity of the peer who wants to establish a stream
2091  *            with us; NULL on binding error
2092  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
2093  *             stream (the socket will be invalid after the call)
2094  */
2095 static int
2096 listen_cb (void *cls,
2097            struct GNUNET_STREAM_Socket *socket,
2098            const struct GNUNET_PeerIdentity *initiator)
2099 {
2100   struct IncomingSocket *incoming;
2101
2102   if (NULL == socket)
2103   {
2104     GNUNET_break (0);
2105     return GNUNET_SYSERR;
2106   }
2107   incoming = GNUNET_malloc (sizeof *incoming);
2108   incoming->peer_id = *initiator;
2109   incoming->mss.socket = socket;
2110   incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
2111                                      &stream_data_processor, &incoming->mss);
2112   incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
2113   incoming->mss.mst_cls = incoming;
2114   GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
2115   return GNUNET_OK;
2116 }
2117
2118
2119 /**
2120  * Disconnect a client, and destroy all sessions associated with it.
2121  *
2122  * @param client the client to disconnect
2123  */
2124 static void
2125 disconnect_client (struct GNUNET_SERVER_Client *client)
2126 {
2127   struct ConsensusSession *session;
2128   GNUNET_SERVER_client_disconnect (client);
2129   
2130   /* if the client owns a session, remove it */
2131   session = sessions_head;
2132   while (NULL != session)
2133   {
2134     if (client == session->scss.client)
2135     {
2136       destroy_session (session);
2137       break;
2138     }
2139     session = session->next;
2140   }
2141 }
2142
2143
2144 /**
2145  * Compute a global, (hopefully) unique consensus session id,
2146  * from the local id of the consensus session, and the identities of all participants.
2147  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2148  * exactly the same peers, the global id will be different.
2149  *
2150  * @param session session to generate the global id for
2151  * @param session_id local id of the consensus session
2152  */
2153 static void
2154 compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
2155 {
2156   int i;
2157   struct GNUNET_HashCode tmp;
2158
2159   session->global_id = *session_id;
2160   for (i = 0; i < session->num_peers; ++i)
2161   {
2162     GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
2163     session->global_id = tmp;
2164     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
2165     session->global_id = tmp;
2166   }
2167 }
2168
2169
2170 /**
2171  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
2172  * the correct signature to be used with e.g. qsort.
2173  * We use this function instead.
2174  *
2175  * @param h1 some hash code
2176  * @param h2 some hash code
2177  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2178  */
2179 static int
2180 hash_cmp (const void *h1, const void *h2)
2181 {
2182   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
2183 }
2184
2185
2186 /**
2187  * Create the sorted list of peers for the session,
2188  * add the local peer if not in the join message.
2189  */
2190 static void
2191 initialize_session_peer_list (struct ConsensusSession *session)
2192 {
2193   unsigned int local_peer_in_list;
2194   uint32_t listed_peers;
2195   const struct GNUNET_PeerIdentity *msg_peers;
2196   struct GNUNET_PeerIdentity *peers;
2197   unsigned int i;
2198
2199   GNUNET_assert (NULL != session->join_msg);
2200
2201   /* peers in the join message, may or may not include the local peer */
2202   listed_peers = ntohl (session->join_msg->num_peers);
2203   
2204   session->num_peers = listed_peers;
2205
2206   msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
2207
2208   local_peer_in_list = GNUNET_NO;
2209   for (i = 0; i < listed_peers; i++)
2210   {
2211     if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
2212     {
2213       local_peer_in_list = GNUNET_YES;
2214       break;
2215     }
2216   }
2217
2218   if (GNUNET_NO == local_peer_in_list)
2219     session->num_peers++;
2220
2221   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2222
2223   if (GNUNET_NO == local_peer_in_list)
2224     peers[session->num_peers - 1] = *my_peer;
2225
2226   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2227   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
2228
2229   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
2230
2231   for (i = 0; i < session->num_peers; ++i)
2232   {
2233     /* initialize back-references, so consensus peer information can
2234      * be used as closure */
2235     session->info[i].session = session;
2236     session->info[i].peer_id = peers[i];
2237   }
2238
2239   free (peers);
2240 }
2241
2242
2243 /**
2244  * Add incoming peer connections to the session,
2245  * for peers who have connected to us before the local session has been established
2246  *
2247  * @param session ...
2248  */
2249 static void
2250 add_incoming_peers (struct ConsensusSession *session)
2251 {
2252   struct IncomingSocket *inc;
2253   int i;
2254   struct ConsensusPeerInformation *cpi;
2255
2256   for (inc = incoming_sockets_head; NULL != inc; inc = inc->next)
2257   {
2258     if ( (NULL == inc->requested_gid) ||
2259          (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) )
2260       continue;
2261     for (i = 0; i < session->num_peers; i++)
2262     {
2263       cpi = &session->info[i];
2264       cpi->peer_id = inc->peer_id;
2265       cpi->mss = inc->mss;
2266       cpi->hello = GNUNET_YES;
2267       inc->cpi = cpi;
2268       break;
2269     }
2270   }
2271 }
2272
2273
2274 /**
2275  * Initialize the session, continue receiving messages from the owning client
2276  *
2277  * @param session the session to initialize
2278  */
2279 static void
2280 initialize_session (struct ConsensusSession *session)
2281 {
2282   struct ConsensusSession *other_session;
2283
2284   GNUNET_assert (NULL != session->join_msg);
2285   initialize_session_peer_list (session);
2286   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
2287   compute_global_id (session, &session->join_msg->session_id);
2288
2289   /* Check if some local client already owns the session. */
2290   other_session = sessions_head;
2291   while (NULL != other_session)
2292   {
2293     if ((other_session != session) && 
2294         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2295     {
2296       if (GNUNET_NO == other_session->conclude)
2297       {
2298         GNUNET_break (0);
2299         destroy_session (session);
2300         return;
2301       }
2302       GNUNET_SERVER_client_drop (other_session->scss.client);
2303       other_session->scss.client = NULL;
2304       break;
2305     }
2306     other_session = other_session->next;
2307   }
2308
2309   session->local_peer_idx = get_peer_idx (my_peer, session);
2310   GNUNET_assert (-1 != session->local_peer_idx);
2311   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
2312   GNUNET_free (session->join_msg);
2313   session->join_msg = NULL;
2314   add_incoming_peers (session);
2315   GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK);
2316   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2317 }
2318
2319
2320 /**
2321  * Called when a client wants to join a consensus session.
2322  *
2323  * @param cls unused
2324  * @param client client that sent the message
2325  * @param m message sent by the client
2326  */
2327 static void
2328 client_join (void *cls,
2329              struct GNUNET_SERVER_Client *client,
2330              const struct GNUNET_MessageHeader *m)
2331 {
2332   struct ConsensusSession *session;
2333
2334   // make sure the client has not already joined a session
2335   session = sessions_head;
2336   while (NULL != session)
2337   {
2338     if (session->scss.client == client)
2339     {
2340       GNUNET_break (0);
2341       disconnect_client (client);
2342       return;
2343     }
2344     session = session->next;
2345   }
2346
2347   session = GNUNET_new (struct ConsensusSession);
2348   session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
2349   /* these have to be initialized here, as the client can already start to give us values */
2350   session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2351   session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2352   session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2353   session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
2354   session->scss.client = client;
2355   session->client_mq = create_message_queue_for_server_client (&session->scss);
2356   GNUNET_SERVER_client_keep (client);
2357
2358   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2359
2360   // Initialize session later if local peer identity is not known yet.
2361   if (NULL == my_peer)
2362   {
2363     GNUNET_SERVER_disable_receive_done_warning (client);
2364     return;
2365   }
2366
2367   initialize_session (session);
2368 }
2369
2370
2371
2372
2373 /**
2374  * Called when a client performs an insert operation.
2375  *
2376  * @param cls (unused)
2377  * @param client client handle
2378  * @param m message sent by the client
2379  */
2380 void
2381 client_insert (void *cls,
2382              struct GNUNET_SERVER_Client *client,
2383              const struct GNUNET_MessageHeader *m)
2384 {
2385   struct ConsensusSession *session;
2386   struct GNUNET_CONSENSUS_ElementMessage *msg;
2387   struct GNUNET_CONSENSUS_Element *element;
2388   int element_size;
2389
2390   session = sessions_head;
2391   while (NULL != session)
2392   {
2393     if (session->scss.client == client)
2394       break;
2395   }
2396
2397   if (NULL == session)
2398   {
2399     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
2400     GNUNET_SERVER_client_disconnect (client);
2401     return;
2402   }
2403
2404   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2405   element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2406   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
2407   element->type = msg->element_type;
2408   element->size = element_size;
2409   memcpy (&element[1], &msg[1], element_size);
2410   element->data = &element[1];
2411   GNUNET_assert (NULL != element->data);
2412   insert_element (session, element);
2413
2414   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2415 }
2416
2417
2418 /**
2419  * Called when a client performs the conclude operation.
2420  *
2421  * @param cls (unused)
2422  * @param client client handle
2423  * @param message message sent by the client
2424  */
2425 static void
2426 client_conclude (void *cls,
2427                  struct GNUNET_SERVER_Client *client,
2428                  const struct GNUNET_MessageHeader *message)
2429 {
2430   struct ConsensusSession *session;
2431   struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
2432
2433   cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
2434
2435   session = sessions_head;
2436   while ((session != NULL) && (session->scss.client != client))
2437     session = session->next;
2438   if (NULL == session)
2439   {
2440     /* client not found */
2441     GNUNET_break (0);
2442     GNUNET_SERVER_client_disconnect (client);
2443     return;
2444   }
2445
2446   if (CONSENSUS_ROUND_BEGIN != session->current_round)
2447   {
2448     /* client requested conclude twice */
2449     GNUNET_break (0);
2450     /* client may still own a session, destroy it */
2451     disconnect_client (client);
2452     return;
2453   }
2454
2455   session->conclude = GNUNET_YES;
2456
2457   if (session->num_peers <= 1)
2458   {
2459     send_client_conclude_done (session);
2460   }
2461   else
2462   {
2463     session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
2464     /* the 'begin' round is over, start with the next, real round */
2465     round_over (session, NULL);
2466   }
2467
2468   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2469 }
2470
2471
2472 /**
2473  * Task that disconnects from core.
2474  *
2475  * @param cls core handle
2476  * @param tc context information (why was this task triggered now)
2477  */
2478 static void
2479 disconnect_core (void *cls,
2480                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2481 {
2482   if (core != NULL)
2483   {
2484     GNUNET_CORE_disconnect (core);
2485     core = NULL;
2486   }
2487   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
2488 }
2489
2490
2491 static void
2492 core_startup (void *cls,
2493               struct GNUNET_CORE_Handle *core,
2494               const struct GNUNET_PeerIdentity *peer)
2495 {
2496   struct ConsensusSession *session;
2497
2498   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
2499   /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
2500   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
2501   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
2502   /* initialize sessions that are waiting for the local peer identity */
2503   for (session = sessions_head; NULL != session; session = session->next)
2504     if (NULL != session->join_msg)
2505       initialize_session (session);
2506 }
2507
2508
2509 /**
2510  * Called to clean up, after a shutdown has been requested.
2511  *
2512  * @param cls closure
2513  * @param tc context information (why was this task triggered now)
2514  */
2515 static void
2516 shutdown_task (void *cls,
2517                const struct GNUNET_SCHEDULER_TaskContext *tc)
2518 {
2519   while (NULL != incoming_sockets_head)
2520   {
2521     struct IncomingSocket *socket;
2522     socket = incoming_sockets_head;
2523     if (NULL == socket->cpi)
2524       clear_message_stream_state (&socket->mss);
2525     incoming_sockets_head = incoming_sockets_head->next;
2526     GNUNET_free (socket);
2527   }
2528
2529   while (NULL != sessions_head)
2530   {
2531     struct ConsensusSession *session;
2532     session = sessions_head->next;
2533     destroy_session (sessions_head);
2534     sessions_head = session;
2535   }
2536
2537   if (NULL != core)
2538   {
2539     GNUNET_CORE_disconnect (core);
2540     core = NULL;
2541   }
2542
2543   if (NULL != listener)
2544   {
2545     GNUNET_STREAM_listen_close (listener);
2546     listener = NULL;
2547   } 
2548
2549   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
2550 }
2551
2552
2553 /**
2554  * Start processing consensus requests.
2555  *
2556  * @param cls closure
2557  * @param server the initialized server
2558  * @param c configuration to use
2559  */
2560 static void
2561 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
2562 {
2563   /* core is only used to retrieve the peer identity */
2564   static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
2565     {NULL, 0, 0}
2566   };
2567   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2568     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2569     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2570     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2571         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
2572     {NULL, NULL, 0, 0}
2573   };
2574
2575   cfg = c;
2576   srv = server;
2577
2578   GNUNET_SERVER_add_handlers (server, server_handlers);
2579
2580   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2581
2582   listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
2583                                    &listen_cb, NULL,
2584                                    GNUNET_STREAM_OPTION_END);
2585
2586   /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
2587   core = GNUNET_CORE_connect (c, NULL, 
2588                               &core_startup, NULL, 
2589                               NULL, NULL, GNUNET_NO, NULL, 
2590                               GNUNET_NO, core_handlers);
2591   GNUNET_assert (NULL != core);
2592   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2593 }
2594
2595
2596 /**
2597  * The main function for the consensus service.
2598  *
2599  * @param argc number of arguments from the command line
2600  * @param argv command line arguments
2601  * @return 0 ok, 1 on error
2602  */
2603 int
2604 main (int argc, char *const *argv)
2605 {
2606   int ret;
2607   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
2608   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
2609   return (GNUNET_OK == ret) ? 0 : 1;
2610 }
2611