add integer overflow guards and avoid (unlimited) stack allocation
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold <flo@dold.me>
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37
38
39 enum ReferendumVote
40 {
41   /**
42    * Vote that nothing should change.
43    * This option is never voted explicitly.
44    */
45   VOTE_STAY = 0,
46   /**
47    * Vote that an element should be added.
48    */
49   VOTE_ADD = 1,
50   /**
51    * Vote that an element should be removed.
52    */
53   VOTE_REMOVE = 2,
54 };
55
56
57 enum EarlyStoppingPhase
58 {
59   EARLY_STOPPING_NONE = 0,
60   EARLY_STOPPING_ONE_MORE = 1,
61   EARLY_STOPPING_DONE = 2,
62 };
63
64
65 GNUNET_NETWORK_STRUCT_BEGIN
66
67 /**
68  * Tuple of integers that together
69  * identify a task uniquely.
70  */
71 struct TaskKey
72 {
73   /**
74    * A value from 'enum PhaseKind'.
75    */
76   uint16_t kind GNUNET_PACKED;
77
78   /**
79    * Number of the first peer
80    * in canonical order.
81    */
82   int16_t peer1 GNUNET_PACKED;
83
84   /**
85    * Number of the second peer in canonical order.
86    */
87   int16_t peer2 GNUNET_PACKED;
88
89   /**
90    * Repetition of the gradecast phase.
91    */
92   int16_t repetition GNUNET_PACKED;
93
94   /**
95    * Leader in the gradecast phase.
96    *
97    * Can be different from both peer1 and peer2.
98    */
99   int16_t leader GNUNET_PACKED;
100 };
101
102
103 struct SetKey
104 {
105   int set_kind GNUNET_PACKED;
106   int k1 GNUNET_PACKED;
107   int k2 GNUNET_PACKED;
108 };
109
110
111 struct SetEntry
112 {
113   struct SetKey key;
114   struct GNUNET_SET_Handle *h;
115   /**
116    * GNUNET_YES if the set resulted
117    * from applying a referendum with contested
118    * elements.
119    */
120   int is_contested;
121 };
122
123
124 struct DiffKey
125 {
126   int diff_kind GNUNET_PACKED;
127   int k1 GNUNET_PACKED;
128   int k2 GNUNET_PACKED;
129 };
130
131 struct RfnKey
132 {
133   int rfn_kind GNUNET_PACKED;
134   int k1 GNUNET_PACKED;
135   int k2 GNUNET_PACKED;
136 };
137
138
139 GNUNET_NETWORK_STRUCT_END
140
141 enum PhaseKind
142 {
143   PHASE_KIND_ALL_TO_ALL,
144   PHASE_KIND_ALL_TO_ALL_2,
145   PHASE_KIND_GRADECAST_LEADER,
146   PHASE_KIND_GRADECAST_ECHO,
147   PHASE_KIND_GRADECAST_ECHO_GRADE,
148   PHASE_KIND_GRADECAST_CONFIRM,
149   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
150   /**
151    * Apply a repetition of the all-to-all
152    * gradecast to the current set.
153    */
154   PHASE_KIND_APPLY_REP,
155   PHASE_KIND_FINISH,
156 };
157
158
159 enum SetKind
160 {
161   SET_KIND_NONE = 0,
162   SET_KIND_CURRENT,
163   /**
164    * Last result set from a gradecast
165    */
166   SET_KIND_LAST_GRADECAST,
167   SET_KIND_LEADER_PROPOSAL,
168   SET_KIND_ECHO_RESULT,
169 };
170
171 enum DiffKind
172 {
173   DIFF_KIND_NONE = 0,
174   DIFF_KIND_LEADER_PROPOSAL,
175   DIFF_KIND_LEADER_CONSENSUS,
176   DIFF_KIND_GRADECAST_RESULT,
177 };
178
179 enum RfnKind
180 {
181   RFN_KIND_NONE = 0,
182   RFN_KIND_ECHO,
183   RFN_KIND_CONFIRM,
184   RFN_KIND_GRADECAST_RESULT
185 };
186
187
188 struct SetOpCls
189 {
190   struct SetKey input_set;
191
192   struct SetKey output_set;
193   struct RfnKey output_rfn;
194   struct DiffKey output_diff;
195
196   int do_not_remove;
197
198   int transceive_contested;
199
200   struct GNUNET_SET_OperationHandle *op;
201 };
202
203
204 struct FinishCls
205 {
206   struct SetKey input_set;
207 };
208
209 /**
210  * Closure for both @a start_task
211  * and @a cancel_task.
212  */
213 union TaskFuncCls
214 {
215   struct SetOpCls setop;
216   struct FinishCls finish;
217 };
218
219 struct TaskEntry;
220
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228   struct TaskKey key;
229
230   struct Step *step;
231
232   int is_started;
233
234   int is_finished;
235
236   TaskFunc start;
237   TaskFunc cancel;
238
239   union TaskFuncCls cls;
240 };
241
242
243 struct Step
244 {
245   /**
246    * All steps of one session are in a
247    * linked list for easier deallocation.
248    */
249   struct Step *prev;
250
251   /**
252    * All steps of one session are in a
253    * linked list for easier deallocation.
254    */
255   struct Step *next;
256
257   struct ConsensusSession *session;
258
259   /**
260    * Tasks that this step is composed of.
261    */
262   struct TaskEntry **tasks;
263   unsigned int tasks_len;
264   unsigned int tasks_cap;
265
266   unsigned int finished_tasks;
267
268   /*
269    * Tasks that have this task as dependency.
270    *
271    * We store pointers to subordinates rather
272    * than to prerequisites since it makes
273    * tracking the readiness of a task easier.
274    */
275   struct Step **subordinates;
276   unsigned int subordinates_len;
277   unsigned int subordinates_cap;
278
279   /**
280    * Counter for the prerequisites of
281    * this step.
282    */
283   size_t pending_prereq;
284
285   /*
286    * Task that will run this step despite
287    * any pending prerequisites.
288    */
289   struct GNUNET_SCHEDULER_Task *timeout_task;
290
291   unsigned int is_running;
292
293   unsigned int is_finished;
294
295   /*
296    * Synchrony round of the task.
297    * Determines the deadline for the task.
298    */
299   unsigned int round;
300
301   /**
302    * Human-readable name for
303    * the task, used for debugging.
304    */
305   char *debug_name;
306
307   /**
308    * When we're doing an early finish, how should this step be
309    * treated?
310    * If GNUNET_YES, the step will be marked as finished
311    * without actually running its tasks.
312    * Otherwise, the step will still be run even after
313    * an early finish.
314    *
315    * Note that a task may never be finished early if
316    * it is already running.
317    */
318   int early_finishable;
319 };
320
321
322 struct RfnElementInfo
323 {
324   const struct GNUNET_SET_Element *element;
325
326   /*
327    * GNUNET_YES if the peer votes for the proposal.
328    */
329   int *votes;
330
331   /**
332    * Proposal for this element,
333    * can only be VOTE_ADD or VOTE_REMOVE.
334    */
335   enum ReferendumVote proposal;
336 };
337
338
339 struct ReferendumEntry
340 {
341   struct RfnKey key;
342
343   /*
344    * Elements where there is at least one proposed change.
345    *
346    * Maps the hash of the GNUNET_SET_Element
347    * to 'struct RfnElementInfo'.
348    */
349   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
350
351   unsigned int num_peers;
352
353   /**
354    * Stores, for every peer in the session,
355    * whether the peer finished the whole referendum.
356    *
357    * Votes from peers are only counted if they're
358    * marked as commited (#GNUNET_YES) in the referendum.
359    *
360    * Otherwise (#GNUNET_NO), the requested changes are
361    * not counted for majority votes or thresholds.
362    */
363   int *peer_commited;
364
365
366   /**
367    * Contestation state of the peer.  If a peer is contested, the values it
368    * contributed are still counted for applying changes, but the grading is
369    * affected.
370    */
371   int *peer_contested;
372 };
373
374
375 struct DiffElementInfo
376 {
377   const struct GNUNET_SET_Element *element;
378
379   /**
380    * Positive weight for 'add', negative
381    * weights for 'remove'.
382    */
383   int weight;
384 };
385
386
387 /**
388  * Weighted diff.
389  */
390 struct DiffEntry
391 {
392   struct DiffKey key;
393   struct GNUNET_CONTAINER_MultiHashMap *changes;
394 };
395
396 struct SetHandle
397 {
398   struct SetHandle *prev;
399   struct SetHandle *next;
400
401   struct GNUNET_SET_Handle *h;
402 };
403
404
405 /**
406  * A consensus session consists of one local client and the remote authorities.
407  */
408 struct ConsensusSession
409 {
410   /**
411    * Consensus sessions are kept in a DLL.
412    */
413   struct ConsensusSession *next;
414
415   /**
416    * Consensus sessions are kept in a DLL.
417    */
418   struct ConsensusSession *prev;
419
420   unsigned int num_client_insert_pending;
421
422   struct GNUNET_CONTAINER_MultiHashMap *setmap;
423   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
424   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
425
426   /**
427    * Array of peers with length 'num_peers'.
428    */
429   int *peers_blacklisted;
430
431   /*
432    * Mapping from (hashed) TaskKey to TaskEntry.
433    *
434    * We map the application_id for a round to the task that should be
435    * executed, so we don't have to go through all task whenever we get
436    * an incoming set op request.
437    */
438   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
439
440   struct Step *steps_head;
441   struct Step *steps_tail;
442
443   int conclude_started;
444
445   int conclude_done;
446
447   /**
448    * Global consensus identification, computed
449    * from the session id and participating authorities.
450    */
451   struct GNUNET_HashCode global_id;
452
453   /**
454    * Client that inhabits the session
455    */
456   struct GNUNET_SERVICE_Client *client;
457
458   /**
459    * Queued messages to the client.
460    */
461   struct GNUNET_MQ_Handle *client_mq;
462
463   /**
464    * Time when the conclusion of the consensus should begin.
465    */
466   struct GNUNET_TIME_Absolute conclude_start;
467
468   /**
469    * Timeout for all rounds together, single rounds will schedule a timeout task
470    * with a fraction of the conclude timeout.
471    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
472    */
473   struct GNUNET_TIME_Absolute conclude_deadline;
474
475   struct GNUNET_PeerIdentity *peers;
476
477   /**
478    * Number of other peers in the consensus.
479    */
480   unsigned int num_peers;
481
482   /**
483    * Index of the local peer in the peers array
484    */
485   unsigned int local_peer_idx;
486
487   /**
488    * Listener for requests from other peers.
489    * Uses the session's global id as app id.
490    */
491   struct GNUNET_SET_ListenHandle *set_listener;
492
493   /**
494    * State of our early stopping scheme.
495    */
496   int early_stopping;
497
498   /**
499    * Our set size from the first round.
500    */
501   uint64_t first_size;
502
503   uint64_t *first_sizes_received;
504
505   /**
506    * Bounded Eppstein lower bound.
507    */
508   uint64_t lower_bound;
509
510   struct SetHandle *set_handles_head;
511   struct SetHandle *set_handles_tail;
512 };
513
514 /**
515  * Linked list of sessions this peer participates in.
516  */
517 static struct ConsensusSession *sessions_head;
518
519 /**
520  * Linked list of sessions this peer participates in.
521  */
522 static struct ConsensusSession *sessions_tail;
523
524 /**
525  * Configuration of the consensus service.
526  */
527 static const struct GNUNET_CONFIGURATION_Handle *cfg;
528
529 /**
530  * Peer that runs this service.
531  */
532 static struct GNUNET_PeerIdentity my_peer;
533
534 /**
535  * Statistics handle.
536  */
537 struct GNUNET_STATISTICS_Handle *statistics;
538
539
540 static void
541 finish_task (struct TaskEntry *task);
542
543
544 static void
545 run_ready_steps (struct ConsensusSession *session);
546
547
548 static const char *
549 phasename (uint16_t phase)
550 {
551   switch (phase)
552   {
553   case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
554
555   case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556
557   case PHASE_KIND_FINISH: return "FINISH";
558
559   case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
560
561   case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
562
563   case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
564
565   case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
566
567   case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
568
569   case PHASE_KIND_APPLY_REP: return "APPLY_REP";
570
571   default: return "(unknown)";
572   }
573 }
574
575
576 static const char *
577 setname (uint16_t kind)
578 {
579   switch (kind)
580   {
581   case SET_KIND_CURRENT: return "CURRENT";
582
583   case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
584
585   case SET_KIND_NONE: return "NONE";
586
587   default: return "(unknown)";
588   }
589 }
590
591
592 static const char *
593 rfnname (uint16_t kind)
594 {
595   switch (kind)
596   {
597   case RFN_KIND_NONE: return "NONE";
598
599   case RFN_KIND_ECHO: return "ECHO";
600
601   case RFN_KIND_CONFIRM: return "CONFIRM";
602
603   default: return "(unknown)";
604   }
605 }
606
607
608 static const char *
609 diffname (uint16_t kind)
610 {
611   switch (kind)
612   {
613   case DIFF_KIND_NONE: return "NONE";
614
615   case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
616
617   case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
618
619   case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
620
621   default: return "(unknown)";
622   }
623 }
624
625
626 #ifdef GNUNET_EXTRA_LOGGING
627
628
629 static const char *
630 debug_str_element (const struct GNUNET_SET_Element *el)
631 {
632   struct GNUNET_HashCode hash;
633
634   GNUNET_SET_element_hash (el, &hash);
635
636   return GNUNET_h2s (&hash);
637 }
638
639
640 static const char *
641 debug_str_task_key (struct TaskKey *tk)
642 {
643   static char buf[256];
644
645   snprintf (buf, sizeof(buf),
646             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
647             phasename (tk->kind), tk->peer1, tk->peer2,
648             tk->leader, tk->repetition);
649
650   return buf;
651 }
652
653
654 static const char *
655 debug_str_diff_key (struct DiffKey *dk)
656 {
657   static char buf[256];
658
659   snprintf (buf, sizeof(buf),
660             "DiffKey kind=%s, k1=%d, k2=%d",
661             diffname (dk->diff_kind), dk->k1, dk->k2);
662
663   return buf;
664 }
665
666
667 static const char *
668 debug_str_set_key (const struct SetKey *sk)
669 {
670   static char buf[256];
671
672   snprintf (buf, sizeof(buf),
673             "SetKey kind=%s, k1=%d, k2=%d",
674             setname (sk->set_kind), sk->k1, sk->k2);
675
676   return buf;
677 }
678
679
680 static const char *
681 debug_str_rfn_key (const struct RfnKey *rk)
682 {
683   static char buf[256];
684
685   snprintf (buf, sizeof(buf),
686             "RfnKey kind=%s, k1=%d, k2=%d",
687             rfnname (rk->rfn_kind), rk->k1, rk->k2);
688
689   return buf;
690 }
691
692
693 #endif /* GNUNET_EXTRA_LOGGING */
694
695
696 /**
697  * Send the final result set of the consensus to the client, element by
698  * element.
699  *
700  * @param cls closure
701  * @param element the current element, NULL if all elements have been
702  *        iterated over
703  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
704  */
705 static int
706 send_to_client_iter (void *cls,
707                      const struct GNUNET_SET_Element *element)
708 {
709   struct TaskEntry *task = (struct TaskEntry *) cls;
710   struct ConsensusSession *session = task->step->session;
711   struct GNUNET_MQ_Envelope *ev;
712
713   if (NULL != element)
714   {
715     struct GNUNET_CONSENSUS_ElementMessage *m;
716     const struct ConsensusElement *ce;
717
718     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT ==
719                    element->element_type);
720     ce = element->data;
721
722     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n",
723                 (unsigned) ce->marker);
724
725     if (0 != ce->marker)
726       return GNUNET_YES;
727
728     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729                 "P%d: sending element %s to client\n",
730                 session->local_peer_idx,
731                 debug_str_element (element));
732
733     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof(struct
734                                                         ConsensusElement),
735                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
736     m->element_type = ce->payload_type;
737     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof(struct
738                                                          ConsensusElement));
739     GNUNET_MQ_send (session->client_mq, ev);
740   }
741   else
742   {
743     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744                 "P%d: finished iterating elements for client\n",
745                 session->local_peer_idx);
746     ev = GNUNET_MQ_msg_header (
747       GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
748     GNUNET_MQ_send (session->client_mq, ev);
749   }
750   return GNUNET_YES;
751 }
752
753
754 static struct SetEntry *
755 lookup_set (struct ConsensusSession *session, struct SetKey *key)
756 {
757   struct GNUNET_HashCode hash;
758
759   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760               "P%u: looking up set {%s}\n",
761               session->local_peer_idx,
762               debug_str_set_key (key));
763
764   GNUNET_assert (SET_KIND_NONE != key->set_kind);
765   GNUNET_CRYPTO_hash (key, sizeof(struct SetKey), &hash);
766   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
767 }
768
769
770 static struct DiffEntry *
771 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
772 {
773   struct GNUNET_HashCode hash;
774
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "P%u: looking up diff {%s}\n",
777               session->local_peer_idx,
778               debug_str_diff_key (key));
779
780   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
781   GNUNET_CRYPTO_hash (key, sizeof(struct DiffKey), &hash);
782   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
783 }
784
785
786 static struct ReferendumEntry *
787 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
788 {
789   struct GNUNET_HashCode hash;
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "P%u: looking up rfn {%s}\n",
793               session->local_peer_idx,
794               debug_str_rfn_key (key));
795
796   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
797   GNUNET_CRYPTO_hash (key, sizeof(struct RfnKey), &hash);
798   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
799 }
800
801
802 static void
803 diff_insert (struct DiffEntry *diff,
804              int weight,
805              const struct GNUNET_SET_Element *element)
806 {
807   struct DiffElementInfo *di;
808   struct GNUNET_HashCode hash;
809
810   GNUNET_assert ((1 == weight) || (-1 == weight));
811
812   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813               "diff_insert with element size %u\n",
814               element->size);
815
816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817               "hashing element\n");
818
819   GNUNET_SET_element_hash (element, &hash);
820
821   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822               "hashed element\n");
823
824   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
825
826   if (NULL == di)
827   {
828     di = GNUNET_new (struct DiffElementInfo);
829     di->element = GNUNET_SET_element_dup (element);
830     GNUNET_assert (GNUNET_OK ==
831                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
832                                                       &hash, di,
833                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
834   }
835
836   di->weight = weight;
837 }
838
839
840 static void
841 rfn_commit (struct ReferendumEntry *rfn,
842             uint16_t commit_peer)
843 {
844   GNUNET_assert (commit_peer < rfn->num_peers);
845
846   rfn->peer_commited[commit_peer] = GNUNET_YES;
847 }
848
849
850 static void
851 rfn_contest (struct ReferendumEntry *rfn,
852              uint16_t contested_peer)
853 {
854   GNUNET_assert (contested_peer < rfn->num_peers);
855
856   rfn->peer_contested[contested_peer] = GNUNET_YES;
857 }
858
859
860 static uint16_t
861 rfn_noncontested (struct ReferendumEntry *rfn)
862 {
863   uint16_t i;
864   uint16_t ret;
865
866   ret = 0;
867   for (i = 0; i < rfn->num_peers; i++)
868     if ((GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO ==
869                                                   rfn->peer_contested[i]))
870       ret++;
871
872   return ret;
873 }
874
875
876 static void
877 rfn_vote (struct ReferendumEntry *rfn,
878           uint16_t voting_peer,
879           enum ReferendumVote vote,
880           const struct GNUNET_SET_Element *element)
881 {
882   struct RfnElementInfo *ri;
883   struct GNUNET_HashCode hash;
884
885   GNUNET_assert (voting_peer < rfn->num_peers);
886
887   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
888      since VOTE_KEEP is implicit in not voting. */
889   GNUNET_assert ((VOTE_ADD == vote) || (VOTE_REMOVE == vote));
890
891   GNUNET_SET_element_hash (element, &hash);
892   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
893
894   if (NULL == ri)
895   {
896     ri = GNUNET_new (struct RfnElementInfo);
897     ri->element = GNUNET_SET_element_dup (element);
898     ri->votes = GNUNET_new_array (rfn->num_peers, int);
899     GNUNET_assert (GNUNET_OK ==
900                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
901                                                       &hash, ri,
902                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
903   }
904
905   ri->votes[voting_peer] = GNUNET_YES;
906   ri->proposal = vote;
907 }
908
909
910 static uint16_t
911 task_other_peer (struct TaskEntry *task)
912 {
913   uint16_t me = task->step->session->local_peer_idx;
914
915   if (task->key.peer1 == me)
916     return task->key.peer2;
917   return task->key.peer1;
918 }
919
920
921 static int
922 cmp_uint64_t (const void *pa, const void *pb)
923 {
924   uint64_t a = *(uint64_t *) pa;
925   uint64_t b = *(uint64_t *) pb;
926
927   if (a == b)
928     return 0;
929   if (a < b)
930     return -1;
931   return 1;
932 }
933
934
935 /**
936  * Callback for set operation results. Called for each element
937  * in the result set.
938  *
939  * @param cls closure
940  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
941  * @param current_size current set size
942  * @param status see enum GNUNET_SET_Status
943  */
944 static void
945 set_result_cb (void *cls,
946                const struct GNUNET_SET_Element *element,
947                uint64_t current_size,
948                enum GNUNET_SET_Status status)
949 {
950   struct TaskEntry *task = cls;
951   struct ConsensusSession *session = task->step->session;
952   struct SetEntry *output_set = NULL;
953   struct DiffEntry *output_diff = NULL;
954   struct ReferendumEntry *output_rfn = NULL;
955   unsigned int other_idx;
956   struct SetOpCls *setop;
957   const struct ConsensusElement *consensus_element = NULL;
958
959   if (NULL != element)
960   {
961     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962                 "P%u: got element of type %u, status %u\n",
963                 session->local_peer_idx,
964                 (unsigned) element->element_type,
965                 (unsigned) status);
966     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT ==
967                    element->element_type);
968     consensus_element = element->data;
969   }
970
971   setop = &task->cls.setop;
972
973
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "P%u: got set result for {%s}, status %u\n",
976               session->local_peer_idx,
977               debug_str_task_key (&task->key),
978               status);
979
980   if (GNUNET_NO == task->is_started)
981   {
982     GNUNET_break_op (0);
983     return;
984   }
985
986   if (GNUNET_YES == task->is_finished)
987   {
988     GNUNET_break_op (0);
989     return;
990   }
991
992   other_idx = task_other_peer (task);
993
994   if (SET_KIND_NONE != setop->output_set.set_kind)
995   {
996     output_set = lookup_set (session, &setop->output_set);
997     GNUNET_assert (NULL != output_set);
998   }
999
1000   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1001   {
1002     output_diff = lookup_diff (session, &setop->output_diff);
1003     GNUNET_assert (NULL != output_diff);
1004   }
1005
1006   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1007   {
1008     output_rfn = lookup_rfn (session, &setop->output_rfn);
1009     GNUNET_assert (NULL != output_rfn);
1010   }
1011
1012   if (GNUNET_YES == session->peers_blacklisted[other_idx])
1013   {
1014     /* Peer might have been blacklisted
1015        by a gradecast running in parallel, ignore elements from now */
1016     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
1017       return;
1018     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
1019       return;
1020   }
1021
1022   if ((NULL != consensus_element) && (0 != consensus_element->marker))
1023   {
1024     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025                 "P%u: got some marker\n",
1026                 session->local_peer_idx);
1027     if ((GNUNET_YES == setop->transceive_contested) &&
1028         (CONSENSUS_MARKER_CONTESTED == consensus_element->marker))
1029     {
1030       GNUNET_assert (NULL != output_rfn);
1031       rfn_contest (output_rfn, task_other_peer (task));
1032       return;
1033     }
1034
1035     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1036     {
1037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                   "P%u: got size marker\n",
1039                   session->local_peer_idx);
1040
1041
1042       struct ConsensusSizeElement *cse = (void *) consensus_element;
1043
1044       if (cse->sender_index == other_idx)
1045       {
1046         if (NULL == session->first_sizes_received)
1047           session->first_sizes_received = GNUNET_new_array (session->num_peers,
1048                                                             uint64_t);
1049         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1050
1051         uint64_t *copy = GNUNET_memdup (session->first_sizes_received,
1052                                         sizeof(uint64_t) * session->num_peers);
1053         qsort (copy, session->num_peers, sizeof(uint64_t), cmp_uint64_t);
1054         session->lower_bound = copy[session->num_peers / 3 + 1];
1055         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056                     "P%u: lower bound %llu\n",
1057                     session->local_peer_idx,
1058                     (long long) session->lower_bound);
1059         GNUNET_free (copy);
1060       }
1061       return;
1062     }
1063
1064     return;
1065   }
1066
1067   switch (status)
1068   {
1069   case GNUNET_SET_STATUS_ADD_LOCAL:
1070     GNUNET_assert (NULL != consensus_element);
1071     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072                 "Adding element in Task {%s}\n",
1073                 debug_str_task_key (&task->key));
1074     if (NULL != output_set)
1075     {
1076       // FIXME: record pending adds, use callback
1077       GNUNET_SET_add_element (output_set->h,
1078                               element,
1079                               NULL,
1080                               NULL);
1081 #ifdef GNUNET_EXTRA_LOGGING
1082       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083                   "P%u: adding element %s into set {%s} of task {%s}\n",
1084                   session->local_peer_idx,
1085                   debug_str_element (element),
1086                   debug_str_set_key (&setop->output_set),
1087                   debug_str_task_key (&task->key));
1088 #endif
1089     }
1090     if (NULL != output_diff)
1091     {
1092       diff_insert (output_diff, 1, element);
1093 #ifdef GNUNET_EXTRA_LOGGING
1094       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                   "P%u: adding element %s into diff {%s} of task {%s}\n",
1096                   session->local_peer_idx,
1097                   debug_str_element (element),
1098                   debug_str_diff_key (&setop->output_diff),
1099                   debug_str_task_key (&task->key));
1100 #endif
1101     }
1102     if (NULL != output_rfn)
1103     {
1104       rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1105 #ifdef GNUNET_EXTRA_LOGGING
1106       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                   "P%u: adding element %s into rfn {%s} of task {%s}\n",
1108                   session->local_peer_idx,
1109                   debug_str_element (element),
1110                   debug_str_rfn_key (&setop->output_rfn),
1111                   debug_str_task_key (&task->key));
1112 #endif
1113     }
1114     // XXX: add result to structures in task
1115     break;
1116
1117   case GNUNET_SET_STATUS_ADD_REMOTE:
1118     GNUNET_assert (NULL != consensus_element);
1119     if (GNUNET_YES == setop->do_not_remove)
1120       break;
1121     if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1122       break;
1123     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                 "Removing element in Task {%s}\n",
1125                 debug_str_task_key (&task->key));
1126     if (NULL != output_set)
1127     {
1128       // FIXME: record pending adds, use callback
1129       GNUNET_SET_remove_element (output_set->h,
1130                                  element,
1131                                  NULL,
1132                                  NULL);
1133 #ifdef GNUNET_EXTRA_LOGGING
1134       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135                   "P%u: removing element %s from set {%s} of task {%s}\n",
1136                   session->local_peer_idx,
1137                   debug_str_element (element),
1138                   debug_str_set_key (&setop->output_set),
1139                   debug_str_task_key (&task->key));
1140 #endif
1141     }
1142     if (NULL != output_diff)
1143     {
1144       diff_insert (output_diff, -1, element);
1145 #ifdef GNUNET_EXTRA_LOGGING
1146       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147                   "P%u: removing element %s from diff {%s} of task {%s}\n",
1148                   session->local_peer_idx,
1149                   debug_str_element (element),
1150                   debug_str_diff_key (&setop->output_diff),
1151                   debug_str_task_key (&task->key));
1152 #endif
1153     }
1154     if (NULL != output_rfn)
1155     {
1156       rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1157 #ifdef GNUNET_EXTRA_LOGGING
1158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159                   "P%u: removing element %s from rfn {%s} of task {%s}\n",
1160                   session->local_peer_idx,
1161                   debug_str_element (element),
1162                   debug_str_rfn_key (&setop->output_rfn),
1163                   debug_str_task_key (&task->key));
1164 #endif
1165     }
1166     break;
1167
1168   case GNUNET_SET_STATUS_DONE:
1169     // XXX: check first if any changes to the underlying
1170     // set are still pending
1171     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172                 "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1173                 session->local_peer_idx,
1174                 debug_str_task_key (&task->key),
1175                 (unsigned int) task->step->finished_tasks,
1176                 (unsigned int) task->step->tasks_len);
1177     if (NULL != output_rfn)
1178     {
1179       rfn_commit (output_rfn, task_other_peer (task));
1180     }
1181     if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1182     {
1183       session->first_size = current_size;
1184     }
1185     finish_task (task);
1186     break;
1187
1188   case GNUNET_SET_STATUS_FAILURE:
1189     // XXX: cleanup
1190     GNUNET_break_op (0);
1191     finish_task (task);
1192     return;
1193
1194   default:
1195     /* not reached */
1196     GNUNET_assert (0);
1197   }
1198 }
1199
1200
1201 #ifdef EVIL
1202
1203 enum EvilnessType
1204 {
1205   EVILNESS_NONE,
1206   EVILNESS_CRAM_ALL,
1207   EVILNESS_CRAM_LEAD,
1208   EVILNESS_CRAM_ECHO,
1209   EVILNESS_SLACK,
1210   EVILNESS_SLACK_A2A,
1211 };
1212
1213 enum EvilnessSubType
1214 {
1215   EVILNESS_SUB_NONE,
1216   EVILNESS_SUB_REPLACEMENT,
1217   EVILNESS_SUB_NO_REPLACEMENT,
1218 };
1219
1220 struct Evilness
1221 {
1222   enum EvilnessType type;
1223   enum EvilnessSubType subtype;
1224   unsigned int num;
1225 };
1226
1227
1228 static int
1229 parse_evilness_cram_subtype (const char *evil_subtype_str, struct
1230                              Evilness *evil)
1231 {
1232   if (0 == strcmp ("replace", evil_subtype_str))
1233   {
1234     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1235   }
1236   else if (0 == strcmp ("noreplace", evil_subtype_str))
1237   {
1238     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1239   }
1240   else
1241   {
1242     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1243                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1244                 evil_subtype_str);
1245     return GNUNET_SYSERR;
1246   }
1247   return GNUNET_OK;
1248 }
1249
1250
1251 static void
1252 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1253 {
1254   char *evil_spec;
1255   char *field;
1256   char *evil_type_str = NULL;
1257   char *evil_subtype_str = NULL;
1258
1259   GNUNET_assert (NULL != evil);
1260
1261   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus",
1262                                                           "EVIL_SPEC",
1263                                                           &evil_spec))
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266                 "P%u: no evilness\n",
1267                 session->local_peer_idx);
1268     evil->type = EVILNESS_NONE;
1269     return;
1270   }
1271   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272               "P%u: got evilness spec\n",
1273               session->local_peer_idx);
1274
1275   for (field = strtok (evil_spec, "/");
1276        NULL != field;
1277        field = strtok (NULL, "/"))
1278   {
1279     unsigned int peer_num;
1280     unsigned int evil_num;
1281     int ret;
1282
1283     evil_type_str = NULL;
1284     evil_subtype_str = NULL;
1285
1286     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str,
1287                   &evil_subtype_str, &evil_num);
1288
1289     if (ret != 4)
1290     {
1291       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1292                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1293                   field,
1294                   ret);
1295       goto not_evil;
1296     }
1297
1298     GNUNET_assert (NULL != evil_type_str);
1299     GNUNET_assert (NULL != evil_subtype_str);
1300
1301     if (peer_num == session->local_peer_idx)
1302     {
1303       if (0 == strcmp ("slack", evil_type_str))
1304       {
1305         evil->type = EVILNESS_SLACK;
1306       }
1307       if (0 == strcmp ("slack-a2a", evil_type_str))
1308       {
1309         evil->type = EVILNESS_SLACK_A2A;
1310       }
1311       else if (0 == strcmp ("cram-all", evil_type_str))
1312       {
1313         evil->type = EVILNESS_CRAM_ALL;
1314         evil->num = evil_num;
1315         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1316           goto not_evil;
1317       }
1318       else if (0 == strcmp ("cram-lead", evil_type_str))
1319       {
1320         evil->type = EVILNESS_CRAM_LEAD;
1321         evil->num = evil_num;
1322         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1323           goto not_evil;
1324       }
1325       else if (0 == strcmp ("cram-echo", evil_type_str))
1326       {
1327         evil->type = EVILNESS_CRAM_ECHO;
1328         evil->num = evil_num;
1329         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1330           goto not_evil;
1331       }
1332       else
1333       {
1334         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1335                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1336                     evil_type_str);
1337         goto not_evil;
1338       }
1339       goto cleanup;
1340     }
1341     /* No GNUNET_free since memory was allocated by libc */
1342     free (evil_type_str);
1343     evil_type_str = NULL;
1344     evil_subtype_str = NULL;
1345   }
1346 not_evil:
1347   evil->type = EVILNESS_NONE;
1348 cleanup:
1349   GNUNET_free (evil_spec);
1350   /* no GNUNET_free_non_null since it wasn't
1351    * allocated with GNUNET_malloc */
1352   if (NULL != evil_type_str)
1353     free (evil_type_str);
1354   if (NULL != evil_subtype_str)
1355     free (evil_subtype_str);
1356 }
1357
1358
1359 #endif
1360
1361
1362 /**
1363  * Commit the appropriate set for a
1364  * task.
1365  */
1366 static void
1367 commit_set (struct ConsensusSession *session,
1368             struct TaskEntry *task)
1369 {
1370   struct SetEntry *set;
1371   struct SetOpCls *setop = &task->cls.setop;
1372
1373   GNUNET_assert (NULL != setop->op);
1374   set = lookup_set (session, &setop->input_set);
1375   GNUNET_assert (NULL != set);
1376
1377   if ((GNUNET_YES == setop->transceive_contested) && (GNUNET_YES ==
1378                                                       set->is_contested))
1379   {
1380     struct GNUNET_SET_Element element;
1381     struct ConsensusElement ce = { 0 };
1382     ce.marker = CONSENSUS_MARKER_CONTESTED;
1383     element.data = &ce;
1384     element.size = sizeof(struct ConsensusElement);
1385     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1386     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1387   }
1388
1389   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1390   {
1391     struct GNUNET_SET_Element element;
1392     struct ConsensusSizeElement cse = {
1393       .size = 0,
1394       .sender_index = 0
1395     };
1396     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1397     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1398     cse.size = GNUNET_htonll (session->first_size);
1399     cse.sender_index = session->local_peer_idx;
1400     element.data = &cse;
1401     element.size = sizeof(struct ConsensusSizeElement);
1402     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1403     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1404   }
1405
1406 #ifdef EVIL
1407   {
1408     unsigned int i;
1409     struct Evilness evil;
1410
1411     get_evilness (session, &evil);
1412     if (EVILNESS_NONE != evil.type)
1413     {
1414       /* Useful for evaluation */
1415       GNUNET_STATISTICS_set (statistics,
1416                              "is evil",
1417                              1,
1418                              GNUNET_NO);
1419     }
1420     switch (evil.type)
1421     {
1422     case EVILNESS_CRAM_ALL:
1423     case EVILNESS_CRAM_LEAD:
1424     case EVILNESS_CRAM_ECHO:
1425       /* We're not cramming elements in the
1426          all-to-all round, since that would just
1427          add more elements to the result set, but
1428          wouldn't test robustness. */
1429       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1430       {
1431         GNUNET_SET_commit (setop->op, set->h);
1432         break;
1433       }
1434       if ((EVILNESS_CRAM_LEAD == evil.type) &&
1435           ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) ||
1436            (SET_KIND_CURRENT != set->key.set_kind) ))
1437       {
1438         GNUNET_SET_commit (setop->op, set->h);
1439         break;
1440       }
1441       if ((EVILNESS_CRAM_ECHO == evil.type) && (PHASE_KIND_GRADECAST_ECHO !=
1442                                                 task->key.kind))
1443       {
1444         GNUNET_SET_commit (setop->op, set->h);
1445         break;
1446       }
1447       for (i = 0; i < evil.num; i++)
1448       {
1449         struct GNUNET_SET_Element element;
1450         struct ConsensusStuffedElement se = {
1451           .ce.payload_type = 0,
1452           .ce.marker = 0,
1453         };
1454         element.data = &se;
1455         element.size = sizeof(struct ConsensusStuffedElement);
1456         element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1457
1458         if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1459         {
1460           /* Always generate a new element. */
1461           GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
1462                                             &se.rand);
1463         }
1464         else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1465         {
1466           /* Always cram the same elements, derived from counter. */
1467           GNUNET_CRYPTO_hash (&i, sizeof(i), &se.rand);
1468         }
1469         else
1470         {
1471           GNUNET_assert (0);
1472         }
1473         GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1474 #ifdef GNUNET_EXTRA_LOGGING
1475         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1476                     "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1477                     session->local_peer_idx,
1478                     debug_str_element (&element),
1479                     debug_str_set_key (&setop->input_set),
1480                     debug_str_task_key (&task->key));
1481 #endif
1482       }
1483       GNUNET_STATISTICS_update (statistics,
1484                                 "# stuffed elements",
1485                                 evil.num,
1486                                 GNUNET_NO);
1487       GNUNET_SET_commit (setop->op, set->h);
1488       break;
1489
1490     case EVILNESS_SLACK:
1491       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492                   "P%u: evil peer: slacking\n",
1493                   (unsigned int) session->local_peer_idx);
1494
1495     /* Do nothing. */
1496     case EVILNESS_SLACK_A2A:
1497       if ((PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) ||
1498           (PHASE_KIND_ALL_TO_ALL == task->key.kind))
1499       {
1500         struct GNUNET_SET_Handle *empty_set;
1501         empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1502         GNUNET_SET_commit (setop->op, empty_set);
1503         GNUNET_SET_destroy (empty_set);
1504       }
1505       else
1506       {
1507         GNUNET_SET_commit (setop->op, set->h);
1508       }
1509       break;
1510
1511     case EVILNESS_NONE:
1512       GNUNET_SET_commit (setop->op, set->h);
1513       break;
1514     }
1515   }
1516 #else
1517   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1518   {
1519     GNUNET_SET_commit (setop->op, set->h);
1520   }
1521   else
1522   {
1523     /* For our testcases, we don't want the blacklisted
1524        peers to wait. */
1525     GNUNET_SET_operation_cancel (setop->op);
1526     setop->op = NULL;
1527     finish_task (task);
1528   }
1529 #endif
1530 }
1531
1532
1533 static void
1534 put_diff (struct ConsensusSession *session,
1535           struct DiffEntry *diff)
1536 {
1537   struct GNUNET_HashCode hash;
1538
1539   GNUNET_assert (NULL != diff);
1540
1541   GNUNET_CRYPTO_hash (&diff->key, sizeof(struct DiffKey), &hash);
1542   GNUNET_assert (GNUNET_OK ==
1543                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash,
1544                                                     diff,
1545                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1546 }
1547
1548
1549 static void
1550 put_set (struct ConsensusSession *session,
1551          struct SetEntry *set)
1552 {
1553   struct GNUNET_HashCode hash;
1554
1555   GNUNET_assert (NULL != set->h);
1556
1557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558               "Putting set %s\n",
1559               debug_str_set_key (&set->key));
1560
1561   GNUNET_CRYPTO_hash (&set->key, sizeof(struct SetKey), &hash);
1562   GNUNET_assert (GNUNET_SYSERR !=
1563                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1564                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1565 }
1566
1567
1568 static void
1569 put_rfn (struct ConsensusSession *session,
1570          struct ReferendumEntry *rfn)
1571 {
1572   struct GNUNET_HashCode hash;
1573
1574   GNUNET_CRYPTO_hash (&rfn->key, sizeof(struct RfnKey), &hash);
1575   GNUNET_assert (GNUNET_OK ==
1576                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1577                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1578 }
1579
1580
1581 static void
1582 task_cancel_reconcile (struct TaskEntry *task)
1583 {
1584   /* not implemented yet */
1585   GNUNET_assert (0);
1586 }
1587
1588
1589 static void
1590 apply_diff_to_rfn (struct DiffEntry *diff,
1591                    struct ReferendumEntry *rfn,
1592                    uint16_t voting_peer,
1593                    uint16_t num_peers)
1594 {
1595   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1596   struct DiffElementInfo *di;
1597
1598   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1599
1600   while (GNUNET_YES ==
1601          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1602                                                       NULL,
1603                                                       (const void **) &di))
1604   {
1605     if (di->weight > 0)
1606     {
1607       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1608     }
1609     if (di->weight < 0)
1610     {
1611       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1612     }
1613   }
1614
1615   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1616 }
1617
1618
1619 struct DiffEntry *
1620 diff_create ()
1621 {
1622   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1623
1624   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1625
1626   return d;
1627 }
1628
1629
1630 struct DiffEntry *
1631 diff_compose (struct DiffEntry *diff_1,
1632               struct DiffEntry *diff_2)
1633 {
1634   struct DiffEntry *diff_new;
1635   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1636   struct DiffElementInfo *di;
1637
1638   diff_new = diff_create ();
1639
1640   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1641   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL,
1642                                                                     (const
1643                                                                      void **) &
1644                                                                     di))
1645   {
1646     diff_insert (diff_new, di->weight, di->element);
1647   }
1648   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1649
1650   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1651   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL,
1652                                                                     (const
1653                                                                      void **) &
1654                                                                     di))
1655   {
1656     diff_insert (diff_new, di->weight, di->element);
1657   }
1658   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1659
1660   return diff_new;
1661 }
1662
1663
1664 struct ReferendumEntry *
1665 rfn_create (uint16_t size)
1666 {
1667   struct ReferendumEntry *rfn;
1668
1669   rfn = GNUNET_new (struct ReferendumEntry);
1670   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1671   rfn->peer_commited = GNUNET_new_array (size, int);
1672   rfn->peer_contested = GNUNET_new_array (size, int);
1673   rfn->num_peers = size;
1674
1675   return rfn;
1676 }
1677
1678
1679 #if UNUSED
1680 static void
1681 diff_destroy (struct DiffEntry *diff)
1682 {
1683   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1684   GNUNET_free (diff);
1685 }
1686
1687
1688 #endif
1689
1690
1691 /**
1692  * For a given majority, count what the outcome
1693  * is (add/remove/keep), and give the number
1694  * of peers that voted for this outcome.
1695  */
1696 static void
1697 rfn_majority (const struct ReferendumEntry *rfn,
1698               const struct RfnElementInfo *ri,
1699               uint16_t *ret_majority,
1700               enum ReferendumVote *ret_vote)
1701 {
1702   uint16_t votes_yes = 0;
1703   uint16_t num_commited = 0;
1704   uint16_t i;
1705
1706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707               "Computing rfn majority for element %s of rfn {%s}\n",
1708               debug_str_element (ri->element),
1709               debug_str_rfn_key (&rfn->key));
1710
1711   for (i = 0; i < rfn->num_peers; i++)
1712   {
1713     if (GNUNET_NO == rfn->peer_commited[i])
1714       continue;
1715     num_commited++;
1716
1717     if (GNUNET_YES == ri->votes[i])
1718       votes_yes++;
1719   }
1720
1721   if (votes_yes > (num_commited) / 2)
1722   {
1723     *ret_vote = ri->proposal;
1724     *ret_majority = votes_yes;
1725   }
1726   else
1727   {
1728     *ret_vote = VOTE_STAY;
1729     *ret_majority = num_commited - votes_yes;
1730   }
1731 }
1732
1733
1734 struct SetCopyCls
1735 {
1736   struct TaskEntry *task;
1737   struct SetKey dst_set_key;
1738 };
1739
1740
1741 static void
1742 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1743 {
1744   struct SetCopyCls *scc = cls;
1745   struct TaskEntry *task = scc->task;
1746   struct SetKey dst_set_key = scc->dst_set_key;
1747   struct SetEntry *set;
1748   struct SetHandle *sh = GNUNET_new (struct SetHandle);
1749
1750   sh->h = copy;
1751   GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1752                                task->step->session->set_handles_tail,
1753                                sh);
1754
1755   GNUNET_free (scc);
1756   set = GNUNET_new (struct SetEntry);
1757   set->h = copy;
1758   set->key = dst_set_key;
1759   put_set (task->step->session, set);
1760
1761   task->start (task);
1762 }
1763
1764
1765 /**
1766  * Call the start function of the given
1767  * task again after we created a copy of the given set.
1768  */
1769 static void
1770 create_set_copy_for_task (struct TaskEntry *task,
1771                           struct SetKey *src_set_key,
1772                           struct SetKey *dst_set_key)
1773 {
1774   struct SetEntry *src_set;
1775   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1776
1777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778               "Copying set {%s} to {%s} for task {%s}\n",
1779               debug_str_set_key (src_set_key),
1780               debug_str_set_key (dst_set_key),
1781               debug_str_task_key (&task->key));
1782
1783   scc->task = task;
1784   scc->dst_set_key = *dst_set_key;
1785   src_set = lookup_set (task->step->session, src_set_key);
1786   GNUNET_assert (NULL != src_set);
1787   GNUNET_SET_copy_lazy (src_set->h,
1788                         set_copy_cb,
1789                         scc);
1790 }
1791
1792
1793 struct SetMutationProgressCls
1794 {
1795   int num_pending;
1796   /**
1797    * Task to finish once all changes are through.
1798    */
1799   struct TaskEntry *task;
1800 };
1801
1802
1803 static void
1804 set_mutation_done (void *cls)
1805 {
1806   struct SetMutationProgressCls *pc = cls;
1807
1808   GNUNET_assert (pc->num_pending > 0);
1809
1810   pc->num_pending--;
1811
1812   if (0 == pc->num_pending)
1813   {
1814     struct TaskEntry *task = pc->task;
1815     GNUNET_free (pc);
1816     finish_task (task);
1817   }
1818 }
1819
1820
1821 static void
1822 try_finish_step_early (struct Step *step)
1823 {
1824   unsigned int i;
1825
1826   if (GNUNET_YES == step->is_running)
1827     return;
1828   if (GNUNET_YES == step->is_finished)
1829     return;
1830   if (GNUNET_NO == step->early_finishable)
1831     return;
1832
1833   step->is_finished = GNUNET_YES;
1834
1835 #ifdef GNUNET_EXTRA_LOGGING
1836   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837               "Finishing step `%s' early.\n",
1838               step->debug_name);
1839 #endif
1840
1841   for (i = 0; i < step->subordinates_len; i++)
1842   {
1843     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1844     step->subordinates[i]->pending_prereq--;
1845 #ifdef GNUNET_EXTRA_LOGGING
1846     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847                 "Decreased pending_prereq to %u for step `%s'.\n",
1848                 (unsigned int) step->subordinates[i]->pending_prereq,
1849                 step->subordinates[i]->debug_name);
1850 #endif
1851     try_finish_step_early (step->subordinates[i]);
1852   }
1853
1854   // XXX: maybe schedule as task to avoid recursion?
1855   run_ready_steps (step->session);
1856 }
1857
1858
1859 static void
1860 finish_step (struct Step *step)
1861 {
1862   unsigned int i;
1863
1864   GNUNET_assert (step->finished_tasks == step->tasks_len);
1865   GNUNET_assert (GNUNET_YES == step->is_running);
1866   GNUNET_assert (GNUNET_NO == step->is_finished);
1867
1868 #ifdef GNUNET_EXTRA_LOGGING
1869   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1870               "All tasks of step `%s' with %u subordinates finished.\n",
1871               step->debug_name,
1872               step->subordinates_len);
1873 #endif
1874
1875   for (i = 0; i < step->subordinates_len; i++)
1876   {
1877     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1878     step->subordinates[i]->pending_prereq--;
1879 #ifdef GNUNET_EXTRA_LOGGING
1880     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1881                 "Decreased pending_prereq to %u for step `%s'.\n",
1882                 (unsigned int) step->subordinates[i]->pending_prereq,
1883                 step->subordinates[i]->debug_name);
1884 #endif
1885   }
1886
1887   step->is_finished = GNUNET_YES;
1888
1889   // XXX: maybe schedule as task to avoid recursion?
1890   run_ready_steps (step->session);
1891 }
1892
1893
1894 /**
1895  * Apply the result from one round of gradecasts (i.e. every peer
1896  * should have gradecasted) to the peer's current set.
1897  *
1898  * @param task the task with context information
1899  */
1900 static void
1901 task_start_apply_round (struct TaskEntry *task)
1902 {
1903   struct ConsensusSession *session = task->step->session;
1904   struct SetKey sk_in;
1905   struct SetKey sk_out;
1906   struct RfnKey rk_in;
1907   struct SetEntry *set_out;
1908   struct ReferendumEntry *rfn_in;
1909   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1910   struct RfnElementInfo *ri;
1911   struct SetMutationProgressCls *progress_cls;
1912   uint16_t worst_majority = UINT16_MAX;
1913
1914   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1915   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1916   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1917
1918   set_out = lookup_set (session, &sk_out);
1919   if (NULL == set_out)
1920   {
1921     create_set_copy_for_task (task, &sk_in, &sk_out);
1922     return;
1923   }
1924
1925   rfn_in = lookup_rfn (session, &rk_in);
1926   GNUNET_assert (NULL != rfn_in);
1927
1928   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1929   progress_cls->task = task;
1930
1931   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1932
1933   while (GNUNET_YES ==
1934          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1935                                                       NULL,
1936                                                       (const void **) &ri))
1937   {
1938     uint16_t majority_num;
1939     enum ReferendumVote majority_vote;
1940
1941     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1942
1943     if (worst_majority > majority_num)
1944       worst_majority = majority_num;
1945
1946     switch (majority_vote)
1947     {
1948     case VOTE_ADD:
1949       progress_cls->num_pending++;
1950       GNUNET_assert (GNUNET_OK ==
1951                      GNUNET_SET_add_element (set_out->h,
1952                                              ri->element,
1953                                              &set_mutation_done,
1954                                              progress_cls));
1955       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956                   "P%u: apply round: adding element %s with %u-majority.\n",
1957                   session->local_peer_idx,
1958                   debug_str_element (ri->element), majority_num);
1959       break;
1960
1961     case VOTE_REMOVE:
1962       progress_cls->num_pending++;
1963       GNUNET_assert (GNUNET_OK ==
1964                      GNUNET_SET_remove_element (set_out->h,
1965                                                 ri->element,
1966                                                 &set_mutation_done,
1967                                                 progress_cls));
1968       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1969                   "P%u: apply round: deleting element %s with %u-majority.\n",
1970                   session->local_peer_idx,
1971                   debug_str_element (ri->element), majority_num);
1972       break;
1973
1974     case VOTE_STAY:
1975       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976                   "P%u: apply round: keeping element %s with %u-majority.\n",
1977                   session->local_peer_idx,
1978                   debug_str_element (ri->element), majority_num);
1979       // do nothing
1980       break;
1981
1982     default:
1983       GNUNET_assert (0);
1984       break;
1985     }
1986   }
1987
1988   if (0 == progress_cls->num_pending)
1989   {
1990     // call closure right now, no pending ops
1991     GNUNET_free (progress_cls);
1992     finish_task (task);
1993   }
1994
1995   {
1996     uint16_t thresh = (session->num_peers / 3) * 2;
1997
1998     if (worst_majority >= thresh)
1999     {
2000       switch (session->early_stopping)
2001       {
2002       case EARLY_STOPPING_NONE:
2003         session->early_stopping = EARLY_STOPPING_ONE_MORE;
2004         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2005                     "P%u: Stopping early (after one more superround)\n",
2006                     session->local_peer_idx);
2007         break;
2008
2009       case EARLY_STOPPING_ONE_MORE:
2010         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2011                     "P%u: finishing steps due to early finish\n",
2012                     session->local_peer_idx);
2013         session->early_stopping = EARLY_STOPPING_DONE;
2014         {
2015           struct Step *step;
2016           for (step = session->steps_head; NULL != step; step = step->next)
2017             try_finish_step_early (step);
2018         }
2019         break;
2020
2021       case EARLY_STOPPING_DONE:
2022         /* We shouldn't be here anymore after early stopping */
2023         GNUNET_break (0);
2024         break;
2025
2026       default:
2027         GNUNET_assert (0);
2028         break;
2029       }
2030     }
2031     else if (EARLY_STOPPING_NONE != session->early_stopping)
2032     {
2033       // Our assumption about the number of bad peers
2034       // has been broken.
2035       GNUNET_break_op (0);
2036     }
2037     else
2038     {
2039       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2040                   "P%u: NOT finishing early (majority not good enough)\n",
2041                   session->local_peer_idx);
2042     }
2043   }
2044   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2045 }
2046
2047
2048 static void
2049 task_start_grade (struct TaskEntry *task)
2050 {
2051   struct ConsensusSession *session = task->step->session;
2052   struct ReferendumEntry *output_rfn;
2053   struct ReferendumEntry *input_rfn;
2054   struct DiffEntry *input_diff;
2055   struct RfnKey rfn_key;
2056   struct DiffKey diff_key;
2057   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2058   struct RfnElementInfo *ri;
2059   unsigned int gradecast_confidence = 2;
2060
2061   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
2062   output_rfn = lookup_rfn (session, &rfn_key);
2063   if (NULL == output_rfn)
2064   {
2065     output_rfn = rfn_create (session->num_peers);
2066     output_rfn->key = rfn_key;
2067     put_rfn (session, output_rfn);
2068   }
2069
2070   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition,
2071                                 task->key.leader };
2072   input_diff = lookup_diff (session, &diff_key);
2073   GNUNET_assert (NULL != input_diff);
2074
2075   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2076                               task->key.leader };
2077   input_rfn = lookup_rfn (session, &rfn_key);
2078   GNUNET_assert (NULL != input_rfn);
2079
2080   iter = GNUNET_CONTAINER_multihashmap_iterator_create (
2081     input_rfn->rfn_elements);
2082
2083   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader,
2084                      session->num_peers);
2085
2086   while (GNUNET_YES ==
2087          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2088                                                       NULL,
2089                                                       (const void **) &ri))
2090   {
2091     uint16_t majority_num;
2092     enum ReferendumVote majority_vote;
2093
2094     // XXX: we need contested votes and non-contested votes here
2095     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2096
2097     if (majority_num <= session->num_peers / 3)
2098       majority_vote = VOTE_REMOVE;
2099
2100     switch (majority_vote)
2101     {
2102     case VOTE_STAY:
2103       break;
2104
2105     case VOTE_ADD:
2106       rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2107       break;
2108
2109     case VOTE_REMOVE:
2110       rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2111       break;
2112
2113     default:
2114       GNUNET_assert (0);
2115       break;
2116     }
2117   }
2118   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2119
2120   {
2121     uint16_t noncontested;
2122     noncontested = rfn_noncontested (input_rfn);
2123     if (noncontested < (session->num_peers / 3) * 2)
2124     {
2125       gradecast_confidence = GNUNET_MIN (1, gradecast_confidence);
2126     }
2127     if (noncontested < (session->num_peers / 3) + 1)
2128     {
2129       gradecast_confidence = 0;
2130     }
2131   }
2132
2133   if (gradecast_confidence >= 1)
2134     rfn_commit (output_rfn, task->key.leader);
2135
2136   if (gradecast_confidence <= 1)
2137     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2138
2139   finish_task (task);
2140 }
2141
2142
2143 static void
2144 task_start_reconcile (struct TaskEntry *task)
2145 {
2146   struct SetEntry *input;
2147   struct SetOpCls *setop = &task->cls.setop;
2148   struct ConsensusSession *session = task->step->session;
2149
2150   input = lookup_set (session, &setop->input_set);
2151   GNUNET_assert (NULL != input);
2152   GNUNET_assert (NULL != input->h);
2153
2154   /* We create the outputs for the operation here
2155      (rather than in the set operation callback)
2156      because we want something valid in there, even
2157      if the other peer doesn't talk to us */
2158
2159   if (SET_KIND_NONE != setop->output_set.set_kind)
2160   {
2161     /* If we don't have an existing output set,
2162        we clone the input set. */
2163     if (NULL == lookup_set (session, &setop->output_set))
2164     {
2165       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2166       return;
2167     }
2168   }
2169
2170   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2171   {
2172     if (NULL == lookup_rfn (session, &setop->output_rfn))
2173     {
2174       struct ReferendumEntry *rfn;
2175
2176       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2177                   "P%u: output rfn <%s> missing, creating.\n",
2178                   session->local_peer_idx,
2179                   debug_str_rfn_key (&setop->output_rfn));
2180
2181       rfn = rfn_create (session->num_peers);
2182       rfn->key = setop->output_rfn;
2183       put_rfn (session, rfn);
2184     }
2185   }
2186
2187   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2188   {
2189     if (NULL == lookup_diff (session, &setop->output_diff))
2190     {
2191       struct DiffEntry *diff;
2192
2193       diff = diff_create ();
2194       diff->key = setop->output_diff;
2195       put_diff (session, diff);
2196     }
2197   }
2198
2199   if ((task->key.peer1 == session->local_peer_idx) && (task->key.peer2 ==
2200                                                        session->local_peer_idx))
2201   {
2202     /* XXX: mark the corresponding rfn as commited if necessary */
2203     finish_task (task);
2204     return;
2205   }
2206
2207   if (task->key.peer1 == session->local_peer_idx)
2208   {
2209     struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2210
2211     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2212                 "P%u: Looking up set {%s} to run remote union\n",
2213                 session->local_peer_idx,
2214                 debug_str_set_key (&setop->input_set));
2215
2216     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2217     rcm.header.size = htons (sizeof(struct
2218                                     GNUNET_CONSENSUS_RoundContextMessage));
2219
2220     rcm.kind = htons (task->key.kind);
2221     rcm.peer1 = htons (task->key.peer1);
2222     rcm.peer2 = htons (task->key.peer2);
2223     rcm.leader = htons (task->key.leader);
2224     rcm.repetition = htons (task->key.repetition);
2225     rcm.is_contested = htons (0);
2226
2227     GNUNET_assert (NULL == setop->op);
2228     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2229                 "P%u: initiating set op with P%u, our set is %s\n",
2230                 session->local_peer_idx, task->key.peer2, debug_str_set_key (
2231                   &setop->input_set));
2232
2233     struct GNUNET_SET_Option opts[] = {
2234       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2235       { GNUNET_SET_OPTION_END },
2236     };
2237
2238     // XXX: maybe this should be done while
2239     // setting up tasks alreays?
2240     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2241                                     &session->global_id,
2242                                     &rcm.header,
2243                                     GNUNET_SET_RESULT_SYMMETRIC,
2244                                     opts,
2245                                     set_result_cb,
2246                                     task);
2247
2248     commit_set (session, task);
2249   }
2250   else if (task->key.peer2 == session->local_peer_idx)
2251   {
2252     /* Wait for the other peer to contact us */
2253     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2254                 session->local_peer_idx, task->key.peer1);
2255
2256     if (NULL != setop->op)
2257     {
2258       commit_set (session, task);
2259     }
2260   }
2261   else
2262   {
2263     /* We made an error while constructing the task graph. */
2264     GNUNET_assert (0);
2265   }
2266 }
2267
2268
2269 static void
2270 task_start_eval_echo (struct TaskEntry *task)
2271 {
2272   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2273   struct ReferendumEntry *input_rfn;
2274   struct RfnElementInfo *ri;
2275   struct SetEntry *output_set;
2276   struct SetMutationProgressCls *progress_cls;
2277   struct ConsensusSession *session = task->step->session;
2278   struct SetKey sk_in;
2279   struct SetKey sk_out;
2280   struct RfnKey rk_in;
2281
2282   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition,
2283                             task->key.leader };
2284   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition,
2285                              task->key.leader };
2286   output_set = lookup_set (session, &sk_out);
2287   if (NULL == output_set)
2288   {
2289     create_set_copy_for_task (task, &sk_in, &sk_out);
2290     return;
2291   }
2292
2293
2294   {
2295     // FIXME: should be marked as a shallow copy, so
2296     // we can destroy everything correctly
2297     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2298     last_set->h = output_set->h;
2299     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2300     put_set (session, last_set);
2301   }
2302
2303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2304               "Evaluating referendum in Task {%s}\n",
2305               debug_str_task_key (&task->key));
2306
2307   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2308   progress_cls->task = task;
2309
2310   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2311                             task->key.leader };
2312   input_rfn = lookup_rfn (session, &rk_in);
2313
2314   GNUNET_assert (NULL != input_rfn);
2315
2316   iter = GNUNET_CONTAINER_multihashmap_iterator_create (
2317     input_rfn->rfn_elements);
2318   GNUNET_assert (NULL != iter);
2319
2320   while (GNUNET_YES ==
2321          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2322                                                       NULL,
2323                                                       (const void **) &ri))
2324   {
2325     enum ReferendumVote majority_vote;
2326     uint16_t majority_num;
2327
2328     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2329
2330     if (majority_num < session->num_peers / 3)
2331     {
2332       /* It is not the case that all nonfaulty peers
2333          echoed the same value.  Since we're doing a set reconciliation, we
2334          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2335          reconciliation as contested.  Other peers might not know that the
2336          leader is faulty, thus we still re-distribute in the confirmation
2337          round. */output_set->is_contested = GNUNET_YES;
2338     }
2339
2340     switch (majority_vote)
2341     {
2342     case VOTE_ADD:
2343       progress_cls->num_pending++;
2344       GNUNET_assert (GNUNET_OK ==
2345                      GNUNET_SET_add_element (output_set->h,
2346                                              ri->element,
2347                                              set_mutation_done,
2348                                              progress_cls));
2349       break;
2350
2351     case VOTE_REMOVE:
2352       progress_cls->num_pending++;
2353       GNUNET_assert (GNUNET_OK ==
2354                      GNUNET_SET_remove_element (output_set->h,
2355                                                 ri->element,
2356                                                 set_mutation_done,
2357                                                 progress_cls));
2358       break;
2359
2360     case VOTE_STAY:
2361       /* Nothing to do. */
2362       break;
2363
2364     default:
2365       /* not reached */
2366       GNUNET_assert (0);
2367     }
2368   }
2369
2370   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2371
2372   if (0 == progress_cls->num_pending)
2373   {
2374     // call closure right now, no pending ops
2375     GNUNET_free (progress_cls);
2376     finish_task (task);
2377   }
2378 }
2379
2380
2381 static void
2382 task_start_finish (struct TaskEntry *task)
2383 {
2384   struct SetEntry *final_set;
2385   struct ConsensusSession *session = task->step->session;
2386
2387   final_set = lookup_set (session, &task->cls.finish.input_set);
2388
2389   GNUNET_assert (NULL != final_set);
2390
2391
2392   GNUNET_SET_iterate (final_set->h,
2393                       send_to_client_iter,
2394                       task);
2395 }
2396
2397
2398 static void
2399 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2400 {
2401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n",
2402               session->local_peer_idx, debug_str_task_key (&task->key));
2403
2404   GNUNET_assert (GNUNET_NO == task->is_started);
2405   GNUNET_assert (GNUNET_NO == task->is_finished);
2406   GNUNET_assert (NULL != task->start);
2407
2408   task->start (task);
2409
2410   task->is_started = GNUNET_YES;
2411 }
2412
2413
2414 /*
2415  * Run all steps of the session that don't any
2416  * more dependencies.
2417  */
2418 static void
2419 run_ready_steps (struct ConsensusSession *session)
2420 {
2421   struct Step *step;
2422
2423   step = session->steps_head;
2424
2425   while (NULL != step)
2426   {
2427     if ((GNUNET_NO == step->is_running) && (0 == step->pending_prereq) &&
2428         (GNUNET_NO == step->is_finished))
2429     {
2430       size_t i;
2431
2432       GNUNET_assert (0 == step->finished_tasks);
2433
2434 #ifdef GNUNET_EXTRA_LOGGING
2435       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2436                   "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2437                   session->local_peer_idx,
2438                   step->debug_name,
2439                   step->round, step->tasks_len, step->subordinates_len);
2440 #endif
2441
2442       step->is_running = GNUNET_YES;
2443       for (i = 0; i < step->tasks_len; i++)
2444         start_task (session, step->tasks[i]);
2445
2446       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2447       if ((step->finished_tasks == step->tasks_len) && (GNUNET_NO ==
2448                                                         step->is_finished))
2449         finish_step (step);
2450
2451       /* Running the next ready steps will be triggered by task completion */
2452       return;
2453     }
2454     step = step->next;
2455   }
2456
2457   return;
2458 }
2459
2460
2461 static void
2462 finish_task (struct TaskEntry *task)
2463 {
2464   GNUNET_assert (GNUNET_NO == task->is_finished);
2465   task->is_finished = GNUNET_YES;
2466
2467   task->step->finished_tasks++;
2468
2469   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2470               "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2471               task->step->session->local_peer_idx,
2472               debug_str_task_key (&task->key),
2473               (unsigned int) task->step->finished_tasks,
2474               (unsigned int) task->step->tasks_len);
2475
2476   if (task->step->finished_tasks == task->step->tasks_len)
2477     finish_step (task->step);
2478 }
2479
2480
2481 /**
2482  * Search peer in the list of peers in session.
2483  *
2484  * @param peer peer to find
2485  * @param session session with peer
2486  * @return index of peer, -1 if peer is not in session
2487  */
2488 static int
2489 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct
2490               ConsensusSession *session)
2491 {
2492   int i;
2493
2494   for (i = 0; i < session->num_peers; i++)
2495     if (0 == GNUNET_memcmp (peer, &session->peers[i]))
2496       return i;
2497   return -1;
2498 }
2499
2500
2501 /**
2502  * Compute a global, (hopefully) unique consensus session id,
2503  * from the local id of the consensus session, and the identities of all participants.
2504  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2505  * exactly the same peers, the global id will be different.
2506  *
2507  * @param session session to generate the global id for
2508  * @param local_session_id local id of the consensus session
2509  */
2510 static void
2511 compute_global_id (struct ConsensusSession *session,
2512                    const struct GNUNET_HashCode *local_session_id)
2513 {
2514   const char *salt = "gnunet-service-consensus/session_id";
2515
2516   GNUNET_assert (GNUNET_YES ==
2517                  GNUNET_CRYPTO_kdf (&session->global_id,
2518                                     sizeof(struct GNUNET_HashCode),
2519                                     salt,
2520                                     strlen (salt),
2521                                     session->peers,
2522                                     session->num_peers * sizeof(struct
2523                                                                 GNUNET_PeerIdentity),
2524                                     local_session_id,
2525                                     sizeof(struct GNUNET_HashCode),
2526                                     NULL));
2527 }
2528
2529
2530 /**
2531  * Compare two peer identities.
2532  *
2533  * @param h1 some peer identity
2534  * @param h2 some peer identity
2535  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2536  */
2537 static int
2538 peer_id_cmp (const void *h1, const void *h2)
2539 {
2540   return memcmp (h1, h2, sizeof(struct GNUNET_PeerIdentity));
2541 }
2542
2543
2544 /**
2545  * Create the sorted list of peers for the session,
2546  * add the local peer if not in the join message.
2547  *
2548  * @param session session to initialize
2549  * @param join_msg join message with the list of peers participating at the end
2550  */
2551 static void
2552 initialize_session_peer_list (struct ConsensusSession *session,
2553                               const struct
2554                               GNUNET_CONSENSUS_JoinMessage *join_msg)
2555 {
2556   const struct GNUNET_PeerIdentity *msg_peers
2557     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2558   int local_peer_in_list;
2559
2560   session->num_peers = ntohl (join_msg->num_peers);
2561
2562   /* Peers in the join message, may or may not include the local peer,
2563      Add it if it is missing. */
2564   local_peer_in_list = GNUNET_NO;
2565   for (unsigned int i = 0; i < session->num_peers; i++)
2566   {
2567     if (0 == GNUNET_memcmp (&msg_peers[i],
2568                             &my_peer))
2569     {
2570       local_peer_in_list = GNUNET_YES;
2571       break;
2572     }
2573   }
2574   if (GNUNET_NO == local_peer_in_list)
2575     session->num_peers++;
2576
2577   session->peers = GNUNET_new_array (session->num_peers,
2578                                      struct GNUNET_PeerIdentity);
2579   if (GNUNET_NO == local_peer_in_list)
2580     session->peers[session->num_peers - 1] = my_peer;
2581
2582   GNUNET_memcpy (session->peers,
2583                  msg_peers,
2584                  ntohl (join_msg->num_peers) * sizeof(struct
2585                                                       GNUNET_PeerIdentity));
2586   qsort (session->peers,
2587          session->num_peers,
2588          sizeof(struct GNUNET_PeerIdentity),
2589          &peer_id_cmp);
2590 }
2591
2592
2593 static struct TaskEntry *
2594 lookup_task (struct ConsensusSession *session,
2595              struct TaskKey *key)
2596 {
2597   struct GNUNET_HashCode hash;
2598
2599
2600   GNUNET_CRYPTO_hash (key, sizeof(struct TaskKey), &hash);
2601   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2602               GNUNET_h2s (&hash));
2603   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2604 }
2605
2606
2607 /**
2608  * Called when another peer wants to do a set operation with the
2609  * local peer.
2610  *
2611  * @param cls closure
2612  * @param other_peer the other peer
2613  * @param context_msg message with application specific information from
2614  *        the other peer
2615  * @param request request from the other peer, use GNUNET_SET_accept
2616  *        to accept it, otherwise the request will be refused
2617  *        Note that we don't use a return value here, as it is also
2618  *        necessary to specify the set we want to do the operation with,
2619  *        whith sometimes can be derived from the context message.
2620  *        Also necessary to specify the timeout.
2621  */
2622 static void
2623 set_listen_cb (void *cls,
2624                const struct GNUNET_PeerIdentity *other_peer,
2625                const struct GNUNET_MessageHeader *context_msg,
2626                struct GNUNET_SET_Request *request)
2627 {
2628   struct ConsensusSession *session = cls;
2629   struct TaskKey tk;
2630   struct TaskEntry *task;
2631   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2632
2633   if (NULL == context_msg)
2634   {
2635     GNUNET_break_op (0);
2636     return;
2637   }
2638
2639   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (
2640         context_msg->type))
2641   {
2642     GNUNET_break_op (0);
2643     return;
2644   }
2645
2646   if (sizeof(struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (
2647         context_msg->size))
2648   {
2649     GNUNET_break_op (0);
2650     return;
2651   }
2652
2653   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2654
2655   tk = ((struct TaskKey) {
2656     .kind = ntohs (cm->kind),
2657     .peer1 = ntohs (cm->peer1),
2658     .peer2 = ntohs (cm->peer2),
2659     .repetition = ntohs (cm->repetition),
2660     .leader = ntohs (cm->leader),
2661   });
2662
2663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2664               session->local_peer_idx, debug_str_task_key (&tk));
2665
2666   task = lookup_task (session, &tk);
2667
2668   if (NULL == task)
2669   {
2670     GNUNET_break_op (0);
2671     return;
2672   }
2673
2674   if (GNUNET_YES == task->is_finished)
2675   {
2676     GNUNET_break_op (0);
2677     return;
2678   }
2679
2680   if (task->key.peer2 != session->local_peer_idx)
2681   {
2682     /* We're being asked, so we must be thne 2nd peer. */
2683     GNUNET_break_op (0);
2684     return;
2685   }
2686
2687   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2688                     (task->key.peer2 == session->local_peer_idx)));
2689
2690   struct GNUNET_SET_Option opts[] = {
2691     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2692     { GNUNET_SET_OPTION_END },
2693   };
2694
2695   task->cls.setop.op = GNUNET_SET_accept (request,
2696                                           GNUNET_SET_RESULT_SYMMETRIC,
2697                                           opts,
2698                                           set_result_cb,
2699                                           task);
2700
2701   /* If the task hasn't been started yet,
2702      we wait for that until we commit. */
2703
2704   if (GNUNET_YES == task->is_started)
2705   {
2706     commit_set (session, task);
2707   }
2708 }
2709
2710
2711 static void
2712 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2713           struct TaskEntry *t)
2714 {
2715   struct GNUNET_HashCode round_hash;
2716   struct Step *s;
2717
2718   GNUNET_assert (NULL != t->step);
2719
2720   t = GNUNET_memdup (t, sizeof(struct TaskEntry));
2721
2722   s = t->step;
2723
2724   if (s->tasks_len == s->tasks_cap)
2725   {
2726     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2727     GNUNET_array_grow (s->tasks,
2728                        s->tasks_cap,
2729                        target_size);
2730   }
2731
2732 #ifdef GNUNET_EXTRA_LOGGING
2733   GNUNET_assert (NULL != s->debug_name);
2734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2735               debug_str_task_key (&t->key),
2736               s->debug_name);
2737 #endif
2738
2739   s->tasks[s->tasks_len] = t;
2740   s->tasks_len++;
2741
2742   GNUNET_CRYPTO_hash (&t->key, sizeof(struct TaskKey), &round_hash);
2743   GNUNET_assert (GNUNET_OK ==
2744                  GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2745                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2746 }
2747
2748
2749 static void
2750 install_step_timeouts (struct ConsensusSession *session)
2751 {
2752   /* Given the fully constructed task graph
2753      with rounds for tasks, we can give the tasks timeouts. */
2754
2755   // unsigned int max_round;
2756
2757   /* XXX: implement! */
2758 }
2759
2760
2761 /*
2762  * Arrange two peers in some canonical order.
2763  */
2764 static void
2765 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2766 {
2767   uint16_t a;
2768   uint16_t b;
2769
2770   GNUNET_assert (*p1 < n);
2771   GNUNET_assert (*p2 < n);
2772
2773   if (*p1 < *p2)
2774   {
2775     a = *p1;
2776     b = *p2;
2777   }
2778   else
2779   {
2780     a = *p2;
2781     b = *p1;
2782   }
2783
2784   /* For uniformly random *p1, *p2,
2785      this condition is true with 50% chance */
2786   if (((b - a) + n) % n <= n / 2)
2787   {
2788     *p1 = a;
2789     *p2 = b;
2790   }
2791   else
2792   {
2793     *p1 = b;
2794     *p2 = a;
2795   }
2796 }
2797
2798
2799 /**
2800  * Record @a dep as a dependency of @a step.
2801  */
2802 static void
2803 step_depend_on (struct Step *step, struct Step *dep)
2804 {
2805   /* We're not checking for cyclic dependencies,
2806      but this is a cheap sanity check. */
2807   GNUNET_assert (step != dep);
2808   GNUNET_assert (NULL != step);
2809   GNUNET_assert (NULL != dep);
2810   GNUNET_assert (dep->round <= step->round);
2811
2812 #ifdef GNUNET_EXTRA_LOGGING
2813   /* Make sure we have complete debugging information.
2814      Also checks that we don't screw up too badly
2815      constructing the task graph. */
2816   GNUNET_assert (NULL != step->debug_name);
2817   GNUNET_assert (NULL != dep->debug_name);
2818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2819               "Making step `%s' depend on `%s'\n",
2820               step->debug_name,
2821               dep->debug_name);
2822 #endif
2823
2824   if (dep->subordinates_cap == dep->subordinates_len)
2825   {
2826     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2827     GNUNET_array_grow (dep->subordinates,
2828                        dep->subordinates_cap,
2829                        target_size);
2830   }
2831
2832   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2833
2834   dep->subordinates[dep->subordinates_len] = step;
2835   dep->subordinates_len++;
2836
2837   step->pending_prereq++;
2838 }
2839
2840
2841 static struct Step *
2842 create_step (struct ConsensusSession *session, int round, int early_finishable)
2843 {
2844   struct Step *step;
2845
2846   step = GNUNET_new (struct Step);
2847   step->session = session;
2848   step->round = round;
2849   step->early_finishable = early_finishable;
2850   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2851                                     session->steps_tail,
2852                                     step);
2853   return step;
2854 }
2855
2856
2857 /**
2858  * Construct the task graph for a single
2859  * gradecast.
2860  */
2861 static void
2862 construct_task_graph_gradecast (struct ConsensusSession *session,
2863                                 uint16_t rep,
2864                                 uint16_t lead,
2865                                 struct Step *step_before,
2866                                 struct Step *step_after)
2867 {
2868   uint16_t n = session->num_peers;
2869   uint16_t me = session->local_peer_idx;
2870
2871   uint16_t p1;
2872   uint16_t p2;
2873
2874   /* The task we're currently setting up. */
2875   struct TaskEntry task;
2876
2877   struct Step *step;
2878   struct Step *prev_step;
2879
2880   uint16_t round;
2881
2882   unsigned int k;
2883
2884   round = step_before->round + 1;
2885
2886   /* gcast step 1: leader disseminates */
2887
2888   step = create_step (session, round, GNUNET_YES);
2889
2890 #ifdef GNUNET_EXTRA_LOGGING
2891   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead,
2892                    rep);
2893 #endif
2894   step_depend_on (step, step_before);
2895
2896   if (lead == me)
2897   {
2898     for (k = 0; k < n; k++)
2899     {
2900       if (k == me)
2901         continue;
2902       p1 = me;
2903       p2 = k;
2904       arrange_peers (&p1, &p2, n);
2905       task = ((struct TaskEntry) {
2906         .step = step,
2907         .start = task_start_reconcile,
2908         .cancel = task_cancel_reconcile,
2909         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2910                                   me },
2911       });
2912       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2913       put_task (session->taskmap, &task);
2914     }
2915     /* We run this task to make sure that the leader
2916        has the stored the SET_KIND_LEADER set of himself,
2917        so it can participate in the rest of the gradecast
2918        without the code having to handle any special cases. */
2919     task = ((struct TaskEntry) {
2920       .step = step,
2921       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2922       .start = task_start_reconcile,
2923       .cancel = task_cancel_reconcile,
2924     });
2925     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2926     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2927                                                   me };
2928     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
2929                                                     rep, me };
2930     put_task (session->taskmap, &task);
2931   }
2932   else
2933   {
2934     p1 = me;
2935     p2 = lead;
2936     arrange_peers (&p1, &p2, n);
2937     task = ((struct TaskEntry) {
2938       .step = step,
2939       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2940                                 lead },
2941       .start = task_start_reconcile,
2942       .cancel = task_cancel_reconcile,
2943     });
2944     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2945     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2946                                                   lead };
2947     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
2948                                                     rep, lead };
2949     put_task (session->taskmap, &task);
2950   }
2951
2952   /* gcast phase 2: echo */
2953   prev_step = step;
2954   round += 1;
2955   step = create_step (session, round, GNUNET_YES);
2956 #ifdef GNUNET_EXTRA_LOGGING
2957   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2958 #endif
2959   step_depend_on (step, prev_step);
2960
2961   for (k = 0; k < n; k++)
2962   {
2963     p1 = k;
2964     p2 = me;
2965     arrange_peers (&p1, &p2, n);
2966     task = ((struct TaskEntry) {
2967       .step = step,
2968       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2969       .start = task_start_reconcile,
2970       .cancel = task_cancel_reconcile,
2971     });
2972     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2973                                                  lead };
2974     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2975     put_task (session->taskmap, &task);
2976   }
2977
2978   prev_step = step;
2979   /* Same round, since step only has local tasks */
2980   step = create_step (session, round, GNUNET_YES);
2981 #ifdef GNUNET_EXTRA_LOGGING
2982   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2983 #endif
2984   step_depend_on (step, prev_step);
2985
2986   arrange_peers (&p1, &p2, n);
2987   task = ((struct TaskEntry) {
2988     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep,
2989                               lead },
2990     .step = step,
2991     .start = task_start_eval_echo
2992   });
2993   put_task (session->taskmap, &task);
2994
2995   prev_step = step;
2996   round += 1;
2997   step = create_step (session, round, GNUNET_YES);
2998 #ifdef GNUNET_EXTRA_LOGGING
2999   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
3000 #endif
3001   step_depend_on (step, prev_step);
3002
3003   /* gcast phase 3: confirmation and grading */
3004   for (k = 0; k < n; k++)
3005   {
3006     p1 = k;
3007     p2 = me;
3008     arrange_peers (&p1, &p2, n);
3009     task = ((struct TaskEntry) {
3010       .step = step,
3011       .start = task_start_reconcile,
3012       .cancel = task_cancel_reconcile,
3013       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep,
3014                                 lead },
3015     });
3016     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep,
3017                                                  lead };
3018     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
3019     /* If there was at least one element in the echo round that was
3020        contested (i.e. it had no n-t majority), then we let the other peers
3021        know, and other peers let us know.  The contested flag for each peer is
3022        stored in the rfn. */
3023     task.cls.setop.transceive_contested = GNUNET_YES;
3024     put_task (session->taskmap, &task);
3025   }
3026
3027   prev_step = step;
3028   /* Same round, since step only has local tasks */
3029   step = create_step (session, round, GNUNET_YES);
3030 #ifdef GNUNET_EXTRA_LOGGING
3031   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead,
3032                    rep);
3033 #endif
3034   step_depend_on (step, prev_step);
3035
3036   task = ((struct TaskEntry) {
3037     .step = step,
3038     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep,
3039                               lead },
3040     .start = task_start_grade,
3041   });
3042   put_task (session->taskmap, &task);
3043
3044   step_depend_on (step_after, step);
3045 }
3046
3047
3048 static void
3049 construct_task_graph (struct ConsensusSession *session)
3050 {
3051   uint16_t n = session->num_peers;
3052   uint16_t t = n / 3;
3053
3054   uint16_t me = session->local_peer_idx;
3055
3056   /* The task we're currently setting up. */
3057   struct TaskEntry task;
3058
3059   /* Current leader */
3060   unsigned int lead;
3061
3062   struct Step *step;
3063   struct Step *prev_step;
3064
3065   unsigned int round = 0;
3066
3067   unsigned int i;
3068
3069   // XXX: introduce first step,
3070   // where we wait for all insert acks
3071   // from the set service
3072
3073   /* faster but brittle all-to-all */
3074
3075   // XXX: Not implemented yet
3076
3077   /* all-to-all step */
3078
3079   step = create_step (session, round, GNUNET_NO);
3080
3081 #ifdef GNUNET_EXTRA_LOGGING
3082   step->debug_name = GNUNET_strdup ("all to all");
3083 #endif
3084
3085   for (i = 0; i < n; i++)
3086   {
3087     uint16_t p1;
3088     uint16_t p2;
3089
3090     p1 = me;
3091     p2 = i;
3092     arrange_peers (&p1, &p2, n);
3093     task = ((struct TaskEntry) {
3094       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
3095       .step = step,
3096       .start = task_start_reconcile,
3097       .cancel = task_cancel_reconcile,
3098     });
3099     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3100     task.cls.setop.output_set = task.cls.setop.input_set;
3101     task.cls.setop.do_not_remove = GNUNET_YES;
3102     put_task (session->taskmap, &task);
3103   }
3104
3105   round += 1;
3106   prev_step = step;
3107   step = create_step (session, round, GNUNET_NO);;
3108 #ifdef GNUNET_EXTRA_LOGGING
3109   step->debug_name = GNUNET_strdup ("all to all 2");
3110 #endif
3111   step_depend_on (step, prev_step);
3112
3113
3114   for (i = 0; i < n; i++)
3115   {
3116     uint16_t p1;
3117     uint16_t p2;
3118
3119     p1 = me;
3120     p2 = i;
3121     arrange_peers (&p1, &p2, n);
3122     task = ((struct TaskEntry) {
3123       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3124       .step = step,
3125       .start = task_start_reconcile,
3126       .cancel = task_cancel_reconcile,
3127     });
3128     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3129     task.cls.setop.output_set = task.cls.setop.input_set;
3130     task.cls.setop.do_not_remove = GNUNET_YES;
3131     put_task (session->taskmap, &task);
3132   }
3133
3134   round += 1;
3135
3136   prev_step = step;
3137   step = NULL;
3138
3139
3140   /* Byzantine union */
3141
3142   /* sequential repetitions of the gradecasts */
3143   for (i = 0; i < t + 1; i++)
3144   {
3145     struct Step *step_rep_start;
3146     struct Step *step_rep_end;
3147
3148     /* Every repetition is in a separate round. */
3149     step_rep_start = create_step (session, round, GNUNET_YES);
3150 #ifdef GNUNET_EXTRA_LOGGING
3151     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3152 #endif
3153
3154     step_depend_on (step_rep_start, prev_step);
3155
3156     /* gradecast has three rounds */
3157     round += 3;
3158     step_rep_end = create_step (session, round, GNUNET_YES);
3159 #ifdef GNUNET_EXTRA_LOGGING
3160     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3161 #endif
3162
3163     /* parallel gradecasts */
3164     for (lead = 0; lead < n; lead++)
3165       construct_task_graph_gradecast (session, i, lead, step_rep_start,
3166                                       step_rep_end);
3167
3168     task = ((struct TaskEntry) {
3169       .step = step_rep_end,
3170       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1 },
3171       .start = task_start_apply_round,
3172     });
3173     put_task (session->taskmap, &task);
3174
3175     prev_step = step_rep_end;
3176   }
3177
3178   /* There is no next gradecast round, thus the final
3179      start step is the overall end step of the gradecasts */
3180   round += 1;
3181   step = create_step (session, round, GNUNET_NO);
3182 #ifdef GNUNET_EXTRA_LOGGING
3183   GNUNET_asprintf (&step->debug_name, "finish");
3184 #endif
3185   step_depend_on (step, prev_step);
3186
3187   task = ((struct TaskEntry) {
3188     .step = step,
3189     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3190     .start = task_start_finish,
3191   });
3192   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3193
3194   put_task (session->taskmap, &task);
3195 }
3196
3197
3198 /**
3199  * Check join message.
3200  *
3201  * @param cls session of client that sent the message
3202  * @param m message sent by the client
3203  * @return #GNUNET_OK if @a m is well-formed
3204  */
3205 static int
3206 check_client_join (void *cls,
3207                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3208 {
3209   uint32_t listed_peers = ntohl (m->num_peers);
3210
3211   if ((ntohs (m->header.size) - sizeof(*m)) !=
3212       listed_peers * sizeof(struct GNUNET_PeerIdentity))
3213   {
3214     GNUNET_break (0);
3215     return GNUNET_SYSERR;
3216   }
3217   return GNUNET_OK;
3218 }
3219
3220
3221 /**
3222  * Called when a client wants to join a consensus session.
3223  *
3224  * @param cls session of client that sent the message
3225  * @param m message sent by the client
3226  */
3227 static void
3228 handle_client_join (void *cls,
3229                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3230 {
3231   struct ConsensusSession *session = cls;
3232   struct ConsensusSession *other_session;
3233
3234   initialize_session_peer_list (session,
3235                                 m);
3236   compute_global_id (session,
3237                      &m->session_id);
3238
3239   /* Check if some local client already owns the session.
3240      It is only legal to have a session with an existing global id
3241      if all other sessions with this global id are finished.*/
3242   for (other_session = sessions_head;
3243        NULL != other_session;
3244        other_session = other_session->next)
3245   {
3246     if ((other_session != session) &&
3247         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3248                                       &other_session->global_id)))
3249       break;
3250   }
3251
3252   session->conclude_deadline
3253     = GNUNET_TIME_absolute_ntoh (m->deadline);
3254   session->conclude_start
3255     = GNUNET_TIME_absolute_ntoh (m->start);
3256   session->local_peer_idx = get_peer_idx (&my_peer,
3257                                           session);
3258   GNUNET_assert (-1 != session->local_peer_idx);
3259
3260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3261               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3262               GNUNET_h2s (&m->session_id),
3263               session->num_peers,
3264               session->local_peer_idx,
3265               GNUNET_STRINGS_relative_time_to_string
3266                 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3267                                                       session->conclude_deadline),
3268                 GNUNET_YES));
3269
3270   session->set_listener
3271     = GNUNET_SET_listen (cfg,
3272                          GNUNET_SET_OPERATION_UNION,
3273                          &session->global_id,
3274                          &set_listen_cb,
3275                          session);
3276
3277   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3278                                                           GNUNET_NO);
3279   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3280                                                            GNUNET_NO);
3281   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3282                                                            GNUNET_NO);
3283   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3284                                                           GNUNET_NO);
3285
3286   {
3287     struct SetEntry *client_set;
3288
3289     client_set = GNUNET_new (struct SetEntry);
3290     client_set->h = GNUNET_SET_create (cfg,
3291                                        GNUNET_SET_OPERATION_UNION);
3292     struct SetHandle *sh = GNUNET_new (struct SetHandle);
3293     sh->h = client_set->h;
3294     GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3295                                  session->set_handles_tail,
3296                                  sh);
3297     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3298     put_set (session,
3299              client_set);
3300   }
3301
3302   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3303                                                  int);
3304
3305   /* Just construct the task graph,
3306      but don't run anything until the client calls conclude. */
3307   construct_task_graph (session);
3308   GNUNET_SERVICE_client_continue (session->client);
3309 }
3310
3311
3312 static void
3313 client_insert_done (void *cls)
3314 {
3315   // FIXME: implement
3316 }
3317
3318
3319 /**
3320  * Called when a client performs an insert operation.
3321  *
3322  * @param cls client handle
3323  * @param msg message sent by the client
3324  * @return #GNUNET_OK (always well-formed)
3325  */
3326 static int
3327 check_client_insert (void *cls,
3328                      const struct GNUNET_CONSENSUS_ElementMessage *msg)
3329 {
3330   return GNUNET_OK;
3331 }
3332
3333
3334 /**
3335  * Called when a client performs an insert operation.
3336  *
3337  * @param cls client handle
3338  * @param msg message sent by the client
3339  */
3340 static void
3341 handle_client_insert (void *cls,
3342                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3343 {
3344   struct ConsensusSession *session = cls;
3345   ssize_t element_size;
3346   struct GNUNET_SET_Handle *initial_set;
3347   struct ConsensusElement *ce;
3348
3349   if (GNUNET_YES == session->conclude_started)
3350   {
3351     GNUNET_break (0);
3352     GNUNET_SERVICE_client_drop (session->client);
3353     return;
3354   }
3355
3356   element_size = ntohs (msg->header.size) - sizeof(struct
3357                                                    GNUNET_CONSENSUS_ElementMessage);
3358   ce = GNUNET_malloc (sizeof(struct ConsensusElement) + element_size);
3359   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3360   ce->payload_type = msg->element_type;
3361
3362   struct GNUNET_SET_Element element = {
3363     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3364     .size = sizeof(struct ConsensusElement) + element_size,
3365     .data = ce,
3366   };
3367
3368   {
3369     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3370     struct SetEntry *entry;
3371
3372     entry = lookup_set (session,
3373                         &key);
3374     GNUNET_assert (NULL != entry);
3375     initial_set = entry->h;
3376   }
3377
3378   session->num_client_insert_pending++;
3379   GNUNET_SET_add_element (initial_set,
3380                           &element,
3381                           &client_insert_done,
3382                           session);
3383
3384 #ifdef GNUNET_EXTRA_LOGGING
3385   {
3386     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3387                 "P%u: element %s added\n",
3388                 session->local_peer_idx,
3389                 debug_str_element (&element));
3390   }
3391 #endif
3392   GNUNET_free (ce);
3393   GNUNET_SERVICE_client_continue (session->client);
3394 }
3395
3396
3397 /**
3398  * Called when a client performs the conclude operation.
3399  *
3400  * @param cls client handle
3401  * @param message message sent by the client
3402  */
3403 static void
3404 handle_client_conclude (void *cls,
3405                         const struct GNUNET_MessageHeader *message)
3406 {
3407   struct ConsensusSession *session = cls;
3408
3409   if (GNUNET_YES == session->conclude_started)
3410   {
3411     /* conclude started twice */
3412     GNUNET_break (0);
3413     GNUNET_SERVICE_client_drop (session->client);
3414     return;
3415   }
3416   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3417               "conclude requested\n");
3418   session->conclude_started = GNUNET_YES;
3419   install_step_timeouts (session);
3420   run_ready_steps (session);
3421   GNUNET_SERVICE_client_continue (session->client);
3422 }
3423
3424
3425 /**
3426  * Called to clean up, after a shutdown has been requested.
3427  *
3428  * @param cls closure
3429  */
3430 static void
3431 shutdown_task (void *cls)
3432 {
3433   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3434               "shutting down\n");
3435   GNUNET_STATISTICS_destroy (statistics,
3436                              GNUNET_NO);
3437   statistics = NULL;
3438 }
3439
3440
3441 /**
3442  * Start processing consensus requests.
3443  *
3444  * @param cls closure
3445  * @param c configuration to use
3446  * @param service the initialized service
3447  */
3448 static void
3449 run (void *cls,
3450      const struct GNUNET_CONFIGURATION_Handle *c,
3451      struct GNUNET_SERVICE_Handle *service)
3452 {
3453   cfg = c;
3454   if (GNUNET_OK !=
3455       GNUNET_CRYPTO_get_peer_identity (cfg,
3456                                        &my_peer))
3457   {
3458     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3459                 "Could not retrieve host identity\n");
3460     GNUNET_SCHEDULER_shutdown ();
3461     return;
3462   }
3463   statistics = GNUNET_STATISTICS_create ("consensus",
3464                                          cfg);
3465   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3466                                  NULL);
3467 }
3468
3469
3470 /**
3471  * Callback called when a client connects to the service.
3472  *
3473  * @param cls closure for the service
3474  * @param c the new client that connected to the service
3475  * @param mq the message queue used to send messages to the client
3476  * @return @a c
3477  */
3478 static void *
3479 client_connect_cb (void *cls,
3480                    struct GNUNET_SERVICE_Client *c,
3481                    struct GNUNET_MQ_Handle *mq)
3482 {
3483   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3484
3485   session->client = c;
3486   session->client_mq = mq;
3487   GNUNET_CONTAINER_DLL_insert (sessions_head,
3488                                sessions_tail,
3489                                session);
3490   return session;
3491 }
3492
3493
3494 /**
3495  * Callback called when a client disconnected from the service
3496  *
3497  * @param cls closure for the service
3498  * @param c the client that disconnected
3499  * @param internal_cls should be equal to @a c
3500  */
3501 static void
3502 client_disconnect_cb (void *cls,
3503                       struct GNUNET_SERVICE_Client *c,
3504                       void *internal_cls)
3505 {
3506   struct ConsensusSession *session = internal_cls;
3507
3508   if (NULL != session->set_listener)
3509   {
3510     GNUNET_SET_listen_cancel (session->set_listener);
3511     session->set_listener = NULL;
3512   }
3513   GNUNET_CONTAINER_DLL_remove (sessions_head,
3514                                sessions_tail,
3515                                session);
3516
3517   while (session->set_handles_head)
3518   {
3519     struct SetHandle *sh = session->set_handles_head;
3520     session->set_handles_head = sh->next;
3521     GNUNET_SET_destroy (sh->h);
3522     GNUNET_free (sh);
3523   }
3524   GNUNET_free (session);
3525 }
3526
3527
3528 /**
3529  * Define "main" method using service macro.
3530  */
3531 GNUNET_SERVICE_MAIN
3532   ("consensus",
3533   GNUNET_SERVICE_OPTION_NONE,
3534   &run,
3535   &client_connect_cb,
3536   &client_disconnect_cb,
3537   NULL,
3538   GNUNET_MQ_hd_fixed_size (client_conclude,
3539                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3540                            struct GNUNET_MessageHeader,
3541                            NULL),
3542   GNUNET_MQ_hd_var_size (client_insert,
3543                          GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3544                          struct GNUNET_CONSENSUS_ElementMessage,
3545                          NULL),
3546   GNUNET_MQ_hd_var_size (client_join,
3547                          GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3548                          struct GNUNET_CONSENSUS_JoinMessage,
3549                          NULL),
3550   GNUNET_MQ_handler_end ());
3551
3552 /* end of gnunet-service-consensus.c */