4b69c5327e5799744b9b97d346ac9fc004d5a573
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_consensus_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_stream_lib.h"
35 #include "consensus_protocol.h"
36 #include "ibf.h"
37 #include "consensus.h"
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define STRATA_COUNT 32
44 /**
45  * Number of buckets per IBF.
46  */
47 #define STRATA_IBF_BUCKETS 80
48 /**
49  * hash num parameter for the difference digests and strata estimators
50  */
51 #define STRATA_HASH_NUM 3
52
53 /**
54  * Number of buckets that can be transmitted in one message.
55  */
56 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
57
58 /**
59  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60  * Choose this value so that computing the IBF is still cheaper
61  * than transmitting all values.
62  */
63 #define MAX_IBF_ORDER (16)
64
65 /**
66  * Number exp-rounds.
67  */
68 #define NUM_EXP_ROUNDS (4)
69
70
71 /* forward declarations */
72
73 struct ConsensusSession;
74 struct IncomingSocket;
75 struct ConsensusPeerInformation;
76
77 static void
78 client_send_next (struct ConsensusSession *session);
79
80 static int
81 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
82
83 static void 
84 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
85
86 static void
87 send_ibf (struct ConsensusPeerInformation *cpi);
88
89 static void
90 send_strata_estimator (struct ConsensusPeerInformation *cpi);
91
92 static void
93 decode (struct ConsensusPeerInformation *cpi);
94
95 static void 
96 write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size);
97
98 static void
99 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
100
101
102 /**
103  * An element that is waiting to be transmitted to the client.
104  */
105 struct PendingElement
106 {
107   /**
108    * Pending elements are kept in a DLL.
109    */
110   struct PendingElement *next;
111
112   /**
113    * Pending elements are kept in a DLL.
114    */
115   struct PendingElement *prev;
116
117   /**
118    * The actual element
119    */
120   struct GNUNET_CONSENSUS_Element *element;
121
122   /* peer this element is coming from */
123   struct ConsensusPeerInformation *cpi;
124 };
125
126
127 struct ElementList
128 {
129   struct ElementList *next;
130   struct GNUNET_CONSENSUS_Element *element;
131   struct GNUNET_HashCode *element_hash;
132 };
133
134
135 /**
136  * Describes the current round a consensus session is in.
137  */
138 enum ConsensusRound
139 {
140   /**
141    * Not started the protocol yet.
142    */
143   CONSENSUS_ROUND_BEGIN=0,
144   /**
145    * Distribution of elements with the exponential scheme.
146    */
147   CONSENSUS_ROUND_EXCHANGE,
148   /**
149    * Exchange which elements each peer has, but not the elements.
150    */
151   CONSENSUS_ROUND_INVENTORY,
152   /**
153    * Collect and distribute missing values.
154    */
155   CONSENSUS_ROUND_STOCK,
156   /**
157    * Consensus concluded.
158    */
159   CONSENSUS_ROUND_FINISH
160 };
161
162
163 /**
164  * Information about a peer that is in a consensus session.
165  */
166 struct ConsensusPeerInformation
167 {
168   struct GNUNET_PeerIdentity peer_id;
169
170   /**
171    * Socket for communicating with the peer, either created by the local peer,
172    * or the remote peer.
173    */
174   struct GNUNET_STREAM_Socket *socket;
175
176   /**
177    * Message tokenizer, for the data received from this peer via the stream socket.
178    */
179   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
180
181   /**
182    * Do we connect to the peer, or does the peer connect to us?
183    * Only valid for all-to-all phases
184    */
185   int is_outgoing;
186
187   /**
188    * Did we receive/send a consensus hello?
189    */
190   int hello;
191
192   /**
193    * Handle for currently active read
194    */
195   struct GNUNET_STREAM_ReadHandle *rh;
196
197   /**
198    * Handle for currently active read
199    */
200   struct GNUNET_STREAM_WriteHandle *wh;
201
202   enum {
203     /* beginning of round */
204     IBF_STATE_NONE=0,
205     /* we currently receive an ibf */
206     IBF_STATE_RECEIVING,
207     /* we currently transmit an ibf */
208     IBF_STATE_TRANSMITTING,
209     /* we decode a received ibf */
210     IBF_STATE_DECODING,
211     /* wait for elements and element requests */
212     IBF_STATE_ANTICIPATE_DIFF
213   } ibf_state ;
214
215   /**
216    * What is the order (=log2 size) of the ibf
217    * we're currently dealing with?
218    * Interpretation depends on ibf_state.
219    */
220   int ibf_order;
221
222   /**
223    * The current IBF for this peer,
224    * purpose dependent on ibf_state
225    */
226   struct InvertibleBloomFilter *ibf;
227
228   /**
229    * How many buckets have we transmitted/received? 
230    * Interpretatin depends on ibf_state
231    */
232   int ibf_bucket_counter;
233
234   /**
235    * Strata estimator of the peer, NULL if our peer
236    * initiated the reconciliation.
237    */
238   struct StrataEstimator *se;
239
240   /**
241    * Element keys that this peer misses, but we have them.
242    */
243   struct GNUNET_CONTAINER_MultiHashMap *requested_keys;
244
245   /**
246    * Element keys that this peer has, but we miss.
247    */
248   struct GNUNET_CONTAINER_MultiHashMap *reported_keys;
249
250   /**
251    * Back-reference to the consensus session,
252    * to that ConsensusPeerInformation can be used as a closure
253    */
254   struct ConsensusSession *session;
255
256   /**
257    * Messages queued for the current round.
258    */
259   struct QueuedMessage *messages_head;
260
261   /**
262    * Messages queued for the current round.
263    */
264   struct QueuedMessage *messages_tail;
265
266   /**
267    * True if we are actually replaying the strata message,
268    * e.g. currently handling the premature_strata_message.
269    */
270   int replaying_strata_message;
271
272   /**
273    * A strata message that is not actually for the current round,
274    * used in the exp-scheme.
275    */
276   struct StrataMessage *premature_strata_message;
277
278   /**
279    * We have finishes the exp-subround with the peer.
280    */
281   int exp_subround_finished;
282
283   int inventory_synced;
284
285   /**
286    * Round this peer seems to be in, according to the last SE we got.
287    * Necessary to store this, as we sometimes need to respond to a request from an
288    * older round, while we are already in the next round.
289    */
290   enum ConsensusRound apparent_round;
291
292 };
293
294 typedef void (*QueuedMessageCallback) (void *msg);
295
296 /**
297  * A doubly linked list of messages.
298  */
299 struct QueuedMessage
300 {
301   struct GNUNET_MessageHeader *msg;
302
303   /**
304    * Queued messages are stored in a doubly linked list.
305    */
306   struct QueuedMessage *next;
307
308   /**
309    * Queued messages are stored in a doubly linked list.
310    */
311   struct QueuedMessage *prev;
312
313   QueuedMessageCallback cb;
314   
315   void *cls;
316 };
317
318
319 struct StrataEstimator
320 {
321   struct InvertibleBloomFilter **strata;
322 };
323
324
325 /**
326  * A consensus session consists of one local client and the remote authorities.
327  */
328 struct ConsensusSession
329 {
330   /**
331    * Consensus sessions are kept in a DLL.
332    */
333   struct ConsensusSession *next;
334
335   /**
336    * Consensus sessions are kept in a DLL.
337    */
338   struct ConsensusSession *prev;
339
340   /**
341    * Join message. Used to initialize the session later,
342    * if the identity of the local peer is not yet known.
343    * NULL if the session has been fully initialized.
344    */
345   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
346
347   /**
348   * Global consensus identification, computed
349   * from the session id and participating authorities.
350   */
351   struct GNUNET_HashCode global_id;
352
353   /**
354    * Local client in this consensus session.
355    * There is only one client per consensus session.
356    */
357   struct GNUNET_SERVER_Client *client;
358
359   /**
360    * Elements in the consensus set of this session,
361    * all of them either have been sent by or approved by the client.
362    * Contains ElementList.
363    * Used as a unique-key hashmap.
364    */
365   struct GNUNET_CONTAINER_MultiHashMap *values;
366
367   /**
368    * Elements that have not been approved (or rejected) by the client yet.
369    */
370   struct PendingElement *client_approval_head;
371
372   /**
373    * Elements that have not been approved (or rejected) by the client yet.
374    */
375   struct PendingElement *client_approval_tail;
376
377   /**
378    * Messages to be sent to the local client that owns this session
379    */
380   struct QueuedMessage *client_messages_head;
381
382   /**
383    * Messages to be sent to the local client that owns this session
384    */
385   struct QueuedMessage *client_messages_tail;
386
387   /**
388    * Currently active transmit handle for sending to the client
389    */
390   struct GNUNET_SERVER_TransmitHandle *client_th;
391
392   /**
393    * Timeout for all rounds together, single rounds will schedule a timeout task
394    * with a fraction of the conclude timeout.
395    */
396   struct GNUNET_TIME_Relative conclude_timeout;
397   
398   /**
399    * Timeout task identifier for the current round
400    */
401   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
402
403   /**
404    * Number of other peers in the consensus
405    */
406   unsigned int num_peers;
407
408   /**
409    * Information about the other peers,
410    * their state, etc.
411    */
412   struct ConsensusPeerInformation *info;
413
414   /**
415    * Index of the local peer in the peers array
416    */
417   int local_peer_idx;
418
419   /**
420    * Strata estimator, computed online
421    */
422   struct StrataEstimator *se;
423
424   /**
425    * Pre-computed IBFs
426    */
427   struct InvertibleBloomFilter **ibfs;
428
429   /**
430    * Current round
431    */
432   enum ConsensusRound current_round;
433
434   int exp_round;
435
436   int exp_subround;
437
438   /**
439    * Permutation of peers for the current round,
440    * maps logical index (for current round) to physical index (location in info array)
441    */
442   int *shuffle;
443
444   /**
445    * The partner for the current exp-round
446    */
447   struct ConsensusPeerInformation* partner_outgoing;
448
449   /**
450    * The partner for the current exp-round
451    */
452   struct ConsensusPeerInformation* partner_incoming;
453 };
454
455
456 /**
457  * Sockets from other peers who want to communicate with us.
458  * It may not be known yet which consensus session they belong to.
459  * Also, the session might not exist yet locally.
460  */
461 struct IncomingSocket
462 {
463   /**
464    * Incoming sockets are kept in a double linked list.
465    */
466   struct IncomingSocket *next;
467
468   /**
469    * Incoming sockets are kept in a double linked list.
470    */
471   struct IncomingSocket *prev;
472
473   /**
474    * The actual socket.
475    */
476   struct GNUNET_STREAM_Socket *socket;
477
478   /**
479    * Handle for currently active read
480    */
481   struct GNUNET_STREAM_ReadHandle *rh;
482
483   /**
484    * Peer that connected to us with the socket.
485    */
486   struct GNUNET_PeerIdentity peer_id;
487
488   /**
489    * Message stream tokenizer for this socket.
490    */
491   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
492
493   /**
494    * Peer-in-session this socket belongs to, once known, otherwise NULL.
495    */
496   struct ConsensusPeerInformation *cpi;
497
498   /**
499    * Set to the global session id, if the peer sent us a hello-message,
500    * but the session does not exist yet.
501    */
502   struct GNUNET_HashCode *requested_gid;
503 };
504
505
506 static struct IncomingSocket *incoming_sockets_head;
507 static struct IncomingSocket *incoming_sockets_tail;
508
509 /**
510  * Linked list of sesstions this peer participates in.
511  */
512 static struct ConsensusSession *sessions_head;
513
514 /**
515  * Linked list of sesstions this peer participates in.
516  */
517 static struct ConsensusSession *sessions_tail;
518
519 /**
520  * Configuration of the consensus service.
521  */
522 static const struct GNUNET_CONFIGURATION_Handle *cfg;
523
524 /**
525  * Handle to the server for this service.
526  */
527 static struct GNUNET_SERVER_Handle *srv;
528
529 /**
530  * Peer that runs this service.
531  */
532 static struct GNUNET_PeerIdentity *my_peer;
533
534 /**
535  * Handle to the core service. Only used during service startup, will be NULL after that.
536  */
537 static struct GNUNET_CORE_Handle *core;
538
539 /**
540  * Listener for sockets from peers that want to reconcile with us.
541  */
542 static struct GNUNET_STREAM_ListenSocket *listener;
543
544
545 /**
546  * Queue a message to be sent to the inhabiting client of a session.
547  *
548  * @param session session
549  * @param msg message we want to queue
550  */
551 static void
552 queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
553 {
554   struct QueuedMessage *qm;
555   qm = GNUNET_malloc (sizeof *qm);
556   qm->msg = msg;
557   GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
558 }
559
560 /**
561  * Queue a message to be sent to another peer
562  *
563  * @param cpi peer
564  * @param msg message we want to queue
565  * @param cb callback, called when the message is given to strem
566  * @param cls closure for cb
567  */
568 static void
569 queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
570 {
571   struct QueuedMessage *qm;
572   qm = GNUNET_malloc (sizeof *qm);
573   qm->msg = msg;
574   qm->cls = cls;
575   qm->cb = cb;
576   GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm);
577   if (cpi->wh == NULL)
578     write_queued (cpi, GNUNET_STREAM_OK, 0);
579 }
580
581
582 /**
583  * Queue a message to be sent to another peer
584  *
585  * @param cpi peer
586  * @param msg message we want to queue
587  */
588 static void
589 queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg)
590 {
591   queue_peer_message_with_cls (cpi, msg, NULL, NULL);
592 }
593
594
595 /*
596 static void
597 clear_peer_messages (struct ConsensusPeerInformation *cpi)
598 {
599   cpi->messages_head = NULL;
600   cpi->messages_tail = NULL;
601 }
602 */
603
604
605 /**
606  * Estimate set difference with two strata estimators,
607  * i.e. arrays of IBFs.
608  * Does not not modify its arguments.
609  *
610  * @param se1 first strata estimator
611  * @param se2 second strata estimator
612  * @return the estimated difference
613  */
614 static int
615 estimate_difference (const struct StrataEstimator *se1,
616                      const struct StrataEstimator *se2)
617 {
618   int i;
619   int count;
620   count = 0;
621   for (i = STRATA_COUNT - 1; i >= 0; i--)
622   {
623     struct InvertibleBloomFilter *diff;
624     /* number of keys decoded from the ibf */
625     int ibf_count;
626     int more;
627     ibf_count = 0;
628     /* FIXME: implement this without always allocating new IBFs */
629     diff = ibf_dup (se1->strata[i]);
630     ibf_subtract (diff, se2->strata[i]);
631     for (;;)
632     {
633       more = ibf_decode (diff, NULL, NULL);
634       if (GNUNET_NO == more)
635       {
636         count += ibf_count;
637         break;
638       }
639       if (GNUNET_SYSERR == more)
640       {
641         ibf_destroy (diff);
642         return count * (1 << (i + 1));
643       }
644       ibf_count++;
645     }
646     ibf_destroy (diff);
647   }
648   return count;
649 }
650
651
652 /**
653  * Called when receiving data from a peer that is member of
654  * an inhabited consensus session.
655  *
656  * @param cls the closure from GNUNET_STREAM_read
657  * @param status the status of the stream at the time this function is called
658  * @param data traffic from the other side
659  * @param size the number of bytes available in data read; will be 0 on timeout 
660  * @return number of bytes of processed from 'data' (any data remaining should be
661  *         given to the next time the read processor is called).
662  */
663 static size_t
664 session_stream_data_processor (void *cls,
665                        enum GNUNET_STREAM_Status status,
666                        const void *data,
667                        size_t size)
668 {
669   struct ConsensusPeerInformation *cpi;
670   int ret;
671
672   GNUNET_assert (GNUNET_STREAM_OK == status);
673   cpi = cls;
674   GNUNET_assert (NULL != cpi->mst);
675   ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
676   if (GNUNET_SYSERR == ret)
677   {
678     /* FIXME: handle this correctly */
679     GNUNET_assert (0);
680   }
681   /* read again */
682   cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
683                                 &session_stream_data_processor, cpi);
684   /* we always read all data */
685   return size;
686 }
687
688 /**
689  * Called when we receive data from a peer that is not member of
690  * a session yet, or the session is not yet inhabited.
691  *
692  * @param cls the closure from GNUNET_STREAM_read
693  * @param status the status of the stream at the time this function is called
694  * @param data traffic from the other side
695  * @param size the number of bytes available in data read; will be 0 on timeout 
696  * @return number of bytes of processed from 'data' (any data remaining should be
697  *         given to the next time the read processor is called).
698  */
699 static size_t
700 incoming_stream_data_processor (void *cls,
701                        enum GNUNET_STREAM_Status status,
702                        const void *data,
703                        size_t size)
704 {
705   struct IncomingSocket *incoming;
706   int ret;
707
708   GNUNET_assert (GNUNET_STREAM_OK == status);
709   incoming = cls;
710   ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
711   if (GNUNET_SYSERR == ret)
712   {
713     /* FIXME: handle this correctly */
714     GNUNET_assert (0);
715   }
716   /* read again */
717   incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
718                                      &incoming_stream_data_processor, incoming);
719   /* we always read all data */
720   return size;
721 }
722
723
724 static void
725 send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head)
726 {
727   struct GNUNET_CONSENSUS_Element *element;
728   struct GNUNET_MessageHeader *element_msg;
729   size_t msize;
730
731   while (NULL != head)
732   {
733     element = head->element;
734     msize = sizeof (struct GNUNET_MessageHeader) + element->size;
735     element_msg = GNUNET_malloc (msize);
736     element_msg->size = htons (msize);
737     switch (cpi->apparent_round)
738     {
739       case CONSENSUS_ROUND_STOCK:
740       case CONSENSUS_ROUND_EXCHANGE:
741         element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
742         break;
743       case CONSENSUS_ROUND_INVENTORY:
744         element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
745         break;
746       default:
747         GNUNET_break (0);
748     }
749     GNUNET_assert (NULL != element->data);
750     memcpy (&element_msg[1], element->data, element->size);
751     queue_peer_message (cpi, element_msg);
752     head = head->next;
753   }
754 }
755
756 /**
757  * Iterator to insert values into an ibf.
758  *
759  * @param cls closure
760  * @param key current key code
761  * @param value value in the hash map
762  * @return GNUNET_YES if we should continue to
763  *         iterate,
764  *         GNUNET_NO if not.
765  */
766 static int
767 ibf_values_iterator (void *cls,
768                      const struct GNUNET_HashCode *key,
769                      void *value)
770 {
771   struct ConsensusPeerInformation *cpi;
772   struct ElementList *head;
773   struct IBF_Key ibf_key;
774   cpi = cls;
775   head = value;
776   ibf_key = ibf_key_from_hashcode (head->element_hash);
777   GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
778   ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
779   return GNUNET_YES;
780 }
781
782 /**
783  * Create and populate an IBF for the specified peer,
784  * if it does not already exist.
785  *
786  * @param cpi peer to create the ibf for
787  */
788 static void
789 prepare_ibf (struct ConsensusPeerInformation *cpi)
790 {
791   if (NULL == cpi->session->ibfs[cpi->ibf_order])
792   {
793     cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
794     GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
795   }
796 }
797
798
799 /**
800  * Called when a remote peer wants to inform the local peer
801  * that the remote peer misses elements.
802  * Elements are not reconciled.
803  *
804  * @param cpi session
805  * @param msg message
806  */
807 static int
808 handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
809 {
810   GNUNET_assert (0);
811 }
812
813
814 static int
815 exp_subround_finished (const struct ConsensusSession *session)
816 {
817   int not_finished;
818   not_finished = 0;
819   if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO))
820       not_finished++;
821   if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO))
822       not_finished++;
823   if (0 == not_finished)
824     return GNUNET_YES;
825   return GNUNET_NO;
826 }
827
828 static int
829 inventory_round_finished (struct ConsensusSession *session)
830 {
831   int i;
832   int finished;
833   finished = 0;
834   for (i = 0; i < session->num_peers; i++)
835     if (GNUNET_YES == session->info[i].inventory_synced)
836       finished++;
837   if (finished >= (session->num_peers / 2))
838     return GNUNET_YES;
839   return GNUNET_NO;
840 }
841
842
843
844 static void
845 fin_sent_cb (void *cls)
846 {
847   struct ConsensusPeerInformation *cpi;
848   cpi = cls;
849   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
850   switch (cpi->session->current_round)
851   {
852     case CONSENSUS_ROUND_EXCHANGE:
853     case CONSENSUS_ROUND_STOCK:
854       if (cpi->session->current_round != cpi->apparent_round)
855       {
856         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
857         break;
858       }
859       cpi->exp_subround_finished = GNUNET_YES;
860       /* the subround is only really over if *both* partners are done */
861       if (GNUNET_YES == exp_subround_finished (cpi->session))
862         subround_over (cpi->session, NULL);
863       else
864         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
865       break;
866     case CONSENSUS_ROUND_INVENTORY:
867       cpi->inventory_synced = GNUNET_YES;
868       if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
869         round_over (cpi->session, NULL);
870       /* FIXME: maybe go to next round */
871       break;
872     default:
873       GNUNET_break (0);
874   }
875 }
876
877
878 /**
879  * Gets called when the other peer wants us to inform that
880  * it has decoded our ibf and sent us all elements / requests
881  */
882 static int
883 handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
884 {
885   struct ConsensusRoundMessage *fin_msg;
886
887   switch (cpi->session->current_round)
888   {
889     case CONSENSUS_ROUND_INVENTORY:
890       cpi->inventory_synced = GNUNET_YES;
891     case CONSENSUS_ROUND_STOCK:
892     case CONSENSUS_ROUND_EXCHANGE:
893       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
894       fin_msg = GNUNET_malloc (sizeof *fin_msg);
895       fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
896       fin_msg->header.size = htons (sizeof *fin_msg);
897       fin_msg->round = cpi->apparent_round;
898       /* the subround os over once we kicked off sending the fin msg */
899       /* FIXME: assert we are talking to the right peer! */
900       queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi);
901       /* FIXME: mark peer as synced */
902       break;
903     default:
904       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
905       break;
906   }
907   return GNUNET_YES;
908 }
909
910
911 /**
912  * The other peer wants us to inform that he sent us all the elements we requested.
913  */
914 static int
915 handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
916 {
917   struct ConsensusRoundMessage *round_msg;
918   round_msg = (struct ConsensusRoundMessage *) msg;
919   /* FIXME: only call subround_over if round is the current one! */
920   switch (cpi->session->current_round)
921   {
922     case CONSENSUS_ROUND_EXCHANGE:
923     case CONSENSUS_ROUND_STOCK:
924       if (cpi->session->current_round != round_msg->round)
925       {
926         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
927         cpi->ibf_state = IBF_STATE_NONE;
928         cpi->ibf_bucket_counter = 0;
929         break;
930       }
931       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
932       cpi->exp_subround_finished = GNUNET_YES;
933       if (GNUNET_YES == exp_subround_finished (cpi->session))
934         subround_over (cpi->session, NULL);
935       else
936         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
937     break;
938     case CONSENSUS_ROUND_INVENTORY:
939       cpi->inventory_synced = GNUNET_YES;
940       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
941       if (inventory_round_finished (cpi->session))
942         round_over (cpi->session, NULL);
943       break;
944     default:
945       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
946       break;
947   }
948   return GNUNET_YES;
949 }
950
951
952 static struct StrataEstimator *
953 strata_estimator_create ()
954 {
955   struct StrataEstimator *se;
956   int i;
957
958   /* fixme: allocate everything in one chunk */
959
960   se = GNUNET_malloc (sizeof (struct StrataEstimator));
961   se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT);
962   for (i = 0; i < STRATA_COUNT; i++)
963     se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
964
965   return se;
966 }
967
968 static void
969 strata_estimator_destroy (struct StrataEstimator *se)
970 {
971   int i;
972   for (i = 0; i < STRATA_COUNT; i++)
973     ibf_destroy (se->strata[i]);
974   GNUNET_free (se->strata);
975   GNUNET_free (se);
976 }
977
978
979 static int
980 is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
981 {
982   switch (strata_msg->round)
983   {
984     case CONSENSUS_ROUND_STOCK:
985     case CONSENSUS_ROUND_EXCHANGE:
986       /* here, we also have to compare subrounds */
987       if ( (strata_msg->round != session->current_round) ||
988            (strata_msg->exp_round != session->exp_round) ||
989            (strata_msg->exp_subround != session->exp_subround))
990         return GNUNET_YES;
991       break;
992     default:
993       if (session->current_round != strata_msg->round)
994         return GNUNET_YES;
995     break;
996   }
997   return GNUNET_NO;
998 }
999
1000
1001 /**
1002  * Called when a peer sends us its strata estimator.
1003  * In response, we sent out IBF of appropriate size back.
1004  *
1005  * @param cpi session
1006  * @param strata_msg message
1007  */
1008 static int
1009 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
1010 {
1011   int i;
1012   unsigned int diff;
1013   void *buf;
1014   size_t size;
1015
1016   if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY))
1017   {
1018     /* we still have to handle this request appropriately */
1019     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
1020                 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1021   }
1022   else if (is_premature_strata_message (cpi->session, strata_msg))
1023   {
1024     if (GNUNET_NO == cpi->replaying_strata_message)
1025     {
1026       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n",
1027                   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
1028       cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
1029     }
1030     return GNUNET_YES;
1031   }
1032
1033   if (NULL == cpi->se)
1034     cpi->se = strata_estimator_create ();
1035
1036   cpi->apparent_round = strata_msg->round;
1037
1038   size = ntohs (strata_msg->header.size);
1039   buf = (void *) &strata_msg[1];
1040   for (i = 0; i < STRATA_COUNT; i++)
1041   {
1042     int res;
1043     res = ibf_read (&buf, &size, cpi->se->strata[i]);
1044     GNUNET_assert (GNUNET_OK == res);
1045   }
1046
1047   diff = estimate_difference (cpi->session->se, cpi->se);
1048
1049   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
1050               cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
1051
1052   switch (cpi->session->current_round)
1053   {
1054     case CONSENSUS_ROUND_EXCHANGE:
1055     case CONSENSUS_ROUND_INVENTORY:
1056     case CONSENSUS_ROUND_STOCK:
1057       /* send IBF of the right size */
1058       cpi->ibf_order = 0;
1059       while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) )
1060         cpi->ibf_order++;
1061       if (cpi->ibf_order > MAX_IBF_ORDER)
1062         cpi->ibf_order = MAX_IBF_ORDER;
1063       cpi->ibf_order += 1;
1064       /* create ibf if not already pre-computed */
1065       prepare_ibf (cpi);
1066       if (NULL != cpi->ibf)
1067         ibf_destroy (cpi->ibf);
1068       cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1069       cpi->ibf_state = IBF_STATE_TRANSMITTING;
1070       cpi->ibf_bucket_counter = 0;
1071       send_ibf (cpi);
1072       break;
1073     default:
1074       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
1075                   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1076       break;
1077   }
1078   return GNUNET_YES;
1079 }
1080
1081
1082 static int
1083 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
1084 {
1085   int num_buckets;
1086   void *buf;
1087
1088   /* FIXME: find out if we're still expecting the same ibf! */
1089
1090   cpi->apparent_round = cpi->session->current_round;
1091
1092   num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
1093   switch (cpi->ibf_state)
1094   {
1095     case IBF_STATE_NONE:
1096       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1097       cpi->ibf_state = IBF_STATE_RECEIVING;
1098       cpi->ibf_order = digest->order;
1099       cpi->ibf_bucket_counter = 0;
1100       if (NULL != cpi->ibf)
1101       {
1102         ibf_destroy (cpi->ibf);
1103         cpi->ibf = NULL;
1104       }
1105       break;
1106     case IBF_STATE_ANTICIPATE_DIFF:
1107       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
1108                   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1109       cpi->ibf_state = IBF_STATE_RECEIVING;
1110       cpi->ibf_order = digest->order;
1111       cpi->ibf_bucket_counter = 0;
1112       if (NULL != cpi->ibf)
1113       {
1114         ibf_destroy (cpi->ibf);
1115         cpi->ibf = NULL;
1116       }
1117       break;
1118     case IBF_STATE_RECEIVING:
1119       break;
1120     default:
1121       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1122       return GNUNET_YES;
1123   }
1124
1125   if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
1126   {
1127     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1128     return GNUNET_YES;
1129   }
1130
1131
1132   if (NULL == cpi->ibf)
1133     cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
1134
1135   buf = (void *) &digest[1];
1136   ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1137
1138   cpi->ibf_bucket_counter += num_buckets;
1139
1140   if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1141   {
1142     cpi->ibf_state = IBF_STATE_DECODING;
1143     cpi->ibf_bucket_counter = 0;
1144     prepare_ibf (cpi);
1145     ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
1146     decode (cpi);
1147   }
1148   return GNUNET_YES;
1149 }
1150
1151
1152 /**
1153  * Handle an element that another peer sent us
1154  */
1155 static int
1156 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
1157 {
1158   struct PendingElement *pending_element;
1159   struct GNUNET_CONSENSUS_Element *element;
1160   struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
1161   size_t size;
1162
1163   switch (cpi->session->current_round)
1164   {
1165     case CONSENSUS_ROUND_STOCK:
1166       /* FIXME: check if we really expect the element */
1167     case CONSENSUS_ROUND_EXCHANGE:
1168       break;
1169     default:
1170       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
1171       return GNUNET_YES;
1172   }
1173
1174   size = ntohs (element_msg->size) - sizeof *element_msg;
1175
1176   element = GNUNET_malloc (size + sizeof *element);
1177   element->size = size;
1178   memcpy (&element[1], &element_msg[1], size);
1179   element->data = &element[1];
1180
1181   pending_element = GNUNET_malloc (sizeof *pending_element);
1182   pending_element->element = element;
1183   GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element);
1184
1185   client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
1186   client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1187   client_element_msg->header.size = htons (size + sizeof *client_element_msg);
1188   memcpy (&client_element_msg[1], &element[1], size);
1189
1190   queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
1191
1192   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size);
1193
1194   client_send_next (cpi->session);
1195   
1196   return GNUNET_YES;
1197 }
1198
1199
1200 /**
1201  * Handle a request for elements.
1202  * 
1203  * @param cpi peer that is requesting the element
1204  * @param msg the element request message
1205  */
1206 static int
1207 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
1208 {
1209   struct GNUNET_HashCode hashcode;
1210   struct IBF_Key *ibf_key;
1211   unsigned int num;
1212
1213   /* element requests are allowed in every round */
1214
1215   num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1216   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num);
1217   
1218   ibf_key = (struct IBF_Key *) &msg[1];
1219   while (num--)
1220   {
1221     struct ElementList *head;
1222     ibf_hashcode_from_key (*ibf_key, &hashcode);
1223     head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1224     send_elements (cpi, head);
1225     ibf_key++;
1226   }
1227   return GNUNET_YES;
1228 }
1229
1230 /**
1231  * If necessary, send a message to the peer, depending on the current
1232  * round.
1233  */
1234 static void
1235 embrace_peer (struct ConsensusPeerInformation *cpi)
1236 {
1237   GNUNET_assert (GNUNET_YES == cpi->hello);
1238   switch (cpi->session->current_round)
1239   {
1240     case CONSENSUS_ROUND_EXCHANGE:
1241       if (cpi->session->partner_outgoing != cpi)
1242         break;
1243       /* fallthrough */
1244     case CONSENSUS_ROUND_INVENTORY:
1245       /* fallthrough */
1246     case CONSENSUS_ROUND_STOCK:
1247       if (cpi == cpi->session->partner_outgoing)
1248         send_strata_estimator (cpi);
1249     default:
1250       break;
1251   }
1252 }
1253
1254
1255 /**
1256  * Handle a HELLO-message, send when another peer wants to join a session where
1257  * our peer is a member. The session may or may not be inhabited yet.
1258  */
1259 static int
1260 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
1261 {
1262   /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
1263   struct ConsensusSession *session;
1264
1265   session = sessions_head;
1266   while (NULL != session)
1267   {
1268     if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
1269     {
1270       int idx;
1271       idx = get_peer_idx (&inc->peer_id, session);
1272       GNUNET_assert (-1 != idx);
1273       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
1274       inc->cpi = &session->info[idx];
1275       inc->cpi->mst = inc->mst;
1276       inc->cpi->hello = GNUNET_YES;
1277       inc->cpi->socket = inc->socket;
1278       embrace_peer (inc->cpi);
1279       return GNUNET_YES;
1280     }
1281     session = session->next;
1282   }
1283   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
1284   return GNUNET_NO;
1285 }
1286
1287
1288 /**
1289  * Send a strata estimator.
1290  *
1291  * @param cpi the peer
1292  */
1293 static void
1294 send_strata_estimator (struct ConsensusPeerInformation *cpi)
1295 {
1296   struct StrataMessage *strata_msg;
1297   void *buf;
1298   size_t msize;
1299   int i;
1300
1301   cpi->apparent_round = cpi->session->current_round;
1302   cpi->ibf_state = IBF_STATE_NONE;
1303   cpi->ibf_bucket_counter = 0;
1304
1305   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n",
1306               cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info));
1307
1308   msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1309
1310   strata_msg = GNUNET_malloc (msize);
1311   strata_msg->header.size = htons (msize);
1312   strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1313   strata_msg->round = cpi->session->current_round;
1314   strata_msg->exp_round = cpi->session->exp_round;
1315   strata_msg->exp_subround = cpi->session->exp_subround;
1316   
1317   buf = &strata_msg[1];
1318   for (i = 0; i < STRATA_COUNT; i++)
1319   {
1320     ibf_write (cpi->session->se->strata[i], &buf, NULL);
1321   }
1322
1323   queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
1324 }
1325
1326
1327 /**
1328  * Send an IBF of the order specified in cpi.
1329  *
1330  * @param cpi the peer
1331  */
1332 static void
1333 send_ibf (struct ConsensusPeerInformation *cpi)
1334 {
1335   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1336               cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1337
1338   cpi->ibf_bucket_counter = 0;
1339   while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1340   {
1341     int num_buckets;
1342     void *buf;
1343     struct DifferenceDigest *digest;
1344     int msize;
1345
1346     num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1347     /* limit to maximum */
1348     if (num_buckets > BUCKETS_PER_MESSAGE)
1349       num_buckets = BUCKETS_PER_MESSAGE;
1350
1351     msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1352
1353     digest = GNUNET_malloc (msize);
1354     digest->header.size = htons (msize);
1355     digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1356     digest->order = cpi->ibf_order;
1357     digest->round = cpi->apparent_round;
1358
1359     buf = &digest[1];
1360     ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
1361
1362     queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
1363
1364     cpi->ibf_bucket_counter += num_buckets;
1365   }
1366   cpi->ibf_bucket_counter = 0;
1367   cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1368 }
1369
1370
1371 /**
1372  * Decode the current diff ibf, and send elements/requests/reports/
1373  *
1374  * @param cpi partner peer
1375  */
1376 static void
1377 decode (struct ConsensusPeerInformation *cpi)
1378 {
1379   struct IBF_Key key;
1380   struct GNUNET_HashCode hashcode;
1381   int side;
1382
1383   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1384
1385   for (;;)
1386   {
1387     int res;
1388
1389     res = ibf_decode (cpi->ibf, &side, &key);
1390     if (GNUNET_SYSERR == res)
1391     {
1392       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1393       /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1394       cpi->ibf_order++;
1395       prepare_ibf (cpi);
1396       cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1397       cpi->ibf_state = IBF_STATE_TRANSMITTING;
1398       cpi->ibf_bucket_counter = 0;
1399       send_ibf (cpi);
1400       return;
1401     }
1402     if (GNUNET_NO == res)
1403     {
1404       struct ConsensusRoundMessage *msg;
1405       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1406       msg = GNUNET_malloc (sizeof *msg);
1407       msg->header.size = htons (sizeof *msg);
1408       msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1409       msg->round = cpi->apparent_round;
1410       queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
1411       return;
1412     }
1413     if (-1 == side)
1414     {
1415       struct ElementList *head;
1416       /* we have the element(s), send it to the other peer */
1417       ibf_hashcode_from_key (key, &hashcode);
1418       head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1419       send_elements (cpi, head);
1420     }
1421     else
1422     {
1423       struct ElementRequest *msg;
1424       size_t msize;
1425       struct IBF_Key *p;
1426
1427       msize = (sizeof *msg) + sizeof (struct IBF_Key);
1428       msg = GNUNET_malloc (msize);
1429       switch (cpi->apparent_round)
1430       {
1431         case CONSENSUS_ROUND_STOCK:
1432           /* FIXME: check if we really want to request the element */
1433         case CONSENSUS_ROUND_EXCHANGE:
1434           msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1435           break;
1436         case CONSENSUS_ROUND_INVENTORY:
1437           msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
1438           break;
1439         default:
1440           GNUNET_assert (0);
1441       }
1442       msg->header.size = htons (msize);
1443       p = (struct IBF_Key *) &msg[1];
1444       *p = key;
1445       queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
1446     }
1447   }
1448 }
1449
1450
1451 /**
1452  * Functions with this signature are called whenever a
1453  * complete message is received by the tokenizer.
1454  *
1455  * Do not call GNUNET_SERVER_mst_destroy in callback
1456  *
1457  * @param cls closure
1458  * @param client identification of the client
1459  * @param message the actual message
1460  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1461  */
1462 static int
1463 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1464 {
1465   struct ConsensusPeerInformation *cpi;
1466   cpi =  cls;
1467   switch (ntohs (message->type))
1468   {
1469     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
1470       return handle_p2p_strata (cpi, (struct StrataMessage *) message);
1471     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
1472       return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
1473     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
1474       return handle_p2p_element (cpi, message);
1475     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
1476       return handle_p2p_element_report (cpi, message);
1477     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
1478       return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
1479     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
1480       return handle_p2p_synced (cpi, message);
1481     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
1482       return handle_p2p_fin (cpi, message);
1483     default:
1484       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
1485                   ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
1486   }
1487   return GNUNET_OK;
1488 }
1489
1490
1491 /**
1492  * Handle tokenized messages from stream sockets.
1493  * Delegate them if the socket belongs to a session,
1494  * handle hello messages otherwise.
1495  *
1496  * Do not call GNUNET_SERVER_mst_destroy in callback
1497  *
1498  * @param cls closure, unused
1499  * @param client incoming socket this message comes from
1500  * @param message the actual message
1501  *
1502  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1503  */
1504 static int
1505 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1506 {
1507   struct IncomingSocket *inc;
1508   inc = (struct IncomingSocket *) client;
1509   switch (ntohs( message->type))
1510   {
1511     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
1512       return handle_p2p_hello (inc, (struct ConsensusHello *) message);
1513     default:
1514       if (NULL != inc->cpi)
1515         return mst_session_callback (inc->cpi, client, message);
1516       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
1517                   ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
1518   }
1519   return GNUNET_OK;
1520 }
1521
1522
1523 /**
1524  * Functions of this type are called upon new stream connection from other peers
1525  * or upon binding error which happen when the app_port given in
1526  * GNUNET_STREAM_listen() is already taken.
1527  *
1528  * @param cls the closure from GNUNET_STREAM_listen
1529  * @param socket the socket representing the stream; NULL on binding error
1530  * @param initiator the identity of the peer who wants to establish a stream
1531  *            with us; NULL on binding error
1532  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1533  *             stream (the socket will be invalid after the call)
1534  */
1535 static int
1536 listen_cb (void *cls,
1537            struct GNUNET_STREAM_Socket *socket,
1538            const struct GNUNET_PeerIdentity *initiator)
1539 {
1540   struct IncomingSocket *incoming;
1541   GNUNET_assert (NULL != socket);
1542   incoming = GNUNET_malloc (sizeof *incoming);
1543   incoming->socket = socket;
1544   incoming->peer_id = *initiator;
1545   incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1546                                      &incoming_stream_data_processor, incoming);
1547   incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
1548   GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
1549   return GNUNET_OK;
1550 }
1551
1552
1553 /**
1554  * Iterator over hash map entries.
1555  *
1556  * @param cls closure
1557  * @param key current key code
1558  * @param value value in the hash map
1559  * @return GNUNET_YES if we should continue to
1560  *         iterate,
1561  *         GNUNET_NO if not.
1562  */
1563 static int
1564 destroy_element_list_iter (void *cls,
1565                            const struct GNUNET_HashCode * key,
1566                            void *value)
1567 {
1568   struct ElementList *el;
1569   el = value;
1570   while (NULL != el)
1571   {
1572     struct ElementList *el_old;
1573     el_old = el;
1574     el = el->next;
1575     GNUNET_free (el_old->element_hash);
1576     GNUNET_free (el_old->element);
1577     GNUNET_free (el_old);
1578   }
1579   return GNUNET_YES;
1580 }
1581
1582
1583 /**
1584  * Destroy a session, free all resources associated with it.
1585  * 
1586  * @param session the session to destroy
1587  */
1588 static void
1589 destroy_session (struct ConsensusSession *session)
1590 {
1591   int i;
1592
1593   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
1594   GNUNET_SERVER_client_drop (session->client);
1595   session->client = NULL;
1596   if (NULL != session->shuffle)
1597   {
1598     GNUNET_free (session->shuffle);
1599     session->shuffle = NULL;
1600   }
1601   if (NULL != session->se)
1602   {
1603     strata_estimator_destroy (session->se);
1604     session->se = NULL;
1605   }
1606   if (NULL != session->info)
1607   {
1608     for (i = 0; i < session->num_peers; i++)
1609     {
1610       struct ConsensusPeerInformation *cpi;
1611       cpi = &session->info[i];
1612       if ((NULL != cpi) && (NULL != cpi->socket))
1613       {
1614         if (NULL != cpi->rh)
1615         {
1616           GNUNET_STREAM_read_cancel (cpi->rh);
1617           cpi->rh = NULL;
1618         } 
1619         if (NULL != cpi->wh)
1620         {
1621           GNUNET_STREAM_write_cancel (cpi->wh);
1622           cpi->wh = NULL;
1623         } 
1624         GNUNET_STREAM_close (cpi->socket);
1625         cpi->socket = NULL;
1626       }
1627       if (NULL != cpi->se)
1628       {
1629         strata_estimator_destroy (cpi->se);
1630         cpi->se = NULL;
1631       }
1632       if (NULL != cpi->ibf)
1633       {
1634         ibf_destroy (cpi->ibf);
1635         cpi->ibf = NULL;
1636       }
1637       if (NULL != cpi->mst)
1638       {
1639         GNUNET_SERVER_mst_destroy (cpi->mst);
1640         cpi->mst = NULL;
1641       }
1642     }
1643     GNUNET_free (session->info);
1644     session->info = NULL;
1645   }
1646   if (NULL != session->ibfs)
1647   {
1648     for (i = 0; i <= MAX_IBF_ORDER; i++)
1649     {
1650       if (NULL != session->ibfs[i])
1651       {
1652         ibf_destroy (session->ibfs[i]);
1653         session->ibfs[i] = NULL;
1654       }
1655     }
1656     GNUNET_free (session->ibfs);
1657     session->ibfs = NULL;
1658   }
1659   if (NULL != session->values)
1660   {
1661     GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL);
1662     GNUNET_CONTAINER_multihashmap_destroy (session->values);
1663     session->values = NULL;
1664   }
1665   GNUNET_free (session);
1666 }
1667
1668
1669 /**
1670  * Disconnect a client, and destroy all sessions associated with it.
1671  *
1672  * @param client the client to disconnect
1673  */
1674 static void
1675 disconnect_client (struct GNUNET_SERVER_Client *client)
1676 {
1677   struct ConsensusSession *session;
1678   GNUNET_SERVER_client_disconnect (client);
1679   
1680   /* if the client owns a session, remove it */
1681   session = sessions_head;
1682   while (NULL != session)
1683   {
1684     if (client == session->client)
1685     {
1686       destroy_session (session);
1687       break;
1688     }
1689     session = session->next;
1690   }
1691 }
1692
1693
1694 /**
1695  * Compute a global, (hopefully) unique consensus session id,
1696  * from the local id of the consensus session, and the identities of all participants.
1697  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
1698  * exactly the same peers, the global id will be different.
1699  *
1700  * @param session session to generate the global id for
1701  * @param session_id local id of the consensus session
1702  */
1703 static void
1704 compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
1705 {
1706   int i;
1707   struct GNUNET_HashCode tmp;
1708
1709   session->global_id = *session_id;
1710   for (i = 0; i < session->num_peers; ++i)
1711   {
1712     GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
1713     session->global_id = tmp;
1714     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
1715     session->global_id = tmp;
1716   }
1717 }
1718
1719
1720 /**
1721  * Transmit a queued message to the session's client.
1722  *
1723  * @param cls consensus session
1724  * @param size number of bytes available in buf
1725  * @param buf where the callee should write the message
1726  * @return number of bytes written to buf
1727  */
1728 static size_t
1729 transmit_queued (void *cls, size_t size,
1730                  void *buf)
1731 {
1732   struct ConsensusSession *session;
1733   struct QueuedMessage *qmsg;
1734   size_t msg_size;
1735
1736   session = cls;
1737   session->client_th = NULL;
1738
1739   qmsg = session->client_messages_head;
1740   GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1741   GNUNET_assert (qmsg);
1742
1743   if (NULL == buf)
1744   {
1745     destroy_session (session);
1746     return 0;
1747   }
1748
1749   msg_size = ntohs (qmsg->msg->size);
1750
1751   GNUNET_assert (size >= msg_size);
1752
1753   memcpy (buf, qmsg->msg, msg_size);
1754   GNUNET_free (qmsg->msg);
1755   GNUNET_free (qmsg);
1756
1757   client_send_next (session);
1758
1759   return msg_size;
1760 }
1761
1762
1763 /**
1764  * Schedule transmitting the next queued message (if any) to the inhabiting client of a session.
1765  *
1766  * @param session the consensus session
1767  */
1768 static void
1769 client_send_next (struct ConsensusSession *session)
1770 {
1771
1772   GNUNET_assert (NULL != session);
1773
1774   if (NULL != session->client_th)
1775     return;
1776
1777   if (NULL != session->client_messages_head)
1778   {
1779     int msize;
1780     msize = ntohs (session->client_messages_head->msg->size);
1781     session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
1782                                                               GNUNET_TIME_UNIT_FOREVER_REL,
1783                                                               &transmit_queued, session);
1784   }
1785 }
1786
1787
1788 /**
1789  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
1790  * the correct signature to be used with e.g. qsort.
1791  * We use this function instead.
1792  *
1793  * @param h1 some hash code
1794  * @param h2 some hash code
1795  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
1796  */
1797 static int
1798 hash_cmp (const void *h1, const void *h2)
1799 {
1800   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
1801 }
1802
1803
1804 /**
1805  * Search peer in the list of peers in session.
1806  *
1807  * @param peer peer to find
1808  * @param session session with peer
1809  * @return index of peer, -1 if peer is not in session
1810  */
1811 static int
1812 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1813 {
1814   int i;
1815   for (i = 0; i < session->num_peers; i++)
1816     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
1817       return i;
1818   return -1;
1819 }
1820
1821
1822 /**
1823  * Called when stream has finishes writing the hello message
1824  */
1825 static void
1826 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1827 {
1828   struct ConsensusPeerInformation *cpi;
1829
1830   cpi = cls;
1831   cpi->wh = NULL;
1832   cpi->hello = GNUNET_YES;
1833   GNUNET_assert (GNUNET_STREAM_OK == status);
1834   embrace_peer (cpi);
1835 }
1836
1837
1838 /**
1839  * Called when we established a stream connection to another peer
1840  *
1841  * @param cls cpi of the peer we just connected to
1842  * @param socket socket to use to communicate with the other side (read/write)
1843  */
1844 static void
1845 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1846 {
1847   struct ConsensusPeerInformation *cpi;
1848   struct ConsensusHello *hello;
1849
1850   cpi = cls;
1851   hello = GNUNET_malloc (sizeof *hello);
1852   hello->header.size = htons (sizeof *hello);
1853   hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1854   memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1855   GNUNET_assert (NULL == cpi->mst);
1856   cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1857   cpi->wh =
1858       GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1859   GNUNET_free (hello);
1860   cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1861                                 &session_stream_data_processor, cpi);
1862 }
1863
1864
1865 /**
1866  * Create the sorted list of peers for the session,
1867  * add the local peer if not in the join message.
1868  */
1869 static void
1870 initialize_session_peer_list (struct ConsensusSession *session)
1871 {
1872   unsigned int local_peer_in_list;
1873   uint32_t listed_peers;
1874   const struct GNUNET_PeerIdentity *msg_peers;
1875   struct GNUNET_PeerIdentity *peers;
1876   unsigned int i;
1877
1878   GNUNET_assert (NULL != session->join_msg);
1879
1880   /* peers in the join message, may or may not include the local peer */
1881   listed_peers = ntohl (session->join_msg->num_peers);
1882   
1883   session->num_peers = listed_peers;
1884
1885   msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
1886
1887   local_peer_in_list = GNUNET_NO;
1888   for (i = 0; i < listed_peers; i++)
1889   {
1890     if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
1891     {
1892       local_peer_in_list = GNUNET_YES;
1893       break;
1894     }
1895   }
1896
1897   if (GNUNET_NO == local_peer_in_list)
1898     session->num_peers++;
1899
1900   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
1901
1902   if (GNUNET_NO == local_peer_in_list)
1903     peers[session->num_peers - 1] = *my_peer;
1904
1905   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
1906   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1907
1908   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1909
1910   for (i = 0; i < session->num_peers; ++i)
1911   {
1912     /* initialize back-references, so consensus peer information can
1913      * be used as closure */
1914     session->info[i].session = session;
1915     session->info[i].peer_id = peers[i];
1916   }
1917
1918   free (peers);
1919 }
1920
1921
1922 static void
1923 strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key)
1924 {
1925   uint32_t v;
1926   int i;
1927   v = key->bits[0];
1928   /* count trailing '1'-bits of v */
1929   for (i = 0; v & 1; v>>=1, i++)
1930     /* empty */;
1931   ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
1932 }
1933
1934
1935 /**
1936  * Add incoming peer connections to the session,
1937  * for peers who have connected to us before the local session has been established
1938  *
1939  * @param session ...
1940  */
1941 static void
1942 add_incoming_peers (struct ConsensusSession *session)
1943 {
1944   struct IncomingSocket *inc;
1945   inc = incoming_sockets_head;
1946
1947   while (NULL != inc)
1948   {
1949     if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid))
1950     {
1951       int i;
1952       for (i = 0; i < session->num_peers; i++)
1953       {
1954         struct ConsensusPeerInformation *cpi;
1955         cpi = &session->info[i];
1956         if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity)))
1957         {
1958           cpi->socket = inc->socket;
1959           inc->cpi = cpi;
1960           inc->cpi->mst = inc->mst;
1961           inc->cpi->hello = GNUNET_YES;
1962           break;
1963         }
1964       }
1965     }
1966     inc = inc->next;
1967   }
1968 }
1969
1970
1971 /**
1972  * Initialize the session, continue receiving messages from the owning client
1973  *
1974  * @param session the session to initialize
1975  */
1976 static void
1977 initialize_session (struct ConsensusSession *session)
1978 {
1979   const struct ConsensusSession *other_session;
1980
1981   GNUNET_assert (NULL != session->join_msg);
1982   initialize_session_peer_list (session);
1983   session->current_round = CONSENSUS_ROUND_BEGIN;
1984   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1985   compute_global_id (session, &session->join_msg->session_id);
1986
1987   /* Check if some local client already owns the session. */
1988   other_session = sessions_head;
1989   while (NULL != other_session)
1990   {
1991     if ((other_session != session) && 
1992         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1993     {
1994       /* session already owned by another client */
1995       GNUNET_break (0);
1996       disconnect_client (session->client);
1997       return;
1998     }
1999     other_session = other_session->next;
2000   }
2001
2002   session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2003   session->local_peer_idx = get_peer_idx (my_peer, session);
2004   GNUNET_assert (-1 != session->local_peer_idx);
2005   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
2006   session->se = strata_estimator_create ();
2007   session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2008   GNUNET_free (session->join_msg);
2009   session->join_msg = NULL;
2010   add_incoming_peers (session);
2011   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
2012   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2013 }
2014
2015
2016 /**
2017  * Called when a client wants to join a consensus session.
2018  *
2019  * @param cls unused
2020  * @param client client that sent the message
2021  * @param m message sent by the client
2022  */
2023 static void
2024 client_join (void *cls,
2025              struct GNUNET_SERVER_Client *client,
2026              const struct GNUNET_MessageHeader *m)
2027 {
2028   struct ConsensusSession *session;
2029
2030   // make sure the client has not already joined a session
2031   session = sessions_head;
2032   while (NULL != session)
2033   {
2034     if (session->client == client)
2035     {
2036       GNUNET_break (0);
2037       disconnect_client (client);
2038       return;
2039     }
2040     session = session->next;
2041   }
2042
2043   session = GNUNET_malloc (sizeof (struct ConsensusSession));
2044   session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
2045   session->client = client;
2046   GNUNET_SERVER_client_keep (client);
2047
2048   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2049
2050   // Initialize session later if local peer identity is not known yet.
2051   if (NULL == my_peer)
2052   {
2053     GNUNET_SERVER_disable_receive_done_warning (client);
2054     return;
2055   }
2056
2057   initialize_session (session);
2058 }
2059
2060
2061 /**
2062  * Hash a block of data, producing a replicated ibf hash.
2063  */
2064 static void
2065 hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
2066 {
2067   struct IBF_Key ibf_key;
2068   GNUNET_CRYPTO_hash (block, size, ret);
2069   ibf_key = ibf_key_from_hashcode (ret);
2070   ibf_hashcode_from_key (ibf_key, ret);
2071 }
2072
2073
2074 static void
2075 insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
2076 {
2077   struct GNUNET_HashCode hash;
2078   struct ElementList *head;
2079
2080   hash_for_ibf (element->data, element->size, &hash);
2081
2082   head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
2083
2084   if (NULL == head)
2085   {
2086     int i;
2087
2088     head = GNUNET_malloc (sizeof *head);
2089     head->element = element;
2090     head->next = NULL;
2091     head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2092     GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head,
2093                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2094     strata_estimator_insert (session->se, &hash);
2095
2096     for (i = 0; i <= MAX_IBF_ORDER; i++)
2097       if (NULL != session->ibfs[i])
2098         ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
2099   }
2100   else
2101   {
2102     struct ElementList *el;
2103     el = GNUNET_malloc (sizeof *el);
2104     head->element = element;
2105     head->next = NULL;
2106     head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2107     while (NULL != head->next)
2108       head = head->next;
2109     head->next = el;
2110   }
2111 }
2112
2113
2114 /**
2115  * Called when a client performs an insert operation.
2116  *
2117  * @param cls (unused)
2118  * @param client client handle
2119  * @param m message sent by the client
2120  */
2121 void
2122 client_insert (void *cls,
2123              struct GNUNET_SERVER_Client *client,
2124              const struct GNUNET_MessageHeader *m)
2125 {
2126   struct ConsensusSession *session;
2127   struct GNUNET_CONSENSUS_ElementMessage *msg;
2128   struct GNUNET_CONSENSUS_Element *element;
2129   int element_size;
2130
2131   session = sessions_head;
2132   while (NULL != session)
2133   {
2134     if (session->client == client)
2135       break;
2136   }
2137
2138   if (NULL == session)
2139   {
2140     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
2141     GNUNET_SERVER_client_disconnect (client);
2142     return;
2143   }
2144
2145   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2146   element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2147
2148   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
2149
2150   element->type = msg->element_type;
2151   element->size = element_size;
2152   memcpy (&element[1], &msg[1], element_size);
2153   element->data = &element[1];
2154
2155   GNUNET_assert (NULL != element->data);
2156
2157   insert_element (session, element);
2158
2159   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2160
2161   client_send_next (session);
2162 }
2163
2164
2165
2166 /**
2167  * Functions of this signature are called whenever writing operations
2168  * on a stream are executed
2169  *
2170  * @param cls the closure from GNUNET_STREAM_write
2171  * @param status the status of the stream at the time this function is called;
2172  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
2173  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
2174  *          (this doesn't mean that the data is never sent, the receiver may
2175  *          have read the data but its ACKs may have been lost);
2176  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
2177  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
2178  *          be processed.
2179  * @param size the number of bytes written
2180  */
2181 static void 
2182 write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
2183 {
2184   struct ConsensusPeerInformation *cpi;
2185
2186   GNUNET_assert (GNUNET_STREAM_OK == status);
2187   cpi = cls;
2188   cpi->wh = NULL;
2189   if (NULL != cpi->messages_head)
2190   {
2191     struct QueuedMessage *qm;
2192     qm = cpi->messages_head;
2193     GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm);
2194     cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
2195                                    GNUNET_TIME_UNIT_FOREVER_REL,
2196                                    write_queued, cpi);
2197     if (NULL != qm->cb)
2198       qm->cb (qm->cls);
2199     GNUNET_free (qm->msg);
2200     GNUNET_free (qm);
2201     GNUNET_assert (NULL != cpi->wh);
2202   }
2203 }
2204
2205
2206 static void
2207 shuffle (struct ConsensusSession *session)
2208 {
2209   /* FIXME: implement */
2210 }
2211
2212
2213 /**
2214  * Find and set the partner_incoming and partner_outgoing of our peer,
2215  * one of them may not exist in most cases.
2216  *
2217  * @param session the consensus session
2218  */
2219 static void
2220 find_partners (struct ConsensusSession *session)
2221 {
2222   int mark[session->num_peers];
2223   int i;
2224   memset (mark, 0, session->num_peers * sizeof (int));
2225   session->partner_incoming = session->partner_outgoing = NULL;
2226   for (i = 0; i < session->num_peers; i++)
2227   {
2228     int arc;
2229     if (0 != mark[i])
2230       continue;
2231     arc = (i + (1 << session->exp_subround)) % session->num_peers;
2232     mark[i] = mark[arc] = 1;
2233     GNUNET_assert (i != arc);
2234     if (i == session->local_peer_idx)
2235     {
2236       GNUNET_assert (NULL == session->partner_outgoing);
2237       session->partner_outgoing = &session->info[session->shuffle[arc]];
2238       session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2239     }
2240     if (arc == session->local_peer_idx)
2241     {
2242       GNUNET_assert (NULL == session->partner_incoming);
2243       session->partner_incoming = &session->info[session->shuffle[i]];
2244       session->partner_incoming->exp_subround_finished = GNUNET_NO;
2245     }
2246   }
2247 }
2248
2249
2250 static void
2251 replay_premature_message (struct ConsensusPeerInformation *cpi)
2252 {
2253   if (NULL != cpi->premature_strata_message)
2254   {
2255     struct StrataMessage *sm;
2256
2257     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
2258     sm = cpi->premature_strata_message;
2259     cpi->premature_strata_message = NULL;
2260
2261     cpi->replaying_strata_message = GNUNET_YES;
2262     handle_p2p_strata (cpi, sm);
2263     cpi->replaying_strata_message = GNUNET_NO;
2264
2265     GNUNET_free (sm);
2266   }
2267 }
2268
2269
2270 /**
2271  * Do the next subround in the exp-scheme.
2272  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2273  *
2274  * @param cls the session
2275  * @param tc task context, for when this task is invoked by the scheduler,
2276  *           NULL if invoked for another reason
2277  */
2278 static void
2279 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2280 {
2281   struct ConsensusSession *session;
2282   int i;
2283
2284   /* don't kick off next subround if we're shutting down */
2285   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2286     return;
2287   session = cls;
2288   /* don't send any messages from the last round */
2289   /*
2290   clear_peer_messages (session->partner_outgoing);
2291   clear_peer_messages (session->partner_incoming);
2292   for (i = 0; i < session->num_peers; i++)
2293     clear_peer_messages (&session->info[i]);
2294   */
2295   /* cancel timeout */
2296   if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2297     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2298   session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2299   /* check if we are done with the log phase, 2-peer consensus only does one log round */
2300   if ( (session->exp_round == NUM_EXP_ROUNDS) ||
2301        ((session->num_peers == 2) && (session->exp_round == 1)))
2302   {
2303     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
2304     round_over (session, NULL);
2305     return;
2306   }
2307   if (session->exp_round == 0)
2308   {
2309     /* initialize everything for the log-rounds */
2310     session->exp_round = 1;
2311     session->exp_subround = 0;
2312     if (NULL == session->shuffle)
2313       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
2314     for (i = 0; i < session->num_peers; i++)
2315       session->shuffle[i] = i;
2316   }
2317   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
2318   {
2319     /* subrounds done, start new log-round */
2320     session->exp_round++;
2321     session->exp_subround = 0;
2322     shuffle (session);
2323   }
2324   else 
2325   {
2326     session->exp_subround++;
2327   }
2328
2329   find_partners (session);
2330
2331 #ifdef GNUNET_EXTRA_LOGGING
2332   {
2333     int in;
2334     int out;
2335     if (session->partner_outgoing == NULL)
2336       out = -1;
2337     else
2338       out = (int) (session->partner_outgoing - session->info);
2339     if (session->partner_incoming == NULL)
2340       in = -1;
2341     else
2342       in = (int) (session->partner_incoming - session->info);
2343     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
2344                 session->exp_round, session->exp_subround, in, out);
2345   }
2346 #endif /* GNUNET_EXTRA_LOGGING */
2347
2348   if (NULL != session->partner_incoming)
2349   {
2350     session->partner_incoming->ibf_state = IBF_STATE_NONE;
2351     session->partner_incoming->exp_subround_finished = GNUNET_NO;
2352     session->partner_incoming->ibf_bucket_counter = 0;
2353
2354     /* maybe there's an early strata estimator? */
2355     replay_premature_message (session->partner_incoming);
2356   }
2357
2358   if (NULL != session->partner_outgoing)
2359   {
2360     session->partner_outgoing->ibf_state = IBF_STATE_NONE;
2361     session->partner_outgoing->ibf_bucket_counter = 0;
2362     session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2363
2364     if (NULL == session->partner_outgoing->socket)
2365     {
2366       session->partner_outgoing->socket =
2367           GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2368                               open_cb, session->partner_outgoing,
2369                               GNUNET_STREAM_OPTION_END);
2370     }
2371     else if (GNUNET_YES == session->partner_outgoing->hello)
2372     {
2373       send_strata_estimator (session->partner_outgoing);
2374     }
2375     /* else: do nothing, the send hello cb will handle this */
2376   }
2377
2378   /*
2379   session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
2380                                                                    subround_over, session);
2381   */
2382 }
2383
2384 static void
2385 contact_peer_a2a (struct ConsensusPeerInformation *cpi)
2386 {
2387   cpi->is_outgoing = GNUNET_YES;
2388   if (NULL == cpi->socket)
2389   {
2390     cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2391                                       open_cb, cpi, GNUNET_STREAM_OPTION_END);
2392   }
2393   else if (GNUNET_YES == cpi->hello)
2394   {
2395     send_strata_estimator (cpi);
2396   }
2397 }
2398
2399 /**
2400  * Start the inventory round, contact all peers we are supposed to contact.
2401  *
2402  * @param session the current session
2403  */
2404 static void
2405 start_inventory (struct ConsensusSession *session)
2406 {
2407   int i;
2408   int last;
2409
2410   for (i = 0; i < session->num_peers; i++)
2411   {
2412     session->info[i].ibf_bucket_counter = 0;
2413     session->info[i].ibf_state = IBF_STATE_NONE;
2414     session->info[i].is_outgoing = GNUNET_NO;
2415   }
2416
2417   last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
2418   i = (session->local_peer_idx + 1) % session->num_peers;
2419   while (i != last)
2420   {
2421     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
2422     contact_peer_a2a (&session->info[i]);
2423     session->info[i].is_outgoing = GNUNET_YES;
2424     i = (i + 1) % session->num_peers;
2425   }
2426   // tie-breaker for even number of peers
2427   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
2428   {
2429     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
2430     session->info[last].is_outgoing = GNUNET_YES;
2431     contact_peer_a2a (&session->info[last]);
2432   }
2433
2434   for (i = 0; i < session->num_peers; i++)
2435   {
2436     if (GNUNET_NO == session->info[i].is_outgoing)
2437       replay_premature_message (&session->info[i]);
2438   }
2439 }
2440
2441 static void
2442 send_client_conclude_done (struct ConsensusSession *session)
2443 {
2444   struct GNUNET_MessageHeader *msg;
2445   session->current_round = CONSENSUS_ROUND_FINISH;
2446   msg = GNUNET_malloc (sizeof *msg);
2447   msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2448   msg->size = htons (sizeof *msg);
2449   queue_client_message (session, msg);
2450   client_send_next (session);
2451 }
2452
2453 /**
2454  * Start the next round.
2455  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2456  *
2457  * @param cls the session
2458  * @param tc task context, for when this task is invoked by the scheduler,
2459  *           NULL if invoked for another reason
2460  */
2461 static void 
2462 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2463 {
2464   struct ConsensusSession *session;
2465
2466   /* don't kick off next round if we're shutting down */
2467   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2468     return;
2469
2470   session = cls;
2471   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
2472
2473   /*
2474   for (i = 0; i < session->num_peers; i++)
2475     clear_peer_messages (&session->info[i]);
2476   */
2477
2478   if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2479   {
2480     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2481     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2482   }
2483
2484   switch (session->current_round)
2485   {
2486     case CONSENSUS_ROUND_BEGIN:
2487       session->current_round = CONSENSUS_ROUND_EXCHANGE;
2488       session->exp_round = 0;
2489       subround_over (session, NULL);
2490       break;
2491     case CONSENSUS_ROUND_EXCHANGE:
2492       /* handle two peers specially */
2493       if (session->num_peers <= 2)
2494       {
2495         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx);
2496         send_client_conclude_done (session);
2497         return;
2498       }
2499       session->current_round = CONSENSUS_ROUND_INVENTORY;
2500       start_inventory (session);
2501       break;
2502     case CONSENSUS_ROUND_INVENTORY:
2503       session->current_round = CONSENSUS_ROUND_STOCK;
2504       session->exp_round = 0;
2505       subround_over (session, NULL);
2506       break;
2507     case CONSENSUS_ROUND_STOCK:
2508       session->current_round = CONSENSUS_ROUND_FINISH;
2509       send_client_conclude_done (session);
2510       break;
2511     default:
2512       GNUNET_assert (0);
2513   }
2514 }
2515
2516
2517 /**
2518  * Called when a client performs the conclude operation.
2519  *
2520  * @param cls (unused)
2521  * @param client client handle
2522  * @param message message sent by the client
2523  */
2524 static void
2525 client_conclude (void *cls,
2526                  struct GNUNET_SERVER_Client *client,
2527                  const struct GNUNET_MessageHeader *message)
2528 {
2529   struct ConsensusSession *session;
2530   struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
2531
2532   cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
2533
2534   session = sessions_head;
2535   while ((session != NULL) && (session->client != client))
2536     session = session->next;
2537   if (NULL == session)
2538   {
2539     /* client not found */
2540     GNUNET_break (0);
2541     GNUNET_SERVER_client_disconnect (client);
2542     return;
2543   }
2544
2545   if (CONSENSUS_ROUND_BEGIN != session->current_round)
2546   {
2547     /* client requested conclude twice */
2548     GNUNET_break (0);
2549     /* client may still own a session, destroy it */
2550     disconnect_client (client);
2551     return;
2552   }
2553
2554   if (session->num_peers <= 1)
2555   {
2556     send_client_conclude_done (session);
2557   }
2558   else
2559   {
2560     session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
2561     /* the 'begin' round is over, start with the next, real round */
2562     round_over (session, NULL);
2563   }
2564
2565   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2566   client_send_next (session);
2567 }
2568
2569
2570 /**
2571  * Called when a client sends an ack
2572  *
2573  * @param cls (unused)
2574  * @param client client handle
2575  * @param message message sent by the client
2576  */
2577 void
2578 client_ack (void *cls,
2579              struct GNUNET_SERVER_Client *client,
2580              const struct GNUNET_MessageHeader *message)
2581 {
2582   struct ConsensusSession *session;
2583   struct GNUNET_CONSENSUS_AckMessage *msg;
2584   struct PendingElement *pending;
2585   struct GNUNET_CONSENSUS_Element *element;
2586
2587   session = sessions_head;
2588   while (NULL != session)
2589   {
2590     if (session->client == client)
2591       break;
2592   }
2593
2594   if (NULL == session)
2595   {
2596     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
2597     GNUNET_SERVER_client_disconnect (client);
2598     return;
2599   }
2600
2601   pending = session->client_approval_head;
2602
2603   GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending);
2604
2605   msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
2606
2607   if (msg->keep)
2608   {
2609     element = pending->element;
2610     insert_element (session, element);
2611     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
2612   }
2613
2614   GNUNET_free (pending);
2615
2616   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2617 }
2618
2619
2620 /**
2621  * Task that disconnects from core.
2622  *
2623  * @param cls core handle
2624  * @param tc context information (why was this task triggered now)
2625  */
2626 static void
2627 disconnect_core (void *cls,
2628                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2629 {
2630   if (core != NULL)
2631   {
2632     GNUNET_CORE_disconnect (core);
2633     core = NULL;
2634   }
2635   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
2636 }
2637
2638
2639 static void
2640 core_startup (void *cls,
2641               struct GNUNET_CORE_Handle *core,
2642               const struct GNUNET_PeerIdentity *peer)
2643 {
2644   struct ConsensusSession *session;
2645
2646   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
2647   /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
2648   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
2649   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
2650
2651   session = sessions_head;
2652   while (NULL != session)
2653   {
2654     if (NULL != session->join_msg)
2655       initialize_session (session);
2656     session = session->next;
2657   }
2658 }
2659
2660
2661 /**
2662  * Called to clean up, after a shutdown has been requested.
2663  *
2664  * @param cls closure
2665  * @param tc context information (why was this task triggered now)
2666  */
2667 static void
2668 shutdown_task (void *cls,
2669                const struct GNUNET_SCHEDULER_TaskContext *tc)
2670 {
2671   /* FIXME: complete; write separate destructors for different data types */
2672
2673   while (NULL != incoming_sockets_head)
2674   {
2675     struct IncomingSocket *socket;
2676     socket = incoming_sockets_head;
2677     if (NULL != socket->rh)
2678     {
2679       GNUNET_STREAM_read_cancel (socket->rh);
2680       socket->rh = NULL;
2681     } 
2682     if (NULL == socket->cpi)
2683     {
2684       GNUNET_STREAM_close (socket->socket);
2685       socket->socket = NULL;
2686       if (NULL != socket->mst)
2687       {
2688         GNUNET_SERVER_mst_destroy (socket->mst);
2689         socket->mst = NULL;
2690       }
2691     }
2692     incoming_sockets_head = incoming_sockets_head->next;
2693     GNUNET_free (socket);
2694   }
2695
2696   while (NULL != sessions_head)
2697   {
2698     struct ConsensusSession *session;
2699     session = sessions_head->next;
2700     destroy_session (sessions_head);
2701     sessions_head = session;
2702   }
2703
2704   if (NULL != core)
2705   {
2706     GNUNET_CORE_disconnect (core);
2707     core = NULL;
2708   }
2709
2710   if (NULL != listener)
2711   {
2712     GNUNET_STREAM_listen_close (listener);
2713     listener = NULL;
2714   } 
2715
2716   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
2717 }
2718
2719
2720 /**
2721  * Start processing consensus requests.
2722  *
2723  * @param cls closure
2724  * @param server the initialized server
2725  * @param c configuration to use
2726  */
2727 static void
2728 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
2729 {
2730   /* core is only used to retrieve the peer identity */
2731   static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
2732     {NULL, 0, 0}
2733   };
2734   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2735     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2736     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2737     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2738         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
2739     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
2740         sizeof (struct GNUNET_CONSENSUS_AckMessage)},
2741     {NULL, NULL, 0, 0}
2742   };
2743
2744   cfg = c;
2745   srv = server;
2746
2747   GNUNET_SERVER_add_handlers (server, server_handlers);
2748
2749   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2750
2751   listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
2752                                    listen_cb, NULL,
2753                                    GNUNET_STREAM_OPTION_END);
2754
2755   /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
2756   core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
2757   GNUNET_assert (NULL != core);
2758
2759   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2760 }
2761
2762
2763 /**
2764  * The main function for the consensus service.
2765  *
2766  * @param argc number of arguments from the command line
2767  * @param argv command line arguments
2768  * @return 0 ok, 1 on error
2769  */
2770 int
2771 main (int argc, char *const *argv)
2772 {
2773   int ret;
2774   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
2775   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
2776   return (GNUNET_OK == ret) ? 0 : 1;
2777 }
2778