-fix URIs
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
35
36
37 /**
38  * Log macro that prefixes the local peer and the peer we are in contact with.
39  *
40  * @param kind log level
41  * @param cpi ConsensusPeerInformation of the partner peer
42  * @param m log message
43  */
44 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
45    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
46
47
48 /**
49  * Number of exponential rounds, used in the exp and completion round.
50  */
51 #define NUM_EXP_REPETITIONS 4
52
53
54 /* forward declarations */
55
56 /* mutual recursion with struct ConsensusSession */
57 struct ConsensusPeerInformation;
58
59 /* mutual recursion with round_over */
60 static void
61 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
62
63
64 /**
65  * Describes the current round a consensus session is in.
66  */
67 enum ConsensusRound
68 {
69   /**
70    * Not started the protocol yet.
71    */
72   CONSENSUS_ROUND_BEGIN=0,
73   /**
74    * Distribution of elements with the exponential scheme.
75    */
76   CONSENSUS_ROUND_EXCHANGE,
77   /**
78    * Collect and distribute missing values.
79    */
80   CONSENSUS_ROUND_COMPLETION,
81   /**
82    * Consensus concluded. After timeout and finished communication with client,
83    * consensus session will be destroyed.
84    */
85   CONSENSUS_ROUND_FINISH
86 };
87
88
89 /**
90  * Information about the current round.
91  */
92 struct RoundInfo
93 {
94   /**
95    * The current main round.
96    */
97   enum ConsensusRound round;
98   /**
99    * The current exp round repetition, valid if
100    * the main round is an exp round.
101    */
102   uint32_t exp_repetition;
103   /**
104    * The current exp subround, valid if
105    * the main round is an exp round.
106    */
107   uint32_t exp_subround;
108 };
109
110
111 /**
112  * A consensus session consists of one local client and the remote authorities.
113  */
114 struct ConsensusSession
115 {
116   /**
117    * Consensus sessions are kept in a DLL.
118    */
119   struct ConsensusSession *next;
120
121   /**
122    * Consensus sessions are kept in a DLL.
123    */
124   struct ConsensusSession *prev;
125
126   /**
127   * Global consensus identification, computed
128   * from the session id and participating authorities.
129   */
130   struct GNUNET_HashCode global_id;
131
132   /**
133    * Client that inhabits the session
134    */
135   struct GNUNET_SERVER_Client *client;
136
137   /**
138    * Queued messages to the client.
139    */
140   struct GNUNET_MQ_Handle *client_mq;
141
142   /**
143    * Time when the conclusion of the consensus should begin.
144    */
145   struct GNUNET_TIME_Absolute conclude_start;
146
147   /**
148    * Timeout for all rounds together, single rounds will schedule a timeout task
149    * with a fraction of the conclude timeout.
150    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
151    */
152   struct GNUNET_TIME_Absolute conclude_deadline;
153
154   /**
155    * Timeout task identifier for the current round or subround.
156    */
157   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
158
159   /**
160    * Number of other peers in the consensus.
161    */
162   unsigned int num_peers;
163
164   /**
165    * Information about the other peers,
166    * their state, etc.
167    */
168   struct ConsensusPeerInformation *info;
169
170   /**
171    * Index of the local peer in the peers array
172    */
173   unsigned int local_peer_idx;
174
175   /**
176    * Current round
177    */
178   enum ConsensusRound current_round;
179
180   /**
181    * Permutation of peers for the current round,
182    */
183   uint32_t *shuffle;
184
185   /**
186    * Inverse permutation of peers for the current round,
187    */
188   uint32_t *shuffle_inv;
189
190   /**
191    * Current round of the exponential scheme.
192    */
193   uint32_t exp_repetition;
194
195   /**
196    * Current sub-round of the exponential scheme.
197    */
198   uint32_t exp_subround;
199
200   /**
201    * The partner for the current exp-round.
202    * The local peer will initiate the set reconciliation with the
203    * outgoing peer.
204    */
205   struct ConsensusPeerInformation *partner_outgoing;
206
207   /**
208    * The partner for the current exp-round
209    * The incoming peer will initiate the set reconciliation with
210    * the incoming peer.
211    */
212   struct ConsensusPeerInformation *partner_incoming;
213
214   /**
215    * The consensus set of this session.
216    */
217   struct GNUNET_SET_Handle *element_set;
218
219   /**
220    * Listener for requests from other peers.
221    * Uses the session's global id as app id.
222    */
223   struct GNUNET_SET_ListenHandle *set_listener;
224 };
225
226
227 /**
228  * Information about a peer that is in a consensus session.
229  */
230 struct ConsensusPeerInformation
231 {
232   /**
233    * Peer identitty of the peer in the consensus session
234    */
235   struct GNUNET_PeerIdentity peer_id;
236
237   /**
238    * Back-reference to the consensus session,
239    * to that ConsensusPeerInformation can be used as a closure
240    */
241   struct ConsensusSession *session;
242
243   /**
244    * Have we finished the set operation for this (sub-)round?
245    */
246   int set_op_finished;
247
248   /**
249    * Set operation we are currently executing with this peer.
250    */
251   struct GNUNET_SET_OperationHandle *set_op;
252
253   /**
254    * Set operation we are planning on executing with this peer.
255    */
256   struct GNUNET_SET_OperationHandle *delayed_set_op;
257
258   /**
259    * Info about the round of the delayed set operation.
260    */
261   struct RoundInfo delayed_round_info;
262 };
263
264
265 /**
266  * Linked list of sessions this peer participates in.
267  */
268 static struct ConsensusSession *sessions_head;
269
270 /**
271  * Linked list of sessions this peer participates in.
272  */
273 static struct ConsensusSession *sessions_tail;
274
275 /**
276  * Configuration of the consensus service.
277  */
278 static const struct GNUNET_CONFIGURATION_Handle *cfg;
279
280 /**
281  * Handle to the server for this service.
282  */
283 static struct GNUNET_SERVER_Handle *srv;
284
285 /**
286  * Peer that runs this service.
287  */
288 static struct GNUNET_PeerIdentity my_peer;
289
290
291 /**
292  * Check if the current subround has finished.
293  * Must only be called when an exp-round is the current round.
294  *
295  * @param session session to check for exp-round completion
296  * @return GNUNET_YES if the subround has finished,
297  *         GNUNET_NO if not
298  */
299 static int
300 have_exp_subround_finished (const struct ConsensusSession *session)
301 {
302   int not_finished;
303
304   GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round);
305
306   not_finished = 0;
307   if ( (NULL != session->partner_outgoing) &&
308        (GNUNET_NO == session->partner_outgoing->set_op_finished) )
309     not_finished++;
310   if ( (NULL != session->partner_incoming) &&
311        (GNUNET_NO == session->partner_incoming->set_op_finished) )
312     not_finished++;
313   if (0 == not_finished)
314     return GNUNET_YES;
315   return GNUNET_NO;
316 }
317
318
319 /**
320  * Destroy a session, free all resources associated with it.
321  *
322  * @param session the session to destroy
323  */
324 static void
325 destroy_session (struct ConsensusSession *session)
326 {
327   int i;
328
329   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
330   if (NULL != session->element_set)
331   {
332     GNUNET_SET_destroy (session->element_set);
333     session->element_set = NULL;
334   }
335   if (NULL != session->set_listener)
336   {
337     GNUNET_SET_listen_cancel (session->set_listener);
338     session->set_listener = NULL;
339   }
340   if (NULL != session->client_mq)
341   {
342     GNUNET_MQ_destroy (session->client_mq);
343     session->client_mq = NULL;
344   }
345   if (NULL != session->client)
346   {
347     GNUNET_SERVER_client_disconnect (session->client);
348     session->client = NULL;
349   }
350   if (NULL != session->shuffle)
351   {
352     GNUNET_free (session->shuffle);
353     session->shuffle = NULL;
354   }
355   if (NULL != session->shuffle_inv)
356   {
357     GNUNET_free (session->shuffle_inv);
358     session->shuffle_inv = NULL;
359   }
360   if (NULL != session->info)
361   {
362     for (i = 0; i < session->num_peers; i++)
363     {
364       struct ConsensusPeerInformation *cpi;
365       cpi = &session->info[i];
366       if (NULL != cpi->set_op)
367       {
368         GNUNET_SET_operation_cancel (cpi->set_op);
369         cpi->set_op = NULL;
370       }
371     }
372     GNUNET_free (session->info);
373     session->info = NULL;
374   }
375   GNUNET_free (session);
376 }
377
378
379 /**
380  * Iterator for set elements.
381  *
382  * @param cls closure
383  * @param element the current element, NULL if all elements have been
384  *        iterated over
385  * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
386  */
387 static int
388 send_to_client_iter (void *cls,
389                      const struct GNUNET_SET_Element *element)
390 {
391   struct ConsensusSession *session = cls;
392   struct GNUNET_MQ_Envelope *ev;
393
394   if (NULL != element)
395   {
396     struct GNUNET_CONSENSUS_ElementMessage *m;
397
398     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
399                 session->local_peer_idx);
400
401     ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
402     m->element_type = htons (element->type);
403     memcpy (&m[1], element->data, element->size);
404     GNUNET_MQ_send (session->client_mq, ev);
405   }
406   else
407   {
408     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
409                 session->local_peer_idx);
410     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
411     GNUNET_MQ_send (session->client_mq, ev);
412   }
413   return GNUNET_YES;
414 }
415
416
417 /**
418  * Start the next round.
419  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
420  *
421  * @param cls the session
422  * @param tc task context, for when this task is invoked by the scheduler,
423  *           NULL if invoked for another reason
424  */
425 static void
426 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
427 {
428   struct ConsensusSession *session;
429   unsigned int i;
430   int res;
431
432   /* don't kick off next round if we're shutting down */
433   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
434     return;
435
436   session = cls;
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
438
439   if (tc != NULL)
440     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
441
442   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
443   {
444     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
445     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
446   }
447
448   for (i = 0; i < session->num_peers; i++)
449   {
450     if (NULL != session->info[i].set_op)
451     {
452       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
453                   session->local_peer_idx, i);
454       GNUNET_SET_operation_cancel (session->info[i].set_op);
455       session->info[i].set_op = NULL;
456     }
457     /* we're in the new round, nothing finished yet */
458     session->info[i].set_op_finished = GNUNET_NO;
459   }
460
461   switch (session->current_round)
462   {
463     case CONSENSUS_ROUND_BEGIN:
464       session->current_round = CONSENSUS_ROUND_EXCHANGE;
465       session->exp_repetition = 0;
466       subround_over (session, NULL);
467       break;
468     case CONSENSUS_ROUND_EXCHANGE:
469       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
470                   session->local_peer_idx);
471       session->current_round = CONSENSUS_ROUND_FINISH;
472       res = GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
473       if (GNUNET_SYSERR == res)
474       {
475         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: set invalid\n");
476       }
477       else if (GNUNET_NO == res)
478       {
479         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: iterator already active\n");
480       }
481       break;
482     default:
483       GNUNET_assert (0);
484   }
485 }
486
487
488 /**
489  * Create a new permutation for the session's peers in session->shuffle.
490  * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
491  * both the global session id and the current round index.
492  *
493  * @param session the session to create the new permutation for
494  */
495 static void
496 shuffle (struct ConsensusSession *session)
497 {
498   uint32_t i;
499   uint32_t randomness[session->num_peers-1];
500
501   if (NULL == session->shuffle)
502     session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
503   if (NULL == session->shuffle_inv)
504     session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
505
506   GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
507                      &session->exp_repetition, sizeof (uint32_t),
508                      &session->global_id, sizeof (struct GNUNET_HashCode),
509                      NULL);
510
511   for (i = 0; i < session->num_peers; i++)
512     session->shuffle[i] = i;
513
514   for (i = session->num_peers - 1; i > 0; i--)
515   {
516     uint32_t x;
517     uint32_t tmp;
518     x = randomness[i-1] % session->num_peers;
519     tmp = session->shuffle[x];
520     session->shuffle[x] = session->shuffle[i];
521     session->shuffle[i] = tmp;
522   }
523
524   /* create the inverse */
525   for (i = 0; i < session->num_peers; i++)
526     session->shuffle_inv[session->shuffle[i]] = i;
527 }
528
529
530 /**
531  * Find and set the partner_incoming and partner_outgoing of our peer,
532  * one of them may not exist (and thus set to NULL) if the number of peers
533  * in the session is not a power of two.
534  *
535  * @param session the consensus session
536  */
537 static void
538 find_partners (struct ConsensusSession *session)
539 {
540   unsigned int arc;
541   unsigned int num_ghosts;
542   unsigned int largest_arc;
543   int partner_idx;
544
545   /* shuffled local index */
546   int my_idx = session->shuffle[session->local_peer_idx];
547
548   /* distance to neighboring peer in current subround */
549   arc = 1 << session->exp_subround;
550   largest_arc = 1;
551   while (largest_arc < session->num_peers)
552     largest_arc <<= 1;
553   num_ghosts = largest_arc - session->num_peers;
554   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
555   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
556   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
557
558   if (0 == (my_idx & arc))
559   {
560     /* we are outgoing */
561     partner_idx = (my_idx + arc) % session->num_peers;
562     session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
563     GNUNET_assert (GNUNET_NO == session->partner_outgoing->set_op_finished);
564     /* are we a 'ghost' of a peer that would exist if
565      * the number of peers was a power of two, and thus have to partner
566      * with an additional peer?
567      */
568     if (my_idx < num_ghosts)
569     {
570       int ghost_partner_idx;
571       // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
572       ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
573       /* platform dependent; modulo sometimes returns negative values */
574       if (ghost_partner_idx < 0)
575         ghost_partner_idx += session->num_peers;
576       /* we only need to have a ghost partner if the partner is outgoing */
577       if (0 == (ghost_partner_idx & arc))
578       {
579         // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
580         session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
581         GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
582         return;
583       }
584     }
585     session->partner_incoming = NULL;
586     return;
587   }
588   /* we only have an incoming connection */
589   partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
590   if (partner_idx < 0)
591     partner_idx += session->num_peers;
592   session->partner_outgoing = NULL;
593   session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
594   GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
595 }
596
597
598 /**
599  * Callback for set operation results. Called for each element
600  * in the result set.
601  *
602  * @param cls closure
603  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
604  * @param status see enum GNUNET_SET_Status
605  */
606 static void
607 set_result_cb (void *cls,
608                const struct GNUNET_SET_Element *element,
609                enum GNUNET_SET_Status status)
610 {
611   struct ConsensusPeerInformation *cpi = cls;
612   unsigned int remote_idx = cpi - cpi->session->info;
613   unsigned int local_idx = cpi->session->local_peer_idx;
614
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u with status %u\n",
616               local_idx, remote_idx, (unsigned int) status);
617
618   GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
619                  (cpi == cpi->session->partner_incoming));
620
621   switch (status)
622   {
623     case GNUNET_SET_STATUS_OK:
624       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
625                   local_idx, remote_idx);
626       break;
627     case GNUNET_SET_STATUS_FAILURE:
628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
629                   local_idx, remote_idx);
630       cpi->set_op = NULL;
631       return;
632     case GNUNET_SET_STATUS_HALF_DONE:
633     case GNUNET_SET_STATUS_DONE:
634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
635                   local_idx, remote_idx);
636       cpi->set_op_finished = GNUNET_YES;
637       cpi->set_op = NULL;
638       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
639       {
640         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
641                     local_idx);
642         subround_over (cpi->session, NULL);
643       }
644       else
645       {
646         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
647                     local_idx);
648       }
649       return;
650     default:
651       GNUNET_break (0);
652       return;
653   }
654
655   switch (cpi->session->current_round)
656   {
657     case CONSENSUS_ROUND_COMPLETION:
658     case CONSENSUS_ROUND_EXCHANGE:
659       GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
660       break;
661     default:
662       GNUNET_break (0);
663       return;
664   }
665 }
666
667
668 /**
669  * Compare the round the session is in with the round of the given context message.
670  *
671  * @param session a consensus session
672  * @param ri a round context message
673  * @return 0 if it's the same round, -1 if the session is in an earlier round,
674  *         1 if the session is in a later round
675  */
676 static int
677 rounds_compare (struct ConsensusSession *session,
678                 struct RoundInfo* ri)
679 {
680   if (session->current_round < ri->round)
681     return -1;
682   if (session->current_round > ri->round)
683     return 1;
684   if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
685   {
686     if (session->exp_repetition < ri->exp_repetition)
687       return -1;
688     if (session->exp_repetition > ri->exp_repetition)
689       return 1;
690     if (session->exp_subround < ri->exp_subround)
691       return -1;
692     if (session->exp_subround > ri->exp_subround)
693       return 1;
694     return 0;
695   }
696   /* other rounds have no subrounds / repetitions to compare */
697   return 0;
698 }
699
700
701 /**
702  * Do the next subround in the exp-scheme.
703  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
704  *
705  * @param cls the session
706  * @param tc task context, for when this task is invoked by the scheduler,
707  *           NULL if invoked for another reason
708  */
709 static void
710 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
711 {
712   struct ConsensusSession *session;
713   struct GNUNET_TIME_Relative subround_timeout;
714   int i;
715
716   /* don't kick off next subround if we're shutting down */
717   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
718     return;
719
720   session = cls;
721
722   GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round);
723
724   if (tc != NULL)
725   {
726     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
727     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "P%u: consensus subround timed out\n",
728                 session->local_peer_idx);
729   }
730
731   /* cancel timeout */
732   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
733   {
734     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
735     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
736   }
737
738   for (i = 0; i < session->num_peers; i++)
739   {
740     if (NULL != session->info[i].set_op)
741     {
742       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
743                   session->local_peer_idx, i);
744       GNUNET_SET_operation_cancel (session->info[i].set_op);
745       session->info[i].set_op = NULL;
746     }
747     /* we're in the new round, nothing finished yet */
748     session->info[i].set_op_finished = GNUNET_NO;
749   }
750
751   if (session->exp_repetition >= NUM_EXP_REPETITIONS)
752   {
753     round_over (session, NULL);
754     return;
755   }
756
757   if (session->exp_repetition == 0)
758   {
759     /* initialize everything for the log-rounds */
760     session->exp_repetition = 1;
761     session->exp_subround = 0;
762     if (NULL == session->shuffle)
763       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
764     if (NULL == session->shuffle_inv)
765       session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
766     for (i = 0; i < session->num_peers; i++)
767       session->shuffle[i] = session->shuffle_inv[i] = i;
768   }
769   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
770   {
771     /* subrounds done, start new log-round */
772     session->exp_repetition++;
773     session->exp_subround = 0;
774     shuffle (session);
775   }
776   else
777   {
778     session->exp_subround++;
779   }
780
781   subround_timeout =
782       GNUNET_TIME_relative_divide (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline),
783                                    2 * NUM_EXP_REPETITIONS * ((int) ceil (log2 (session->num_peers))));
784
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "subround timeout: %u ms\n", subround_timeout.rel_value_us / 1000);
786
787   session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (subround_timeout, subround_over, session);
788
789   /* determine the incoming and outgoing partner */
790   find_partners (session);
791
792   GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
793   GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
794
795   /* initiate set operation with the outgoing partner */
796   if (NULL != session->partner_outgoing)
797   {
798     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
799     msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
800     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
801     msg->header.size = htons (sizeof *msg);
802     msg->round = htonl (session->current_round);
803     msg->exp_repetition = htonl (session->exp_repetition);
804     msg->exp_subround = htonl (session->exp_subround);
805
806     if (NULL != session->partner_outgoing->set_op)
807     {
808       GNUNET_break (0);
809       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
810     }
811     session->partner_outgoing->set_op =
812         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
813                             &session->global_id,
814                             (struct GNUNET_MessageHeader *) msg,
815                             0, /* FIXME: salt */
816                             GNUNET_SET_RESULT_ADDED,
817                             set_result_cb, session->partner_outgoing);
818     GNUNET_free (msg);
819     if (GNUNET_OK != GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set))
820     {
821       GNUNET_break (0);
822       session->partner_outgoing->set_op = NULL;
823       session->partner_outgoing->set_op_finished = GNUNET_YES;
824     }
825   }
826
827   /* commit to the delayed set operation */
828   if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
829   {
830     int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
831
832     if (NULL != session->partner_incoming->set_op)
833     {
834       GNUNET_break (0);
835       GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
836       session->partner_incoming->set_op = NULL;
837     }
838     if (cmp == 0)
839     {
840       if (GNUNET_OK != GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set))
841       {
842         GNUNET_break (0);
843       }
844       session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
845       session->partner_incoming->delayed_set_op = NULL;
846       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
847                   session->local_peer_idx, (int) (session->partner_incoming - session->info));
848     }
849     else
850     {
851       /* this should not happen -- a round has been skipped! */
852       GNUNET_break_op (0);
853     }
854   }
855
856 #ifdef GNUNET_EXTRA_LOGGING
857   {
858     int in;
859     int out;
860     if (session->partner_outgoing == NULL)
861       out = -1;
862     else
863       out = (int) (session->partner_outgoing - session->info);
864     if (session->partner_incoming == NULL)
865       in = -1;
866     else
867       in = (int) (session->partner_incoming - session->info);
868     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
869                 session->exp_repetition, session->exp_subround, in, out);
870   }
871 #endif /* GNUNET_EXTRA_LOGGING */
872
873 }
874
875
876 /**
877  * Search peer in the list of peers in session.
878  *
879  * @param peer peer to find
880  * @param session session with peer
881  * @return index of peer, -1 if peer is not in session
882  */
883 static int
884 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
885 {
886   int i;
887   for (i = 0; i < session->num_peers; i++)
888     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
889       return i;
890   return -1;
891 }
892
893
894 /**
895  * Compute a global, (hopefully) unique consensus session id,
896  * from the local id of the consensus session, and the identities of all participants.
897  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
898  * exactly the same peers, the global id will be different.
899  *
900  * @param session session to generate the global id for
901  * @param session_id local id of the consensus session
902  */
903 static void
904 compute_global_id (struct ConsensusSession *session,
905                    const struct GNUNET_HashCode *session_id)
906 {
907   int i;
908   struct GNUNET_HashCode tmp;
909   struct GNUNET_HashCode phash;
910
911   /* FIXME: use kdf? */
912
913   session->global_id = *session_id;
914   for (i = 0; i < session->num_peers; ++i)
915   {
916     GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
917     GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
918     session->global_id = tmp;
919     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
920     session->global_id = tmp;
921   }
922 }
923
924
925 /**
926  * Compare two peer identities.
927  *
928  * @param h1 some peer identity
929  * @param h2 some peer identity
930  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
931  */
932 static int
933 peer_id_cmp (const void *h1, const void *h2)
934 {
935   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
936 }
937
938
939 /**
940  * Create the sorted list of peers for the session,
941  * add the local peer if not in the join message.
942  */
943 static void
944 initialize_session_peer_list (struct ConsensusSession *session,
945                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
946 {
947   unsigned int local_peer_in_list;
948   uint32_t listed_peers;
949   const struct GNUNET_PeerIdentity *msg_peers;
950   struct GNUNET_PeerIdentity *peers;
951   unsigned int i;
952
953   GNUNET_assert (NULL != join_msg);
954
955   /* peers in the join message, may or may not include the local peer */
956   listed_peers = ntohl (join_msg->num_peers);
957
958   session->num_peers = listed_peers;
959
960   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
961
962   local_peer_in_list = GNUNET_NO;
963   for (i = 0; i < listed_peers; i++)
964   {
965     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
966     {
967       local_peer_in_list = GNUNET_YES;
968       break;
969     }
970   }
971
972   if (GNUNET_NO == local_peer_in_list)
973     session->num_peers++;
974
975   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
976
977   if (GNUNET_NO == local_peer_in_list)
978     peers[session->num_peers - 1] = my_peer;
979
980   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
981   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
982
983   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
984
985   for (i = 0; i < session->num_peers; ++i)
986   {
987     /* initialize back-references, so consensus peer information can
988      * be used as closure */
989     session->info[i].session = session;
990     session->info[i].peer_id = peers[i];
991   }
992
993   GNUNET_free (peers);
994 }
995
996
997 /**
998  * Called when another peer wants to do a set operation with the
999  * local peer.
1000  *
1001  * @param cls closure
1002  * @param other_peer the other peer
1003  * @param context_msg message with application specific information from
1004  *        the other peer
1005  * @param request request from the other peer, use GNUNET_SET_accept
1006  *        to accept it, otherwise the request will be refused
1007  *        Note that we don't use a return value here, as it is also
1008  *        necessary to specify the set we want to do the operation with,
1009  *        whith sometimes can be derived from the context message.
1010  *        Also necessary to specify the timeout.
1011  */
1012 static void
1013 set_listen_cb (void *cls,
1014                const struct GNUNET_PeerIdentity *other_peer,
1015                const struct GNUNET_MessageHeader *context_msg,
1016                struct GNUNET_SET_Request *request)
1017 {
1018   struct ConsensusSession *session = cls;
1019   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
1020   struct ConsensusPeerInformation *cpi;
1021   struct GNUNET_SET_OperationHandle *set_op;
1022   struct RoundInfo round_info;
1023   int index;
1024   int cmp;
1025
1026   if (NULL == context_msg)
1027   {
1028     GNUNET_break_op (0);
1029     return;
1030   }
1031
1032   index = get_peer_idx (other_peer, session);
1033
1034   if (index < 0)
1035   {
1036     GNUNET_break_op (0);
1037     return;
1038   }
1039
1040   round_info.round = ntohl (msg->round);
1041   round_info.exp_repetition = ntohl (msg->exp_repetition);
1042   round_info.exp_subround = ntohl (msg->exp_subround);
1043
1044   cpi = &session->info[index];
1045
1046   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
1047
1048   switch (session->current_round)
1049   {
1050     case CONSENSUS_ROUND_BEGIN:
1051       /* we're in the begin round, so requests for the exchange round may
1052        * come in, they will be delayed for now! */
1053     case CONSENSUS_ROUND_EXCHANGE:
1054       cmp = rounds_compare (session, &round_info);
1055       if (cmp > 0)
1056       {
1057         /* the other peer is too late */
1058         LOG_PP (GNUNET_ERROR_TYPE_DEBUG, cpi, "too late for the current round\n");
1059         return;
1060       }
1061       /* kill old request, if any. this is legal,
1062        * as the other peer would not make a new request if it would want to
1063        * complete the old one! */
1064       if (NULL != cpi->set_op)
1065       {
1066         LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got new request from same peer, canceling old one\n");
1067         GNUNET_SET_operation_cancel (cpi->set_op);
1068         cpi->set_op = NULL;
1069       }
1070       set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
1071                                   set_result_cb, &session->info[index]);
1072       if (cmp == 0)
1073       {
1074         /* we're in exactly the right round for the incoming request */
1075         if (cpi != cpi->session->partner_incoming)
1076         {
1077           GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%u: got request from %u (with matching round), "
1078                       "but incoming partner is %d\n", cpi->session->local_peer_idx, cpi - cpi->session->info,
1079                       ((NULL == cpi->session->partner_incoming) ? -1 : (cpi->session->partner_incoming - cpi->session->info)));
1080           GNUNET_SET_operation_cancel (set_op);
1081           return;
1082         }
1083         cpi->set_op = set_op;
1084         if (GNUNET_OK != GNUNET_SET_commit (set_op, session->element_set))
1085         {
1086           GNUNET_break (0);
1087         }
1088         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
1089       }
1090       else
1091       {
1092         /* we still have wait until we have finished the current round,
1093          * as the other peer's round is larger */
1094         cpi->delayed_set_op = set_op;
1095         cpi->delayed_round_info = round_info;
1096         /* The current setop is finished, as we canceled the current setop above. */
1097         cpi->set_op_finished = GNUNET_YES;
1098         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
1099       }
1100       break;
1101     default:
1102       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
1103                   session->local_peer_idx, session->current_round, index);
1104       GNUNET_break_op (0);
1105       return;
1106   }
1107 }
1108
1109
1110 /**
1111  * Initialize the session, continue receiving messages from the owning client
1112  *
1113  * @param session the session to initialize
1114  * @param join_msg the join message from the client
1115  */
1116 static void
1117 initialize_session (struct ConsensusSession *session,
1118                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1119 {
1120   struct ConsensusSession *other_session;
1121
1122   initialize_session_peer_list (session, join_msg);
1123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1124   compute_global_id (session, &join_msg->session_id);
1125
1126   /* check if some local client already owns the session.
1127    * it is only legal to have a session with an existing global id
1128    * if all other sessions with this global id are finished.*/
1129   other_session = sessions_head;
1130   while (NULL != other_session)
1131   {
1132     if ((other_session != session) &&
1133         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1134     {
1135       if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1136       {
1137         GNUNET_break (0);
1138         destroy_session (session);
1139         return;
1140       }
1141       break;
1142     }
1143     other_session = other_session->next;
1144   }
1145
1146   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
1147   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
1148
1149   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
1150               (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
1151
1152   session->local_peer_idx = get_peer_idx (&my_peer, session);
1153   GNUNET_assert (-1 != session->local_peer_idx);
1154   session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1155   GNUNET_assert (NULL != session->element_set);
1156   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1157                                              &session->global_id,
1158                                              set_listen_cb, session);
1159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1161 }
1162
1163
1164 static struct ConsensusSession *
1165 get_session_by_client (struct GNUNET_SERVER_Client *client)
1166 {
1167   struct ConsensusSession *session;
1168
1169   session = sessions_head;
1170   while (NULL != session)
1171   {
1172     if (session->client == client)
1173       return session;
1174     session = session->next;
1175   }
1176   return NULL;
1177 }
1178
1179
1180 /**
1181  * Called when a client wants to join a consensus session.
1182  *
1183  * @param cls unused
1184  * @param client client that sent the message
1185  * @param m message sent by the client
1186  */
1187 static void
1188 client_join (void *cls,
1189              struct GNUNET_SERVER_Client *client,
1190              const struct GNUNET_MessageHeader *m)
1191 {
1192   struct ConsensusSession *session;
1193
1194   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1195
1196   session = get_session_by_client (client);
1197   if (NULL != session)
1198   {
1199     GNUNET_break (0);
1200     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1201     return;
1202   }
1203   session = GNUNET_new (struct ConsensusSession);
1204   session->client = client;
1205   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1206   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1207   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1208   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1209
1210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1211 }
1212
1213
1214 /**
1215  * Called when a client performs an insert operation.
1216  *
1217  * @param cls (unused)
1218  * @param client client handle
1219  * @param m message sent by the client
1220  */
1221 void
1222 client_insert (void *cls,
1223                struct GNUNET_SERVER_Client *client,
1224                const struct GNUNET_MessageHeader *m)
1225 {
1226   struct ConsensusSession *session;
1227   struct GNUNET_CONSENSUS_ElementMessage *msg;
1228   struct GNUNET_SET_Element *element;
1229   ssize_t element_size;
1230
1231   session = get_session_by_client (client);
1232
1233   if (NULL == session)
1234   {
1235     GNUNET_break (0);
1236     GNUNET_SERVER_client_disconnect (client);
1237     return;
1238   }
1239
1240   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1241   {
1242     GNUNET_break (0);
1243     GNUNET_SERVER_client_disconnect (client);
1244     return;
1245   }
1246
1247   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1248   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1249   if (element_size < 0)
1250   {
1251     GNUNET_break (0);
1252     return;
1253   }
1254
1255   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1256   element->type = msg->element_type;
1257   element->size = element_size;
1258   memcpy (&element[1], &msg[1], element_size);
1259   element->data = &element[1];
1260   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1261   GNUNET_free (element);
1262   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1263
1264   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1265 }
1266
1267
1268 /**
1269  * Called when a client performs the conclude operation.
1270  *
1271  * @param cls (unused)
1272  * @param client client handle
1273  * @param message message sent by the client
1274  */
1275 static void
1276 client_conclude (void *cls,
1277                  struct GNUNET_SERVER_Client *client,
1278                  const struct GNUNET_MessageHeader *message)
1279 {
1280   struct ConsensusSession *session;
1281
1282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1283   session = get_session_by_client (client);
1284   if (NULL == session)
1285   {
1286     /* client not found */
1287     GNUNET_break (0);
1288     GNUNET_SERVER_client_disconnect (client);
1289     return;
1290   }
1291   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1292   {
1293     /* client requested conclude twice */
1294     GNUNET_break (0);
1295     return;
1296   }
1297   if (session->num_peers <= 1)
1298   {
1299     session->current_round = CONSENSUS_ROUND_FINISH;
1300     GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1301   }
1302   else
1303   {
1304     /* the 'begin' round is over, start with the next, actual round */
1305     round_over (session, NULL);
1306   }
1307
1308   GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1309   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1310 }
1311
1312
1313 /**
1314  * Called to clean up, after a shutdown has been requested.
1315  *
1316  * @param cls closure
1317  * @param tc context information (why was this task triggered now)
1318  */
1319 static void
1320 shutdown_task (void *cls,
1321                const struct GNUNET_SCHEDULER_TaskContext *tc)
1322 {
1323   while (NULL != sessions_head)
1324     destroy_session (sessions_head);
1325
1326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1327 }
1328
1329
1330 /**
1331  * Clean up after a client after it is
1332  * disconnected (either by us or by itself)
1333  *
1334  * @param cls closure, unused
1335  * @param client the client to clean up after
1336  */
1337 void
1338 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1339 {
1340   struct ConsensusSession *session;
1341
1342   session = get_session_by_client (client);
1343   if (NULL == session)
1344     return;
1345   if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1346       (CONSENSUS_ROUND_FINISH == session->current_round))
1347   {
1348     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, destroying session\n");
1349     destroy_session (session);
1350   }
1351   else
1352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1353 }
1354
1355
1356 /**
1357  * Start processing consensus requests.
1358  *
1359  * @param cls closure
1360  * @param server the initialized server
1361  * @param c configuration to use
1362  */
1363 static void
1364 run (void *cls, struct GNUNET_SERVER_Handle *server,
1365      const struct GNUNET_CONFIGURATION_Handle *c)
1366 {
1367   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1368     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1369         sizeof (struct GNUNET_MessageHeader)},
1370     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1371     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1372     {NULL, NULL, 0, 0}
1373   };
1374
1375   cfg = c;
1376   srv = server;
1377   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
1378   {
1379     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1380     GNUNET_break (0);
1381     GNUNET_SCHEDULER_shutdown ();
1382     return;
1383   }
1384   GNUNET_SERVER_add_handlers (server, server_handlers);
1385   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1386   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1387   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1388 }
1389
1390
1391 /**
1392  * The main function for the consensus service.
1393  *
1394  * @param argc number of arguments from the command line
1395  * @param argv command line arguments
1396  * @return 0 ok, 1 on error
1397  */
1398 int
1399 main (int argc, char *const *argv)
1400 {
1401   int ret;
1402   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1403   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1404   return (GNUNET_OK == ret) ? 0 : 1;
1405 }
1406