consensus: slack-a2a evilness mode and static replication in profiler
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37
38
39 enum ReferendumVote
40 {
41   /**
42    * Vote that nothing should change.
43    * This option is never voted explicitly.
44    */
45   VOTE_STAY = 0,
46   /**
47    * Vote that an element should be added.
48    */
49   VOTE_ADD = 1,
50   /**
51    * Vote that an element should be removed.
52    */
53   VOTE_REMOVE = 2,
54 };
55
56
57 enum EarlyStoppingPhase
58 {
59   EARLY_STOPPING_NONE = 0,
60   EARLY_STOPPING_ONE_MORE = 1,
61   EARLY_STOPPING_DONE = 2,
62 };
63
64
65 GNUNET_NETWORK_STRUCT_BEGIN
66
67 /**
68  * Tuple of integers that together
69  * identify a task uniquely.
70  */
71 struct TaskKey {
72   /**
73    * A value from 'enum PhaseKind'.
74    */
75   uint16_t kind GNUNET_PACKED;
76
77   /**
78    * Number of the first peer
79    * in canonical order.
80    */
81   int16_t peer1 GNUNET_PACKED;
82
83   /**
84    * Number of the second peer in canonical order.
85    */
86   int16_t peer2 GNUNET_PACKED;
87
88   /**
89    * Repetition of the gradecast phase.
90    */
91   int16_t repetition GNUNET_PACKED;
92
93   /**
94    * Leader in the gradecast phase.
95    *
96    * Can be different from both peer1 and peer2.
97    */
98   int16_t leader GNUNET_PACKED;
99 };
100
101
102
103 struct SetKey
104 {
105   int set_kind GNUNET_PACKED;
106   int k1 GNUNET_PACKED;
107   int k2 GNUNET_PACKED;
108 };
109
110
111 struct SetEntry
112 {
113   struct SetKey key;
114   struct GNUNET_SET_Handle *h;
115   /**
116    * GNUNET_YES if the set resulted
117    * from applying a referendum with contested
118    * elements.
119    */
120   int is_contested;
121 };
122
123
124 struct DiffKey
125 {
126   int diff_kind GNUNET_PACKED;
127   int k1 GNUNET_PACKED;
128   int k2 GNUNET_PACKED;
129 };
130
131 struct RfnKey
132 {
133   int rfn_kind GNUNET_PACKED;
134   int k1 GNUNET_PACKED;
135   int k2 GNUNET_PACKED;
136 };
137
138
139 GNUNET_NETWORK_STRUCT_END
140
141 enum PhaseKind
142 {
143   PHASE_KIND_ALL_TO_ALL,
144   PHASE_KIND_ALL_TO_ALL_2,
145   PHASE_KIND_GRADECAST_LEADER,
146   PHASE_KIND_GRADECAST_ECHO,
147   PHASE_KIND_GRADECAST_ECHO_GRADE,
148   PHASE_KIND_GRADECAST_CONFIRM,
149   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
150   /**
151    * Apply a repetition of the all-to-all
152    * gradecast to the current set.
153    */
154   PHASE_KIND_APPLY_REP,
155   PHASE_KIND_FINISH,
156 };
157
158
159 enum SetKind
160 {
161   SET_KIND_NONE = 0,
162   SET_KIND_CURRENT,
163   /**
164    * Last result set from a gradecast
165    */
166   SET_KIND_LAST_GRADECAST,
167   SET_KIND_LEADER_PROPOSAL,
168   SET_KIND_ECHO_RESULT,
169 };
170
171 enum DiffKind
172 {
173   DIFF_KIND_NONE = 0,
174   DIFF_KIND_LEADER_PROPOSAL,
175   DIFF_KIND_LEADER_CONSENSUS,
176   DIFF_KIND_GRADECAST_RESULT,
177 };
178
179 enum RfnKind
180 {
181   RFN_KIND_NONE = 0,
182   RFN_KIND_ECHO,
183   RFN_KIND_CONFIRM,
184   RFN_KIND_GRADECAST_RESULT
185 };
186
187
188 struct SetOpCls
189 {
190   struct SetKey input_set;
191
192   struct SetKey output_set;
193   struct RfnKey output_rfn;
194   struct DiffKey output_diff;
195
196   int do_not_remove;
197
198   int transceive_contested;
199
200   struct GNUNET_SET_OperationHandle *op;
201 };
202
203
204 struct FinishCls
205 {
206   struct SetKey input_set;
207 };
208
209 /**
210  * Closure for both @a start_task
211  * and @a cancel_task.
212  */
213 union TaskFuncCls
214 {
215   struct SetOpCls setop;
216   struct FinishCls finish;
217 };
218
219 struct TaskEntry;
220
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228   struct TaskKey key;
229
230   struct Step *step;
231
232   int is_started;
233
234   int is_finished;
235
236   TaskFunc start;
237   TaskFunc cancel;
238
239   union TaskFuncCls cls;
240 };
241
242
243 struct Step
244 {
245   /**
246    * All steps of one session are in a
247    * linked list for easier deallocation.
248    */
249   struct Step *prev;
250
251   /**
252    * All steps of one session are in a
253    * linked list for easier deallocation.
254    */
255   struct Step *next;
256
257   struct ConsensusSession *session;
258
259   /**
260    * Tasks that this step is composed of.
261    */
262   struct TaskEntry **tasks;
263   unsigned int tasks_len;
264   unsigned int tasks_cap;
265
266   unsigned int finished_tasks;
267
268   /*
269    * Tasks that have this task as dependency.
270    *
271    * We store pointers to subordinates rather
272    * than to prerequisites since it makes
273    * tracking the readiness of a task easier.
274    */
275   struct Step **subordinates;
276   unsigned int subordinates_len;
277   unsigned int subordinates_cap;
278
279   /**
280    * Counter for the prerequisites of
281    * this step.
282    */
283   size_t pending_prereq;
284
285   /*
286    * Task that will run this step despite
287    * any pending prerequisites.
288    */
289   struct GNUNET_SCHEDULER_Task *timeout_task;
290
291   unsigned int is_running;
292
293   unsigned int is_finished;
294
295   /*
296    * Synchrony round of the task.
297    * Determines the deadline for the task.
298    */
299   unsigned int round;
300
301   /**
302    * Human-readable name for
303    * the task, used for debugging.
304    */
305   char *debug_name;
306
307   /**
308    * When we're doing an early finish, how should this step be
309    * treated?
310    * If GNUNET_YES, the step will be marked as finished
311    * without actually running its tasks.
312    * Otherwise, the step will still be run even after
313    * an early finish.
314    *
315    * Note that a task may never be finished early if
316    * it is already running.
317    */
318   int early_finishable;
319 };
320
321
322 struct RfnElementInfo
323 {
324   const struct GNUNET_SET_Element *element;
325
326   /*
327    * GNUNET_YES if the peer votes for the proposal.
328    */
329   int *votes;
330
331   /**
332    * Proposal for this element,
333    * can only be VOTE_ADD or VOTE_REMOVE.
334    */
335   enum ReferendumVote proposal;
336 };
337
338
339 struct ReferendumEntry
340 {
341   struct RfnKey key;
342
343   /*
344    * Elements where there is at least one proposed change.
345    *
346    * Maps the hash of the GNUNET_SET_Element
347    * to 'struct RfnElementInfo'.
348    */
349   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
350
351   unsigned int num_peers;
352
353   /**
354    * Stores, for every peer in the session,
355    * whether the peer finished the whole referendum.
356    *
357    * Votes from peers are only counted if they're
358    * marked as commited (#GNUNET_YES) in the referendum.
359    *
360    * Otherwise (#GNUNET_NO), the requested changes are
361    * not counted for majority votes or thresholds.
362    */
363   int *peer_commited;
364
365
366   /**
367    * Contestation state of the peer.  If a peer is contested, the values it
368    * contributed are still counted for applying changes, but the grading is
369    * affected.
370    */
371   int *peer_contested;
372 };
373
374
375 struct DiffElementInfo
376 {
377   const struct GNUNET_SET_Element *element;
378
379   /**
380    * Positive weight for 'add', negative
381    * weights for 'remove'.
382    */
383   int weight;
384 };
385
386
387 /**
388  * Weighted diff.
389  */
390 struct DiffEntry
391 {
392   struct DiffKey key;
393   struct GNUNET_CONTAINER_MultiHashMap *changes;
394 };
395
396
397
398 /**
399  * A consensus session consists of one local client and the remote authorities.
400  */
401 struct ConsensusSession
402 {
403   /**
404    * Consensus sessions are kept in a DLL.
405    */
406   struct ConsensusSession *next;
407
408   /**
409    * Consensus sessions are kept in a DLL.
410    */
411   struct ConsensusSession *prev;
412
413   unsigned int num_client_insert_pending;
414
415   struct GNUNET_CONTAINER_MultiHashMap *setmap;
416   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
417   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
418
419   /**
420    * Array of peers with length 'num_peers'.
421    */
422   int *peers_blacklisted;
423
424   /*
425    * Mapping from (hashed) TaskKey to TaskEntry.
426    *
427    * We map the application_id for a round to the task that should be
428    * executed, so we don't have to go through all task whenever we get
429    * an incoming set op request.
430    */
431   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
432
433   struct Step *steps_head;
434   struct Step *steps_tail;
435
436   int conclude_started;
437
438   int conclude_done;
439
440   /**
441   * Global consensus identification, computed
442   * from the session id and participating authorities.
443   */
444   struct GNUNET_HashCode global_id;
445
446   /**
447    * Client that inhabits the session
448    */
449   struct GNUNET_SERVICE_Client *client;
450
451   /**
452    * Queued messages to the client.
453    */
454   struct GNUNET_MQ_Handle *client_mq;
455
456   /**
457    * Time when the conclusion of the consensus should begin.
458    */
459   struct GNUNET_TIME_Absolute conclude_start;
460
461   /**
462    * Timeout for all rounds together, single rounds will schedule a timeout task
463    * with a fraction of the conclude timeout.
464    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
465    */
466   struct GNUNET_TIME_Absolute conclude_deadline;
467
468   struct GNUNET_PeerIdentity *peers;
469
470   /**
471    * Number of other peers in the consensus.
472    */
473   unsigned int num_peers;
474
475   /**
476    * Index of the local peer in the peers array
477    */
478   unsigned int local_peer_idx;
479
480   /**
481    * Listener for requests from other peers.
482    * Uses the session's global id as app id.
483    */
484   struct GNUNET_SET_ListenHandle *set_listener;
485
486   /**
487    * State of our early stopping scheme.
488    */
489   int early_stopping;
490
491   /**
492    * Our set size from the first round.
493    */
494   uint64_t first_size;
495
496   uint64_t *first_sizes_received;
497
498   /**
499    * Bounded Eppstein lower bound.
500    */
501   uint64_t lower_bound;
502 };
503
504 /**
505  * Linked list of sessions this peer participates in.
506  */
507 static struct ConsensusSession *sessions_head;
508
509 /**
510  * Linked list of sessions this peer participates in.
511  */
512 static struct ConsensusSession *sessions_tail;
513
514 /**
515  * Configuration of the consensus service.
516  */
517 static const struct GNUNET_CONFIGURATION_Handle *cfg;
518
519 /**
520  * Peer that runs this service.
521  */
522 static struct GNUNET_PeerIdentity my_peer;
523
524 /**
525  * Statistics handle.
526  */
527 struct GNUNET_STATISTICS_Handle *statistics;
528
529
530 static void
531 finish_task (struct TaskEntry *task);
532
533
534 static void
535 run_ready_steps (struct ConsensusSession *session);
536
537
538 static const char *
539 phasename (uint16_t phase)
540 {
541   switch (phase)
542   {
543     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
544     case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
545     case PHASE_KIND_FINISH: return "FINISH";
546     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
547     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
548     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
549     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
550     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
551     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
552     default: return "(unknown)";
553   }
554 }
555
556
557 static const char *
558 setname (uint16_t kind)
559 {
560   switch (kind)
561   {
562     case SET_KIND_CURRENT: return "CURRENT";
563     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
564     case SET_KIND_NONE: return "NONE";
565     default: return "(unknown)";
566   }
567 }
568
569 static const char *
570 rfnname (uint16_t kind)
571 {
572   switch (kind)
573   {
574     case RFN_KIND_NONE: return "NONE";
575     case RFN_KIND_ECHO: return "ECHO";
576     case RFN_KIND_CONFIRM: return "CONFIRM";
577     default: return "(unknown)";
578   }
579 }
580
581 static const char *
582 diffname (uint16_t kind)
583 {
584   switch (kind)
585   {
586     case DIFF_KIND_NONE: return "NONE";
587     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
588     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
589     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
590     default: return "(unknown)";
591   }
592 }
593
594 #ifdef GNUNET_EXTRA_LOGGING
595
596
597 static const char *
598 debug_str_element (const struct GNUNET_SET_Element *el)
599 {
600   struct GNUNET_HashCode hash;
601
602   GNUNET_SET_element_hash (el, &hash);
603
604   return GNUNET_h2s (&hash);
605 }
606
607 static const char *
608 debug_str_task_key (struct TaskKey *tk)
609 {
610   static char buf[256];
611
612   snprintf (buf, sizeof (buf),
613             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
614             phasename (tk->kind), tk->peer1, tk->peer2,
615             tk->leader, tk->repetition);
616
617   return buf;
618 }
619
620 static const char *
621 debug_str_diff_key (struct DiffKey *dk)
622 {
623   static char buf[256];
624
625   snprintf (buf, sizeof (buf),
626             "DiffKey kind=%s, k1=%d, k2=%d",
627             diffname (dk->diff_kind), dk->k1, dk->k2);
628
629   return buf;
630 }
631
632 static const char *
633 debug_str_set_key (const struct SetKey *sk)
634 {
635   static char buf[256];
636
637   snprintf (buf, sizeof (buf),
638             "SetKey kind=%s, k1=%d, k2=%d",
639             setname (sk->set_kind), sk->k1, sk->k2);
640
641   return buf;
642 }
643
644
645 static const char *
646 debug_str_rfn_key (const struct RfnKey *rk)
647 {
648   static char buf[256];
649
650   snprintf (buf, sizeof (buf),
651             "RfnKey kind=%s, k1=%d, k2=%d",
652             rfnname (rk->rfn_kind), rk->k1, rk->k2);
653
654   return buf;
655 }
656
657 #endif /* GNUNET_EXTRA_LOGGING */
658
659
660 /**
661  * Send the final result set of the consensus to the client, element by
662  * element.
663  *
664  * @param cls closure
665  * @param element the current element, NULL if all elements have been
666  *        iterated over
667  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
668  */
669 static int
670 send_to_client_iter (void *cls,
671                      const struct GNUNET_SET_Element *element)
672 {
673   struct TaskEntry *task = (struct TaskEntry *) cls;
674   struct ConsensusSession *session = task->step->session;
675   struct GNUNET_MQ_Envelope *ev;
676
677   if (NULL != element)
678   {
679     struct GNUNET_CONSENSUS_ElementMessage *m;
680     const struct ConsensusElement *ce;
681
682     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
683     ce = element->data;
684
685     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
686
687     if (0 != ce->marker)
688       return GNUNET_YES;
689
690     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691                 "P%d: sending element %s to client\n",
692                 session->local_peer_idx,
693                 debug_str_element (element));
694
695     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
696                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
697     m->element_type = ce->payload_type;
698     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
699     GNUNET_MQ_send (session->client_mq, ev);
700   }
701   else
702   {
703     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704                 "P%d: finished iterating elements for client\n",
705                 session->local_peer_idx);
706     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
707     GNUNET_MQ_send (session->client_mq, ev);
708   }
709   return GNUNET_YES;
710 }
711
712
713 static struct SetEntry *
714 lookup_set (struct ConsensusSession *session, struct SetKey *key)
715 {
716   struct GNUNET_HashCode hash;
717
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719               "P%u: looking up set {%s}\n",
720               session->local_peer_idx,
721               debug_str_set_key (key));
722
723   GNUNET_assert (SET_KIND_NONE != key->set_kind);
724   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
725   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
726 }
727
728
729 static struct DiffEntry *
730 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
731 {
732   struct GNUNET_HashCode hash;
733
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "P%u: looking up diff {%s}\n",
736               session->local_peer_idx,
737               debug_str_diff_key (key));
738
739   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
740   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
741   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
742 }
743
744
745 static struct ReferendumEntry *
746 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
747 {
748   struct GNUNET_HashCode hash;
749
750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751               "P%u: looking up rfn {%s}\n",
752               session->local_peer_idx,
753               debug_str_rfn_key (key));
754
755   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
756   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
757   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
758 }
759
760
761 static void
762 diff_insert (struct DiffEntry *diff,
763              int weight,
764              const struct GNUNET_SET_Element *element)
765 {
766   struct DiffElementInfo *di;
767   struct GNUNET_HashCode hash;
768
769   GNUNET_assert ( (1 == weight) || (-1 == weight));
770
771   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
772               "diff_insert with element size %u\n",
773               element->size);
774
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "hashing element\n");
777
778   GNUNET_SET_element_hash (element, &hash);
779
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781               "hashed element\n");
782
783   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
784
785   if (NULL == di)
786   {
787     di = GNUNET_new (struct DiffElementInfo);
788     di->element = GNUNET_SET_element_dup (element);
789     GNUNET_assert (GNUNET_OK ==
790                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
791                                                       &hash, di,
792                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
793   }
794
795   di->weight = weight;
796 }
797
798
799 static void
800 rfn_commit (struct ReferendumEntry *rfn,
801             uint16_t commit_peer)
802 {
803   GNUNET_assert (commit_peer < rfn->num_peers);
804
805   rfn->peer_commited[commit_peer] = GNUNET_YES;
806 }
807
808
809 static void
810 rfn_contest (struct ReferendumEntry *rfn,
811              uint16_t contested_peer)
812 {
813   GNUNET_assert (contested_peer < rfn->num_peers);
814
815   rfn->peer_contested[contested_peer] = GNUNET_YES;
816 }
817
818
819 static uint16_t
820 rfn_noncontested (struct ReferendumEntry *rfn)
821 {
822   uint16_t i;
823   uint16_t ret;
824
825   ret = 0;
826   for (i = 0; i < rfn->num_peers; i++)
827     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
828       ret++;
829
830   return ret;
831 }
832
833
834 static void
835 rfn_vote (struct ReferendumEntry *rfn,
836           uint16_t voting_peer,
837           enum ReferendumVote vote,
838           const struct GNUNET_SET_Element *element)
839 {
840   struct RfnElementInfo *ri;
841   struct GNUNET_HashCode hash;
842
843   GNUNET_assert (voting_peer < rfn->num_peers);
844
845   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
846      since VOTE_KEEP is implicit in not voting. */
847   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
848
849   GNUNET_SET_element_hash (element, &hash);
850   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
851
852   if (NULL == ri)
853   {
854     ri = GNUNET_new (struct RfnElementInfo);
855     ri->element = GNUNET_SET_element_dup (element);
856     ri->votes = GNUNET_new_array (rfn->num_peers, int);
857     GNUNET_assert (GNUNET_OK ==
858                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
859                                                       &hash, ri,
860                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
861   }
862
863   ri->votes[voting_peer] = GNUNET_YES;
864   ri->proposal = vote;
865 }
866
867
868 static uint16_t
869 task_other_peer (struct TaskEntry *task)
870 {
871   uint16_t me = task->step->session->local_peer_idx;
872   if (task->key.peer1 == me)
873     return task->key.peer2;
874   return task->key.peer1;
875 }
876
877
878 static int
879 cmp_uint64_t (const void *pa, const void *pb)
880 {
881   uint64_t a = *(uint64_t *) pa;
882   uint64_t b = *(uint64_t *) pb;
883
884   if (a == b)
885     return 0;
886   if (a < b)
887     return -1;
888   return 1;
889 }
890
891
892 /**
893  * Callback for set operation results. Called for each element
894  * in the result set.
895  *
896  * @param cls closure
897  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
898  * @param current_size current set size
899  * @param status see enum GNUNET_SET_Status
900  */
901 static void
902 set_result_cb (void *cls,
903                const struct GNUNET_SET_Element *element,
904                uint64_t current_size,
905                enum GNUNET_SET_Status status)
906 {
907   struct TaskEntry *task = cls;
908   struct ConsensusSession *session = task->step->session;
909   struct SetEntry *output_set = NULL;
910   struct DiffEntry *output_diff = NULL;
911   struct ReferendumEntry *output_rfn = NULL;
912   unsigned int other_idx;
913   struct SetOpCls *setop;
914   const struct ConsensusElement *consensus_element = NULL;
915
916   if (NULL != element)
917   {
918     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                 "P%u: got element of type %u, status %u\n",
920                 session->local_peer_idx,
921                 (unsigned) element->element_type,
922                 (unsigned) status);
923     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
924     consensus_element = element->data;
925   }
926
927   setop = &task->cls.setop;
928
929
930   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
931               "P%u: got set result for {%s}, status %u\n",
932               session->local_peer_idx,
933               debug_str_task_key (&task->key),
934               status);
935
936   if (GNUNET_NO == task->is_started)
937   {
938     GNUNET_break_op (0);
939     return;
940   }
941
942   if (GNUNET_YES == task->is_finished)
943   {
944     GNUNET_break_op (0);
945     return;
946   }
947
948   other_idx = task_other_peer (task);
949
950   if (SET_KIND_NONE != setop->output_set.set_kind)
951   {
952     output_set = lookup_set (session, &setop->output_set);
953     GNUNET_assert (NULL != output_set);
954   }
955
956   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
957   {
958     output_diff = lookup_diff (session, &setop->output_diff);
959     GNUNET_assert (NULL != output_diff);
960   }
961
962   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
963   {
964     output_rfn = lookup_rfn (session, &setop->output_rfn);
965     GNUNET_assert (NULL != output_rfn);
966   }
967
968   if (GNUNET_YES == session->peers_blacklisted[other_idx])
969   {
970     /* Peer might have been blacklisted
971        by a gradecast running in parallel, ignore elements from now */
972     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
973       return;
974     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
975       return;
976   }
977
978   if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
979   {
980     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
981                 "P%u: got some marker\n",
982                   session->local_peer_idx);
983     if ( (GNUNET_YES == setop->transceive_contested) &&
984          (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
985     {
986       GNUNET_assert (NULL != output_rfn);
987       rfn_contest (output_rfn, task_other_peer (task));
988       return;
989     }
990
991     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
992     {
993
994       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
995                   "P%u: got size marker\n",
996                   session->local_peer_idx);
997
998
999       struct ConsensusSizeElement *cse = (void *) consensus_element;
1000
1001       if (cse->sender_index == other_idx)
1002       {
1003         if (NULL == session->first_sizes_received)
1004           session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1005         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1006
1007         uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1008         qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1009         session->lower_bound = copy[session->num_peers / 3 + 1];
1010         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1011                     "P%u: lower bound %llu\n",
1012                     session->local_peer_idx,
1013                     (long long) session->lower_bound);
1014       }
1015       return;
1016     }
1017
1018     return;
1019   }
1020
1021   switch (status)
1022   {
1023     case GNUNET_SET_STATUS_ADD_LOCAL:
1024       GNUNET_assert (NULL != consensus_element);
1025       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026                   "Adding element in Task {%s}\n",
1027                   debug_str_task_key (&task->key));
1028       if (NULL != output_set)
1029       {
1030         // FIXME: record pending adds, use callback
1031         GNUNET_SET_add_element (output_set->h,
1032                                 element,
1033                                 NULL,
1034                                 NULL);
1035 #ifdef GNUNET_EXTRA_LOGGING
1036         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1037                     "P%u: adding element %s into set {%s} of task {%s}\n",
1038                     session->local_peer_idx,
1039                     debug_str_element (element),
1040                     debug_str_set_key (&setop->output_set),
1041                     debug_str_task_key (&task->key));
1042 #endif
1043       }
1044       if (NULL != output_diff)
1045       {
1046         diff_insert (output_diff, 1, element);
1047 #ifdef GNUNET_EXTRA_LOGGING
1048         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1049                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1050                     session->local_peer_idx,
1051                     debug_str_element (element),
1052                     debug_str_diff_key (&setop->output_diff),
1053                     debug_str_task_key (&task->key));
1054 #endif
1055       }
1056       if (NULL != output_rfn)
1057       {
1058         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1060         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1061                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1062                     session->local_peer_idx,
1063                     debug_str_element (element),
1064                     debug_str_rfn_key (&setop->output_rfn),
1065                     debug_str_task_key (&task->key));
1066 #endif
1067       }
1068       // XXX: add result to structures in task
1069       break;
1070     case GNUNET_SET_STATUS_ADD_REMOTE:
1071       GNUNET_assert (NULL != consensus_element);
1072       if (GNUNET_YES == setop->do_not_remove)
1073         break;
1074       if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1075         break;
1076       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077                   "Removing element in Task {%s}\n",
1078                   debug_str_task_key (&task->key));
1079       if (NULL != output_set)
1080       {
1081         // FIXME: record pending adds, use callback
1082         GNUNET_SET_remove_element (output_set->h,
1083                                    element,
1084                                    NULL,
1085                                    NULL);
1086 #ifdef GNUNET_EXTRA_LOGGING
1087         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1088                     "P%u: removing element %s from set {%s} of task {%s}\n",
1089                     session->local_peer_idx,
1090                     debug_str_element (element),
1091                     debug_str_set_key (&setop->output_set),
1092                     debug_str_task_key (&task->key));
1093 #endif
1094       }
1095       if (NULL != output_diff)
1096       {
1097         diff_insert (output_diff, -1, element);
1098 #ifdef GNUNET_EXTRA_LOGGING
1099         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1100                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1101                     session->local_peer_idx,
1102                     debug_str_element (element),
1103                     debug_str_diff_key (&setop->output_diff),
1104                     debug_str_task_key (&task->key));
1105 #endif
1106       }
1107       if (NULL != output_rfn)
1108       {
1109         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1111         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1112                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1113                     session->local_peer_idx,
1114                     debug_str_element (element),
1115                     debug_str_rfn_key (&setop->output_rfn),
1116                     debug_str_task_key (&task->key));
1117 #endif
1118       }
1119       break;
1120     case GNUNET_SET_STATUS_DONE:
1121       // XXX: check first if any changes to the underlying
1122       // set are still pending
1123       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                   "Finishing setop in Task {%s}\n",
1125                   debug_str_task_key (&task->key));
1126       if (NULL != output_rfn)
1127       {
1128         rfn_commit (output_rfn, task_other_peer (task));
1129       }
1130       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1131       {
1132         session->first_size = current_size;
1133       }
1134       finish_task (task);
1135       break;
1136     case GNUNET_SET_STATUS_FAILURE:
1137       // XXX: cleanup
1138       GNUNET_break_op (0);
1139       finish_task (task);
1140       return;
1141     default:
1142       /* not reached */
1143       GNUNET_assert (0);
1144   }
1145 }
1146
1147 #ifdef EVIL
1148
1149 enum EvilnessType
1150 {
1151   EVILNESS_NONE,
1152   EVILNESS_CRAM_ALL,
1153   EVILNESS_CRAM_LEAD,
1154   EVILNESS_CRAM_ECHO,
1155   EVILNESS_SLACK,
1156   EVILNESS_SLACK_A2A,
1157 };
1158
1159 enum EvilnessSubType
1160 {
1161   EVILNESS_SUB_NONE,
1162   EVILNESS_SUB_REPLACEMENT,
1163   EVILNESS_SUB_NO_REPLACEMENT,
1164 };
1165
1166 struct Evilness
1167 {
1168   enum EvilnessType type;
1169   enum EvilnessSubType subtype;
1170   unsigned int num;
1171 };
1172
1173
1174 static int
1175 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1176 {
1177   if (0 == strcmp ("replace", evil_subtype_str))
1178   {
1179     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1180   }
1181   else if (0 == strcmp ("noreplace", evil_subtype_str))
1182   {
1183     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1184   }
1185   else
1186   {
1187     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1188                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1189                 evil_subtype_str);
1190     return GNUNET_SYSERR;
1191   }
1192   return GNUNET_OK;
1193 }
1194
1195
1196 static void
1197 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1198 {
1199   char *evil_spec;
1200   char *field;
1201   char *evil_type_str = NULL;
1202   char *evil_subtype_str = NULL;
1203
1204   GNUNET_assert (NULL != evil);
1205
1206   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1207   {
1208     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209                 "P%u: no evilness\n",
1210                 session->local_peer_idx);
1211     evil->type = EVILNESS_NONE;
1212     return;
1213   }
1214   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215               "P%u: got evilness spec\n",
1216               session->local_peer_idx);
1217
1218   for (field = strtok (evil_spec, "/");
1219        NULL != field;
1220        field = strtok (NULL, "/"))
1221   {
1222     unsigned int peer_num;
1223     unsigned int evil_num;
1224     int ret;
1225
1226     evil_type_str = NULL;
1227     evil_subtype_str = NULL;
1228
1229     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1230
1231     if (ret != 4)
1232     {
1233       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1234                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1235                   field,
1236                   ret);
1237       goto not_evil;
1238     }
1239
1240     GNUNET_assert (NULL != evil_type_str);
1241     GNUNET_assert (NULL != evil_subtype_str);
1242
1243     if (peer_num == session->local_peer_idx)
1244     {
1245       if (0 == strcmp ("slack", evil_type_str))
1246       {
1247         evil->type = EVILNESS_SLACK;
1248       }
1249       if (0 == strcmp ("slack-a2a", evil_type_str))
1250       {
1251         evil->type = EVILNESS_SLACK_A2A;
1252       }
1253       else if (0 == strcmp ("cram-all", evil_type_str))
1254       {
1255         evil->type = EVILNESS_CRAM_ALL;
1256         evil->num = evil_num;
1257         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1258           goto not_evil;
1259       }
1260       else if (0 == strcmp ("cram-lead", evil_type_str))
1261       {
1262         evil->type = EVILNESS_CRAM_LEAD;
1263         evil->num = evil_num;
1264         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1265           goto not_evil;
1266       }
1267       else if (0 == strcmp ("cram-echo", evil_type_str))
1268       {
1269         evil->type = EVILNESS_CRAM_ECHO;
1270         evil->num = evil_num;
1271         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1272           goto not_evil;
1273       }
1274       else
1275       {
1276         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1278                     evil_type_str);
1279         goto not_evil;
1280       }
1281       goto cleanup;
1282     }
1283     /* No GNUNET_free since memory was allocated by libc */
1284     free (evil_type_str);
1285     evil_type_str = NULL;
1286     evil_subtype_str = NULL;
1287   }
1288 not_evil:
1289   evil->type = EVILNESS_NONE;
1290 cleanup:
1291   GNUNET_free (evil_spec);
1292   /* no GNUNET_free_non_null since it wasn't
1293    * allocated with GNUNET_malloc */
1294   if (NULL != evil_type_str)
1295     free (evil_type_str);
1296   if (NULL != evil_subtype_str)
1297     free (evil_subtype_str);
1298 }
1299
1300 #endif
1301
1302
1303 /**
1304  * Commit the appropriate set for a
1305  * task.
1306  */
1307 static void
1308 commit_set (struct ConsensusSession *session,
1309             struct TaskEntry *task)
1310 {
1311   struct SetEntry *set;
1312   struct SetOpCls *setop = &task->cls.setop;
1313
1314   GNUNET_assert (NULL != setop->op);
1315   set = lookup_set (session, &setop->input_set);
1316   GNUNET_assert (NULL != set);
1317
1318   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1319   {
1320     struct GNUNET_SET_Element element;
1321     struct ConsensusElement ce = { 0 };
1322     ce.marker = CONSENSUS_MARKER_CONTESTED;
1323     element.data = &ce;
1324     element.size = sizeof (struct ConsensusElement);
1325     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1326     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1327   }
1328
1329   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1330   {
1331     struct GNUNET_SET_Element element;
1332     struct ConsensusSizeElement cse = { 0 };
1333     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
1334     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1335     cse.size = GNUNET_htonll (session->first_size);
1336     cse.sender_index = session->local_peer_idx;
1337     element.data = &cse;
1338     element.size = sizeof (struct ConsensusSizeElement);
1339     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1340     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1341   }
1342
1343 #ifdef EVIL
1344   {
1345     unsigned int i;
1346     struct Evilness evil;
1347
1348     get_evilness (session, &evil);
1349     if (EVILNESS_NONE != evil.type)
1350     {
1351       /* Useful for evaluation */
1352       GNUNET_STATISTICS_set (statistics,
1353                              "is evil",
1354                              1,
1355                              GNUNET_NO);
1356     }
1357     switch (evil.type)
1358     {
1359       case EVILNESS_CRAM_ALL:
1360       case EVILNESS_CRAM_LEAD:
1361       case EVILNESS_CRAM_ECHO:
1362         /* We're not cramming elements in the
1363            all-to-all round, since that would just
1364            add more elements to the result set, but
1365            wouldn't test robustness. */
1366         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1367         {
1368           GNUNET_SET_commit (setop->op, set->h);
1369           break;
1370         }
1371         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1372             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1373         {
1374           GNUNET_SET_commit (setop->op, set->h);
1375           break;
1376         }
1377         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1378         {
1379           GNUNET_SET_commit (setop->op, set->h);
1380           break;
1381         }
1382         for (i = 0; i < evil.num; i++)
1383         {
1384           struct GNUNET_SET_Element element;
1385           struct ConsensusStuffedElement se = { 0 };
1386           element.data = &se;
1387           element.size = sizeof (struct ConsensusStuffedElement);
1388           element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1389
1390           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1391           {
1392             /* Always generate a new element. */
1393             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1394           }
1395           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1396           {
1397             /* Always cram the same elements, derived from counter. */
1398             GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1399           }
1400           else
1401           {
1402             GNUNET_assert (0);
1403           }
1404           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1405 #ifdef GNUNET_EXTRA_LOGGING
1406           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1407                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1408                       session->local_peer_idx,
1409                       debug_str_element (&element),
1410                       debug_str_set_key (&setop->input_set),
1411                       debug_str_task_key (&task->key));
1412 #endif
1413         }
1414         GNUNET_STATISTICS_update (statistics,
1415                                   "# stuffed elements",
1416                                   evil.num,
1417                                   GNUNET_NO);
1418         GNUNET_SET_commit (setop->op, set->h);
1419         break;
1420       case EVILNESS_SLACK:
1421         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1422                     "P%u: evil peer: slacking\n",
1423                     (unsigned int) session->local_peer_idx);
1424         /* Do nothing. */
1425       case EVILNESS_SLACK_A2A:
1426         if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1427              (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1428         {
1429           struct GNUNET_SET_Handle *empty_set;
1430           empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1431           GNUNET_SET_commit (setop->op, empty_set);
1432           GNUNET_SET_destroy (empty_set);
1433         }
1434         else
1435         {
1436           GNUNET_SET_commit (setop->op, set->h);
1437         }
1438         break;
1439       case EVILNESS_NONE:
1440         GNUNET_SET_commit (setop->op, set->h);
1441         break;
1442     }
1443   }
1444 #else
1445   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1446   {
1447     GNUNET_SET_commit (setop->op, set->h);
1448   }
1449   else
1450   {
1451     /* For our testcases, we don't want the blacklisted
1452        peers to wait. */
1453     GNUNET_SET_operation_cancel (setop->op);
1454     setop->op = NULL;
1455   }
1456 #endif
1457 }
1458
1459
1460 static void
1461 put_diff (struct ConsensusSession *session,
1462          struct DiffEntry *diff)
1463 {
1464   struct GNUNET_HashCode hash;
1465
1466   GNUNET_assert (NULL != diff);
1467
1468   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1469   GNUNET_assert (GNUNET_OK ==
1470                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1471                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1472 }
1473
1474 static void
1475 put_set (struct ConsensusSession *session,
1476          struct SetEntry *set)
1477 {
1478   struct GNUNET_HashCode hash;
1479
1480   GNUNET_assert (NULL != set->h);
1481
1482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1483               "Putting set %s\n",
1484               debug_str_set_key (&set->key));
1485
1486   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1487   GNUNET_assert (GNUNET_SYSERR !=
1488                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1489                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1490 }
1491
1492
1493 static void
1494 put_rfn (struct ConsensusSession *session,
1495          struct ReferendumEntry *rfn)
1496 {
1497   struct GNUNET_HashCode hash;
1498
1499   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1500   GNUNET_assert (GNUNET_OK ==
1501                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1502                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1503 }
1504
1505
1506
1507 static void
1508 task_cancel_reconcile (struct TaskEntry *task)
1509 {
1510   /* not implemented yet */
1511   GNUNET_assert (0);
1512 }
1513
1514
1515 static void
1516 apply_diff_to_rfn (struct DiffEntry *diff,
1517                    struct ReferendumEntry *rfn,
1518                    uint16_t voting_peer,
1519                    uint16_t num_peers)
1520 {
1521   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1522   struct DiffElementInfo *di;
1523
1524   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1525
1526   while (GNUNET_YES ==
1527          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1528                                                       NULL,
1529                                                       (const void **) &di))
1530   {
1531     if (di->weight > 0)
1532     {
1533       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1534     }
1535     if (di->weight < 0)
1536     {
1537       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1538     }
1539   }
1540
1541   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1542 }
1543
1544
1545 struct DiffEntry *
1546 diff_create ()
1547 {
1548   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1549
1550   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1551
1552   return d;
1553 }
1554
1555
1556 struct DiffEntry *
1557 diff_compose (struct DiffEntry *diff_1,
1558               struct DiffEntry *diff_2)
1559 {
1560   struct DiffEntry *diff_new;
1561   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1562   struct DiffElementInfo *di;
1563
1564   diff_new = diff_create ();
1565
1566   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1567   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1568   {
1569     diff_insert (diff_new, di->weight, di->element);
1570   }
1571   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1572
1573   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1574   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1575   {
1576     diff_insert (diff_new, di->weight, di->element);
1577   }
1578   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1579
1580   return diff_new;
1581 }
1582
1583
1584 struct ReferendumEntry *
1585 rfn_create (uint16_t size)
1586 {
1587   struct ReferendumEntry *rfn;
1588
1589   rfn = GNUNET_new (struct ReferendumEntry);
1590   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1591   rfn->peer_commited = GNUNET_new_array (size, int);
1592   rfn->peer_contested = GNUNET_new_array (size, int);
1593   rfn->num_peers = size;
1594
1595   return rfn;
1596 }
1597
1598
1599 #if UNUSED
1600 static void
1601 diff_destroy (struct DiffEntry *diff)
1602 {
1603   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1604   GNUNET_free (diff);
1605 }
1606 #endif
1607
1608
1609 /**
1610  * For a given majority, count what the outcome
1611  * is (add/remove/keep), and give the number
1612  * of peers that voted for this outcome.
1613  */
1614 static void
1615 rfn_majority (const struct ReferendumEntry *rfn,
1616               const struct RfnElementInfo *ri,
1617               uint16_t *ret_majority,
1618               enum ReferendumVote *ret_vote)
1619 {
1620   uint16_t votes_yes = 0;
1621   uint16_t num_commited = 0;
1622   uint16_t i;
1623
1624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625               "Computing rfn majority for element %s of rfn {%s}\n",
1626               debug_str_element (ri->element),
1627               debug_str_rfn_key (&rfn->key));
1628
1629   for (i = 0; i < rfn->num_peers; i++)
1630   {
1631     if (GNUNET_NO == rfn->peer_commited[i])
1632       continue;
1633     num_commited++;
1634
1635     if (GNUNET_YES == ri->votes[i])
1636       votes_yes++;
1637   }
1638
1639   if (votes_yes > (num_commited) / 2)
1640   {
1641     *ret_vote = ri->proposal;
1642     *ret_majority = votes_yes;
1643   }
1644   else
1645   {
1646     *ret_vote = VOTE_STAY;
1647     *ret_majority = num_commited - votes_yes;
1648   }
1649 }
1650
1651
1652 struct SetCopyCls
1653 {
1654   struct TaskEntry *task;
1655   struct SetKey dst_set_key;
1656 };
1657
1658
1659 static void
1660 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1661 {
1662   struct SetCopyCls *scc = cls;
1663   struct TaskEntry *task = scc->task;
1664   struct SetKey dst_set_key = scc->dst_set_key;
1665   struct SetEntry *set;
1666
1667   GNUNET_free (scc);
1668   set = GNUNET_new (struct SetEntry);
1669   set->h = copy;
1670   set->key = dst_set_key;
1671   put_set (task->step->session, set);
1672
1673   task->start (task);
1674 }
1675
1676
1677 /**
1678  * Call the start function of the given
1679  * task again after we created a copy of the given set.
1680  */
1681 static void
1682 create_set_copy_for_task (struct TaskEntry *task,
1683                           struct SetKey *src_set_key,
1684                           struct SetKey *dst_set_key)
1685 {
1686   struct SetEntry *src_set;
1687   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1688
1689   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690               "Copying set {%s} to {%s} for task {%s}\n",
1691               debug_str_set_key (src_set_key),
1692               debug_str_set_key (dst_set_key),
1693               debug_str_task_key (&task->key));
1694
1695   scc->task = task;
1696   scc->dst_set_key = *dst_set_key;
1697   src_set = lookup_set (task->step->session, src_set_key);
1698   GNUNET_assert (NULL != src_set);
1699   GNUNET_SET_copy_lazy (src_set->h,
1700                         set_copy_cb,
1701                         scc);
1702 }
1703
1704
1705 struct SetMutationProgressCls
1706 {
1707   int num_pending;
1708   /**
1709    * Task to finish once all changes are through.
1710    */
1711   struct TaskEntry *task;
1712 };
1713
1714
1715 static void
1716 set_mutation_done (void *cls)
1717 {
1718   struct SetMutationProgressCls *pc = cls;
1719
1720   GNUNET_assert (pc->num_pending > 0);
1721
1722   pc->num_pending--;
1723
1724   if (0 == pc->num_pending)
1725   {
1726     struct TaskEntry *task = pc->task;
1727     GNUNET_free (pc);
1728     finish_task (task);
1729   }
1730 }
1731
1732
1733 static void
1734 try_finish_step_early (struct Step *step)
1735 {
1736   unsigned int i;
1737
1738   if (GNUNET_YES == step->is_running)
1739     return;
1740   if (GNUNET_YES == step->is_finished)
1741     return;
1742   if (GNUNET_NO == step->early_finishable)
1743     return;
1744
1745   step->is_finished = GNUNET_YES;
1746
1747 #ifdef GNUNET_EXTRA_LOGGING
1748   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1749               "Finishing step `%s' early.\n",
1750               step->debug_name);
1751 #endif
1752
1753   for (i = 0; i < step->subordinates_len; i++)
1754   {
1755     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1756     step->subordinates[i]->pending_prereq--;
1757 #ifdef GNUNET_EXTRA_LOGGING
1758     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1759                 "Decreased pending_prereq to %u for step `%s'.\n",
1760                 (unsigned int) step->subordinates[i]->pending_prereq,
1761                 step->subordinates[i]->debug_name);
1762
1763 #endif
1764     try_finish_step_early (step->subordinates[i]);
1765   }
1766
1767   // XXX: maybe schedule as task to avoid recursion?
1768   run_ready_steps (step->session);
1769 }
1770
1771
1772 static void
1773 finish_step (struct Step *step)
1774 {
1775   unsigned int i;
1776
1777   GNUNET_assert (step->finished_tasks == step->tasks_len);
1778   GNUNET_assert (GNUNET_YES == step->is_running);
1779   GNUNET_assert (GNUNET_NO == step->is_finished);
1780
1781 #ifdef GNUNET_EXTRA_LOGGING
1782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783               "All tasks of step `%s' with %u subordinates finished.\n",
1784               step->debug_name,
1785               step->subordinates_len);
1786 #endif
1787
1788   for (i = 0; i < step->subordinates_len; i++)
1789   {
1790     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1791     step->subordinates[i]->pending_prereq--;
1792 #ifdef GNUNET_EXTRA_LOGGING
1793     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794                 "Decreased pending_prereq to %u for step `%s'.\n",
1795                 (unsigned int) step->subordinates[i]->pending_prereq,
1796                 step->subordinates[i]->debug_name);
1797
1798 #endif
1799   }
1800
1801   step->is_finished = GNUNET_YES;
1802
1803   // XXX: maybe schedule as task to avoid recursion?
1804   run_ready_steps (step->session);
1805 }
1806
1807
1808
1809 /**
1810  * Apply the result from one round of gradecasts (i.e. every peer
1811  * should have gradecasted) to the peer's current set.
1812  *
1813  * @param task the task with context information
1814  */
1815 static void
1816 task_start_apply_round (struct TaskEntry *task)
1817 {
1818   struct ConsensusSession *session = task->step->session;
1819   struct SetKey sk_in;
1820   struct SetKey sk_out;
1821   struct RfnKey rk_in;
1822   struct SetEntry *set_out;
1823   struct ReferendumEntry *rfn_in;
1824   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1825   struct RfnElementInfo *ri;
1826   struct SetMutationProgressCls *progress_cls;
1827   uint16_t worst_majority = UINT16_MAX;
1828
1829   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1830   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1831   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1832
1833   set_out = lookup_set (session, &sk_out);
1834   if (NULL == set_out)
1835   {
1836     create_set_copy_for_task (task, &sk_in, &sk_out);
1837     return;
1838   }
1839
1840   rfn_in = lookup_rfn (session, &rk_in);
1841   GNUNET_assert (NULL != rfn_in);
1842
1843   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1844   progress_cls->task = task;
1845
1846   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1847
1848   while (GNUNET_YES ==
1849          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1850                                                       NULL,
1851                                                       (const void **) &ri))
1852   {
1853     uint16_t majority_num;
1854     enum ReferendumVote majority_vote;
1855
1856     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1857
1858     if (worst_majority > majority_num)
1859       worst_majority = majority_num;
1860
1861     switch (majority_vote)
1862     {
1863       case VOTE_ADD:
1864         progress_cls->num_pending++;
1865         GNUNET_assert (GNUNET_OK ==
1866                        GNUNET_SET_add_element (set_out->h,
1867                                                ri->element,
1868                                                &set_mutation_done,
1869                                                progress_cls));
1870         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1871                     "P%u: apply round: adding element %s with %u-majority.\n",
1872                     session->local_peer_idx,
1873                     debug_str_element (ri->element), majority_num);
1874         break;
1875       case VOTE_REMOVE:
1876         progress_cls->num_pending++;
1877         GNUNET_assert (GNUNET_OK ==
1878                        GNUNET_SET_remove_element (set_out->h,
1879                                                   ri->element,
1880                                                   &set_mutation_done,
1881                                                   progress_cls));
1882         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1883                     "P%u: apply round: deleting element %s with %u-majority.\n",
1884                     session->local_peer_idx,
1885                     debug_str_element (ri->element), majority_num);
1886         break;
1887       case VOTE_STAY:
1888         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1889                     "P%u: apply round: keeping element %s with %u-majority.\n",
1890                     session->local_peer_idx,
1891                     debug_str_element (ri->element), majority_num);
1892         // do nothing
1893         break;
1894       default:
1895         GNUNET_assert (0);
1896         break;
1897     }
1898   }
1899
1900   if (0 == progress_cls->num_pending)
1901   {
1902     // call closure right now, no pending ops
1903     GNUNET_free (progress_cls);
1904     finish_task (task);
1905   }
1906
1907   {
1908     uint16_t thresh = (session->num_peers / 3) * 2;
1909
1910     if (worst_majority >= thresh)
1911     {
1912       switch (session->early_stopping)
1913       {
1914         case EARLY_STOPPING_NONE:
1915           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1916           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1917                       "P%u: Stopping early (after one more superround)\n",
1918                       session->local_peer_idx);
1919           break;
1920         case EARLY_STOPPING_ONE_MORE:
1921           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1922                       session->local_peer_idx);
1923           session->early_stopping = EARLY_STOPPING_DONE;
1924           {
1925             struct Step *step;
1926             for (step = session->steps_head; NULL != step; step = step->next)
1927               try_finish_step_early (step);
1928           }
1929           break;
1930         case EARLY_STOPPING_DONE:
1931           /* We shouldn't be here anymore after early stopping */
1932           GNUNET_break (0);
1933           break;
1934         default:
1935           GNUNET_assert (0);
1936           break;
1937       }
1938     }
1939     else if (EARLY_STOPPING_NONE != session->early_stopping)
1940     {
1941       // Our assumption about the number of bad peers
1942       // has been broken.
1943       GNUNET_break_op (0);
1944     }
1945     else
1946     {
1947       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1948                   session->local_peer_idx);
1949     }
1950   }
1951   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1952 }
1953
1954
1955 static void
1956 task_start_grade (struct TaskEntry *task)
1957 {
1958   struct ConsensusSession *session = task->step->session;
1959   struct ReferendumEntry *output_rfn;
1960   struct ReferendumEntry *input_rfn;
1961   struct DiffEntry *input_diff;
1962   struct RfnKey rfn_key;
1963   struct DiffKey diff_key;
1964   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1965   struct RfnElementInfo *ri;
1966   unsigned int gradecast_confidence = 2;
1967
1968   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1969   output_rfn = lookup_rfn (session, &rfn_key);
1970   if (NULL == output_rfn)
1971   {
1972     output_rfn = rfn_create (session->num_peers);
1973     output_rfn->key = rfn_key;
1974     put_rfn (session, output_rfn);
1975   }
1976
1977   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1978   input_diff = lookup_diff (session, &diff_key);
1979   GNUNET_assert (NULL != input_diff);
1980
1981   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1982   input_rfn = lookup_rfn (session, &rfn_key);
1983   GNUNET_assert (NULL != input_rfn);
1984
1985   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1986
1987   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1988
1989   while (GNUNET_YES ==
1990          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1991                                                       NULL,
1992                                                       (const void **) &ri))
1993   {
1994     uint16_t majority_num;
1995     enum ReferendumVote majority_vote;
1996
1997     // XXX: we need contested votes and non-contested votes here
1998     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1999
2000     if (majority_num <= session->num_peers / 3)
2001       majority_vote = VOTE_REMOVE;
2002
2003     switch (majority_vote)
2004     {
2005       case VOTE_STAY:
2006         break;
2007       case VOTE_ADD:
2008         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2009         break;
2010       case VOTE_REMOVE:
2011         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2012         break;
2013       default:
2014         GNUNET_assert (0);
2015         break;
2016     }
2017   }
2018   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2019
2020   {
2021     uint16_t noncontested;
2022     noncontested = rfn_noncontested (input_rfn);
2023     if (noncontested < (session->num_peers / 3) * 2)
2024     {
2025       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2026     }
2027     if (noncontested < (session->num_peers / 3) + 1)
2028     {
2029       gradecast_confidence = 0;
2030     }
2031   }
2032
2033   if (gradecast_confidence >= 1)
2034     rfn_commit (output_rfn, task->key.leader);
2035
2036   if (gradecast_confidence <= 1)
2037     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2038
2039   finish_task (task);
2040 }
2041
2042
2043 static void
2044 task_start_reconcile (struct TaskEntry *task)
2045 {
2046   struct SetEntry *input;
2047   struct SetOpCls *setop = &task->cls.setop;
2048   struct ConsensusSession *session = task->step->session;
2049
2050   input = lookup_set (session, &setop->input_set);
2051   GNUNET_assert (NULL != input);
2052   GNUNET_assert (NULL != input->h);
2053
2054   /* We create the outputs for the operation here
2055      (rather than in the set operation callback)
2056      because we want something valid in there, even
2057      if the other peer doesn't talk to us */
2058
2059   if (SET_KIND_NONE != setop->output_set.set_kind)
2060   {
2061     /* If we don't have an existing output set,
2062        we clone the input set. */
2063     if (NULL == lookup_set (session, &setop->output_set))
2064     {
2065       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2066       return;
2067     }
2068   }
2069
2070   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2071   {
2072     if (NULL == lookup_rfn (session, &setop->output_rfn))
2073     {
2074       struct ReferendumEntry *rfn;
2075
2076       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077                   "P%u: output rfn <%s> missing, creating.\n",
2078                   session->local_peer_idx,
2079                   debug_str_rfn_key (&setop->output_rfn));
2080
2081       rfn = rfn_create (session->num_peers);
2082       rfn->key = setop->output_rfn;
2083       put_rfn (session, rfn);
2084     }
2085   }
2086
2087   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2088   {
2089     if (NULL == lookup_diff (session, &setop->output_diff))
2090     {
2091       struct DiffEntry *diff;
2092
2093       diff = diff_create ();
2094       diff->key = setop->output_diff;
2095       put_diff (session, diff);
2096     }
2097   }
2098
2099   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2100   {
2101     /* XXX: mark the corresponding rfn as commited if necessary */
2102     finish_task (task);
2103     return;
2104   }
2105
2106   if (task->key.peer1 == session->local_peer_idx)
2107   {
2108     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2109
2110     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2111                 "P%u: Looking up set {%s} to run remote union\n",
2112                 session->local_peer_idx,
2113                 debug_str_set_key (&setop->input_set));
2114
2115     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2116     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2117
2118     rcm.kind = htons (task->key.kind);
2119     rcm.peer1 = htons (task->key.peer1);
2120     rcm.peer2 = htons (task->key.peer2);
2121     rcm.leader = htons (task->key.leader);
2122     rcm.repetition = htons (task->key.repetition);
2123
2124     GNUNET_assert (NULL == setop->op);
2125     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2126                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2127
2128     struct GNUNET_SET_Option opts[] = {
2129       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2130       { GNUNET_SET_OPTION_END },
2131     };
2132
2133     // XXX: maybe this should be done while
2134     // setting up tasks alreays?
2135     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2136                                     &session->global_id,
2137                                     &rcm.header,
2138                                     GNUNET_SET_RESULT_SYMMETRIC,
2139                                     opts,
2140                                     set_result_cb,
2141                                     task);
2142
2143     commit_set (session, task);
2144   }
2145   else if (task->key.peer2 == session->local_peer_idx)
2146   {
2147     /* Wait for the other peer to contact us */
2148     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2149                 session->local_peer_idx, task->key.peer1);
2150
2151     if (NULL != setop->op)
2152     {
2153       commit_set (session, task);
2154     }
2155   }
2156   else
2157   {
2158     /* We made an error while constructing the task graph. */
2159     GNUNET_assert (0);
2160   }
2161 }
2162
2163
2164 static void
2165 task_start_eval_echo (struct TaskEntry *task)
2166 {
2167   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2168   struct ReferendumEntry *input_rfn;
2169   struct RfnElementInfo *ri;
2170   struct SetEntry *output_set;
2171   struct SetMutationProgressCls *progress_cls;
2172   struct ConsensusSession *session = task->step->session;
2173   struct SetKey sk_in;
2174   struct SetKey sk_out;
2175   struct RfnKey rk_in;
2176
2177   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2178   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2179   output_set = lookup_set (session, &sk_out);
2180   if (NULL == output_set)
2181   {
2182     create_set_copy_for_task (task, &sk_in, &sk_out);
2183     return;
2184   }
2185
2186
2187   {
2188     // FIXME: should be marked as a shallow copy, so
2189     // we can destroy everything correctly
2190     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2191     last_set->h = output_set->h;
2192     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2193     put_set (session, last_set);
2194   }
2195
2196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2197               "Evaluating referendum in Task {%s}\n",
2198               debug_str_task_key (&task->key));
2199
2200   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2201   progress_cls->task = task;
2202
2203   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2204   input_rfn = lookup_rfn (session, &rk_in);
2205
2206   GNUNET_assert (NULL != input_rfn);
2207
2208   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2209   GNUNET_assert (NULL != iter);
2210
2211   while (GNUNET_YES ==
2212          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2213                                                       NULL,
2214                                                       (const void **) &ri))
2215   {
2216     enum ReferendumVote majority_vote;
2217     uint16_t majority_num;
2218
2219     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2220
2221     if (majority_num < session->num_peers / 3)
2222     {
2223       /* It is not the case that all nonfaulty peers
2224          echoed the same value.  Since we're doing a set reconciliation, we
2225          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2226          reconciliation as contested.  Other peers might not know that the
2227          leader is faulty, thus we still re-distribute in the confirmation
2228          round. */
2229       output_set->is_contested = GNUNET_YES;
2230     }
2231
2232     switch (majority_vote)
2233     {
2234       case VOTE_ADD:
2235         progress_cls->num_pending++;
2236         GNUNET_assert (GNUNET_OK ==
2237                        GNUNET_SET_add_element (output_set->h,
2238                                                ri->element,
2239                                                set_mutation_done,
2240                                                progress_cls));
2241         break;
2242       case VOTE_REMOVE:
2243         progress_cls->num_pending++;
2244         GNUNET_assert (GNUNET_OK ==
2245                        GNUNET_SET_remove_element (output_set->h,
2246                                                   ri->element,
2247                                                   set_mutation_done,
2248                                                   progress_cls));
2249         break;
2250       case VOTE_STAY:
2251         /* Nothing to do. */
2252         break;
2253       default:
2254         /* not reached */
2255         GNUNET_assert (0);
2256     }
2257   }
2258
2259   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2260
2261   if (0 == progress_cls->num_pending)
2262   {
2263     // call closure right now, no pending ops
2264     GNUNET_free (progress_cls);
2265     finish_task (task);
2266   }
2267 }
2268
2269
2270 static void
2271 task_start_finish (struct TaskEntry *task)
2272 {
2273   struct SetEntry *final_set;
2274   struct ConsensusSession *session = task->step->session;
2275
2276   final_set = lookup_set (session, &task->cls.finish.input_set);
2277
2278   GNUNET_assert (NULL != final_set);
2279
2280
2281   GNUNET_SET_iterate (final_set->h,
2282                       send_to_client_iter,
2283                       task);
2284 }
2285
2286 static void
2287 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2288 {
2289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2290
2291   GNUNET_assert (GNUNET_NO == task->is_started);
2292   GNUNET_assert (GNUNET_NO == task->is_finished);
2293   GNUNET_assert (NULL != task->start);
2294
2295   task->start (task);
2296
2297   task->is_started = GNUNET_YES;
2298 }
2299
2300
2301
2302
2303 /*
2304  * Run all steps of the session that don't any
2305  * more dependencies.
2306  */
2307 static void
2308 run_ready_steps (struct ConsensusSession *session)
2309 {
2310   struct Step *step;
2311
2312   step = session->steps_head;
2313
2314   while (NULL != step)
2315   {
2316     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2317     {
2318       size_t i;
2319
2320       GNUNET_assert (0 == step->finished_tasks);
2321
2322 #ifdef GNUNET_EXTRA_LOGGING
2323       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2324                   session->local_peer_idx,
2325                   step->debug_name,
2326                   step->round, step->tasks_len, step->subordinates_len);
2327 #endif
2328
2329       step->is_running = GNUNET_YES;
2330       for (i = 0; i < step->tasks_len; i++)
2331         start_task (session, step->tasks[i]);
2332
2333       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2334       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2335         finish_step (step);
2336
2337       /* Running the next ready steps will be triggered by task completion */
2338       return;
2339     }
2340     step = step->next;
2341   }
2342
2343   return;
2344 }
2345
2346
2347
2348 static void
2349 finish_task (struct TaskEntry *task)
2350 {
2351   GNUNET_assert (GNUNET_NO == task->is_finished);
2352   task->is_finished = GNUNET_YES;
2353
2354   task->step->finished_tasks++;
2355
2356   if (task->step->finished_tasks == task->step->tasks_len)
2357     finish_step (task->step);
2358 }
2359
2360
2361 /**
2362  * Search peer in the list of peers in session.
2363  *
2364  * @param peer peer to find
2365  * @param session session with peer
2366  * @return index of peer, -1 if peer is not in session
2367  */
2368 static int
2369 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2370 {
2371   int i;
2372   for (i = 0; i < session->num_peers; i++)
2373     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2374       return i;
2375   return -1;
2376 }
2377
2378
2379 /**
2380  * Compute a global, (hopefully) unique consensus session id,
2381  * from the local id of the consensus session, and the identities of all participants.
2382  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2383  * exactly the same peers, the global id will be different.
2384  *
2385  * @param session session to generate the global id for
2386  * @param local_session_id local id of the consensus session
2387  */
2388 static void
2389 compute_global_id (struct ConsensusSession *session,
2390                    const struct GNUNET_HashCode *local_session_id)
2391 {
2392   const char *salt = "gnunet-service-consensus/session_id";
2393
2394   GNUNET_assert (GNUNET_YES ==
2395                  GNUNET_CRYPTO_kdf (&session->global_id,
2396                                     sizeof (struct GNUNET_HashCode),
2397                                     salt,
2398                                     strlen (salt),
2399                                     session->peers,
2400                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2401                                     local_session_id,
2402                                     sizeof (struct GNUNET_HashCode),
2403                                     NULL));
2404 }
2405
2406
2407 /**
2408  * Compare two peer identities.
2409  *
2410  * @param h1 some peer identity
2411  * @param h2 some peer identity
2412  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2413  */
2414 static int
2415 peer_id_cmp (const void *h1, const void *h2)
2416 {
2417   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2418 }
2419
2420
2421 /**
2422  * Create the sorted list of peers for the session,
2423  * add the local peer if not in the join message.
2424  *
2425  * @param session session to initialize
2426  * @param join_msg join message with the list of peers participating at the end
2427  */
2428 static void
2429 initialize_session_peer_list (struct ConsensusSession *session,
2430                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2431 {
2432   const struct GNUNET_PeerIdentity *msg_peers
2433     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2434   int local_peer_in_list;
2435
2436   session->num_peers = ntohl (join_msg->num_peers);
2437
2438   /* Peers in the join message, may or may not include the local peer,
2439      Add it if it is missing. */
2440   local_peer_in_list = GNUNET_NO;
2441   for (unsigned int i = 0; i < session->num_peers; i++)
2442   {
2443     if (0 == memcmp (&msg_peers[i],
2444                      &my_peer,
2445                      sizeof (struct GNUNET_PeerIdentity)))
2446     {
2447       local_peer_in_list = GNUNET_YES;
2448       break;
2449     }
2450   }
2451   if (GNUNET_NO == local_peer_in_list)
2452     session->num_peers++;
2453
2454   session->peers = GNUNET_new_array (session->num_peers,
2455                                      struct GNUNET_PeerIdentity);
2456   if (GNUNET_NO == local_peer_in_list)
2457     session->peers[session->num_peers - 1] = my_peer;
2458
2459   GNUNET_memcpy (session->peers,
2460                  msg_peers,
2461                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2462   qsort (session->peers,
2463          session->num_peers,
2464          sizeof (struct GNUNET_PeerIdentity),
2465          &peer_id_cmp);
2466 }
2467
2468
2469 static struct TaskEntry *
2470 lookup_task (struct ConsensusSession *session,
2471              struct TaskKey *key)
2472 {
2473   struct GNUNET_HashCode hash;
2474
2475
2476   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2478               GNUNET_h2s (&hash));
2479   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2480 }
2481
2482
2483 /**
2484  * Called when another peer wants to do a set operation with the
2485  * local peer.
2486  *
2487  * @param cls closure
2488  * @param other_peer the other peer
2489  * @param context_msg message with application specific information from
2490  *        the other peer
2491  * @param request request from the other peer, use GNUNET_SET_accept
2492  *        to accept it, otherwise the request will be refused
2493  *        Note that we don't use a return value here, as it is also
2494  *        necessary to specify the set we want to do the operation with,
2495  *        whith sometimes can be derived from the context message.
2496  *        Also necessary to specify the timeout.
2497  */
2498 static void
2499 set_listen_cb (void *cls,
2500                const struct GNUNET_PeerIdentity *other_peer,
2501                const struct GNUNET_MessageHeader *context_msg,
2502                struct GNUNET_SET_Request *request)
2503 {
2504   struct ConsensusSession *session = cls;
2505   struct TaskKey tk;
2506   struct TaskEntry *task;
2507   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2508
2509   if (NULL == context_msg)
2510   {
2511     GNUNET_break_op (0);
2512     return;
2513   }
2514
2515   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2516   {
2517     GNUNET_break_op (0);
2518     return;
2519   }
2520
2521   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2522   {
2523     GNUNET_break_op (0);
2524     return;
2525   }
2526
2527   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2528
2529   tk = ((struct TaskKey) {
2530       .kind = ntohs (cm->kind),
2531       .peer1 = ntohs (cm->peer1),
2532       .peer2 = ntohs (cm->peer2),
2533       .repetition = ntohs (cm->repetition),
2534       .leader = ntohs (cm->leader),
2535   });
2536
2537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2538               session->local_peer_idx, debug_str_task_key (&tk));
2539
2540   task = lookup_task (session, &tk);
2541
2542   if (NULL == task)
2543   {
2544     GNUNET_break_op (0);
2545     return;
2546   }
2547
2548   if (GNUNET_YES == task->is_finished)
2549   {
2550     GNUNET_break_op (0);
2551     return;
2552   }
2553
2554   if (task->key.peer2 != session->local_peer_idx)
2555   {
2556     /* We're being asked, so we must be thne 2nd peer. */
2557     GNUNET_break_op (0);
2558     return;
2559   }
2560
2561   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2562                     (task->key.peer2 == session->local_peer_idx)));
2563
2564   struct GNUNET_SET_Option opts[] = {
2565     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2566     { GNUNET_SET_OPTION_END },
2567   };
2568
2569   task->cls.setop.op = GNUNET_SET_accept (request,
2570                                           GNUNET_SET_RESULT_SYMMETRIC,
2571                                           opts,
2572                                           set_result_cb,
2573                                           task);
2574
2575   /* If the task hasn't been started yet,
2576      we wait for that until we commit. */
2577
2578   if (GNUNET_YES == task->is_started)
2579   {
2580     commit_set (session, task);
2581   }
2582 }
2583
2584
2585
2586 static void
2587 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2588           struct TaskEntry *t)
2589 {
2590   struct GNUNET_HashCode round_hash;
2591   struct Step *s;
2592
2593   GNUNET_assert (NULL != t->step);
2594
2595   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2596
2597   s = t->step;
2598
2599   if (s->tasks_len == s->tasks_cap)
2600   {
2601     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2602     GNUNET_array_grow (s->tasks,
2603                        s->tasks_cap,
2604                        target_size);
2605   }
2606
2607 #ifdef GNUNET_EXTRA_LOGGING
2608   GNUNET_assert (NULL != s->debug_name);
2609   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2610               debug_str_task_key (&t->key),
2611               s->debug_name);
2612 #endif
2613
2614   s->tasks[s->tasks_len] = t;
2615   s->tasks_len++;
2616
2617   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2618   GNUNET_assert (GNUNET_OK ==
2619       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2620                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2621 }
2622
2623
2624 static void
2625 install_step_timeouts (struct ConsensusSession *session)
2626 {
2627   /* Given the fully constructed task graph
2628      with rounds for tasks, we can give the tasks timeouts. */
2629
2630   // unsigned int max_round;
2631
2632   /* XXX: implement! */
2633 }
2634
2635
2636
2637 /*
2638  * Arrange two peers in some canonical order.
2639  */
2640 static void
2641 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2642 {
2643   uint16_t a;
2644   uint16_t b;
2645
2646   GNUNET_assert (*p1 < n);
2647   GNUNET_assert (*p2 < n);
2648
2649   if (*p1 < *p2)
2650   {
2651     a = *p1;
2652     b = *p2;
2653   }
2654   else
2655   {
2656     a = *p2;
2657     b = *p1;
2658   }
2659
2660   /* For uniformly random *p1, *p2,
2661      this condition is true with 50% chance */
2662   if (((b - a) + n) % n <= n / 2)
2663   {
2664     *p1 = a;
2665     *p2 = b;
2666   }
2667   else
2668   {
2669     *p1 = b;
2670     *p2 = a;
2671   }
2672 }
2673
2674
2675 /**
2676  * Record @a dep as a dependency of @a step.
2677  */
2678 static void
2679 step_depend_on (struct Step *step, struct Step *dep)
2680 {
2681   /* We're not checking for cyclic dependencies,
2682      but this is a cheap sanity check. */
2683   GNUNET_assert (step != dep);
2684   GNUNET_assert (NULL != step);
2685   GNUNET_assert (NULL != dep);
2686   GNUNET_assert (dep->round <= step->round);
2687
2688 #ifdef GNUNET_EXTRA_LOGGING
2689   /* Make sure we have complete debugging information.
2690      Also checks that we don't screw up too badly
2691      constructing the task graph. */
2692   GNUNET_assert (NULL != step->debug_name);
2693   GNUNET_assert (NULL != dep->debug_name);
2694   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2695               "Making step `%s' depend on `%s'\n",
2696               step->debug_name,
2697               dep->debug_name);
2698 #endif
2699
2700   if (dep->subordinates_cap == dep->subordinates_len)
2701   {
2702     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2703     GNUNET_array_grow (dep->subordinates,
2704                        dep->subordinates_cap,
2705                        target_size);
2706   }
2707
2708   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2709
2710   dep->subordinates[dep->subordinates_len] = step;
2711   dep->subordinates_len++;
2712
2713   step->pending_prereq++;
2714 }
2715
2716
2717 static struct Step *
2718 create_step (struct ConsensusSession *session, int round, int early_finishable)
2719 {
2720   struct Step *step;
2721   step = GNUNET_new (struct Step);
2722   step->session = session;
2723   step->round = round;
2724   step->early_finishable = early_finishable;
2725   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2726                                     session->steps_tail,
2727                                     step);
2728   return step;
2729 }
2730
2731
2732 /**
2733  * Construct the task graph for a single
2734  * gradecast.
2735  */
2736 static void
2737 construct_task_graph_gradecast (struct ConsensusSession *session,
2738                                 uint16_t rep,
2739                                 uint16_t lead,
2740                                 struct Step *step_before,
2741                                 struct Step *step_after)
2742 {
2743   uint16_t n = session->num_peers;
2744   uint16_t me = session->local_peer_idx;
2745
2746   uint16_t p1;
2747   uint16_t p2;
2748
2749   /* The task we're currently setting up. */
2750   struct TaskEntry task;
2751
2752   struct Step *step;
2753   struct Step *prev_step;
2754
2755   uint16_t round;
2756
2757   unsigned int k;
2758
2759   round = step_before->round + 1;
2760
2761   /* gcast step 1: leader disseminates */
2762
2763   step = create_step (session, round, GNUNET_YES);
2764
2765 #ifdef GNUNET_EXTRA_LOGGING
2766   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2767 #endif
2768   step_depend_on (step, step_before);
2769
2770   if (lead == me)
2771   {
2772     for (k = 0; k < n; k++)
2773     {
2774       if (k == me)
2775         continue;
2776       p1 = me;
2777       p2 = k;
2778       arrange_peers (&p1, &p2, n);
2779       task = ((struct TaskEntry) {
2780         .step = step,
2781         .start = task_start_reconcile,
2782         .cancel = task_cancel_reconcile,
2783         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2784       });
2785       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2786       put_task (session->taskmap, &task);
2787     }
2788     /* We run this task to make sure that the leader
2789        has the stored the SET_KIND_LEADER set of himself,
2790        so he can participate in the rest of the gradecast
2791        without the code having to handle any special cases. */
2792     task = ((struct TaskEntry) {
2793       .step = step,
2794       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2795       .start = task_start_reconcile,
2796       .cancel = task_cancel_reconcile,
2797     });
2798     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2799     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2800     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2801     put_task (session->taskmap, &task);
2802   }
2803   else
2804   {
2805     p1 = me;
2806     p2 = lead;
2807     arrange_peers (&p1, &p2, n);
2808     task = ((struct TaskEntry) {
2809       .step = step,
2810       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2811       .start = task_start_reconcile,
2812       .cancel = task_cancel_reconcile,
2813     });
2814     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2815     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2816     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2817     put_task (session->taskmap, &task);
2818   }
2819
2820   /* gcast phase 2: echo */
2821   prev_step = step;
2822   round += 1;
2823   step = create_step (session, round, GNUNET_YES);
2824 #ifdef GNUNET_EXTRA_LOGGING
2825   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2826 #endif
2827   step_depend_on (step, prev_step);
2828
2829   for (k = 0; k < n; k++)
2830   {
2831     p1 = k;
2832     p2 = me;
2833     arrange_peers (&p1, &p2, n);
2834     task = ((struct TaskEntry) {
2835       .step = step,
2836       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2837       .start = task_start_reconcile,
2838       .cancel = task_cancel_reconcile,
2839     });
2840     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2841     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2842     put_task (session->taskmap, &task);
2843   }
2844
2845   prev_step = step;
2846   /* Same round, since step only has local tasks */
2847   step = create_step (session, round, GNUNET_YES);
2848 #ifdef GNUNET_EXTRA_LOGGING
2849   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2850 #endif
2851   step_depend_on (step, prev_step);
2852
2853   arrange_peers (&p1, &p2, n);
2854   task = ((struct TaskEntry) {
2855     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2856     .step = step,
2857     .start = task_start_eval_echo
2858   });
2859   put_task (session->taskmap, &task);
2860
2861   prev_step = step;
2862   round += 1;
2863   step = create_step (session, round, GNUNET_YES);
2864 #ifdef GNUNET_EXTRA_LOGGING
2865   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2866 #endif
2867   step_depend_on (step, prev_step);
2868
2869   /* gcast phase 3: confirmation and grading */
2870   for (k = 0; k < n; k++)
2871   {
2872     p1 = k;
2873     p2 = me;
2874     arrange_peers (&p1, &p2, n);
2875     task = ((struct TaskEntry) {
2876       .step = step,
2877       .start = task_start_reconcile,
2878       .cancel = task_cancel_reconcile,
2879       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2880     });
2881     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2882     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2883     /* If there was at least one element in the echo round that was
2884        contested (i.e. it had no n-t majority), then we let the other peers
2885        know, and other peers let us know.  The contested flag for each peer is
2886        stored in the rfn. */
2887     task.cls.setop.transceive_contested = GNUNET_YES;
2888     put_task (session->taskmap, &task);
2889   }
2890
2891   prev_step = step;
2892   /* Same round, since step only has local tasks */
2893   step = create_step (session, round, GNUNET_YES);
2894 #ifdef GNUNET_EXTRA_LOGGING
2895   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2896 #endif
2897   step_depend_on (step, prev_step);
2898
2899   task = ((struct TaskEntry) {
2900     .step = step,
2901     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2902     .start = task_start_grade,
2903   });
2904   put_task (session->taskmap, &task);
2905
2906   step_depend_on (step_after, step);
2907 }
2908
2909
2910 static void
2911 construct_task_graph (struct ConsensusSession *session)
2912 {
2913   uint16_t n = session->num_peers;
2914   uint16_t t = n / 3;
2915
2916   uint16_t me = session->local_peer_idx;
2917
2918   /* The task we're currently setting up. */
2919   struct TaskEntry task;
2920
2921   /* Current leader */
2922   unsigned int lead;
2923
2924   struct Step *step;
2925   struct Step *prev_step;
2926
2927   unsigned int round = 0;
2928
2929   unsigned int i;
2930
2931   // XXX: introduce first step,
2932   // where we wait for all insert acks
2933   // from the set service
2934
2935   /* faster but brittle all-to-all */
2936
2937   // XXX: Not implemented yet
2938
2939   /* all-to-all step */
2940
2941   step = create_step (session, round, GNUNET_NO);
2942
2943 #ifdef GNUNET_EXTRA_LOGGING
2944   step->debug_name = GNUNET_strdup ("all to all");
2945 #endif
2946
2947   for (i = 0; i < n; i++)
2948   {
2949     uint16_t p1;
2950     uint16_t p2;
2951
2952     p1 = me;
2953     p2 = i;
2954     arrange_peers (&p1, &p2, n);
2955     task = ((struct TaskEntry) {
2956       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2957       .step = step,
2958       .start = task_start_reconcile,
2959       .cancel = task_cancel_reconcile,
2960     });
2961     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2962     task.cls.setop.output_set = task.cls.setop.input_set;
2963     task.cls.setop.do_not_remove = GNUNET_YES;
2964     put_task (session->taskmap, &task);
2965   }
2966
2967   round += 1;
2968   prev_step = step;
2969   step = create_step (session, round, GNUNET_NO);;
2970 #ifdef GNUNET_EXTRA_LOGGING
2971   step->debug_name = GNUNET_strdup ("all to all 2");
2972 #endif
2973   step_depend_on (step, prev_step);
2974
2975
2976   for (i = 0; i < n; i++)
2977   {
2978     uint16_t p1;
2979     uint16_t p2;
2980
2981     p1 = me;
2982     p2 = i;
2983     arrange_peers (&p1, &p2, n);
2984     task = ((struct TaskEntry) {
2985       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
2986       .step = step,
2987       .start = task_start_reconcile,
2988       .cancel = task_cancel_reconcile,
2989     });
2990     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2991     task.cls.setop.output_set = task.cls.setop.input_set;
2992     task.cls.setop.do_not_remove = GNUNET_YES;
2993     put_task (session->taskmap, &task);
2994   }
2995
2996   round += 1;
2997
2998   prev_step = step;
2999   step = NULL;
3000
3001
3002
3003   /* Byzantine union */
3004
3005   /* sequential repetitions of the gradecasts */
3006   for (i = 0; i < t + 1; i++)
3007   {
3008     struct Step *step_rep_start;
3009     struct Step *step_rep_end;
3010
3011     /* Every repetition is in a separate round. */
3012     step_rep_start = create_step (session, round, GNUNET_YES);
3013 #ifdef GNUNET_EXTRA_LOGGING
3014     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3015 #endif
3016
3017     step_depend_on (step_rep_start, prev_step);
3018
3019     /* gradecast has three rounds */
3020     round += 3;
3021     step_rep_end = create_step (session, round, GNUNET_YES);
3022 #ifdef GNUNET_EXTRA_LOGGING
3023     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3024 #endif
3025
3026     /* parallel gradecasts */
3027     for (lead = 0; lead < n; lead++)
3028       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3029
3030     task = ((struct TaskEntry) {
3031       .step = step_rep_end,
3032       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3033       .start = task_start_apply_round,
3034     });
3035     put_task (session->taskmap, &task);
3036
3037     prev_step = step_rep_end;
3038   }
3039
3040  /* There is no next gradecast round, thus the final
3041     start step is the overall end step of the gradecasts */
3042   round += 1;
3043   step = create_step (session, round, GNUNET_NO);
3044 #ifdef GNUNET_EXTRA_LOGGING
3045   GNUNET_asprintf (&step->debug_name, "finish");
3046 #endif
3047   step_depend_on (step, prev_step);
3048
3049   task = ((struct TaskEntry) {
3050     .step = step,
3051     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3052     .start = task_start_finish,
3053   });
3054   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3055
3056   put_task (session->taskmap, &task);
3057 }
3058
3059
3060
3061 /**
3062  * Check join message.
3063  *
3064  * @param cls session of client that sent the message
3065  * @param m message sent by the client
3066  * @return #GNUNET_OK if @a m is well-formed
3067  */
3068 static int
3069 check_client_join (void *cls,
3070                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3071 {
3072   uint32_t listed_peers = ntohl (m->num_peers);
3073
3074   if ( (ntohs (m->header.size) - sizeof (*m)) !=
3075        listed_peers * sizeof (struct GNUNET_PeerIdentity))
3076   {
3077     GNUNET_break (0);
3078     return GNUNET_SYSERR;
3079   }
3080   return GNUNET_OK;
3081 }
3082
3083
3084 /**
3085  * Called when a client wants to join a consensus session.
3086  *
3087  * @param cls session of client that sent the message
3088  * @param m message sent by the client
3089  */
3090 static void
3091 handle_client_join (void *cls,
3092                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3093 {
3094   struct ConsensusSession *session = cls;
3095   struct ConsensusSession *other_session;
3096
3097   initialize_session_peer_list (session,
3098                                 m);
3099   compute_global_id (session,
3100                      &m->session_id);
3101
3102   /* Check if some local client already owns the session.
3103      It is only legal to have a session with an existing global id
3104      if all other sessions with this global id are finished.*/
3105   for (other_session = sessions_head;
3106        NULL != other_session;
3107        other_session = other_session->next)
3108   {
3109     if ( (other_session != session) &&
3110          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3111                                        &other_session->global_id)) )
3112       break;
3113   }
3114
3115   session->conclude_deadline
3116     = GNUNET_TIME_absolute_ntoh (m->deadline);
3117   session->conclude_start
3118     = GNUNET_TIME_absolute_ntoh (m->start);
3119   session->local_peer_idx = get_peer_idx (&my_peer,
3120                                           session);
3121   GNUNET_assert (-1 != session->local_peer_idx);
3122
3123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3124               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3125               GNUNET_h2s (&m->session_id),
3126               session->num_peers,
3127               session->local_peer_idx,
3128               GNUNET_STRINGS_relative_time_to_string
3129               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3130                                                     session->conclude_deadline),
3131                GNUNET_YES));
3132
3133   session->set_listener
3134     = GNUNET_SET_listen (cfg,
3135                          GNUNET_SET_OPERATION_UNION,
3136                          &session->global_id,
3137                          &set_listen_cb,
3138                          session);
3139
3140   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3141                                                           GNUNET_NO);
3142   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3143                                                            GNUNET_NO);
3144   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3145                                                            GNUNET_NO);
3146   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3147                                                           GNUNET_NO);
3148
3149   {
3150     struct SetEntry *client_set;
3151
3152     client_set = GNUNET_new (struct SetEntry);
3153     client_set->h = GNUNET_SET_create (cfg,
3154                                        GNUNET_SET_OPERATION_UNION);
3155     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3156     put_set (session,
3157              client_set);
3158   }
3159
3160   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3161                                                  int);
3162
3163   /* Just construct the task graph,
3164      but don't run anything until the client calls conclude. */
3165   construct_task_graph (session);
3166   GNUNET_SERVICE_client_continue (session->client);
3167 }
3168
3169
3170 static void
3171 client_insert_done (void *cls)
3172 {
3173   // FIXME: implement
3174 }
3175
3176
3177 /**
3178  * Called when a client performs an insert operation.
3179  *
3180  * @param cls client handle
3181  * @param msg message sent by the client
3182  * @return #GNUNET_OK (always well-formed)
3183  */
3184 static int
3185 check_client_insert (void *cls,
3186                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3187 {
3188   return GNUNET_OK;
3189 }
3190
3191
3192 /**
3193  * Called when a client performs an insert operation.
3194  *
3195  * @param cls client handle
3196  * @param msg message sent by the client
3197  */
3198 static void
3199 handle_client_insert (void *cls,
3200                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3201 {
3202   struct ConsensusSession *session = cls;
3203   ssize_t element_size;
3204   struct GNUNET_SET_Handle *initial_set;
3205   struct ConsensusElement *ce;
3206
3207   if (GNUNET_YES == session->conclude_started)
3208   {
3209     GNUNET_break (0);
3210     GNUNET_SERVICE_client_drop (session->client);
3211     return;
3212   }
3213
3214   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3215   ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3216   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3217   ce->payload_type = msg->element_type;
3218
3219   struct GNUNET_SET_Element element = {
3220     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3221     .size = sizeof (struct ConsensusElement) + element_size,
3222     .data = ce,
3223   };
3224
3225   {
3226     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3227     struct SetEntry *entry;
3228
3229     entry = lookup_set (session,
3230                         &key);
3231     GNUNET_assert (NULL != entry);
3232     initial_set = entry->h;
3233   }
3234
3235   session->num_client_insert_pending++;
3236   GNUNET_SET_add_element (initial_set,
3237                           &element,
3238                           &client_insert_done,
3239                           session);
3240
3241 #ifdef GNUNET_EXTRA_LOGGING
3242   {
3243     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3244                 "P%u: element %s added\n",
3245                 session->local_peer_idx,
3246                 debug_str_element (&element));
3247   }
3248 #endif
3249   GNUNET_free (ce);
3250   GNUNET_SERVICE_client_continue (session->client);
3251 }
3252
3253
3254 /**
3255  * Called when a client performs the conclude operation.
3256  *
3257  * @param cls client handle
3258  * @param message message sent by the client
3259  */
3260 static void
3261 handle_client_conclude (void *cls,
3262                         const struct GNUNET_MessageHeader *message)
3263 {
3264   struct ConsensusSession *session = cls;
3265
3266   if (GNUNET_YES == session->conclude_started)
3267   {
3268     /* conclude started twice */
3269     GNUNET_break (0);
3270     GNUNET_SERVICE_client_drop (session->client);
3271     return;
3272   }
3273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3274               "conclude requested\n");
3275   session->conclude_started = GNUNET_YES;
3276   install_step_timeouts (session);
3277   run_ready_steps (session);
3278   GNUNET_SERVICE_client_continue (session->client);
3279 }
3280
3281
3282 /**
3283  * Called to clean up, after a shutdown has been requested.
3284  *
3285  * @param cls closure
3286  */
3287 static void
3288 shutdown_task (void *cls)
3289 {
3290   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3291               "shutting down\n");
3292   GNUNET_STATISTICS_destroy (statistics,
3293                              GNUNET_NO);
3294   statistics = NULL;
3295 }
3296
3297
3298 /**
3299  * Start processing consensus requests.
3300  *
3301  * @param cls closure
3302  * @param c configuration to use
3303  * @param service the initialized service
3304  */
3305 static void
3306 run (void *cls,
3307      const struct GNUNET_CONFIGURATION_Handle *c,
3308      struct GNUNET_SERVICE_Handle *service)
3309 {
3310   cfg = c;
3311   if (GNUNET_OK !=
3312       GNUNET_CRYPTO_get_peer_identity (cfg,
3313                                        &my_peer))
3314   {
3315     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3316                 "Could not retrieve host identity\n");
3317     GNUNET_SCHEDULER_shutdown ();
3318     return;
3319   }
3320   statistics = GNUNET_STATISTICS_create ("consensus",
3321                                          cfg);
3322   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3323                                  NULL);
3324 }
3325
3326
3327 /**
3328  * Callback called when a client connects to the service.
3329  *
3330  * @param cls closure for the service
3331  * @param c the new client that connected to the service
3332  * @param mq the message queue used to send messages to the client
3333  * @return @a c
3334  */
3335 static void *
3336 client_connect_cb (void *cls,
3337                    struct GNUNET_SERVICE_Client *c,
3338                    struct GNUNET_MQ_Handle *mq)
3339 {
3340   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3341
3342   session->client = c;
3343   session->client_mq = mq;
3344   GNUNET_CONTAINER_DLL_insert (sessions_head,
3345                                sessions_tail,
3346                                session);
3347   return session;
3348 }
3349
3350
3351 /**
3352  * Callback called when a client disconnected from the service
3353  *
3354  * @param cls closure for the service
3355  * @param c the client that disconnected
3356  * @param internal_cls should be equal to @a c
3357  */
3358 static void
3359 client_disconnect_cb (void *cls,
3360                       struct GNUNET_SERVICE_Client *c,
3361                       void *internal_cls)
3362 {
3363   struct ConsensusSession *session = internal_cls;
3364
3365   if (NULL != session->set_listener)
3366   {
3367     GNUNET_SET_listen_cancel (session->set_listener);
3368     session->set_listener = NULL;
3369   }
3370   GNUNET_CONTAINER_DLL_remove (sessions_head,
3371                                sessions_tail,
3372                                session);
3373   GNUNET_free (session);
3374 }
3375
3376
3377 /**
3378  * Define "main" method using service macro.
3379  */
3380 GNUNET_SERVICE_MAIN
3381 ("consensus",
3382  GNUNET_SERVICE_OPTION_NONE,
3383  &run,
3384  &client_connect_cb,
3385  &client_disconnect_cb,
3386  NULL,
3387  GNUNET_MQ_hd_fixed_size (client_conclude,
3388                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3389                           struct GNUNET_MessageHeader,
3390                           NULL),
3391  GNUNET_MQ_hd_var_size (client_insert,
3392                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3393                         struct GNUNET_CONSENSUS_ElementMessage,
3394                         NULL),
3395  GNUNET_MQ_hd_var_size (client_join,
3396                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3397                         struct GNUNET_CONSENSUS_JoinMessage,
3398                         NULL),
3399  GNUNET_MQ_handler_end ());
3400
3401 /* end of gnunet-service-consensus.c */