- delayed requests correctly when in 'begin' round
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37
38 /**
39  * Log macro that prefixes the local peer and the peer we are in contact with.
40  */
41 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
42    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
43
44
45 /**
46  * Number of exponential rounds, used in the exp and completion round.
47  */
48 #define NUM_EXP_ROUNDS 4
49
50 /* forward declarations */
51
52 /* mutual recursion with struct ConsensusSession */
53 struct ConsensusPeerInformation;
54
55 /* mutual recursion with round_over */
56 static void
57 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
58
59
60 /**
61  * Describes the current round a consensus session is in.
62  */
63 enum ConsensusRound
64 {
65   /**
66    * Not started the protocol yet.
67    */
68   CONSENSUS_ROUND_BEGIN=0,
69   /**
70    * Distribution of elements with the exponential scheme.
71    */
72   CONSENSUS_ROUND_EXCHANGE,
73   /**
74    * Exchange which elements each peer has, but don't
75    * transmit the element's data, only their SHA-512 hashes.
76    * This round uses the all-to-all scheme.
77    */
78   CONSENSUS_ROUND_INVENTORY,
79   /**
80    * Collect and distribute missing values with the exponential scheme.
81    */
82   CONSENSUS_ROUND_COMPLETION,
83   /**
84    * Consensus concluded. After timeout and finished communication with client,
85    * consensus session will be destroyed.
86    */
87   CONSENSUS_ROUND_FINISH
88 };
89
90
91 /**
92  * Complete information about the current round and all
93  * subrounds.
94  */
95 struct RoundInfo
96 {
97   /**
98    * The current main round.
99    */
100   enum ConsensusRound round;
101   /**
102    * The current exp round, valid if
103    * the main round is an exp round.
104    */
105   uint32_t exp_round;
106   /**
107    * The current exp subround, valid if
108    * the main round is an exp round.
109    */
110   uint32_t exp_subround;
111 };
112
113
114 /**
115  * A consensus session consists of one local client and the remote authorities.
116  */
117 struct ConsensusSession
118 {
119   /**
120    * Consensus sessions are kept in a DLL.
121    */
122   struct ConsensusSession *next;
123
124   /**
125    * Consensus sessions are kept in a DLL.
126    */
127   struct ConsensusSession *prev;
128
129   /**
130   * Global consensus identification, computed
131   * from the session id and participating authorities.
132   */
133   struct GNUNET_HashCode global_id;
134
135   /**
136    * Client that inhabits the session
137    */
138   struct GNUNET_SERVER_Client *client;
139
140   /**
141    * Queued messages to the client.
142    */
143   struct GNUNET_MQ_Handle *client_mq;
144
145   /**
146    * Timeout for all rounds together, single rounds will schedule a timeout task
147    * with a fraction of the conclude timeout.
148    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
149    */
150   struct GNUNET_TIME_Relative conclude_timeout;
151   
152   /**
153    * Timeout task identifier for the current round.
154    */
155   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
156
157   /**
158    * Number of other peers in the consensus.
159    */
160   unsigned int num_peers;
161
162   /**
163    * Information about the other peers,
164    * their state, etc.
165    */
166   struct ConsensusPeerInformation *info;
167
168   /**
169    * Index of the local peer in the peers array
170    */
171   unsigned int local_peer_idx;
172
173   /**
174    * Current round
175    */
176   enum ConsensusRound current_round;
177
178   /**
179    * Permutation of peers for the current round,
180    * maps logical index (for current round) to physical index (location in info array)
181    */
182   uint32_t *shuffle;
183
184   /**
185    * Current round of the exponential scheme.
186    */
187   uint32_t exp_round;
188
189   /**
190    * Current sub-round of the exponential scheme.
191    */
192   uint32_t exp_subround;
193
194   /**
195    * The partner for the current exp-round
196    */
197   struct ConsensusPeerInformation *partner_outgoing;
198
199   /**
200    * The partner for the current exp-round
201    */
202   struct ConsensusPeerInformation *partner_incoming;
203
204   /**
205    * The consensus set of this session.
206    */
207   struct GNUNET_SET_Handle *element_set;
208
209   /**
210    * Listener for requests from other peers.
211    * Uses the session's global id as app id.
212    */
213   struct GNUNET_SET_ListenHandle *set_listener;
214 };
215
216
217 /**
218  * Information about a peer that is in a consensus session.
219  */
220 struct ConsensusPeerInformation
221 {
222   /**
223    * Peer identitty of the peer in the consensus session
224    */
225   struct GNUNET_PeerIdentity peer_id;
226
227   /**
228    * Back-reference to the consensus session,
229    * to that ConsensusPeerInformation can be used as a closure
230    */
231   struct ConsensusSession *session;
232
233   /**
234    * We have finishes the exp-subround with the peer.
235    */
236   int exp_subround_finished;
237
238   /**
239    * Set operation we are currently executing with this peer.
240    */
241   struct GNUNET_SET_OperationHandle *set_op;
242
243   /**
244    * Set operation we are planning on executing with this peer.
245    */
246   struct GNUNET_SET_OperationHandle *delayed_set_op;
247
248   /**
249    * Info about the round of the delayed set operation.
250    */
251   struct RoundInfo delayed_round_info;
252 };
253
254
255 /**
256  * Linked list of sessions this peer participates in.
257  */
258 static struct ConsensusSession *sessions_head;
259
260 /**
261  * Linked list of sessions this peer participates in.
262  */
263 static struct ConsensusSession *sessions_tail;
264
265 /**
266  * Configuration of the consensus service.
267  */
268 static const struct GNUNET_CONFIGURATION_Handle *cfg;
269
270 /**
271  * Handle to the server for this service.
272  */
273 static struct GNUNET_SERVER_Handle *srv;
274
275 /**
276  * Peer that runs this service.
277  */
278 static struct GNUNET_PeerIdentity my_peer;
279
280
281 static int
282 have_exp_subround_finished (const struct ConsensusSession *session)
283 {
284   int not_finished;
285   not_finished = 0;
286   if ( (NULL != session->partner_outgoing) && 
287        (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
288     not_finished++;
289   if ( (NULL != session->partner_incoming) &&
290        (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
291     not_finished++;
292   if (0 == not_finished)
293     return GNUNET_YES;
294   return GNUNET_NO;
295 }
296
297
298 /**
299  * Destroy a session, free all resources associated with it.
300  * 
301  * @param session the session to destroy
302  */
303 static void
304 destroy_session (struct ConsensusSession *session)
305 {
306   int i;
307
308   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
309   if (NULL != session->element_set)
310   {
311     GNUNET_SET_destroy (session->element_set);
312     session->element_set = NULL;
313   }
314   if (NULL != session->set_listener)
315   {
316     GNUNET_SET_listen_cancel (session->set_listener);
317     session->set_listener = NULL;
318   }
319   if (NULL != session->client_mq)
320   {
321     GNUNET_MQ_destroy (session->client_mq);
322     session->client_mq = NULL;
323   }
324   if (NULL != session->client)
325   {
326     GNUNET_SERVER_client_disconnect (session->client);
327     session->client = NULL;
328   }
329   if (NULL != session->shuffle)
330   {
331     GNUNET_free (session->shuffle);
332     session->shuffle = NULL;
333   }
334   if (NULL != session->info)
335   {
336     for (i = 0; i < session->num_peers; i++)
337     {
338       struct ConsensusPeerInformation *cpi;
339       cpi = &session->info[i];
340       if (NULL != cpi->set_op)
341       {
342         GNUNET_SET_operation_cancel (cpi->set_op);
343         cpi->set_op = NULL;
344       }
345     }
346     GNUNET_free (session->info);
347     session->info = NULL;
348   }
349   GNUNET_free (session);
350 }
351
352
353 /**
354  * Iterator for set elements.
355  *
356  * @param cls closure
357  * @param element the current element, NULL if all elements have been
358  *        iterated over
359  * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
360  */
361 static int
362 send_to_client_iter (void *cls,
363                      const struct GNUNET_SET_Element *element)
364 {
365   struct ConsensusSession *session = cls;
366   struct GNUNET_MQ_Envelope *ev;
367
368   if (NULL != element)
369   {
370     struct GNUNET_CONSENSUS_ElementMessage *m;
371
372     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
373                 session->local_peer_idx);
374
375     ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
376     m->element_type = htons (element->type);
377     memcpy (&m[1], element->data, element->size);
378     GNUNET_MQ_send (session->client_mq, ev);
379   }
380   else
381   {
382     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
383                 session->local_peer_idx);
384     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
385     GNUNET_MQ_send (session->client_mq, ev);
386   }
387   return GNUNET_YES;
388 }
389
390
391 /**
392  * Start the next round.
393  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
394  *
395  * @param cls the session
396  * @param tc task context, for when this task is invoked by the scheduler,
397  *           NULL if invoked for another reason
398  */
399 static void 
400 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
401 {
402   struct ConsensusSession *session;
403
404   /* don't kick off next round if we're shutting down */
405   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
406     return;
407
408   session = cls;
409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
410
411   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
412   {
413     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
414     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
415   }
416
417   switch (session->current_round)
418   {
419     case CONSENSUS_ROUND_BEGIN:
420       session->current_round = CONSENSUS_ROUND_EXCHANGE;
421       session->exp_round = 0;
422       subround_over (session, NULL);
423       break;
424     case CONSENSUS_ROUND_EXCHANGE:
425       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
426                   session->local_peer_idx);
427       session->current_round = CONSENSUS_ROUND_FINISH;
428       GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
429       break;
430     default:
431       GNUNET_assert (0);
432   }
433 }
434
435
436 /**
437  * Create a new permutation for the session's peers in session->shuffle.
438  * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
439  * both the global session id and the current round index.
440  *
441  * @param session the session to create the new permutation for
442  */
443 static void
444 shuffle (struct ConsensusSession *session)
445 {
446   uint32_t i;
447   uint32_t randomness[session->num_peers-1];
448
449   if (NULL == session->shuffle)
450     session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
451
452   GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), 
453                      &session->exp_round, sizeof (uint32_t),
454                      &session->global_id, sizeof (struct GNUNET_HashCode),
455                      NULL);
456
457   for (i = 0; i < session->num_peers; i++)
458     session->shuffle[i] = i;
459
460   for (i = session->num_peers - 1; i > 0; i--)
461   {
462     uint32_t x;
463     uint32_t tmp;
464     x = randomness[i-1] % session->num_peers;
465     tmp = session->shuffle[x];
466     session->shuffle[x] = session->shuffle[i];
467     session->shuffle[i] = tmp;
468   }
469 }
470
471
472 /**
473  * Find and set the partner_incoming and partner_outgoing of our peer,
474  * one of them may not exist (and thus set to NULL) if the number of peers
475  * in the session is not a power of two.
476  *
477  * @param session the consensus session
478  */
479 static void
480 find_partners (struct ConsensusSession *session)
481 {
482   unsigned int arc;
483   unsigned int num_ghosts;
484   unsigned int largest_arc;
485   int partner_idx;
486
487   /* shuffled local index */
488   int my_idx = session->shuffle[session->local_peer_idx];
489
490   /* distance to neighboring peer in current subround */
491   arc = 1 << session->exp_subround;
492   largest_arc = 1;
493   while (largest_arc < session->num_peers)
494     largest_arc <<= 1;
495   num_ghosts = largest_arc - session->num_peers;
496
497   if (0 == (my_idx & arc))
498   {
499     /* we are outgoing */
500     partner_idx = (my_idx + arc) % session->num_peers;
501     session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
502     session->partner_outgoing->exp_subround_finished = GNUNET_NO;
503     /* are we a 'ghost' of a peer that would exist if
504      * the number of peers was a power of two, and thus have to partner
505      * with an additional peer?
506      */
507     if (my_idx < num_ghosts)
508     {
509       int ghost_partner_idx;
510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
511       ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is before %d\n", ghost_partner_idx);
513       /* platform dependent; modulo sometimes returns negative values */
514       if (ghost_partner_idx < 0)
515         ghost_partner_idx += session->num_peers;
516       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is after %d\n", ghost_partner_idx);
517       session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]];
518       session->partner_incoming->exp_subround_finished = GNUNET_NO;
519       return;
520     }
521     session->partner_incoming = NULL;
522     return;
523   }
524   partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
525   if (partner_idx < 0)
526     partner_idx += session->num_peers;
527   session->partner_outgoing = NULL;
528   session->partner_incoming = &session->info[session->shuffle[partner_idx]];
529   session->partner_incoming->exp_subround_finished = GNUNET_NO;
530 }
531
532
533 /**
534  * Callback for set operation results. Called for each element
535  * in the result set.
536  *
537  * @param cls closure
538  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
539  * @param status see enum GNUNET_SET_Status
540  */
541 static void 
542 set_result_cb (void *cls,
543                const struct GNUNET_SET_Element *element,
544                enum GNUNET_SET_Status status)
545 {
546   struct ConsensusPeerInformation *cpi = cls;
547   unsigned int remote_idx = cpi - cpi->session->info;
548   unsigned int local_idx = cpi->session->local_peer_idx;
549
550   GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
551                  (cpi == cpi->session->partner_incoming));
552
553   switch (status)
554   {
555     case GNUNET_SET_STATUS_OK:
556       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
557                   local_idx, remote_idx);
558       break;
559     case GNUNET_SET_STATUS_FAILURE:
560       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
561                   local_idx, remote_idx);
562       cpi->set_op = NULL;
563       return;
564     case GNUNET_SET_STATUS_HALF_DONE:
565     case GNUNET_SET_STATUS_DONE:
566       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
567                   local_idx, remote_idx);
568       cpi->exp_subround_finished = GNUNET_YES;
569       cpi->set_op = NULL;
570       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
571       {
572         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
573                     local_idx);
574         subround_over (cpi->session, NULL);
575       }
576       else
577       {
578         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
579                     local_idx);
580       }
581       return;
582     default:
583       GNUNET_break (0);
584       return;
585   }
586
587   switch (cpi->session->current_round)
588   {
589     case CONSENSUS_ROUND_EXCHANGE:
590       GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
591       break;
592     default:
593       GNUNET_break (0);
594       return;
595   }
596 }
597
598
599 /**
600  * Compare the round the session is in with the round of the given context message.
601  *
602  * @param session a consensus session
603  * @param round a round context message
604  * @return 0 if it's the same round, -1 if the session is in an earlier round,
605  *         1 if the session is in a later round
606  */
607 static int
608 rounds_compare (struct ConsensusSession *session,
609                 struct RoundInfo* ri)
610 {
611   if (session->current_round < ri->round)
612     return -1;
613   if (session->current_round > ri->round)
614     return 1;
615   if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
616   {
617     if (session->exp_round < ri->exp_round)
618       return -1;
619     if (session->exp_round > ri->exp_round)
620       return 1;
621     if (session->exp_subround < ri->exp_subround)
622       return -1;
623     if (session->exp_subround < ri->exp_subround)
624       return 1;
625     return 0;
626   }
627   /* comparing rounds when we are not in a exp round */
628   GNUNET_assert (0);
629 }
630
631
632 /**
633  * Do the next subround in the exp-scheme.
634  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
635  *
636  * @param cls the session
637  * @param tc task context, for when this task is invoked by the scheduler,
638  *           NULL if invoked for another reason
639  */
640 static void
641 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
642 {
643   struct ConsensusSession *session;
644   int i;
645
646   /* don't kick off next subround if we're shutting down */
647   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
648     return;
649   session = cls;
650   /* cancel timeout */
651   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
652   {
653     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
654     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
655   }
656   
657   if (session->exp_round >= NUM_EXP_ROUNDS)
658   {
659     round_over (session, NULL);
660     return;
661   }
662
663   if (session->exp_round == 0)
664   {
665     /* initialize everything for the log-rounds */
666     session->exp_round = 1;
667     session->exp_subround = 0;
668     if (NULL == session->shuffle)
669       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
670     for (i = 0; i < session->num_peers; i++)
671       session->shuffle[i] = i;
672   }
673   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
674   {
675     /* subrounds done, start new log-round */
676     session->exp_round++;
677     session->exp_subround = 0;
678     //shuffle (session);
679   }
680   else 
681   {
682     session->exp_subround++;
683   }
684
685   /* determine the incoming and outgoing partner */
686   find_partners (session);
687
688   GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
689   GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
690
691   /* initiate set operation with the outgoing partner */
692   if (NULL != session->partner_outgoing)
693   {
694     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
695     msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
696     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
697     msg->header.size = htons (sizeof *msg);
698     msg->round = htonl (session->current_round);
699     msg->exp_round = htonl (session->exp_round);
700     msg->exp_subround = htonl (session->exp_subround);
701
702     if (NULL != session->partner_outgoing->set_op)
703     {
704       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
705     }
706     session->partner_outgoing->set_op =
707         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
708                             &session->global_id,
709                             (struct GNUNET_MessageHeader *) msg,
710                             0, /* FIXME: salt */
711                             GNUNET_SET_RESULT_ADDED,
712                             set_result_cb, session->partner_outgoing);
713     GNUNET_free (msg);
714     GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
715   }
716
717   /* commit to the delayed set operation */
718   if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
719   {
720     int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
721
722     if (NULL != session->partner_incoming->set_op)
723     {
724       GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
725       session->partner_incoming->set_op = NULL;
726     }
727     if (cmp == 0)
728     {
729       GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
730       session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
731       session->partner_incoming->delayed_set_op = NULL;
732       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
733                   session->local_peer_idx, (int) (session->partner_incoming - session->info));
734     }
735     else
736     {
737       /* this should not happen -- a round has been skipped! */
738       GNUNET_break_op (0);
739     }
740   }
741
742 #ifdef GNUNET_EXTRA_LOGGING
743   {
744     int in;
745     int out;
746     if (session->partner_outgoing == NULL)
747       out = -1;
748     else
749       out = (int) (session->partner_outgoing - session->info);
750     if (session->partner_incoming == NULL)
751       in = -1;
752     else
753       in = (int) (session->partner_incoming - session->info);
754     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
755                 session->exp_round, session->exp_subround, in, out);
756   }
757 #endif /* GNUNET_EXTRA_LOGGING */
758
759 }
760
761
762 /**
763  * Search peer in the list of peers in session.
764  *
765  * @param peer peer to find
766  * @param session session with peer
767  * @return index of peer, -1 if peer is not in session
768  */
769 static int
770 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
771 {
772   int i;
773   for (i = 0; i < session->num_peers; i++)
774     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
775       return i;
776   return -1;
777 }
778
779
780 /**
781  * Compute a global, (hopefully) unique consensus session id,
782  * from the local id of the consensus session, and the identities of all participants.
783  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
784  * exactly the same peers, the global id will be different.
785  *
786  * @param session session to generate the global id for
787  * @param session_id local id of the consensus session
788  */
789 static void
790 compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
791 {
792   int i;
793   struct GNUNET_HashCode tmp;
794
795   /* FIXME: use kdf? */
796
797   session->global_id = *session_id;
798   for (i = 0; i < session->num_peers; ++i)
799   {
800     GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
801     session->global_id = tmp;
802     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
803     session->global_id = tmp;
804   }
805 }
806
807
808 /**
809  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
810  * the correct signature to be used with e.g. qsort.
811  * We use this function instead.
812  *
813  * @param h1 some hash code
814  * @param h2 some hash code
815  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
816  */
817 static int
818 hash_cmp (const void *h1, const void *h2)
819 {
820   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
821 }
822
823
824 /**
825  * Create the sorted list of peers for the session,
826  * add the local peer if not in the join message.
827  */
828 static void
829 initialize_session_peer_list (struct ConsensusSession *session,
830                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
831 {
832   unsigned int local_peer_in_list;
833   uint32_t listed_peers;
834   const struct GNUNET_PeerIdentity *msg_peers;
835   struct GNUNET_PeerIdentity *peers;
836   unsigned int i;
837
838   GNUNET_assert (NULL != join_msg);
839
840   /* peers in the join message, may or may not include the local peer */
841   listed_peers = ntohl (join_msg->num_peers);
842   
843   session->num_peers = listed_peers;
844
845   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
846
847   local_peer_in_list = GNUNET_NO;
848   for (i = 0; i < listed_peers; i++)
849   {
850     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
851     {
852       local_peer_in_list = GNUNET_YES;
853       break;
854     }
855   }
856
857   if (GNUNET_NO == local_peer_in_list)
858     session->num_peers++;
859
860   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
861
862   if (GNUNET_NO == local_peer_in_list)
863     peers[session->num_peers - 1] = my_peer;
864
865   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
866   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
867
868   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
869
870   for (i = 0; i < session->num_peers; ++i)
871   {
872     /* initialize back-references, so consensus peer information can
873      * be used as closure */
874     session->info[i].session = session;
875     session->info[i].peer_id = peers[i];
876   }
877
878   GNUNET_free (peers);
879 }
880
881
882 /**
883  * Called when another peer wants to do a set operation with the
884  * local peer.
885  *
886  * @param cls closure
887  * @param other_peer the other peer
888  * @param context_msg message with application specific information from
889  *        the other peer
890  * @param request request from the other peer, use GNUNET_SET_accept
891  *        to accept it, otherwise the request will be refused
892  *        Note that we don't use a return value here, as it is also
893  *        necessary to specify the set we want to do the operation with,
894  *        whith sometimes can be derived from the context message.
895  *        Also necessary to specify the timeout.
896  */
897 static void
898 set_listen_cb (void *cls,
899                const struct GNUNET_PeerIdentity *other_peer,
900                const struct GNUNET_MessageHeader *context_msg,
901                struct GNUNET_SET_Request *request)
902 {
903   struct ConsensusSession *session = cls;
904   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
905   struct ConsensusPeerInformation *cpi;
906   struct GNUNET_SET_OperationHandle *set_op;
907   struct RoundInfo round_info;
908   int index;
909   int cmp;
910
911   if (NULL == context_msg)
912   {
913     GNUNET_break_op (0);
914     return;
915   }
916
917   index = get_peer_idx (other_peer, session);
918
919   if (index < 0)
920   {
921     GNUNET_break_op (0);
922     return;
923   }
924
925   round_info.round = ntohl (msg->round);
926   round_info.exp_round = ntohl (msg->exp_round);
927   round_info.exp_subround = ntohl (msg->exp_subround);
928
929   cpi = &session->info[index];
930
931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
932
933   switch (session->current_round)
934   {
935     case CONSENSUS_ROUND_BEGIN:
936       /* we're in the begin round, so requests for the exchange round may
937        * come in, they will be delayed for now! */
938     case CONSENSUS_ROUND_EXCHANGE:
939       cmp = rounds_compare (session, &round_info);
940       if (cmp > 0)
941       {
942         /* the other peer is too late */
943         GNUNET_break_op (0);
944         return;
945       }
946       /* kill old request, if any. this is legal,
947        * as the other peer would not make a new request if it would want to
948        * complete the old one! */
949       if (NULL != cpi->set_op)
950       {
951         GNUNET_SET_operation_cancel (cpi->set_op);
952         cpi->set_op = NULL;
953       }
954       set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
955                                        set_result_cb, &session->info[index]);
956       if (cmp == 0)
957       {
958         cpi->set_op = set_op;
959         GNUNET_SET_commit (set_op, session->element_set);
960         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
961       }
962       else
963       {
964         /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
965         cpi->delayed_set_op = set_op;
966         cpi->delayed_round_info = round_info;
967         cpi->exp_subround_finished = GNUNET_YES;
968         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
969       }
970       break;
971     default:
972       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
973                   session->local_peer_idx, session->current_round, index);
974       GNUNET_break_op (0);
975       return;
976   }
977 }
978
979
980 /**
981  * Initialize the session, continue receiving messages from the owning client
982  *
983  * @param session the session to initialize
984  * @param join_msg the join message from the client
985  */
986 static void
987 initialize_session (struct ConsensusSession *session,
988                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
989 {
990   struct ConsensusSession *other_session;
991
992   initialize_session_peer_list (session, join_msg);
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
994   compute_global_id (session, &join_msg->session_id);
995
996   /* check if some local client already owns the session.
997    * it is only legal to have a session with an existing global id
998    * if all other sessions with this global id are finished.*/
999   other_session = sessions_head;
1000   while (NULL != other_session)
1001   {
1002     if ((other_session != session) && 
1003         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1004     {
1005       if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1006       {
1007         GNUNET_break (0);
1008         destroy_session (session);
1009         return;
1010       }
1011       break;
1012     }
1013     other_session = other_session->next;
1014   }
1015
1016   session->local_peer_idx = get_peer_idx (&my_peer, session);
1017   GNUNET_assert (-1 != session->local_peer_idx);
1018   session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1019   GNUNET_assert (NULL != session->element_set);
1020   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1021                                              &session->global_id,
1022                                              set_listen_cb, session);
1023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1025 }
1026
1027
1028 static struct ConsensusSession *
1029 get_session_by_client (struct GNUNET_SERVER_Client *client)
1030 {
1031   struct ConsensusSession *session;
1032
1033   session = sessions_head;
1034   while (NULL != session)
1035   {
1036     if (session->client == client)
1037       return session;
1038     session = session->next;
1039   }
1040   return NULL;
1041 }
1042
1043
1044 /**
1045  * Called when a client wants to join a consensus session.
1046  *
1047  * @param cls unused
1048  * @param client client that sent the message
1049  * @param m message sent by the client
1050  */
1051 static void
1052 client_join (void *cls,
1053              struct GNUNET_SERVER_Client *client,
1054              const struct GNUNET_MessageHeader *m)
1055 {
1056   struct ConsensusSession *session;
1057
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1059
1060   session = get_session_by_client (client);
1061   if (NULL != session)
1062   {
1063     GNUNET_break (0);
1064     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1065     return;
1066   }
1067   session = GNUNET_new (struct ConsensusSession);
1068   session->client = client;
1069   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1070   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1071   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1072   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1073
1074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1075 }
1076
1077
1078 /**
1079  * Called when a client performs an insert operation.
1080  *
1081  * @param cls (unused)
1082  * @param client client handle
1083  * @param m message sent by the client
1084  */
1085 void
1086 client_insert (void *cls,
1087                struct GNUNET_SERVER_Client *client,
1088                const struct GNUNET_MessageHeader *m)
1089 {
1090   struct ConsensusSession *session;
1091   struct GNUNET_CONSENSUS_ElementMessage *msg;
1092   struct GNUNET_SET_Element *element;
1093   ssize_t element_size;
1094
1095   session = get_session_by_client (client);
1096
1097   if (NULL == session)
1098   {
1099     GNUNET_break (0);
1100     GNUNET_SERVER_client_disconnect (client);
1101     return;
1102   }
1103
1104   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1105   {
1106     GNUNET_break (0);
1107     GNUNET_SERVER_client_disconnect (client);
1108     return;
1109   }
1110
1111   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1112   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1113   if (element_size < 0)
1114   {
1115     GNUNET_break (0);
1116     return;
1117   }
1118
1119   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1120   element->type = msg->element_type;
1121   element->size = element_size;
1122   memcpy (&element[1], &msg[1], element_size);
1123   element->data = &element[1];
1124   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1125   GNUNET_free (element);
1126   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1127
1128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1129 }
1130
1131
1132 /**
1133  * Called when a client performs the conclude operation.
1134  *
1135  * @param cls (unused)
1136  * @param client client handle
1137  * @param message message sent by the client
1138  */
1139 static void
1140 client_conclude (void *cls,
1141                  struct GNUNET_SERVER_Client *client,
1142                  const struct GNUNET_MessageHeader *message)
1143 {
1144   struct ConsensusSession *session;
1145   struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
1146
1147
1148   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1149   cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
1150   session = get_session_by_client (client);
1151   if (NULL == session)
1152   {
1153     /* client not found */
1154     GNUNET_break (0);
1155     GNUNET_SERVER_client_disconnect (client);
1156     return;
1157   }
1158   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1159   {
1160     /* client requested conclude twice */
1161     GNUNET_break (0);
1162     return;
1163   }
1164   if (session->num_peers <= 1)
1165   {
1166     /* FIXME: what to do here? */
1167     //send_client_conclude_done (session);
1168   }
1169   else
1170   {
1171     session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
1172     /* the 'begin' round is over, start with the next, actual round */
1173     round_over (session, NULL);
1174   }
1175
1176   GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1177   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1178 }
1179
1180
1181 /**
1182  * Called to clean up, after a shutdown has been requested.
1183  *
1184  * @param cls closure
1185  * @param tc context information (why was this task triggered now)
1186  */
1187 static void
1188 shutdown_task (void *cls,
1189                const struct GNUNET_SCHEDULER_TaskContext *tc)
1190 {
1191   while (NULL != sessions_head)
1192     destroy_session (sessions_head);
1193
1194   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1195 }
1196
1197
1198 /**
1199  * Clean up after a client after it is
1200  * disconnected (either by us or by itself)
1201  *
1202  * @param cls closure, unused
1203  * @param client the client to clean up after
1204  */
1205 void
1206 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1207 {
1208   struct ConsensusSession *session;
1209
1210   session = get_session_by_client (client);
1211   if (NULL == session)
1212     return;
1213   if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1214       (CONSENSUS_ROUND_FINISH == session->current_round))
1215     destroy_session (session);
1216   else
1217     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1218 }
1219
1220
1221 /**
1222  * Start processing consensus requests.
1223  *
1224  * @param cls closure
1225  * @param server the initialized server
1226  * @param c configuration to use
1227  */
1228 static void
1229 run (void *cls, struct GNUNET_SERVER_Handle *server,
1230      const struct GNUNET_CONFIGURATION_Handle *c)
1231 {
1232   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1233     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1234         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1235     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1236     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1237     {NULL, NULL, 0, 0}
1238   };
1239
1240   cfg = c;
1241   srv = server;
1242   if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &my_peer))
1243   {
1244     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1245     GNUNET_break (0);
1246     GNUNET_SCHEDULER_shutdown ();
1247     return;
1248   }
1249   GNUNET_SERVER_add_handlers (server, server_handlers);
1250   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1251   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1252   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1253 }
1254
1255
1256 /**
1257  * The main function for the consensus service.
1258  *
1259  * @param argc number of arguments from the command line
1260  * @param argv command line arguments
1261  * @return 0 ok, 1 on error
1262  */
1263 int
1264 main (int argc, char *const *argv)
1265 {
1266   int ret;
1267   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1268   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1269   return (GNUNET_OK == ret) ? 0 : 1;
1270 }
1271