Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37
38
39 enum ReferendumVote
40 {
41   /**
42    * Vote that nothing should change.
43    * This option is never voted explicitly.
44    */
45   VOTE_STAY = 0,
46   /**
47    * Vote that an element should be added.
48    */
49   VOTE_ADD = 1,
50   /**
51    * Vote that an element should be removed.
52    */
53   VOTE_REMOVE = 2,
54 };
55
56
57 enum EarlyStoppingPhase
58 {
59   EARLY_STOPPING_NONE = 0,
60   EARLY_STOPPING_ONE_MORE = 1,
61   EARLY_STOPPING_DONE = 2,
62 };
63
64
65 GNUNET_NETWORK_STRUCT_BEGIN
66
67 /**
68  * Tuple of integers that together
69  * identify a task uniquely.
70  */
71 struct TaskKey {
72   /**
73    * A value from 'enum PhaseKind'.
74    */
75   uint16_t kind GNUNET_PACKED;
76
77   /**
78    * Number of the first peer
79    * in canonical order.
80    */
81   int16_t peer1 GNUNET_PACKED;
82
83   /**
84    * Number of the second peer in canonical order.
85    */
86   int16_t peer2 GNUNET_PACKED;
87
88   /**
89    * Repetition of the gradecast phase.
90    */
91   int16_t repetition GNUNET_PACKED;
92
93   /**
94    * Leader in the gradecast phase.
95    *
96    * Can be different from both peer1 and peer2.
97    */
98   int16_t leader GNUNET_PACKED;
99 };
100
101
102
103 struct SetKey
104 {
105   int set_kind GNUNET_PACKED;
106   int k1 GNUNET_PACKED;
107   int k2 GNUNET_PACKED;
108 };
109
110
111 struct SetEntry
112 {
113   struct SetKey key;
114   struct GNUNET_SET_Handle *h;
115   /**
116    * GNUNET_YES if the set resulted
117    * from applying a referendum with contested
118    * elements.
119    */
120   int is_contested;
121 };
122
123
124 struct DiffKey
125 {
126   int diff_kind GNUNET_PACKED;
127   int k1 GNUNET_PACKED;
128   int k2 GNUNET_PACKED;
129 };
130
131 struct RfnKey
132 {
133   int rfn_kind GNUNET_PACKED;
134   int k1 GNUNET_PACKED;
135   int k2 GNUNET_PACKED;
136 };
137
138
139 GNUNET_NETWORK_STRUCT_END
140
141 enum PhaseKind
142 {
143   PHASE_KIND_ALL_TO_ALL,
144   PHASE_KIND_ALL_TO_ALL_2,
145   PHASE_KIND_GRADECAST_LEADER,
146   PHASE_KIND_GRADECAST_ECHO,
147   PHASE_KIND_GRADECAST_ECHO_GRADE,
148   PHASE_KIND_GRADECAST_CONFIRM,
149   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
150   /**
151    * Apply a repetition of the all-to-all
152    * gradecast to the current set.
153    */
154   PHASE_KIND_APPLY_REP,
155   PHASE_KIND_FINISH,
156 };
157
158
159 enum SetKind
160 {
161   SET_KIND_NONE = 0,
162   SET_KIND_CURRENT,
163   /**
164    * Last result set from a gradecast
165    */
166   SET_KIND_LAST_GRADECAST,
167   SET_KIND_LEADER_PROPOSAL,
168   SET_KIND_ECHO_RESULT,
169 };
170
171 enum DiffKind
172 {
173   DIFF_KIND_NONE = 0,
174   DIFF_KIND_LEADER_PROPOSAL,
175   DIFF_KIND_LEADER_CONSENSUS,
176   DIFF_KIND_GRADECAST_RESULT,
177 };
178
179 enum RfnKind
180 {
181   RFN_KIND_NONE = 0,
182   RFN_KIND_ECHO,
183   RFN_KIND_CONFIRM,
184   RFN_KIND_GRADECAST_RESULT
185 };
186
187
188 struct SetOpCls
189 {
190   struct SetKey input_set;
191
192   struct SetKey output_set;
193   struct RfnKey output_rfn;
194   struct DiffKey output_diff;
195
196   int do_not_remove;
197
198   int transceive_contested;
199
200   struct GNUNET_SET_OperationHandle *op;
201 };
202
203
204 struct FinishCls
205 {
206   struct SetKey input_set;
207 };
208
209 /**
210  * Closure for both @a start_task
211  * and @a cancel_task.
212  */
213 union TaskFuncCls
214 {
215   struct SetOpCls setop;
216   struct FinishCls finish;
217 };
218
219 struct TaskEntry;
220
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228   struct TaskKey key;
229
230   struct Step *step;
231
232   int is_started;
233
234   int is_finished;
235
236   TaskFunc start;
237   TaskFunc cancel;
238
239   union TaskFuncCls cls;
240 };
241
242
243 struct Step
244 {
245   /**
246    * All steps of one session are in a
247    * linked list for easier deallocation.
248    */
249   struct Step *prev;
250
251   /**
252    * All steps of one session are in a
253    * linked list for easier deallocation.
254    */
255   struct Step *next;
256
257   struct ConsensusSession *session;
258
259   /**
260    * Tasks that this step is composed of.
261    */
262   struct TaskEntry **tasks;
263   unsigned int tasks_len;
264   unsigned int tasks_cap;
265
266   unsigned int finished_tasks;
267
268   /*
269    * Tasks that have this task as dependency.
270    *
271    * We store pointers to subordinates rather
272    * than to prerequisites since it makes
273    * tracking the readiness of a task easier.
274    */
275   struct Step **subordinates;
276   unsigned int subordinates_len;
277   unsigned int subordinates_cap;
278
279   /**
280    * Counter for the prerequisites of
281    * this step.
282    */
283   size_t pending_prereq;
284
285   /*
286    * Task that will run this step despite
287    * any pending prerequisites.
288    */
289   struct GNUNET_SCHEDULER_Task *timeout_task;
290
291   unsigned int is_running;
292
293   unsigned int is_finished;
294
295   /*
296    * Synchrony round of the task.
297    * Determines the deadline for the task.
298    */
299   unsigned int round;
300
301   /**
302    * Human-readable name for
303    * the task, used for debugging.
304    */
305   char *debug_name;
306
307   /**
308    * When we're doing an early finish, how should this step be
309    * treated?
310    * If GNUNET_YES, the step will be marked as finished
311    * without actually running its tasks.
312    * Otherwise, the step will still be run even after
313    * an early finish.
314    *
315    * Note that a task may never be finished early if
316    * it is already running.
317    */
318   int early_finishable;
319 };
320
321
322 struct RfnElementInfo
323 {
324   const struct GNUNET_SET_Element *element;
325
326   /*
327    * GNUNET_YES if the peer votes for the proposal.
328    */
329   int *votes;
330
331   /**
332    * Proposal for this element,
333    * can only be VOTE_ADD or VOTE_REMOVE.
334    */
335   enum ReferendumVote proposal;
336 };
337
338
339 struct ReferendumEntry
340 {
341   struct RfnKey key;
342
343   /*
344    * Elements where there is at least one proposed change.
345    *
346    * Maps the hash of the GNUNET_SET_Element
347    * to 'struct RfnElementInfo'.
348    */
349   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
350
351   unsigned int num_peers;
352
353   /**
354    * Stores, for every peer in the session,
355    * whether the peer finished the whole referendum.
356    *
357    * Votes from peers are only counted if they're
358    * marked as commited (#GNUNET_YES) in the referendum.
359    *
360    * Otherwise (#GNUNET_NO), the requested changes are
361    * not counted for majority votes or thresholds.
362    */
363   int *peer_commited;
364
365
366   /**
367    * Contestation state of the peer.  If a peer is contested, the values it
368    * contributed are still counted for applying changes, but the grading is
369    * affected.
370    */
371   int *peer_contested;
372 };
373
374
375 struct DiffElementInfo
376 {
377   const struct GNUNET_SET_Element *element;
378
379   /**
380    * Positive weight for 'add', negative
381    * weights for 'remove'.
382    */
383   int weight;
384 };
385
386
387 /**
388  * Weighted diff.
389  */
390 struct DiffEntry
391 {
392   struct DiffKey key;
393   struct GNUNET_CONTAINER_MultiHashMap *changes;
394 };
395
396 struct SetHandle
397 {
398   struct SetHandle *prev;
399   struct SetHandle *next;
400
401   struct GNUNET_SET_Handle *h;
402 };
403
404
405
406 /**
407  * A consensus session consists of one local client and the remote authorities.
408  */
409 struct ConsensusSession
410 {
411   /**
412    * Consensus sessions are kept in a DLL.
413    */
414   struct ConsensusSession *next;
415
416   /**
417    * Consensus sessions are kept in a DLL.
418    */
419   struct ConsensusSession *prev;
420
421   unsigned int num_client_insert_pending;
422
423   struct GNUNET_CONTAINER_MultiHashMap *setmap;
424   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
425   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
426
427   /**
428    * Array of peers with length 'num_peers'.
429    */
430   int *peers_blacklisted;
431
432   /*
433    * Mapping from (hashed) TaskKey to TaskEntry.
434    *
435    * We map the application_id for a round to the task that should be
436    * executed, so we don't have to go through all task whenever we get
437    * an incoming set op request.
438    */
439   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
440
441   struct Step *steps_head;
442   struct Step *steps_tail;
443
444   int conclude_started;
445
446   int conclude_done;
447
448   /**
449   * Global consensus identification, computed
450   * from the session id and participating authorities.
451   */
452   struct GNUNET_HashCode global_id;
453
454   /**
455    * Client that inhabits the session
456    */
457   struct GNUNET_SERVICE_Client *client;
458
459   /**
460    * Queued messages to the client.
461    */
462   struct GNUNET_MQ_Handle *client_mq;
463
464   /**
465    * Time when the conclusion of the consensus should begin.
466    */
467   struct GNUNET_TIME_Absolute conclude_start;
468
469   /**
470    * Timeout for all rounds together, single rounds will schedule a timeout task
471    * with a fraction of the conclude timeout.
472    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
473    */
474   struct GNUNET_TIME_Absolute conclude_deadline;
475
476   struct GNUNET_PeerIdentity *peers;
477
478   /**
479    * Number of other peers in the consensus.
480    */
481   unsigned int num_peers;
482
483   /**
484    * Index of the local peer in the peers array
485    */
486   unsigned int local_peer_idx;
487
488   /**
489    * Listener for requests from other peers.
490    * Uses the session's global id as app id.
491    */
492   struct GNUNET_SET_ListenHandle *set_listener;
493
494   /**
495    * State of our early stopping scheme.
496    */
497   int early_stopping;
498
499   /**
500    * Our set size from the first round.
501    */
502   uint64_t first_size;
503
504   uint64_t *first_sizes_received;
505
506   /**
507    * Bounded Eppstein lower bound.
508    */
509   uint64_t lower_bound;
510
511   struct SetHandle *set_handles_head;
512   struct SetHandle *set_handles_tail;
513 };
514
515 /**
516  * Linked list of sessions this peer participates in.
517  */
518 static struct ConsensusSession *sessions_head;
519
520 /**
521  * Linked list of sessions this peer participates in.
522  */
523 static struct ConsensusSession *sessions_tail;
524
525 /**
526  * Configuration of the consensus service.
527  */
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
529
530 /**
531  * Peer that runs this service.
532  */
533 static struct GNUNET_PeerIdentity my_peer;
534
535 /**
536  * Statistics handle.
537  */
538 struct GNUNET_STATISTICS_Handle *statistics;
539
540
541 static void
542 finish_task (struct TaskEntry *task);
543
544
545 static void
546 run_ready_steps (struct ConsensusSession *session);
547
548
549 static const char *
550 phasename (uint16_t phase)
551 {
552   switch (phase)
553   {
554     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555     case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556     case PHASE_KIND_FINISH: return "FINISH";
557     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563     default: return "(unknown)";
564   }
565 }
566
567
568 static const char *
569 setname (uint16_t kind)
570 {
571   switch (kind)
572   {
573     case SET_KIND_CURRENT: return "CURRENT";
574     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575     case SET_KIND_NONE: return "NONE";
576     default: return "(unknown)";
577   }
578 }
579
580 static const char *
581 rfnname (uint16_t kind)
582 {
583   switch (kind)
584   {
585     case RFN_KIND_NONE: return "NONE";
586     case RFN_KIND_ECHO: return "ECHO";
587     case RFN_KIND_CONFIRM: return "CONFIRM";
588     default: return "(unknown)";
589   }
590 }
591
592 static const char *
593 diffname (uint16_t kind)
594 {
595   switch (kind)
596   {
597     case DIFF_KIND_NONE: return "NONE";
598     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601     default: return "(unknown)";
602   }
603 }
604
605 #ifdef GNUNET_EXTRA_LOGGING
606
607
608 static const char *
609 debug_str_element (const struct GNUNET_SET_Element *el)
610 {
611   struct GNUNET_HashCode hash;
612
613   GNUNET_SET_element_hash (el, &hash);
614
615   return GNUNET_h2s (&hash);
616 }
617
618 static const char *
619 debug_str_task_key (struct TaskKey *tk)
620 {
621   static char buf[256];
622
623   snprintf (buf, sizeof (buf),
624             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625             phasename (tk->kind), tk->peer1, tk->peer2,
626             tk->leader, tk->repetition);
627
628   return buf;
629 }
630
631 static const char *
632 debug_str_diff_key (struct DiffKey *dk)
633 {
634   static char buf[256];
635
636   snprintf (buf, sizeof (buf),
637             "DiffKey kind=%s, k1=%d, k2=%d",
638             diffname (dk->diff_kind), dk->k1, dk->k2);
639
640   return buf;
641 }
642
643 static const char *
644 debug_str_set_key (const struct SetKey *sk)
645 {
646   static char buf[256];
647
648   snprintf (buf, sizeof (buf),
649             "SetKey kind=%s, k1=%d, k2=%d",
650             setname (sk->set_kind), sk->k1, sk->k2);
651
652   return buf;
653 }
654
655
656 static const char *
657 debug_str_rfn_key (const struct RfnKey *rk)
658 {
659   static char buf[256];
660
661   snprintf (buf, sizeof (buf),
662             "RfnKey kind=%s, k1=%d, k2=%d",
663             rfnname (rk->rfn_kind), rk->k1, rk->k2);
664
665   return buf;
666 }
667
668 #endif /* GNUNET_EXTRA_LOGGING */
669
670
671 /**
672  * Send the final result set of the consensus to the client, element by
673  * element.
674  *
675  * @param cls closure
676  * @param element the current element, NULL if all elements have been
677  *        iterated over
678  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
679  */
680 static int
681 send_to_client_iter (void *cls,
682                      const struct GNUNET_SET_Element *element)
683 {
684   struct TaskEntry *task = (struct TaskEntry *) cls;
685   struct ConsensusSession *session = task->step->session;
686   struct GNUNET_MQ_Envelope *ev;
687
688   if (NULL != element)
689   {
690     struct GNUNET_CONSENSUS_ElementMessage *m;
691     const struct ConsensusElement *ce;
692
693     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
694     ce = element->data;
695
696     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
697
698     if (0 != ce->marker)
699       return GNUNET_YES;
700
701     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702                 "P%d: sending element %s to client\n",
703                 session->local_peer_idx,
704                 debug_str_element (element));
705
706     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
707                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
708     m->element_type = ce->payload_type;
709     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710     GNUNET_MQ_send (session->client_mq, ev);
711   }
712   else
713   {
714     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715                 "P%d: finished iterating elements for client\n",
716                 session->local_peer_idx);
717     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
718     GNUNET_MQ_send (session->client_mq, ev);
719   }
720   return GNUNET_YES;
721 }
722
723
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
726 {
727   struct GNUNET_HashCode hash;
728
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "P%u: looking up set {%s}\n",
731               session->local_peer_idx,
732               debug_str_set_key (key));
733
734   GNUNET_assert (SET_KIND_NONE != key->set_kind);
735   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
737 }
738
739
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
742 {
743   struct GNUNET_HashCode hash;
744
745   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746               "P%u: looking up diff {%s}\n",
747               session->local_peer_idx,
748               debug_str_diff_key (key));
749
750   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
751   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
753 }
754
755
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
758 {
759   struct GNUNET_HashCode hash;
760
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "P%u: looking up rfn {%s}\n",
763               session->local_peer_idx,
764               debug_str_rfn_key (key));
765
766   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
767   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
769 }
770
771
772 static void
773 diff_insert (struct DiffEntry *diff,
774              int weight,
775              const struct GNUNET_SET_Element *element)
776 {
777   struct DiffElementInfo *di;
778   struct GNUNET_HashCode hash;
779
780   GNUNET_assert ( (1 == weight) || (-1 == weight));
781
782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783               "diff_insert with element size %u\n",
784               element->size);
785
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "hashing element\n");
788
789   GNUNET_SET_element_hash (element, &hash);
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "hashed element\n");
793
794   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
795
796   if (NULL == di)
797   {
798     di = GNUNET_new (struct DiffElementInfo);
799     di->element = GNUNET_SET_element_dup (element);
800     GNUNET_assert (GNUNET_OK ==
801                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
802                                                       &hash, di,
803                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
804   }
805
806   di->weight = weight;
807 }
808
809
810 static void
811 rfn_commit (struct ReferendumEntry *rfn,
812             uint16_t commit_peer)
813 {
814   GNUNET_assert (commit_peer < rfn->num_peers);
815
816   rfn->peer_commited[commit_peer] = GNUNET_YES;
817 }
818
819
820 static void
821 rfn_contest (struct ReferendumEntry *rfn,
822              uint16_t contested_peer)
823 {
824   GNUNET_assert (contested_peer < rfn->num_peers);
825
826   rfn->peer_contested[contested_peer] = GNUNET_YES;
827 }
828
829
830 static uint16_t
831 rfn_noncontested (struct ReferendumEntry *rfn)
832 {
833   uint16_t i;
834   uint16_t ret;
835
836   ret = 0;
837   for (i = 0; i < rfn->num_peers; i++)
838     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
839       ret++;
840
841   return ret;
842 }
843
844
845 static void
846 rfn_vote (struct ReferendumEntry *rfn,
847           uint16_t voting_peer,
848           enum ReferendumVote vote,
849           const struct GNUNET_SET_Element *element)
850 {
851   struct RfnElementInfo *ri;
852   struct GNUNET_HashCode hash;
853
854   GNUNET_assert (voting_peer < rfn->num_peers);
855
856   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
857      since VOTE_KEEP is implicit in not voting. */
858   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
859
860   GNUNET_SET_element_hash (element, &hash);
861   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
862
863   if (NULL == ri)
864   {
865     ri = GNUNET_new (struct RfnElementInfo);
866     ri->element = GNUNET_SET_element_dup (element);
867     ri->votes = GNUNET_new_array (rfn->num_peers, int);
868     GNUNET_assert (GNUNET_OK ==
869                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
870                                                       &hash, ri,
871                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
872   }
873
874   ri->votes[voting_peer] = GNUNET_YES;
875   ri->proposal = vote;
876 }
877
878
879 static uint16_t
880 task_other_peer (struct TaskEntry *task)
881 {
882   uint16_t me = task->step->session->local_peer_idx;
883   if (task->key.peer1 == me)
884     return task->key.peer2;
885   return task->key.peer1;
886 }
887
888
889 static int
890 cmp_uint64_t (const void *pa, const void *pb)
891 {
892   uint64_t a = *(uint64_t *) pa;
893   uint64_t b = *(uint64_t *) pb;
894
895   if (a == b)
896     return 0;
897   if (a < b)
898     return -1;
899   return 1;
900 }
901
902
903 /**
904  * Callback for set operation results. Called for each element
905  * in the result set.
906  *
907  * @param cls closure
908  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
909  * @param current_size current set size
910  * @param status see enum GNUNET_SET_Status
911  */
912 static void
913 set_result_cb (void *cls,
914                const struct GNUNET_SET_Element *element,
915                uint64_t current_size,
916                enum GNUNET_SET_Status status)
917 {
918   struct TaskEntry *task = cls;
919   struct ConsensusSession *session = task->step->session;
920   struct SetEntry *output_set = NULL;
921   struct DiffEntry *output_diff = NULL;
922   struct ReferendumEntry *output_rfn = NULL;
923   unsigned int other_idx;
924   struct SetOpCls *setop;
925   const struct ConsensusElement *consensus_element = NULL;
926
927   if (NULL != element)
928   {
929     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930                 "P%u: got element of type %u, status %u\n",
931                 session->local_peer_idx,
932                 (unsigned) element->element_type,
933                 (unsigned) status);
934     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
935     consensus_element = element->data;
936   }
937
938   setop = &task->cls.setop;
939
940
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942               "P%u: got set result for {%s}, status %u\n",
943               session->local_peer_idx,
944               debug_str_task_key (&task->key),
945               status);
946
947   if (GNUNET_NO == task->is_started)
948   {
949     GNUNET_break_op (0);
950     return;
951   }
952
953   if (GNUNET_YES == task->is_finished)
954   {
955     GNUNET_break_op (0);
956     return;
957   }
958
959   other_idx = task_other_peer (task);
960
961   if (SET_KIND_NONE != setop->output_set.set_kind)
962   {
963     output_set = lookup_set (session, &setop->output_set);
964     GNUNET_assert (NULL != output_set);
965   }
966
967   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
968   {
969     output_diff = lookup_diff (session, &setop->output_diff);
970     GNUNET_assert (NULL != output_diff);
971   }
972
973   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
974   {
975     output_rfn = lookup_rfn (session, &setop->output_rfn);
976     GNUNET_assert (NULL != output_rfn);
977   }
978
979   if (GNUNET_YES == session->peers_blacklisted[other_idx])
980   {
981     /* Peer might have been blacklisted
982        by a gradecast running in parallel, ignore elements from now */
983     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
984       return;
985     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
986       return;
987   }
988
989   if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
990   {
991     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
992                 "P%u: got some marker\n",
993                   session->local_peer_idx);
994     if ( (GNUNET_YES == setop->transceive_contested) &&
995          (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
996     {
997       GNUNET_assert (NULL != output_rfn);
998       rfn_contest (output_rfn, task_other_peer (task));
999       return;
1000     }
1001
1002     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1003     {
1004
1005       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1006                   "P%u: got size marker\n",
1007                   session->local_peer_idx);
1008
1009
1010       struct ConsensusSizeElement *cse = (void *) consensus_element;
1011
1012       if (cse->sender_index == other_idx)
1013       {
1014         if (NULL == session->first_sizes_received)
1015           session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1017
1018         uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019         qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020         session->lower_bound = copy[session->num_peers / 3 + 1];
1021         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1022                     "P%u: lower bound %llu\n",
1023                     session->local_peer_idx,
1024                     (long long) session->lower_bound);
1025         GNUNET_free (copy);
1026       }
1027       return;
1028     }
1029
1030     return;
1031   }
1032
1033   switch (status)
1034   {
1035     case GNUNET_SET_STATUS_ADD_LOCAL:
1036       GNUNET_assert (NULL != consensus_element);
1037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                   "Adding element in Task {%s}\n",
1039                   debug_str_task_key (&task->key));
1040       if (NULL != output_set)
1041       {
1042         // FIXME: record pending adds, use callback
1043         GNUNET_SET_add_element (output_set->h,
1044                                 element,
1045                                 NULL,
1046                                 NULL);
1047 #ifdef GNUNET_EXTRA_LOGGING
1048         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1049                     "P%u: adding element %s into set {%s} of task {%s}\n",
1050                     session->local_peer_idx,
1051                     debug_str_element (element),
1052                     debug_str_set_key (&setop->output_set),
1053                     debug_str_task_key (&task->key));
1054 #endif
1055       }
1056       if (NULL != output_diff)
1057       {
1058         diff_insert (output_diff, 1, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1060         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1061                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1062                     session->local_peer_idx,
1063                     debug_str_element (element),
1064                     debug_str_diff_key (&setop->output_diff),
1065                     debug_str_task_key (&task->key));
1066 #endif
1067       }
1068       if (NULL != output_rfn)
1069       {
1070         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1071 #ifdef GNUNET_EXTRA_LOGGING
1072         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1073                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1074                     session->local_peer_idx,
1075                     debug_str_element (element),
1076                     debug_str_rfn_key (&setop->output_rfn),
1077                     debug_str_task_key (&task->key));
1078 #endif
1079       }
1080       // XXX: add result to structures in task
1081       break;
1082     case GNUNET_SET_STATUS_ADD_REMOTE:
1083       GNUNET_assert (NULL != consensus_element);
1084       if (GNUNET_YES == setop->do_not_remove)
1085         break;
1086       if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1087         break;
1088       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1089                   "Removing element in Task {%s}\n",
1090                   debug_str_task_key (&task->key));
1091       if (NULL != output_set)
1092       {
1093         // FIXME: record pending adds, use callback
1094         GNUNET_SET_remove_element (output_set->h,
1095                                    element,
1096                                    NULL,
1097                                    NULL);
1098 #ifdef GNUNET_EXTRA_LOGGING
1099         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1100                     "P%u: removing element %s from set {%s} of task {%s}\n",
1101                     session->local_peer_idx,
1102                     debug_str_element (element),
1103                     debug_str_set_key (&setop->output_set),
1104                     debug_str_task_key (&task->key));
1105 #endif
1106       }
1107       if (NULL != output_diff)
1108       {
1109         diff_insert (output_diff, -1, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1111         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1112                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1113                     session->local_peer_idx,
1114                     debug_str_element (element),
1115                     debug_str_diff_key (&setop->output_diff),
1116                     debug_str_task_key (&task->key));
1117 #endif
1118       }
1119       if (NULL != output_rfn)
1120       {
1121         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1122 #ifdef GNUNET_EXTRA_LOGGING
1123         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1124                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1125                     session->local_peer_idx,
1126                     debug_str_element (element),
1127                     debug_str_rfn_key (&setop->output_rfn),
1128                     debug_str_task_key (&task->key));
1129 #endif
1130       }
1131       break;
1132     case GNUNET_SET_STATUS_DONE:
1133       // XXX: check first if any changes to the underlying
1134       // set are still pending
1135       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136                   "Finishing setop in Task {%s}\n",
1137                   debug_str_task_key (&task->key));
1138       if (NULL != output_rfn)
1139       {
1140         rfn_commit (output_rfn, task_other_peer (task));
1141       }
1142       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1143       {
1144         session->first_size = current_size;
1145       }
1146       finish_task (task);
1147       break;
1148     case GNUNET_SET_STATUS_FAILURE:
1149       // XXX: cleanup
1150       GNUNET_break_op (0);
1151       finish_task (task);
1152       return;
1153     default:
1154       /* not reached */
1155       GNUNET_assert (0);
1156   }
1157 }
1158
1159 #ifdef EVIL
1160
1161 enum EvilnessType
1162 {
1163   EVILNESS_NONE,
1164   EVILNESS_CRAM_ALL,
1165   EVILNESS_CRAM_LEAD,
1166   EVILNESS_CRAM_ECHO,
1167   EVILNESS_SLACK,
1168   EVILNESS_SLACK_A2A,
1169 };
1170
1171 enum EvilnessSubType
1172 {
1173   EVILNESS_SUB_NONE,
1174   EVILNESS_SUB_REPLACEMENT,
1175   EVILNESS_SUB_NO_REPLACEMENT,
1176 };
1177
1178 struct Evilness
1179 {
1180   enum EvilnessType type;
1181   enum EvilnessSubType subtype;
1182   unsigned int num;
1183 };
1184
1185
1186 static int
1187 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1188 {
1189   if (0 == strcmp ("replace", evil_subtype_str))
1190   {
1191     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1192   }
1193   else if (0 == strcmp ("noreplace", evil_subtype_str))
1194   {
1195     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1196   }
1197   else
1198   {
1199     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1200                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1201                 evil_subtype_str);
1202     return GNUNET_SYSERR;
1203   }
1204   return GNUNET_OK;
1205 }
1206
1207
1208 static void
1209 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1210 {
1211   char *evil_spec;
1212   char *field;
1213   char *evil_type_str = NULL;
1214   char *evil_subtype_str = NULL;
1215
1216   GNUNET_assert (NULL != evil);
1217
1218   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1219   {
1220     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221                 "P%u: no evilness\n",
1222                 session->local_peer_idx);
1223     evil->type = EVILNESS_NONE;
1224     return;
1225   }
1226   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227               "P%u: got evilness spec\n",
1228               session->local_peer_idx);
1229
1230   for (field = strtok (evil_spec, "/");
1231        NULL != field;
1232        field = strtok (NULL, "/"))
1233   {
1234     unsigned int peer_num;
1235     unsigned int evil_num;
1236     int ret;
1237
1238     evil_type_str = NULL;
1239     evil_subtype_str = NULL;
1240
1241     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1242
1243     if (ret != 4)
1244     {
1245       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1246                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1247                   field,
1248                   ret);
1249       goto not_evil;
1250     }
1251
1252     GNUNET_assert (NULL != evil_type_str);
1253     GNUNET_assert (NULL != evil_subtype_str);
1254
1255     if (peer_num == session->local_peer_idx)
1256     {
1257       if (0 == strcmp ("slack", evil_type_str))
1258       {
1259         evil->type = EVILNESS_SLACK;
1260       }
1261       if (0 == strcmp ("slack-a2a", evil_type_str))
1262       {
1263         evil->type = EVILNESS_SLACK_A2A;
1264       }
1265       else if (0 == strcmp ("cram-all", evil_type_str))
1266       {
1267         evil->type = EVILNESS_CRAM_ALL;
1268         evil->num = evil_num;
1269         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1270           goto not_evil;
1271       }
1272       else if (0 == strcmp ("cram-lead", evil_type_str))
1273       {
1274         evil->type = EVILNESS_CRAM_LEAD;
1275         evil->num = evil_num;
1276         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1277           goto not_evil;
1278       }
1279       else if (0 == strcmp ("cram-echo", evil_type_str))
1280       {
1281         evil->type = EVILNESS_CRAM_ECHO;
1282         evil->num = evil_num;
1283         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1284           goto not_evil;
1285       }
1286       else
1287       {
1288         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1289                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1290                     evil_type_str);
1291         goto not_evil;
1292       }
1293       goto cleanup;
1294     }
1295     /* No GNUNET_free since memory was allocated by libc */
1296     free (evil_type_str);
1297     evil_type_str = NULL;
1298     evil_subtype_str = NULL;
1299   }
1300 not_evil:
1301   evil->type = EVILNESS_NONE;
1302 cleanup:
1303   GNUNET_free (evil_spec);
1304   /* no GNUNET_free_non_null since it wasn't
1305    * allocated with GNUNET_malloc */
1306   if (NULL != evil_type_str)
1307     free (evil_type_str);
1308   if (NULL != evil_subtype_str)
1309     free (evil_subtype_str);
1310 }
1311
1312 #endif
1313
1314
1315 /**
1316  * Commit the appropriate set for a
1317  * task.
1318  */
1319 static void
1320 commit_set (struct ConsensusSession *session,
1321             struct TaskEntry *task)
1322 {
1323   struct SetEntry *set;
1324   struct SetOpCls *setop = &task->cls.setop;
1325
1326   GNUNET_assert (NULL != setop->op);
1327   set = lookup_set (session, &setop->input_set);
1328   GNUNET_assert (NULL != set);
1329
1330   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1331   {
1332     struct GNUNET_SET_Element element;
1333     struct ConsensusElement ce = { 0 };
1334     ce.marker = CONSENSUS_MARKER_CONTESTED;
1335     element.data = &ce;
1336     element.size = sizeof (struct ConsensusElement);
1337     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1338     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1339   }
1340
1341   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1342   {
1343     struct GNUNET_SET_Element element;
1344     struct ConsensusSizeElement cse = {
1345       .size = 0,
1346       .sender_index = 0
1347     };
1348     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
1349     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1350     cse.size = GNUNET_htonll (session->first_size);
1351     cse.sender_index = session->local_peer_idx;
1352     element.data = &cse;
1353     element.size = sizeof (struct ConsensusSizeElement);
1354     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1355     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1356   }
1357
1358 #ifdef EVIL
1359   {
1360     unsigned int i;
1361     struct Evilness evil;
1362
1363     get_evilness (session, &evil);
1364     if (EVILNESS_NONE != evil.type)
1365     {
1366       /* Useful for evaluation */
1367       GNUNET_STATISTICS_set (statistics,
1368                              "is evil",
1369                              1,
1370                              GNUNET_NO);
1371     }
1372     switch (evil.type)
1373     {
1374       case EVILNESS_CRAM_ALL:
1375       case EVILNESS_CRAM_LEAD:
1376       case EVILNESS_CRAM_ECHO:
1377         /* We're not cramming elements in the
1378            all-to-all round, since that would just
1379            add more elements to the result set, but
1380            wouldn't test robustness. */
1381         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1382         {
1383           GNUNET_SET_commit (setop->op, set->h);
1384           break;
1385         }
1386         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1387             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1388         {
1389           GNUNET_SET_commit (setop->op, set->h);
1390           break;
1391         }
1392         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1393         {
1394           GNUNET_SET_commit (setop->op, set->h);
1395           break;
1396         }
1397         for (i = 0; i < evil.num; i++)
1398         {
1399           struct GNUNET_SET_Element element;
1400           struct ConsensusStuffedElement se = {
1401             .ce.payload_type = 0,
1402             .ce.marker = 0,
1403           };
1404           element.data = &se;
1405           element.size = sizeof (struct ConsensusStuffedElement);
1406           element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1407
1408           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1409           {
1410             /* Always generate a new element. */
1411             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1412           }
1413           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1414           {
1415             /* Always cram the same elements, derived from counter. */
1416             GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1417           }
1418           else
1419           {
1420             GNUNET_assert (0);
1421           }
1422           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1423 #ifdef GNUNET_EXTRA_LOGGING
1424           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1425                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1426                       session->local_peer_idx,
1427                       debug_str_element (&element),
1428                       debug_str_set_key (&setop->input_set),
1429                       debug_str_task_key (&task->key));
1430 #endif
1431         }
1432         GNUNET_STATISTICS_update (statistics,
1433                                   "# stuffed elements",
1434                                   evil.num,
1435                                   GNUNET_NO);
1436         GNUNET_SET_commit (setop->op, set->h);
1437         break;
1438       case EVILNESS_SLACK:
1439         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1440                     "P%u: evil peer: slacking\n",
1441                     (unsigned int) session->local_peer_idx);
1442         /* Do nothing. */
1443       case EVILNESS_SLACK_A2A:
1444         if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1445              (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1446         {
1447           struct GNUNET_SET_Handle *empty_set;
1448           empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1449           GNUNET_SET_commit (setop->op, empty_set);
1450           GNUNET_SET_destroy (empty_set);
1451         }
1452         else
1453         {
1454           GNUNET_SET_commit (setop->op, set->h);
1455         }
1456         break;
1457       case EVILNESS_NONE:
1458         GNUNET_SET_commit (setop->op, set->h);
1459         break;
1460     }
1461   }
1462 #else
1463   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1464   {
1465     GNUNET_SET_commit (setop->op, set->h);
1466   }
1467   else
1468   {
1469     /* For our testcases, we don't want the blacklisted
1470        peers to wait. */
1471     GNUNET_SET_operation_cancel (setop->op);
1472     setop->op = NULL;
1473   }
1474 #endif
1475 }
1476
1477
1478 static void
1479 put_diff (struct ConsensusSession *session,
1480          struct DiffEntry *diff)
1481 {
1482   struct GNUNET_HashCode hash;
1483
1484   GNUNET_assert (NULL != diff);
1485
1486   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1487   GNUNET_assert (GNUNET_OK ==
1488                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1489                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1490 }
1491
1492 static void
1493 put_set (struct ConsensusSession *session,
1494          struct SetEntry *set)
1495 {
1496   struct GNUNET_HashCode hash;
1497
1498   GNUNET_assert (NULL != set->h);
1499
1500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501               "Putting set %s\n",
1502               debug_str_set_key (&set->key));
1503
1504   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1505   GNUNET_assert (GNUNET_SYSERR !=
1506                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1507                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1508 }
1509
1510
1511 static void
1512 put_rfn (struct ConsensusSession *session,
1513          struct ReferendumEntry *rfn)
1514 {
1515   struct GNUNET_HashCode hash;
1516
1517   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1518   GNUNET_assert (GNUNET_OK ==
1519                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1520                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1521 }
1522
1523
1524
1525 static void
1526 task_cancel_reconcile (struct TaskEntry *task)
1527 {
1528   /* not implemented yet */
1529   GNUNET_assert (0);
1530 }
1531
1532
1533 static void
1534 apply_diff_to_rfn (struct DiffEntry *diff,
1535                    struct ReferendumEntry *rfn,
1536                    uint16_t voting_peer,
1537                    uint16_t num_peers)
1538 {
1539   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1540   struct DiffElementInfo *di;
1541
1542   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1543
1544   while (GNUNET_YES ==
1545          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1546                                                       NULL,
1547                                                       (const void **) &di))
1548   {
1549     if (di->weight > 0)
1550     {
1551       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1552     }
1553     if (di->weight < 0)
1554     {
1555       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1556     }
1557   }
1558
1559   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1560 }
1561
1562
1563 struct DiffEntry *
1564 diff_create ()
1565 {
1566   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1567
1568   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1569
1570   return d;
1571 }
1572
1573
1574 struct DiffEntry *
1575 diff_compose (struct DiffEntry *diff_1,
1576               struct DiffEntry *diff_2)
1577 {
1578   struct DiffEntry *diff_new;
1579   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1580   struct DiffElementInfo *di;
1581
1582   diff_new = diff_create ();
1583
1584   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1585   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1586   {
1587     diff_insert (diff_new, di->weight, di->element);
1588   }
1589   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1590
1591   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1592   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1593   {
1594     diff_insert (diff_new, di->weight, di->element);
1595   }
1596   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1597
1598   return diff_new;
1599 }
1600
1601
1602 struct ReferendumEntry *
1603 rfn_create (uint16_t size)
1604 {
1605   struct ReferendumEntry *rfn;
1606
1607   rfn = GNUNET_new (struct ReferendumEntry);
1608   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1609   rfn->peer_commited = GNUNET_new_array (size, int);
1610   rfn->peer_contested = GNUNET_new_array (size, int);
1611   rfn->num_peers = size;
1612
1613   return rfn;
1614 }
1615
1616
1617 #if UNUSED
1618 static void
1619 diff_destroy (struct DiffEntry *diff)
1620 {
1621   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1622   GNUNET_free (diff);
1623 }
1624 #endif
1625
1626
1627 /**
1628  * For a given majority, count what the outcome
1629  * is (add/remove/keep), and give the number
1630  * of peers that voted for this outcome.
1631  */
1632 static void
1633 rfn_majority (const struct ReferendumEntry *rfn,
1634               const struct RfnElementInfo *ri,
1635               uint16_t *ret_majority,
1636               enum ReferendumVote *ret_vote)
1637 {
1638   uint16_t votes_yes = 0;
1639   uint16_t num_commited = 0;
1640   uint16_t i;
1641
1642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1643               "Computing rfn majority for element %s of rfn {%s}\n",
1644               debug_str_element (ri->element),
1645               debug_str_rfn_key (&rfn->key));
1646
1647   for (i = 0; i < rfn->num_peers; i++)
1648   {
1649     if (GNUNET_NO == rfn->peer_commited[i])
1650       continue;
1651     num_commited++;
1652
1653     if (GNUNET_YES == ri->votes[i])
1654       votes_yes++;
1655   }
1656
1657   if (votes_yes > (num_commited) / 2)
1658   {
1659     *ret_vote = ri->proposal;
1660     *ret_majority = votes_yes;
1661   }
1662   else
1663   {
1664     *ret_vote = VOTE_STAY;
1665     *ret_majority = num_commited - votes_yes;
1666   }
1667 }
1668
1669
1670 struct SetCopyCls
1671 {
1672   struct TaskEntry *task;
1673   struct SetKey dst_set_key;
1674 };
1675
1676
1677 static void
1678 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1679 {
1680   struct SetCopyCls *scc = cls;
1681   struct TaskEntry *task = scc->task;
1682   struct SetKey dst_set_key = scc->dst_set_key;
1683   struct SetEntry *set;
1684   struct SetHandle *sh = GNUNET_new (struct SetHandle);
1685
1686   sh->h = copy;
1687   GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1688                                task->step->session->set_handles_tail,
1689                                sh);
1690
1691   GNUNET_free (scc);
1692   set = GNUNET_new (struct SetEntry);
1693   set->h = copy;
1694   set->key = dst_set_key;
1695   put_set (task->step->session, set);
1696
1697   task->start (task);
1698 }
1699
1700
1701 /**
1702  * Call the start function of the given
1703  * task again after we created a copy of the given set.
1704  */
1705 static void
1706 create_set_copy_for_task (struct TaskEntry *task,
1707                           struct SetKey *src_set_key,
1708                           struct SetKey *dst_set_key)
1709 {
1710   struct SetEntry *src_set;
1711   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1712
1713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714               "Copying set {%s} to {%s} for task {%s}\n",
1715               debug_str_set_key (src_set_key),
1716               debug_str_set_key (dst_set_key),
1717               debug_str_task_key (&task->key));
1718
1719   scc->task = task;
1720   scc->dst_set_key = *dst_set_key;
1721   src_set = lookup_set (task->step->session, src_set_key);
1722   GNUNET_assert (NULL != src_set);
1723   GNUNET_SET_copy_lazy (src_set->h,
1724                         set_copy_cb,
1725                         scc);
1726 }
1727
1728
1729 struct SetMutationProgressCls
1730 {
1731   int num_pending;
1732   /**
1733    * Task to finish once all changes are through.
1734    */
1735   struct TaskEntry *task;
1736 };
1737
1738
1739 static void
1740 set_mutation_done (void *cls)
1741 {
1742   struct SetMutationProgressCls *pc = cls;
1743
1744   GNUNET_assert (pc->num_pending > 0);
1745
1746   pc->num_pending--;
1747
1748   if (0 == pc->num_pending)
1749   {
1750     struct TaskEntry *task = pc->task;
1751     GNUNET_free (pc);
1752     finish_task (task);
1753   }
1754 }
1755
1756
1757 static void
1758 try_finish_step_early (struct Step *step)
1759 {
1760   unsigned int i;
1761
1762   if (GNUNET_YES == step->is_running)
1763     return;
1764   if (GNUNET_YES == step->is_finished)
1765     return;
1766   if (GNUNET_NO == step->early_finishable)
1767     return;
1768
1769   step->is_finished = GNUNET_YES;
1770
1771 #ifdef GNUNET_EXTRA_LOGGING
1772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1773               "Finishing step `%s' early.\n",
1774               step->debug_name);
1775 #endif
1776
1777   for (i = 0; i < step->subordinates_len; i++)
1778   {
1779     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1780     step->subordinates[i]->pending_prereq--;
1781 #ifdef GNUNET_EXTRA_LOGGING
1782     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783                 "Decreased pending_prereq to %u for step `%s'.\n",
1784                 (unsigned int) step->subordinates[i]->pending_prereq,
1785                 step->subordinates[i]->debug_name);
1786
1787 #endif
1788     try_finish_step_early (step->subordinates[i]);
1789   }
1790
1791   // XXX: maybe schedule as task to avoid recursion?
1792   run_ready_steps (step->session);
1793 }
1794
1795
1796 static void
1797 finish_step (struct Step *step)
1798 {
1799   unsigned int i;
1800
1801   GNUNET_assert (step->finished_tasks == step->tasks_len);
1802   GNUNET_assert (GNUNET_YES == step->is_running);
1803   GNUNET_assert (GNUNET_NO == step->is_finished);
1804
1805 #ifdef GNUNET_EXTRA_LOGGING
1806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1807               "All tasks of step `%s' with %u subordinates finished.\n",
1808               step->debug_name,
1809               step->subordinates_len);
1810 #endif
1811
1812   for (i = 0; i < step->subordinates_len; i++)
1813   {
1814     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1815     step->subordinates[i]->pending_prereq--;
1816 #ifdef GNUNET_EXTRA_LOGGING
1817     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818                 "Decreased pending_prereq to %u for step `%s'.\n",
1819                 (unsigned int) step->subordinates[i]->pending_prereq,
1820                 step->subordinates[i]->debug_name);
1821
1822 #endif
1823   }
1824
1825   step->is_finished = GNUNET_YES;
1826
1827   // XXX: maybe schedule as task to avoid recursion?
1828   run_ready_steps (step->session);
1829 }
1830
1831
1832
1833 /**
1834  * Apply the result from one round of gradecasts (i.e. every peer
1835  * should have gradecasted) to the peer's current set.
1836  *
1837  * @param task the task with context information
1838  */
1839 static void
1840 task_start_apply_round (struct TaskEntry *task)
1841 {
1842   struct ConsensusSession *session = task->step->session;
1843   struct SetKey sk_in;
1844   struct SetKey sk_out;
1845   struct RfnKey rk_in;
1846   struct SetEntry *set_out;
1847   struct ReferendumEntry *rfn_in;
1848   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1849   struct RfnElementInfo *ri;
1850   struct SetMutationProgressCls *progress_cls;
1851   uint16_t worst_majority = UINT16_MAX;
1852
1853   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1854   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1855   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1856
1857   set_out = lookup_set (session, &sk_out);
1858   if (NULL == set_out)
1859   {
1860     create_set_copy_for_task (task, &sk_in, &sk_out);
1861     return;
1862   }
1863
1864   rfn_in = lookup_rfn (session, &rk_in);
1865   GNUNET_assert (NULL != rfn_in);
1866
1867   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1868   progress_cls->task = task;
1869
1870   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1871
1872   while (GNUNET_YES ==
1873          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1874                                                       NULL,
1875                                                       (const void **) &ri))
1876   {
1877     uint16_t majority_num;
1878     enum ReferendumVote majority_vote;
1879
1880     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1881
1882     if (worst_majority > majority_num)
1883       worst_majority = majority_num;
1884
1885     switch (majority_vote)
1886     {
1887       case VOTE_ADD:
1888         progress_cls->num_pending++;
1889         GNUNET_assert (GNUNET_OK ==
1890                        GNUNET_SET_add_element (set_out->h,
1891                                                ri->element,
1892                                                &set_mutation_done,
1893                                                progress_cls));
1894         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1895                     "P%u: apply round: adding element %s with %u-majority.\n",
1896                     session->local_peer_idx,
1897                     debug_str_element (ri->element), majority_num);
1898         break;
1899       case VOTE_REMOVE:
1900         progress_cls->num_pending++;
1901         GNUNET_assert (GNUNET_OK ==
1902                        GNUNET_SET_remove_element (set_out->h,
1903                                                   ri->element,
1904                                                   &set_mutation_done,
1905                                                   progress_cls));
1906         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1907                     "P%u: apply round: deleting element %s with %u-majority.\n",
1908                     session->local_peer_idx,
1909                     debug_str_element (ri->element), majority_num);
1910         break;
1911       case VOTE_STAY:
1912         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1913                     "P%u: apply round: keeping element %s with %u-majority.\n",
1914                     session->local_peer_idx,
1915                     debug_str_element (ri->element), majority_num);
1916         // do nothing
1917         break;
1918       default:
1919         GNUNET_assert (0);
1920         break;
1921     }
1922   }
1923
1924   if (0 == progress_cls->num_pending)
1925   {
1926     // call closure right now, no pending ops
1927     GNUNET_free (progress_cls);
1928     finish_task (task);
1929   }
1930
1931   {
1932     uint16_t thresh = (session->num_peers / 3) * 2;
1933
1934     if (worst_majority >= thresh)
1935     {
1936       switch (session->early_stopping)
1937       {
1938         case EARLY_STOPPING_NONE:
1939           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1940           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1941                       "P%u: Stopping early (after one more superround)\n",
1942                       session->local_peer_idx);
1943           break;
1944         case EARLY_STOPPING_ONE_MORE:
1945           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1946                       session->local_peer_idx);
1947           session->early_stopping = EARLY_STOPPING_DONE;
1948           {
1949             struct Step *step;
1950             for (step = session->steps_head; NULL != step; step = step->next)
1951               try_finish_step_early (step);
1952           }
1953           break;
1954         case EARLY_STOPPING_DONE:
1955           /* We shouldn't be here anymore after early stopping */
1956           GNUNET_break (0);
1957           break;
1958         default:
1959           GNUNET_assert (0);
1960           break;
1961       }
1962     }
1963     else if (EARLY_STOPPING_NONE != session->early_stopping)
1964     {
1965       // Our assumption about the number of bad peers
1966       // has been broken.
1967       GNUNET_break_op (0);
1968     }
1969     else
1970     {
1971       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1972                   session->local_peer_idx);
1973     }
1974   }
1975   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1976 }
1977
1978
1979 static void
1980 task_start_grade (struct TaskEntry *task)
1981 {
1982   struct ConsensusSession *session = task->step->session;
1983   struct ReferendumEntry *output_rfn;
1984   struct ReferendumEntry *input_rfn;
1985   struct DiffEntry *input_diff;
1986   struct RfnKey rfn_key;
1987   struct DiffKey diff_key;
1988   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1989   struct RfnElementInfo *ri;
1990   unsigned int gradecast_confidence = 2;
1991
1992   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1993   output_rfn = lookup_rfn (session, &rfn_key);
1994   if (NULL == output_rfn)
1995   {
1996     output_rfn = rfn_create (session->num_peers);
1997     output_rfn->key = rfn_key;
1998     put_rfn (session, output_rfn);
1999   }
2000
2001   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2002   input_diff = lookup_diff (session, &diff_key);
2003   GNUNET_assert (NULL != input_diff);
2004
2005   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2006   input_rfn = lookup_rfn (session, &rfn_key);
2007   GNUNET_assert (NULL != input_rfn);
2008
2009   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2010
2011   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2012
2013   while (GNUNET_YES ==
2014          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2015                                                       NULL,
2016                                                       (const void **) &ri))
2017   {
2018     uint16_t majority_num;
2019     enum ReferendumVote majority_vote;
2020
2021     // XXX: we need contested votes and non-contested votes here
2022     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2023
2024     if (majority_num <= session->num_peers / 3)
2025       majority_vote = VOTE_REMOVE;
2026
2027     switch (majority_vote)
2028     {
2029       case VOTE_STAY:
2030         break;
2031       case VOTE_ADD:
2032         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2033         break;
2034       case VOTE_REMOVE:
2035         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2036         break;
2037       default:
2038         GNUNET_assert (0);
2039         break;
2040     }
2041   }
2042   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2043
2044   {
2045     uint16_t noncontested;
2046     noncontested = rfn_noncontested (input_rfn);
2047     if (noncontested < (session->num_peers / 3) * 2)
2048     {
2049       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2050     }
2051     if (noncontested < (session->num_peers / 3) + 1)
2052     {
2053       gradecast_confidence = 0;
2054     }
2055   }
2056
2057   if (gradecast_confidence >= 1)
2058     rfn_commit (output_rfn, task->key.leader);
2059
2060   if (gradecast_confidence <= 1)
2061     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2062
2063   finish_task (task);
2064 }
2065
2066
2067 static void
2068 task_start_reconcile (struct TaskEntry *task)
2069 {
2070   struct SetEntry *input;
2071   struct SetOpCls *setop = &task->cls.setop;
2072   struct ConsensusSession *session = task->step->session;
2073
2074   input = lookup_set (session, &setop->input_set);
2075   GNUNET_assert (NULL != input);
2076   GNUNET_assert (NULL != input->h);
2077
2078   /* We create the outputs for the operation here
2079      (rather than in the set operation callback)
2080      because we want something valid in there, even
2081      if the other peer doesn't talk to us */
2082
2083   if (SET_KIND_NONE != setop->output_set.set_kind)
2084   {
2085     /* If we don't have an existing output set,
2086        we clone the input set. */
2087     if (NULL == lookup_set (session, &setop->output_set))
2088     {
2089       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2090       return;
2091     }
2092   }
2093
2094   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2095   {
2096     if (NULL == lookup_rfn (session, &setop->output_rfn))
2097     {
2098       struct ReferendumEntry *rfn;
2099
2100       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2101                   "P%u: output rfn <%s> missing, creating.\n",
2102                   session->local_peer_idx,
2103                   debug_str_rfn_key (&setop->output_rfn));
2104
2105       rfn = rfn_create (session->num_peers);
2106       rfn->key = setop->output_rfn;
2107       put_rfn (session, rfn);
2108     }
2109   }
2110
2111   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2112   {
2113     if (NULL == lookup_diff (session, &setop->output_diff))
2114     {
2115       struct DiffEntry *diff;
2116
2117       diff = diff_create ();
2118       diff->key = setop->output_diff;
2119       put_diff (session, diff);
2120     }
2121   }
2122
2123   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2124   {
2125     /* XXX: mark the corresponding rfn as commited if necessary */
2126     finish_task (task);
2127     return;
2128   }
2129
2130   if (task->key.peer1 == session->local_peer_idx)
2131   {
2132     struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2133
2134     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2135                 "P%u: Looking up set {%s} to run remote union\n",
2136                 session->local_peer_idx,
2137                 debug_str_set_key (&setop->input_set));
2138
2139     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2140     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2141
2142     rcm.kind = htons (task->key.kind);
2143     rcm.peer1 = htons (task->key.peer1);
2144     rcm.peer2 = htons (task->key.peer2);
2145     rcm.leader = htons (task->key.leader);
2146     rcm.repetition = htons (task->key.repetition);
2147     rcm.is_contested = htons (0);
2148
2149     GNUNET_assert (NULL == setop->op);
2150     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2151                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2152
2153     struct GNUNET_SET_Option opts[] = {
2154       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2155       { GNUNET_SET_OPTION_END },
2156     };
2157
2158     // XXX: maybe this should be done while
2159     // setting up tasks alreays?
2160     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2161                                     &session->global_id,
2162                                     &rcm.header,
2163                                     GNUNET_SET_RESULT_SYMMETRIC,
2164                                     opts,
2165                                     set_result_cb,
2166                                     task);
2167
2168     commit_set (session, task);
2169   }
2170   else if (task->key.peer2 == session->local_peer_idx)
2171   {
2172     /* Wait for the other peer to contact us */
2173     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2174                 session->local_peer_idx, task->key.peer1);
2175
2176     if (NULL != setop->op)
2177     {
2178       commit_set (session, task);
2179     }
2180   }
2181   else
2182   {
2183     /* We made an error while constructing the task graph. */
2184     GNUNET_assert (0);
2185   }
2186 }
2187
2188
2189 static void
2190 task_start_eval_echo (struct TaskEntry *task)
2191 {
2192   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2193   struct ReferendumEntry *input_rfn;
2194   struct RfnElementInfo *ri;
2195   struct SetEntry *output_set;
2196   struct SetMutationProgressCls *progress_cls;
2197   struct ConsensusSession *session = task->step->session;
2198   struct SetKey sk_in;
2199   struct SetKey sk_out;
2200   struct RfnKey rk_in;
2201
2202   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2203   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2204   output_set = lookup_set (session, &sk_out);
2205   if (NULL == output_set)
2206   {
2207     create_set_copy_for_task (task, &sk_in, &sk_out);
2208     return;
2209   }
2210
2211
2212   {
2213     // FIXME: should be marked as a shallow copy, so
2214     // we can destroy everything correctly
2215     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2216     last_set->h = output_set->h;
2217     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2218     put_set (session, last_set);
2219   }
2220
2221   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2222               "Evaluating referendum in Task {%s}\n",
2223               debug_str_task_key (&task->key));
2224
2225   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2226   progress_cls->task = task;
2227
2228   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2229   input_rfn = lookup_rfn (session, &rk_in);
2230
2231   GNUNET_assert (NULL != input_rfn);
2232
2233   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2234   GNUNET_assert (NULL != iter);
2235
2236   while (GNUNET_YES ==
2237          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2238                                                       NULL,
2239                                                       (const void **) &ri))
2240   {
2241     enum ReferendumVote majority_vote;
2242     uint16_t majority_num;
2243
2244     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2245
2246     if (majority_num < session->num_peers / 3)
2247     {
2248       /* It is not the case that all nonfaulty peers
2249          echoed the same value.  Since we're doing a set reconciliation, we
2250          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2251          reconciliation as contested.  Other peers might not know that the
2252          leader is faulty, thus we still re-distribute in the confirmation
2253          round. */
2254       output_set->is_contested = GNUNET_YES;
2255     }
2256
2257     switch (majority_vote)
2258     {
2259       case VOTE_ADD:
2260         progress_cls->num_pending++;
2261         GNUNET_assert (GNUNET_OK ==
2262                        GNUNET_SET_add_element (output_set->h,
2263                                                ri->element,
2264                                                set_mutation_done,
2265                                                progress_cls));
2266         break;
2267       case VOTE_REMOVE:
2268         progress_cls->num_pending++;
2269         GNUNET_assert (GNUNET_OK ==
2270                        GNUNET_SET_remove_element (output_set->h,
2271                                                   ri->element,
2272                                                   set_mutation_done,
2273                                                   progress_cls));
2274         break;
2275       case VOTE_STAY:
2276         /* Nothing to do. */
2277         break;
2278       default:
2279         /* not reached */
2280         GNUNET_assert (0);
2281     }
2282   }
2283
2284   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2285
2286   if (0 == progress_cls->num_pending)
2287   {
2288     // call closure right now, no pending ops
2289     GNUNET_free (progress_cls);
2290     finish_task (task);
2291   }
2292 }
2293
2294
2295 static void
2296 task_start_finish (struct TaskEntry *task)
2297 {
2298   struct SetEntry *final_set;
2299   struct ConsensusSession *session = task->step->session;
2300
2301   final_set = lookup_set (session, &task->cls.finish.input_set);
2302
2303   GNUNET_assert (NULL != final_set);
2304
2305
2306   GNUNET_SET_iterate (final_set->h,
2307                       send_to_client_iter,
2308                       task);
2309 }
2310
2311 static void
2312 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2313 {
2314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2315
2316   GNUNET_assert (GNUNET_NO == task->is_started);
2317   GNUNET_assert (GNUNET_NO == task->is_finished);
2318   GNUNET_assert (NULL != task->start);
2319
2320   task->start (task);
2321
2322   task->is_started = GNUNET_YES;
2323 }
2324
2325
2326
2327
2328 /*
2329  * Run all steps of the session that don't any
2330  * more dependencies.
2331  */
2332 static void
2333 run_ready_steps (struct ConsensusSession *session)
2334 {
2335   struct Step *step;
2336
2337   step = session->steps_head;
2338
2339   while (NULL != step)
2340   {
2341     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2342     {
2343       size_t i;
2344
2345       GNUNET_assert (0 == step->finished_tasks);
2346
2347 #ifdef GNUNET_EXTRA_LOGGING
2348       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2349                   session->local_peer_idx,
2350                   step->debug_name,
2351                   step->round, step->tasks_len, step->subordinates_len);
2352 #endif
2353
2354       step->is_running = GNUNET_YES;
2355       for (i = 0; i < step->tasks_len; i++)
2356         start_task (session, step->tasks[i]);
2357
2358       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2359       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2360         finish_step (step);
2361
2362       /* Running the next ready steps will be triggered by task completion */
2363       return;
2364     }
2365     step = step->next;
2366   }
2367
2368   return;
2369 }
2370
2371
2372
2373 static void
2374 finish_task (struct TaskEntry *task)
2375 {
2376   GNUNET_assert (GNUNET_NO == task->is_finished);
2377   task->is_finished = GNUNET_YES;
2378
2379   task->step->finished_tasks++;
2380
2381   if (task->step->finished_tasks == task->step->tasks_len)
2382     finish_step (task->step);
2383 }
2384
2385
2386 /**
2387  * Search peer in the list of peers in session.
2388  *
2389  * @param peer peer to find
2390  * @param session session with peer
2391  * @return index of peer, -1 if peer is not in session
2392  */
2393 static int
2394 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2395 {
2396   int i;
2397   for (i = 0; i < session->num_peers; i++)
2398     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2399       return i;
2400   return -1;
2401 }
2402
2403
2404 /**
2405  * Compute a global, (hopefully) unique consensus session id,
2406  * from the local id of the consensus session, and the identities of all participants.
2407  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2408  * exactly the same peers, the global id will be different.
2409  *
2410  * @param session session to generate the global id for
2411  * @param local_session_id local id of the consensus session
2412  */
2413 static void
2414 compute_global_id (struct ConsensusSession *session,
2415                    const struct GNUNET_HashCode *local_session_id)
2416 {
2417   const char *salt = "gnunet-service-consensus/session_id";
2418
2419   GNUNET_assert (GNUNET_YES ==
2420                  GNUNET_CRYPTO_kdf (&session->global_id,
2421                                     sizeof (struct GNUNET_HashCode),
2422                                     salt,
2423                                     strlen (salt),
2424                                     session->peers,
2425                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2426                                     local_session_id,
2427                                     sizeof (struct GNUNET_HashCode),
2428                                     NULL));
2429 }
2430
2431
2432 /**
2433  * Compare two peer identities.
2434  *
2435  * @param h1 some peer identity
2436  * @param h2 some peer identity
2437  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2438  */
2439 static int
2440 peer_id_cmp (const void *h1, const void *h2)
2441 {
2442   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2443 }
2444
2445
2446 /**
2447  * Create the sorted list of peers for the session,
2448  * add the local peer if not in the join message.
2449  *
2450  * @param session session to initialize
2451  * @param join_msg join message with the list of peers participating at the end
2452  */
2453 static void
2454 initialize_session_peer_list (struct ConsensusSession *session,
2455                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2456 {
2457   const struct GNUNET_PeerIdentity *msg_peers
2458     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2459   int local_peer_in_list;
2460
2461   session->num_peers = ntohl (join_msg->num_peers);
2462
2463   /* Peers in the join message, may or may not include the local peer,
2464      Add it if it is missing. */
2465   local_peer_in_list = GNUNET_NO;
2466   for (unsigned int i = 0; i < session->num_peers; i++)
2467   {
2468     if (0 == memcmp (&msg_peers[i],
2469                      &my_peer,
2470                      sizeof (struct GNUNET_PeerIdentity)))
2471     {
2472       local_peer_in_list = GNUNET_YES;
2473       break;
2474     }
2475   }
2476   if (GNUNET_NO == local_peer_in_list)
2477     session->num_peers++;
2478
2479   session->peers = GNUNET_new_array (session->num_peers,
2480                                      struct GNUNET_PeerIdentity);
2481   if (GNUNET_NO == local_peer_in_list)
2482     session->peers[session->num_peers - 1] = my_peer;
2483
2484   GNUNET_memcpy (session->peers,
2485                  msg_peers,
2486                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2487   qsort (session->peers,
2488          session->num_peers,
2489          sizeof (struct GNUNET_PeerIdentity),
2490          &peer_id_cmp);
2491 }
2492
2493
2494 static struct TaskEntry *
2495 lookup_task (struct ConsensusSession *session,
2496              struct TaskKey *key)
2497 {
2498   struct GNUNET_HashCode hash;
2499
2500
2501   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2503               GNUNET_h2s (&hash));
2504   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2505 }
2506
2507
2508 /**
2509  * Called when another peer wants to do a set operation with the
2510  * local peer.
2511  *
2512  * @param cls closure
2513  * @param other_peer the other peer
2514  * @param context_msg message with application specific information from
2515  *        the other peer
2516  * @param request request from the other peer, use GNUNET_SET_accept
2517  *        to accept it, otherwise the request will be refused
2518  *        Note that we don't use a return value here, as it is also
2519  *        necessary to specify the set we want to do the operation with,
2520  *        whith sometimes can be derived from the context message.
2521  *        Also necessary to specify the timeout.
2522  */
2523 static void
2524 set_listen_cb (void *cls,
2525                const struct GNUNET_PeerIdentity *other_peer,
2526                const struct GNUNET_MessageHeader *context_msg,
2527                struct GNUNET_SET_Request *request)
2528 {
2529   struct ConsensusSession *session = cls;
2530   struct TaskKey tk;
2531   struct TaskEntry *task;
2532   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2533
2534   if (NULL == context_msg)
2535   {
2536     GNUNET_break_op (0);
2537     return;
2538   }
2539
2540   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2541   {
2542     GNUNET_break_op (0);
2543     return;
2544   }
2545
2546   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2547   {
2548     GNUNET_break_op (0);
2549     return;
2550   }
2551
2552   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2553
2554   tk = ((struct TaskKey) {
2555       .kind = ntohs (cm->kind),
2556       .peer1 = ntohs (cm->peer1),
2557       .peer2 = ntohs (cm->peer2),
2558       .repetition = ntohs (cm->repetition),
2559       .leader = ntohs (cm->leader),
2560   });
2561
2562   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2563               session->local_peer_idx, debug_str_task_key (&tk));
2564
2565   task = lookup_task (session, &tk);
2566
2567   if (NULL == task)
2568   {
2569     GNUNET_break_op (0);
2570     return;
2571   }
2572
2573   if (GNUNET_YES == task->is_finished)
2574   {
2575     GNUNET_break_op (0);
2576     return;
2577   }
2578
2579   if (task->key.peer2 != session->local_peer_idx)
2580   {
2581     /* We're being asked, so we must be thne 2nd peer. */
2582     GNUNET_break_op (0);
2583     return;
2584   }
2585
2586   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2587                     (task->key.peer2 == session->local_peer_idx)));
2588
2589   struct GNUNET_SET_Option opts[] = {
2590     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2591     { GNUNET_SET_OPTION_END },
2592   };
2593
2594   task->cls.setop.op = GNUNET_SET_accept (request,
2595                                           GNUNET_SET_RESULT_SYMMETRIC,
2596                                           opts,
2597                                           set_result_cb,
2598                                           task);
2599
2600   /* If the task hasn't been started yet,
2601      we wait for that until we commit. */
2602
2603   if (GNUNET_YES == task->is_started)
2604   {
2605     commit_set (session, task);
2606   }
2607 }
2608
2609
2610
2611 static void
2612 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2613           struct TaskEntry *t)
2614 {
2615   struct GNUNET_HashCode round_hash;
2616   struct Step *s;
2617
2618   GNUNET_assert (NULL != t->step);
2619
2620   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2621
2622   s = t->step;
2623
2624   if (s->tasks_len == s->tasks_cap)
2625   {
2626     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2627     GNUNET_array_grow (s->tasks,
2628                        s->tasks_cap,
2629                        target_size);
2630   }
2631
2632 #ifdef GNUNET_EXTRA_LOGGING
2633   GNUNET_assert (NULL != s->debug_name);
2634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2635               debug_str_task_key (&t->key),
2636               s->debug_name);
2637 #endif
2638
2639   s->tasks[s->tasks_len] = t;
2640   s->tasks_len++;
2641
2642   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2643   GNUNET_assert (GNUNET_OK ==
2644       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2645                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2646 }
2647
2648
2649 static void
2650 install_step_timeouts (struct ConsensusSession *session)
2651 {
2652   /* Given the fully constructed task graph
2653      with rounds for tasks, we can give the tasks timeouts. */
2654
2655   // unsigned int max_round;
2656
2657   /* XXX: implement! */
2658 }
2659
2660
2661
2662 /*
2663  * Arrange two peers in some canonical order.
2664  */
2665 static void
2666 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2667 {
2668   uint16_t a;
2669   uint16_t b;
2670
2671   GNUNET_assert (*p1 < n);
2672   GNUNET_assert (*p2 < n);
2673
2674   if (*p1 < *p2)
2675   {
2676     a = *p1;
2677     b = *p2;
2678   }
2679   else
2680   {
2681     a = *p2;
2682     b = *p1;
2683   }
2684
2685   /* For uniformly random *p1, *p2,
2686      this condition is true with 50% chance */
2687   if (((b - a) + n) % n <= n / 2)
2688   {
2689     *p1 = a;
2690     *p2 = b;
2691   }
2692   else
2693   {
2694     *p1 = b;
2695     *p2 = a;
2696   }
2697 }
2698
2699
2700 /**
2701  * Record @a dep as a dependency of @a step.
2702  */
2703 static void
2704 step_depend_on (struct Step *step, struct Step *dep)
2705 {
2706   /* We're not checking for cyclic dependencies,
2707      but this is a cheap sanity check. */
2708   GNUNET_assert (step != dep);
2709   GNUNET_assert (NULL != step);
2710   GNUNET_assert (NULL != dep);
2711   GNUNET_assert (dep->round <= step->round);
2712
2713 #ifdef GNUNET_EXTRA_LOGGING
2714   /* Make sure we have complete debugging information.
2715      Also checks that we don't screw up too badly
2716      constructing the task graph. */
2717   GNUNET_assert (NULL != step->debug_name);
2718   GNUNET_assert (NULL != dep->debug_name);
2719   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2720               "Making step `%s' depend on `%s'\n",
2721               step->debug_name,
2722               dep->debug_name);
2723 #endif
2724
2725   if (dep->subordinates_cap == dep->subordinates_len)
2726   {
2727     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2728     GNUNET_array_grow (dep->subordinates,
2729                        dep->subordinates_cap,
2730                        target_size);
2731   }
2732
2733   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2734
2735   dep->subordinates[dep->subordinates_len] = step;
2736   dep->subordinates_len++;
2737
2738   step->pending_prereq++;
2739 }
2740
2741
2742 static struct Step *
2743 create_step (struct ConsensusSession *session, int round, int early_finishable)
2744 {
2745   struct Step *step;
2746   step = GNUNET_new (struct Step);
2747   step->session = session;
2748   step->round = round;
2749   step->early_finishable = early_finishable;
2750   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2751                                     session->steps_tail,
2752                                     step);
2753   return step;
2754 }
2755
2756
2757 /**
2758  * Construct the task graph for a single
2759  * gradecast.
2760  */
2761 static void
2762 construct_task_graph_gradecast (struct ConsensusSession *session,
2763                                 uint16_t rep,
2764                                 uint16_t lead,
2765                                 struct Step *step_before,
2766                                 struct Step *step_after)
2767 {
2768   uint16_t n = session->num_peers;
2769   uint16_t me = session->local_peer_idx;
2770
2771   uint16_t p1;
2772   uint16_t p2;
2773
2774   /* The task we're currently setting up. */
2775   struct TaskEntry task;
2776
2777   struct Step *step;
2778   struct Step *prev_step;
2779
2780   uint16_t round;
2781
2782   unsigned int k;
2783
2784   round = step_before->round + 1;
2785
2786   /* gcast step 1: leader disseminates */
2787
2788   step = create_step (session, round, GNUNET_YES);
2789
2790 #ifdef GNUNET_EXTRA_LOGGING
2791   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2792 #endif
2793   step_depend_on (step, step_before);
2794
2795   if (lead == me)
2796   {
2797     for (k = 0; k < n; k++)
2798     {
2799       if (k == me)
2800         continue;
2801       p1 = me;
2802       p2 = k;
2803       arrange_peers (&p1, &p2, n);
2804       task = ((struct TaskEntry) {
2805         .step = step,
2806         .start = task_start_reconcile,
2807         .cancel = task_cancel_reconcile,
2808         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2809       });
2810       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2811       put_task (session->taskmap, &task);
2812     }
2813     /* We run this task to make sure that the leader
2814        has the stored the SET_KIND_LEADER set of himself,
2815        so he can participate in the rest of the gradecast
2816        without the code having to handle any special cases. */
2817     task = ((struct TaskEntry) {
2818       .step = step,
2819       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2820       .start = task_start_reconcile,
2821       .cancel = task_cancel_reconcile,
2822     });
2823     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2824     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2825     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2826     put_task (session->taskmap, &task);
2827   }
2828   else
2829   {
2830     p1 = me;
2831     p2 = lead;
2832     arrange_peers (&p1, &p2, n);
2833     task = ((struct TaskEntry) {
2834       .step = step,
2835       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2836       .start = task_start_reconcile,
2837       .cancel = task_cancel_reconcile,
2838     });
2839     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2840     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2841     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2842     put_task (session->taskmap, &task);
2843   }
2844
2845   /* gcast phase 2: echo */
2846   prev_step = step;
2847   round += 1;
2848   step = create_step (session, round, GNUNET_YES);
2849 #ifdef GNUNET_EXTRA_LOGGING
2850   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2851 #endif
2852   step_depend_on (step, prev_step);
2853
2854   for (k = 0; k < n; k++)
2855   {
2856     p1 = k;
2857     p2 = me;
2858     arrange_peers (&p1, &p2, n);
2859     task = ((struct TaskEntry) {
2860       .step = step,
2861       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2862       .start = task_start_reconcile,
2863       .cancel = task_cancel_reconcile,
2864     });
2865     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2866     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2867     put_task (session->taskmap, &task);
2868   }
2869
2870   prev_step = step;
2871   /* Same round, since step only has local tasks */
2872   step = create_step (session, round, GNUNET_YES);
2873 #ifdef GNUNET_EXTRA_LOGGING
2874   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2875 #endif
2876   step_depend_on (step, prev_step);
2877
2878   arrange_peers (&p1, &p2, n);
2879   task = ((struct TaskEntry) {
2880     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2881     .step = step,
2882     .start = task_start_eval_echo
2883   });
2884   put_task (session->taskmap, &task);
2885
2886   prev_step = step;
2887   round += 1;
2888   step = create_step (session, round, GNUNET_YES);
2889 #ifdef GNUNET_EXTRA_LOGGING
2890   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2891 #endif
2892   step_depend_on (step, prev_step);
2893
2894   /* gcast phase 3: confirmation and grading */
2895   for (k = 0; k < n; k++)
2896   {
2897     p1 = k;
2898     p2 = me;
2899     arrange_peers (&p1, &p2, n);
2900     task = ((struct TaskEntry) {
2901       .step = step,
2902       .start = task_start_reconcile,
2903       .cancel = task_cancel_reconcile,
2904       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2905     });
2906     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2907     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2908     /* If there was at least one element in the echo round that was
2909        contested (i.e. it had no n-t majority), then we let the other peers
2910        know, and other peers let us know.  The contested flag for each peer is
2911        stored in the rfn. */
2912     task.cls.setop.transceive_contested = GNUNET_YES;
2913     put_task (session->taskmap, &task);
2914   }
2915
2916   prev_step = step;
2917   /* Same round, since step only has local tasks */
2918   step = create_step (session, round, GNUNET_YES);
2919 #ifdef GNUNET_EXTRA_LOGGING
2920   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2921 #endif
2922   step_depend_on (step, prev_step);
2923
2924   task = ((struct TaskEntry) {
2925     .step = step,
2926     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2927     .start = task_start_grade,
2928   });
2929   put_task (session->taskmap, &task);
2930
2931   step_depend_on (step_after, step);
2932 }
2933
2934
2935 static void
2936 construct_task_graph (struct ConsensusSession *session)
2937 {
2938   uint16_t n = session->num_peers;
2939   uint16_t t = n / 3;
2940
2941   uint16_t me = session->local_peer_idx;
2942
2943   /* The task we're currently setting up. */
2944   struct TaskEntry task;
2945
2946   /* Current leader */
2947   unsigned int lead;
2948
2949   struct Step *step;
2950   struct Step *prev_step;
2951
2952   unsigned int round = 0;
2953
2954   unsigned int i;
2955
2956   // XXX: introduce first step,
2957   // where we wait for all insert acks
2958   // from the set service
2959
2960   /* faster but brittle all-to-all */
2961
2962   // XXX: Not implemented yet
2963
2964   /* all-to-all step */
2965
2966   step = create_step (session, round, GNUNET_NO);
2967
2968 #ifdef GNUNET_EXTRA_LOGGING
2969   step->debug_name = GNUNET_strdup ("all to all");
2970 #endif
2971
2972   for (i = 0; i < n; i++)
2973   {
2974     uint16_t p1;
2975     uint16_t p2;
2976
2977     p1 = me;
2978     p2 = i;
2979     arrange_peers (&p1, &p2, n);
2980     task = ((struct TaskEntry) {
2981       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2982       .step = step,
2983       .start = task_start_reconcile,
2984       .cancel = task_cancel_reconcile,
2985     });
2986     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2987     task.cls.setop.output_set = task.cls.setop.input_set;
2988     task.cls.setop.do_not_remove = GNUNET_YES;
2989     put_task (session->taskmap, &task);
2990   }
2991
2992   round += 1;
2993   prev_step = step;
2994   step = create_step (session, round, GNUNET_NO);;
2995 #ifdef GNUNET_EXTRA_LOGGING
2996   step->debug_name = GNUNET_strdup ("all to all 2");
2997 #endif
2998   step_depend_on (step, prev_step);
2999
3000
3001   for (i = 0; i < n; i++)
3002   {
3003     uint16_t p1;
3004     uint16_t p2;
3005
3006     p1 = me;
3007     p2 = i;
3008     arrange_peers (&p1, &p2, n);
3009     task = ((struct TaskEntry) {
3010       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3011       .step = step,
3012       .start = task_start_reconcile,
3013       .cancel = task_cancel_reconcile,
3014     });
3015     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3016     task.cls.setop.output_set = task.cls.setop.input_set;
3017     task.cls.setop.do_not_remove = GNUNET_YES;
3018     put_task (session->taskmap, &task);
3019   }
3020
3021   round += 1;
3022
3023   prev_step = step;
3024   step = NULL;
3025
3026
3027
3028   /* Byzantine union */
3029
3030   /* sequential repetitions of the gradecasts */
3031   for (i = 0; i < t + 1; i++)
3032   {
3033     struct Step *step_rep_start;
3034     struct Step *step_rep_end;
3035
3036     /* Every repetition is in a separate round. */
3037     step_rep_start = create_step (session, round, GNUNET_YES);
3038 #ifdef GNUNET_EXTRA_LOGGING
3039     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3040 #endif
3041
3042     step_depend_on (step_rep_start, prev_step);
3043
3044     /* gradecast has three rounds */
3045     round += 3;
3046     step_rep_end = create_step (session, round, GNUNET_YES);
3047 #ifdef GNUNET_EXTRA_LOGGING
3048     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3049 #endif
3050
3051     /* parallel gradecasts */
3052     for (lead = 0; lead < n; lead++)
3053       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3054
3055     task = ((struct TaskEntry) {
3056       .step = step_rep_end,
3057       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3058       .start = task_start_apply_round,
3059     });
3060     put_task (session->taskmap, &task);
3061
3062     prev_step = step_rep_end;
3063   }
3064
3065  /* There is no next gradecast round, thus the final
3066     start step is the overall end step of the gradecasts */
3067   round += 1;
3068   step = create_step (session, round, GNUNET_NO);
3069 #ifdef GNUNET_EXTRA_LOGGING
3070   GNUNET_asprintf (&step->debug_name, "finish");
3071 #endif
3072   step_depend_on (step, prev_step);
3073
3074   task = ((struct TaskEntry) {
3075     .step = step,
3076     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3077     .start = task_start_finish,
3078   });
3079   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3080
3081   put_task (session->taskmap, &task);
3082 }
3083
3084
3085
3086 /**
3087  * Check join message.
3088  *
3089  * @param cls session of client that sent the message
3090  * @param m message sent by the client
3091  * @return #GNUNET_OK if @a m is well-formed
3092  */
3093 static int
3094 check_client_join (void *cls,
3095                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3096 {
3097   uint32_t listed_peers = ntohl (m->num_peers);
3098
3099   if ( (ntohs (m->header.size) - sizeof (*m)) !=
3100        listed_peers * sizeof (struct GNUNET_PeerIdentity))
3101   {
3102     GNUNET_break (0);
3103     return GNUNET_SYSERR;
3104   }
3105   return GNUNET_OK;
3106 }
3107
3108
3109 /**
3110  * Called when a client wants to join a consensus session.
3111  *
3112  * @param cls session of client that sent the message
3113  * @param m message sent by the client
3114  */
3115 static void
3116 handle_client_join (void *cls,
3117                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3118 {
3119   struct ConsensusSession *session = cls;
3120   struct ConsensusSession *other_session;
3121
3122   initialize_session_peer_list (session,
3123                                 m);
3124   compute_global_id (session,
3125                      &m->session_id);
3126
3127   /* Check if some local client already owns the session.
3128      It is only legal to have a session with an existing global id
3129      if all other sessions with this global id are finished.*/
3130   for (other_session = sessions_head;
3131        NULL != other_session;
3132        other_session = other_session->next)
3133   {
3134     if ( (other_session != session) &&
3135          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3136                                        &other_session->global_id)) )
3137       break;
3138   }
3139
3140   session->conclude_deadline
3141     = GNUNET_TIME_absolute_ntoh (m->deadline);
3142   session->conclude_start
3143     = GNUNET_TIME_absolute_ntoh (m->start);
3144   session->local_peer_idx = get_peer_idx (&my_peer,
3145                                           session);
3146   GNUNET_assert (-1 != session->local_peer_idx);
3147
3148   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3149               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3150               GNUNET_h2s (&m->session_id),
3151               session->num_peers,
3152               session->local_peer_idx,
3153               GNUNET_STRINGS_relative_time_to_string
3154               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3155                                                     session->conclude_deadline),
3156                GNUNET_YES));
3157
3158   session->set_listener
3159     = GNUNET_SET_listen (cfg,
3160                          GNUNET_SET_OPERATION_UNION,
3161                          &session->global_id,
3162                          &set_listen_cb,
3163                          session);
3164
3165   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3166                                                           GNUNET_NO);
3167   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3168                                                            GNUNET_NO);
3169   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3170                                                            GNUNET_NO);
3171   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3172                                                           GNUNET_NO);
3173
3174   {
3175     struct SetEntry *client_set;
3176
3177     client_set = GNUNET_new (struct SetEntry);
3178     client_set->h = GNUNET_SET_create (cfg,
3179                                        GNUNET_SET_OPERATION_UNION);
3180     struct SetHandle *sh = GNUNET_new (struct SetHandle);
3181     sh->h = client_set->h;
3182     GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3183                                  session->set_handles_tail,
3184                                  sh);
3185     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3186     put_set (session,
3187              client_set);
3188   }
3189
3190   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3191                                                  int);
3192
3193   /* Just construct the task graph,
3194      but don't run anything until the client calls conclude. */
3195   construct_task_graph (session);
3196   GNUNET_SERVICE_client_continue (session->client);
3197 }
3198
3199
3200 static void
3201 client_insert_done (void *cls)
3202 {
3203   // FIXME: implement
3204 }
3205
3206
3207 /**
3208  * Called when a client performs an insert operation.
3209  *
3210  * @param cls client handle
3211  * @param msg message sent by the client
3212  * @return #GNUNET_OK (always well-formed)
3213  */
3214 static int
3215 check_client_insert (void *cls,
3216                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3217 {
3218   return GNUNET_OK;
3219 }
3220
3221
3222 /**
3223  * Called when a client performs an insert operation.
3224  *
3225  * @param cls client handle
3226  * @param msg message sent by the client
3227  */
3228 static void
3229 handle_client_insert (void *cls,
3230                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3231 {
3232   struct ConsensusSession *session = cls;
3233   ssize_t element_size;
3234   struct GNUNET_SET_Handle *initial_set;
3235   struct ConsensusElement *ce;
3236
3237   if (GNUNET_YES == session->conclude_started)
3238   {
3239     GNUNET_break (0);
3240     GNUNET_SERVICE_client_drop (session->client);
3241     return;
3242   }
3243
3244   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3245   ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3246   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3247   ce->payload_type = msg->element_type;
3248
3249   struct GNUNET_SET_Element element = {
3250     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3251     .size = sizeof (struct ConsensusElement) + element_size,
3252     .data = ce,
3253   };
3254
3255   {
3256     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3257     struct SetEntry *entry;
3258
3259     entry = lookup_set (session,
3260                         &key);
3261     GNUNET_assert (NULL != entry);
3262     initial_set = entry->h;
3263   }
3264
3265   session->num_client_insert_pending++;
3266   GNUNET_SET_add_element (initial_set,
3267                           &element,
3268                           &client_insert_done,
3269                           session);
3270
3271 #ifdef GNUNET_EXTRA_LOGGING
3272   {
3273     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3274                 "P%u: element %s added\n",
3275                 session->local_peer_idx,
3276                 debug_str_element (&element));
3277   }
3278 #endif
3279   GNUNET_free (ce);
3280   GNUNET_SERVICE_client_continue (session->client);
3281 }
3282
3283
3284 /**
3285  * Called when a client performs the conclude operation.
3286  *
3287  * @param cls client handle
3288  * @param message message sent by the client
3289  */
3290 static void
3291 handle_client_conclude (void *cls,
3292                         const struct GNUNET_MessageHeader *message)
3293 {
3294   struct ConsensusSession *session = cls;
3295
3296   if (GNUNET_YES == session->conclude_started)
3297   {
3298     /* conclude started twice */
3299     GNUNET_break (0);
3300     GNUNET_SERVICE_client_drop (session->client);
3301     return;
3302   }
3303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3304               "conclude requested\n");
3305   session->conclude_started = GNUNET_YES;
3306   install_step_timeouts (session);
3307   run_ready_steps (session);
3308   GNUNET_SERVICE_client_continue (session->client);
3309 }
3310
3311
3312 /**
3313  * Called to clean up, after a shutdown has been requested.
3314  *
3315  * @param cls closure
3316  */
3317 static void
3318 shutdown_task (void *cls)
3319 {
3320   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3321               "shutting down\n");
3322   GNUNET_STATISTICS_destroy (statistics,
3323                              GNUNET_NO);
3324   statistics = NULL;
3325 }
3326
3327
3328 /**
3329  * Start processing consensus requests.
3330  *
3331  * @param cls closure
3332  * @param c configuration to use
3333  * @param service the initialized service
3334  */
3335 static void
3336 run (void *cls,
3337      const struct GNUNET_CONFIGURATION_Handle *c,
3338      struct GNUNET_SERVICE_Handle *service)
3339 {
3340   cfg = c;
3341   if (GNUNET_OK !=
3342       GNUNET_CRYPTO_get_peer_identity (cfg,
3343                                        &my_peer))
3344   {
3345     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3346                 "Could not retrieve host identity\n");
3347     GNUNET_SCHEDULER_shutdown ();
3348     return;
3349   }
3350   statistics = GNUNET_STATISTICS_create ("consensus",
3351                                          cfg);
3352   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3353                                  NULL);
3354 }
3355
3356
3357 /**
3358  * Callback called when a client connects to the service.
3359  *
3360  * @param cls closure for the service
3361  * @param c the new client that connected to the service
3362  * @param mq the message queue used to send messages to the client
3363  * @return @a c
3364  */
3365 static void *
3366 client_connect_cb (void *cls,
3367                    struct GNUNET_SERVICE_Client *c,
3368                    struct GNUNET_MQ_Handle *mq)
3369 {
3370   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3371
3372   session->client = c;
3373   session->client_mq = mq;
3374   GNUNET_CONTAINER_DLL_insert (sessions_head,
3375                                sessions_tail,
3376                                session);
3377   return session;
3378 }
3379
3380
3381 /**
3382  * Callback called when a client disconnected from the service
3383  *
3384  * @param cls closure for the service
3385  * @param c the client that disconnected
3386  * @param internal_cls should be equal to @a c
3387  */
3388 static void
3389 client_disconnect_cb (void *cls,
3390                       struct GNUNET_SERVICE_Client *c,
3391                       void *internal_cls)
3392 {
3393   struct ConsensusSession *session = internal_cls;
3394
3395   if (NULL != session->set_listener)
3396   {
3397     GNUNET_SET_listen_cancel (session->set_listener);
3398     session->set_listener = NULL;
3399   }
3400   GNUNET_CONTAINER_DLL_remove (sessions_head,
3401                                sessions_tail,
3402                                session);
3403
3404   while (session->set_handles_head)
3405   {
3406     struct SetHandle *sh = session->set_handles_head;
3407     session->set_handles_head = sh->next;
3408     GNUNET_SET_destroy (sh->h);
3409     GNUNET_free (sh);
3410   }
3411   GNUNET_free (session);
3412 }
3413
3414
3415 /**
3416  * Define "main" method using service macro.
3417  */
3418 GNUNET_SERVICE_MAIN
3419 ("consensus",
3420  GNUNET_SERVICE_OPTION_NONE,
3421  &run,
3422  &client_connect_cb,
3423  &client_disconnect_cb,
3424  NULL,
3425  GNUNET_MQ_hd_fixed_size (client_conclude,
3426                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3427                           struct GNUNET_MessageHeader,
3428                           NULL),
3429  GNUNET_MQ_hd_var_size (client_insert,
3430                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3431                         struct GNUNET_CONSENSUS_ElementMessage,
3432                         NULL),
3433  GNUNET_MQ_hd_var_size (client_join,
3434                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3435                         struct GNUNET_CONSENSUS_JoinMessage,
3436                         NULL),
3437  GNUNET_MQ_handler_end ());
3438
3439 /* end of gnunet-service-consensus.c */