indentation, comment and style fixes, no semantic changes
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37
38
39 enum ReferendumVote
40 {
41   /**
42    * Vote that nothing should change.
43    * This option is never voted explicitly.
44    */
45   VOTE_STAY = 0,
46   /**
47    * Vote that an element should be added.
48    */
49   VOTE_ADD = 1,
50   /**
51    * Vote that an element should be removed.
52    */
53   VOTE_REMOVE = 2,
54 };
55
56
57 enum EarlyStoppingPhase
58 {
59   EARLY_STOPPING_NONE = 0,
60   EARLY_STOPPING_ONE_MORE = 1,
61   EARLY_STOPPING_DONE = 2,
62 };
63
64
65 GNUNET_NETWORK_STRUCT_BEGIN
66
67 /**
68  * Tuple of integers that together
69  * identify a task uniquely.
70  */
71 struct TaskKey {
72   /**
73    * A value from 'enum PhaseKind'.
74    */
75   uint16_t kind GNUNET_PACKED;
76
77   /**
78    * Number of the first peer
79    * in canonical order.
80    */
81   int16_t peer1 GNUNET_PACKED;
82
83   /**
84    * Number of the second peer in canonical order.
85    */
86   int16_t peer2 GNUNET_PACKED;
87
88   /**
89    * Repetition of the gradecast phase.
90    */
91   int16_t repetition GNUNET_PACKED;
92
93   /**
94    * Leader in the gradecast phase.
95    *
96    * Can be different from both peer1 and peer2.
97    */
98   int16_t leader GNUNET_PACKED;
99 };
100
101
102
103 struct SetKey
104 {
105   int set_kind GNUNET_PACKED;
106   int k1 GNUNET_PACKED;
107   int k2 GNUNET_PACKED;
108 };
109
110
111 struct SetEntry
112 {
113   struct SetKey key;
114   struct GNUNET_SET_Handle *h;
115   /**
116    * GNUNET_YES if the set resulted
117    * from applying a referendum with contested
118    * elements.
119    */
120   int is_contested;
121 };
122
123
124 struct DiffKey
125 {
126   int diff_kind GNUNET_PACKED;
127   int k1 GNUNET_PACKED;
128   int k2 GNUNET_PACKED;
129 };
130
131 struct RfnKey
132 {
133   int rfn_kind GNUNET_PACKED;
134   int k1 GNUNET_PACKED;
135   int k2 GNUNET_PACKED;
136 };
137
138
139 GNUNET_NETWORK_STRUCT_END
140
141 enum PhaseKind
142 {
143   PHASE_KIND_ALL_TO_ALL,
144   PHASE_KIND_ALL_TO_ALL_2,
145   PHASE_KIND_GRADECAST_LEADER,
146   PHASE_KIND_GRADECAST_ECHO,
147   PHASE_KIND_GRADECAST_ECHO_GRADE,
148   PHASE_KIND_GRADECAST_CONFIRM,
149   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
150   /**
151    * Apply a repetition of the all-to-all
152    * gradecast to the current set.
153    */
154   PHASE_KIND_APPLY_REP,
155   PHASE_KIND_FINISH,
156 };
157
158
159 enum SetKind
160 {
161   SET_KIND_NONE = 0,
162   SET_KIND_CURRENT,
163   /**
164    * Last result set from a gradecast
165    */
166   SET_KIND_LAST_GRADECAST,
167   SET_KIND_LEADER_PROPOSAL,
168   SET_KIND_ECHO_RESULT,
169 };
170
171 enum DiffKind
172 {
173   DIFF_KIND_NONE = 0,
174   DIFF_KIND_LEADER_PROPOSAL,
175   DIFF_KIND_LEADER_CONSENSUS,
176   DIFF_KIND_GRADECAST_RESULT,
177 };
178
179 enum RfnKind
180 {
181   RFN_KIND_NONE = 0,
182   RFN_KIND_ECHO,
183   RFN_KIND_CONFIRM,
184   RFN_KIND_GRADECAST_RESULT
185 };
186
187
188 struct SetOpCls
189 {
190   struct SetKey input_set;
191
192   struct SetKey output_set;
193   struct RfnKey output_rfn;
194   struct DiffKey output_diff;
195
196   int do_not_remove;
197
198   int transceive_contested;
199
200   struct GNUNET_SET_OperationHandle *op;
201 };
202
203
204 struct FinishCls
205 {
206   struct SetKey input_set;
207 };
208
209 /**
210  * Closure for both @a start_task
211  * and @a cancel_task.
212  */
213 union TaskFuncCls
214 {
215   struct SetOpCls setop;
216   struct FinishCls finish;
217 };
218
219 struct TaskEntry;
220
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228   struct TaskKey key;
229
230   struct Step *step;
231
232   int is_started;
233
234   int is_finished;
235
236   TaskFunc start;
237   TaskFunc cancel;
238
239   union TaskFuncCls cls;
240 };
241
242
243 struct Step
244 {
245   /**
246    * All steps of one session are in a
247    * linked list for easier deallocation.
248    */
249   struct Step *prev;
250
251   /**
252    * All steps of one session are in a
253    * linked list for easier deallocation.
254    */
255   struct Step *next;
256
257   struct ConsensusSession *session;
258
259   /**
260    * Tasks that this step is composed of.
261    */
262   struct TaskEntry **tasks;
263   unsigned int tasks_len;
264   unsigned int tasks_cap;
265
266   unsigned int finished_tasks;
267
268   /*
269    * Tasks that have this task as dependency.
270    *
271    * We store pointers to subordinates rather
272    * than to prerequisites since it makes
273    * tracking the readiness of a task easier.
274    */
275   struct Step **subordinates;
276   unsigned int subordinates_len;
277   unsigned int subordinates_cap;
278
279   /**
280    * Counter for the prerequisites of
281    * this step.
282    */
283   size_t pending_prereq;
284
285   /*
286    * Task that will run this step despite
287    * any pending prerequisites.
288    */
289   struct GNUNET_SCHEDULER_Task *timeout_task;
290
291   unsigned int is_running;
292
293   unsigned int is_finished;
294
295   /*
296    * Synchrony round of the task.
297    * Determines the deadline for the task.
298    */
299   unsigned int round;
300
301   /**
302    * Human-readable name for
303    * the task, used for debugging.
304    */
305   char *debug_name;
306
307   /**
308    * When we're doing an early finish, how should this step be
309    * treated?
310    * If GNUNET_YES, the step will be marked as finished
311    * without actually running its tasks.
312    * Otherwise, the step will still be run even after
313    * an early finish.
314    *
315    * Note that a task may never be finished early if
316    * it is already running.
317    */
318   int early_finishable;
319 };
320
321
322 struct RfnElementInfo
323 {
324   const struct GNUNET_SET_Element *element;
325
326   /*
327    * GNUNET_YES if the peer votes for the proposal.
328    */
329   int *votes;
330
331   /**
332    * Proposal for this element,
333    * can only be VOTE_ADD or VOTE_REMOVE.
334    */
335   enum ReferendumVote proposal;
336 };
337
338
339 struct ReferendumEntry
340 {
341   struct RfnKey key;
342
343   /*
344    * Elements where there is at least one proposed change.
345    *
346    * Maps the hash of the GNUNET_SET_Element
347    * to 'struct RfnElementInfo'.
348    */
349   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
350
351   unsigned int num_peers;
352
353   /**
354    * Stores, for every peer in the session,
355    * whether the peer finished the whole referendum.
356    *
357    * Votes from peers are only counted if they're
358    * marked as commited (#GNUNET_YES) in the referendum.
359    *
360    * Otherwise (#GNUNET_NO), the requested changes are
361    * not counted for majority votes or thresholds.
362    */
363   int *peer_commited;
364
365
366   /**
367    * Contestation state of the peer.  If a peer is contested, the values it
368    * contributed are still counted for applying changes, but the grading is
369    * affected.
370    */
371   int *peer_contested;
372 };
373
374
375 struct DiffElementInfo
376 {
377   const struct GNUNET_SET_Element *element;
378
379   /**
380    * Positive weight for 'add', negative
381    * weights for 'remove'.
382    */
383   int weight;
384 };
385
386
387 /**
388  * Weighted diff.
389  */
390 struct DiffEntry
391 {
392   struct DiffKey key;
393   struct GNUNET_CONTAINER_MultiHashMap *changes;
394 };
395
396 struct SetHandle
397 {
398   struct SetHandle *prev;
399   struct SetHandle *next;
400
401   struct GNUNET_SET_Handle *h;
402 };
403
404
405
406 /**
407  * A consensus session consists of one local client and the remote authorities.
408  */
409 struct ConsensusSession
410 {
411   /**
412    * Consensus sessions are kept in a DLL.
413    */
414   struct ConsensusSession *next;
415
416   /**
417    * Consensus sessions are kept in a DLL.
418    */
419   struct ConsensusSession *prev;
420
421   unsigned int num_client_insert_pending;
422
423   struct GNUNET_CONTAINER_MultiHashMap *setmap;
424   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
425   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
426
427   /**
428    * Array of peers with length 'num_peers'.
429    */
430   int *peers_blacklisted;
431
432   /*
433    * Mapping from (hashed) TaskKey to TaskEntry.
434    *
435    * We map the application_id for a round to the task that should be
436    * executed, so we don't have to go through all task whenever we get
437    * an incoming set op request.
438    */
439   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
440
441   struct Step *steps_head;
442   struct Step *steps_tail;
443
444   int conclude_started;
445
446   int conclude_done;
447
448   /**
449   * Global consensus identification, computed
450   * from the session id and participating authorities.
451   */
452   struct GNUNET_HashCode global_id;
453
454   /**
455    * Client that inhabits the session
456    */
457   struct GNUNET_SERVICE_Client *client;
458
459   /**
460    * Queued messages to the client.
461    */
462   struct GNUNET_MQ_Handle *client_mq;
463
464   /**
465    * Time when the conclusion of the consensus should begin.
466    */
467   struct GNUNET_TIME_Absolute conclude_start;
468
469   /**
470    * Timeout for all rounds together, single rounds will schedule a timeout task
471    * with a fraction of the conclude timeout.
472    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
473    */
474   struct GNUNET_TIME_Absolute conclude_deadline;
475
476   struct GNUNET_PeerIdentity *peers;
477
478   /**
479    * Number of other peers in the consensus.
480    */
481   unsigned int num_peers;
482
483   /**
484    * Index of the local peer in the peers array
485    */
486   unsigned int local_peer_idx;
487
488   /**
489    * Listener for requests from other peers.
490    * Uses the session's global id as app id.
491    */
492   struct GNUNET_SET_ListenHandle *set_listener;
493
494   /**
495    * State of our early stopping scheme.
496    */
497   int early_stopping;
498
499   /**
500    * Our set size from the first round.
501    */
502   uint64_t first_size;
503
504   uint64_t *first_sizes_received;
505
506   /**
507    * Bounded Eppstein lower bound.
508    */
509   uint64_t lower_bound;
510
511   struct SetHandle *set_handles_head;
512   struct SetHandle *set_handles_tail;
513 };
514
515 /**
516  * Linked list of sessions this peer participates in.
517  */
518 static struct ConsensusSession *sessions_head;
519
520 /**
521  * Linked list of sessions this peer participates in.
522  */
523 static struct ConsensusSession *sessions_tail;
524
525 /**
526  * Configuration of the consensus service.
527  */
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
529
530 /**
531  * Peer that runs this service.
532  */
533 static struct GNUNET_PeerIdentity my_peer;
534
535 /**
536  * Statistics handle.
537  */
538 struct GNUNET_STATISTICS_Handle *statistics;
539
540
541 static void
542 finish_task (struct TaskEntry *task);
543
544
545 static void
546 run_ready_steps (struct ConsensusSession *session);
547
548
549 static const char *
550 phasename (uint16_t phase)
551 {
552   switch (phase)
553   {
554     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555     case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556     case PHASE_KIND_FINISH: return "FINISH";
557     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563     default: return "(unknown)";
564   }
565 }
566
567
568 static const char *
569 setname (uint16_t kind)
570 {
571   switch (kind)
572   {
573     case SET_KIND_CURRENT: return "CURRENT";
574     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575     case SET_KIND_NONE: return "NONE";
576     default: return "(unknown)";
577   }
578 }
579
580 static const char *
581 rfnname (uint16_t kind)
582 {
583   switch (kind)
584   {
585     case RFN_KIND_NONE: return "NONE";
586     case RFN_KIND_ECHO: return "ECHO";
587     case RFN_KIND_CONFIRM: return "CONFIRM";
588     default: return "(unknown)";
589   }
590 }
591
592 static const char *
593 diffname (uint16_t kind)
594 {
595   switch (kind)
596   {
597     case DIFF_KIND_NONE: return "NONE";
598     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601     default: return "(unknown)";
602   }
603 }
604
605 #ifdef GNUNET_EXTRA_LOGGING
606
607
608 static const char *
609 debug_str_element (const struct GNUNET_SET_Element *el)
610 {
611   struct GNUNET_HashCode hash;
612
613   GNUNET_SET_element_hash (el, &hash);
614
615   return GNUNET_h2s (&hash);
616 }
617
618 static const char *
619 debug_str_task_key (struct TaskKey *tk)
620 {
621   static char buf[256];
622
623   snprintf (buf, sizeof (buf),
624             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625             phasename (tk->kind), tk->peer1, tk->peer2,
626             tk->leader, tk->repetition);
627
628   return buf;
629 }
630
631 static const char *
632 debug_str_diff_key (struct DiffKey *dk)
633 {
634   static char buf[256];
635
636   snprintf (buf, sizeof (buf),
637             "DiffKey kind=%s, k1=%d, k2=%d",
638             diffname (dk->diff_kind), dk->k1, dk->k2);
639
640   return buf;
641 }
642
643 static const char *
644 debug_str_set_key (const struct SetKey *sk)
645 {
646   static char buf[256];
647
648   snprintf (buf, sizeof (buf),
649             "SetKey kind=%s, k1=%d, k2=%d",
650             setname (sk->set_kind), sk->k1, sk->k2);
651
652   return buf;
653 }
654
655
656 static const char *
657 debug_str_rfn_key (const struct RfnKey *rk)
658 {
659   static char buf[256];
660
661   snprintf (buf, sizeof (buf),
662             "RfnKey kind=%s, k1=%d, k2=%d",
663             rfnname (rk->rfn_kind), rk->k1, rk->k2);
664
665   return buf;
666 }
667
668 #endif /* GNUNET_EXTRA_LOGGING */
669
670
671 /**
672  * Send the final result set of the consensus to the client, element by
673  * element.
674  *
675  * @param cls closure
676  * @param element the current element, NULL if all elements have been
677  *        iterated over
678  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
679  */
680 static int
681 send_to_client_iter (void *cls,
682                      const struct GNUNET_SET_Element *element)
683 {
684   struct TaskEntry *task = (struct TaskEntry *) cls;
685   struct ConsensusSession *session = task->step->session;
686   struct GNUNET_MQ_Envelope *ev;
687
688   if (NULL != element)
689   {
690     struct GNUNET_CONSENSUS_ElementMessage *m;
691     const struct ConsensusElement *ce;
692
693     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
694     ce = element->data;
695
696     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
697
698     if (0 != ce->marker)
699       return GNUNET_YES;
700
701     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702                 "P%d: sending element %s to client\n",
703                 session->local_peer_idx,
704                 debug_str_element (element));
705
706     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
707                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
708     m->element_type = ce->payload_type;
709     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710     GNUNET_MQ_send (session->client_mq, ev);
711   }
712   else
713   {
714     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715                 "P%d: finished iterating elements for client\n",
716                 session->local_peer_idx);
717     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
718     GNUNET_MQ_send (session->client_mq, ev);
719   }
720   return GNUNET_YES;
721 }
722
723
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
726 {
727   struct GNUNET_HashCode hash;
728
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "P%u: looking up set {%s}\n",
731               session->local_peer_idx,
732               debug_str_set_key (key));
733
734   GNUNET_assert (SET_KIND_NONE != key->set_kind);
735   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
737 }
738
739
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
742 {
743   struct GNUNET_HashCode hash;
744
745   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746               "P%u: looking up diff {%s}\n",
747               session->local_peer_idx,
748               debug_str_diff_key (key));
749
750   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
751   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
753 }
754
755
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
758 {
759   struct GNUNET_HashCode hash;
760
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "P%u: looking up rfn {%s}\n",
763               session->local_peer_idx,
764               debug_str_rfn_key (key));
765
766   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
767   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
769 }
770
771
772 static void
773 diff_insert (struct DiffEntry *diff,
774              int weight,
775              const struct GNUNET_SET_Element *element)
776 {
777   struct DiffElementInfo *di;
778   struct GNUNET_HashCode hash;
779
780   GNUNET_assert ( (1 == weight) || (-1 == weight));
781
782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783               "diff_insert with element size %u\n",
784               element->size);
785
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "hashing element\n");
788
789   GNUNET_SET_element_hash (element, &hash);
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "hashed element\n");
793
794   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
795
796   if (NULL == di)
797   {
798     di = GNUNET_new (struct DiffElementInfo);
799     di->element = GNUNET_SET_element_dup (element);
800     GNUNET_assert (GNUNET_OK ==
801                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
802                                                       &hash, di,
803                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
804   }
805
806   di->weight = weight;
807 }
808
809
810 static void
811 rfn_commit (struct ReferendumEntry *rfn,
812             uint16_t commit_peer)
813 {
814   GNUNET_assert (commit_peer < rfn->num_peers);
815
816   rfn->peer_commited[commit_peer] = GNUNET_YES;
817 }
818
819
820 static void
821 rfn_contest (struct ReferendumEntry *rfn,
822              uint16_t contested_peer)
823 {
824   GNUNET_assert (contested_peer < rfn->num_peers);
825
826   rfn->peer_contested[contested_peer] = GNUNET_YES;
827 }
828
829
830 static uint16_t
831 rfn_noncontested (struct ReferendumEntry *rfn)
832 {
833   uint16_t i;
834   uint16_t ret;
835
836   ret = 0;
837   for (i = 0; i < rfn->num_peers; i++)
838     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
839       ret++;
840
841   return ret;
842 }
843
844
845 static void
846 rfn_vote (struct ReferendumEntry *rfn,
847           uint16_t voting_peer,
848           enum ReferendumVote vote,
849           const struct GNUNET_SET_Element *element)
850 {
851   struct RfnElementInfo *ri;
852   struct GNUNET_HashCode hash;
853
854   GNUNET_assert (voting_peer < rfn->num_peers);
855
856   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
857      since VOTE_KEEP is implicit in not voting. */
858   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
859
860   GNUNET_SET_element_hash (element, &hash);
861   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
862
863   if (NULL == ri)
864   {
865     ri = GNUNET_new (struct RfnElementInfo);
866     ri->element = GNUNET_SET_element_dup (element);
867     ri->votes = GNUNET_new_array (rfn->num_peers, int);
868     GNUNET_assert (GNUNET_OK ==
869                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
870                                                       &hash, ri,
871                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
872   }
873
874   ri->votes[voting_peer] = GNUNET_YES;
875   ri->proposal = vote;
876 }
877
878
879 static uint16_t
880 task_other_peer (struct TaskEntry *task)
881 {
882   uint16_t me = task->step->session->local_peer_idx;
883   if (task->key.peer1 == me)
884     return task->key.peer2;
885   return task->key.peer1;
886 }
887
888
889 static int
890 cmp_uint64_t (const void *pa, const void *pb)
891 {
892   uint64_t a = *(uint64_t *) pa;
893   uint64_t b = *(uint64_t *) pb;
894
895   if (a == b)
896     return 0;
897   if (a < b)
898     return -1;
899   return 1;
900 }
901
902
903 /**
904  * Callback for set operation results. Called for each element
905  * in the result set.
906  *
907  * @param cls closure
908  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
909  * @param current_size current set size
910  * @param status see enum GNUNET_SET_Status
911  */
912 static void
913 set_result_cb (void *cls,
914                const struct GNUNET_SET_Element *element,
915                uint64_t current_size,
916                enum GNUNET_SET_Status status)
917 {
918   struct TaskEntry *task = cls;
919   struct ConsensusSession *session = task->step->session;
920   struct SetEntry *output_set = NULL;
921   struct DiffEntry *output_diff = NULL;
922   struct ReferendumEntry *output_rfn = NULL;
923   unsigned int other_idx;
924   struct SetOpCls *setop;
925   const struct ConsensusElement *consensus_element = NULL;
926
927   if (NULL != element)
928   {
929     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930                 "P%u: got element of type %u, status %u\n",
931                 session->local_peer_idx,
932                 (unsigned) element->element_type,
933                 (unsigned) status);
934     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
935     consensus_element = element->data;
936   }
937
938   setop = &task->cls.setop;
939
940
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942               "P%u: got set result for {%s}, status %u\n",
943               session->local_peer_idx,
944               debug_str_task_key (&task->key),
945               status);
946
947   if (GNUNET_NO == task->is_started)
948   {
949     GNUNET_break_op (0);
950     return;
951   }
952
953   if (GNUNET_YES == task->is_finished)
954   {
955     GNUNET_break_op (0);
956     return;
957   }
958
959   other_idx = task_other_peer (task);
960
961   if (SET_KIND_NONE != setop->output_set.set_kind)
962   {
963     output_set = lookup_set (session, &setop->output_set);
964     GNUNET_assert (NULL != output_set);
965   }
966
967   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
968   {
969     output_diff = lookup_diff (session, &setop->output_diff);
970     GNUNET_assert (NULL != output_diff);
971   }
972
973   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
974   {
975     output_rfn = lookup_rfn (session, &setop->output_rfn);
976     GNUNET_assert (NULL != output_rfn);
977   }
978
979   if (GNUNET_YES == session->peers_blacklisted[other_idx])
980   {
981     /* Peer might have been blacklisted
982        by a gradecast running in parallel, ignore elements from now */
983     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
984       return;
985     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
986       return;
987   }
988
989   if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
990   {
991     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
992                 "P%u: got some marker\n",
993                   session->local_peer_idx);
994     if ( (GNUNET_YES == setop->transceive_contested) &&
995          (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
996     {
997       GNUNET_assert (NULL != output_rfn);
998       rfn_contest (output_rfn, task_other_peer (task));
999       return;
1000     }
1001
1002     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1003     {
1004
1005       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1006                   "P%u: got size marker\n",
1007                   session->local_peer_idx);
1008
1009
1010       struct ConsensusSizeElement *cse = (void *) consensus_element;
1011
1012       if (cse->sender_index == other_idx)
1013       {
1014         if (NULL == session->first_sizes_received)
1015           session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1017
1018         uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019         qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020         session->lower_bound = copy[session->num_peers / 3 + 1];
1021         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1022                     "P%u: lower bound %llu\n",
1023                     session->local_peer_idx,
1024                     (long long) session->lower_bound);
1025       }
1026       return;
1027     }
1028
1029     return;
1030   }
1031
1032   switch (status)
1033   {
1034     case GNUNET_SET_STATUS_ADD_LOCAL:
1035       GNUNET_assert (NULL != consensus_element);
1036       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                   "Adding element in Task {%s}\n",
1038                   debug_str_task_key (&task->key));
1039       if (NULL != output_set)
1040       {
1041         // FIXME: record pending adds, use callback
1042         GNUNET_SET_add_element (output_set->h,
1043                                 element,
1044                                 NULL,
1045                                 NULL);
1046 #ifdef GNUNET_EXTRA_LOGGING
1047         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1048                     "P%u: adding element %s into set {%s} of task {%s}\n",
1049                     session->local_peer_idx,
1050                     debug_str_element (element),
1051                     debug_str_set_key (&setop->output_set),
1052                     debug_str_task_key (&task->key));
1053 #endif
1054       }
1055       if (NULL != output_diff)
1056       {
1057         diff_insert (output_diff, 1, element);
1058 #ifdef GNUNET_EXTRA_LOGGING
1059         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1060                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1061                     session->local_peer_idx,
1062                     debug_str_element (element),
1063                     debug_str_diff_key (&setop->output_diff),
1064                     debug_str_task_key (&task->key));
1065 #endif
1066       }
1067       if (NULL != output_rfn)
1068       {
1069         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1070 #ifdef GNUNET_EXTRA_LOGGING
1071         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1072                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1073                     session->local_peer_idx,
1074                     debug_str_element (element),
1075                     debug_str_rfn_key (&setop->output_rfn),
1076                     debug_str_task_key (&task->key));
1077 #endif
1078       }
1079       // XXX: add result to structures in task
1080       break;
1081     case GNUNET_SET_STATUS_ADD_REMOTE:
1082       GNUNET_assert (NULL != consensus_element);
1083       if (GNUNET_YES == setop->do_not_remove)
1084         break;
1085       if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1086         break;
1087       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088                   "Removing element in Task {%s}\n",
1089                   debug_str_task_key (&task->key));
1090       if (NULL != output_set)
1091       {
1092         // FIXME: record pending adds, use callback
1093         GNUNET_SET_remove_element (output_set->h,
1094                                    element,
1095                                    NULL,
1096                                    NULL);
1097 #ifdef GNUNET_EXTRA_LOGGING
1098         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1099                     "P%u: removing element %s from set {%s} of task {%s}\n",
1100                     session->local_peer_idx,
1101                     debug_str_element (element),
1102                     debug_str_set_key (&setop->output_set),
1103                     debug_str_task_key (&task->key));
1104 #endif
1105       }
1106       if (NULL != output_diff)
1107       {
1108         diff_insert (output_diff, -1, element);
1109 #ifdef GNUNET_EXTRA_LOGGING
1110         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1111                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1112                     session->local_peer_idx,
1113                     debug_str_element (element),
1114                     debug_str_diff_key (&setop->output_diff),
1115                     debug_str_task_key (&task->key));
1116 #endif
1117       }
1118       if (NULL != output_rfn)
1119       {
1120         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1121 #ifdef GNUNET_EXTRA_LOGGING
1122         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1123                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1124                     session->local_peer_idx,
1125                     debug_str_element (element),
1126                     debug_str_rfn_key (&setop->output_rfn),
1127                     debug_str_task_key (&task->key));
1128 #endif
1129       }
1130       break;
1131     case GNUNET_SET_STATUS_DONE:
1132       // XXX: check first if any changes to the underlying
1133       // set are still pending
1134       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135                   "Finishing setop in Task {%s}\n",
1136                   debug_str_task_key (&task->key));
1137       if (NULL != output_rfn)
1138       {
1139         rfn_commit (output_rfn, task_other_peer (task));
1140       }
1141       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1142       {
1143         session->first_size = current_size;
1144       }
1145       finish_task (task);
1146       break;
1147     case GNUNET_SET_STATUS_FAILURE:
1148       // XXX: cleanup
1149       GNUNET_break_op (0);
1150       finish_task (task);
1151       return;
1152     default:
1153       /* not reached */
1154       GNUNET_assert (0);
1155   }
1156 }
1157
1158 #ifdef EVIL
1159
1160 enum EvilnessType
1161 {
1162   EVILNESS_NONE,
1163   EVILNESS_CRAM_ALL,
1164   EVILNESS_CRAM_LEAD,
1165   EVILNESS_CRAM_ECHO,
1166   EVILNESS_SLACK,
1167   EVILNESS_SLACK_A2A,
1168 };
1169
1170 enum EvilnessSubType
1171 {
1172   EVILNESS_SUB_NONE,
1173   EVILNESS_SUB_REPLACEMENT,
1174   EVILNESS_SUB_NO_REPLACEMENT,
1175 };
1176
1177 struct Evilness
1178 {
1179   enum EvilnessType type;
1180   enum EvilnessSubType subtype;
1181   unsigned int num;
1182 };
1183
1184
1185 static int
1186 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1187 {
1188   if (0 == strcmp ("replace", evil_subtype_str))
1189   {
1190     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1191   }
1192   else if (0 == strcmp ("noreplace", evil_subtype_str))
1193   {
1194     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1195   }
1196   else
1197   {
1198     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1199                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1200                 evil_subtype_str);
1201     return GNUNET_SYSERR;
1202   }
1203   return GNUNET_OK;
1204 }
1205
1206
1207 static void
1208 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1209 {
1210   char *evil_spec;
1211   char *field;
1212   char *evil_type_str = NULL;
1213   char *evil_subtype_str = NULL;
1214
1215   GNUNET_assert (NULL != evil);
1216
1217   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1218   {
1219     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220                 "P%u: no evilness\n",
1221                 session->local_peer_idx);
1222     evil->type = EVILNESS_NONE;
1223     return;
1224   }
1225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1226               "P%u: got evilness spec\n",
1227               session->local_peer_idx);
1228
1229   for (field = strtok (evil_spec, "/");
1230        NULL != field;
1231        field = strtok (NULL, "/"))
1232   {
1233     unsigned int peer_num;
1234     unsigned int evil_num;
1235     int ret;
1236
1237     evil_type_str = NULL;
1238     evil_subtype_str = NULL;
1239
1240     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1241
1242     if (ret != 4)
1243     {
1244       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1245                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1246                   field,
1247                   ret);
1248       goto not_evil;
1249     }
1250
1251     GNUNET_assert (NULL != evil_type_str);
1252     GNUNET_assert (NULL != evil_subtype_str);
1253
1254     if (peer_num == session->local_peer_idx)
1255     {
1256       if (0 == strcmp ("slack", evil_type_str))
1257       {
1258         evil->type = EVILNESS_SLACK;
1259       }
1260       if (0 == strcmp ("slack-a2a", evil_type_str))
1261       {
1262         evil->type = EVILNESS_SLACK_A2A;
1263       }
1264       else if (0 == strcmp ("cram-all", evil_type_str))
1265       {
1266         evil->type = EVILNESS_CRAM_ALL;
1267         evil->num = evil_num;
1268         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1269           goto not_evil;
1270       }
1271       else if (0 == strcmp ("cram-lead", evil_type_str))
1272       {
1273         evil->type = EVILNESS_CRAM_LEAD;
1274         evil->num = evil_num;
1275         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1276           goto not_evil;
1277       }
1278       else if (0 == strcmp ("cram-echo", evil_type_str))
1279       {
1280         evil->type = EVILNESS_CRAM_ECHO;
1281         evil->num = evil_num;
1282         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1283           goto not_evil;
1284       }
1285       else
1286       {
1287         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1288                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1289                     evil_type_str);
1290         goto not_evil;
1291       }
1292       goto cleanup;
1293     }
1294     /* No GNUNET_free since memory was allocated by libc */
1295     free (evil_type_str);
1296     evil_type_str = NULL;
1297     evil_subtype_str = NULL;
1298   }
1299 not_evil:
1300   evil->type = EVILNESS_NONE;
1301 cleanup:
1302   GNUNET_free (evil_spec);
1303   /* no GNUNET_free_non_null since it wasn't
1304    * allocated with GNUNET_malloc */
1305   if (NULL != evil_type_str)
1306     free (evil_type_str);
1307   if (NULL != evil_subtype_str)
1308     free (evil_subtype_str);
1309 }
1310
1311 #endif
1312
1313
1314 /**
1315  * Commit the appropriate set for a
1316  * task.
1317  */
1318 static void
1319 commit_set (struct ConsensusSession *session,
1320             struct TaskEntry *task)
1321 {
1322   struct SetEntry *set;
1323   struct SetOpCls *setop = &task->cls.setop;
1324
1325   GNUNET_assert (NULL != setop->op);
1326   set = lookup_set (session, &setop->input_set);
1327   GNUNET_assert (NULL != set);
1328
1329   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1330   {
1331     struct GNUNET_SET_Element element;
1332     struct ConsensusElement ce = { 0 };
1333     ce.marker = CONSENSUS_MARKER_CONTESTED;
1334     element.data = &ce;
1335     element.size = sizeof (struct ConsensusElement);
1336     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1337     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1338   }
1339
1340   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1341   {
1342     struct GNUNET_SET_Element element;
1343     struct ConsensusSizeElement cse = { 0 };
1344     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
1345     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1346     cse.size = GNUNET_htonll (session->first_size);
1347     cse.sender_index = session->local_peer_idx;
1348     element.data = &cse;
1349     element.size = sizeof (struct ConsensusSizeElement);
1350     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1351     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1352   }
1353
1354 #ifdef EVIL
1355   {
1356     unsigned int i;
1357     struct Evilness evil;
1358
1359     get_evilness (session, &evil);
1360     if (EVILNESS_NONE != evil.type)
1361     {
1362       /* Useful for evaluation */
1363       GNUNET_STATISTICS_set (statistics,
1364                              "is evil",
1365                              1,
1366                              GNUNET_NO);
1367     }
1368     switch (evil.type)
1369     {
1370       case EVILNESS_CRAM_ALL:
1371       case EVILNESS_CRAM_LEAD:
1372       case EVILNESS_CRAM_ECHO:
1373         /* We're not cramming elements in the
1374            all-to-all round, since that would just
1375            add more elements to the result set, but
1376            wouldn't test robustness. */
1377         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1378         {
1379           GNUNET_SET_commit (setop->op, set->h);
1380           break;
1381         }
1382         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1383             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1384         {
1385           GNUNET_SET_commit (setop->op, set->h);
1386           break;
1387         }
1388         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1389         {
1390           GNUNET_SET_commit (setop->op, set->h);
1391           break;
1392         }
1393         for (i = 0; i < evil.num; i++)
1394         {
1395           struct GNUNET_SET_Element element;
1396           struct ConsensusStuffedElement se = { 0 };
1397           element.data = &se;
1398           element.size = sizeof (struct ConsensusStuffedElement);
1399           element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1400
1401           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1402           {
1403             /* Always generate a new element. */
1404             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1405           }
1406           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1407           {
1408             /* Always cram the same elements, derived from counter. */
1409             GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1410           }
1411           else
1412           {
1413             GNUNET_assert (0);
1414           }
1415           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1416 #ifdef GNUNET_EXTRA_LOGGING
1417           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1418                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1419                       session->local_peer_idx,
1420                       debug_str_element (&element),
1421                       debug_str_set_key (&setop->input_set),
1422                       debug_str_task_key (&task->key));
1423 #endif
1424         }
1425         GNUNET_STATISTICS_update (statistics,
1426                                   "# stuffed elements",
1427                                   evil.num,
1428                                   GNUNET_NO);
1429         GNUNET_SET_commit (setop->op, set->h);
1430         break;
1431       case EVILNESS_SLACK:
1432         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1433                     "P%u: evil peer: slacking\n",
1434                     (unsigned int) session->local_peer_idx);
1435         /* Do nothing. */
1436       case EVILNESS_SLACK_A2A:
1437         if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1438              (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1439         {
1440           struct GNUNET_SET_Handle *empty_set;
1441           empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1442           GNUNET_SET_commit (setop->op, empty_set);
1443           GNUNET_SET_destroy (empty_set);
1444         }
1445         else
1446         {
1447           GNUNET_SET_commit (setop->op, set->h);
1448         }
1449         break;
1450       case EVILNESS_NONE:
1451         GNUNET_SET_commit (setop->op, set->h);
1452         break;
1453     }
1454   }
1455 #else
1456   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1457   {
1458     GNUNET_SET_commit (setop->op, set->h);
1459   }
1460   else
1461   {
1462     /* For our testcases, we don't want the blacklisted
1463        peers to wait. */
1464     GNUNET_SET_operation_cancel (setop->op);
1465     setop->op = NULL;
1466   }
1467 #endif
1468 }
1469
1470
1471 static void
1472 put_diff (struct ConsensusSession *session,
1473          struct DiffEntry *diff)
1474 {
1475   struct GNUNET_HashCode hash;
1476
1477   GNUNET_assert (NULL != diff);
1478
1479   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1480   GNUNET_assert (GNUNET_OK ==
1481                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1482                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1483 }
1484
1485 static void
1486 put_set (struct ConsensusSession *session,
1487          struct SetEntry *set)
1488 {
1489   struct GNUNET_HashCode hash;
1490
1491   GNUNET_assert (NULL != set->h);
1492
1493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1494               "Putting set %s\n",
1495               debug_str_set_key (&set->key));
1496
1497   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1498   GNUNET_assert (GNUNET_SYSERR !=
1499                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1500                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1501 }
1502
1503
1504 static void
1505 put_rfn (struct ConsensusSession *session,
1506          struct ReferendumEntry *rfn)
1507 {
1508   struct GNUNET_HashCode hash;
1509
1510   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1511   GNUNET_assert (GNUNET_OK ==
1512                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1513                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1514 }
1515
1516
1517
1518 static void
1519 task_cancel_reconcile (struct TaskEntry *task)
1520 {
1521   /* not implemented yet */
1522   GNUNET_assert (0);
1523 }
1524
1525
1526 static void
1527 apply_diff_to_rfn (struct DiffEntry *diff,
1528                    struct ReferendumEntry *rfn,
1529                    uint16_t voting_peer,
1530                    uint16_t num_peers)
1531 {
1532   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1533   struct DiffElementInfo *di;
1534
1535   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1536
1537   while (GNUNET_YES ==
1538          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1539                                                       NULL,
1540                                                       (const void **) &di))
1541   {
1542     if (di->weight > 0)
1543     {
1544       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1545     }
1546     if (di->weight < 0)
1547     {
1548       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1549     }
1550   }
1551
1552   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1553 }
1554
1555
1556 struct DiffEntry *
1557 diff_create ()
1558 {
1559   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1560
1561   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1562
1563   return d;
1564 }
1565
1566
1567 struct DiffEntry *
1568 diff_compose (struct DiffEntry *diff_1,
1569               struct DiffEntry *diff_2)
1570 {
1571   struct DiffEntry *diff_new;
1572   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1573   struct DiffElementInfo *di;
1574
1575   diff_new = diff_create ();
1576
1577   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1578   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1579   {
1580     diff_insert (diff_new, di->weight, di->element);
1581   }
1582   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1583
1584   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1585   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1586   {
1587     diff_insert (diff_new, di->weight, di->element);
1588   }
1589   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1590
1591   return diff_new;
1592 }
1593
1594
1595 struct ReferendumEntry *
1596 rfn_create (uint16_t size)
1597 {
1598   struct ReferendumEntry *rfn;
1599
1600   rfn = GNUNET_new (struct ReferendumEntry);
1601   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1602   rfn->peer_commited = GNUNET_new_array (size, int);
1603   rfn->peer_contested = GNUNET_new_array (size, int);
1604   rfn->num_peers = size;
1605
1606   return rfn;
1607 }
1608
1609
1610 #if UNUSED
1611 static void
1612 diff_destroy (struct DiffEntry *diff)
1613 {
1614   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1615   GNUNET_free (diff);
1616 }
1617 #endif
1618
1619
1620 /**
1621  * For a given majority, count what the outcome
1622  * is (add/remove/keep), and give the number
1623  * of peers that voted for this outcome.
1624  */
1625 static void
1626 rfn_majority (const struct ReferendumEntry *rfn,
1627               const struct RfnElementInfo *ri,
1628               uint16_t *ret_majority,
1629               enum ReferendumVote *ret_vote)
1630 {
1631   uint16_t votes_yes = 0;
1632   uint16_t num_commited = 0;
1633   uint16_t i;
1634
1635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636               "Computing rfn majority for element %s of rfn {%s}\n",
1637               debug_str_element (ri->element),
1638               debug_str_rfn_key (&rfn->key));
1639
1640   for (i = 0; i < rfn->num_peers; i++)
1641   {
1642     if (GNUNET_NO == rfn->peer_commited[i])
1643       continue;
1644     num_commited++;
1645
1646     if (GNUNET_YES == ri->votes[i])
1647       votes_yes++;
1648   }
1649
1650   if (votes_yes > (num_commited) / 2)
1651   {
1652     *ret_vote = ri->proposal;
1653     *ret_majority = votes_yes;
1654   }
1655   else
1656   {
1657     *ret_vote = VOTE_STAY;
1658     *ret_majority = num_commited - votes_yes;
1659   }
1660 }
1661
1662
1663 struct SetCopyCls
1664 {
1665   struct TaskEntry *task;
1666   struct SetKey dst_set_key;
1667 };
1668
1669
1670 static void
1671 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1672 {
1673   struct SetCopyCls *scc = cls;
1674   struct TaskEntry *task = scc->task;
1675   struct SetKey dst_set_key = scc->dst_set_key;
1676   struct SetEntry *set;
1677   struct SetHandle *sh = GNUNET_new (struct SetHandle);
1678
1679   sh->h = copy;
1680   GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1681                                task->step->session->set_handles_tail,
1682                                sh);
1683
1684   GNUNET_free (scc);
1685   set = GNUNET_new (struct SetEntry);
1686   set->h = copy;
1687   set->key = dst_set_key;
1688   put_set (task->step->session, set);
1689
1690   task->start (task);
1691 }
1692
1693
1694 /**
1695  * Call the start function of the given
1696  * task again after we created a copy of the given set.
1697  */
1698 static void
1699 create_set_copy_for_task (struct TaskEntry *task,
1700                           struct SetKey *src_set_key,
1701                           struct SetKey *dst_set_key)
1702 {
1703   struct SetEntry *src_set;
1704   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1705
1706   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707               "Copying set {%s} to {%s} for task {%s}\n",
1708               debug_str_set_key (src_set_key),
1709               debug_str_set_key (dst_set_key),
1710               debug_str_task_key (&task->key));
1711
1712   scc->task = task;
1713   scc->dst_set_key = *dst_set_key;
1714   src_set = lookup_set (task->step->session, src_set_key);
1715   GNUNET_assert (NULL != src_set);
1716   GNUNET_SET_copy_lazy (src_set->h,
1717                         set_copy_cb,
1718                         scc);
1719 }
1720
1721
1722 struct SetMutationProgressCls
1723 {
1724   int num_pending;
1725   /**
1726    * Task to finish once all changes are through.
1727    */
1728   struct TaskEntry *task;
1729 };
1730
1731
1732 static void
1733 set_mutation_done (void *cls)
1734 {
1735   struct SetMutationProgressCls *pc = cls;
1736
1737   GNUNET_assert (pc->num_pending > 0);
1738
1739   pc->num_pending--;
1740
1741   if (0 == pc->num_pending)
1742   {
1743     struct TaskEntry *task = pc->task;
1744     GNUNET_free (pc);
1745     finish_task (task);
1746   }
1747 }
1748
1749
1750 static void
1751 try_finish_step_early (struct Step *step)
1752 {
1753   unsigned int i;
1754
1755   if (GNUNET_YES == step->is_running)
1756     return;
1757   if (GNUNET_YES == step->is_finished)
1758     return;
1759   if (GNUNET_NO == step->early_finishable)
1760     return;
1761
1762   step->is_finished = GNUNET_YES;
1763
1764 #ifdef GNUNET_EXTRA_LOGGING
1765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1766               "Finishing step `%s' early.\n",
1767               step->debug_name);
1768 #endif
1769
1770   for (i = 0; i < step->subordinates_len; i++)
1771   {
1772     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1773     step->subordinates[i]->pending_prereq--;
1774 #ifdef GNUNET_EXTRA_LOGGING
1775     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1776                 "Decreased pending_prereq to %u for step `%s'.\n",
1777                 (unsigned int) step->subordinates[i]->pending_prereq,
1778                 step->subordinates[i]->debug_name);
1779
1780 #endif
1781     try_finish_step_early (step->subordinates[i]);
1782   }
1783
1784   // XXX: maybe schedule as task to avoid recursion?
1785   run_ready_steps (step->session);
1786 }
1787
1788
1789 static void
1790 finish_step (struct Step *step)
1791 {
1792   unsigned int i;
1793
1794   GNUNET_assert (step->finished_tasks == step->tasks_len);
1795   GNUNET_assert (GNUNET_YES == step->is_running);
1796   GNUNET_assert (GNUNET_NO == step->is_finished);
1797
1798 #ifdef GNUNET_EXTRA_LOGGING
1799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800               "All tasks of step `%s' with %u subordinates finished.\n",
1801               step->debug_name,
1802               step->subordinates_len);
1803 #endif
1804
1805   for (i = 0; i < step->subordinates_len; i++)
1806   {
1807     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1808     step->subordinates[i]->pending_prereq--;
1809 #ifdef GNUNET_EXTRA_LOGGING
1810     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1811                 "Decreased pending_prereq to %u for step `%s'.\n",
1812                 (unsigned int) step->subordinates[i]->pending_prereq,
1813                 step->subordinates[i]->debug_name);
1814
1815 #endif
1816   }
1817
1818   step->is_finished = GNUNET_YES;
1819
1820   // XXX: maybe schedule as task to avoid recursion?
1821   run_ready_steps (step->session);
1822 }
1823
1824
1825
1826 /**
1827  * Apply the result from one round of gradecasts (i.e. every peer
1828  * should have gradecasted) to the peer's current set.
1829  *
1830  * @param task the task with context information
1831  */
1832 static void
1833 task_start_apply_round (struct TaskEntry *task)
1834 {
1835   struct ConsensusSession *session = task->step->session;
1836   struct SetKey sk_in;
1837   struct SetKey sk_out;
1838   struct RfnKey rk_in;
1839   struct SetEntry *set_out;
1840   struct ReferendumEntry *rfn_in;
1841   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1842   struct RfnElementInfo *ri;
1843   struct SetMutationProgressCls *progress_cls;
1844   uint16_t worst_majority = UINT16_MAX;
1845
1846   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1847   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1848   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1849
1850   set_out = lookup_set (session, &sk_out);
1851   if (NULL == set_out)
1852   {
1853     create_set_copy_for_task (task, &sk_in, &sk_out);
1854     return;
1855   }
1856
1857   rfn_in = lookup_rfn (session, &rk_in);
1858   GNUNET_assert (NULL != rfn_in);
1859
1860   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1861   progress_cls->task = task;
1862
1863   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1864
1865   while (GNUNET_YES ==
1866          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1867                                                       NULL,
1868                                                       (const void **) &ri))
1869   {
1870     uint16_t majority_num;
1871     enum ReferendumVote majority_vote;
1872
1873     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1874
1875     if (worst_majority > majority_num)
1876       worst_majority = majority_num;
1877
1878     switch (majority_vote)
1879     {
1880       case VOTE_ADD:
1881         progress_cls->num_pending++;
1882         GNUNET_assert (GNUNET_OK ==
1883                        GNUNET_SET_add_element (set_out->h,
1884                                                ri->element,
1885                                                &set_mutation_done,
1886                                                progress_cls));
1887         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1888                     "P%u: apply round: adding element %s with %u-majority.\n",
1889                     session->local_peer_idx,
1890                     debug_str_element (ri->element), majority_num);
1891         break;
1892       case VOTE_REMOVE:
1893         progress_cls->num_pending++;
1894         GNUNET_assert (GNUNET_OK ==
1895                        GNUNET_SET_remove_element (set_out->h,
1896                                                   ri->element,
1897                                                   &set_mutation_done,
1898                                                   progress_cls));
1899         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1900                     "P%u: apply round: deleting element %s with %u-majority.\n",
1901                     session->local_peer_idx,
1902                     debug_str_element (ri->element), majority_num);
1903         break;
1904       case VOTE_STAY:
1905         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1906                     "P%u: apply round: keeping element %s with %u-majority.\n",
1907                     session->local_peer_idx,
1908                     debug_str_element (ri->element), majority_num);
1909         // do nothing
1910         break;
1911       default:
1912         GNUNET_assert (0);
1913         break;
1914     }
1915   }
1916
1917   if (0 == progress_cls->num_pending)
1918   {
1919     // call closure right now, no pending ops
1920     GNUNET_free (progress_cls);
1921     finish_task (task);
1922   }
1923
1924   {
1925     uint16_t thresh = (session->num_peers / 3) * 2;
1926
1927     if (worst_majority >= thresh)
1928     {
1929       switch (session->early_stopping)
1930       {
1931         case EARLY_STOPPING_NONE:
1932           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1933           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1934                       "P%u: Stopping early (after one more superround)\n",
1935                       session->local_peer_idx);
1936           break;
1937         case EARLY_STOPPING_ONE_MORE:
1938           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1939                       session->local_peer_idx);
1940           session->early_stopping = EARLY_STOPPING_DONE;
1941           {
1942             struct Step *step;
1943             for (step = session->steps_head; NULL != step; step = step->next)
1944               try_finish_step_early (step);
1945           }
1946           break;
1947         case EARLY_STOPPING_DONE:
1948           /* We shouldn't be here anymore after early stopping */
1949           GNUNET_break (0);
1950           break;
1951         default:
1952           GNUNET_assert (0);
1953           break;
1954       }
1955     }
1956     else if (EARLY_STOPPING_NONE != session->early_stopping)
1957     {
1958       // Our assumption about the number of bad peers
1959       // has been broken.
1960       GNUNET_break_op (0);
1961     }
1962     else
1963     {
1964       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1965                   session->local_peer_idx);
1966     }
1967   }
1968   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1969 }
1970
1971
1972 static void
1973 task_start_grade (struct TaskEntry *task)
1974 {
1975   struct ConsensusSession *session = task->step->session;
1976   struct ReferendumEntry *output_rfn;
1977   struct ReferendumEntry *input_rfn;
1978   struct DiffEntry *input_diff;
1979   struct RfnKey rfn_key;
1980   struct DiffKey diff_key;
1981   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1982   struct RfnElementInfo *ri;
1983   unsigned int gradecast_confidence = 2;
1984
1985   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1986   output_rfn = lookup_rfn (session, &rfn_key);
1987   if (NULL == output_rfn)
1988   {
1989     output_rfn = rfn_create (session->num_peers);
1990     output_rfn->key = rfn_key;
1991     put_rfn (session, output_rfn);
1992   }
1993
1994   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1995   input_diff = lookup_diff (session, &diff_key);
1996   GNUNET_assert (NULL != input_diff);
1997
1998   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1999   input_rfn = lookup_rfn (session, &rfn_key);
2000   GNUNET_assert (NULL != input_rfn);
2001
2002   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2003
2004   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2005
2006   while (GNUNET_YES ==
2007          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2008                                                       NULL,
2009                                                       (const void **) &ri))
2010   {
2011     uint16_t majority_num;
2012     enum ReferendumVote majority_vote;
2013
2014     // XXX: we need contested votes and non-contested votes here
2015     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2016
2017     if (majority_num <= session->num_peers / 3)
2018       majority_vote = VOTE_REMOVE;
2019
2020     switch (majority_vote)
2021     {
2022       case VOTE_STAY:
2023         break;
2024       case VOTE_ADD:
2025         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2026         break;
2027       case VOTE_REMOVE:
2028         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2029         break;
2030       default:
2031         GNUNET_assert (0);
2032         break;
2033     }
2034   }
2035   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2036
2037   {
2038     uint16_t noncontested;
2039     noncontested = rfn_noncontested (input_rfn);
2040     if (noncontested < (session->num_peers / 3) * 2)
2041     {
2042       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2043     }
2044     if (noncontested < (session->num_peers / 3) + 1)
2045     {
2046       gradecast_confidence = 0;
2047     }
2048   }
2049
2050   if (gradecast_confidence >= 1)
2051     rfn_commit (output_rfn, task->key.leader);
2052
2053   if (gradecast_confidence <= 1)
2054     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2055
2056   finish_task (task);
2057 }
2058
2059
2060 static void
2061 task_start_reconcile (struct TaskEntry *task)
2062 {
2063   struct SetEntry *input;
2064   struct SetOpCls *setop = &task->cls.setop;
2065   struct ConsensusSession *session = task->step->session;
2066
2067   input = lookup_set (session, &setop->input_set);
2068   GNUNET_assert (NULL != input);
2069   GNUNET_assert (NULL != input->h);
2070
2071   /* We create the outputs for the operation here
2072      (rather than in the set operation callback)
2073      because we want something valid in there, even
2074      if the other peer doesn't talk to us */
2075
2076   if (SET_KIND_NONE != setop->output_set.set_kind)
2077   {
2078     /* If we don't have an existing output set,
2079        we clone the input set. */
2080     if (NULL == lookup_set (session, &setop->output_set))
2081     {
2082       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2083       return;
2084     }
2085   }
2086
2087   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2088   {
2089     if (NULL == lookup_rfn (session, &setop->output_rfn))
2090     {
2091       struct ReferendumEntry *rfn;
2092
2093       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2094                   "P%u: output rfn <%s> missing, creating.\n",
2095                   session->local_peer_idx,
2096                   debug_str_rfn_key (&setop->output_rfn));
2097
2098       rfn = rfn_create (session->num_peers);
2099       rfn->key = setop->output_rfn;
2100       put_rfn (session, rfn);
2101     }
2102   }
2103
2104   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2105   {
2106     if (NULL == lookup_diff (session, &setop->output_diff))
2107     {
2108       struct DiffEntry *diff;
2109
2110       diff = diff_create ();
2111       diff->key = setop->output_diff;
2112       put_diff (session, diff);
2113     }
2114   }
2115
2116   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2117   {
2118     /* XXX: mark the corresponding rfn as commited if necessary */
2119     finish_task (task);
2120     return;
2121   }
2122
2123   if (task->key.peer1 == session->local_peer_idx)
2124   {
2125     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2126
2127     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2128                 "P%u: Looking up set {%s} to run remote union\n",
2129                 session->local_peer_idx,
2130                 debug_str_set_key (&setop->input_set));
2131
2132     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2133     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2134
2135     rcm.kind = htons (task->key.kind);
2136     rcm.peer1 = htons (task->key.peer1);
2137     rcm.peer2 = htons (task->key.peer2);
2138     rcm.leader = htons (task->key.leader);
2139     rcm.repetition = htons (task->key.repetition);
2140
2141     GNUNET_assert (NULL == setop->op);
2142     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2143                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2144
2145     struct GNUNET_SET_Option opts[] = {
2146       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2147       { GNUNET_SET_OPTION_END },
2148     };
2149
2150     // XXX: maybe this should be done while
2151     // setting up tasks alreays?
2152     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2153                                     &session->global_id,
2154                                     &rcm.header,
2155                                     GNUNET_SET_RESULT_SYMMETRIC,
2156                                     opts,
2157                                     set_result_cb,
2158                                     task);
2159
2160     commit_set (session, task);
2161   }
2162   else if (task->key.peer2 == session->local_peer_idx)
2163   {
2164     /* Wait for the other peer to contact us */
2165     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2166                 session->local_peer_idx, task->key.peer1);
2167
2168     if (NULL != setop->op)
2169     {
2170       commit_set (session, task);
2171     }
2172   }
2173   else
2174   {
2175     /* We made an error while constructing the task graph. */
2176     GNUNET_assert (0);
2177   }
2178 }
2179
2180
2181 static void
2182 task_start_eval_echo (struct TaskEntry *task)
2183 {
2184   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2185   struct ReferendumEntry *input_rfn;
2186   struct RfnElementInfo *ri;
2187   struct SetEntry *output_set;
2188   struct SetMutationProgressCls *progress_cls;
2189   struct ConsensusSession *session = task->step->session;
2190   struct SetKey sk_in;
2191   struct SetKey sk_out;
2192   struct RfnKey rk_in;
2193
2194   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2195   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2196   output_set = lookup_set (session, &sk_out);
2197   if (NULL == output_set)
2198   {
2199     create_set_copy_for_task (task, &sk_in, &sk_out);
2200     return;
2201   }
2202
2203
2204   {
2205     // FIXME: should be marked as a shallow copy, so
2206     // we can destroy everything correctly
2207     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2208     last_set->h = output_set->h;
2209     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2210     put_set (session, last_set);
2211   }
2212
2213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2214               "Evaluating referendum in Task {%s}\n",
2215               debug_str_task_key (&task->key));
2216
2217   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2218   progress_cls->task = task;
2219
2220   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2221   input_rfn = lookup_rfn (session, &rk_in);
2222
2223   GNUNET_assert (NULL != input_rfn);
2224
2225   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2226   GNUNET_assert (NULL != iter);
2227
2228   while (GNUNET_YES ==
2229          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2230                                                       NULL,
2231                                                       (const void **) &ri))
2232   {
2233     enum ReferendumVote majority_vote;
2234     uint16_t majority_num;
2235
2236     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2237
2238     if (majority_num < session->num_peers / 3)
2239     {
2240       /* It is not the case that all nonfaulty peers
2241          echoed the same value.  Since we're doing a set reconciliation, we
2242          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2243          reconciliation as contested.  Other peers might not know that the
2244          leader is faulty, thus we still re-distribute in the confirmation
2245          round. */
2246       output_set->is_contested = GNUNET_YES;
2247     }
2248
2249     switch (majority_vote)
2250     {
2251       case VOTE_ADD:
2252         progress_cls->num_pending++;
2253         GNUNET_assert (GNUNET_OK ==
2254                        GNUNET_SET_add_element (output_set->h,
2255                                                ri->element,
2256                                                set_mutation_done,
2257                                                progress_cls));
2258         break;
2259       case VOTE_REMOVE:
2260         progress_cls->num_pending++;
2261         GNUNET_assert (GNUNET_OK ==
2262                        GNUNET_SET_remove_element (output_set->h,
2263                                                   ri->element,
2264                                                   set_mutation_done,
2265                                                   progress_cls));
2266         break;
2267       case VOTE_STAY:
2268         /* Nothing to do. */
2269         break;
2270       default:
2271         /* not reached */
2272         GNUNET_assert (0);
2273     }
2274   }
2275
2276   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2277
2278   if (0 == progress_cls->num_pending)
2279   {
2280     // call closure right now, no pending ops
2281     GNUNET_free (progress_cls);
2282     finish_task (task);
2283   }
2284 }
2285
2286
2287 static void
2288 task_start_finish (struct TaskEntry *task)
2289 {
2290   struct SetEntry *final_set;
2291   struct ConsensusSession *session = task->step->session;
2292
2293   final_set = lookup_set (session, &task->cls.finish.input_set);
2294
2295   GNUNET_assert (NULL != final_set);
2296
2297
2298   GNUNET_SET_iterate (final_set->h,
2299                       send_to_client_iter,
2300                       task);
2301 }
2302
2303 static void
2304 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2305 {
2306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2307
2308   GNUNET_assert (GNUNET_NO == task->is_started);
2309   GNUNET_assert (GNUNET_NO == task->is_finished);
2310   GNUNET_assert (NULL != task->start);
2311
2312   task->start (task);
2313
2314   task->is_started = GNUNET_YES;
2315 }
2316
2317
2318
2319
2320 /*
2321  * Run all steps of the session that don't any
2322  * more dependencies.
2323  */
2324 static void
2325 run_ready_steps (struct ConsensusSession *session)
2326 {
2327   struct Step *step;
2328
2329   step = session->steps_head;
2330
2331   while (NULL != step)
2332   {
2333     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2334     {
2335       size_t i;
2336
2337       GNUNET_assert (0 == step->finished_tasks);
2338
2339 #ifdef GNUNET_EXTRA_LOGGING
2340       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2341                   session->local_peer_idx,
2342                   step->debug_name,
2343                   step->round, step->tasks_len, step->subordinates_len);
2344 #endif
2345
2346       step->is_running = GNUNET_YES;
2347       for (i = 0; i < step->tasks_len; i++)
2348         start_task (session, step->tasks[i]);
2349
2350       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2351       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2352         finish_step (step);
2353
2354       /* Running the next ready steps will be triggered by task completion */
2355       return;
2356     }
2357     step = step->next;
2358   }
2359
2360   return;
2361 }
2362
2363
2364
2365 static void
2366 finish_task (struct TaskEntry *task)
2367 {
2368   GNUNET_assert (GNUNET_NO == task->is_finished);
2369   task->is_finished = GNUNET_YES;
2370
2371   task->step->finished_tasks++;
2372
2373   if (task->step->finished_tasks == task->step->tasks_len)
2374     finish_step (task->step);
2375 }
2376
2377
2378 /**
2379  * Search peer in the list of peers in session.
2380  *
2381  * @param peer peer to find
2382  * @param session session with peer
2383  * @return index of peer, -1 if peer is not in session
2384  */
2385 static int
2386 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2387 {
2388   int i;
2389   for (i = 0; i < session->num_peers; i++)
2390     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2391       return i;
2392   return -1;
2393 }
2394
2395
2396 /**
2397  * Compute a global, (hopefully) unique consensus session id,
2398  * from the local id of the consensus session, and the identities of all participants.
2399  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2400  * exactly the same peers, the global id will be different.
2401  *
2402  * @param session session to generate the global id for
2403  * @param local_session_id local id of the consensus session
2404  */
2405 static void
2406 compute_global_id (struct ConsensusSession *session,
2407                    const struct GNUNET_HashCode *local_session_id)
2408 {
2409   const char *salt = "gnunet-service-consensus/session_id";
2410
2411   GNUNET_assert (GNUNET_YES ==
2412                  GNUNET_CRYPTO_kdf (&session->global_id,
2413                                     sizeof (struct GNUNET_HashCode),
2414                                     salt,
2415                                     strlen (salt),
2416                                     session->peers,
2417                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2418                                     local_session_id,
2419                                     sizeof (struct GNUNET_HashCode),
2420                                     NULL));
2421 }
2422
2423
2424 /**
2425  * Compare two peer identities.
2426  *
2427  * @param h1 some peer identity
2428  * @param h2 some peer identity
2429  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2430  */
2431 static int
2432 peer_id_cmp (const void *h1, const void *h2)
2433 {
2434   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2435 }
2436
2437
2438 /**
2439  * Create the sorted list of peers for the session,
2440  * add the local peer if not in the join message.
2441  *
2442  * @param session session to initialize
2443  * @param join_msg join message with the list of peers participating at the end
2444  */
2445 static void
2446 initialize_session_peer_list (struct ConsensusSession *session,
2447                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2448 {
2449   const struct GNUNET_PeerIdentity *msg_peers
2450     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2451   int local_peer_in_list;
2452
2453   session->num_peers = ntohl (join_msg->num_peers);
2454
2455   /* Peers in the join message, may or may not include the local peer,
2456      Add it if it is missing. */
2457   local_peer_in_list = GNUNET_NO;
2458   for (unsigned int i = 0; i < session->num_peers; i++)
2459   {
2460     if (0 == memcmp (&msg_peers[i],
2461                      &my_peer,
2462                      sizeof (struct GNUNET_PeerIdentity)))
2463     {
2464       local_peer_in_list = GNUNET_YES;
2465       break;
2466     }
2467   }
2468   if (GNUNET_NO == local_peer_in_list)
2469     session->num_peers++;
2470
2471   session->peers = GNUNET_new_array (session->num_peers,
2472                                      struct GNUNET_PeerIdentity);
2473   if (GNUNET_NO == local_peer_in_list)
2474     session->peers[session->num_peers - 1] = my_peer;
2475
2476   GNUNET_memcpy (session->peers,
2477                  msg_peers,
2478                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2479   qsort (session->peers,
2480          session->num_peers,
2481          sizeof (struct GNUNET_PeerIdentity),
2482          &peer_id_cmp);
2483 }
2484
2485
2486 static struct TaskEntry *
2487 lookup_task (struct ConsensusSession *session,
2488              struct TaskKey *key)
2489 {
2490   struct GNUNET_HashCode hash;
2491
2492
2493   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2495               GNUNET_h2s (&hash));
2496   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2497 }
2498
2499
2500 /**
2501  * Called when another peer wants to do a set operation with the
2502  * local peer.
2503  *
2504  * @param cls closure
2505  * @param other_peer the other peer
2506  * @param context_msg message with application specific information from
2507  *        the other peer
2508  * @param request request from the other peer, use GNUNET_SET_accept
2509  *        to accept it, otherwise the request will be refused
2510  *        Note that we don't use a return value here, as it is also
2511  *        necessary to specify the set we want to do the operation with,
2512  *        whith sometimes can be derived from the context message.
2513  *        Also necessary to specify the timeout.
2514  */
2515 static void
2516 set_listen_cb (void *cls,
2517                const struct GNUNET_PeerIdentity *other_peer,
2518                const struct GNUNET_MessageHeader *context_msg,
2519                struct GNUNET_SET_Request *request)
2520 {
2521   struct ConsensusSession *session = cls;
2522   struct TaskKey tk;
2523   struct TaskEntry *task;
2524   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2525
2526   if (NULL == context_msg)
2527   {
2528     GNUNET_break_op (0);
2529     return;
2530   }
2531
2532   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2533   {
2534     GNUNET_break_op (0);
2535     return;
2536   }
2537
2538   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2539   {
2540     GNUNET_break_op (0);
2541     return;
2542   }
2543
2544   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2545
2546   tk = ((struct TaskKey) {
2547       .kind = ntohs (cm->kind),
2548       .peer1 = ntohs (cm->peer1),
2549       .peer2 = ntohs (cm->peer2),
2550       .repetition = ntohs (cm->repetition),
2551       .leader = ntohs (cm->leader),
2552   });
2553
2554   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2555               session->local_peer_idx, debug_str_task_key (&tk));
2556
2557   task = lookup_task (session, &tk);
2558
2559   if (NULL == task)
2560   {
2561     GNUNET_break_op (0);
2562     return;
2563   }
2564
2565   if (GNUNET_YES == task->is_finished)
2566   {
2567     GNUNET_break_op (0);
2568     return;
2569   }
2570
2571   if (task->key.peer2 != session->local_peer_idx)
2572   {
2573     /* We're being asked, so we must be thne 2nd peer. */
2574     GNUNET_break_op (0);
2575     return;
2576   }
2577
2578   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2579                     (task->key.peer2 == session->local_peer_idx)));
2580
2581   struct GNUNET_SET_Option opts[] = {
2582     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2583     { GNUNET_SET_OPTION_END },
2584   };
2585
2586   task->cls.setop.op = GNUNET_SET_accept (request,
2587                                           GNUNET_SET_RESULT_SYMMETRIC,
2588                                           opts,
2589                                           set_result_cb,
2590                                           task);
2591
2592   /* If the task hasn't been started yet,
2593      we wait for that until we commit. */
2594
2595   if (GNUNET_YES == task->is_started)
2596   {
2597     commit_set (session, task);
2598   }
2599 }
2600
2601
2602
2603 static void
2604 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2605           struct TaskEntry *t)
2606 {
2607   struct GNUNET_HashCode round_hash;
2608   struct Step *s;
2609
2610   GNUNET_assert (NULL != t->step);
2611
2612   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2613
2614   s = t->step;
2615
2616   if (s->tasks_len == s->tasks_cap)
2617   {
2618     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2619     GNUNET_array_grow (s->tasks,
2620                        s->tasks_cap,
2621                        target_size);
2622   }
2623
2624 #ifdef GNUNET_EXTRA_LOGGING
2625   GNUNET_assert (NULL != s->debug_name);
2626   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2627               debug_str_task_key (&t->key),
2628               s->debug_name);
2629 #endif
2630
2631   s->tasks[s->tasks_len] = t;
2632   s->tasks_len++;
2633
2634   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2635   GNUNET_assert (GNUNET_OK ==
2636       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2637                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2638 }
2639
2640
2641 static void
2642 install_step_timeouts (struct ConsensusSession *session)
2643 {
2644   /* Given the fully constructed task graph
2645      with rounds for tasks, we can give the tasks timeouts. */
2646
2647   // unsigned int max_round;
2648
2649   /* XXX: implement! */
2650 }
2651
2652
2653
2654 /*
2655  * Arrange two peers in some canonical order.
2656  */
2657 static void
2658 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2659 {
2660   uint16_t a;
2661   uint16_t b;
2662
2663   GNUNET_assert (*p1 < n);
2664   GNUNET_assert (*p2 < n);
2665
2666   if (*p1 < *p2)
2667   {
2668     a = *p1;
2669     b = *p2;
2670   }
2671   else
2672   {
2673     a = *p2;
2674     b = *p1;
2675   }
2676
2677   /* For uniformly random *p1, *p2,
2678      this condition is true with 50% chance */
2679   if (((b - a) + n) % n <= n / 2)
2680   {
2681     *p1 = a;
2682     *p2 = b;
2683   }
2684   else
2685   {
2686     *p1 = b;
2687     *p2 = a;
2688   }
2689 }
2690
2691
2692 /**
2693  * Record @a dep as a dependency of @a step.
2694  */
2695 static void
2696 step_depend_on (struct Step *step, struct Step *dep)
2697 {
2698   /* We're not checking for cyclic dependencies,
2699      but this is a cheap sanity check. */
2700   GNUNET_assert (step != dep);
2701   GNUNET_assert (NULL != step);
2702   GNUNET_assert (NULL != dep);
2703   GNUNET_assert (dep->round <= step->round);
2704
2705 #ifdef GNUNET_EXTRA_LOGGING
2706   /* Make sure we have complete debugging information.
2707      Also checks that we don't screw up too badly
2708      constructing the task graph. */
2709   GNUNET_assert (NULL != step->debug_name);
2710   GNUNET_assert (NULL != dep->debug_name);
2711   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2712               "Making step `%s' depend on `%s'\n",
2713               step->debug_name,
2714               dep->debug_name);
2715 #endif
2716
2717   if (dep->subordinates_cap == dep->subordinates_len)
2718   {
2719     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2720     GNUNET_array_grow (dep->subordinates,
2721                        dep->subordinates_cap,
2722                        target_size);
2723   }
2724
2725   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2726
2727   dep->subordinates[dep->subordinates_len] = step;
2728   dep->subordinates_len++;
2729
2730   step->pending_prereq++;
2731 }
2732
2733
2734 static struct Step *
2735 create_step (struct ConsensusSession *session, int round, int early_finishable)
2736 {
2737   struct Step *step;
2738   step = GNUNET_new (struct Step);
2739   step->session = session;
2740   step->round = round;
2741   step->early_finishable = early_finishable;
2742   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2743                                     session->steps_tail,
2744                                     step);
2745   return step;
2746 }
2747
2748
2749 /**
2750  * Construct the task graph for a single
2751  * gradecast.
2752  */
2753 static void
2754 construct_task_graph_gradecast (struct ConsensusSession *session,
2755                                 uint16_t rep,
2756                                 uint16_t lead,
2757                                 struct Step *step_before,
2758                                 struct Step *step_after)
2759 {
2760   uint16_t n = session->num_peers;
2761   uint16_t me = session->local_peer_idx;
2762
2763   uint16_t p1;
2764   uint16_t p2;
2765
2766   /* The task we're currently setting up. */
2767   struct TaskEntry task;
2768
2769   struct Step *step;
2770   struct Step *prev_step;
2771
2772   uint16_t round;
2773
2774   unsigned int k;
2775
2776   round = step_before->round + 1;
2777
2778   /* gcast step 1: leader disseminates */
2779
2780   step = create_step (session, round, GNUNET_YES);
2781
2782 #ifdef GNUNET_EXTRA_LOGGING
2783   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2784 #endif
2785   step_depend_on (step, step_before);
2786
2787   if (lead == me)
2788   {
2789     for (k = 0; k < n; k++)
2790     {
2791       if (k == me)
2792         continue;
2793       p1 = me;
2794       p2 = k;
2795       arrange_peers (&p1, &p2, n);
2796       task = ((struct TaskEntry) {
2797         .step = step,
2798         .start = task_start_reconcile,
2799         .cancel = task_cancel_reconcile,
2800         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2801       });
2802       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2803       put_task (session->taskmap, &task);
2804     }
2805     /* We run this task to make sure that the leader
2806        has the stored the SET_KIND_LEADER set of himself,
2807        so he can participate in the rest of the gradecast
2808        without the code having to handle any special cases. */
2809     task = ((struct TaskEntry) {
2810       .step = step,
2811       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2812       .start = task_start_reconcile,
2813       .cancel = task_cancel_reconcile,
2814     });
2815     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2816     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2817     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2818     put_task (session->taskmap, &task);
2819   }
2820   else
2821   {
2822     p1 = me;
2823     p2 = lead;
2824     arrange_peers (&p1, &p2, n);
2825     task = ((struct TaskEntry) {
2826       .step = step,
2827       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2828       .start = task_start_reconcile,
2829       .cancel = task_cancel_reconcile,
2830     });
2831     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2832     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2833     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2834     put_task (session->taskmap, &task);
2835   }
2836
2837   /* gcast phase 2: echo */
2838   prev_step = step;
2839   round += 1;
2840   step = create_step (session, round, GNUNET_YES);
2841 #ifdef GNUNET_EXTRA_LOGGING
2842   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2843 #endif
2844   step_depend_on (step, prev_step);
2845
2846   for (k = 0; k < n; k++)
2847   {
2848     p1 = k;
2849     p2 = me;
2850     arrange_peers (&p1, &p2, n);
2851     task = ((struct TaskEntry) {
2852       .step = step,
2853       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2854       .start = task_start_reconcile,
2855       .cancel = task_cancel_reconcile,
2856     });
2857     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2858     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2859     put_task (session->taskmap, &task);
2860   }
2861
2862   prev_step = step;
2863   /* Same round, since step only has local tasks */
2864   step = create_step (session, round, GNUNET_YES);
2865 #ifdef GNUNET_EXTRA_LOGGING
2866   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2867 #endif
2868   step_depend_on (step, prev_step);
2869
2870   arrange_peers (&p1, &p2, n);
2871   task = ((struct TaskEntry) {
2872     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2873     .step = step,
2874     .start = task_start_eval_echo
2875   });
2876   put_task (session->taskmap, &task);
2877
2878   prev_step = step;
2879   round += 1;
2880   step = create_step (session, round, GNUNET_YES);
2881 #ifdef GNUNET_EXTRA_LOGGING
2882   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2883 #endif
2884   step_depend_on (step, prev_step);
2885
2886   /* gcast phase 3: confirmation and grading */
2887   for (k = 0; k < n; k++)
2888   {
2889     p1 = k;
2890     p2 = me;
2891     arrange_peers (&p1, &p2, n);
2892     task = ((struct TaskEntry) {
2893       .step = step,
2894       .start = task_start_reconcile,
2895       .cancel = task_cancel_reconcile,
2896       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2897     });
2898     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2899     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2900     /* If there was at least one element in the echo round that was
2901        contested (i.e. it had no n-t majority), then we let the other peers
2902        know, and other peers let us know.  The contested flag for each peer is
2903        stored in the rfn. */
2904     task.cls.setop.transceive_contested = GNUNET_YES;
2905     put_task (session->taskmap, &task);
2906   }
2907
2908   prev_step = step;
2909   /* Same round, since step only has local tasks */
2910   step = create_step (session, round, GNUNET_YES);
2911 #ifdef GNUNET_EXTRA_LOGGING
2912   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2913 #endif
2914   step_depend_on (step, prev_step);
2915
2916   task = ((struct TaskEntry) {
2917     .step = step,
2918     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2919     .start = task_start_grade,
2920   });
2921   put_task (session->taskmap, &task);
2922
2923   step_depend_on (step_after, step);
2924 }
2925
2926
2927 static void
2928 construct_task_graph (struct ConsensusSession *session)
2929 {
2930   uint16_t n = session->num_peers;
2931   uint16_t t = n / 3;
2932
2933   uint16_t me = session->local_peer_idx;
2934
2935   /* The task we're currently setting up. */
2936   struct TaskEntry task;
2937
2938   /* Current leader */
2939   unsigned int lead;
2940
2941   struct Step *step;
2942   struct Step *prev_step;
2943
2944   unsigned int round = 0;
2945
2946   unsigned int i;
2947
2948   // XXX: introduce first step,
2949   // where we wait for all insert acks
2950   // from the set service
2951
2952   /* faster but brittle all-to-all */
2953
2954   // XXX: Not implemented yet
2955
2956   /* all-to-all step */
2957
2958   step = create_step (session, round, GNUNET_NO);
2959
2960 #ifdef GNUNET_EXTRA_LOGGING
2961   step->debug_name = GNUNET_strdup ("all to all");
2962 #endif
2963
2964   for (i = 0; i < n; i++)
2965   {
2966     uint16_t p1;
2967     uint16_t p2;
2968
2969     p1 = me;
2970     p2 = i;
2971     arrange_peers (&p1, &p2, n);
2972     task = ((struct TaskEntry) {
2973       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2974       .step = step,
2975       .start = task_start_reconcile,
2976       .cancel = task_cancel_reconcile,
2977     });
2978     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2979     task.cls.setop.output_set = task.cls.setop.input_set;
2980     task.cls.setop.do_not_remove = GNUNET_YES;
2981     put_task (session->taskmap, &task);
2982   }
2983
2984   round += 1;
2985   prev_step = step;
2986   step = create_step (session, round, GNUNET_NO);;
2987 #ifdef GNUNET_EXTRA_LOGGING
2988   step->debug_name = GNUNET_strdup ("all to all 2");
2989 #endif
2990   step_depend_on (step, prev_step);
2991
2992
2993   for (i = 0; i < n; i++)
2994   {
2995     uint16_t p1;
2996     uint16_t p2;
2997
2998     p1 = me;
2999     p2 = i;
3000     arrange_peers (&p1, &p2, n);
3001     task = ((struct TaskEntry) {
3002       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3003       .step = step,
3004       .start = task_start_reconcile,
3005       .cancel = task_cancel_reconcile,
3006     });
3007     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3008     task.cls.setop.output_set = task.cls.setop.input_set;
3009     task.cls.setop.do_not_remove = GNUNET_YES;
3010     put_task (session->taskmap, &task);
3011   }
3012
3013   round += 1;
3014
3015   prev_step = step;
3016   step = NULL;
3017
3018
3019
3020   /* Byzantine union */
3021
3022   /* sequential repetitions of the gradecasts */
3023   for (i = 0; i < t + 1; i++)
3024   {
3025     struct Step *step_rep_start;
3026     struct Step *step_rep_end;
3027
3028     /* Every repetition is in a separate round. */
3029     step_rep_start = create_step (session, round, GNUNET_YES);
3030 #ifdef GNUNET_EXTRA_LOGGING
3031     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3032 #endif
3033
3034     step_depend_on (step_rep_start, prev_step);
3035
3036     /* gradecast has three rounds */
3037     round += 3;
3038     step_rep_end = create_step (session, round, GNUNET_YES);
3039 #ifdef GNUNET_EXTRA_LOGGING
3040     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3041 #endif
3042
3043     /* parallel gradecasts */
3044     for (lead = 0; lead < n; lead++)
3045       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3046
3047     task = ((struct TaskEntry) {
3048       .step = step_rep_end,
3049       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3050       .start = task_start_apply_round,
3051     });
3052     put_task (session->taskmap, &task);
3053
3054     prev_step = step_rep_end;
3055   }
3056
3057  /* There is no next gradecast round, thus the final
3058     start step is the overall end step of the gradecasts */
3059   round += 1;
3060   step = create_step (session, round, GNUNET_NO);
3061 #ifdef GNUNET_EXTRA_LOGGING
3062   GNUNET_asprintf (&step->debug_name, "finish");
3063 #endif
3064   step_depend_on (step, prev_step);
3065
3066   task = ((struct TaskEntry) {
3067     .step = step,
3068     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3069     .start = task_start_finish,
3070   });
3071   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3072
3073   put_task (session->taskmap, &task);
3074 }
3075
3076
3077
3078 /**
3079  * Check join message.
3080  *
3081  * @param cls session of client that sent the message
3082  * @param m message sent by the client
3083  * @return #GNUNET_OK if @a m is well-formed
3084  */
3085 static int
3086 check_client_join (void *cls,
3087                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3088 {
3089   uint32_t listed_peers = ntohl (m->num_peers);
3090
3091   if ( (ntohs (m->header.size) - sizeof (*m)) !=
3092        listed_peers * sizeof (struct GNUNET_PeerIdentity))
3093   {
3094     GNUNET_break (0);
3095     return GNUNET_SYSERR;
3096   }
3097   return GNUNET_OK;
3098 }
3099
3100
3101 /**
3102  * Called when a client wants to join a consensus session.
3103  *
3104  * @param cls session of client that sent the message
3105  * @param m message sent by the client
3106  */
3107 static void
3108 handle_client_join (void *cls,
3109                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3110 {
3111   struct ConsensusSession *session = cls;
3112   struct ConsensusSession *other_session;
3113
3114   initialize_session_peer_list (session,
3115                                 m);
3116   compute_global_id (session,
3117                      &m->session_id);
3118
3119   /* Check if some local client already owns the session.
3120      It is only legal to have a session with an existing global id
3121      if all other sessions with this global id are finished.*/
3122   for (other_session = sessions_head;
3123        NULL != other_session;
3124        other_session = other_session->next)
3125   {
3126     if ( (other_session != session) &&
3127          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3128                                        &other_session->global_id)) )
3129       break;
3130   }
3131
3132   session->conclude_deadline
3133     = GNUNET_TIME_absolute_ntoh (m->deadline);
3134   session->conclude_start
3135     = GNUNET_TIME_absolute_ntoh (m->start);
3136   session->local_peer_idx = get_peer_idx (&my_peer,
3137                                           session);
3138   GNUNET_assert (-1 != session->local_peer_idx);
3139
3140   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3141               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3142               GNUNET_h2s (&m->session_id),
3143               session->num_peers,
3144               session->local_peer_idx,
3145               GNUNET_STRINGS_relative_time_to_string
3146               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3147                                                     session->conclude_deadline),
3148                GNUNET_YES));
3149
3150   session->set_listener
3151     = GNUNET_SET_listen (cfg,
3152                          GNUNET_SET_OPERATION_UNION,
3153                          &session->global_id,
3154                          &set_listen_cb,
3155                          session);
3156
3157   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3158                                                           GNUNET_NO);
3159   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3160                                                            GNUNET_NO);
3161   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3162                                                            GNUNET_NO);
3163   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3164                                                           GNUNET_NO);
3165
3166   {
3167     struct SetEntry *client_set;
3168
3169     client_set = GNUNET_new (struct SetEntry);
3170     client_set->h = GNUNET_SET_create (cfg,
3171                                        GNUNET_SET_OPERATION_UNION);
3172     struct SetHandle *sh = GNUNET_new (struct SetHandle);
3173     sh->h = client_set->h;
3174     GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3175                                  session->set_handles_tail,
3176                                  sh);
3177     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3178     put_set (session,
3179              client_set);
3180   }
3181
3182   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3183                                                  int);
3184
3185   /* Just construct the task graph,
3186      but don't run anything until the client calls conclude. */
3187   construct_task_graph (session);
3188   GNUNET_SERVICE_client_continue (session->client);
3189 }
3190
3191
3192 static void
3193 client_insert_done (void *cls)
3194 {
3195   // FIXME: implement
3196 }
3197
3198
3199 /**
3200  * Called when a client performs an insert operation.
3201  *
3202  * @param cls client handle
3203  * @param msg message sent by the client
3204  * @return #GNUNET_OK (always well-formed)
3205  */
3206 static int
3207 check_client_insert (void *cls,
3208                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3209 {
3210   return GNUNET_OK;
3211 }
3212
3213
3214 /**
3215  * Called when a client performs an insert operation.
3216  *
3217  * @param cls client handle
3218  * @param msg message sent by the client
3219  */
3220 static void
3221 handle_client_insert (void *cls,
3222                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3223 {
3224   struct ConsensusSession *session = cls;
3225   ssize_t element_size;
3226   struct GNUNET_SET_Handle *initial_set;
3227   struct ConsensusElement *ce;
3228
3229   if (GNUNET_YES == session->conclude_started)
3230   {
3231     GNUNET_break (0);
3232     GNUNET_SERVICE_client_drop (session->client);
3233     return;
3234   }
3235
3236   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3237   ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3238   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3239   ce->payload_type = msg->element_type;
3240
3241   struct GNUNET_SET_Element element = {
3242     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3243     .size = sizeof (struct ConsensusElement) + element_size,
3244     .data = ce,
3245   };
3246
3247   {
3248     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3249     struct SetEntry *entry;
3250
3251     entry = lookup_set (session,
3252                         &key);
3253     GNUNET_assert (NULL != entry);
3254     initial_set = entry->h;
3255   }
3256
3257   session->num_client_insert_pending++;
3258   GNUNET_SET_add_element (initial_set,
3259                           &element,
3260                           &client_insert_done,
3261                           session);
3262
3263 #ifdef GNUNET_EXTRA_LOGGING
3264   {
3265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3266                 "P%u: element %s added\n",
3267                 session->local_peer_idx,
3268                 debug_str_element (&element));
3269   }
3270 #endif
3271   GNUNET_free (ce);
3272   GNUNET_SERVICE_client_continue (session->client);
3273 }
3274
3275
3276 /**
3277  * Called when a client performs the conclude operation.
3278  *
3279  * @param cls client handle
3280  * @param message message sent by the client
3281  */
3282 static void
3283 handle_client_conclude (void *cls,
3284                         const struct GNUNET_MessageHeader *message)
3285 {
3286   struct ConsensusSession *session = cls;
3287
3288   if (GNUNET_YES == session->conclude_started)
3289   {
3290     /* conclude started twice */
3291     GNUNET_break (0);
3292     GNUNET_SERVICE_client_drop (session->client);
3293     return;
3294   }
3295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3296               "conclude requested\n");
3297   session->conclude_started = GNUNET_YES;
3298   install_step_timeouts (session);
3299   run_ready_steps (session);
3300   GNUNET_SERVICE_client_continue (session->client);
3301 }
3302
3303
3304 /**
3305  * Called to clean up, after a shutdown has been requested.
3306  *
3307  * @param cls closure
3308  */
3309 static void
3310 shutdown_task (void *cls)
3311 {
3312   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3313               "shutting down\n");
3314   GNUNET_STATISTICS_destroy (statistics,
3315                              GNUNET_NO);
3316   statistics = NULL;
3317 }
3318
3319
3320 /**
3321  * Start processing consensus requests.
3322  *
3323  * @param cls closure
3324  * @param c configuration to use
3325  * @param service the initialized service
3326  */
3327 static void
3328 run (void *cls,
3329      const struct GNUNET_CONFIGURATION_Handle *c,
3330      struct GNUNET_SERVICE_Handle *service)
3331 {
3332   cfg = c;
3333   if (GNUNET_OK !=
3334       GNUNET_CRYPTO_get_peer_identity (cfg,
3335                                        &my_peer))
3336   {
3337     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3338                 "Could not retrieve host identity\n");
3339     GNUNET_SCHEDULER_shutdown ();
3340     return;
3341   }
3342   statistics = GNUNET_STATISTICS_create ("consensus",
3343                                          cfg);
3344   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3345                                  NULL);
3346 }
3347
3348
3349 /**
3350  * Callback called when a client connects to the service.
3351  *
3352  * @param cls closure for the service
3353  * @param c the new client that connected to the service
3354  * @param mq the message queue used to send messages to the client
3355  * @return @a c
3356  */
3357 static void *
3358 client_connect_cb (void *cls,
3359                    struct GNUNET_SERVICE_Client *c,
3360                    struct GNUNET_MQ_Handle *mq)
3361 {
3362   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3363
3364   session->client = c;
3365   session->client_mq = mq;
3366   GNUNET_CONTAINER_DLL_insert (sessions_head,
3367                                sessions_tail,
3368                                session);
3369   return session;
3370 }
3371
3372
3373 /**
3374  * Callback called when a client disconnected from the service
3375  *
3376  * @param cls closure for the service
3377  * @param c the client that disconnected
3378  * @param internal_cls should be equal to @a c
3379  */
3380 static void
3381 client_disconnect_cb (void *cls,
3382                       struct GNUNET_SERVICE_Client *c,
3383                       void *internal_cls)
3384 {
3385   struct ConsensusSession *session = internal_cls;
3386
3387   if (NULL != session->set_listener)
3388   {
3389     GNUNET_SET_listen_cancel (session->set_listener);
3390     session->set_listener = NULL;
3391   }
3392   GNUNET_CONTAINER_DLL_remove (sessions_head,
3393                                sessions_tail,
3394                                session);
3395
3396   while (session->set_handles_head)
3397   {
3398     struct SetHandle *sh = session->set_handles_head;
3399     session->set_handles_head = sh->next;
3400     GNUNET_SET_destroy (sh->h);
3401     GNUNET_free (sh);
3402   }
3403   GNUNET_free (session);
3404 }
3405
3406
3407 /**
3408  * Define "main" method using service macro.
3409  */
3410 GNUNET_SERVICE_MAIN
3411 ("consensus",
3412  GNUNET_SERVICE_OPTION_NONE,
3413  &run,
3414  &client_connect_cb,
3415  &client_disconnect_cb,
3416  NULL,
3417  GNUNET_MQ_hd_fixed_size (client_conclude,
3418                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3419                           struct GNUNET_MessageHeader,
3420                           NULL),
3421  GNUNET_MQ_hd_var_size (client_insert,
3422                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3423                         struct GNUNET_CONSENSUS_ElementMessage,
3424                         NULL),
3425  GNUNET_MQ_hd_var_size (client_join,
3426                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3427                         struct GNUNET_CONSENSUS_JoinMessage,
3428                         NULL),
3429  GNUNET_MQ_handler_end ());
3430
3431 /* end of gnunet-service-consensus.c */