- delete unused message type
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
35
36
37 /**
38  * Log macro that prefixes the local peer and the peer we are in contact with.
39  */
40 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
41    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
42
43
44 /**
45  * Number of exponential rounds, used in the exp and completion round.
46  */
47 #define NUM_EXP_ROUNDS 4
48
49 /* forward declarations */
50
51 /* mutual recursion with struct ConsensusSession */
52 struct ConsensusPeerInformation;
53
54 /* mutual recursion with round_over */
55 static void
56 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
57
58
59 /**
60  * Describes the current round a consensus session is in.
61  */
62 enum ConsensusRound
63 {
64   /**
65    * Not started the protocol yet.
66    */
67   CONSENSUS_ROUND_BEGIN=0,
68   /**
69    * Distribution of elements with the exponential scheme.
70    */
71   CONSENSUS_ROUND_EXCHANGE,
72   /**
73    * Exchange which elements each peer has, but don't
74    * transmit the element's data, only their SHA-512 hashes.
75    * This round uses the all-to-all scheme.
76    */
77   CONSENSUS_ROUND_INVENTORY,
78   /**
79    * Collect and distribute missing values with the exponential scheme.
80    */
81   CONSENSUS_ROUND_COMPLETION,
82   /**
83    * Consensus concluded. After timeout and finished communication with client,
84    * consensus session will be destroyed.
85    */
86   CONSENSUS_ROUND_FINISH
87 };
88
89
90 /**
91  * Complete information about the current round and all
92  * subrounds.
93  */
94 struct RoundInfo
95 {
96   /**
97    * The current main round.
98    */
99   enum ConsensusRound round;
100   /**
101    * The current exp round, valid if
102    * the main round is an exp round.
103    */
104   uint32_t exp_round;
105   /**
106    * The current exp subround, valid if
107    * the main round is an exp round.
108    */
109   uint32_t exp_subround;
110 };
111
112
113 /**
114  * A consensus session consists of one local client and the remote authorities.
115  */
116 struct ConsensusSession
117 {
118   /**
119    * Consensus sessions are kept in a DLL.
120    */
121   struct ConsensusSession *next;
122
123   /**
124    * Consensus sessions are kept in a DLL.
125    */
126   struct ConsensusSession *prev;
127
128   /**
129   * Global consensus identification, computed
130   * from the session id and participating authorities.
131   */
132   struct GNUNET_HashCode global_id;
133
134   /**
135    * Client that inhabits the session
136    */
137   struct GNUNET_SERVER_Client *client;
138
139   /**
140    * Queued messages to the client.
141    */
142   struct GNUNET_MQ_Handle *client_mq;
143
144   /**
145    * Time when the conclusion of the consensus should begin.
146    */
147   struct GNUNET_TIME_Absolute conclude_start;
148
149   /**
150    * Timeout for all rounds together, single rounds will schedule a timeout task
151    * with a fraction of the conclude timeout.
152    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
153    */
154   struct GNUNET_TIME_Absolute conclude_deadline;
155
156   /**
157    * Timeout task identifier for the current round.
158    */
159   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
160
161   /**
162    * Number of other peers in the consensus.
163    */
164   unsigned int num_peers;
165
166   /**
167    * Information about the other peers,
168    * their state, etc.
169    */
170   struct ConsensusPeerInformation *info;
171
172   /**
173    * Index of the local peer in the peers array
174    */
175   unsigned int local_peer_idx;
176
177   /**
178    * Current round
179    */
180   enum ConsensusRound current_round;
181
182   /**
183    * Permutation of peers for the current round,
184    */
185   uint32_t *shuffle;
186
187   /**
188    * Inverse permutation of peers for the current round,
189    */
190   uint32_t *shuffle_inv;
191
192   /**
193    * Current round of the exponential scheme.
194    */
195   uint32_t exp_round;
196
197   /**
198    * Current sub-round of the exponential scheme.
199    */
200   uint32_t exp_subround;
201
202   /**
203    * The partner for the current exp-round
204    */
205   struct ConsensusPeerInformation *partner_outgoing;
206
207   /**
208    * The partner for the current exp-round
209    */
210   struct ConsensusPeerInformation *partner_incoming;
211
212   /**
213    * The consensus set of this session.
214    */
215   struct GNUNET_SET_Handle *element_set;
216
217   /**
218    * Listener for requests from other peers.
219    * Uses the session's global id as app id.
220    */
221   struct GNUNET_SET_ListenHandle *set_listener;
222 };
223
224
225 /**
226  * Information about a peer that is in a consensus session.
227  */
228 struct ConsensusPeerInformation
229 {
230   /**
231    * Peer identitty of the peer in the consensus session
232    */
233   struct GNUNET_PeerIdentity peer_id;
234
235   /**
236    * Back-reference to the consensus session,
237    * to that ConsensusPeerInformation can be used as a closure
238    */
239   struct ConsensusSession *session;
240
241   /**
242    * We have finishes the exp-subround with the peer.
243    */
244   int exp_subround_finished;
245
246   /**
247    * Set operation we are currently executing with this peer.
248    */
249   struct GNUNET_SET_OperationHandle *set_op;
250
251   /**
252    * Set operation we are planning on executing with this peer.
253    */
254   struct GNUNET_SET_OperationHandle *delayed_set_op;
255
256   /**
257    * Info about the round of the delayed set operation.
258    */
259   struct RoundInfo delayed_round_info;
260 };
261
262
263 /**
264  * Linked list of sessions this peer participates in.
265  */
266 static struct ConsensusSession *sessions_head;
267
268 /**
269  * Linked list of sessions this peer participates in.
270  */
271 static struct ConsensusSession *sessions_tail;
272
273 /**
274  * Configuration of the consensus service.
275  */
276 static const struct GNUNET_CONFIGURATION_Handle *cfg;
277
278 /**
279  * Handle to the server for this service.
280  */
281 static struct GNUNET_SERVER_Handle *srv;
282
283 /**
284  * Peer that runs this service.
285  */
286 static struct GNUNET_PeerIdentity my_peer;
287
288
289 static int
290 have_exp_subround_finished (const struct ConsensusSession *session)
291 {
292   int not_finished;
293   not_finished = 0;
294   if ( (NULL != session->partner_outgoing) &&
295        (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
296     not_finished++;
297   if ( (NULL != session->partner_incoming) &&
298        (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
299     not_finished++;
300   if (0 == not_finished)
301     return GNUNET_YES;
302   return GNUNET_NO;
303 }
304
305
306 /**
307  * Destroy a session, free all resources associated with it.
308  *
309  * @param session the session to destroy
310  */
311 static void
312 destroy_session (struct ConsensusSession *session)
313 {
314   int i;
315
316   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
317   if (NULL != session->element_set)
318   {
319     GNUNET_SET_destroy (session->element_set);
320     session->element_set = NULL;
321   }
322   if (NULL != session->set_listener)
323   {
324     GNUNET_SET_listen_cancel (session->set_listener);
325     session->set_listener = NULL;
326   }
327   if (NULL != session->client_mq)
328   {
329     GNUNET_MQ_destroy (session->client_mq);
330     session->client_mq = NULL;
331   }
332   if (NULL != session->client)
333   {
334     GNUNET_SERVER_client_disconnect (session->client);
335     session->client = NULL;
336   }
337   if (NULL != session->shuffle)
338   {
339     GNUNET_free (session->shuffle);
340     session->shuffle = NULL;
341   }
342   if (NULL != session->shuffle_inv)
343   {
344     GNUNET_free (session->shuffle_inv);
345     session->shuffle_inv = NULL;
346   }
347   if (NULL != session->info)
348   {
349     for (i = 0; i < session->num_peers; i++)
350     {
351       struct ConsensusPeerInformation *cpi;
352       cpi = &session->info[i];
353       if (NULL != cpi->set_op)
354       {
355         GNUNET_SET_operation_cancel (cpi->set_op);
356         cpi->set_op = NULL;
357       }
358     }
359     GNUNET_free (session->info);
360     session->info = NULL;
361   }
362   GNUNET_free (session);
363 }
364
365
366 /**
367  * Iterator for set elements.
368  *
369  * @param cls closure
370  * @param element the current element, NULL if all elements have been
371  *        iterated over
372  * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
373  */
374 static int
375 send_to_client_iter (void *cls,
376                      const struct GNUNET_SET_Element *element)
377 {
378   struct ConsensusSession *session = cls;
379   struct GNUNET_MQ_Envelope *ev;
380
381   if (NULL != element)
382   {
383     struct GNUNET_CONSENSUS_ElementMessage *m;
384
385     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
386                 session->local_peer_idx);
387
388     ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
389     m->element_type = htons (element->type);
390     memcpy (&m[1], element->data, element->size);
391     GNUNET_MQ_send (session->client_mq, ev);
392   }
393   else
394   {
395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
396                 session->local_peer_idx);
397     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
398     GNUNET_MQ_send (session->client_mq, ev);
399   }
400   return GNUNET_YES;
401 }
402
403
404 /**
405  * Start the next round.
406  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
407  *
408  * @param cls the session
409  * @param tc task context, for when this task is invoked by the scheduler,
410  *           NULL if invoked for another reason
411  */
412 static void
413 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
414 {
415   struct ConsensusSession *session;
416
417   /* don't kick off next round if we're shutting down */
418   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
419     return;
420
421   session = cls;
422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
423
424   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
425   {
426     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
427     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
428   }
429
430   switch (session->current_round)
431   {
432     case CONSENSUS_ROUND_BEGIN:
433       session->current_round = CONSENSUS_ROUND_EXCHANGE;
434       session->exp_round = 0;
435       subround_over (session, NULL);
436       break;
437     case CONSENSUS_ROUND_EXCHANGE:
438       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
439                   session->local_peer_idx);
440       session->current_round = CONSENSUS_ROUND_FINISH;
441       GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
442       break;
443     default:
444       GNUNET_assert (0);
445   }
446 }
447
448
449 /**
450  * Create a new permutation for the session's peers in session->shuffle.
451  * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
452  * both the global session id and the current round index.
453  *
454  * @param session the session to create the new permutation for
455  */
456 static void
457 shuffle (struct ConsensusSession *session)
458 {
459   uint32_t i;
460   uint32_t randomness[session->num_peers-1];
461
462   if (NULL == session->shuffle)
463     session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
464   if (NULL == session->shuffle_inv)
465     session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
466
467   GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
468                      &session->exp_round, sizeof (uint32_t),
469                      &session->global_id, sizeof (struct GNUNET_HashCode),
470                      NULL);
471
472   for (i = 0; i < session->num_peers; i++)
473     session->shuffle[i] = i;
474
475   for (i = session->num_peers - 1; i > 0; i--)
476   {
477     uint32_t x;
478     uint32_t tmp;
479     x = randomness[i-1] % session->num_peers;
480     tmp = session->shuffle[x];
481     session->shuffle[x] = session->shuffle[i];
482     session->shuffle[i] = tmp;
483   }
484
485   /* create the inverse */
486   for (i = 0; i < session->num_peers; i++)
487     session->shuffle_inv[session->shuffle[i]] = i;
488 }
489
490
491 /**
492  * Find and set the partner_incoming and partner_outgoing of our peer,
493  * one of them may not exist (and thus set to NULL) if the number of peers
494  * in the session is not a power of two.
495  *
496  * @param session the consensus session
497  */
498 static void
499 find_partners (struct ConsensusSession *session)
500 {
501   unsigned int arc;
502   unsigned int num_ghosts;
503   unsigned int largest_arc;
504   int partner_idx;
505
506   /* shuffled local index */
507   int my_idx = session->shuffle[session->local_peer_idx];
508
509   /* distance to neighboring peer in current subround */
510   arc = 1 << session->exp_subround;
511   largest_arc = 1;
512   while (largest_arc < session->num_peers)
513     largest_arc <<= 1;
514   num_ghosts = largest_arc - session->num_peers;
515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
516   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
517   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
518
519   if (0 == (my_idx & arc))
520   {
521     /* we are outgoing */
522     partner_idx = (my_idx + arc) % session->num_peers;
523     session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
524     session->partner_outgoing->exp_subround_finished = GNUNET_NO;
525     /* are we a 'ghost' of a peer that would exist if
526      * the number of peers was a power of two, and thus have to partner
527      * with an additional peer?
528      */
529     if (my_idx < num_ghosts)
530     {
531       int ghost_partner_idx;
532       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
533       ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
534       /* platform dependent; modulo sometimes returns negative values */
535       if (ghost_partner_idx < 0)
536         ghost_partner_idx += session->num_peers;
537       /* we only need to have a ghost partner if the partner is outgoing */
538       if (0 == (ghost_partner_idx & arc))
539       {
540         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
541         session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
542         session->partner_incoming->exp_subround_finished = GNUNET_NO;
543         return;
544       }
545     }
546     session->partner_incoming = NULL;
547     return;
548   }
549   /* we only have an incoming connection */
550   partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
551   if (partner_idx < 0)
552     partner_idx += session->num_peers;
553   session->partner_outgoing = NULL;
554   session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
555   session->partner_incoming->exp_subround_finished = GNUNET_NO;
556 }
557
558
559 /**
560  * Callback for set operation results. Called for each element
561  * in the result set.
562  *
563  * @param cls closure
564  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
565  * @param status see enum GNUNET_SET_Status
566  */
567 static void
568 set_result_cb (void *cls,
569                const struct GNUNET_SET_Element *element,
570                enum GNUNET_SET_Status status)
571 {
572   struct ConsensusPeerInformation *cpi = cls;
573   unsigned int remote_idx = cpi - cpi->session->info;
574   unsigned int local_idx = cpi->session->local_peer_idx;
575
576   GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
577                  (cpi == cpi->session->partner_incoming));
578
579   switch (status)
580   {
581     case GNUNET_SET_STATUS_OK:
582       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
583                   local_idx, remote_idx);
584       break;
585     case GNUNET_SET_STATUS_FAILURE:
586       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
587                   local_idx, remote_idx);
588       cpi->set_op = NULL;
589       return;
590     case GNUNET_SET_STATUS_HALF_DONE:
591     case GNUNET_SET_STATUS_DONE:
592       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
593                   local_idx, remote_idx);
594       cpi->exp_subround_finished = GNUNET_YES;
595       cpi->set_op = NULL;
596       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
597       {
598         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
599                     local_idx);
600         subround_over (cpi->session, NULL);
601       }
602       else
603       {
604         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
605                     local_idx);
606       }
607       return;
608     default:
609       GNUNET_break (0);
610       return;
611   }
612
613   switch (cpi->session->current_round)
614   {
615     case CONSENSUS_ROUND_EXCHANGE:
616       GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
617       break;
618     default:
619       GNUNET_break (0);
620       return;
621   }
622 }
623
624
625 /**
626  * Compare the round the session is in with the round of the given context message.
627  *
628  * @param session a consensus session
629  * @param ri a round context message
630  * @return 0 if it's the same round, -1 if the session is in an earlier round,
631  *         1 if the session is in a later round
632  */
633 static int
634 rounds_compare (struct ConsensusSession *session,
635                 struct RoundInfo* ri)
636 {
637   if (session->current_round < ri->round)
638     return -1;
639   if (session->current_round > ri->round)
640     return 1;
641   if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
642   {
643     if (session->exp_round < ri->exp_round)
644       return -1;
645     if (session->exp_round > ri->exp_round)
646       return 1;
647     if (session->exp_subround < ri->exp_subround)
648       return -1;
649     if (session->exp_subround < ri->exp_subround)
650       return 1;
651     return 0;
652   }
653   /* comparing rounds when we are not in a exp round */
654   GNUNET_assert (0);
655 }
656
657
658 /**
659  * Do the next subround in the exp-scheme.
660  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
661  *
662  * @param cls the session
663  * @param tc task context, for when this task is invoked by the scheduler,
664  *           NULL if invoked for another reason
665  */
666 static void
667 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
668 {
669   struct ConsensusSession *session;
670   int i;
671
672   /* don't kick off next subround if we're shutting down */
673   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
674     return;
675   session = cls;
676   /* cancel timeout */
677   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
678   {
679     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
680     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
681   }
682
683   if (session->exp_round >= NUM_EXP_ROUNDS)
684   {
685     round_over (session, NULL);
686     return;
687   }
688
689   if (session->exp_round == 0)
690   {
691     /* initialize everything for the log-rounds */
692     session->exp_round = 1;
693     session->exp_subround = 0;
694     if (NULL == session->shuffle)
695       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
696     if (NULL == session->shuffle_inv)
697       session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
698     for (i = 0; i < session->num_peers; i++)
699       session->shuffle[i] = session->shuffle_inv[i] = i;
700   }
701   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
702   {
703     /* subrounds done, start new log-round */
704     session->exp_round++;
705     session->exp_subround = 0;
706     shuffle (session);
707   }
708   else
709   {
710     session->exp_subround++;
711   }
712
713   /* determine the incoming and outgoing partner */
714   find_partners (session);
715
716   GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
717   GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
718
719   /* initiate set operation with the outgoing partner */
720   if (NULL != session->partner_outgoing)
721   {
722     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
723     msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
724     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
725     msg->header.size = htons (sizeof *msg);
726     msg->round = htonl (session->current_round);
727     msg->exp_round = htonl (session->exp_round);
728     msg->exp_subround = htonl (session->exp_subround);
729
730     if (NULL != session->partner_outgoing->set_op)
731     {
732       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
733     }
734     session->partner_outgoing->set_op =
735         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
736                             &session->global_id,
737                             (struct GNUNET_MessageHeader *) msg,
738                             0, /* FIXME: salt */
739                             GNUNET_SET_RESULT_ADDED,
740                             set_result_cb, session->partner_outgoing);
741     GNUNET_free (msg);
742     GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
743   }
744
745   /* commit to the delayed set operation */
746   if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
747   {
748     int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
749
750     if (NULL != session->partner_incoming->set_op)
751     {
752       GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
753       session->partner_incoming->set_op = NULL;
754     }
755     if (cmp == 0)
756     {
757       GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
758       session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
759       session->partner_incoming->delayed_set_op = NULL;
760       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
761                   session->local_peer_idx, (int) (session->partner_incoming - session->info));
762     }
763     else
764     {
765       /* this should not happen -- a round has been skipped! */
766       GNUNET_break_op (0);
767     }
768   }
769
770 #ifdef GNUNET_EXTRA_LOGGING
771   {
772     int in;
773     int out;
774     if (session->partner_outgoing == NULL)
775       out = -1;
776     else
777       out = (int) (session->partner_outgoing - session->info);
778     if (session->partner_incoming == NULL)
779       in = -1;
780     else
781       in = (int) (session->partner_incoming - session->info);
782     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
783                 session->exp_round, session->exp_subround, in, out);
784   }
785 #endif /* GNUNET_EXTRA_LOGGING */
786
787 }
788
789
790 /**
791  * Search peer in the list of peers in session.
792  *
793  * @param peer peer to find
794  * @param session session with peer
795  * @return index of peer, -1 if peer is not in session
796  */
797 static int
798 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
799 {
800   int i;
801   for (i = 0; i < session->num_peers; i++)
802     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
803       return i;
804   return -1;
805 }
806
807
808 /**
809  * Compute a global, (hopefully) unique consensus session id,
810  * from the local id of the consensus session, and the identities of all participants.
811  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
812  * exactly the same peers, the global id will be different.
813  *
814  * @param session session to generate the global id for
815  * @param session_id local id of the consensus session
816  */
817 static void
818 compute_global_id (struct ConsensusSession *session,
819                    const struct GNUNET_HashCode *session_id)
820 {
821   int i;
822   struct GNUNET_HashCode tmp;
823   struct GNUNET_HashCode phash;
824
825   /* FIXME: use kdf? */
826
827   session->global_id = *session_id;
828   for (i = 0; i < session->num_peers; ++i)
829   {
830     GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
831     GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
832     session->global_id = tmp;
833     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
834     session->global_id = tmp;
835   }
836 }
837
838
839 /**
840  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
841  * the correct signature to be used with e.g. qsort.
842  * We use this function instead.
843  *
844  * @param h1 some hash code
845  * @param h2 some hash code
846  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
847  */
848 static int
849 hash_cmp (const void *h1, const void *h2)
850 {
851   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
852 }
853
854
855 /**
856  * Create the sorted list of peers for the session,
857  * add the local peer if not in the join message.
858  */
859 static void
860 initialize_session_peer_list (struct ConsensusSession *session,
861                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
862 {
863   unsigned int local_peer_in_list;
864   uint32_t listed_peers;
865   const struct GNUNET_PeerIdentity *msg_peers;
866   struct GNUNET_PeerIdentity *peers;
867   unsigned int i;
868
869   GNUNET_assert (NULL != join_msg);
870
871   /* peers in the join message, may or may not include the local peer */
872   listed_peers = ntohl (join_msg->num_peers);
873
874   session->num_peers = listed_peers;
875
876   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
877
878   local_peer_in_list = GNUNET_NO;
879   for (i = 0; i < listed_peers; i++)
880   {
881     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
882     {
883       local_peer_in_list = GNUNET_YES;
884       break;
885     }
886   }
887
888   if (GNUNET_NO == local_peer_in_list)
889     session->num_peers++;
890
891   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
892
893   if (GNUNET_NO == local_peer_in_list)
894     peers[session->num_peers - 1] = my_peer;
895
896   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
897   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
898
899   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
900
901   for (i = 0; i < session->num_peers; ++i)
902   {
903     /* initialize back-references, so consensus peer information can
904      * be used as closure */
905     session->info[i].session = session;
906     session->info[i].peer_id = peers[i];
907   }
908
909   GNUNET_free (peers);
910 }
911
912
913 /**
914  * Called when another peer wants to do a set operation with the
915  * local peer.
916  *
917  * @param cls closure
918  * @param other_peer the other peer
919  * @param context_msg message with application specific information from
920  *        the other peer
921  * @param request request from the other peer, use GNUNET_SET_accept
922  *        to accept it, otherwise the request will be refused
923  *        Note that we don't use a return value here, as it is also
924  *        necessary to specify the set we want to do the operation with,
925  *        whith sometimes can be derived from the context message.
926  *        Also necessary to specify the timeout.
927  */
928 static void
929 set_listen_cb (void *cls,
930                const struct GNUNET_PeerIdentity *other_peer,
931                const struct GNUNET_MessageHeader *context_msg,
932                struct GNUNET_SET_Request *request)
933 {
934   struct ConsensusSession *session = cls;
935   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
936   struct ConsensusPeerInformation *cpi;
937   struct GNUNET_SET_OperationHandle *set_op;
938   struct RoundInfo round_info;
939   int index;
940   int cmp;
941
942   if (NULL == context_msg)
943   {
944     GNUNET_break_op (0);
945     return;
946   }
947
948   index = get_peer_idx (other_peer, session);
949
950   if (index < 0)
951   {
952     GNUNET_break_op (0);
953     return;
954   }
955
956   round_info.round = ntohl (msg->round);
957   round_info.exp_round = ntohl (msg->exp_round);
958   round_info.exp_subround = ntohl (msg->exp_subround);
959
960   cpi = &session->info[index];
961
962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
963
964   switch (session->current_round)
965   {
966     case CONSENSUS_ROUND_BEGIN:
967       /* we're in the begin round, so requests for the exchange round may
968        * come in, they will be delayed for now! */
969     case CONSENSUS_ROUND_EXCHANGE:
970       cmp = rounds_compare (session, &round_info);
971       if (cmp > 0)
972       {
973         /* the other peer is too late */
974         GNUNET_break_op (0);
975         return;
976       }
977       /* kill old request, if any. this is legal,
978        * as the other peer would not make a new request if it would want to
979        * complete the old one! */
980       if (NULL != cpi->set_op)
981       {
982         GNUNET_SET_operation_cancel (cpi->set_op);
983         cpi->set_op = NULL;
984       }
985       set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
986                                        set_result_cb, &session->info[index]);
987       if (cmp == 0)
988       {
989         cpi->set_op = set_op;
990         GNUNET_SET_commit (set_op, session->element_set);
991         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
992       }
993       else
994       {
995         /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
996         cpi->delayed_set_op = set_op;
997         cpi->delayed_round_info = round_info;
998         cpi->exp_subround_finished = GNUNET_YES;
999         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
1000       }
1001       break;
1002     default:
1003       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
1004                   session->local_peer_idx, session->current_round, index);
1005       GNUNET_break_op (0);
1006       return;
1007   }
1008 }
1009
1010
1011 /**
1012  * Initialize the session, continue receiving messages from the owning client
1013  *
1014  * @param session the session to initialize
1015  * @param join_msg the join message from the client
1016  */
1017 static void
1018 initialize_session (struct ConsensusSession *session,
1019                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1020 {
1021   struct ConsensusSession *other_session;
1022
1023   initialize_session_peer_list (session, join_msg);
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1025   compute_global_id (session, &join_msg->session_id);
1026
1027   /* check if some local client already owns the session.
1028    * it is only legal to have a session with an existing global id
1029    * if all other sessions with this global id are finished.*/
1030   other_session = sessions_head;
1031   while (NULL != other_session)
1032   {
1033     if ((other_session != session) &&
1034         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1035     {
1036       if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1037       {
1038         GNUNET_break (0);
1039         destroy_session (session);
1040         return;
1041       }
1042       break;
1043     }
1044     other_session = other_session->next;
1045   }
1046
1047   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
1048   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
1049
1050   session->local_peer_idx = get_peer_idx (&my_peer, session);
1051   GNUNET_assert (-1 != session->local_peer_idx);
1052   session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1053   GNUNET_assert (NULL != session->element_set);
1054   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1055                                              &session->global_id,
1056                                              set_listen_cb, session);
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1059 }
1060
1061
1062 static struct ConsensusSession *
1063 get_session_by_client (struct GNUNET_SERVER_Client *client)
1064 {
1065   struct ConsensusSession *session;
1066
1067   session = sessions_head;
1068   while (NULL != session)
1069   {
1070     if (session->client == client)
1071       return session;
1072     session = session->next;
1073   }
1074   return NULL;
1075 }
1076
1077
1078 /**
1079  * Called when a client wants to join a consensus session.
1080  *
1081  * @param cls unused
1082  * @param client client that sent the message
1083  * @param m message sent by the client
1084  */
1085 static void
1086 client_join (void *cls,
1087              struct GNUNET_SERVER_Client *client,
1088              const struct GNUNET_MessageHeader *m)
1089 {
1090   struct ConsensusSession *session;
1091
1092   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1093
1094   session = get_session_by_client (client);
1095   if (NULL != session)
1096   {
1097     GNUNET_break (0);
1098     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1099     return;
1100   }
1101   session = GNUNET_new (struct ConsensusSession);
1102   session->client = client;
1103   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1104   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1105   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1106   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1107
1108   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1109 }
1110
1111
1112 /**
1113  * Called when a client performs an insert operation.
1114  *
1115  * @param cls (unused)
1116  * @param client client handle
1117  * @param m message sent by the client
1118  */
1119 void
1120 client_insert (void *cls,
1121                struct GNUNET_SERVER_Client *client,
1122                const struct GNUNET_MessageHeader *m)
1123 {
1124   struct ConsensusSession *session;
1125   struct GNUNET_CONSENSUS_ElementMessage *msg;
1126   struct GNUNET_SET_Element *element;
1127   ssize_t element_size;
1128
1129   session = get_session_by_client (client);
1130
1131   if (NULL == session)
1132   {
1133     GNUNET_break (0);
1134     GNUNET_SERVER_client_disconnect (client);
1135     return;
1136   }
1137
1138   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1139   {
1140     GNUNET_break (0);
1141     GNUNET_SERVER_client_disconnect (client);
1142     return;
1143   }
1144
1145   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1146   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1147   if (element_size < 0)
1148   {
1149     GNUNET_break (0);
1150     return;
1151   }
1152
1153   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1154   element->type = msg->element_type;
1155   element->size = element_size;
1156   memcpy (&element[1], &msg[1], element_size);
1157   element->data = &element[1];
1158   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1159   GNUNET_free (element);
1160   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1161
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1163 }
1164
1165
1166 /**
1167  * Called when a client performs the conclude operation.
1168  *
1169  * @param cls (unused)
1170  * @param client client handle
1171  * @param message message sent by the client
1172  */
1173 static void
1174 client_conclude (void *cls,
1175                  struct GNUNET_SERVER_Client *client,
1176                  const struct GNUNET_MessageHeader *message)
1177 {
1178   struct ConsensusSession *session;
1179
1180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1181   session = get_session_by_client (client);
1182   if (NULL == session)
1183   {
1184     /* client not found */
1185     GNUNET_break (0);
1186     GNUNET_SERVER_client_disconnect (client);
1187     return;
1188   }
1189   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1190   {
1191     /* client requested conclude twice */
1192     GNUNET_break (0);
1193     return;
1194   }
1195   if (session->num_peers <= 1)
1196   {
1197     session->current_round = CONSENSUS_ROUND_FINISH;
1198     GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1199   }
1200   else
1201   {
1202     /* the 'begin' round is over, start with the next, actual round */
1203     round_over (session, NULL);
1204   }
1205
1206   GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1207   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1208 }
1209
1210
1211 /**
1212  * Called to clean up, after a shutdown has been requested.
1213  *
1214  * @param cls closure
1215  * @param tc context information (why was this task triggered now)
1216  */
1217 static void
1218 shutdown_task (void *cls,
1219                const struct GNUNET_SCHEDULER_TaskContext *tc)
1220 {
1221   while (NULL != sessions_head)
1222     destroy_session (sessions_head);
1223
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1225 }
1226
1227
1228 /**
1229  * Clean up after a client after it is
1230  * disconnected (either by us or by itself)
1231  *
1232  * @param cls closure, unused
1233  * @param client the client to clean up after
1234  */
1235 void
1236 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1237 {
1238   struct ConsensusSession *session;
1239
1240   session = get_session_by_client (client);
1241   if (NULL == session)
1242     return;
1243   if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1244       (CONSENSUS_ROUND_FINISH == session->current_round))
1245     destroy_session (session);
1246   else
1247     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1248 }
1249
1250
1251 /**
1252  * Start processing consensus requests.
1253  *
1254  * @param cls closure
1255  * @param server the initialized server
1256  * @param c configuration to use
1257  */
1258 static void
1259 run (void *cls, struct GNUNET_SERVER_Handle *server,
1260      const struct GNUNET_CONFIGURATION_Handle *c)
1261 {
1262   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1263     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1264         sizeof (struct GNUNET_MessageHeader)},
1265     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1266     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1267     {NULL, NULL, 0, 0}
1268   };
1269
1270   cfg = c;
1271   srv = server;
1272   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
1273   {
1274     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1275     GNUNET_break (0);
1276     GNUNET_SCHEDULER_shutdown ();
1277     return;
1278   }
1279   GNUNET_SERVER_add_handlers (server, server_handlers);
1280   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1281   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1282   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1283 }
1284
1285
1286 /**
1287  * The main function for the consensus service.
1288  *
1289  * @param argc number of arguments from the command line
1290  * @param argv command line arguments
1291  * @return 0 ok, 1 on error
1292  */
1293 int
1294 main (int argc, char *const *argv)
1295 {
1296   int ret;
1297   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1298   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1299   return (GNUNET_OK == ret) ? 0 : 1;
1300 }
1301