-fix unused initialization of locals
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
35
36
37 /**
38  * Log macro that prefixes the local peer and the peer we are in contact with.
39  *
40  * @param kind log level
41  * @param cpi ConsensusPeerInformation of the partner peer
42  * @param m log message
43  */
44 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
45    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
46
47
48 /**
49  * Number of exponential rounds, used in the exp and completion round.
50  */
51 #define NUM_EXP_REPETITIONS 4
52
53
54 /* forward declarations */
55
56 /* mutual recursion with struct ConsensusSession */
57 struct ConsensusPeerInformation;
58
59 /* mutual recursion with round_over */
60 static void
61 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
62
63
64 /**
65  * Describes the current round a consensus session is in.
66  */
67 enum ConsensusRound
68 {
69   /**
70    * Not started the protocol yet.
71    */
72   CONSENSUS_ROUND_BEGIN=0,
73   /**
74    * Distribution of elements with the exponential scheme.
75    */
76   CONSENSUS_ROUND_EXCHANGE,
77   /**
78    * Collect and distribute missing values.
79    */
80   CONSENSUS_ROUND_COMPLETION,
81   /**
82    * Consensus concluded. After timeout and finished communication with client,
83    * consensus session will be destroyed.
84    */
85   CONSENSUS_ROUND_FINISH
86 };
87
88
89 /**
90  * Information about the current round.
91  */
92 struct RoundInfo
93 {
94   /**
95    * The current main round.
96    */
97   enum ConsensusRound round;
98   /**
99    * The current exp round repetition, valid if
100    * the main round is an exp round.
101    */
102   uint32_t exp_repetition;
103   /**
104    * The current exp subround, valid if
105    * the main round is an exp round.
106    */
107   uint32_t exp_subround;
108 };
109
110
111 /**
112  * A consensus session consists of one local client and the remote authorities.
113  */
114 struct ConsensusSession
115 {
116   /**
117    * Consensus sessions are kept in a DLL.
118    */
119   struct ConsensusSession *next;
120
121   /**
122    * Consensus sessions are kept in a DLL.
123    */
124   struct ConsensusSession *prev;
125
126   /**
127   * Global consensus identification, computed
128   * from the session id and participating authorities.
129   */
130   struct GNUNET_HashCode global_id;
131
132   /**
133    * Client that inhabits the session
134    */
135   struct GNUNET_SERVER_Client *client;
136
137   /**
138    * Queued messages to the client.
139    */
140   struct GNUNET_MQ_Handle *client_mq;
141
142   /**
143    * Time when the conclusion of the consensus should begin.
144    */
145   struct GNUNET_TIME_Absolute conclude_start;
146
147   /**
148    * Timeout for all rounds together, single rounds will schedule a timeout task
149    * with a fraction of the conclude timeout.
150    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
151    */
152   struct GNUNET_TIME_Absolute conclude_deadline;
153
154   /**
155    * Timeout task identifier for the current round or subround.
156    */
157   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
158
159   /**
160    * Number of other peers in the consensus.
161    */
162   unsigned int num_peers;
163
164   /**
165    * Information about the other peers,
166    * their state, etc.
167    */
168   struct ConsensusPeerInformation *info;
169
170   /**
171    * Index of the local peer in the peers array
172    */
173   unsigned int local_peer_idx;
174
175   /**
176    * Current round
177    */
178   enum ConsensusRound current_round;
179
180   /**
181    * Permutation of peers for the current round,
182    */
183   uint32_t *shuffle;
184
185   /**
186    * Inverse permutation of peers for the current round,
187    */
188   uint32_t *shuffle_inv;
189
190   /**
191    * Current round of the exponential scheme.
192    */
193   uint32_t exp_repetition;
194
195   /**
196    * Current sub-round of the exponential scheme.
197    */
198   uint32_t exp_subround;
199
200   /**
201    * The partner for the current exp-round.
202    * The local peer will initiate the set reconciliation with the
203    * outgoing peer.
204    */
205   struct ConsensusPeerInformation *partner_outgoing;
206
207   /**
208    * The partner for the current exp-round
209    * The incoming peer will initiate the set reconciliation with
210    * the incoming peer.
211    */
212   struct ConsensusPeerInformation *partner_incoming;
213
214   /**
215    * The consensus set of this session.
216    */
217   struct GNUNET_SET_Handle *element_set;
218
219   /**
220    * Listener for requests from other peers.
221    * Uses the session's global id as app id.
222    */
223   struct GNUNET_SET_ListenHandle *set_listener;
224 };
225
226
227 /**
228  * Information about a peer that is in a consensus session.
229  */
230 struct ConsensusPeerInformation
231 {
232   /**
233    * Peer identitty of the peer in the consensus session
234    */
235   struct GNUNET_PeerIdentity peer_id;
236
237   /**
238    * Back-reference to the consensus session,
239    * to that ConsensusPeerInformation can be used as a closure
240    */
241   struct ConsensusSession *session;
242
243   /**
244    * Have we finished the set operation for this (sub-)round?
245    */
246   int set_op_finished;
247
248   /**
249    * Set operation we are currently executing with this peer.
250    */
251   struct GNUNET_SET_OperationHandle *set_op;
252
253   /**
254    * Set operation we are planning on executing with this peer.
255    */
256   struct GNUNET_SET_OperationHandle *delayed_set_op;
257
258   /**
259    * Info about the round of the delayed set operation.
260    */
261   struct RoundInfo delayed_round_info;
262 };
263
264
265 /**
266  * Linked list of sessions this peer participates in.
267  */
268 static struct ConsensusSession *sessions_head;
269
270 /**
271  * Linked list of sessions this peer participates in.
272  */
273 static struct ConsensusSession *sessions_tail;
274
275 /**
276  * Configuration of the consensus service.
277  */
278 static const struct GNUNET_CONFIGURATION_Handle *cfg;
279
280 /**
281  * Handle to the server for this service.
282  */
283 static struct GNUNET_SERVER_Handle *srv;
284
285 /**
286  * Peer that runs this service.
287  */
288 static struct GNUNET_PeerIdentity my_peer;
289
290
291 /**
292  * Check if the current subround has finished.
293  * Must only be called when an exp-round is the current round.
294  *
295  * @param session session to check for exp-round completion
296  * @return GNUNET_YES if the subround has finished,
297  *         GNUNET_NO if not
298  */
299 static int
300 have_exp_subround_finished (const struct ConsensusSession *session)
301 {
302   int not_finished;
303
304   GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round);
305
306   not_finished = 0;
307   if ( (NULL != session->partner_outgoing) &&
308        (GNUNET_NO == session->partner_outgoing->set_op_finished) )
309     not_finished++;
310   if ( (NULL != session->partner_incoming) &&
311        (GNUNET_NO == session->partner_incoming->set_op_finished) )
312     not_finished++;
313   if (0 == not_finished)
314     return GNUNET_YES;
315   return GNUNET_NO;
316 }
317
318
319 /**
320  * Destroy a session, free all resources associated with it.
321  *
322  * @param session the session to destroy
323  */
324 static void
325 destroy_session (struct ConsensusSession *session)
326 {
327   int i;
328
329   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
330   if (NULL != session->element_set)
331   {
332     GNUNET_SET_destroy (session->element_set);
333     session->element_set = NULL;
334   }
335   if (NULL != session->set_listener)
336   {
337     GNUNET_SET_listen_cancel (session->set_listener);
338     session->set_listener = NULL;
339   }
340   if (NULL != session->client_mq)
341   {
342     GNUNET_MQ_destroy (session->client_mq);
343     session->client_mq = NULL;
344   }
345   if (NULL != session->client)
346   {
347     GNUNET_SERVER_client_disconnect (session->client);
348     session->client = NULL;
349   }
350   if (NULL != session->shuffle)
351   {
352     GNUNET_free (session->shuffle);
353     session->shuffle = NULL;
354   }
355   if (NULL != session->shuffle_inv)
356   {
357     GNUNET_free (session->shuffle_inv);
358     session->shuffle_inv = NULL;
359   }
360   if (NULL != session->info)
361   {
362     for (i = 0; i < session->num_peers; i++)
363     {
364       struct ConsensusPeerInformation *cpi;
365       cpi = &session->info[i];
366       if (NULL != cpi->set_op)
367       {
368         GNUNET_SET_operation_cancel (cpi->set_op);
369         cpi->set_op = NULL;
370       }
371     }
372     GNUNET_free (session->info);
373     session->info = NULL;
374   }
375   GNUNET_free (session);
376 }
377
378
379 /**
380  * Iterator for set elements. [FIXME: bad comment]
381  *
382  * @param cls closure
383  * @param element the current element, NULL if all elements have been
384  *        iterated over
385  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
386  */
387 static int
388 send_to_client_iter (void *cls,
389                      const struct GNUNET_SET_Element *element)
390 {
391   struct ConsensusSession *session = cls;
392   struct GNUNET_MQ_Envelope *ev;
393
394   if (NULL != element)
395   {
396     struct GNUNET_CONSENSUS_ElementMessage *m;
397
398     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
399                 "P%d: got element for client\n",
400                 session->local_peer_idx);
401
402     ev = GNUNET_MQ_msg_extra (m, element->size,
403                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
404     m->element_type = htons (element->element_type);
405     memcpy (&m[1], element->data, element->size);
406     GNUNET_MQ_send (session->client_mq, ev);
407   }
408   else
409   {
410     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
411                 "P%d: finished iterating elements for client\n",
412                 session->local_peer_idx);
413     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
414     GNUNET_MQ_send (session->client_mq, ev);
415   }
416   return GNUNET_YES;
417 }
418
419
420 /**
421  * Start the next round.
422  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
423  *
424  * @param cls the session
425  * @param tc task context, for when this task is invoked by the scheduler,
426  *           NULL if invoked for another reason
427  */
428 static void
429 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
430 {
431   struct ConsensusSession *session;
432   unsigned int i;
433   int res;
434
435   /* don't kick off next round if we're shutting down */
436   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
437     return;
438
439   session = cls;
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
441
442   if (tc != NULL)
443     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
444
445   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
446   {
447     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
448     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
449   }
450
451   for (i = 0; i < session->num_peers; i++)
452   {
453     if (NULL != session->info[i].set_op)
454     {
455       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
456                   session->local_peer_idx, i);
457       GNUNET_SET_operation_cancel (session->info[i].set_op);
458       session->info[i].set_op = NULL;
459     }
460     /* we're in the new round, nothing finished yet */
461     session->info[i].set_op_finished = GNUNET_NO;
462   }
463
464   switch (session->current_round)
465   {
466     case CONSENSUS_ROUND_BEGIN:
467       session->current_round = CONSENSUS_ROUND_EXCHANGE;
468       session->exp_repetition = 0;
469       subround_over (session, NULL);
470       break;
471     case CONSENSUS_ROUND_EXCHANGE:
472       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
473                   session->local_peer_idx);
474       session->current_round = CONSENSUS_ROUND_FINISH;
475       res = GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
476       if (GNUNET_SYSERR == res)
477       {
478         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: set invalid\n");
479       }
480       else if (GNUNET_NO == res)
481       {
482         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: iterator already active\n");
483       }
484       break;
485     default:
486       GNUNET_assert (0);
487   }
488 }
489
490
491 /**
492  * Create a new permutation for the session's peers in session->shuffle.
493  * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
494  * both the global session id and the current round index.
495  *
496  * @param session the session to create the new permutation for
497  */
498 static void
499 shuffle (struct ConsensusSession *session)
500 {
501   uint32_t i;
502   uint32_t randomness[session->num_peers-1];
503
504   if (NULL == session->shuffle)
505     session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
506   if (NULL == session->shuffle_inv)
507     session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
508
509   GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
510                      &session->exp_repetition, sizeof (uint32_t),
511                      &session->global_id, sizeof (struct GNUNET_HashCode),
512                      NULL);
513
514   for (i = 0; i < session->num_peers; i++)
515     session->shuffle[i] = i;
516
517   for (i = session->num_peers - 1; i > 0; i--)
518   {
519     uint32_t x;
520     uint32_t tmp;
521     x = randomness[i-1] % session->num_peers;
522     tmp = session->shuffle[x];
523     session->shuffle[x] = session->shuffle[i];
524     session->shuffle[i] = tmp;
525   }
526
527   /* create the inverse */
528   for (i = 0; i < session->num_peers; i++)
529     session->shuffle_inv[session->shuffle[i]] = i;
530 }
531
532
533 /**
534  * Find and set the partner_incoming and partner_outgoing of our peer,
535  * one of them may not exist (and thus set to NULL) if the number of peers
536  * in the session is not a power of two.
537  *
538  * @param session the consensus session
539  */
540 static void
541 find_partners (struct ConsensusSession *session)
542 {
543   unsigned int arc;
544   unsigned int num_ghosts;
545   unsigned int largest_arc;
546   int partner_idx;
547
548   /* shuffled local index */
549   int my_idx = session->shuffle[session->local_peer_idx];
550
551   /* distance to neighboring peer in current subround */
552   arc = 1 << session->exp_subround;
553   largest_arc = 1;
554   while (largest_arc < session->num_peers)
555     largest_arc <<= 1;
556   num_ghosts = largest_arc - session->num_peers;
557   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
558   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
559   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
560
561   if (0 == (my_idx & arc))
562   {
563     /* we are outgoing */
564     partner_idx = (my_idx + arc) % session->num_peers;
565     session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
566     GNUNET_assert (GNUNET_NO == session->partner_outgoing->set_op_finished);
567     /* are we a 'ghost' of a peer that would exist if
568      * the number of peers was a power of two, and thus have to partner
569      * with an additional peer?
570      */
571     if (my_idx < num_ghosts)
572     {
573       int ghost_partner_idx;
574       // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
575       ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
576       /* platform dependent; modulo sometimes returns negative values */
577       if (ghost_partner_idx < 0)
578         ghost_partner_idx += session->num_peers;
579       /* we only need to have a ghost partner if the partner is outgoing */
580       if (0 == (ghost_partner_idx & arc))
581       {
582         // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
583         session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
584         GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
585         return;
586       }
587     }
588     session->partner_incoming = NULL;
589     return;
590   }
591   /* we only have an incoming connection */
592   partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
593   if (partner_idx < 0)
594     partner_idx += session->num_peers;
595   session->partner_outgoing = NULL;
596   session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
597   GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
598 }
599
600
601 /**
602  * Callback for set operation results. Called for each element
603  * in the result set.
604  *
605  * @param cls closure
606  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
607  * @param status see enum GNUNET_SET_Status
608  */
609 static void
610 set_result_cb (void *cls,
611                const struct GNUNET_SET_Element *element,
612                enum GNUNET_SET_Status status)
613 {
614   struct ConsensusPeerInformation *cpi = cls;
615   unsigned int remote_idx = cpi - cpi->session->info;
616   unsigned int local_idx = cpi->session->local_peer_idx;
617
618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u with status %u\n",
619               local_idx, remote_idx, (unsigned int) status);
620
621   GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
622                  (cpi == cpi->session->partner_incoming));
623
624   switch (status)
625   {
626     case GNUNET_SET_STATUS_OK:
627       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
628                   local_idx, remote_idx);
629       break;
630     case GNUNET_SET_STATUS_FAILURE:
631       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
632                   local_idx, remote_idx);
633       cpi->set_op = NULL;
634       return;
635     case GNUNET_SET_STATUS_HALF_DONE:
636     case GNUNET_SET_STATUS_DONE:
637       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
638                   local_idx, remote_idx);
639       cpi->set_op_finished = GNUNET_YES;
640       cpi->set_op = NULL;
641       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
642       {
643         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
644                     local_idx);
645         subround_over (cpi->session, NULL);
646       }
647       else
648       {
649         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
650                     local_idx);
651       }
652       return;
653     default:
654       GNUNET_break (0);
655       return;
656   }
657
658   switch (cpi->session->current_round)
659   {
660     case CONSENSUS_ROUND_COMPLETION:
661     case CONSENSUS_ROUND_EXCHANGE:
662       GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
663       break;
664     default:
665       GNUNET_break (0);
666       return;
667   }
668 }
669
670
671 /**
672  * Compare the round the session is in with the round of the given context message.
673  *
674  * @param session a consensus session
675  * @param ri a round context message
676  * @return 0 if it's the same round, -1 if the session is in an earlier round,
677  *         1 if the session is in a later round
678  */
679 static int
680 rounds_compare (struct ConsensusSession *session,
681                 struct RoundInfo* ri)
682 {
683   if (session->current_round < ri->round)
684     return -1;
685   if (session->current_round > ri->round)
686     return 1;
687   if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
688   {
689     if (session->exp_repetition < ri->exp_repetition)
690       return -1;
691     if (session->exp_repetition > ri->exp_repetition)
692       return 1;
693     if (session->exp_subround < ri->exp_subround)
694       return -1;
695     if (session->exp_subround > ri->exp_subround)
696       return 1;
697     return 0;
698   }
699   /* other rounds have no subrounds / repetitions to compare */
700   return 0;
701 }
702
703
704 /**
705  * Do the next subround in the exp-scheme.
706  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
707  *
708  * @param cls the session
709  * @param tc task context, for when this task is invoked by the scheduler,
710  *           NULL if invoked for another reason
711  */
712 static void
713 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
714 {
715   struct ConsensusSession *session;
716   struct GNUNET_TIME_Relative subround_timeout;
717   int i;
718
719   /* don't kick off next subround if we're shutting down */
720   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
721     return;
722
723   session = cls;
724
725   GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round);
726
727   if (tc != NULL)
728   {
729     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
730     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "P%u: consensus subround timed out\n",
731                 session->local_peer_idx);
732   }
733
734   /* cancel timeout */
735   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
736   {
737     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
738     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
739   }
740
741   for (i = 0; i < session->num_peers; i++)
742   {
743     if (NULL != session->info[i].set_op)
744     {
745       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
746                   session->local_peer_idx, i);
747       GNUNET_SET_operation_cancel (session->info[i].set_op);
748       session->info[i].set_op = NULL;
749     }
750     /* we're in the new round, nothing finished yet */
751     session->info[i].set_op_finished = GNUNET_NO;
752   }
753
754   if (session->exp_repetition >= NUM_EXP_REPETITIONS)
755   {
756     round_over (session, NULL);
757     return;
758   }
759
760   if (session->exp_repetition == 0)
761   {
762     /* initialize everything for the log-rounds */
763     session->exp_repetition = 1;
764     session->exp_subround = 0;
765     if (NULL == session->shuffle)
766       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
767     if (NULL == session->shuffle_inv)
768       session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
769     for (i = 0; i < session->num_peers; i++)
770       session->shuffle[i] = session->shuffle_inv[i] = i;
771   }
772   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
773   {
774     /* subrounds done, start new log-round */
775     session->exp_repetition++;
776     session->exp_subround = 0;
777     shuffle (session);
778   }
779   else
780   {
781     session->exp_subround++;
782   }
783
784   subround_timeout =
785       GNUNET_TIME_relative_divide (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline),
786                                    2 * NUM_EXP_REPETITIONS * ((int) ceil (log2 (session->num_peers))));
787
788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "subround timeout: %u ms\n", subround_timeout.rel_value_us / 1000);
789
790   session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (subround_timeout, subround_over, session);
791
792   /* determine the incoming and outgoing partner */
793   find_partners (session);
794
795   GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
796   GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
797
798   /* initiate set operation with the outgoing partner */
799   if (NULL != session->partner_outgoing)
800   {
801     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
802     msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
803     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
804     msg->header.size = htons (sizeof *msg);
805     msg->round = htonl (session->current_round);
806     msg->exp_repetition = htonl (session->exp_repetition);
807     msg->exp_subround = htonl (session->exp_subround);
808
809     if (NULL != session->partner_outgoing->set_op)
810     {
811       GNUNET_break (0);
812       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
813     }
814     session->partner_outgoing->set_op =
815         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
816                             &session->global_id,
817                             (struct GNUNET_MessageHeader *) msg,
818                             GNUNET_SET_RESULT_ADDED,
819                             set_result_cb, session->partner_outgoing);
820     GNUNET_free (msg);
821     if (GNUNET_OK != GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set))
822     {
823       GNUNET_break (0);
824       session->partner_outgoing->set_op = NULL;
825       session->partner_outgoing->set_op_finished = GNUNET_YES;
826     }
827   }
828
829   /* commit to the delayed set operation */
830   if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
831   {
832     int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
833
834     if (NULL != session->partner_incoming->set_op)
835     {
836       GNUNET_break (0);
837       GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
838       session->partner_incoming->set_op = NULL;
839     }
840     if (cmp == 0)
841     {
842       if (GNUNET_OK != GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set))
843       {
844         GNUNET_break (0);
845       }
846       session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
847       session->partner_incoming->delayed_set_op = NULL;
848       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
849                   session->local_peer_idx, (int) (session->partner_incoming - session->info));
850     }
851     else
852     {
853       /* this should not happen -- a round has been skipped! */
854       GNUNET_break_op (0);
855     }
856   }
857
858 #ifdef GNUNET_EXTRA_LOGGING
859   {
860     int in;
861     int out;
862     if (session->partner_outgoing == NULL)
863       out = -1;
864     else
865       out = (int) (session->partner_outgoing - session->info);
866     if (session->partner_incoming == NULL)
867       in = -1;
868     else
869       in = (int) (session->partner_incoming - session->info);
870     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
871                 session->exp_repetition, session->exp_subround, in, out);
872   }
873 #endif /* GNUNET_EXTRA_LOGGING */
874
875 }
876
877
878 /**
879  * Search peer in the list of peers in session.
880  *
881  * @param peer peer to find
882  * @param session session with peer
883  * @return index of peer, -1 if peer is not in session
884  */
885 static int
886 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
887 {
888   int i;
889   for (i = 0; i < session->num_peers; i++)
890     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
891       return i;
892   return -1;
893 }
894
895
896 /**
897  * Compute a global, (hopefully) unique consensus session id,
898  * from the local id of the consensus session, and the identities of all participants.
899  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
900  * exactly the same peers, the global id will be different.
901  *
902  * @param session session to generate the global id for
903  * @param session_id local id of the consensus session
904  */
905 static void
906 compute_global_id (struct ConsensusSession *session,
907                    const struct GNUNET_HashCode *session_id)
908 {
909   int i;
910   struct GNUNET_HashCode tmp;
911   struct GNUNET_HashCode phash;
912
913   /* FIXME: use kdf? */
914
915   session->global_id = *session_id;
916   for (i = 0; i < session->num_peers; ++i)
917   {
918     GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
919     GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
920     session->global_id = tmp;
921     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
922     session->global_id = tmp;
923   }
924 }
925
926
927 /**
928  * Compare two peer identities.
929  *
930  * @param h1 some peer identity
931  * @param h2 some peer identity
932  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
933  */
934 static int
935 peer_id_cmp (const void *h1, const void *h2)
936 {
937   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
938 }
939
940
941 /**
942  * Create the sorted list of peers for the session,
943  * add the local peer if not in the join message.
944  */
945 static void
946 initialize_session_peer_list (struct ConsensusSession *session,
947                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
948 {
949   unsigned int local_peer_in_list;
950   uint32_t listed_peers;
951   const struct GNUNET_PeerIdentity *msg_peers;
952   struct GNUNET_PeerIdentity *peers;
953   unsigned int i;
954
955   GNUNET_assert (NULL != join_msg);
956
957   /* peers in the join message, may or may not include the local peer */
958   listed_peers = ntohl (join_msg->num_peers);
959
960   session->num_peers = listed_peers;
961
962   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
963
964   local_peer_in_list = GNUNET_NO;
965   for (i = 0; i < listed_peers; i++)
966   {
967     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
968     {
969       local_peer_in_list = GNUNET_YES;
970       break;
971     }
972   }
973
974   if (GNUNET_NO == local_peer_in_list)
975     session->num_peers++;
976
977   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
978
979   if (GNUNET_NO == local_peer_in_list)
980     peers[session->num_peers - 1] = my_peer;
981
982   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
983   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
984
985   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
986
987   for (i = 0; i < session->num_peers; ++i)
988   {
989     /* initialize back-references, so consensus peer information can
990      * be used as closure */
991     session->info[i].session = session;
992     session->info[i].peer_id = peers[i];
993   }
994
995   GNUNET_free (peers);
996 }
997
998
999 /**
1000  * Called when another peer wants to do a set operation with the
1001  * local peer.
1002  *
1003  * @param cls closure
1004  * @param other_peer the other peer
1005  * @param context_msg message with application specific information from
1006  *        the other peer
1007  * @param request request from the other peer, use GNUNET_SET_accept
1008  *        to accept it, otherwise the request will be refused
1009  *        Note that we don't use a return value here, as it is also
1010  *        necessary to specify the set we want to do the operation with,
1011  *        whith sometimes can be derived from the context message.
1012  *        Also necessary to specify the timeout.
1013  */
1014 static void
1015 set_listen_cb (void *cls,
1016                const struct GNUNET_PeerIdentity *other_peer,
1017                const struct GNUNET_MessageHeader *context_msg,
1018                struct GNUNET_SET_Request *request)
1019 {
1020   struct ConsensusSession *session = cls;
1021   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
1022   struct ConsensusPeerInformation *cpi;
1023   struct GNUNET_SET_OperationHandle *set_op;
1024   struct RoundInfo round_info;
1025   int index;
1026   int cmp;
1027
1028   if (NULL == context_msg)
1029   {
1030     GNUNET_break_op (0);
1031     return;
1032   }
1033
1034   index = get_peer_idx (other_peer, session);
1035
1036   if (index < 0)
1037   {
1038     GNUNET_break_op (0);
1039     return;
1040   }
1041
1042   round_info.round = ntohl (msg->round);
1043   round_info.exp_repetition = ntohl (msg->exp_repetition);
1044   round_info.exp_subround = ntohl (msg->exp_subround);
1045
1046   cpi = &session->info[index];
1047
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
1049
1050   switch (session->current_round)
1051   {
1052     case CONSENSUS_ROUND_BEGIN:
1053       /* we're in the begin round, so requests for the exchange round may
1054        * come in, they will be delayed for now! */
1055     case CONSENSUS_ROUND_EXCHANGE:
1056       cmp = rounds_compare (session, &round_info);
1057       if (cmp > 0)
1058       {
1059         /* the other peer is too late */
1060         LOG_PP (GNUNET_ERROR_TYPE_DEBUG, cpi, "too late for the current round\n");
1061         return;
1062       }
1063       /* kill old request, if any. this is legal,
1064        * as the other peer would not make a new request if it would want to
1065        * complete the old one! */
1066       if (NULL != cpi->set_op)
1067       {
1068         LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got new request from same peer, canceling old one\n");
1069         GNUNET_SET_operation_cancel (cpi->set_op);
1070         cpi->set_op = NULL;
1071       }
1072       set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
1073                                   set_result_cb, &session->info[index]);
1074       if (cmp == 0)
1075       {
1076         /* we're in exactly the right round for the incoming request */
1077         if (cpi != cpi->session->partner_incoming)
1078         {
1079           GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%u: got request from %u (with matching round), "
1080                       "but incoming partner is %d\n", cpi->session->local_peer_idx, cpi - cpi->session->info,
1081                       ((NULL == cpi->session->partner_incoming) ? -1 : (cpi->session->partner_incoming - cpi->session->info)));
1082           GNUNET_SET_operation_cancel (set_op);
1083           return;
1084         }
1085         cpi->set_op = set_op;
1086         if (GNUNET_OK != GNUNET_SET_commit (set_op, session->element_set))
1087         {
1088           GNUNET_break (0);
1089         }
1090         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
1091       }
1092       else
1093       {
1094         /* we still have wait until we have finished the current round,
1095          * as the other peer's round is larger */
1096         cpi->delayed_set_op = set_op;
1097         cpi->delayed_round_info = round_info;
1098         /* The current setop is finished, as we canceled the current setop above. */
1099         cpi->set_op_finished = GNUNET_YES;
1100         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
1101       }
1102       break;
1103     default:
1104       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
1105                   session->local_peer_idx, session->current_round, index);
1106       GNUNET_break_op (0);
1107       return;
1108   }
1109 }
1110
1111
1112 /**
1113  * Initialize the session, continue receiving messages from the owning client
1114  *
1115  * @param session the session to initialize
1116  * @param join_msg the join message from the client
1117  */
1118 static void
1119 initialize_session (struct ConsensusSession *session,
1120                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1121 {
1122   struct ConsensusSession *other_session;
1123
1124   initialize_session_peer_list (session, join_msg);
1125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1126   compute_global_id (session, &join_msg->session_id);
1127
1128   /* check if some local client already owns the session.
1129    * it is only legal to have a session with an existing global id
1130    * if all other sessions with this global id are finished.*/
1131   other_session = sessions_head;
1132   while (NULL != other_session)
1133   {
1134     if ((other_session != session) &&
1135         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1136     {
1137       if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1138       {
1139         GNUNET_break (0);
1140         destroy_session (session);
1141         return;
1142       }
1143       break;
1144     }
1145     other_session = other_session->next;
1146   }
1147
1148   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
1149   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
1150
1151   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
1152               (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
1153
1154   session->local_peer_idx = get_peer_idx (&my_peer, session);
1155   GNUNET_assert (-1 != session->local_peer_idx);
1156   session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1157   GNUNET_assert (NULL != session->element_set);
1158   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1159                                              &session->global_id,
1160                                              set_listen_cb, session);
1161   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1163 }
1164
1165
1166 static struct ConsensusSession *
1167 get_session_by_client (struct GNUNET_SERVER_Client *client)
1168 {
1169   struct ConsensusSession *session;
1170
1171   session = sessions_head;
1172   while (NULL != session)
1173   {
1174     if (session->client == client)
1175       return session;
1176     session = session->next;
1177   }
1178   return NULL;
1179 }
1180
1181
1182 /**
1183  * Called when a client wants to join a consensus session.
1184  *
1185  * @param cls unused
1186  * @param client client that sent the message
1187  * @param m message sent by the client
1188  */
1189 static void
1190 client_join (void *cls,
1191              struct GNUNET_SERVER_Client *client,
1192              const struct GNUNET_MessageHeader *m)
1193 {
1194   struct ConsensusSession *session;
1195
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1197
1198   session = get_session_by_client (client);
1199   if (NULL != session)
1200   {
1201     GNUNET_break (0);
1202     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1203     return;
1204   }
1205   session = GNUNET_new (struct ConsensusSession);
1206   session->client = client;
1207   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1208   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1209   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1210   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1211
1212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1213 }
1214
1215
1216 /**
1217  * Called when a client performs an insert operation.
1218  *
1219  * @param cls (unused)
1220  * @param client client handle
1221  * @param m message sent by the client
1222  */
1223 void
1224 client_insert (void *cls,
1225                struct GNUNET_SERVER_Client *client,
1226                const struct GNUNET_MessageHeader *m)
1227 {
1228   struct ConsensusSession *session;
1229   struct GNUNET_CONSENSUS_ElementMessage *msg;
1230   struct GNUNET_SET_Element *element;
1231   ssize_t element_size;
1232
1233   session = get_session_by_client (client);
1234
1235   if (NULL == session)
1236   {
1237     GNUNET_break (0);
1238     GNUNET_SERVER_client_disconnect (client);
1239     return;
1240   }
1241
1242   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1243   {
1244     GNUNET_break (0);
1245     GNUNET_SERVER_client_disconnect (client);
1246     return;
1247   }
1248
1249   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1250   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1251   if (element_size < 0)
1252   {
1253     GNUNET_break (0);
1254     return;
1255   }
1256
1257   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1258   element->element_type = msg->element_type;
1259   element->size = element_size;
1260   memcpy (&element[1], &msg[1], element_size);
1261   element->data = &element[1];
1262   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1263   GNUNET_free (element);
1264   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1265
1266   // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1267 }
1268
1269
1270 /**
1271  * Called when a client performs the conclude operation.
1272  *
1273  * @param cls (unused)
1274  * @param client client handle
1275  * @param message message sent by the client
1276  */
1277 static void
1278 client_conclude (void *cls,
1279                  struct GNUNET_SERVER_Client *client,
1280                  const struct GNUNET_MessageHeader *message)
1281 {
1282   struct ConsensusSession *session;
1283
1284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1285   session = get_session_by_client (client);
1286   if (NULL == session)
1287   {
1288     /* client not found */
1289     GNUNET_break (0);
1290     GNUNET_SERVER_client_disconnect (client);
1291     return;
1292   }
1293   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1294   {
1295     /* client requested conclude twice */
1296     GNUNET_break (0);
1297     return;
1298   }
1299   if (session->num_peers <= 1)
1300   {
1301     session->current_round = CONSENSUS_ROUND_FINISH;
1302     GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1303   }
1304   else
1305   {
1306     /* the 'begin' round is over, start with the next, actual round */
1307     round_over (session, NULL);
1308   }
1309
1310   GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1311   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1312 }
1313
1314
1315 /**
1316  * Called to clean up, after a shutdown has been requested.
1317  *
1318  * @param cls closure
1319  * @param tc context information (why was this task triggered now)
1320  */
1321 static void
1322 shutdown_task (void *cls,
1323                const struct GNUNET_SCHEDULER_TaskContext *tc)
1324 {
1325   while (NULL != sessions_head)
1326     destroy_session (sessions_head);
1327
1328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1329 }
1330
1331
1332 /**
1333  * Clean up after a client after it is
1334  * disconnected (either by us or by itself)
1335  *
1336  * @param cls closure, unused
1337  * @param client the client to clean up after
1338  */
1339 void
1340 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1341 {
1342   struct ConsensusSession *session;
1343
1344   session = get_session_by_client (client);
1345   if (NULL == session)
1346     return;
1347   if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1348       (CONSENSUS_ROUND_FINISH == session->current_round))
1349   {
1350     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, destroying session\n");
1351     destroy_session (session);
1352   }
1353   else
1354     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1355 }
1356
1357
1358 /**
1359  * Start processing consensus requests.
1360  *
1361  * @param cls closure
1362  * @param server the initialized server
1363  * @param c configuration to use
1364  */
1365 static void
1366 run (void *cls, struct GNUNET_SERVER_Handle *server,
1367      const struct GNUNET_CONFIGURATION_Handle *c)
1368 {
1369   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1370     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1371         sizeof (struct GNUNET_MessageHeader)},
1372     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1373     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1374     {NULL, NULL, 0, 0}
1375   };
1376
1377   cfg = c;
1378   srv = server;
1379   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
1380   {
1381     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1382     GNUNET_break (0);
1383     GNUNET_SCHEDULER_shutdown ();
1384     return;
1385   }
1386   GNUNET_SERVER_add_handlers (server, server_handlers);
1387   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1388   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1389   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1390 }
1391
1392
1393 /**
1394  * The main function for the consensus service.
1395  *
1396  * @param argc number of arguments from the command line
1397  * @param argv command line arguments
1398  * @return 0 ok, 1 on error
1399  */
1400 int
1401 main (int argc, char *const *argv)
1402 {
1403   int ret;
1404   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1405   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1406   return (GNUNET_OK == ret) ? 0 : 1;
1407 }
1408