d223360dcbc9f89355e59af5e65f4c7869481f93
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief 
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_consensus_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_stream_lib.h"
35 #include "consensus_protocol.h"
36 #include "ibf.h"
37 #include "consensus.h"
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define STRATA_COUNT 32
44 /**
45  * Number of buckets per IBF.
46  */
47 #define STRATA_IBF_BUCKETS 80
48 /**
49  * hash num parameter for the difference digests and strata estimators
50  */
51 #define STRATA_HASH_NUM 3
52
53 /**
54  * Number of buckets that can be transmitted in one message.
55  */
56 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
57
58 /**
59  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60  * Choose this value so that computing the IBF is still cheaper
61  * than transmitting all values.
62  */
63 #define MAX_IBF_ORDER (32)
64
65
66 /* forward declarations */
67
68 struct ConsensusSession;
69 struct IncomingSocket;
70 struct ConsensusPeerInformation;
71
72 static void
73 send_next (struct ConsensusSession *session);
74
75 static void 
76 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
77
78 static void 
79 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
80
81 static void 
82 write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size);
83
84 static int
85 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
86
87
88 /**
89  * An element that is waiting to be transmitted.
90  */
91 struct PendingElement
92 {
93   /**
94    * Pending elements are kept in a DLL.
95    */
96   struct PendingElement *next;
97
98   /**
99    * Pending elements are kept in a DLL.
100    */
101   struct PendingElement *prev;
102
103   /**
104    * The actual element
105    */
106   struct GNUNET_CONSENSUS_Element *element;
107
108   /* peer this element is coming from */
109   struct ConsensusPeerInformation *cpi;
110 };
111
112 /**
113  * Information about a peer that is in a consensus session.
114  */
115 struct ConsensusPeerInformation
116 {
117   struct GNUNET_STREAM_Socket *socket;
118
119   /**
120    * Is socket's connection established, i.e. can we write to it?
121    * Only relevent on outgoing cpi.
122    */
123   int is_connected;
124
125   /**
126    * Type of the peer in the all-to-all rounds,
127    * GNUNET_YES if we initiate reconciliation.
128    */
129   int is_outgoing;
130
131   /**
132    * if the peer did something wrong, and was disconnected,
133    * never interact with this peer again.
134    */
135   int is_bad;
136
137   /**
138    * Did we receive/send a consensus hello?
139    */
140   int hello;
141
142   /**
143    * Handle for currently active read
144    */
145   struct GNUNET_STREAM_ReadHandle *rh;
146
147   /**
148    * Handle for currently active read
149    */
150   struct GNUNET_STREAM_WriteHandle *wh;
151
152   enum {
153     IBF_STATE_NONE,
154     IBF_STATE_RECEIVING,
155     IBF_STATE_TRANSMITTING,
156     IBF_STATE_DECODING
157   } ibf_state ;
158
159   /**
160    * What is the order (=log2 size) of the ibf
161    * we're currently dealing with?
162    */
163   int ibf_order;
164
165   /**
166    * The current IBF for this peer,
167    * purpose dependent on ibf_state
168    */
169   struct InvertibleBloomFilter *ibf;
170
171   /**
172    * How many buckets have we transmitted/received (depending on state)?
173    */
174   int ibf_bucket_counter;
175
176   /**
177    * Strata estimator of the peer, NULL if our peer
178    * initiated the reconciliation.
179    */
180   struct InvertibleBloomFilter **strata;
181
182   /**
183    * difference estimated with the current strata estimator
184    */
185   unsigned int diff;
186
187   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
188
189   /**
190    * Back-reference to the consensus session,
191    * to that ConsensusPeerInformation can be used as a closure
192    */
193   struct ConsensusSession *session;
194
195   struct PendingElement *send_pending_head;
196   struct PendingElement *send_pending_tail;
197 };
198
199 struct QueuedMessage
200 {
201   struct GNUNET_MessageHeader *msg;
202
203   /**
204    * Queued messages are stored in a doubly linked list.
205    */
206   struct QueuedMessage *next;
207
208   /**
209    * Queued messages are stored in a doubly linked list.
210    */
211   struct QueuedMessage *prev;
212 };
213
214 enum ConsensusRound
215 {
216   /**
217    * distribution of information with the exponential scheme
218    */
219   CONSENSUS_ROUND_EXP_EXCHANGE,
220   /**
221    * All-to-all, exchange missing values
222    */
223   CONSENSUS_ROUND_A2A_EXCHANGE,
224   /**
225    * All-to-all, check what values are missing, don't exchange anything
226    */
227   CONSENSUS_ROUND_A2A_INVENTORY
228
229   /*
230   a round to exchange the information for fraud-detection
231   CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT
232   */
233 };
234
235
236 /**
237  * A consensus session consists of one local client and the remote authorities.
238  *
239  */
240 struct ConsensusSession
241 {
242   /**
243    * Consensus sessions are kept in a DLL.
244    */
245   struct ConsensusSession *next;
246
247   /**
248    * Consensus sessions are kept in a DLL.
249    */
250   struct ConsensusSession *prev;
251
252   /**
253    * Join message. Used to initialize the session later,
254    * if the identity of the local peer is not yet known.
255    * NULL if the session has been fully initialized.
256    */
257   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
258
259   /**
260   * Global consensus identification, computed
261   * from the local id and participating authorities.
262   */
263   struct GNUNET_HashCode global_id;
264
265   /**
266    * Local client in this consensus session.
267    * There is only one client per consensus session.
268    */
269   struct GNUNET_SERVER_Client *client;
270
271   /**
272    * Values in the consensus set of this session,
273    * all of them either have been sent by or approved by the client.
274    * Contains GNUNET_CONSENSUS_Element.
275    */
276   struct GNUNET_CONTAINER_MultiHashMap *values;
277
278   /**
279    * Elements that have not been approved (or rejected) by the client yet.
280    */
281   struct PendingElement *approval_pending_head;
282
283   /**
284    * Elements that have not been approved (or rejected) by the client yet.
285    */
286   struct PendingElement *approval_pending_tail;
287
288   /**
289    * Messages to be sent to the local client that owns this session
290    */
291   struct QueuedMessage *client_messages_head;
292
293   /**
294    * Messages to be sent to the local client that owns this session
295    */
296   struct QueuedMessage *client_messages_tail;
297
298   /**
299    * Currently active transmit handle for sending to the client
300    */
301   struct GNUNET_SERVER_TransmitHandle *th;
302
303   /**
304    * Once conclude_requested is GNUNET_YES, the client may not
305    * insert any more values.
306    */
307   int conclude_requested;
308
309   /**
310    * Minimum number of peers to form a consensus group
311    */
312   int conclude_group_min;
313
314   /**
315    * Number of other peers in the consensus
316    */
317   unsigned int num_peers;
318
319   /**
320    * Information about the other peers,
321    * their state, etc.
322    */
323   struct ConsensusPeerInformation *info;
324
325   /**
326    * Sorted array of peer identities in this consensus session,
327    * includes the local peer.
328    */
329   struct GNUNET_PeerIdentity *peers;
330
331   /**
332    * Index of the local peer in the peers array
333    */
334   int local_peer_idx;
335
336   /**
337    * Strata estimator, computed online
338    */
339   struct InvertibleBloomFilter **strata;
340
341   /**
342    * Pre-computed IBFs
343    */
344   struct InvertibleBloomFilter **ibfs;
345
346   /**
347    * Current round
348    */
349   enum ConsensusRound current_round;
350 };
351
352
353 /**
354  * Sockets from other peers who want to communicate with us.
355  * It may not be known yet which consensus session they belong to.
356  */
357 struct IncomingSocket
358 {
359   /**
360    * Incoming sockets are kept in a double linked list.
361    */
362   struct IncomingSocket *next;
363
364   /**
365    * Incoming sockets are kept in a double linked list.
366    */
367   struct IncomingSocket *prev;
368
369   /**
370    * The actual socket.
371    */
372   struct GNUNET_STREAM_Socket *socket;
373
374   /**
375    * Handle for currently active read
376    */
377   struct GNUNET_STREAM_ReadHandle *rh;
378
379   /**
380    * Peer that connected to us with the socket.
381    */
382   struct GNUNET_PeerIdentity *peer;
383
384   /**
385    * Message stream tokenizer for this socket.
386    */
387   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
388
389   /**
390    * Peer-in-session this socket belongs to, once known, otherwise NULL.
391    */
392   struct ConsensusPeerInformation *cpi;
393
394   /**
395    * Set to the global session id, if the peer sent us a hello-message,
396    * but the session does not exist yet.
397    *
398    * FIXME: not implemented yet
399    */
400   struct GNUNET_HashCode *requested_gid;
401 };
402
403 static struct IncomingSocket *incoming_sockets_head;
404 static struct IncomingSocket *incoming_sockets_tail;
405
406 /**
407  * Linked list of sesstions this peer participates in.
408  */
409 static struct ConsensusSession *sessions_head;
410
411 /**
412  * Linked list of sesstions this peer participates in.
413  */
414 static struct ConsensusSession *sessions_tail;
415
416 /**
417  * Configuration of the consensus service.
418  */
419 static const struct GNUNET_CONFIGURATION_Handle *cfg;
420
421 /**
422  * Handle to the server for this service.
423  */
424 static struct GNUNET_SERVER_Handle *srv;
425
426 /**
427  * Peer that runs this service.
428  */
429 static struct GNUNET_PeerIdentity *my_peer;
430
431 /**
432  * Handle to the core service. Only used during service startup, will be NULL after that.
433  */
434 static struct GNUNET_CORE_Handle *core;
435
436 /**
437  * Listener for sockets from peers that want to reconcile with us.
438  */
439 static struct GNUNET_STREAM_ListenSocket *listener;
440
441
442 /**
443  * Queue a message to be sent to the inhabiting client of a sessino
444  *
445  * @param session session
446  * @param msg message we want to queue
447  */
448 static void
449 queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
450 {
451   struct QueuedMessage *qm;
452   qm = GNUNET_malloc (sizeof *qm);
453   qm->msg = msg;
454   GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
455 }
456
457 /**
458  * Get peer index associated with the peer information,
459  * unique for every session among all peers.
460  */
461 static int
462 get_cpi_index (struct ConsensusPeerInformation *cpi)
463 {
464   return cpi - cpi->session->info;
465 }
466
467 /**
468  * Mark the peer as bad, free as state we don't need anymore.
469  */
470 static void
471 mark_peer_bad (struct ConsensusPeerInformation *cpi)
472 {
473   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi));
474   cpi->is_bad = GNUNET_YES;
475   /* FIXME: free ibfs etc. */
476 }
477
478
479 /**
480  * Estimate set difference with two strata estimators,
481  * i.e. arrays of IBFs.
482  */
483 static int
484 estimate_difference (struct InvertibleBloomFilter** strata1,
485                      struct InvertibleBloomFilter** strata2)
486 {
487   int i;
488   int count;
489   count = 0;
490   for (i = STRATA_COUNT - 1; i >= 0; i--)
491   {
492     struct InvertibleBloomFilter *diff;
493     int ibf_count;
494     int more;
495     ibf_count = 0;
496     diff = ibf_dup (strata1[i]);
497     ibf_subtract (diff, strata2[i]);
498     for (;;)
499     {
500       more = ibf_decode (diff, NULL, NULL);
501       if (GNUNET_NO == more)
502       {
503         count += ibf_count;
504         break;
505       }
506       if (GNUNET_SYSERR == more)
507       {
508         ibf_destroy (diff);
509         return count * (1 << (i + 1));
510       }
511       ibf_count++;
512     }
513     ibf_destroy (diff);
514   }
515   return count;
516 }
517
518
519 /**
520  * Called when receiving data from a peer that is member of
521  * an inhabited consensus session.
522  *
523  * @param cls the closure from GNUNET_STREAM_read
524  * @param status the status of the stream at the time this function is called
525  * @param data traffic from the other side
526  * @param size the number of bytes available in data read; will be 0 on timeout 
527  * @return number of bytes of processed from 'data' (any data remaining should be
528  *         given to the next time the read processor is called).
529  */
530 static size_t
531 session_stream_data_processor (void *cls,
532                        enum GNUNET_STREAM_Status status,
533                        const void *data,
534                        size_t size)
535 {
536   struct ConsensusPeerInformation *cpi;
537   int ret;
538
539   GNUNET_assert (GNUNET_STREAM_OK == status);
540
541   cpi = cls;
542
543   GNUNET_assert (NULL != cpi->mst);
544
545   ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
546   if (GNUNET_SYSERR == ret)
547   {
548     /* FIXME: handle this correctly */
549     GNUNET_assert (0);
550   }
551
552   /* read again */
553   cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
554                                 &session_stream_data_processor, cpi);
555
556   /* we always read all data */
557   return size;
558 }
559
560 /**
561  * Called when we receive data from a peer that is not member of
562  * a session yet, or the session is not yet inhabited.
563  *
564  * @param cls the closure from GNUNET_STREAM_read
565  * @param status the status of the stream at the time this function is called
566  * @param data traffic from the other side
567  * @param size the number of bytes available in data read; will be 0 on timeout 
568  * @return number of bytes of processed from 'data' (any data remaining should be
569  *         given to the next time the read processor is called).
570  */
571 static size_t
572 incoming_stream_data_processor (void *cls,
573                        enum GNUNET_STREAM_Status status,
574                        const void *data,
575                        size_t size)
576 {
577   struct IncomingSocket *incoming;
578   int ret;
579
580   GNUNET_assert (GNUNET_STREAM_OK == status);
581
582   incoming = cls;
583
584   ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
585   if (GNUNET_SYSERR == ret)
586   {
587     /* FIXME: handle this correctly */
588     GNUNET_assert (0);
589   }
590
591   /* read again */
592   incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
593                                      &incoming_stream_data_processor, incoming);
594
595   /* we always read all data */
596   return size;
597 }
598
599
600 /**
601  * Iterator to insert values into an ibf.
602  *
603  * @param cls closure
604  * @param key current key code
605  * @param value value in the hash map
606  * @return GNUNET_YES if we should continue to
607  *         iterate,
608  *         GNUNET_NO if not.
609  */
610 static int
611 ibf_values_iterator (void *cls,
612                      const struct GNUNET_HashCode *key,
613                      void *value)
614 {
615   struct ConsensusPeerInformation *cpi;
616   cpi = cls;
617   ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key));
618   return GNUNET_YES;
619 }
620
621 static void
622 prepare_ibf (struct ConsensusPeerInformation *cpi)
623 {
624   if (NULL == cpi->session->ibfs[cpi->ibf_order])
625   {
626     cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
627     GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
628   }
629 }
630
631
632 /**
633  * Called when a peer sends us its strata estimator.
634  * In response, we sent out IBF of appropriate size back.
635  *
636  * @param cpi session
637  * @param strata_msg message
638  */
639 static int
640 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
641 {
642   int i;
643   uint64_t *key_src;
644   uint32_t *hash_src;
645   uint8_t *count_src;
646
647   GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
648
649   if (NULL == cpi->strata)
650   {
651     cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
652     for (i = 0; i < STRATA_COUNT; i++)
653       cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
654   }
655
656   /* for correct message alignment, copy bucket types seperately */
657   key_src = (uint64_t *) &strata_msg[1];
658
659   for (i = 0; i < STRATA_COUNT; i++)
660   {
661     memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src);
662     key_src += STRATA_IBF_BUCKETS;
663   }
664
665   hash_src = (uint32_t *) key_src;
666
667   for (i = 0; i < STRATA_COUNT; i++)
668   {
669     memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
670     hash_src += STRATA_IBF_BUCKETS;
671   }
672
673   count_src = (uint8_t *) hash_src;
674
675   for (i = 0; i < STRATA_COUNT; i++)
676   {
677     memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS);
678     count_src += STRATA_IBF_BUCKETS;
679   }
680
681   cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
682   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
683
684   /* send IBF of the right size */
685   cpi->ibf_order = 0;
686   while ((1 << cpi->ibf_order) < cpi->diff)
687     cpi->ibf_order++;
688   if (cpi->ibf_order > MAX_IBF_ORDER)
689     cpi->ibf_order = MAX_IBF_ORDER;
690   cpi->ibf_order += 2;
691   /* create ibf if not already pre-computed */
692   prepare_ibf (cpi);
693   cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
694   cpi->ibf_state = IBF_STATE_TRANSMITTING;
695   write_ibf (cpi, GNUNET_STREAM_OK, 0);
696
697   return GNUNET_YES;
698 }
699
700
701 static int
702 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
703 {
704   int num_buckets;
705   uint64_t *key_src;
706   uint32_t *hash_src;
707   uint8_t *count_src;
708
709   num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
710
711   if (IBF_STATE_NONE == cpi->ibf_state)
712   {
713     cpi->ibf_state = IBF_STATE_RECEIVING;
714     cpi->ibf_order = digest->order;
715     cpi->ibf_bucket_counter = 0;
716   }
717
718   if ( (IBF_STATE_RECEIVING != cpi->ibf_state) ||
719        (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) )
720   {
721     mark_peer_bad (cpi);
722     return GNUNET_NO;
723   }
724
725
726   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
727
728   if (NULL == cpi->ibf)
729     cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
730
731   key_src = (uint64_t *) &digest[1];
732
733   memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src);
734   hash_src += num_buckets;
735
736   hash_src = (uint32_t *) key_src;
737
738   memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
739   hash_src += num_buckets;
740
741   count_src = (uint8_t *) hash_src;
742
743   memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src);
744
745   cpi->ibf_bucket_counter += num_buckets;
746
747   if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
748   {
749     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
750     GNUNET_assert (NULL != cpi->wh);
751     cpi->ibf_state = IBF_STATE_DECODING;
752     prepare_ibf (cpi);
753     ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
754     write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0);
755   }
756   return GNUNET_YES;
757 }
758
759
760 static int
761 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
762 {
763   struct PendingElement *pending_element;
764   struct GNUNET_CONSENSUS_Element *element;
765   struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
766   size_t size;
767
768   size = ntohs (element_msg->size) - sizeof *element_msg;
769
770   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
771
772   element = GNUNET_malloc (size + sizeof *element);
773   element->size = size;
774   memcpy (&element[1], &element_msg[1], size);
775   element->data = &element[1];
776
777   pending_element = GNUNET_malloc (sizeof *pending_element);
778   pending_element->element = element;
779   GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element);
780
781   client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
782   client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
783   client_element_msg->header.size = htons (size + sizeof *client_element_msg);
784   memcpy (&client_element_msg[1], &element[1], size);
785
786   queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
787
788   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
789
790   send_next (cpi->session);
791   
792   return GNUNET_YES;
793 }
794
795
796 /**
797  * Handle a request for elements.
798  * Only allowed in exchange-rounds.
799  *
800  * FIXME: implement
801  */
802 static int
803 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
804 {
805   /* FIXME: implement */
806   return GNUNET_YES;
807 }
808
809
810 /**
811  * Handle a HELLO-message, send when another peer wants to join a session where
812  * our peer is a member. The session may or may not be inhabited yet.
813  */
814 static int
815 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
816 {
817   /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
818   struct ConsensusSession *session;
819   session = sessions_head;
820   while (NULL != session)
821   {
822     if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
823     {
824       int idx;
825       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
826       idx = get_peer_idx (inc->peer, session);
827       GNUNET_assert (-1 != idx);
828       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
829       inc->cpi = &session->info[idx];
830       GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
831       inc->cpi->mst = inc->mst;
832       inc->cpi->hello = GNUNET_YES;
833       inc->cpi->socket = inc->socket;
834       return GNUNET_YES;
835     }
836     session = session->next;
837   }
838   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n");
839   GNUNET_break (0);
840   return GNUNET_NO;
841 }
842
843
844 /**
845  * Functions with this signature are called whenever a
846  * complete message is received by the tokenizer.
847  *
848  * Do not call GNUNET_SERVER_mst_destroy in callback
849  *
850  * @param cls closure
851  * @param client identification of the client
852  * @param message the actual message
853  *
854  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
855  */
856 static int
857 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
858 {
859   struct ConsensusPeerInformation *cpi;
860   cpi =  cls;
861   switch (ntohs (message->type))
862   {
863     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
864       return handle_p2p_strata (cpi, (struct StrataMessage *) message);
865     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
866       return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
867     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
868       return handle_p2p_element (cpi, message);
869     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
870       return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
871     default:
872       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type));
873       /* FIXME: handle correctly */
874       GNUNET_assert (0);
875   }
876   return GNUNET_OK;
877 }
878
879
880 /**
881  * Handle tokenized messages from stream sockets.
882  * Delegate them if the socket belongs to a session,
883  * handle hello messages otherwise.
884  *
885  * Do not call GNUNET_SERVER_mst_destroy in callback
886  *
887  * @param cls closure, unused
888  * @param client incoming socket this message comes from
889  * @param message the actual message
890  *
891  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
892  */
893 static int
894 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
895 {
896   struct IncomingSocket *inc;
897   inc = (struct IncomingSocket *) client;
898   switch (ntohs( message->type))
899   {
900     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
901       return handle_p2p_hello (inc, (struct ConsensusHello *) message);
902     default:
903       if (NULL != inc->cpi)
904         return mst_session_callback (inc->cpi, client, message);
905       /* FIXME: disconnect peer properly */
906       GNUNET_assert (0);
907   }
908   return GNUNET_OK;
909 }
910
911
912 /**
913  * Functions of this type are called upon new stream connection from other peers
914  * or upon binding error which happen when the app_port given in
915  * GNUNET_STREAM_listen() is already taken.
916  *
917  * @param cls the closure from GNUNET_STREAM_listen
918  * @param socket the socket representing the stream; NULL on binding error
919  * @param initiator the identity of the peer who wants to establish a stream
920  *            with us; NULL on binding error
921  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
922  *             stream (the socket will be invalid after the call)
923  */
924 static int
925 listen_cb (void *cls,
926            struct GNUNET_STREAM_Socket *socket,
927            const struct GNUNET_PeerIdentity *initiator)
928 {
929   struct IncomingSocket *incoming;
930
931   GNUNET_assert (NULL != socket);
932
933   incoming = GNUNET_malloc (sizeof *incoming);
934
935   incoming->socket = socket;
936   incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
937
938   incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
939                                      &incoming_stream_data_processor, incoming);
940
941
942   incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
943
944   GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
945
946   return GNUNET_OK;
947 }
948
949
950 static void
951 destroy_session (struct ConsensusSession *session)
952 {
953   /* FIXME: more stuff to free! */
954   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
955   GNUNET_SERVER_client_drop (session->client);
956   GNUNET_free (session);
957 }
958
959
960 /**
961  * Disconnect a client, and destroy all sessions associated with it.
962  *
963  * @param client the client to disconnect
964  */
965 static void
966 disconnect_client (struct GNUNET_SERVER_Client *client)
967 {
968   struct ConsensusSession *session;
969   GNUNET_SERVER_client_disconnect (client);
970   
971   /* if the client owns a session, remove it */
972   session = sessions_head;
973   while (NULL != session)
974   {
975     if (client == session->client)
976     {
977       destroy_session (session);
978       break;
979     }
980     session = session->next;
981   }
982 }
983
984
985 /**
986  * Compute a global, (hopefully) unique consensus session id,
987  * from the local id of the consensus session, and the identities of all participants.
988  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
989  * exactly the same peers, the global id will be different.
990  *
991  * @param local_id local id of the consensus session
992  * @param peers array of all peers participating in the consensus session
993  * @param num_peers number of elements in the peers array
994  * @param dst where the result is stored, may not be NULL
995  */
996 static void
997 compute_global_id (const struct GNUNET_HashCode *local_id,
998                    const struct GNUNET_PeerIdentity *peers, int num_peers, 
999                    struct GNUNET_HashCode *dst)
1000 {
1001   int i;
1002   struct GNUNET_HashCode tmp;
1003
1004   *dst = *local_id;
1005   for (i = 0; i < num_peers; ++i)
1006   {
1007     GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
1008     *dst = tmp;
1009     GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
1010     *dst = tmp;
1011   }
1012 }
1013
1014
1015 /**
1016  * Function called to notify a client about the connection
1017  * begin ready to queue more data.  "buf" will be
1018  * NULL and "size" zero if the connection was closed for
1019  * writing in the meantime.
1020  *
1021  * @param cls consensus session
1022  * @param size number of bytes available in buf
1023  * @param buf where the callee should write the message
1024  * @return number of bytes written to buf
1025  */
1026 static size_t
1027 transmit_queued (void *cls, size_t size,
1028                  void *buf)
1029 {
1030   struct ConsensusSession *session;
1031   struct QueuedMessage *qmsg;
1032   size_t msg_size;
1033
1034   session = cls;
1035   session->th = NULL;
1036
1037
1038   qmsg = session->client_messages_head;
1039   GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1040   GNUNET_assert (qmsg);
1041
1042   if (NULL == buf)
1043   {
1044     destroy_session (session);
1045     return 0;
1046   }
1047
1048   msg_size = ntohs (qmsg->msg->size);
1049
1050   GNUNET_assert (size >= msg_size);
1051
1052   memcpy (buf, qmsg->msg, msg_size);
1053   GNUNET_free (qmsg->msg);
1054   GNUNET_free (qmsg);
1055
1056   send_next (session);
1057
1058   return msg_size;
1059 }
1060
1061
1062 /**
1063  * Schedule sending the next message (if there is any) to a client.
1064  *
1065  * @param cli the client to send the next message to
1066  */
1067 static void
1068 send_next (struct ConsensusSession *session)
1069 {
1070
1071   GNUNET_assert (NULL != session);
1072
1073   if (NULL != session->th)
1074     return;
1075
1076   if (NULL != session->client_messages_head)
1077   {
1078     int msize;
1079     msize = ntohs (session->client_messages_head->msg->size);
1080     session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
1081                                                        GNUNET_TIME_UNIT_FOREVER_REL,
1082                                                        &transmit_queued, session);
1083   }
1084 }
1085
1086
1087 /**
1088  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
1089  * the correct signature to be used with e.g. qsort.
1090  * We use this function instead.
1091  *
1092  * @param h1 some hash code
1093  * @param h2 some hash code
1094  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
1095  */
1096 static int
1097 hash_cmp (const void *a, const void *b)
1098 {
1099   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
1100 }
1101
1102
1103 /**
1104  * Search peer in the list of peers in session.
1105  *
1106  * @param peer peer to find
1107  * @param session session with peer
1108  * @return index of peer, -1 if peer is not in session
1109  */
1110 static int
1111 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1112 {
1113   const struct GNUNET_PeerIdentity *needle;
1114   needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1115   if (NULL == needle)
1116     return -1;
1117   return needle - session->peers;
1118 }
1119
1120
1121
1122 /**
1123  * Called when stream has finishes writing the hello message
1124  */
1125 static void
1126 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1127 {
1128   struct ConsensusPeerInformation *cpi;
1129
1130   cpi = cls;
1131   cpi->hello = GNUNET_YES;
1132   
1133   GNUNET_assert (GNUNET_STREAM_OK == status);
1134
1135   if (cpi->session->conclude_requested)
1136   {
1137     write_strata (cpi, GNUNET_STREAM_OK, 0);  
1138   }
1139 }
1140
1141
1142 /**
1143  * Functions of this type will be called when a stream is established
1144  *
1145  * @param cls the closure from GNUNET_STREAM_open
1146  * @param socket socket to use to communicate with the other side (read/write)
1147  */
1148 static void
1149 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1150 {
1151   struct ConsensusPeerInformation *cpi;
1152   struct ConsensusHello *hello;
1153
1154
1155   cpi = cls;
1156   cpi->is_connected = GNUNET_YES;
1157
1158   hello = GNUNET_malloc (sizeof *hello);
1159   hello->header.size = htons (sizeof *hello);
1160   hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1161   memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1162
1163   cpi->wh =
1164       GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1165
1166   cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1167                                 &session_stream_data_processor, cpi);
1168
1169 }
1170
1171
1172 static void
1173 initialize_session_info (struct ConsensusSession *session)
1174 {
1175   int i;
1176   int last;
1177
1178   for (i = 0; i < session->num_peers; ++i)
1179   {
1180     /* initialize back-references, so consensus peer information can
1181      * be used as closure */
1182     session->info[i].session = session;
1183   }
1184
1185   session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
1186
1187   last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1188   i = (session->local_peer_idx + 1) % session->num_peers;
1189   while (i != last)
1190   {
1191     session->info[i].is_outgoing = GNUNET_YES;
1192     session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
1193                                                   open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
1194     session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
1195     i = (i + 1) % session->num_peers;
1196
1197     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
1198   }
1199   // tie-breaker for even number of peers
1200   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1201   {
1202     session->info[last].is_outgoing = GNUNET_YES;
1203     session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
1204                                                      open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
1205     session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
1206
1207     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
1208   }
1209 }
1210
1211
1212 /**
1213  * Create the sorted list of peers for the session,
1214  * add the local peer if not in the join message.
1215  */
1216 static void
1217 initialize_session_peer_list (struct ConsensusSession *session)
1218 {
1219   int local_peer_in_list;
1220   int listed_peers;
1221   const struct GNUNET_PeerIdentity *msg_peers;
1222   unsigned int i;
1223
1224   GNUNET_assert (NULL != session->join_msg);
1225
1226   /* peers in the join message, may or may not include the local peer */
1227   listed_peers = ntohs (session->join_msg->num_peers);
1228   
1229   session->num_peers = listed_peers;
1230
1231   msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
1232
1233   local_peer_in_list = GNUNET_NO;
1234   for (i = 0; i < listed_peers; i++)
1235   {
1236     if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
1237     {
1238       local_peer_in_list = GNUNET_YES;
1239       break;
1240     }
1241   }
1242
1243   if (GNUNET_NO == local_peer_in_list)
1244     session->num_peers++;
1245
1246   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
1247
1248   if (GNUNET_NO == local_peer_in_list)
1249     session->peers[session->num_peers - 1] = *my_peer;
1250
1251   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
1252   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1253 }
1254
1255
1256 static void
1257 strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
1258 {
1259   uint32_t v;
1260   int i;
1261   v = key->bits[0];
1262   /* count trailing '1'-bits of v */
1263   for (i = 0; v & 1; v>>=1, i++)
1264     /* empty */;
1265   ibf_insert (strata[i], ibf_key_from_hashcode (key));
1266 }
1267
1268
1269 /**
1270  * Initialize the session, continue receiving messages from the owning client
1271  *
1272  * @param session the session to initialize
1273  */
1274 static void
1275 initialize_session (struct ConsensusSession *session)
1276 {
1277   const struct ConsensusSession *other_session;
1278   int i;
1279
1280   GNUNET_assert (NULL != session->join_msg);
1281
1282   initialize_session_peer_list (session);
1283
1284   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1285
1286   compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
1287
1288   /* Check if some local client already owns the session. */
1289   other_session = sessions_head;
1290   while (NULL != other_session)
1291   {
1292     if ((other_session != session) && 
1293         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1294     {
1295       /* session already owned by another client */
1296       GNUNET_break (0);
1297       disconnect_client (session->client);
1298       return;
1299     }
1300     other_session = other_session->next;
1301   }
1302
1303   session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
1304
1305   session->local_peer_idx = get_peer_idx (my_peer, session);
1306   GNUNET_assert (-1 != session->local_peer_idx);
1307
1308   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
1309
1310   session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1311   for (i = 0; i < STRATA_COUNT; i++)
1312     session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1313
1314   session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *));
1315
1316   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1317   initialize_session_info (session);
1318
1319   GNUNET_free (session->join_msg);
1320   session->join_msg = NULL;
1321
1322   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1323   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1324 }
1325
1326
1327 /**
1328  * Called when a client wants to join a consensus session.
1329  *
1330  * @param cls unused
1331  * @param client client that sent the message
1332  * @param m message sent by the client
1333  */
1334 static void
1335 client_join (void *cls,
1336              struct GNUNET_SERVER_Client *client,
1337              const struct GNUNET_MessageHeader *m)
1338 {
1339   struct ConsensusSession *session;
1340
1341   // make sure the client has not already joined a session
1342   session = sessions_head;
1343   while (NULL != session)
1344   {
1345     if (session->client == client)
1346     {
1347       GNUNET_break (0);
1348       disconnect_client (client);
1349       return;
1350     }
1351     session = session->next;
1352   }
1353
1354   session = GNUNET_malloc (sizeof (struct ConsensusSession));
1355   session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
1356   session->client = client;
1357   GNUNET_SERVER_client_keep (client);
1358
1359   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1360
1361   // Initialize session later if local peer identity is not known yet.
1362   if (NULL == my_peer)
1363   {
1364     GNUNET_SERVER_disable_receive_done_warning (client);
1365     return;
1366   }
1367
1368   initialize_session (session);
1369 }
1370
1371
1372 /**
1373  * Called when a client performs an insert operation.
1374  *
1375  * @param cls (unused)
1376  * @param client client handle
1377  * @param message message sent by the client
1378  */
1379 void
1380 client_insert (void *cls,
1381              struct GNUNET_SERVER_Client *client,
1382              const struct GNUNET_MessageHeader *m)
1383 {
1384   struct ConsensusSession *session;
1385   struct GNUNET_CONSENSUS_ElementMessage *msg;
1386   struct GNUNET_CONSENSUS_Element *element;
1387   struct GNUNET_HashCode key;
1388   int element_size;
1389
1390   session = sessions_head;
1391   while (NULL != session)
1392   {
1393     if (session->client == client)
1394       break;
1395   }
1396
1397   if (NULL == session)
1398   {
1399     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
1400     GNUNET_SERVER_client_disconnect (client);
1401     return;
1402   }
1403
1404   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1405   element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1406
1407   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
1408
1409   element->type = msg->element_type;
1410   element->size = element_size;
1411   memcpy (&element[1], &msg[1], element_size);
1412   element->data = &element[1];
1413
1414   GNUNET_assert (NULL != element->data);
1415
1416   GNUNET_CRYPTO_hash (element, element_size, &key);
1417
1418   GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1419                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1420
1421   strata_insert (session->strata, &key);
1422
1423   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1424
1425   send_next (session);
1426 }
1427
1428
1429
1430
1431 /**
1432  * Functions of this signature are called whenever writing operations
1433  * on a stream are executed
1434  *
1435  * @param cls the closure from GNUNET_STREAM_write
1436  * @param status the status of the stream at the time this function is called;
1437  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1438  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1439  *          (this doesn't mean that the data is never sent, the receiver may
1440  *          have read the data but its ACKs may have been lost);
1441  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1442  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1443  *          be processed.
1444  * @param size the number of bytes written
1445  */
1446 static void 
1447 write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1448 {
1449   GNUNET_assert (GNUNET_STREAM_OK == status);
1450   /* just wait for the ibf */
1451 }
1452
1453 /**
1454  * Functions of this signature are called whenever writing operations
1455  * on a stream are executed
1456  *
1457  * @param cls the closure from GNUNET_STREAM_write
1458  * @param status the status of the stream at the time this function is called;
1459  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1460  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1461  *          (this doesn't mean that the data is never sent, the receiver may
1462  *          have read the data but its ACKs may have been lost);
1463  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1464  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1465  *          be processed.
1466  * @param size the number of bytes written
1467  */
1468 static void 
1469 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1470 {
1471   struct ConsensusPeerInformation *cpi;
1472   struct StrataMessage *strata_msg;
1473   size_t msize;
1474   int i;
1475   uint64_t *key_dst;
1476   uint32_t *hash_dst;
1477   uint8_t *count_dst;
1478
1479   cpi = cls;
1480   cpi->wh = NULL;
1481
1482   GNUNET_assert (GNUNET_STREAM_OK == status);
1483
1484   GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1485
1486   /* FIXME: handle this */
1487   GNUNET_assert (GNUNET_STREAM_OK == status);
1488
1489   msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1490
1491   strata_msg = GNUNET_malloc (msize);
1492   strata_msg->header.size = htons (msize);
1493   strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1494
1495   /* for correct message alignment, copy bucket types seperately */
1496   key_dst = (uint64_t *) &strata_msg[1];
1497
1498   for (i = 0; i < STRATA_COUNT; i++)
1499   {
1500     memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst);
1501     key_dst += STRATA_IBF_BUCKETS;
1502   }
1503
1504   hash_dst = (uint32_t *) key_dst;
1505
1506   for (i = 0; i < STRATA_COUNT; i++)
1507   {
1508     memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1509     hash_dst += STRATA_IBF_BUCKETS;
1510   }
1511
1512   count_dst = (uint8_t *) hash_dst;
1513
1514   for (i = 0; i < STRATA_COUNT; i++)
1515   {
1516     memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS);
1517     count_dst += STRATA_IBF_BUCKETS;
1518   }
1519
1520   cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1521                                  write_strata_done, cpi);
1522
1523   GNUNET_assert (NULL != cpi->wh);
1524 }
1525
1526
1527 /**
1528  * Functions of this signature are called whenever writing operations
1529  * on a stream are executed
1530  *
1531  * @param cls the closure from GNUNET_STREAM_write
1532  * @param status the status of the stream at the time this function is called;
1533  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1534  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1535  *          (this doesn't mean that the data is never sent, the receiver may
1536  *          have read the data but its ACKs may have been lost);
1537  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1538  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1539  *          be processed.
1540  * @param size the number of bytes written
1541  */
1542 static void 
1543 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1544 {
1545   struct ConsensusPeerInformation *cpi;
1546   struct DifferenceDigest *digest;
1547   int msize;
1548   uint64_t *key_dst;
1549   uint32_t *hash_dst;
1550   uint8_t *count_dst;
1551   int num_buckets;
1552
1553   cpi = cls;
1554   cpi->wh = NULL;
1555
1556   GNUNET_assert (GNUNET_STREAM_OK == status);
1557
1558   GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
1559
1560   if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1561   {
1562     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1563     /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1564     return;
1565   }
1566
1567   /* remaining buckets */
1568   num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1569
1570   /* limit to maximum */
1571   if (num_buckets > BUCKETS_PER_MESSAGE)
1572     num_buckets = BUCKETS_PER_MESSAGE;
1573
1574   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order));
1575
1576   msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1577
1578   digest = GNUNET_malloc (msize);
1579   digest->header.size = htons (msize);
1580   digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1581   digest->order = cpi->ibf_order;
1582
1583   key_dst = (uint64_t *) &digest[1];
1584
1585   memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst);
1586   key_dst += num_buckets;
1587
1588   hash_dst = (uint32_t *) key_dst;
1589
1590   memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst);
1591   hash_dst += num_buckets;
1592
1593   count_dst = (uint8_t *) hash_dst;
1594
1595   memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst);
1596
1597   cpi->ibf_bucket_counter += num_buckets;
1598
1599   cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1600                                  write_ibf, cpi);
1601
1602   GNUNET_assert (NULL != cpi->wh);
1603 }
1604
1605
1606 /**
1607  * Functions of this signature are called whenever writing operations
1608  * on a stream are executed
1609  *
1610  * @param cls the closure from GNUNET_STREAM_write
1611  * @param status the status of the stream at the time this function is called;
1612  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1613  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1614  *          (this doesn't mean that the data is never sent, the receiver may
1615  *          have read the data but its ACKs may have been lost);
1616  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1617  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1618  *          be processed.
1619  * @param size the number of bytes written
1620  */
1621 static void 
1622 write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1623 {
1624   struct ConsensusPeerInformation *cpi;
1625   uint64_t key;
1626   struct GNUNET_HashCode hashcode;
1627   int side;
1628   int msize;
1629
1630   GNUNET_assert (GNUNET_STREAM_OK == status);
1631
1632   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n");
1633
1634   cpi = cls;
1635   cpi->wh = NULL;
1636
1637   GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state);
1638
1639   for (;;)
1640   {
1641     int res;
1642     res = ibf_decode (cpi->ibf, &side, &key);
1643     if (GNUNET_SYSERR == res)
1644     {
1645       cpi->ibf_order++;
1646       prepare_ibf (cpi);
1647       cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1648       cpi->ibf_state = IBF_STATE_TRANSMITTING;
1649       write_ibf (cls, status, size);
1650       return;
1651     }
1652     if (GNUNET_NO == res)
1653     {
1654       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
1655       return;
1656     }
1657     if (-1 == side)
1658     {
1659       struct GNUNET_CONSENSUS_Element *element;
1660       struct GNUNET_MessageHeader *element_msg;
1661       ibf_hashcode_from_key (key, &hashcode);
1662       /* FIXME: this only transmits one element stored with the key */
1663       element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1664       if (NULL == element)
1665         continue;
1666       msize = sizeof (struct GNUNET_MessageHeader) + element->size;
1667       element_msg = GNUNET_malloc (msize);
1668       element_msg->size = htons (msize);
1669       element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
1670       GNUNET_assert (NULL != element->data);
1671       memcpy (&element_msg[1], element->data, element->size);
1672       cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1673                                      write_requests_and_elements, cpi);
1674       GNUNET_free (element_msg);
1675       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
1676
1677       GNUNET_assert (NULL != cpi->wh);
1678       return;
1679     }
1680     else
1681     {
1682       struct ElementRequest *msg;
1683       size_t msize;
1684       uint64_t *p;
1685
1686       msize = (sizeof *msg) + sizeof (uint64_t);
1687       msg = GNUNET_malloc (msize);
1688       msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1689       msg->header.size = htons (msize);
1690       p = (uint64_t *) &msg[1];
1691       *p = key;
1692
1693       cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1694                                      write_requests_and_elements, cpi);
1695       GNUNET_assert (NULL != cpi->wh);
1696       GNUNET_free (msg);
1697       return;
1698     }
1699   }
1700
1701 }
1702
1703
1704
1705 /*
1706 static void
1707 select_best_group (struct ConsensusSession *session)
1708 {
1709 }
1710 */
1711
1712
1713 /**
1714  * Called when a client performs the conclude operation.
1715  *
1716  * @param cls (unused)
1717  * @param client client handle
1718  * @param message message sent by the client
1719  */
1720 static void
1721 client_conclude (void *cls,
1722              struct GNUNET_SERVER_Client *client,
1723              const struct GNUNET_MessageHeader *message)
1724 {
1725   struct ConsensusSession *session;
1726   int i;
1727
1728   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
1729
1730   session = sessions_head;
1731   while ((session != NULL) && (session->client != client))
1732     session = session->next;
1733   if (NULL == session)
1734   {
1735     /* client not found */
1736     GNUNET_break (0);
1737     GNUNET_SERVER_client_disconnect (client);
1738     return;
1739   }
1740
1741   if (GNUNET_YES == session->conclude_requested)
1742   {
1743     /* client requested conclude twice */
1744     GNUNET_break (0);
1745     disconnect_client (client);
1746     return;
1747   }
1748
1749   session->conclude_requested = GNUNET_YES;
1750
1751   for (i = 0; i < session->num_peers; i++)
1752   {
1753     if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1754          (GNUNET_YES == session->info[i].hello) )
1755     {
1756       /* kick off transmitting strata by calling the write continuation */
1757       write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1758     }
1759   }
1760
1761   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1762   send_next (session);
1763 }
1764
1765
1766 /**
1767  * Called when a client sends an ack
1768  *
1769  * @param cls (unused)
1770  * @param client client handle
1771  * @param message message sent by the client
1772  */
1773 void
1774 client_ack (void *cls,
1775              struct GNUNET_SERVER_Client *client,
1776              const struct GNUNET_MessageHeader *message)
1777 {
1778   struct ConsensusSession *session;
1779   struct GNUNET_CONSENSUS_AckMessage *msg;
1780   struct PendingElement *pending;
1781   struct GNUNET_CONSENSUS_Element *element;
1782   struct GNUNET_HashCode key;
1783
1784   session = sessions_head;
1785   while (NULL != session)
1786   {
1787     if (session->client == client)
1788       break;
1789   }
1790
1791   if (NULL == session)
1792   {
1793     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
1794     GNUNET_SERVER_client_disconnect (client);
1795     return;
1796   }
1797
1798   pending = session->approval_pending_head;
1799
1800   GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending);
1801
1802   msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
1803
1804   if (msg->keep)
1805   {
1806     element = pending->element;
1807     GNUNET_CRYPTO_hash (element, element->size, &key);
1808
1809     GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1810                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1811     strata_insert (session->strata, &key);
1812   }
1813
1814   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1815 }
1816
1817 /**
1818  * Task that disconnects from core.
1819  *
1820  * @param cls core handle
1821  * @param tc context information (why was this task triggered now)
1822  */
1823 static void
1824 disconnect_core (void *cls,
1825                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1826 {
1827   GNUNET_CORE_disconnect (core);
1828   core = NULL;
1829   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
1830 }
1831
1832
1833 static void
1834 core_startup (void *cls,
1835               struct GNUNET_CORE_Handle *core,
1836               const struct GNUNET_PeerIdentity *peer)
1837 {
1838   struct ConsensusSession *session;
1839
1840   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
1841   /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
1842   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
1843   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
1844
1845   session = sessions_head;
1846   while (NULL != session)
1847   {
1848     if (NULL != session->join_msg)
1849       initialize_session (session);
1850     session = session->next;
1851   }
1852 }
1853
1854
1855 /**
1856  * Called to clean up, after a shutdown has been requested.
1857  *
1858  * @param cls closure
1859  * @param tc context information (why was this task triggered now)
1860  */
1861 static void
1862 shutdown_task (void *cls,
1863                const struct GNUNET_SCHEDULER_TaskContext *tc)
1864 {
1865
1866   /* FIXME: complete; write separate destructors for different data types */
1867
1868   while (NULL != incoming_sockets_head)
1869   {
1870     struct IncomingSocket *socket;
1871     socket = incoming_sockets_head;
1872     if (NULL == socket->cpi)
1873     {
1874       GNUNET_STREAM_close (socket->socket);
1875     }
1876     incoming_sockets_head = incoming_sockets_head->next;
1877     GNUNET_free (socket);
1878   }
1879
1880   while (NULL != sessions_head)
1881   {
1882     struct ConsensusSession *session;
1883     int i;
1884
1885     session = sessions_head;
1886
1887     for (i = 0; session->num_peers; i++)
1888     {
1889       struct ConsensusPeerInformation *cpi;
1890       cpi = &session->info[i];
1891       if ((NULL != cpi) && (NULL != cpi->socket))
1892       {
1893         GNUNET_STREAM_close (cpi->socket);
1894       }
1895     }
1896
1897     if (NULL != session->client)
1898       GNUNET_SERVER_client_disconnect (session->client);
1899
1900     sessions_head = sessions_head->next;
1901     GNUNET_free (session);
1902   }
1903
1904   if (NULL != core)
1905   {
1906     GNUNET_CORE_disconnect (core);
1907     core = NULL;
1908   }
1909
1910   if (NULL != listener)
1911   {
1912     GNUNET_STREAM_listen_close (listener);
1913     listener = NULL;
1914   } 
1915
1916   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
1917 }
1918
1919
1920 /**
1921  * Start processing consensus requests.
1922  *
1923  * @param cls closure
1924  * @param server the initialized server
1925  * @param c configuration to use
1926  */
1927 static void
1928 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
1929 {
1930   /* core is only used to retrieve the peer identity */
1931   static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
1932     {NULL, 0, 0}
1933   };
1934   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1935     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1936     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1937     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1938         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1939     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
1940         sizeof (struct GNUNET_CONSENSUS_AckMessage)},
1941     {NULL, NULL, 0, 0}
1942   };
1943
1944   cfg = c;
1945   srv = server;
1946
1947   GNUNET_SERVER_add_handlers (server, server_handlers);
1948
1949   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1950
1951   listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1952                                    listen_cb, NULL,
1953                                    GNUNET_STREAM_OPTION_END);
1954
1955
1956   /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1957   core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1958   GNUNET_assert (NULL != core);
1959
1960   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1961 }
1962
1963
1964 /**
1965  * The main function for the consensus service.
1966  *
1967  * @param argc number of arguments from the command line
1968  * @param argv command line arguments
1969  * @return 0 ok, 1 on error
1970  */
1971 int
1972 main (int argc, char *const *argv)
1973 {
1974   int ret;
1975   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1976   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
1977   return (GNUNET_OK == ret) ? 0 : 1;
1978 }
1979