- fix "broken connection" notifications
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
35
36
37 /**
38  * Log macro that prefixes the local peer and the peer we are in contact with.
39  */
40 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
41    cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
42
43
44 /**
45  * Number of exponential rounds, used in the exp and completion round.
46  */
47 #define NUM_EXP_ROUNDS 4
48
49 /* forward declarations */
50
51 /* mutual recursion with struct ConsensusSession */
52 struct ConsensusPeerInformation;
53
54 /* mutual recursion with round_over */
55 static void
56 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
57
58
59 /**
60  * Describes the current round a consensus session is in.
61  */
62 enum ConsensusRound
63 {
64   /**
65    * Not started the protocol yet.
66    */
67   CONSENSUS_ROUND_BEGIN=0,
68   /**
69    * Distribution of elements with the exponential scheme.
70    */
71   CONSENSUS_ROUND_EXCHANGE,
72   /**
73    * Exchange which elements each peer has, but don't
74    * transmit the element's data, only their SHA-512 hashes.
75    * This round uses the all-to-all scheme.
76    */
77   CONSENSUS_ROUND_INVENTORY,
78   /**
79    * Collect and distribute missing values with the exponential scheme.
80    */
81   CONSENSUS_ROUND_COMPLETION,
82   /**
83    * Consensus concluded. After timeout and finished communication with client,
84    * consensus session will be destroyed.
85    */
86   CONSENSUS_ROUND_FINISH
87 };
88
89
90 /**
91  * Complete information about the current round and all
92  * subrounds.
93  */
94 struct RoundInfo
95 {
96   /**
97    * The current main round.
98    */
99   enum ConsensusRound round;
100   /**
101    * The current exp round, valid if
102    * the main round is an exp round.
103    */
104   uint32_t exp_round;
105   /**
106    * The current exp subround, valid if
107    * the main round is an exp round.
108    */
109   uint32_t exp_subround;
110 };
111
112
113 /**
114  * A consensus session consists of one local client and the remote authorities.
115  */
116 struct ConsensusSession
117 {
118   /**
119    * Consensus sessions are kept in a DLL.
120    */
121   struct ConsensusSession *next;
122
123   /**
124    * Consensus sessions are kept in a DLL.
125    */
126   struct ConsensusSession *prev;
127
128   /**
129   * Global consensus identification, computed
130   * from the session id and participating authorities.
131   */
132   struct GNUNET_HashCode global_id;
133
134   /**
135    * Client that inhabits the session
136    */
137   struct GNUNET_SERVER_Client *client;
138
139   /**
140    * Queued messages to the client.
141    */
142   struct GNUNET_MQ_Handle *client_mq;
143
144   /**
145    * Timeout for all rounds together, single rounds will schedule a timeout task
146    * with a fraction of the conclude timeout.
147    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
148    */
149   struct GNUNET_TIME_Relative conclude_timeout;
150
151   /**
152    * Timeout task identifier for the current round.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
155
156   /**
157    * Number of other peers in the consensus.
158    */
159   unsigned int num_peers;
160
161   /**
162    * Information about the other peers,
163    * their state, etc.
164    */
165   struct ConsensusPeerInformation *info;
166
167   /**
168    * Index of the local peer in the peers array
169    */
170   unsigned int local_peer_idx;
171
172   /**
173    * Current round
174    */
175   enum ConsensusRound current_round;
176
177   /**
178    * Permutation of peers for the current round,
179    */
180   uint32_t *shuffle;
181
182   /**
183    * Inverse permutation of peers for the current round,
184    */
185   uint32_t *shuffle_inv;
186
187   /**
188    * Current round of the exponential scheme.
189    */
190   uint32_t exp_round;
191
192   /**
193    * Current sub-round of the exponential scheme.
194    */
195   uint32_t exp_subround;
196
197   /**
198    * The partner for the current exp-round
199    */
200   struct ConsensusPeerInformation *partner_outgoing;
201
202   /**
203    * The partner for the current exp-round
204    */
205   struct ConsensusPeerInformation *partner_incoming;
206
207   /**
208    * The consensus set of this session.
209    */
210   struct GNUNET_SET_Handle *element_set;
211
212   /**
213    * Listener for requests from other peers.
214    * Uses the session's global id as app id.
215    */
216   struct GNUNET_SET_ListenHandle *set_listener;
217 };
218
219
220 /**
221  * Information about a peer that is in a consensus session.
222  */
223 struct ConsensusPeerInformation
224 {
225   /**
226    * Peer identitty of the peer in the consensus session
227    */
228   struct GNUNET_PeerIdentity peer_id;
229
230   /**
231    * Back-reference to the consensus session,
232    * to that ConsensusPeerInformation can be used as a closure
233    */
234   struct ConsensusSession *session;
235
236   /**
237    * We have finishes the exp-subround with the peer.
238    */
239   int exp_subround_finished;
240
241   /**
242    * Set operation we are currently executing with this peer.
243    */
244   struct GNUNET_SET_OperationHandle *set_op;
245
246   /**
247    * Set operation we are planning on executing with this peer.
248    */
249   struct GNUNET_SET_OperationHandle *delayed_set_op;
250
251   /**
252    * Info about the round of the delayed set operation.
253    */
254   struct RoundInfo delayed_round_info;
255 };
256
257
258 /**
259  * Linked list of sessions this peer participates in.
260  */
261 static struct ConsensusSession *sessions_head;
262
263 /**
264  * Linked list of sessions this peer participates in.
265  */
266 static struct ConsensusSession *sessions_tail;
267
268 /**
269  * Configuration of the consensus service.
270  */
271 static const struct GNUNET_CONFIGURATION_Handle *cfg;
272
273 /**
274  * Handle to the server for this service.
275  */
276 static struct GNUNET_SERVER_Handle *srv;
277
278 /**
279  * Peer that runs this service.
280  */
281 static struct GNUNET_PeerIdentity my_peer;
282
283
284 static int
285 have_exp_subround_finished (const struct ConsensusSession *session)
286 {
287   int not_finished;
288   not_finished = 0;
289   if ( (NULL != session->partner_outgoing) &&
290        (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
291     not_finished++;
292   if ( (NULL != session->partner_incoming) &&
293        (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
294     not_finished++;
295   if (0 == not_finished)
296     return GNUNET_YES;
297   return GNUNET_NO;
298 }
299
300
301 /**
302  * Destroy a session, free all resources associated with it.
303  *
304  * @param session the session to destroy
305  */
306 static void
307 destroy_session (struct ConsensusSession *session)
308 {
309   int i;
310
311   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
312   if (NULL != session->element_set)
313   {
314     GNUNET_SET_destroy (session->element_set);
315     session->element_set = NULL;
316   }
317   if (NULL != session->set_listener)
318   {
319     GNUNET_SET_listen_cancel (session->set_listener);
320     session->set_listener = NULL;
321   }
322   if (NULL != session->client_mq)
323   {
324     GNUNET_MQ_destroy (session->client_mq);
325     session->client_mq = NULL;
326   }
327   if (NULL != session->client)
328   {
329     GNUNET_SERVER_client_disconnect (session->client);
330     session->client = NULL;
331   }
332   if (NULL != session->shuffle)
333   {
334     GNUNET_free (session->shuffle);
335     session->shuffle = NULL;
336   }
337   if (NULL != session->shuffle_inv)
338   {
339     GNUNET_free (session->shuffle_inv);
340     session->shuffle_inv = NULL;
341   }
342   if (NULL != session->info)
343   {
344     for (i = 0; i < session->num_peers; i++)
345     {
346       struct ConsensusPeerInformation *cpi;
347       cpi = &session->info[i];
348       if (NULL != cpi->set_op)
349       {
350         GNUNET_SET_operation_cancel (cpi->set_op);
351         cpi->set_op = NULL;
352       }
353     }
354     GNUNET_free (session->info);
355     session->info = NULL;
356   }
357   GNUNET_free (session);
358 }
359
360
361 /**
362  * Iterator for set elements.
363  *
364  * @param cls closure
365  * @param element the current element, NULL if all elements have been
366  *        iterated over
367  * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
368  */
369 static int
370 send_to_client_iter (void *cls,
371                      const struct GNUNET_SET_Element *element)
372 {
373   struct ConsensusSession *session = cls;
374   struct GNUNET_MQ_Envelope *ev;
375
376   if (NULL != element)
377   {
378     struct GNUNET_CONSENSUS_ElementMessage *m;
379
380     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
381                 session->local_peer_idx);
382
383     ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
384     m->element_type = htons (element->type);
385     memcpy (&m[1], element->data, element->size);
386     GNUNET_MQ_send (session->client_mq, ev);
387   }
388   else
389   {
390     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
391                 session->local_peer_idx);
392     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
393     GNUNET_MQ_send (session->client_mq, ev);
394   }
395   return GNUNET_YES;
396 }
397
398
399 /**
400  * Start the next round.
401  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
402  *
403  * @param cls the session
404  * @param tc task context, for when this task is invoked by the scheduler,
405  *           NULL if invoked for another reason
406  */
407 static void
408 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
409 {
410   struct ConsensusSession *session;
411
412   /* don't kick off next round if we're shutting down */
413   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
414     return;
415
416   session = cls;
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
418
419   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
420   {
421     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
422     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
423   }
424
425   switch (session->current_round)
426   {
427     case CONSENSUS_ROUND_BEGIN:
428       session->current_round = CONSENSUS_ROUND_EXCHANGE;
429       session->exp_round = 0;
430       subround_over (session, NULL);
431       break;
432     case CONSENSUS_ROUND_EXCHANGE:
433       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
434                   session->local_peer_idx);
435       session->current_round = CONSENSUS_ROUND_FINISH;
436       GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
437       break;
438     default:
439       GNUNET_assert (0);
440   }
441 }
442
443
444 /**
445  * Create a new permutation for the session's peers in session->shuffle.
446  * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
447  * both the global session id and the current round index.
448  *
449  * @param session the session to create the new permutation for
450  */
451 static void
452 shuffle (struct ConsensusSession *session)
453 {
454   uint32_t i;
455   uint32_t randomness[session->num_peers-1];
456
457   if (NULL == session->shuffle)
458     session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
459   if (NULL == session->shuffle_inv)
460     session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
461
462   GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
463                      &session->exp_round, sizeof (uint32_t),
464                      &session->global_id, sizeof (struct GNUNET_HashCode),
465                      NULL);
466
467   for (i = 0; i < session->num_peers; i++)
468     session->shuffle[i] = i;
469
470   for (i = session->num_peers - 1; i > 0; i--)
471   {
472     uint32_t x;
473     uint32_t tmp;
474     x = randomness[i-1] % session->num_peers;
475     tmp = session->shuffle[x];
476     session->shuffle[x] = session->shuffle[i];
477     session->shuffle[i] = tmp;
478   }
479
480   /* create the inverse */
481   for (i = 0; i < session->num_peers; i++)
482     session->shuffle_inv[session->shuffle[i]] = i;
483 }
484
485
486 /**
487  * Find and set the partner_incoming and partner_outgoing of our peer,
488  * one of them may not exist (and thus set to NULL) if the number of peers
489  * in the session is not a power of two.
490  *
491  * @param session the consensus session
492  */
493 static void
494 find_partners (struct ConsensusSession *session)
495 {
496   unsigned int arc;
497   unsigned int num_ghosts;
498   unsigned int largest_arc;
499   int partner_idx;
500
501   /* shuffled local index */
502   int my_idx = session->shuffle[session->local_peer_idx];
503
504   /* distance to neighboring peer in current subround */
505   arc = 1 << session->exp_subround;
506   largest_arc = 1;
507   while (largest_arc < session->num_peers)
508     largest_arc <<= 1;
509   num_ghosts = largest_arc - session->num_peers;
510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
513
514   if (0 == (my_idx & arc))
515   {
516     /* we are outgoing */
517     partner_idx = (my_idx + arc) % session->num_peers;
518     session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
519     session->partner_outgoing->exp_subround_finished = GNUNET_NO;
520     /* are we a 'ghost' of a peer that would exist if
521      * the number of peers was a power of two, and thus have to partner
522      * with an additional peer?
523      */
524     if (my_idx < num_ghosts)
525     {
526       int ghost_partner_idx;
527       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
528       ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
529       /* platform dependent; modulo sometimes returns negative values */
530       if (ghost_partner_idx < 0)
531         ghost_partner_idx += session->num_peers;
532       /* we only need to have a ghost partner if the partner is outgoing */
533       if (0 == (ghost_partner_idx & arc))
534       {
535         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
536         session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
537         session->partner_incoming->exp_subround_finished = GNUNET_NO;
538         return;
539       }
540     }
541     session->partner_incoming = NULL;
542     return;
543   }
544   /* we only have an incoming connection */
545   partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
546   if (partner_idx < 0)
547     partner_idx += session->num_peers;
548   session->partner_outgoing = NULL;
549   session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
550   session->partner_incoming->exp_subround_finished = GNUNET_NO;
551 }
552
553
554 /**
555  * Callback for set operation results. Called for each element
556  * in the result set.
557  *
558  * @param cls closure
559  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
560  * @param status see enum GNUNET_SET_Status
561  */
562 static void
563 set_result_cb (void *cls,
564                const struct GNUNET_SET_Element *element,
565                enum GNUNET_SET_Status status)
566 {
567   struct ConsensusPeerInformation *cpi = cls;
568   unsigned int remote_idx = cpi - cpi->session->info;
569   unsigned int local_idx = cpi->session->local_peer_idx;
570
571   GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
572                  (cpi == cpi->session->partner_incoming));
573
574   switch (status)
575   {
576     case GNUNET_SET_STATUS_OK:
577       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
578                   local_idx, remote_idx);
579       break;
580     case GNUNET_SET_STATUS_FAILURE:
581       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
582                   local_idx, remote_idx);
583       cpi->set_op = NULL;
584       return;
585     case GNUNET_SET_STATUS_HALF_DONE:
586     case GNUNET_SET_STATUS_DONE:
587       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
588                   local_idx, remote_idx);
589       cpi->exp_subround_finished = GNUNET_YES;
590       cpi->set_op = NULL;
591       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
592       {
593         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
594                     local_idx);
595         subround_over (cpi->session, NULL);
596       }
597       else
598       {
599         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
600                     local_idx);
601       }
602       return;
603     default:
604       GNUNET_break (0);
605       return;
606   }
607
608   switch (cpi->session->current_round)
609   {
610     case CONSENSUS_ROUND_EXCHANGE:
611       GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
612       break;
613     default:
614       GNUNET_break (0);
615       return;
616   }
617 }
618
619
620 /**
621  * Compare the round the session is in with the round of the given context message.
622  *
623  * @param session a consensus session
624  * @param ri a round context message
625  * @return 0 if it's the same round, -1 if the session is in an earlier round,
626  *         1 if the session is in a later round
627  */
628 static int
629 rounds_compare (struct ConsensusSession *session,
630                 struct RoundInfo* ri)
631 {
632   if (session->current_round < ri->round)
633     return -1;
634   if (session->current_round > ri->round)
635     return 1;
636   if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
637   {
638     if (session->exp_round < ri->exp_round)
639       return -1;
640     if (session->exp_round > ri->exp_round)
641       return 1;
642     if (session->exp_subround < ri->exp_subround)
643       return -1;
644     if (session->exp_subround < ri->exp_subround)
645       return 1;
646     return 0;
647   }
648   /* comparing rounds when we are not in a exp round */
649   GNUNET_assert (0);
650 }
651
652
653 /**
654  * Do the next subround in the exp-scheme.
655  * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
656  *
657  * @param cls the session
658  * @param tc task context, for when this task is invoked by the scheduler,
659  *           NULL if invoked for another reason
660  */
661 static void
662 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
663 {
664   struct ConsensusSession *session;
665   int i;
666
667   /* don't kick off next subround if we're shutting down */
668   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
669     return;
670   session = cls;
671   /* cancel timeout */
672   if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
673   {
674     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
675     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
676   }
677
678   if (session->exp_round >= NUM_EXP_ROUNDS)
679   {
680     round_over (session, NULL);
681     return;
682   }
683
684   if (session->exp_round == 0)
685   {
686     /* initialize everything for the log-rounds */
687     session->exp_round = 1;
688     session->exp_subround = 0;
689     if (NULL == session->shuffle)
690       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
691     if (NULL == session->shuffle_inv)
692       session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
693     for (i = 0; i < session->num_peers; i++)
694       session->shuffle[i] = session->shuffle_inv[i] = i;
695   }
696   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
697   {
698     /* subrounds done, start new log-round */
699     session->exp_round++;
700     session->exp_subround = 0;
701     shuffle (session);
702   }
703   else
704   {
705     session->exp_subround++;
706   }
707
708   /* determine the incoming and outgoing partner */
709   find_partners (session);
710
711   GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
712   GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
713
714   /* initiate set operation with the outgoing partner */
715   if (NULL != session->partner_outgoing)
716   {
717     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
718     msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
719     msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
720     msg->header.size = htons (sizeof *msg);
721     msg->round = htonl (session->current_round);
722     msg->exp_round = htonl (session->exp_round);
723     msg->exp_subround = htonl (session->exp_subround);
724
725     if (NULL != session->partner_outgoing->set_op)
726     {
727       GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
728     }
729     session->partner_outgoing->set_op =
730         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
731                             &session->global_id,
732                             (struct GNUNET_MessageHeader *) msg,
733                             0, /* FIXME: salt */
734                             GNUNET_SET_RESULT_ADDED,
735                             set_result_cb, session->partner_outgoing);
736     GNUNET_free (msg);
737     GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
738   }
739
740   /* commit to the delayed set operation */
741   if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
742   {
743     int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
744
745     if (NULL != session->partner_incoming->set_op)
746     {
747       GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
748       session->partner_incoming->set_op = NULL;
749     }
750     if (cmp == 0)
751     {
752       GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
753       session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
754       session->partner_incoming->delayed_set_op = NULL;
755       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
756                   session->local_peer_idx, (int) (session->partner_incoming - session->info));
757     }
758     else
759     {
760       /* this should not happen -- a round has been skipped! */
761       GNUNET_break_op (0);
762     }
763   }
764
765 #ifdef GNUNET_EXTRA_LOGGING
766   {
767     int in;
768     int out;
769     if (session->partner_outgoing == NULL)
770       out = -1;
771     else
772       out = (int) (session->partner_outgoing - session->info);
773     if (session->partner_incoming == NULL)
774       in = -1;
775     else
776       in = (int) (session->partner_incoming - session->info);
777     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
778                 session->exp_round, session->exp_subround, in, out);
779   }
780 #endif /* GNUNET_EXTRA_LOGGING */
781
782 }
783
784
785 /**
786  * Search peer in the list of peers in session.
787  *
788  * @param peer peer to find
789  * @param session session with peer
790  * @return index of peer, -1 if peer is not in session
791  */
792 static int
793 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
794 {
795   int i;
796   for (i = 0; i < session->num_peers; i++)
797     if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
798       return i;
799   return -1;
800 }
801
802
803 /**
804  * Compute a global, (hopefully) unique consensus session id,
805  * from the local id of the consensus session, and the identities of all participants.
806  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
807  * exactly the same peers, the global id will be different.
808  *
809  * @param session session to generate the global id for
810  * @param session_id local id of the consensus session
811  */
812 static void
813 compute_global_id (struct ConsensusSession *session,
814                    const struct GNUNET_HashCode *session_id)
815 {
816   int i;
817   struct GNUNET_HashCode tmp;
818   struct GNUNET_HashCode phash;
819
820   /* FIXME: use kdf? */
821
822   session->global_id = *session_id;
823   for (i = 0; i < session->num_peers; ++i)
824   {
825     GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
826     GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
827     session->global_id = tmp;
828     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
829     session->global_id = tmp;
830   }
831 }
832
833
834 /**
835  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
836  * the correct signature to be used with e.g. qsort.
837  * We use this function instead.
838  *
839  * @param h1 some hash code
840  * @param h2 some hash code
841  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
842  */
843 static int
844 hash_cmp (const void *h1, const void *h2)
845 {
846   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
847 }
848
849
850 /**
851  * Create the sorted list of peers for the session,
852  * add the local peer if not in the join message.
853  */
854 static void
855 initialize_session_peer_list (struct ConsensusSession *session,
856                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
857 {
858   unsigned int local_peer_in_list;
859   uint32_t listed_peers;
860   const struct GNUNET_PeerIdentity *msg_peers;
861   struct GNUNET_PeerIdentity *peers;
862   unsigned int i;
863
864   GNUNET_assert (NULL != join_msg);
865
866   /* peers in the join message, may or may not include the local peer */
867   listed_peers = ntohl (join_msg->num_peers);
868
869   session->num_peers = listed_peers;
870
871   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
872
873   local_peer_in_list = GNUNET_NO;
874   for (i = 0; i < listed_peers; i++)
875   {
876     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
877     {
878       local_peer_in_list = GNUNET_YES;
879       break;
880     }
881   }
882
883   if (GNUNET_NO == local_peer_in_list)
884     session->num_peers++;
885
886   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
887
888   if (GNUNET_NO == local_peer_in_list)
889     peers[session->num_peers - 1] = my_peer;
890
891   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
892   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
893
894   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
895
896   for (i = 0; i < session->num_peers; ++i)
897   {
898     /* initialize back-references, so consensus peer information can
899      * be used as closure */
900     session->info[i].session = session;
901     session->info[i].peer_id = peers[i];
902   }
903
904   GNUNET_free (peers);
905 }
906
907
908 /**
909  * Called when another peer wants to do a set operation with the
910  * local peer.
911  *
912  * @param cls closure
913  * @param other_peer the other peer
914  * @param context_msg message with application specific information from
915  *        the other peer
916  * @param request request from the other peer, use GNUNET_SET_accept
917  *        to accept it, otherwise the request will be refused
918  *        Note that we don't use a return value here, as it is also
919  *        necessary to specify the set we want to do the operation with,
920  *        whith sometimes can be derived from the context message.
921  *        Also necessary to specify the timeout.
922  */
923 static void
924 set_listen_cb (void *cls,
925                const struct GNUNET_PeerIdentity *other_peer,
926                const struct GNUNET_MessageHeader *context_msg,
927                struct GNUNET_SET_Request *request)
928 {
929   struct ConsensusSession *session = cls;
930   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
931   struct ConsensusPeerInformation *cpi;
932   struct GNUNET_SET_OperationHandle *set_op;
933   struct RoundInfo round_info;
934   int index;
935   int cmp;
936
937   if (NULL == context_msg)
938   {
939     GNUNET_break_op (0);
940     return;
941   }
942
943   index = get_peer_idx (other_peer, session);
944
945   if (index < 0)
946   {
947     GNUNET_break_op (0);
948     return;
949   }
950
951   round_info.round = ntohl (msg->round);
952   round_info.exp_round = ntohl (msg->exp_round);
953   round_info.exp_subround = ntohl (msg->exp_subround);
954
955   cpi = &session->info[index];
956
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
958
959   switch (session->current_round)
960   {
961     case CONSENSUS_ROUND_BEGIN:
962       /* we're in the begin round, so requests for the exchange round may
963        * come in, they will be delayed for now! */
964     case CONSENSUS_ROUND_EXCHANGE:
965       cmp = rounds_compare (session, &round_info);
966       if (cmp > 0)
967       {
968         /* the other peer is too late */
969         GNUNET_break_op (0);
970         return;
971       }
972       /* kill old request, if any. this is legal,
973        * as the other peer would not make a new request if it would want to
974        * complete the old one! */
975       if (NULL != cpi->set_op)
976       {
977         GNUNET_SET_operation_cancel (cpi->set_op);
978         cpi->set_op = NULL;
979       }
980       set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
981                                        set_result_cb, &session->info[index]);
982       if (cmp == 0)
983       {
984         cpi->set_op = set_op;
985         GNUNET_SET_commit (set_op, session->element_set);
986         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
987       }
988       else
989       {
990         /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
991         cpi->delayed_set_op = set_op;
992         cpi->delayed_round_info = round_info;
993         cpi->exp_subround_finished = GNUNET_YES;
994         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
995       }
996       break;
997     default:
998       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
999                   session->local_peer_idx, session->current_round, index);
1000       GNUNET_break_op (0);
1001       return;
1002   }
1003 }
1004
1005
1006 /**
1007  * Initialize the session, continue receiving messages from the owning client
1008  *
1009  * @param session the session to initialize
1010  * @param join_msg the join message from the client
1011  */
1012 static void
1013 initialize_session (struct ConsensusSession *session,
1014                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1015 {
1016   struct ConsensusSession *other_session;
1017
1018   initialize_session_peer_list (session, join_msg);
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1020   compute_global_id (session, &join_msg->session_id);
1021
1022   /* check if some local client already owns the session.
1023    * it is only legal to have a session with an existing global id
1024    * if all other sessions with this global id are finished.*/
1025   other_session = sessions_head;
1026   while (NULL != other_session)
1027   {
1028     if ((other_session != session) &&
1029         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1030     {
1031       if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1032       {
1033         GNUNET_break (0);
1034         destroy_session (session);
1035         return;
1036       }
1037       break;
1038     }
1039     other_session = other_session->next;
1040   }
1041
1042   session->local_peer_idx = get_peer_idx (&my_peer, session);
1043   GNUNET_assert (-1 != session->local_peer_idx);
1044   session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1045   GNUNET_assert (NULL != session->element_set);
1046   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1047                                              &session->global_id,
1048                                              set_listen_cb, session);
1049   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1050   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1051 }
1052
1053
1054 static struct ConsensusSession *
1055 get_session_by_client (struct GNUNET_SERVER_Client *client)
1056 {
1057   struct ConsensusSession *session;
1058
1059   session = sessions_head;
1060   while (NULL != session)
1061   {
1062     if (session->client == client)
1063       return session;
1064     session = session->next;
1065   }
1066   return NULL;
1067 }
1068
1069
1070 /**
1071  * Called when a client wants to join a consensus session.
1072  *
1073  * @param cls unused
1074  * @param client client that sent the message
1075  * @param m message sent by the client
1076  */
1077 static void
1078 client_join (void *cls,
1079              struct GNUNET_SERVER_Client *client,
1080              const struct GNUNET_MessageHeader *m)
1081 {
1082   struct ConsensusSession *session;
1083
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1085
1086   session = get_session_by_client (client);
1087   if (NULL != session)
1088   {
1089     GNUNET_break (0);
1090     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1091     return;
1092   }
1093   session = GNUNET_new (struct ConsensusSession);
1094   session->client = client;
1095   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1096   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1097   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1098   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1099
1100   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1101 }
1102
1103
1104 /**
1105  * Called when a client performs an insert operation.
1106  *
1107  * @param cls (unused)
1108  * @param client client handle
1109  * @param m message sent by the client
1110  */
1111 void
1112 client_insert (void *cls,
1113                struct GNUNET_SERVER_Client *client,
1114                const struct GNUNET_MessageHeader *m)
1115 {
1116   struct ConsensusSession *session;
1117   struct GNUNET_CONSENSUS_ElementMessage *msg;
1118   struct GNUNET_SET_Element *element;
1119   ssize_t element_size;
1120
1121   session = get_session_by_client (client);
1122
1123   if (NULL == session)
1124   {
1125     GNUNET_break (0);
1126     GNUNET_SERVER_client_disconnect (client);
1127     return;
1128   }
1129
1130   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1131   {
1132     GNUNET_break (0);
1133     GNUNET_SERVER_client_disconnect (client);
1134     return;
1135   }
1136
1137   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1138   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1139   if (element_size < 0)
1140   {
1141     GNUNET_break (0);
1142     return;
1143   }
1144
1145   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1146   element->type = msg->element_type;
1147   element->size = element_size;
1148   memcpy (&element[1], &msg[1], element_size);
1149   element->data = &element[1];
1150   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1151   GNUNET_free (element);
1152   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1153
1154   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1155 }
1156
1157
1158 /**
1159  * Called when a client performs the conclude operation.
1160  *
1161  * @param cls (unused)
1162  * @param client client handle
1163  * @param message message sent by the client
1164  */
1165 static void
1166 client_conclude (void *cls,
1167                  struct GNUNET_SERVER_Client *client,
1168                  const struct GNUNET_MessageHeader *message)
1169 {
1170   struct ConsensusSession *session;
1171   struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
1172
1173
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1175   cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
1176   session = get_session_by_client (client);
1177   if (NULL == session)
1178   {
1179     /* client not found */
1180     GNUNET_break (0);
1181     GNUNET_SERVER_client_disconnect (client);
1182     return;
1183   }
1184   if (CONSENSUS_ROUND_BEGIN != session->current_round)
1185   {
1186     /* client requested conclude twice */
1187     GNUNET_break (0);
1188     return;
1189   }
1190   if (session->num_peers <= 1)
1191   {
1192     session->current_round = CONSENSUS_ROUND_FINISH;
1193     GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1194   }
1195   else
1196   {
1197     session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
1198     /* the 'begin' round is over, start with the next, actual round */
1199     round_over (session, NULL);
1200   }
1201
1202   GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1203   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1204 }
1205
1206
1207 /**
1208  * Called to clean up, after a shutdown has been requested.
1209  *
1210  * @param cls closure
1211  * @param tc context information (why was this task triggered now)
1212  */
1213 static void
1214 shutdown_task (void *cls,
1215                const struct GNUNET_SCHEDULER_TaskContext *tc)
1216 {
1217   while (NULL != sessions_head)
1218     destroy_session (sessions_head);
1219
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1221 }
1222
1223
1224 /**
1225  * Clean up after a client after it is
1226  * disconnected (either by us or by itself)
1227  *
1228  * @param cls closure, unused
1229  * @param client the client to clean up after
1230  */
1231 void
1232 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1233 {
1234   struct ConsensusSession *session;
1235
1236   session = get_session_by_client (client);
1237   if (NULL == session)
1238     return;
1239   if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1240       (CONSENSUS_ROUND_FINISH == session->current_round))
1241     destroy_session (session);
1242   else
1243     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1244 }
1245
1246
1247 /**
1248  * Start processing consensus requests.
1249  *
1250  * @param cls closure
1251  * @param server the initialized server
1252  * @param c configuration to use
1253  */
1254 static void
1255 run (void *cls, struct GNUNET_SERVER_Handle *server,
1256      const struct GNUNET_CONFIGURATION_Handle *c)
1257 {
1258   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1259     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1260         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1261     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1262     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1263     {NULL, NULL, 0, 0}
1264   };
1265
1266   cfg = c;
1267   srv = server;
1268   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
1269   {
1270     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1271     GNUNET_break (0);
1272     GNUNET_SCHEDULER_shutdown ();
1273     return;
1274   }
1275   GNUNET_SERVER_add_handlers (server, server_handlers);
1276   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1277   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1278   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1279 }
1280
1281
1282 /**
1283  * The main function for the consensus service.
1284  *
1285  * @param argc number of arguments from the command line
1286  * @param argv command line arguments
1287  * @return 0 ok, 1 on error
1288  */
1289 int
1290 main (int argc, char *const *argv)
1291 {
1292   int ret;
1293   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1294   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1295   return (GNUNET_OK == ret) ? 0 : 1;
1296 }
1297