a490ad7f42faa8f66129978b883b5eac337e2cdb
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37
38 /**
39  * Log macro that prefixes the local peer and the peer we are in contact with.
40  */
41 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
42    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
43
44
45 /**
46  * Number of exponential rounds, used in the exp and completion round.
47  */
48 #define NUM_EXP_ROUNDS 4
49
50 /* forward declarations */
51
52 /* mutual recursion with struct ConsensusSession */
53 struct ConsensusPeerInformation;
54
55 /* mutual recursion with round_over */
56 static void
57 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
58
59
60 /**
61  * Describes the current round a consensus session is in.
62  */
63 enum ConsensusRound
64 {
65   /**
66    * Not started the protocol yet.
67    */
68   CONSENSUS_ROUND_BEGIN=0,
69   /**
70    * Distribution of elements with the exponential scheme.
71    */
72   CONSENSUS_ROUND_EXCHANGE,
73   /**
74    * Exchange which elements each peer has, but don't
75    * transmit the element's data, only their SHA-512 hashes.
76    * This round uses the all-to-all scheme.
77    */
78   CONSENSUS_ROUND_INVENTORY,
79   /**
80    * Collect and distribute missing values with the exponential scheme.
81    */
82   CONSENSUS_ROUND_COMPLETION,
83   /**
84    * Consensus concluded. After timeout and finished communication with client,
85    * consensus session will be destroyed.
86    */
87   CONSENSUS_ROUND_FINISH
88 };
89
90
91 /**
92  * Complete information about the current round and all
93  * subrounds.
94  */
95 struct RoundInfo
96 {
97   /**
98    * The current main round.
99    */
100   enum ConsensusRound round;
101   /**
102    * The current exp round, valid if
103    * the main round is an exp round.
104    */
105   uint32_t exp_round;
106   /**
107    * The current exp subround, valid if
108    * the main round is an exp round.
109    */
110   uint32_t exp_subround;
111 };
112
113
114 /**
115  * A consensus session consists of one local client and the remote authorities.
116  */
117 struct ConsensusSession
118 {
119   /**
120    * Consensus sessions are kept in a DLL.
121    */
122   struct ConsensusSession *next;
123
124   /**
125    * Consensus sessions are kept in a DLL.
126    */
127   struct ConsensusSession *prev;
128
129   /**
130   * Global consensus identification, computed
131   * from the session id and participating authorities.
132   */
133   struct GNUNET_HashCode global_id;
134
135   /**
136    * Client that inhabits the session
137    */
138   struct GNUNET_SERVER_Client *client;
139
140   /**
141    * Queued messages to the client.
142    */
143   struct GNUNET_MQ_Handle *client_mq;
144
145   /**
146    * Timeout for all rounds together, single rounds will schedule a timeout task
147    * with a fraction of the conclude timeout.
148    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
149    */
150   struct GNUNET_TIME_Relative conclude_timeout;
151   
152   /**
153    * Timeout task identifier for the current round.
154    */
155   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
156
157   /**
158    * Number of other peers in the consensus.
159    */
160   unsigned int num_peers;
161
162   /**
163    * Information about the other peers,
164    * their state, etc.
165    */
166   struct ConsensusPeerInformation *info;
167
168   /**
169    * Index of the local peer in the peers array
170    */
171   unsigned int local_peer_idx;
172
173   /**
174    * Current round
175    */
176   enum ConsensusRound current_round;
177
178   /**
179    * Permutation of peers for the current round,
180    */
181   uint32_t *shuffle;
182
183   /**
184    * Inverse permutation of peers for the current round,
185    */
186   uint32_t *shuffle_inv;
187
188   /**
189    * Current round of the exponential scheme.
190    */
191   uint32_t exp_round;
192
193   /**
194    * Current sub-round of the exponential scheme.
195    */
196   uint32_t exp_subround;
197
198   /**
199    * The partner for the current exp-round
200    */
201   struct ConsensusPeerInformation *partner_outgoing;
202
203   /**
204    * The partner for the current exp-round
205    */
206   struct ConsensusPeerInformation *partner_incoming;
207
208   /**
209    * The consensus set of this session.
210    */
211   struct GNUNET_SET_Handle *element_set;
212
213   /**
214    * Listener for requests from other peers.
215    * Uses the session's global id as app id.
216    */
217   struct GNUNET_SET_ListenHandle *set_listener;
218 };
219
220
221 /**
222  * Information about a peer that is in a consensus session.
223  */
224 struct ConsensusPeerInformation
225 {
226   /**
227    * Peer identitty of the peer in the consensus session
228    */
229   struct GNUNET_PeerIdentity peer_id;
230
231   /**
232    * Back-reference to the consensus session,
233    * to that ConsensusPeerInformation can be used as a closure
234    */
235   struct ConsensusSession *session;
236
237   /**
238    * We have finishes the exp-subround with the peer.
239    */
240   int exp_subround_finished;
241
242   /**
243    * Set operation we are currently executing with this peer.
244    */
245   struct GNUNET_SET_OperationHandle *set_op;
246
247   /**
248    * Set operation we are planning on executing with this peer.
249    */
250   struct GNUNET_SET_OperationHandle *delayed_set_op;
251
252   /**
253    * Info about the round of the delayed set operation.
254    */
255   struct RoundInfo delayed_round_info;
256 };
257
258
259 /**
260  * Linked list of sessions this peer participates in.
261  */
262 static struct ConsensusSession *sessions_head;
263
264 /**
265  * Linked list of sessions this peer participates in.
266  */
267 static struct ConsensusSession *sessions_tail;
268
269 /**
270  * Configuration of the consensus service.
271  */
272 static const struct GNUNET_CONFIGURATION_Handle *cfg;
273
274 /**
275  * Handle to the server for this service.
276  */
277 static struct GNUNET_SERVER_Handle *srv;
278
279 /**
280  * Peer that runs this service.
281  */
282 static struct GNUNET_PeerIdentity my_peer;
283
284
285 static int
286 have_exp_subround_finished (const struct ConsensusSession *session)
287 {
288   int not_finished;
289   not_finished = 0;
290   if ( (NULL != session->partner_outgoing) && 
291        (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
292     not_finished++;
293   if ( (NULL != session->partner_incoming) &&
294        (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
295     not_finished++;
296   if (0 == not_finished)
297     return GNUNET_YES;
298   return GNUNET_NO;
299 }
300
301
302 /**
303  * Destroy a session, free all resources associated with it.
304  * 
305  * @param session the session to destroy
306  */
307 static void
308 destroy_session (struct ConsensusSession *session)
309 {
310   int i;
311
312   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
313   if (NULL != session->element_set)
314   {
315     GNUNET_SET_destroy (session->element_set);
316     session->element_set = NULL;
317   }
318   if (NULL != session->set_listener)
319   {
320     GNUNET_SET_listen_cancel (session->set_listener);
321     session->set_listener = NULL;
322   }
323   if (NULL != session->client_mq)
324   {
325     GNUNET_MQ_destroy (session->client_mq);
326     session->client_mq = NULL;
327   }
328   if (NULL != session->client)
329   {
330     GNUNET_SERVER_client_disconnect (session->client);
331     session->client = NULL;
332   }
333   if (NULL != session->shuffle)
334   {
335     GNUNET_free (session->shuffle);
336     session->shuffle = NULL;
337   }
338   if (NULL != session->shuffle_inv)
339   {
340     GNUNET_free (session->shuffle_inv);
341     session->shuffle_inv = NULL;
342   }
343   if (NULL != session->info)
344   {
345     for (i = 0; i < session->num_peers; i++)
346     {
347       struct ConsensusPeerInformation *cpi;
348       cpi = &session->info[i];
349       if (NULL != cpi->set_op)
350       {
351         GNUNET_SET_operation_cancel (cpi->set_op);
352         cpi->set_op = NULL;
353       }
354     }
355     GNUNET_free (session->info);
356     session->info = NULL;
357   }
358   GNUNET_free (session);
359 }
360
361
362 /**
363  * Iterator for set elements.
364  *
365  * @param cls closure
366  * @param element the current element, NULL if all elements have been
367  *        iterated over
368  * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
369  */
370 static int
371 send_to_client_iter (void *cls,
372                      const struct GNUNET_SET_Element *element)
373 {
374   struct ConsensusSession *session = cls;
375   struct GNUNET_MQ_Envelope *ev;
376
377   if (NULL != element)
378   {
379     struct GNUNET_CONSENSUS_ElementMessage *m;
380
381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
382                 session->local_peer_idx);
383
384     ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
385     m->element_type = htons (element->type);
386     memcpy (&m[1], element->data, element->size);
387     GNUNET_MQ_send (session->client_mq, ev);
388   }
389   else
390   {
391     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
392                 session->local_peer_idx);
393     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
394     GNUNET_MQ_send (session->client_mq, ev);
395   }
396   return GNUNET_YES;
397 }
398
399
400 /**
401  * Start the next round.
402  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
403  *
404  * @param cls the session
405  * @param tc task context, for when this task is invoked by the scheduler,
406  *           NULL if invoked for another reason
407  */
408 static void 
409 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
410 {
411   struct ConsensusSession *session;
412
413   /* don't kick off next round if we're shutting down */
414   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
415     return;
416
417   session = cls;
418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
419
420   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
421   {
422     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
423     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
424   }
425
426   switch (session->current_round)
427   {
428     case CONSENSUS_ROUND_BEGIN:
429       session->current_round = CONSENSUS_ROUND_EXCHANGE;
430       session->exp_round = 0;
431       subround_over (session, NULL);
432       break;
433     case CONSENSUS_ROUND_EXCHANGE:
434       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
435                   session->local_peer_idx);
436       session->current_round = CONSENSUS_ROUND_FINISH;
437       GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
438       break;
439     default:
440       GNUNET_assert (0);
441   }
442 }
443
444
445 /**
446  * Create a new permutation for the session's peers in session->shuffle.
447  * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
448  * both the global session id and the current round index.
449  *
450  * @param session the session to create the new permutation for
451  */
452 static void
453 shuffle (struct ConsensusSession *session)
454 {
455   uint32_t i;
456   uint32_t randomness[session->num_peers-1];
457
458   if (NULL == session->shuffle)
459     session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
460   if (NULL == session->shuffle_inv)
461     session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
462
463   GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), 
464                      &session->exp_round, sizeof (uint32_t),
465                      &session->global_id, sizeof (struct GNUNET_HashCode),
466                      NULL);
467
468   for (i = 0; i < session->num_peers; i++)
469     session->shuffle[i] = i;
470
471   for (i = session->num_peers - 1; i > 0; i--)
472   {
473     uint32_t x;
474     uint32_t tmp;
475     x = randomness[i-1] % session->num_peers;
476     tmp = session->shuffle[x];
477     session->shuffle[x] = session->shuffle[i];
478     session->shuffle[i] = tmp;
479   }
480
481   /* create the inverse */
482   for (i = 0; i < session->num_peers; i++)
483     session->shuffle_inv[session->shuffle[i]] = i;
484 }
485
486
487 /**
488  * Find and set the partner_incoming and partner_outgoing of our peer,
489  * one of them may not exist (and thus set to NULL) if the number of peers
490  * in the session is not a power of two.
491  *
492  * @param session the consensus session
493  */
494 static void
495 find_partners (struct ConsensusSession *session)
496 {
497   unsigned int arc;
498   unsigned int num_ghosts;
499   unsigned int largest_arc;
500   int partner_idx;
501
502   /* shuffled local index */
503   int my_idx = session->shuffle[session->local_peer_idx];
504
505   /* distance to neighboring peer in current subround */
506   arc = 1 << session->exp_subround;
507   largest_arc = 1;
508   while (largest_arc < session->num_peers)
509     largest_arc <<= 1;
510   num_ghosts = largest_arc - session->num_peers;
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
514
515   if (0 == (my_idx & arc))
516   {
517     /* we are outgoing */
518     partner_idx = (my_idx + arc) % session->num_peers;
519     session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
520     session->partner_outgoing->exp_subround_finished = GNUNET_NO;
521     /* are we a 'ghost' of a peer that would exist if
522      * the number of peers was a power of two, and thus have to partner
523      * with an additional peer?
524      */
525     if (my_idx < num_ghosts)
526     {
527       int ghost_partner_idx;
528       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
529       ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
530       /* platform dependent; modulo sometimes returns negative values */
531       if (ghost_partner_idx < 0)
532         ghost_partner_idx += session->num_peers;
533       /* we only need to have a ghost partner if the partner is outgoing */
534       if (0 == (ghost_partner_idx & arc))
535       {
536         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
537         session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
538         session->partner_incoming->exp_subround_finished = GNUNET_NO;
539         return;
540       }
541     }
542     session->partner_incoming = NULL;
543     return;
544   }
545   /* we only have an incoming connection */
546   partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
547   if (partner_idx < 0)
548     partner_idx += session->num_peers;
549   session->partner_outgoing = NULL;
550   session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
551   session->partner_incoming->exp_subround_finished = GNUNET_NO;
552 }
553
554
555 /**
556  * Callback for set operation results. Called for each element
557  * in the result set.
558  *
559  * @param cls closure
560  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
561  * @param status see enum GNUNET_SET_Status
562  */
563 static void 
564 set_result_cb (void *cls,
565                const struct GNUNET_SET_Element *element,
566                enum GNUNET_SET_Status status)
567 {
568   struct ConsensusPeerInformation *cpi = cls;
569   unsigned int remote_idx = cpi - cpi->session->info;
570   unsigned int local_idx = cpi->session->local_peer_idx;
571
572   GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
573                  (cpi == cpi->session->partner_incoming));
574
575   switch (status)
576   {
577     case GNUNET_SET_STATUS_OK:
578       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
579                   local_idx, remote_idx);
580       break;
581     case GNUNET_SET_STATUS_FAILURE:
582       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
583                   local_idx, remote_idx);
584       cpi->set_op = NULL;
585       return;
586     case GNUNET_SET_STATUS_HALF_DONE:
587     case GNUNET_SET_STATUS_DONE:
588       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
589                   local_idx, remote_idx);
590       cpi->exp_subround_finished = GNUNET_YES;
591       cpi->set_op = NULL;
592       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
593       {
594         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
595                     local_idx);
596         subround_over (cpi->session, NULL);
597       }
598       else
599       {
600         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
601                     local_idx);
602       }
603       return;
604     default:
605       GNUNET_break (0);
606       return;
607   }
608
609   switch (cpi->session->current_round)
610   {
611     case CONSENSUS_ROUND_EXCHANGE:
612       GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
613       break;
614     default:
615       GNUNET_break (0);
616       return;
617   }
618 }
619
620
621 /**
622  * Compare the round the session is in with the round of the given context message.
623  *
624  * @param session a consensus session
625  * @param round a round context message
626  * @return 0 if it's the same round, -1 if the session is in an earlier round,
627  *         1 if the session is in a later round
628  */
629 static int
630 rounds_compare (struct ConsensusSession *session,
631                 struct RoundInfo* ri)
632 {
633   if (session->current_round < ri->round)
634     return -1;
635   if (session->current_round > ri->round)
636     return 1;
637   if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
638   {
639     if (session->exp_round < ri->exp_round)
640       return -1;
641     if (session->exp_round > ri->exp_round)
642       return 1;
643     if (session->exp_subround < ri->exp_subround)
644       return -1;
645     if (session->exp_subround < ri->exp_subround)
646       return 1;
647     return 0;
648   }
649   /* comparing rounds when we are not in a exp round */
650   GNUNET_assert (0);
651 }
652
653
654 /**
655  * Do the next subround in the exp-scheme.
656  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
657  *
658  * @param cls the session
659  * @param tc task context, for when this task is invoked by the scheduler,
660  *           NULL if invoked for another reason
661  */
662 static void
663 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
664 {
665   struct ConsensusSession *session;
666   int i;
667
668   /* don't kick off next subround if we're shutting down */
669   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
670     return;
671   session = cls;
672   /* cancel timeout */
673   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
674   {
675     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
676     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
677   }
678   
679   if (session->exp_round >= NUM_EXP_ROUNDS)
680   {
681     round_over (session, NULL);
682     return;
683   }
684
685   if (session->exp_round == 0)
686   {
687     /* initialize everything for the log-rounds */
688     session->exp_round = 1;
689     session->exp_subround = 0;
690     if (NULL == session->shuffle)
691       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
692     if (NULL == session->shuffle_inv)
693       session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
694     for (i = 0; i < session->num_peers; i++)
695       session->shuffle[i] = session->shuffle_inv[i] = i;
696   }
697   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
698   {
699     /* subrounds done, start new log-round */
700     session->exp_round++;
701     session->exp_subround = 0;
702     shuffle (session);
703   }
704   else 
705   {
706     session->exp_subround++;
707   }
708
709   /* determine the incoming and outgoing partner */
710   find_partners (session);
711
712   GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
713   GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
714
715   /* initiate set operation with the outgoing partner */
716   if (NULL != session->partner_outgoing)
717   {
718     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
719     msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
720     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
721     msg->header.size = htons (sizeof *msg);
722     msg->round = htonl (session->current_round);
723     msg->exp_round = htonl (session->exp_round);
724     msg->exp_subround = htonl (session->exp_subround);
725
726     if (NULL != session->partner_outgoing->set_op)
727     {
728       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
729     }
730     session->partner_outgoing->set_op =
731         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
732                             &session->global_id,
733                             (struct GNUNET_MessageHeader *) msg,
734                             0, /* FIXME: salt */
735                             GNUNET_SET_RESULT_ADDED,
736                             set_result_cb, session->partner_outgoing);
737     GNUNET_free (msg);
738     GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
739   }
740
741   /* commit to the delayed set operation */
742   if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
743   {
744     int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
745
746     if (NULL != session->partner_incoming->set_op)
747     {
748       GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
749       session->partner_incoming->set_op = NULL;
750     }
751     if (cmp == 0)
752     {
753       GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
754       session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
755       session->partner_incoming->delayed_set_op = NULL;
756       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
757                   session->local_peer_idx, (int) (session->partner_incoming - session->info));
758     }
759     else
760     {
761       /* this should not happen -- a round has been skipped! */
762       GNUNET_break_op (0);
763     }
764   }
765
766 #ifdef GNUNET_EXTRA_LOGGING
767   {
768     int in;
769     int out;
770     if (session->partner_outgoing == NULL)
771       out = -1;
772     else
773       out = (int) (session->partner_outgoing - session->info);
774     if (session->partner_incoming == NULL)
775       in = -1;
776     else
777       in = (int) (session->partner_incoming - session->info);
778     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
779                 session->exp_round, session->exp_subround, in, out);
780   }
781 #endif /* GNUNET_EXTRA_LOGGING */
782
783 }
784
785
786 /**
787  * Search peer in the list of peers in session.
788  *
789  * @param peer peer to find
790  * @param session session with peer
791  * @return index of peer, -1 if peer is not in session
792  */
793 static int
794 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
795 {
796   int i;
797   for (i = 0; i < session->num_peers; i++)
798     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
799       return i;
800   return -1;
801 }
802
803
804 /**
805  * Compute a global, (hopefully) unique consensus session id,
806  * from the local id of the consensus session, and the identities of all participants.
807  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
808  * exactly the same peers, the global id will be different.
809  *
810  * @param session session to generate the global id for
811  * @param session_id local id of the consensus session
812  */
813 static void
814 compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
815 {
816   int i;
817   struct GNUNET_HashCode tmp;
818
819   /* FIXME: use kdf? */
820
821   session->global_id = *session_id;
822   for (i = 0; i < session->num_peers; ++i)
823   {
824     GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
825     session->global_id = tmp;
826     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
827     session->global_id = tmp;
828   }
829 }
830
831
832 /**
833  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
834  * the correct signature to be used with e.g. qsort.
835  * We use this function instead.
836  *
837  * @param h1 some hash code
838  * @param h2 some hash code
839  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
840  */
841 static int
842 hash_cmp (const void *h1, const void *h2)
843 {
844   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
845 }
846
847
848 /**
849  * Create the sorted list of peers for the session,
850  * add the local peer if not in the join message.
851  */
852 static void
853 initialize_session_peer_list (struct ConsensusSession *session,
854                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
855 {
856   unsigned int local_peer_in_list;
857   uint32_t listed_peers;
858   const struct GNUNET_PeerIdentity *msg_peers;
859   struct GNUNET_PeerIdentity *peers;
860   unsigned int i;
861
862   GNUNET_assert (NULL != join_msg);
863
864   /* peers in the join message, may or may not include the local peer */
865   listed_peers = ntohl (join_msg->num_peers);
866   
867   session->num_peers = listed_peers;
868
869   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
870
871   local_peer_in_list = GNUNET_NO;
872   for (i = 0; i < listed_peers; i++)
873   {
874     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
875     {
876       local_peer_in_list = GNUNET_YES;
877       break;
878     }
879   }
880
881   if (GNUNET_NO == local_peer_in_list)
882     session->num_peers++;
883
884   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
885
886   if (GNUNET_NO == local_peer_in_list)
887     peers[session->num_peers - 1] = my_peer;
888
889   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
890   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
891
892   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
893
894   for (i = 0; i < session->num_peers; ++i)
895   {
896     /* initialize back-references, so consensus peer information can
897      * be used as closure */
898     session->info[i].session = session;
899     session->info[i].peer_id = peers[i];
900   }
901
902   GNUNET_free (peers);
903 }
904
905
906 /**
907  * Called when another peer wants to do a set operation with the
908  * local peer.
909  *
910  * @param cls closure
911  * @param other_peer the other peer
912  * @param context_msg message with application specific information from
913  *        the other peer
914  * @param request request from the other peer, use GNUNET_SET_accept
915  *        to accept it, otherwise the request will be refused
916  *        Note that we don't use a return value here, as it is also
917  *        necessary to specify the set we want to do the operation with,
918  *        whith sometimes can be derived from the context message.
919  *        Also necessary to specify the timeout.
920  */
921 static void
922 set_listen_cb (void *cls,
923                const struct GNUNET_PeerIdentity *other_peer,
924                const struct GNUNET_MessageHeader *context_msg,
925                struct GNUNET_SET_Request *request)
926 {
927   struct ConsensusSession *session = cls;
928   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
929   struct ConsensusPeerInformation *cpi;
930   struct GNUNET_SET_OperationHandle *set_op;
931   struct RoundInfo round_info;
932   int index;
933   int cmp;
934
935   if (NULL == context_msg)
936   {
937     GNUNET_break_op (0);
938     return;
939   }
940
941   index = get_peer_idx (other_peer, session);
942
943   if (index < 0)
944   {
945     GNUNET_break_op (0);
946     return;
947   }
948
949   round_info.round = ntohl (msg->round);
950   round_info.exp_round = ntohl (msg->exp_round);
951   round_info.exp_subround = ntohl (msg->exp_subround);
952
953   cpi = &session->info[index];
954
955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
956
957   switch (session->current_round)
958   {
959     case CONSENSUS_ROUND_BEGIN:
960       /* we're in the begin round, so requests for the exchange round may
961        * come in, they will be delayed for now! */
962     case CONSENSUS_ROUND_EXCHANGE:
963       cmp = rounds_compare (session, &round_info);
964       if (cmp > 0)
965       {
966         /* the other peer is too late */
967         GNUNET_break_op (0);
968         return;
969       }
970       /* kill old request, if any. this is legal,
971        * as the other peer would not make a new request if it would want to
972        * complete the old one! */
973       if (NULL != cpi->set_op)
974       {
975         GNUNET_SET_operation_cancel (cpi->set_op);
976         cpi->set_op = NULL;
977       }
978       set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
979                                        set_result_cb, &session->info[index]);
980       if (cmp == 0)
981       {
982         cpi->set_op = set_op;
983         GNUNET_SET_commit (set_op, session->element_set);
984         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
985       }
986       else
987       {
988         /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
989         cpi->delayed_set_op = set_op;
990         cpi->delayed_round_info = round_info;
991         cpi->exp_subround_finished = GNUNET_YES;
992         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
993       }
994       break;
995     default:
996       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
997                   session->local_peer_idx, session->current_round, index);
998       GNUNET_break_op (0);
999       return;
1000   }
1001 }
1002
1003
1004 /**
1005  * Initialize the session, continue receiving messages from the owning client
1006  *
1007  * @param session the session to initialize
1008  * @param join_msg the join message from the client
1009  */
1010 static void
1011 initialize_session (struct ConsensusSession *session,
1012                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1013 {
1014   struct ConsensusSession *other_session;
1015
1016   initialize_session_peer_list (session, join_msg);
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1018   compute_global_id (session, &join_msg->session_id);
1019
1020   /* check if some local client already owns the session.
1021    * it is only legal to have a session with an existing global id
1022    * if all other sessions with this global id are finished.*/
1023   other_session = sessions_head;
1024   while (NULL != other_session)
1025   {
1026     if ((other_session != session) && 
1027         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1028     {
1029       if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1030       {
1031         GNUNET_break (0);
1032         destroy_session (session);
1033         return;
1034       }
1035       break;
1036     }
1037     other_session = other_session->next;
1038   }
1039
1040   session->local_peer_idx = get_peer_idx (&my_peer, session);
1041   GNUNET_assert (-1 != session->local_peer_idx);
1042   session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1043   GNUNET_assert (NULL != session->element_set);
1044   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1045                                              &session->global_id,
1046                                              set_listen_cb, session);
1047   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1049 }
1050
1051
1052 static struct ConsensusSession *
1053 get_session_by_client (struct GNUNET_SERVER_Client *client)
1054 {
1055   struct ConsensusSession *session;
1056
1057   session = sessions_head;
1058   while (NULL != session)
1059   {
1060     if (session->client == client)
1061       return session;
1062     session = session->next;
1063   }
1064   return NULL;
1065 }
1066
1067
1068 /**
1069  * Called when a client wants to join a consensus session.
1070  *
1071  * @param cls unused
1072  * @param client client that sent the message
1073  * @param m message sent by the client
1074  */
1075 static void
1076 client_join (void *cls,
1077              struct GNUNET_SERVER_Client *client,
1078              const struct GNUNET_MessageHeader *m)
1079 {
1080   struct ConsensusSession *session;
1081
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1083
1084   session = get_session_by_client (client);
1085   if (NULL != session)
1086   {
1087     GNUNET_break (0);
1088     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1089     return;
1090   }
1091   session = GNUNET_new (struct ConsensusSession);
1092   session->client = client;
1093   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1094   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1095   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1096   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1097
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1099 }
1100
1101
1102 /**
1103  * Called when a client performs an insert operation.
1104  *
1105  * @param cls (unused)
1106  * @param client client handle
1107  * @param m message sent by the client
1108  */
1109 void
1110 client_insert (void *cls,
1111                struct GNUNET_SERVER_Client *client,
1112                const struct GNUNET_MessageHeader *m)
1113 {
1114   struct ConsensusSession *session;
1115   struct GNUNET_CONSENSUS_ElementMessage *msg;
1116   struct GNUNET_SET_Element *element;
1117   ssize_t element_size;
1118
1119   session = get_session_by_client (client);
1120
1121   if (NULL == session)
1122   {
1123     GNUNET_break (0);
1124     GNUNET_SERVER_client_disconnect (client);
1125     return;
1126   }
1127
1128   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1129   {
1130     GNUNET_break (0);
1131     GNUNET_SERVER_client_disconnect (client);
1132     return;
1133   }
1134
1135   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1136   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1137   if (element_size < 0)
1138   {
1139     GNUNET_break (0);
1140     return;
1141   }
1142
1143   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1144   element->type = msg->element_type;
1145   element->size = element_size;
1146   memcpy (&element[1], &msg[1], element_size);
1147   element->data = &element[1];
1148   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1149   GNUNET_free (element);
1150   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1151
1152   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1153 }
1154
1155
1156 /**
1157  * Called when a client performs the conclude operation.
1158  *
1159  * @param cls (unused)
1160  * @param client client handle
1161  * @param message message sent by the client
1162  */
1163 static void
1164 client_conclude (void *cls,
1165                  struct GNUNET_SERVER_Client *client,
1166                  const struct GNUNET_MessageHeader *message)
1167 {
1168   struct ConsensusSession *session;
1169   struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
1170
1171
1172   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1173   cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
1174   session = get_session_by_client (client);
1175   if (NULL == session)
1176   {
1177     /* client not found */
1178     GNUNET_break (0);
1179     GNUNET_SERVER_client_disconnect (client);
1180     return;
1181   }
1182   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1183   {
1184     /* client requested conclude twice */
1185     GNUNET_break (0);
1186     return;
1187   }
1188   if (session->num_peers <= 1)
1189   {
1190     session->current_round = CONSENSUS_ROUND_FINISH;
1191     GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1192   }
1193   else
1194   {
1195     session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
1196     /* the 'begin' round is over, start with the next, actual round */
1197     round_over (session, NULL);
1198   }
1199
1200   GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1201   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1202 }
1203
1204
1205 /**
1206  * Called to clean up, after a shutdown has been requested.
1207  *
1208  * @param cls closure
1209  * @param tc context information (why was this task triggered now)
1210  */
1211 static void
1212 shutdown_task (void *cls,
1213                const struct GNUNET_SCHEDULER_TaskContext *tc)
1214 {
1215   while (NULL != sessions_head)
1216     destroy_session (sessions_head);
1217
1218   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1219 }
1220
1221
1222 /**
1223  * Clean up after a client after it is
1224  * disconnected (either by us or by itself)
1225  *
1226  * @param cls closure, unused
1227  * @param client the client to clean up after
1228  */
1229 void
1230 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1231 {
1232   struct ConsensusSession *session;
1233
1234   session = get_session_by_client (client);
1235   if (NULL == session)
1236     return;
1237   if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1238       (CONSENSUS_ROUND_FINISH == session->current_round))
1239     destroy_session (session);
1240   else
1241     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1242 }
1243
1244
1245 /**
1246  * Start processing consensus requests.
1247  *
1248  * @param cls closure
1249  * @param server the initialized server
1250  * @param c configuration to use
1251  */
1252 static void
1253 run (void *cls, struct GNUNET_SERVER_Handle *server,
1254      const struct GNUNET_CONFIGURATION_Handle *c)
1255 {
1256   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1257     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1258         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1259     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1260     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1261     {NULL, NULL, 0, 0}
1262   };
1263
1264   cfg = c;
1265   srv = server;
1266   if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &my_peer))
1267   {
1268     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1269     GNUNET_break (0);
1270     GNUNET_SCHEDULER_shutdown ();
1271     return;
1272   }
1273   GNUNET_SERVER_add_handlers (server, server_handlers);
1274   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1275   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1276   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1277 }
1278
1279
1280 /**
1281  * The main function for the consensus service.
1282  *
1283  * @param argc number of arguments from the command line
1284  * @param argv command line arguments
1285  * @return 0 ok, 1 on error
1286  */
1287 int
1288 main (int argc, char *const *argv)
1289 {
1290   int ret;
1291   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1292   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1293   return (GNUNET_OK == ret) ? 0 : 1;
1294 }
1295