glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14 */
15
16 /**
17  * @file consensus/gnunet-service-consensus.c
18  * @brief multi-peer set reconciliation
19  * @author Florian Dold <flo@dold.me>
20  */
21
22 #include "platform.h"
23 #include "gnunet_util_lib.h"
24 #include "gnunet_block_lib.h"
25 #include "gnunet_protocols.h"
26 #include "gnunet_applications.h"
27 #include "gnunet_set_service.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus_protocol.h"
31 #include "consensus.h"
32
33
34 enum ReferendumVote
35 {
36   /**
37    * Vote that nothing should change.
38    * This option is never voted explicitly.
39    */
40   VOTE_STAY = 0,
41   /**
42    * Vote that an element should be added.
43    */
44   VOTE_ADD = 1,
45   /**
46    * Vote that an element should be removed.
47    */
48   VOTE_REMOVE = 2,
49 };
50
51
52 enum EarlyStoppingPhase
53 {
54   EARLY_STOPPING_NONE = 0,
55   EARLY_STOPPING_ONE_MORE = 1,
56   EARLY_STOPPING_DONE = 2,
57 };
58
59
60 GNUNET_NETWORK_STRUCT_BEGIN
61
62 /**
63  * Tuple of integers that together
64  * identify a task uniquely.
65  */
66 struct TaskKey {
67   /**
68    * A value from 'enum PhaseKind'.
69    */
70   uint16_t kind GNUNET_PACKED;
71
72   /**
73    * Number of the first peer
74    * in canonical order.
75    */
76   int16_t peer1 GNUNET_PACKED;
77
78   /**
79    * Number of the second peer in canonical order.
80    */
81   int16_t peer2 GNUNET_PACKED;
82
83   /**
84    * Repetition of the gradecast phase.
85    */
86   int16_t repetition GNUNET_PACKED;
87
88   /**
89    * Leader in the gradecast phase.
90    *
91    * Can be different from both peer1 and peer2.
92    */
93   int16_t leader GNUNET_PACKED;
94 };
95
96
97
98 struct SetKey
99 {
100   int set_kind GNUNET_PACKED;
101   int k1 GNUNET_PACKED;
102   int k2 GNUNET_PACKED;
103 };
104
105
106 struct SetEntry
107 {
108   struct SetKey key;
109   struct GNUNET_SET_Handle *h;
110   /**
111    * GNUNET_YES if the set resulted
112    * from applying a referendum with contested
113    * elements.
114    */
115   int is_contested;
116 };
117
118
119 struct DiffKey
120 {
121   int diff_kind GNUNET_PACKED;
122   int k1 GNUNET_PACKED;
123   int k2 GNUNET_PACKED;
124 };
125
126 struct RfnKey
127 {
128   int rfn_kind GNUNET_PACKED;
129   int k1 GNUNET_PACKED;
130   int k2 GNUNET_PACKED;
131 };
132
133
134 GNUNET_NETWORK_STRUCT_END
135
136 enum PhaseKind
137 {
138   PHASE_KIND_ALL_TO_ALL,
139   PHASE_KIND_ALL_TO_ALL_2,
140   PHASE_KIND_GRADECAST_LEADER,
141   PHASE_KIND_GRADECAST_ECHO,
142   PHASE_KIND_GRADECAST_ECHO_GRADE,
143   PHASE_KIND_GRADECAST_CONFIRM,
144   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
145   /**
146    * Apply a repetition of the all-to-all
147    * gradecast to the current set.
148    */
149   PHASE_KIND_APPLY_REP,
150   PHASE_KIND_FINISH,
151 };
152
153
154 enum SetKind
155 {
156   SET_KIND_NONE = 0,
157   SET_KIND_CURRENT,
158   /**
159    * Last result set from a gradecast
160    */
161   SET_KIND_LAST_GRADECAST,
162   SET_KIND_LEADER_PROPOSAL,
163   SET_KIND_ECHO_RESULT,
164 };
165
166 enum DiffKind
167 {
168   DIFF_KIND_NONE = 0,
169   DIFF_KIND_LEADER_PROPOSAL,
170   DIFF_KIND_LEADER_CONSENSUS,
171   DIFF_KIND_GRADECAST_RESULT,
172 };
173
174 enum RfnKind
175 {
176   RFN_KIND_NONE = 0,
177   RFN_KIND_ECHO,
178   RFN_KIND_CONFIRM,
179   RFN_KIND_GRADECAST_RESULT
180 };
181
182
183 struct SetOpCls
184 {
185   struct SetKey input_set;
186
187   struct SetKey output_set;
188   struct RfnKey output_rfn;
189   struct DiffKey output_diff;
190
191   int do_not_remove;
192
193   int transceive_contested;
194
195   struct GNUNET_SET_OperationHandle *op;
196 };
197
198
199 struct FinishCls
200 {
201   struct SetKey input_set;
202 };
203
204 /**
205  * Closure for both @a start_task
206  * and @a cancel_task.
207  */
208 union TaskFuncCls
209 {
210   struct SetOpCls setop;
211   struct FinishCls finish;
212 };
213
214 struct TaskEntry;
215
216 typedef void (*TaskFunc) (struct TaskEntry *task);
217
218 /*
219  * Node in the consensus task graph.
220  */
221 struct TaskEntry
222 {
223   struct TaskKey key;
224
225   struct Step *step;
226
227   int is_started;
228
229   int is_finished;
230
231   TaskFunc start;
232   TaskFunc cancel;
233
234   union TaskFuncCls cls;
235 };
236
237
238 struct Step
239 {
240   /**
241    * All steps of one session are in a
242    * linked list for easier deallocation.
243    */
244   struct Step *prev;
245
246   /**
247    * All steps of one session are in a
248    * linked list for easier deallocation.
249    */
250   struct Step *next;
251
252   struct ConsensusSession *session;
253
254   /**
255    * Tasks that this step is composed of.
256    */
257   struct TaskEntry **tasks;
258   unsigned int tasks_len;
259   unsigned int tasks_cap;
260
261   unsigned int finished_tasks;
262
263   /*
264    * Tasks that have this task as dependency.
265    *
266    * We store pointers to subordinates rather
267    * than to prerequisites since it makes
268    * tracking the readiness of a task easier.
269    */
270   struct Step **subordinates;
271   unsigned int subordinates_len;
272   unsigned int subordinates_cap;
273
274   /**
275    * Counter for the prerequisites of
276    * this step.
277    */
278   size_t pending_prereq;
279
280   /*
281    * Task that will run this step despite
282    * any pending prerequisites.
283    */
284   struct GNUNET_SCHEDULER_Task *timeout_task;
285
286   unsigned int is_running;
287
288   unsigned int is_finished;
289
290   /*
291    * Synchrony round of the task.
292    * Determines the deadline for the task.
293    */
294   unsigned int round;
295
296   /**
297    * Human-readable name for
298    * the task, used for debugging.
299    */
300   char *debug_name;
301
302   /**
303    * When we're doing an early finish, how should this step be
304    * treated?
305    * If GNUNET_YES, the step will be marked as finished
306    * without actually running its tasks.
307    * Otherwise, the step will still be run even after
308    * an early finish.
309    *
310    * Note that a task may never be finished early if
311    * it is already running.
312    */
313   int early_finishable;
314 };
315
316
317 struct RfnElementInfo
318 {
319   const struct GNUNET_SET_Element *element;
320
321   /*
322    * GNUNET_YES if the peer votes for the proposal.
323    */
324   int *votes;
325
326   /**
327    * Proposal for this element,
328    * can only be VOTE_ADD or VOTE_REMOVE.
329    */
330   enum ReferendumVote proposal;
331 };
332
333
334 struct ReferendumEntry
335 {
336   struct RfnKey key;
337
338   /*
339    * Elements where there is at least one proposed change.
340    *
341    * Maps the hash of the GNUNET_SET_Element
342    * to 'struct RfnElementInfo'.
343    */
344   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
345
346   unsigned int num_peers;
347
348   /**
349    * Stores, for every peer in the session,
350    * whether the peer finished the whole referendum.
351    *
352    * Votes from peers are only counted if they're
353    * marked as commited (#GNUNET_YES) in the referendum.
354    *
355    * Otherwise (#GNUNET_NO), the requested changes are
356    * not counted for majority votes or thresholds.
357    */
358   int *peer_commited;
359
360
361   /**
362    * Contestation state of the peer.  If a peer is contested, the values it
363    * contributed are still counted for applying changes, but the grading is
364    * affected.
365    */
366   int *peer_contested;
367 };
368
369
370 struct DiffElementInfo
371 {
372   const struct GNUNET_SET_Element *element;
373
374   /**
375    * Positive weight for 'add', negative
376    * weights for 'remove'.
377    */
378   int weight;
379 };
380
381
382 /**
383  * Weighted diff.
384  */
385 struct DiffEntry
386 {
387   struct DiffKey key;
388   struct GNUNET_CONTAINER_MultiHashMap *changes;
389 };
390
391 struct SetHandle
392 {
393   struct SetHandle *prev;
394   struct SetHandle *next;
395
396   struct GNUNET_SET_Handle *h;
397 };
398
399
400
401 /**
402  * A consensus session consists of one local client and the remote authorities.
403  */
404 struct ConsensusSession
405 {
406   /**
407    * Consensus sessions are kept in a DLL.
408    */
409   struct ConsensusSession *next;
410
411   /**
412    * Consensus sessions are kept in a DLL.
413    */
414   struct ConsensusSession *prev;
415
416   unsigned int num_client_insert_pending;
417
418   struct GNUNET_CONTAINER_MultiHashMap *setmap;
419   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
420   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
421
422   /**
423    * Array of peers with length 'num_peers'.
424    */
425   int *peers_blacklisted;
426
427   /*
428    * Mapping from (hashed) TaskKey to TaskEntry.
429    *
430    * We map the application_id for a round to the task that should be
431    * executed, so we don't have to go through all task whenever we get
432    * an incoming set op request.
433    */
434   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
435
436   struct Step *steps_head;
437   struct Step *steps_tail;
438
439   int conclude_started;
440
441   int conclude_done;
442
443   /**
444   * Global consensus identification, computed
445   * from the session id and participating authorities.
446   */
447   struct GNUNET_HashCode global_id;
448
449   /**
450    * Client that inhabits the session
451    */
452   struct GNUNET_SERVICE_Client *client;
453
454   /**
455    * Queued messages to the client.
456    */
457   struct GNUNET_MQ_Handle *client_mq;
458
459   /**
460    * Time when the conclusion of the consensus should begin.
461    */
462   struct GNUNET_TIME_Absolute conclude_start;
463
464   /**
465    * Timeout for all rounds together, single rounds will schedule a timeout task
466    * with a fraction of the conclude timeout.
467    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
468    */
469   struct GNUNET_TIME_Absolute conclude_deadline;
470
471   struct GNUNET_PeerIdentity *peers;
472
473   /**
474    * Number of other peers in the consensus.
475    */
476   unsigned int num_peers;
477
478   /**
479    * Index of the local peer in the peers array
480    */
481   unsigned int local_peer_idx;
482
483   /**
484    * Listener for requests from other peers.
485    * Uses the session's global id as app id.
486    */
487   struct GNUNET_SET_ListenHandle *set_listener;
488
489   /**
490    * State of our early stopping scheme.
491    */
492   int early_stopping;
493
494   /**
495    * Our set size from the first round.
496    */
497   uint64_t first_size;
498
499   uint64_t *first_sizes_received;
500
501   /**
502    * Bounded Eppstein lower bound.
503    */
504   uint64_t lower_bound;
505
506   struct SetHandle *set_handles_head;
507   struct SetHandle *set_handles_tail;
508 };
509
510 /**
511  * Linked list of sessions this peer participates in.
512  */
513 static struct ConsensusSession *sessions_head;
514
515 /**
516  * Linked list of sessions this peer participates in.
517  */
518 static struct ConsensusSession *sessions_tail;
519
520 /**
521  * Configuration of the consensus service.
522  */
523 static const struct GNUNET_CONFIGURATION_Handle *cfg;
524
525 /**
526  * Peer that runs this service.
527  */
528 static struct GNUNET_PeerIdentity my_peer;
529
530 /**
531  * Statistics handle.
532  */
533 struct GNUNET_STATISTICS_Handle *statistics;
534
535
536 static void
537 finish_task (struct TaskEntry *task);
538
539
540 static void
541 run_ready_steps (struct ConsensusSession *session);
542
543
544 static const char *
545 phasename (uint16_t phase)
546 {
547   switch (phase)
548   {
549     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
550     case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
551     case PHASE_KIND_FINISH: return "FINISH";
552     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
553     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
554     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
555     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
556     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
557     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
558     default: return "(unknown)";
559   }
560 }
561
562
563 static const char *
564 setname (uint16_t kind)
565 {
566   switch (kind)
567   {
568     case SET_KIND_CURRENT: return "CURRENT";
569     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
570     case SET_KIND_NONE: return "NONE";
571     default: return "(unknown)";
572   }
573 }
574
575 static const char *
576 rfnname (uint16_t kind)
577 {
578   switch (kind)
579   {
580     case RFN_KIND_NONE: return "NONE";
581     case RFN_KIND_ECHO: return "ECHO";
582     case RFN_KIND_CONFIRM: return "CONFIRM";
583     default: return "(unknown)";
584   }
585 }
586
587 static const char *
588 diffname (uint16_t kind)
589 {
590   switch (kind)
591   {
592     case DIFF_KIND_NONE: return "NONE";
593     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
594     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
595     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
596     default: return "(unknown)";
597   }
598 }
599
600 #ifdef GNUNET_EXTRA_LOGGING
601
602
603 static const char *
604 debug_str_element (const struct GNUNET_SET_Element *el)
605 {
606   struct GNUNET_HashCode hash;
607
608   GNUNET_SET_element_hash (el, &hash);
609
610   return GNUNET_h2s (&hash);
611 }
612
613 static const char *
614 debug_str_task_key (struct TaskKey *tk)
615 {
616   static char buf[256];
617
618   snprintf (buf, sizeof (buf),
619             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
620             phasename (tk->kind), tk->peer1, tk->peer2,
621             tk->leader, tk->repetition);
622
623   return buf;
624 }
625
626 static const char *
627 debug_str_diff_key (struct DiffKey *dk)
628 {
629   static char buf[256];
630
631   snprintf (buf, sizeof (buf),
632             "DiffKey kind=%s, k1=%d, k2=%d",
633             diffname (dk->diff_kind), dk->k1, dk->k2);
634
635   return buf;
636 }
637
638 static const char *
639 debug_str_set_key (const struct SetKey *sk)
640 {
641   static char buf[256];
642
643   snprintf (buf, sizeof (buf),
644             "SetKey kind=%s, k1=%d, k2=%d",
645             setname (sk->set_kind), sk->k1, sk->k2);
646
647   return buf;
648 }
649
650
651 static const char *
652 debug_str_rfn_key (const struct RfnKey *rk)
653 {
654   static char buf[256];
655
656   snprintf (buf, sizeof (buf),
657             "RfnKey kind=%s, k1=%d, k2=%d",
658             rfnname (rk->rfn_kind), rk->k1, rk->k2);
659
660   return buf;
661 }
662
663 #endif /* GNUNET_EXTRA_LOGGING */
664
665
666 /**
667  * Send the final result set of the consensus to the client, element by
668  * element.
669  *
670  * @param cls closure
671  * @param element the current element, NULL if all elements have been
672  *        iterated over
673  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
674  */
675 static int
676 send_to_client_iter (void *cls,
677                      const struct GNUNET_SET_Element *element)
678 {
679   struct TaskEntry *task = (struct TaskEntry *) cls;
680   struct ConsensusSession *session = task->step->session;
681   struct GNUNET_MQ_Envelope *ev;
682
683   if (NULL != element)
684   {
685     struct GNUNET_CONSENSUS_ElementMessage *m;
686     const struct ConsensusElement *ce;
687
688     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
689     ce = element->data;
690
691     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
692
693     if (0 != ce->marker)
694       return GNUNET_YES;
695
696     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697                 "P%d: sending element %s to client\n",
698                 session->local_peer_idx,
699                 debug_str_element (element));
700
701     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
702                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
703     m->element_type = ce->payload_type;
704     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
705     GNUNET_MQ_send (session->client_mq, ev);
706   }
707   else
708   {
709     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710                 "P%d: finished iterating elements for client\n",
711                 session->local_peer_idx);
712     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
713     GNUNET_MQ_send (session->client_mq, ev);
714   }
715   return GNUNET_YES;
716 }
717
718
719 static struct SetEntry *
720 lookup_set (struct ConsensusSession *session, struct SetKey *key)
721 {
722   struct GNUNET_HashCode hash;
723
724   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725               "P%u: looking up set {%s}\n",
726               session->local_peer_idx,
727               debug_str_set_key (key));
728
729   GNUNET_assert (SET_KIND_NONE != key->set_kind);
730   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
731   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
732 }
733
734
735 static struct DiffEntry *
736 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
737 {
738   struct GNUNET_HashCode hash;
739
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741               "P%u: looking up diff {%s}\n",
742               session->local_peer_idx,
743               debug_str_diff_key (key));
744
745   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
746   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
747   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
748 }
749
750
751 static struct ReferendumEntry *
752 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
753 {
754   struct GNUNET_HashCode hash;
755
756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757               "P%u: looking up rfn {%s}\n",
758               session->local_peer_idx,
759               debug_str_rfn_key (key));
760
761   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
762   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
763   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
764 }
765
766
767 static void
768 diff_insert (struct DiffEntry *diff,
769              int weight,
770              const struct GNUNET_SET_Element *element)
771 {
772   struct DiffElementInfo *di;
773   struct GNUNET_HashCode hash;
774
775   GNUNET_assert ( (1 == weight) || (-1 == weight));
776
777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778               "diff_insert with element size %u\n",
779               element->size);
780
781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782               "hashing element\n");
783
784   GNUNET_SET_element_hash (element, &hash);
785
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "hashed element\n");
788
789   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
790
791   if (NULL == di)
792   {
793     di = GNUNET_new (struct DiffElementInfo);
794     di->element = GNUNET_SET_element_dup (element);
795     GNUNET_assert (GNUNET_OK ==
796                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
797                                                       &hash, di,
798                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
799   }
800
801   di->weight = weight;
802 }
803
804
805 static void
806 rfn_commit (struct ReferendumEntry *rfn,
807             uint16_t commit_peer)
808 {
809   GNUNET_assert (commit_peer < rfn->num_peers);
810
811   rfn->peer_commited[commit_peer] = GNUNET_YES;
812 }
813
814
815 static void
816 rfn_contest (struct ReferendumEntry *rfn,
817              uint16_t contested_peer)
818 {
819   GNUNET_assert (contested_peer < rfn->num_peers);
820
821   rfn->peer_contested[contested_peer] = GNUNET_YES;
822 }
823
824
825 static uint16_t
826 rfn_noncontested (struct ReferendumEntry *rfn)
827 {
828   uint16_t i;
829   uint16_t ret;
830
831   ret = 0;
832   for (i = 0; i < rfn->num_peers; i++)
833     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
834       ret++;
835
836   return ret;
837 }
838
839
840 static void
841 rfn_vote (struct ReferendumEntry *rfn,
842           uint16_t voting_peer,
843           enum ReferendumVote vote,
844           const struct GNUNET_SET_Element *element)
845 {
846   struct RfnElementInfo *ri;
847   struct GNUNET_HashCode hash;
848
849   GNUNET_assert (voting_peer < rfn->num_peers);
850
851   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
852      since VOTE_KEEP is implicit in not voting. */
853   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
854
855   GNUNET_SET_element_hash (element, &hash);
856   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
857
858   if (NULL == ri)
859   {
860     ri = GNUNET_new (struct RfnElementInfo);
861     ri->element = GNUNET_SET_element_dup (element);
862     ri->votes = GNUNET_new_array (rfn->num_peers, int);
863     GNUNET_assert (GNUNET_OK ==
864                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
865                                                       &hash, ri,
866                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
867   }
868
869   ri->votes[voting_peer] = GNUNET_YES;
870   ri->proposal = vote;
871 }
872
873
874 static uint16_t
875 task_other_peer (struct TaskEntry *task)
876 {
877   uint16_t me = task->step->session->local_peer_idx;
878   if (task->key.peer1 == me)
879     return task->key.peer2;
880   return task->key.peer1;
881 }
882
883
884 static int
885 cmp_uint64_t (const void *pa, const void *pb)
886 {
887   uint64_t a = *(uint64_t *) pa;
888   uint64_t b = *(uint64_t *) pb;
889
890   if (a == b)
891     return 0;
892   if (a < b)
893     return -1;
894   return 1;
895 }
896
897
898 /**
899  * Callback for set operation results. Called for each element
900  * in the result set.
901  *
902  * @param cls closure
903  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
904  * @param current_size current set size
905  * @param status see enum GNUNET_SET_Status
906  */
907 static void
908 set_result_cb (void *cls,
909                const struct GNUNET_SET_Element *element,
910                uint64_t current_size,
911                enum GNUNET_SET_Status status)
912 {
913   struct TaskEntry *task = cls;
914   struct ConsensusSession *session = task->step->session;
915   struct SetEntry *output_set = NULL;
916   struct DiffEntry *output_diff = NULL;
917   struct ReferendumEntry *output_rfn = NULL;
918   unsigned int other_idx;
919   struct SetOpCls *setop;
920   const struct ConsensusElement *consensus_element = NULL;
921
922   if (NULL != element)
923   {
924     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925                 "P%u: got element of type %u, status %u\n",
926                 session->local_peer_idx,
927                 (unsigned) element->element_type,
928                 (unsigned) status);
929     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
930     consensus_element = element->data;
931   }
932
933   setop = &task->cls.setop;
934
935
936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937               "P%u: got set result for {%s}, status %u\n",
938               session->local_peer_idx,
939               debug_str_task_key (&task->key),
940               status);
941
942   if (GNUNET_NO == task->is_started)
943   {
944     GNUNET_break_op (0);
945     return;
946   }
947
948   if (GNUNET_YES == task->is_finished)
949   {
950     GNUNET_break_op (0);
951     return;
952   }
953
954   other_idx = task_other_peer (task);
955
956   if (SET_KIND_NONE != setop->output_set.set_kind)
957   {
958     output_set = lookup_set (session, &setop->output_set);
959     GNUNET_assert (NULL != output_set);
960   }
961
962   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
963   {
964     output_diff = lookup_diff (session, &setop->output_diff);
965     GNUNET_assert (NULL != output_diff);
966   }
967
968   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
969   {
970     output_rfn = lookup_rfn (session, &setop->output_rfn);
971     GNUNET_assert (NULL != output_rfn);
972   }
973
974   if (GNUNET_YES == session->peers_blacklisted[other_idx])
975   {
976     /* Peer might have been blacklisted
977        by a gradecast running in parallel, ignore elements from now */
978     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
979       return;
980     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
981       return;
982   }
983
984   if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
985   {
986     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987                 "P%u: got some marker\n",
988                   session->local_peer_idx);
989     if ( (GNUNET_YES == setop->transceive_contested) &&
990          (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
991     {
992       GNUNET_assert (NULL != output_rfn);
993       rfn_contest (output_rfn, task_other_peer (task));
994       return;
995     }
996
997     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
998     {
999
1000       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001                   "P%u: got size marker\n",
1002                   session->local_peer_idx);
1003
1004
1005       struct ConsensusSizeElement *cse = (void *) consensus_element;
1006
1007       if (cse->sender_index == other_idx)
1008       {
1009         if (NULL == session->first_sizes_received)
1010           session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1011         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1012
1013         uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1014         qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1015         session->lower_bound = copy[session->num_peers / 3 + 1];
1016         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017                     "P%u: lower bound %llu\n",
1018                     session->local_peer_idx,
1019                     (long long) session->lower_bound);
1020         GNUNET_free (copy);
1021       }
1022       return;
1023     }
1024
1025     return;
1026   }
1027
1028   switch (status)
1029   {
1030     case GNUNET_SET_STATUS_ADD_LOCAL:
1031       GNUNET_assert (NULL != consensus_element);
1032       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                   "Adding element in Task {%s}\n",
1034                   debug_str_task_key (&task->key));
1035       if (NULL != output_set)
1036       {
1037         // FIXME: record pending adds, use callback
1038         GNUNET_SET_add_element (output_set->h,
1039                                 element,
1040                                 NULL,
1041                                 NULL);
1042 #ifdef GNUNET_EXTRA_LOGGING
1043         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                     "P%u: adding element %s into set {%s} of task {%s}\n",
1045                     session->local_peer_idx,
1046                     debug_str_element (element),
1047                     debug_str_set_key (&setop->output_set),
1048                     debug_str_task_key (&task->key));
1049 #endif
1050       }
1051       if (NULL != output_diff)
1052       {
1053         diff_insert (output_diff, 1, element);
1054 #ifdef GNUNET_EXTRA_LOGGING
1055         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1057                     session->local_peer_idx,
1058                     debug_str_element (element),
1059                     debug_str_diff_key (&setop->output_diff),
1060                     debug_str_task_key (&task->key));
1061 #endif
1062       }
1063       if (NULL != output_rfn)
1064       {
1065         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1066 #ifdef GNUNET_EXTRA_LOGGING
1067         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1069                     session->local_peer_idx,
1070                     debug_str_element (element),
1071                     debug_str_rfn_key (&setop->output_rfn),
1072                     debug_str_task_key (&task->key));
1073 #endif
1074       }
1075       // XXX: add result to structures in task
1076       break;
1077     case GNUNET_SET_STATUS_ADD_REMOTE:
1078       GNUNET_assert (NULL != consensus_element);
1079       if (GNUNET_YES == setop->do_not_remove)
1080         break;
1081       if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1082         break;
1083       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084                   "Removing element in Task {%s}\n",
1085                   debug_str_task_key (&task->key));
1086       if (NULL != output_set)
1087       {
1088         // FIXME: record pending adds, use callback
1089         GNUNET_SET_remove_element (output_set->h,
1090                                    element,
1091                                    NULL,
1092                                    NULL);
1093 #ifdef GNUNET_EXTRA_LOGGING
1094         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                     "P%u: removing element %s from set {%s} of task {%s}\n",
1096                     session->local_peer_idx,
1097                     debug_str_element (element),
1098                     debug_str_set_key (&setop->output_set),
1099                     debug_str_task_key (&task->key));
1100 #endif
1101       }
1102       if (NULL != output_diff)
1103       {
1104         diff_insert (output_diff, -1, element);
1105 #ifdef GNUNET_EXTRA_LOGGING
1106         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1108                     session->local_peer_idx,
1109                     debug_str_element (element),
1110                     debug_str_diff_key (&setop->output_diff),
1111                     debug_str_task_key (&task->key));
1112 #endif
1113       }
1114       if (NULL != output_rfn)
1115       {
1116         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1117 #ifdef GNUNET_EXTRA_LOGGING
1118         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1120                     session->local_peer_idx,
1121                     debug_str_element (element),
1122                     debug_str_rfn_key (&setop->output_rfn),
1123                     debug_str_task_key (&task->key));
1124 #endif
1125       }
1126       break;
1127     case GNUNET_SET_STATUS_DONE:
1128       // XXX: check first if any changes to the underlying
1129       // set are still pending
1130       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131                   "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1132                   session->local_peer_idx,
1133                   debug_str_task_key (&task->key),
1134                   (unsigned int) task->step->finished_tasks,
1135                   (unsigned int) task->step->tasks_len);
1136       if (NULL != output_rfn)
1137       {
1138         rfn_commit (output_rfn, task_other_peer (task));
1139       }
1140       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1141       {
1142         session->first_size = current_size;
1143       }
1144       finish_task (task);
1145       break;
1146     case GNUNET_SET_STATUS_FAILURE:
1147       // XXX: cleanup
1148       GNUNET_break_op (0);
1149       finish_task (task);
1150       return;
1151     default:
1152       /* not reached */
1153       GNUNET_assert (0);
1154   }
1155 }
1156
1157 #ifdef EVIL
1158
1159 enum EvilnessType
1160 {
1161   EVILNESS_NONE,
1162   EVILNESS_CRAM_ALL,
1163   EVILNESS_CRAM_LEAD,
1164   EVILNESS_CRAM_ECHO,
1165   EVILNESS_SLACK,
1166   EVILNESS_SLACK_A2A,
1167 };
1168
1169 enum EvilnessSubType
1170 {
1171   EVILNESS_SUB_NONE,
1172   EVILNESS_SUB_REPLACEMENT,
1173   EVILNESS_SUB_NO_REPLACEMENT,
1174 };
1175
1176 struct Evilness
1177 {
1178   enum EvilnessType type;
1179   enum EvilnessSubType subtype;
1180   unsigned int num;
1181 };
1182
1183
1184 static int
1185 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1186 {
1187   if (0 == strcmp ("replace", evil_subtype_str))
1188   {
1189     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1190   }
1191   else if (0 == strcmp ("noreplace", evil_subtype_str))
1192   {
1193     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1194   }
1195   else
1196   {
1197     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1198                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1199                 evil_subtype_str);
1200     return GNUNET_SYSERR;
1201   }
1202   return GNUNET_OK;
1203 }
1204
1205
1206 static void
1207 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1208 {
1209   char *evil_spec;
1210   char *field;
1211   char *evil_type_str = NULL;
1212   char *evil_subtype_str = NULL;
1213
1214   GNUNET_assert (NULL != evil);
1215
1216   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1217   {
1218     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219                 "P%u: no evilness\n",
1220                 session->local_peer_idx);
1221     evil->type = EVILNESS_NONE;
1222     return;
1223   }
1224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225               "P%u: got evilness spec\n",
1226               session->local_peer_idx);
1227
1228   for (field = strtok (evil_spec, "/");
1229        NULL != field;
1230        field = strtok (NULL, "/"))
1231   {
1232     unsigned int peer_num;
1233     unsigned int evil_num;
1234     int ret;
1235
1236     evil_type_str = NULL;
1237     evil_subtype_str = NULL;
1238
1239     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1240
1241     if (ret != 4)
1242     {
1243       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1244                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1245                   field,
1246                   ret);
1247       goto not_evil;
1248     }
1249
1250     GNUNET_assert (NULL != evil_type_str);
1251     GNUNET_assert (NULL != evil_subtype_str);
1252
1253     if (peer_num == session->local_peer_idx)
1254     {
1255       if (0 == strcmp ("slack", evil_type_str))
1256       {
1257         evil->type = EVILNESS_SLACK;
1258       }
1259       if (0 == strcmp ("slack-a2a", evil_type_str))
1260       {
1261         evil->type = EVILNESS_SLACK_A2A;
1262       }
1263       else if (0 == strcmp ("cram-all", evil_type_str))
1264       {
1265         evil->type = EVILNESS_CRAM_ALL;
1266         evil->num = evil_num;
1267         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1268           goto not_evil;
1269       }
1270       else if (0 == strcmp ("cram-lead", evil_type_str))
1271       {
1272         evil->type = EVILNESS_CRAM_LEAD;
1273         evil->num = evil_num;
1274         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1275           goto not_evil;
1276       }
1277       else if (0 == strcmp ("cram-echo", evil_type_str))
1278       {
1279         evil->type = EVILNESS_CRAM_ECHO;
1280         evil->num = evil_num;
1281         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1282           goto not_evil;
1283       }
1284       else
1285       {
1286         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1287                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1288                     evil_type_str);
1289         goto not_evil;
1290       }
1291       goto cleanup;
1292     }
1293     /* No GNUNET_free since memory was allocated by libc */
1294     free (evil_type_str);
1295     evil_type_str = NULL;
1296     evil_subtype_str = NULL;
1297   }
1298 not_evil:
1299   evil->type = EVILNESS_NONE;
1300 cleanup:
1301   GNUNET_free (evil_spec);
1302   /* no GNUNET_free_non_null since it wasn't
1303    * allocated with GNUNET_malloc */
1304   if (NULL != evil_type_str)
1305     free (evil_type_str);
1306   if (NULL != evil_subtype_str)
1307     free (evil_subtype_str);
1308 }
1309
1310 #endif
1311
1312
1313 /**
1314  * Commit the appropriate set for a
1315  * task.
1316  */
1317 static void
1318 commit_set (struct ConsensusSession *session,
1319             struct TaskEntry *task)
1320 {
1321   struct SetEntry *set;
1322   struct SetOpCls *setop = &task->cls.setop;
1323
1324   GNUNET_assert (NULL != setop->op);
1325   set = lookup_set (session, &setop->input_set);
1326   GNUNET_assert (NULL != set);
1327
1328   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1329   {
1330     struct GNUNET_SET_Element element;
1331     struct ConsensusElement ce = { 0 };
1332     ce.marker = CONSENSUS_MARKER_CONTESTED;
1333     element.data = &ce;
1334     element.size = sizeof (struct ConsensusElement);
1335     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1336     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1337   }
1338
1339   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1340   {
1341     struct GNUNET_SET_Element element;
1342     struct ConsensusSizeElement cse = {
1343       .size = 0,
1344       .sender_index = 0
1345     };
1346     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1347     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1348     cse.size = GNUNET_htonll (session->first_size);
1349     cse.sender_index = session->local_peer_idx;
1350     element.data = &cse;
1351     element.size = sizeof (struct ConsensusSizeElement);
1352     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1353     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1354   }
1355
1356 #ifdef EVIL
1357   {
1358     unsigned int i;
1359     struct Evilness evil;
1360
1361     get_evilness (session, &evil);
1362     if (EVILNESS_NONE != evil.type)
1363     {
1364       /* Useful for evaluation */
1365       GNUNET_STATISTICS_set (statistics,
1366                              "is evil",
1367                              1,
1368                              GNUNET_NO);
1369     }
1370     switch (evil.type)
1371     {
1372       case EVILNESS_CRAM_ALL:
1373       case EVILNESS_CRAM_LEAD:
1374       case EVILNESS_CRAM_ECHO:
1375         /* We're not cramming elements in the
1376            all-to-all round, since that would just
1377            add more elements to the result set, but
1378            wouldn't test robustness. */
1379         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1380         {
1381           GNUNET_SET_commit (setop->op, set->h);
1382           break;
1383         }
1384         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1385             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1386         {
1387           GNUNET_SET_commit (setop->op, set->h);
1388           break;
1389         }
1390         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1391         {
1392           GNUNET_SET_commit (setop->op, set->h);
1393           break;
1394         }
1395         for (i = 0; i < evil.num; i++)
1396         {
1397           struct GNUNET_SET_Element element;
1398           struct ConsensusStuffedElement se = {
1399             .ce.payload_type = 0,
1400             .ce.marker = 0,
1401           };
1402           element.data = &se;
1403           element.size = sizeof (struct ConsensusStuffedElement);
1404           element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1405
1406           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1407           {
1408             /* Always generate a new element. */
1409             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1410           }
1411           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1412           {
1413             /* Always cram the same elements, derived from counter. */
1414             GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1415           }
1416           else
1417           {
1418             GNUNET_assert (0);
1419           }
1420           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1421 #ifdef GNUNET_EXTRA_LOGGING
1422           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1424                       session->local_peer_idx,
1425                       debug_str_element (&element),
1426                       debug_str_set_key (&setop->input_set),
1427                       debug_str_task_key (&task->key));
1428 #endif
1429         }
1430         GNUNET_STATISTICS_update (statistics,
1431                                   "# stuffed elements",
1432                                   evil.num,
1433                                   GNUNET_NO);
1434         GNUNET_SET_commit (setop->op, set->h);
1435         break;
1436       case EVILNESS_SLACK:
1437         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438                     "P%u: evil peer: slacking\n",
1439                     (unsigned int) session->local_peer_idx);
1440         /* Do nothing. */
1441       case EVILNESS_SLACK_A2A:
1442         if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1443              (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1444         {
1445           struct GNUNET_SET_Handle *empty_set;
1446           empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1447           GNUNET_SET_commit (setop->op, empty_set);
1448           GNUNET_SET_destroy (empty_set);
1449         }
1450         else
1451         {
1452           GNUNET_SET_commit (setop->op, set->h);
1453         }
1454         break;
1455       case EVILNESS_NONE:
1456         GNUNET_SET_commit (setop->op, set->h);
1457         break;
1458     }
1459   }
1460 #else
1461   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1462   {
1463     GNUNET_SET_commit (setop->op, set->h);
1464   }
1465   else
1466   {
1467     /* For our testcases, we don't want the blacklisted
1468        peers to wait. */
1469     GNUNET_SET_operation_cancel (setop->op);
1470     setop->op = NULL;
1471     finish_task (task);
1472   }
1473 #endif
1474 }
1475
1476
1477 static void
1478 put_diff (struct ConsensusSession *session,
1479          struct DiffEntry *diff)
1480 {
1481   struct GNUNET_HashCode hash;
1482
1483   GNUNET_assert (NULL != diff);
1484
1485   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1486   GNUNET_assert (GNUNET_OK ==
1487                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1488                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1489 }
1490
1491 static void
1492 put_set (struct ConsensusSession *session,
1493          struct SetEntry *set)
1494 {
1495   struct GNUNET_HashCode hash;
1496
1497   GNUNET_assert (NULL != set->h);
1498
1499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1500               "Putting set %s\n",
1501               debug_str_set_key (&set->key));
1502
1503   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1504   GNUNET_assert (GNUNET_SYSERR !=
1505                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1506                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1507 }
1508
1509
1510 static void
1511 put_rfn (struct ConsensusSession *session,
1512          struct ReferendumEntry *rfn)
1513 {
1514   struct GNUNET_HashCode hash;
1515
1516   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1517   GNUNET_assert (GNUNET_OK ==
1518                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1519                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1520 }
1521
1522
1523
1524 static void
1525 task_cancel_reconcile (struct TaskEntry *task)
1526 {
1527   /* not implemented yet */
1528   GNUNET_assert (0);
1529 }
1530
1531
1532 static void
1533 apply_diff_to_rfn (struct DiffEntry *diff,
1534                    struct ReferendumEntry *rfn,
1535                    uint16_t voting_peer,
1536                    uint16_t num_peers)
1537 {
1538   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1539   struct DiffElementInfo *di;
1540
1541   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1542
1543   while (GNUNET_YES ==
1544          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1545                                                       NULL,
1546                                                       (const void **) &di))
1547   {
1548     if (di->weight > 0)
1549     {
1550       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1551     }
1552     if (di->weight < 0)
1553     {
1554       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1555     }
1556   }
1557
1558   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1559 }
1560
1561
1562 struct DiffEntry *
1563 diff_create ()
1564 {
1565   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1566
1567   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1568
1569   return d;
1570 }
1571
1572
1573 struct DiffEntry *
1574 diff_compose (struct DiffEntry *diff_1,
1575               struct DiffEntry *diff_2)
1576 {
1577   struct DiffEntry *diff_new;
1578   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1579   struct DiffElementInfo *di;
1580
1581   diff_new = diff_create ();
1582
1583   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1584   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1585   {
1586     diff_insert (diff_new, di->weight, di->element);
1587   }
1588   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1589
1590   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1591   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1592   {
1593     diff_insert (diff_new, di->weight, di->element);
1594   }
1595   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1596
1597   return diff_new;
1598 }
1599
1600
1601 struct ReferendumEntry *
1602 rfn_create (uint16_t size)
1603 {
1604   struct ReferendumEntry *rfn;
1605
1606   rfn = GNUNET_new (struct ReferendumEntry);
1607   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1608   rfn->peer_commited = GNUNET_new_array (size, int);
1609   rfn->peer_contested = GNUNET_new_array (size, int);
1610   rfn->num_peers = size;
1611
1612   return rfn;
1613 }
1614
1615
1616 #if UNUSED
1617 static void
1618 diff_destroy (struct DiffEntry *diff)
1619 {
1620   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1621   GNUNET_free (diff);
1622 }
1623 #endif
1624
1625
1626 /**
1627  * For a given majority, count what the outcome
1628  * is (add/remove/keep), and give the number
1629  * of peers that voted for this outcome.
1630  */
1631 static void
1632 rfn_majority (const struct ReferendumEntry *rfn,
1633               const struct RfnElementInfo *ri,
1634               uint16_t *ret_majority,
1635               enum ReferendumVote *ret_vote)
1636 {
1637   uint16_t votes_yes = 0;
1638   uint16_t num_commited = 0;
1639   uint16_t i;
1640
1641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642               "Computing rfn majority for element %s of rfn {%s}\n",
1643               debug_str_element (ri->element),
1644               debug_str_rfn_key (&rfn->key));
1645
1646   for (i = 0; i < rfn->num_peers; i++)
1647   {
1648     if (GNUNET_NO == rfn->peer_commited[i])
1649       continue;
1650     num_commited++;
1651
1652     if (GNUNET_YES == ri->votes[i])
1653       votes_yes++;
1654   }
1655
1656   if (votes_yes > (num_commited) / 2)
1657   {
1658     *ret_vote = ri->proposal;
1659     *ret_majority = votes_yes;
1660   }
1661   else
1662   {
1663     *ret_vote = VOTE_STAY;
1664     *ret_majority = num_commited - votes_yes;
1665   }
1666 }
1667
1668
1669 struct SetCopyCls
1670 {
1671   struct TaskEntry *task;
1672   struct SetKey dst_set_key;
1673 };
1674
1675
1676 static void
1677 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1678 {
1679   struct SetCopyCls *scc = cls;
1680   struct TaskEntry *task = scc->task;
1681   struct SetKey dst_set_key = scc->dst_set_key;
1682   struct SetEntry *set;
1683   struct SetHandle *sh = GNUNET_new (struct SetHandle);
1684
1685   sh->h = copy;
1686   GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1687                                task->step->session->set_handles_tail,
1688                                sh);
1689
1690   GNUNET_free (scc);
1691   set = GNUNET_new (struct SetEntry);
1692   set->h = copy;
1693   set->key = dst_set_key;
1694   put_set (task->step->session, set);
1695
1696   task->start (task);
1697 }
1698
1699
1700 /**
1701  * Call the start function of the given
1702  * task again after we created a copy of the given set.
1703  */
1704 static void
1705 create_set_copy_for_task (struct TaskEntry *task,
1706                           struct SetKey *src_set_key,
1707                           struct SetKey *dst_set_key)
1708 {
1709   struct SetEntry *src_set;
1710   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1711
1712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1713               "Copying set {%s} to {%s} for task {%s}\n",
1714               debug_str_set_key (src_set_key),
1715               debug_str_set_key (dst_set_key),
1716               debug_str_task_key (&task->key));
1717
1718   scc->task = task;
1719   scc->dst_set_key = *dst_set_key;
1720   src_set = lookup_set (task->step->session, src_set_key);
1721   GNUNET_assert (NULL != src_set);
1722   GNUNET_SET_copy_lazy (src_set->h,
1723                         set_copy_cb,
1724                         scc);
1725 }
1726
1727
1728 struct SetMutationProgressCls
1729 {
1730   int num_pending;
1731   /**
1732    * Task to finish once all changes are through.
1733    */
1734   struct TaskEntry *task;
1735 };
1736
1737
1738 static void
1739 set_mutation_done (void *cls)
1740 {
1741   struct SetMutationProgressCls *pc = cls;
1742
1743   GNUNET_assert (pc->num_pending > 0);
1744
1745   pc->num_pending--;
1746
1747   if (0 == pc->num_pending)
1748   {
1749     struct TaskEntry *task = pc->task;
1750     GNUNET_free (pc);
1751     finish_task (task);
1752   }
1753 }
1754
1755
1756 static void
1757 try_finish_step_early (struct Step *step)
1758 {
1759   unsigned int i;
1760
1761   if (GNUNET_YES == step->is_running)
1762     return;
1763   if (GNUNET_YES == step->is_finished)
1764     return;
1765   if (GNUNET_NO == step->early_finishable)
1766     return;
1767
1768   step->is_finished = GNUNET_YES;
1769
1770 #ifdef GNUNET_EXTRA_LOGGING
1771   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772               "Finishing step `%s' early.\n",
1773               step->debug_name);
1774 #endif
1775
1776   for (i = 0; i < step->subordinates_len; i++)
1777   {
1778     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1779     step->subordinates[i]->pending_prereq--;
1780 #ifdef GNUNET_EXTRA_LOGGING
1781     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782                 "Decreased pending_prereq to %u for step `%s'.\n",
1783                 (unsigned int) step->subordinates[i]->pending_prereq,
1784                 step->subordinates[i]->debug_name);
1785
1786 #endif
1787     try_finish_step_early (step->subordinates[i]);
1788   }
1789
1790   // XXX: maybe schedule as task to avoid recursion?
1791   run_ready_steps (step->session);
1792 }
1793
1794
1795 static void
1796 finish_step (struct Step *step)
1797 {
1798   unsigned int i;
1799
1800   GNUNET_assert (step->finished_tasks == step->tasks_len);
1801   GNUNET_assert (GNUNET_YES == step->is_running);
1802   GNUNET_assert (GNUNET_NO == step->is_finished);
1803
1804 #ifdef GNUNET_EXTRA_LOGGING
1805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1806               "All tasks of step `%s' with %u subordinates finished.\n",
1807               step->debug_name,
1808               step->subordinates_len);
1809 #endif
1810
1811   for (i = 0; i < step->subordinates_len; i++)
1812   {
1813     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1814     step->subordinates[i]->pending_prereq--;
1815 #ifdef GNUNET_EXTRA_LOGGING
1816     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1817                 "Decreased pending_prereq to %u for step `%s'.\n",
1818                 (unsigned int) step->subordinates[i]->pending_prereq,
1819                 step->subordinates[i]->debug_name);
1820
1821 #endif
1822   }
1823
1824   step->is_finished = GNUNET_YES;
1825
1826   // XXX: maybe schedule as task to avoid recursion?
1827   run_ready_steps (step->session);
1828 }
1829
1830
1831
1832 /**
1833  * Apply the result from one round of gradecasts (i.e. every peer
1834  * should have gradecasted) to the peer's current set.
1835  *
1836  * @param task the task with context information
1837  */
1838 static void
1839 task_start_apply_round (struct TaskEntry *task)
1840 {
1841   struct ConsensusSession *session = task->step->session;
1842   struct SetKey sk_in;
1843   struct SetKey sk_out;
1844   struct RfnKey rk_in;
1845   struct SetEntry *set_out;
1846   struct ReferendumEntry *rfn_in;
1847   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1848   struct RfnElementInfo *ri;
1849   struct SetMutationProgressCls *progress_cls;
1850   uint16_t worst_majority = UINT16_MAX;
1851
1852   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1853   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1854   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1855
1856   set_out = lookup_set (session, &sk_out);
1857   if (NULL == set_out)
1858   {
1859     create_set_copy_for_task (task, &sk_in, &sk_out);
1860     return;
1861   }
1862
1863   rfn_in = lookup_rfn (session, &rk_in);
1864   GNUNET_assert (NULL != rfn_in);
1865
1866   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1867   progress_cls->task = task;
1868
1869   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1870
1871   while (GNUNET_YES ==
1872          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1873                                                       NULL,
1874                                                       (const void **) &ri))
1875   {
1876     uint16_t majority_num;
1877     enum ReferendumVote majority_vote;
1878
1879     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1880
1881     if (worst_majority > majority_num)
1882       worst_majority = majority_num;
1883
1884     switch (majority_vote)
1885     {
1886       case VOTE_ADD:
1887         progress_cls->num_pending++;
1888         GNUNET_assert (GNUNET_OK ==
1889                        GNUNET_SET_add_element (set_out->h,
1890                                                ri->element,
1891                                                &set_mutation_done,
1892                                                progress_cls));
1893         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1894                     "P%u: apply round: adding element %s with %u-majority.\n",
1895                     session->local_peer_idx,
1896                     debug_str_element (ri->element), majority_num);
1897         break;
1898       case VOTE_REMOVE:
1899         progress_cls->num_pending++;
1900         GNUNET_assert (GNUNET_OK ==
1901                        GNUNET_SET_remove_element (set_out->h,
1902                                                   ri->element,
1903                                                   &set_mutation_done,
1904                                                   progress_cls));
1905         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906                     "P%u: apply round: deleting element %s with %u-majority.\n",
1907                     session->local_peer_idx,
1908                     debug_str_element (ri->element), majority_num);
1909         break;
1910       case VOTE_STAY:
1911         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1912                     "P%u: apply round: keeping element %s with %u-majority.\n",
1913                     session->local_peer_idx,
1914                     debug_str_element (ri->element), majority_num);
1915         // do nothing
1916         break;
1917       default:
1918         GNUNET_assert (0);
1919         break;
1920     }
1921   }
1922
1923   if (0 == progress_cls->num_pending)
1924   {
1925     // call closure right now, no pending ops
1926     GNUNET_free (progress_cls);
1927     finish_task (task);
1928   }
1929
1930   {
1931     uint16_t thresh = (session->num_peers / 3) * 2;
1932
1933     if (worst_majority >= thresh)
1934     {
1935       switch (session->early_stopping)
1936       {
1937         case EARLY_STOPPING_NONE:
1938           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1939           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1940                       "P%u: Stopping early (after one more superround)\n",
1941                       session->local_peer_idx);
1942           break;
1943         case EARLY_STOPPING_ONE_MORE:
1944           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1945                       session->local_peer_idx);
1946           session->early_stopping = EARLY_STOPPING_DONE;
1947           {
1948             struct Step *step;
1949             for (step = session->steps_head; NULL != step; step = step->next)
1950               try_finish_step_early (step);
1951           }
1952           break;
1953         case EARLY_STOPPING_DONE:
1954           /* We shouldn't be here anymore after early stopping */
1955           GNUNET_break (0);
1956           break;
1957         default:
1958           GNUNET_assert (0);
1959           break;
1960       }
1961     }
1962     else if (EARLY_STOPPING_NONE != session->early_stopping)
1963     {
1964       // Our assumption about the number of bad peers
1965       // has been broken.
1966       GNUNET_break_op (0);
1967     }
1968     else
1969     {
1970       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1971                   session->local_peer_idx);
1972     }
1973   }
1974   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1975 }
1976
1977
1978 static void
1979 task_start_grade (struct TaskEntry *task)
1980 {
1981   struct ConsensusSession *session = task->step->session;
1982   struct ReferendumEntry *output_rfn;
1983   struct ReferendumEntry *input_rfn;
1984   struct DiffEntry *input_diff;
1985   struct RfnKey rfn_key;
1986   struct DiffKey diff_key;
1987   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1988   struct RfnElementInfo *ri;
1989   unsigned int gradecast_confidence = 2;
1990
1991   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1992   output_rfn = lookup_rfn (session, &rfn_key);
1993   if (NULL == output_rfn)
1994   {
1995     output_rfn = rfn_create (session->num_peers);
1996     output_rfn->key = rfn_key;
1997     put_rfn (session, output_rfn);
1998   }
1999
2000   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2001   input_diff = lookup_diff (session, &diff_key);
2002   GNUNET_assert (NULL != input_diff);
2003
2004   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2005   input_rfn = lookup_rfn (session, &rfn_key);
2006   GNUNET_assert (NULL != input_rfn);
2007
2008   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2009
2010   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2011
2012   while (GNUNET_YES ==
2013          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2014                                                       NULL,
2015                                                       (const void **) &ri))
2016   {
2017     uint16_t majority_num;
2018     enum ReferendumVote majority_vote;
2019
2020     // XXX: we need contested votes and non-contested votes here
2021     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2022
2023     if (majority_num <= session->num_peers / 3)
2024       majority_vote = VOTE_REMOVE;
2025
2026     switch (majority_vote)
2027     {
2028       case VOTE_STAY:
2029         break;
2030       case VOTE_ADD:
2031         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2032         break;
2033       case VOTE_REMOVE:
2034         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2035         break;
2036       default:
2037         GNUNET_assert (0);
2038         break;
2039     }
2040   }
2041   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2042
2043   {
2044     uint16_t noncontested;
2045     noncontested = rfn_noncontested (input_rfn);
2046     if (noncontested < (session->num_peers / 3) * 2)
2047     {
2048       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2049     }
2050     if (noncontested < (session->num_peers / 3) + 1)
2051     {
2052       gradecast_confidence = 0;
2053     }
2054   }
2055
2056   if (gradecast_confidence >= 1)
2057     rfn_commit (output_rfn, task->key.leader);
2058
2059   if (gradecast_confidence <= 1)
2060     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2061
2062   finish_task (task);
2063 }
2064
2065
2066 static void
2067 task_start_reconcile (struct TaskEntry *task)
2068 {
2069   struct SetEntry *input;
2070   struct SetOpCls *setop = &task->cls.setop;
2071   struct ConsensusSession *session = task->step->session;
2072
2073   input = lookup_set (session, &setop->input_set);
2074   GNUNET_assert (NULL != input);
2075   GNUNET_assert (NULL != input->h);
2076
2077   /* We create the outputs for the operation here
2078      (rather than in the set operation callback)
2079      because we want something valid in there, even
2080      if the other peer doesn't talk to us */
2081
2082   if (SET_KIND_NONE != setop->output_set.set_kind)
2083   {
2084     /* If we don't have an existing output set,
2085        we clone the input set. */
2086     if (NULL == lookup_set (session, &setop->output_set))
2087     {
2088       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2089       return;
2090     }
2091   }
2092
2093   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2094   {
2095     if (NULL == lookup_rfn (session, &setop->output_rfn))
2096     {
2097       struct ReferendumEntry *rfn;
2098
2099       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100                   "P%u: output rfn <%s> missing, creating.\n",
2101                   session->local_peer_idx,
2102                   debug_str_rfn_key (&setop->output_rfn));
2103
2104       rfn = rfn_create (session->num_peers);
2105       rfn->key = setop->output_rfn;
2106       put_rfn (session, rfn);
2107     }
2108   }
2109
2110   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2111   {
2112     if (NULL == lookup_diff (session, &setop->output_diff))
2113     {
2114       struct DiffEntry *diff;
2115
2116       diff = diff_create ();
2117       diff->key = setop->output_diff;
2118       put_diff (session, diff);
2119     }
2120   }
2121
2122   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2123   {
2124     /* XXX: mark the corresponding rfn as commited if necessary */
2125     finish_task (task);
2126     return;
2127   }
2128
2129   if (task->key.peer1 == session->local_peer_idx)
2130   {
2131     struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2132
2133     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134                 "P%u: Looking up set {%s} to run remote union\n",
2135                 session->local_peer_idx,
2136                 debug_str_set_key (&setop->input_set));
2137
2138     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2139     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2140
2141     rcm.kind = htons (task->key.kind);
2142     rcm.peer1 = htons (task->key.peer1);
2143     rcm.peer2 = htons (task->key.peer2);
2144     rcm.leader = htons (task->key.leader);
2145     rcm.repetition = htons (task->key.repetition);
2146     rcm.is_contested = htons (0);
2147
2148     GNUNET_assert (NULL == setop->op);
2149     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2150                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2151
2152     struct GNUNET_SET_Option opts[] = {
2153       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2154       { GNUNET_SET_OPTION_END },
2155     };
2156
2157     // XXX: maybe this should be done while
2158     // setting up tasks alreays?
2159     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2160                                     &session->global_id,
2161                                     &rcm.header,
2162                                     GNUNET_SET_RESULT_SYMMETRIC,
2163                                     opts,
2164                                     set_result_cb,
2165                                     task);
2166
2167     commit_set (session, task);
2168   }
2169   else if (task->key.peer2 == session->local_peer_idx)
2170   {
2171     /* Wait for the other peer to contact us */
2172     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2173                 session->local_peer_idx, task->key.peer1);
2174
2175     if (NULL != setop->op)
2176     {
2177       commit_set (session, task);
2178     }
2179   }
2180   else
2181   {
2182     /* We made an error while constructing the task graph. */
2183     GNUNET_assert (0);
2184   }
2185 }
2186
2187
2188 static void
2189 task_start_eval_echo (struct TaskEntry *task)
2190 {
2191   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2192   struct ReferendumEntry *input_rfn;
2193   struct RfnElementInfo *ri;
2194   struct SetEntry *output_set;
2195   struct SetMutationProgressCls *progress_cls;
2196   struct ConsensusSession *session = task->step->session;
2197   struct SetKey sk_in;
2198   struct SetKey sk_out;
2199   struct RfnKey rk_in;
2200
2201   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2202   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2203   output_set = lookup_set (session, &sk_out);
2204   if (NULL == output_set)
2205   {
2206     create_set_copy_for_task (task, &sk_in, &sk_out);
2207     return;
2208   }
2209
2210
2211   {
2212     // FIXME: should be marked as a shallow copy, so
2213     // we can destroy everything correctly
2214     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2215     last_set->h = output_set->h;
2216     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2217     put_set (session, last_set);
2218   }
2219
2220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2221               "Evaluating referendum in Task {%s}\n",
2222               debug_str_task_key (&task->key));
2223
2224   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2225   progress_cls->task = task;
2226
2227   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2228   input_rfn = lookup_rfn (session, &rk_in);
2229
2230   GNUNET_assert (NULL != input_rfn);
2231
2232   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2233   GNUNET_assert (NULL != iter);
2234
2235   while (GNUNET_YES ==
2236          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2237                                                       NULL,
2238                                                       (const void **) &ri))
2239   {
2240     enum ReferendumVote majority_vote;
2241     uint16_t majority_num;
2242
2243     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2244
2245     if (majority_num < session->num_peers / 3)
2246     {
2247       /* It is not the case that all nonfaulty peers
2248          echoed the same value.  Since we're doing a set reconciliation, we
2249          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2250          reconciliation as contested.  Other peers might not know that the
2251          leader is faulty, thus we still re-distribute in the confirmation
2252          round. */
2253       output_set->is_contested = GNUNET_YES;
2254     }
2255
2256     switch (majority_vote)
2257     {
2258       case VOTE_ADD:
2259         progress_cls->num_pending++;
2260         GNUNET_assert (GNUNET_OK ==
2261                        GNUNET_SET_add_element (output_set->h,
2262                                                ri->element,
2263                                                set_mutation_done,
2264                                                progress_cls));
2265         break;
2266       case VOTE_REMOVE:
2267         progress_cls->num_pending++;
2268         GNUNET_assert (GNUNET_OK ==
2269                        GNUNET_SET_remove_element (output_set->h,
2270                                                   ri->element,
2271                                                   set_mutation_done,
2272                                                   progress_cls));
2273         break;
2274       case VOTE_STAY:
2275         /* Nothing to do. */
2276         break;
2277       default:
2278         /* not reached */
2279         GNUNET_assert (0);
2280     }
2281   }
2282
2283   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2284
2285   if (0 == progress_cls->num_pending)
2286   {
2287     // call closure right now, no pending ops
2288     GNUNET_free (progress_cls);
2289     finish_task (task);
2290   }
2291 }
2292
2293
2294 static void
2295 task_start_finish (struct TaskEntry *task)
2296 {
2297   struct SetEntry *final_set;
2298   struct ConsensusSession *session = task->step->session;
2299
2300   final_set = lookup_set (session, &task->cls.finish.input_set);
2301
2302   GNUNET_assert (NULL != final_set);
2303
2304
2305   GNUNET_SET_iterate (final_set->h,
2306                       send_to_client_iter,
2307                       task);
2308 }
2309
2310 static void
2311 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2312 {
2313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2314
2315   GNUNET_assert (GNUNET_NO == task->is_started);
2316   GNUNET_assert (GNUNET_NO == task->is_finished);
2317   GNUNET_assert (NULL != task->start);
2318
2319   task->start (task);
2320
2321   task->is_started = GNUNET_YES;
2322 }
2323
2324
2325
2326
2327 /*
2328  * Run all steps of the session that don't any
2329  * more dependencies.
2330  */
2331 static void
2332 run_ready_steps (struct ConsensusSession *session)
2333 {
2334   struct Step *step;
2335
2336   step = session->steps_head;
2337
2338   while (NULL != step)
2339   {
2340     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2341     {
2342       size_t i;
2343
2344       GNUNET_assert (0 == step->finished_tasks);
2345
2346 #ifdef GNUNET_EXTRA_LOGGING
2347       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2348                   session->local_peer_idx,
2349                   step->debug_name,
2350                   step->round, step->tasks_len, step->subordinates_len);
2351 #endif
2352
2353       step->is_running = GNUNET_YES;
2354       for (i = 0; i < step->tasks_len; i++)
2355         start_task (session, step->tasks[i]);
2356
2357       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2358       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2359         finish_step (step);
2360
2361       /* Running the next ready steps will be triggered by task completion */
2362       return;
2363     }
2364     step = step->next;
2365   }
2366
2367   return;
2368 }
2369
2370
2371
2372 static void
2373 finish_task (struct TaskEntry *task)
2374 {
2375   GNUNET_assert (GNUNET_NO == task->is_finished);
2376   task->is_finished = GNUNET_YES;
2377
2378   task->step->finished_tasks++;
2379
2380   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2381               "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2382               task->step->session->local_peer_idx,
2383               debug_str_task_key (&task->key),
2384               (unsigned int) task->step->finished_tasks,
2385               (unsigned int) task->step->tasks_len);
2386
2387   if (task->step->finished_tasks == task->step->tasks_len)
2388     finish_step (task->step);
2389 }
2390
2391
2392 /**
2393  * Search peer in the list of peers in session.
2394  *
2395  * @param peer peer to find
2396  * @param session session with peer
2397  * @return index of peer, -1 if peer is not in session
2398  */
2399 static int
2400 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2401 {
2402   int i;
2403   for (i = 0; i < session->num_peers; i++)
2404     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2405       return i;
2406   return -1;
2407 }
2408
2409
2410 /**
2411  * Compute a global, (hopefully) unique consensus session id,
2412  * from the local id of the consensus session, and the identities of all participants.
2413  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2414  * exactly the same peers, the global id will be different.
2415  *
2416  * @param session session to generate the global id for
2417  * @param local_session_id local id of the consensus session
2418  */
2419 static void
2420 compute_global_id (struct ConsensusSession *session,
2421                    const struct GNUNET_HashCode *local_session_id)
2422 {
2423   const char *salt = "gnunet-service-consensus/session_id";
2424
2425   GNUNET_assert (GNUNET_YES ==
2426                  GNUNET_CRYPTO_kdf (&session->global_id,
2427                                     sizeof (struct GNUNET_HashCode),
2428                                     salt,
2429                                     strlen (salt),
2430                                     session->peers,
2431                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2432                                     local_session_id,
2433                                     sizeof (struct GNUNET_HashCode),
2434                                     NULL));
2435 }
2436
2437
2438 /**
2439  * Compare two peer identities.
2440  *
2441  * @param h1 some peer identity
2442  * @param h2 some peer identity
2443  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2444  */
2445 static int
2446 peer_id_cmp (const void *h1, const void *h2)
2447 {
2448   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2449 }
2450
2451
2452 /**
2453  * Create the sorted list of peers for the session,
2454  * add the local peer if not in the join message.
2455  *
2456  * @param session session to initialize
2457  * @param join_msg join message with the list of peers participating at the end
2458  */
2459 static void
2460 initialize_session_peer_list (struct ConsensusSession *session,
2461                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2462 {
2463   const struct GNUNET_PeerIdentity *msg_peers
2464     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2465   int local_peer_in_list;
2466
2467   session->num_peers = ntohl (join_msg->num_peers);
2468
2469   /* Peers in the join message, may or may not include the local peer,
2470      Add it if it is missing. */
2471   local_peer_in_list = GNUNET_NO;
2472   for (unsigned int i = 0; i < session->num_peers; i++)
2473   {
2474     if (0 == memcmp (&msg_peers[i],
2475                      &my_peer,
2476                      sizeof (struct GNUNET_PeerIdentity)))
2477     {
2478       local_peer_in_list = GNUNET_YES;
2479       break;
2480     }
2481   }
2482   if (GNUNET_NO == local_peer_in_list)
2483     session->num_peers++;
2484
2485   session->peers = GNUNET_new_array (session->num_peers,
2486                                      struct GNUNET_PeerIdentity);
2487   if (GNUNET_NO == local_peer_in_list)
2488     session->peers[session->num_peers - 1] = my_peer;
2489
2490   GNUNET_memcpy (session->peers,
2491                  msg_peers,
2492                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2493   qsort (session->peers,
2494          session->num_peers,
2495          sizeof (struct GNUNET_PeerIdentity),
2496          &peer_id_cmp);
2497 }
2498
2499
2500 static struct TaskEntry *
2501 lookup_task (struct ConsensusSession *session,
2502              struct TaskKey *key)
2503 {
2504   struct GNUNET_HashCode hash;
2505
2506
2507   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2509               GNUNET_h2s (&hash));
2510   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2511 }
2512
2513
2514 /**
2515  * Called when another peer wants to do a set operation with the
2516  * local peer.
2517  *
2518  * @param cls closure
2519  * @param other_peer the other peer
2520  * @param context_msg message with application specific information from
2521  *        the other peer
2522  * @param request request from the other peer, use GNUNET_SET_accept
2523  *        to accept it, otherwise the request will be refused
2524  *        Note that we don't use a return value here, as it is also
2525  *        necessary to specify the set we want to do the operation with,
2526  *        whith sometimes can be derived from the context message.
2527  *        Also necessary to specify the timeout.
2528  */
2529 static void
2530 set_listen_cb (void *cls,
2531                const struct GNUNET_PeerIdentity *other_peer,
2532                const struct GNUNET_MessageHeader *context_msg,
2533                struct GNUNET_SET_Request *request)
2534 {
2535   struct ConsensusSession *session = cls;
2536   struct TaskKey tk;
2537   struct TaskEntry *task;
2538   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2539
2540   if (NULL == context_msg)
2541   {
2542     GNUNET_break_op (0);
2543     return;
2544   }
2545
2546   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2547   {
2548     GNUNET_break_op (0);
2549     return;
2550   }
2551
2552   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2553   {
2554     GNUNET_break_op (0);
2555     return;
2556   }
2557
2558   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2559
2560   tk = ((struct TaskKey) {
2561       .kind = ntohs (cm->kind),
2562       .peer1 = ntohs (cm->peer1),
2563       .peer2 = ntohs (cm->peer2),
2564       .repetition = ntohs (cm->repetition),
2565       .leader = ntohs (cm->leader),
2566   });
2567
2568   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2569               session->local_peer_idx, debug_str_task_key (&tk));
2570
2571   task = lookup_task (session, &tk);
2572
2573   if (NULL == task)
2574   {
2575     GNUNET_break_op (0);
2576     return;
2577   }
2578
2579   if (GNUNET_YES == task->is_finished)
2580   {
2581     GNUNET_break_op (0);
2582     return;
2583   }
2584
2585   if (task->key.peer2 != session->local_peer_idx)
2586   {
2587     /* We're being asked, so we must be thne 2nd peer. */
2588     GNUNET_break_op (0);
2589     return;
2590   }
2591
2592   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2593                     (task->key.peer2 == session->local_peer_idx)));
2594
2595   struct GNUNET_SET_Option opts[] = {
2596     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2597     { GNUNET_SET_OPTION_END },
2598   };
2599
2600   task->cls.setop.op = GNUNET_SET_accept (request,
2601                                           GNUNET_SET_RESULT_SYMMETRIC,
2602                                           opts,
2603                                           set_result_cb,
2604                                           task);
2605
2606   /* If the task hasn't been started yet,
2607      we wait for that until we commit. */
2608
2609   if (GNUNET_YES == task->is_started)
2610   {
2611     commit_set (session, task);
2612   }
2613 }
2614
2615
2616
2617 static void
2618 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2619           struct TaskEntry *t)
2620 {
2621   struct GNUNET_HashCode round_hash;
2622   struct Step *s;
2623
2624   GNUNET_assert (NULL != t->step);
2625
2626   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2627
2628   s = t->step;
2629
2630   if (s->tasks_len == s->tasks_cap)
2631   {
2632     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2633     GNUNET_array_grow (s->tasks,
2634                        s->tasks_cap,
2635                        target_size);
2636   }
2637
2638 #ifdef GNUNET_EXTRA_LOGGING
2639   GNUNET_assert (NULL != s->debug_name);
2640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2641               debug_str_task_key (&t->key),
2642               s->debug_name);
2643 #endif
2644
2645   s->tasks[s->tasks_len] = t;
2646   s->tasks_len++;
2647
2648   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2649   GNUNET_assert (GNUNET_OK ==
2650       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2651                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2652 }
2653
2654
2655 static void
2656 install_step_timeouts (struct ConsensusSession *session)
2657 {
2658   /* Given the fully constructed task graph
2659      with rounds for tasks, we can give the tasks timeouts. */
2660
2661   // unsigned int max_round;
2662
2663   /* XXX: implement! */
2664 }
2665
2666
2667
2668 /*
2669  * Arrange two peers in some canonical order.
2670  */
2671 static void
2672 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2673 {
2674   uint16_t a;
2675   uint16_t b;
2676
2677   GNUNET_assert (*p1 < n);
2678   GNUNET_assert (*p2 < n);
2679
2680   if (*p1 < *p2)
2681   {
2682     a = *p1;
2683     b = *p2;
2684   }
2685   else
2686   {
2687     a = *p2;
2688     b = *p1;
2689   }
2690
2691   /* For uniformly random *p1, *p2,
2692      this condition is true with 50% chance */
2693   if (((b - a) + n) % n <= n / 2)
2694   {
2695     *p1 = a;
2696     *p2 = b;
2697   }
2698   else
2699   {
2700     *p1 = b;
2701     *p2 = a;
2702   }
2703 }
2704
2705
2706 /**
2707  * Record @a dep as a dependency of @a step.
2708  */
2709 static void
2710 step_depend_on (struct Step *step, struct Step *dep)
2711 {
2712   /* We're not checking for cyclic dependencies,
2713      but this is a cheap sanity check. */
2714   GNUNET_assert (step != dep);
2715   GNUNET_assert (NULL != step);
2716   GNUNET_assert (NULL != dep);
2717   GNUNET_assert (dep->round <= step->round);
2718
2719 #ifdef GNUNET_EXTRA_LOGGING
2720   /* Make sure we have complete debugging information.
2721      Also checks that we don't screw up too badly
2722      constructing the task graph. */
2723   GNUNET_assert (NULL != step->debug_name);
2724   GNUNET_assert (NULL != dep->debug_name);
2725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2726               "Making step `%s' depend on `%s'\n",
2727               step->debug_name,
2728               dep->debug_name);
2729 #endif
2730
2731   if (dep->subordinates_cap == dep->subordinates_len)
2732   {
2733     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2734     GNUNET_array_grow (dep->subordinates,
2735                        dep->subordinates_cap,
2736                        target_size);
2737   }
2738
2739   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2740
2741   dep->subordinates[dep->subordinates_len] = step;
2742   dep->subordinates_len++;
2743
2744   step->pending_prereq++;
2745 }
2746
2747
2748 static struct Step *
2749 create_step (struct ConsensusSession *session, int round, int early_finishable)
2750 {
2751   struct Step *step;
2752   step = GNUNET_new (struct Step);
2753   step->session = session;
2754   step->round = round;
2755   step->early_finishable = early_finishable;
2756   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2757                                     session->steps_tail,
2758                                     step);
2759   return step;
2760 }
2761
2762
2763 /**
2764  * Construct the task graph for a single
2765  * gradecast.
2766  */
2767 static void
2768 construct_task_graph_gradecast (struct ConsensusSession *session,
2769                                 uint16_t rep,
2770                                 uint16_t lead,
2771                                 struct Step *step_before,
2772                                 struct Step *step_after)
2773 {
2774   uint16_t n = session->num_peers;
2775   uint16_t me = session->local_peer_idx;
2776
2777   uint16_t p1;
2778   uint16_t p2;
2779
2780   /* The task we're currently setting up. */
2781   struct TaskEntry task;
2782
2783   struct Step *step;
2784   struct Step *prev_step;
2785
2786   uint16_t round;
2787
2788   unsigned int k;
2789
2790   round = step_before->round + 1;
2791
2792   /* gcast step 1: leader disseminates */
2793
2794   step = create_step (session, round, GNUNET_YES);
2795
2796 #ifdef GNUNET_EXTRA_LOGGING
2797   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2798 #endif
2799   step_depend_on (step, step_before);
2800
2801   if (lead == me)
2802   {
2803     for (k = 0; k < n; k++)
2804     {
2805       if (k == me)
2806         continue;
2807       p1 = me;
2808       p2 = k;
2809       arrange_peers (&p1, &p2, n);
2810       task = ((struct TaskEntry) {
2811         .step = step,
2812         .start = task_start_reconcile,
2813         .cancel = task_cancel_reconcile,
2814         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2815       });
2816       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2817       put_task (session->taskmap, &task);
2818     }
2819     /* We run this task to make sure that the leader
2820        has the stored the SET_KIND_LEADER set of himself,
2821        so he can participate in the rest of the gradecast
2822        without the code having to handle any special cases. */
2823     task = ((struct TaskEntry) {
2824       .step = step,
2825       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2826       .start = task_start_reconcile,
2827       .cancel = task_cancel_reconcile,
2828     });
2829     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2830     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2831     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2832     put_task (session->taskmap, &task);
2833   }
2834   else
2835   {
2836     p1 = me;
2837     p2 = lead;
2838     arrange_peers (&p1, &p2, n);
2839     task = ((struct TaskEntry) {
2840       .step = step,
2841       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2842       .start = task_start_reconcile,
2843       .cancel = task_cancel_reconcile,
2844     });
2845     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2846     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2847     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2848     put_task (session->taskmap, &task);
2849   }
2850
2851   /* gcast phase 2: echo */
2852   prev_step = step;
2853   round += 1;
2854   step = create_step (session, round, GNUNET_YES);
2855 #ifdef GNUNET_EXTRA_LOGGING
2856   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2857 #endif
2858   step_depend_on (step, prev_step);
2859
2860   for (k = 0; k < n; k++)
2861   {
2862     p1 = k;
2863     p2 = me;
2864     arrange_peers (&p1, &p2, n);
2865     task = ((struct TaskEntry) {
2866       .step = step,
2867       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2868       .start = task_start_reconcile,
2869       .cancel = task_cancel_reconcile,
2870     });
2871     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2872     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2873     put_task (session->taskmap, &task);
2874   }
2875
2876   prev_step = step;
2877   /* Same round, since step only has local tasks */
2878   step = create_step (session, round, GNUNET_YES);
2879 #ifdef GNUNET_EXTRA_LOGGING
2880   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2881 #endif
2882   step_depend_on (step, prev_step);
2883
2884   arrange_peers (&p1, &p2, n);
2885   task = ((struct TaskEntry) {
2886     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2887     .step = step,
2888     .start = task_start_eval_echo
2889   });
2890   put_task (session->taskmap, &task);
2891
2892   prev_step = step;
2893   round += 1;
2894   step = create_step (session, round, GNUNET_YES);
2895 #ifdef GNUNET_EXTRA_LOGGING
2896   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2897 #endif
2898   step_depend_on (step, prev_step);
2899
2900   /* gcast phase 3: confirmation and grading */
2901   for (k = 0; k < n; k++)
2902   {
2903     p1 = k;
2904     p2 = me;
2905     arrange_peers (&p1, &p2, n);
2906     task = ((struct TaskEntry) {
2907       .step = step,
2908       .start = task_start_reconcile,
2909       .cancel = task_cancel_reconcile,
2910       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2911     });
2912     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2913     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2914     /* If there was at least one element in the echo round that was
2915        contested (i.e. it had no n-t majority), then we let the other peers
2916        know, and other peers let us know.  The contested flag for each peer is
2917        stored in the rfn. */
2918     task.cls.setop.transceive_contested = GNUNET_YES;
2919     put_task (session->taskmap, &task);
2920   }
2921
2922   prev_step = step;
2923   /* Same round, since step only has local tasks */
2924   step = create_step (session, round, GNUNET_YES);
2925 #ifdef GNUNET_EXTRA_LOGGING
2926   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2927 #endif
2928   step_depend_on (step, prev_step);
2929
2930   task = ((struct TaskEntry) {
2931     .step = step,
2932     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2933     .start = task_start_grade,
2934   });
2935   put_task (session->taskmap, &task);
2936
2937   step_depend_on (step_after, step);
2938 }
2939
2940
2941 static void
2942 construct_task_graph (struct ConsensusSession *session)
2943 {
2944   uint16_t n = session->num_peers;
2945   uint16_t t = n / 3;
2946
2947   uint16_t me = session->local_peer_idx;
2948
2949   /* The task we're currently setting up. */
2950   struct TaskEntry task;
2951
2952   /* Current leader */
2953   unsigned int lead;
2954
2955   struct Step *step;
2956   struct Step *prev_step;
2957
2958   unsigned int round = 0;
2959
2960   unsigned int i;
2961
2962   // XXX: introduce first step,
2963   // where we wait for all insert acks
2964   // from the set service
2965
2966   /* faster but brittle all-to-all */
2967
2968   // XXX: Not implemented yet
2969
2970   /* all-to-all step */
2971
2972   step = create_step (session, round, GNUNET_NO);
2973
2974 #ifdef GNUNET_EXTRA_LOGGING
2975   step->debug_name = GNUNET_strdup ("all to all");
2976 #endif
2977
2978   for (i = 0; i < n; i++)
2979   {
2980     uint16_t p1;
2981     uint16_t p2;
2982
2983     p1 = me;
2984     p2 = i;
2985     arrange_peers (&p1, &p2, n);
2986     task = ((struct TaskEntry) {
2987       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2988       .step = step,
2989       .start = task_start_reconcile,
2990       .cancel = task_cancel_reconcile,
2991     });
2992     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2993     task.cls.setop.output_set = task.cls.setop.input_set;
2994     task.cls.setop.do_not_remove = GNUNET_YES;
2995     put_task (session->taskmap, &task);
2996   }
2997
2998   round += 1;
2999   prev_step = step;
3000   step = create_step (session, round, GNUNET_NO);;
3001 #ifdef GNUNET_EXTRA_LOGGING
3002   step->debug_name = GNUNET_strdup ("all to all 2");
3003 #endif
3004   step_depend_on (step, prev_step);
3005
3006
3007   for (i = 0; i < n; i++)
3008   {
3009     uint16_t p1;
3010     uint16_t p2;
3011
3012     p1 = me;
3013     p2 = i;
3014     arrange_peers (&p1, &p2, n);
3015     task = ((struct TaskEntry) {
3016       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3017       .step = step,
3018       .start = task_start_reconcile,
3019       .cancel = task_cancel_reconcile,
3020     });
3021     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3022     task.cls.setop.output_set = task.cls.setop.input_set;
3023     task.cls.setop.do_not_remove = GNUNET_YES;
3024     put_task (session->taskmap, &task);
3025   }
3026
3027   round += 1;
3028
3029   prev_step = step;
3030   step = NULL;
3031
3032
3033
3034   /* Byzantine union */
3035
3036   /* sequential repetitions of the gradecasts */
3037   for (i = 0; i < t + 1; i++)
3038   {
3039     struct Step *step_rep_start;
3040     struct Step *step_rep_end;
3041
3042     /* Every repetition is in a separate round. */
3043     step_rep_start = create_step (session, round, GNUNET_YES);
3044 #ifdef GNUNET_EXTRA_LOGGING
3045     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3046 #endif
3047
3048     step_depend_on (step_rep_start, prev_step);
3049
3050     /* gradecast has three rounds */
3051     round += 3;
3052     step_rep_end = create_step (session, round, GNUNET_YES);
3053 #ifdef GNUNET_EXTRA_LOGGING
3054     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3055 #endif
3056
3057     /* parallel gradecasts */
3058     for (lead = 0; lead < n; lead++)
3059       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3060
3061     task = ((struct TaskEntry) {
3062       .step = step_rep_end,
3063       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3064       .start = task_start_apply_round,
3065     });
3066     put_task (session->taskmap, &task);
3067
3068     prev_step = step_rep_end;
3069   }
3070
3071  /* There is no next gradecast round, thus the final
3072     start step is the overall end step of the gradecasts */
3073   round += 1;
3074   step = create_step (session, round, GNUNET_NO);
3075 #ifdef GNUNET_EXTRA_LOGGING
3076   GNUNET_asprintf (&step->debug_name, "finish");
3077 #endif
3078   step_depend_on (step, prev_step);
3079
3080   task = ((struct TaskEntry) {
3081     .step = step,
3082     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3083     .start = task_start_finish,
3084   });
3085   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3086
3087   put_task (session->taskmap, &task);
3088 }
3089
3090
3091
3092 /**
3093  * Check join message.
3094  *
3095  * @param cls session of client that sent the message
3096  * @param m message sent by the client
3097  * @return #GNUNET_OK if @a m is well-formed
3098  */
3099 static int
3100 check_client_join (void *cls,
3101                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3102 {
3103   uint32_t listed_peers = ntohl (m->num_peers);
3104
3105   if ( (ntohs (m->header.size) - sizeof (*m)) !=
3106        listed_peers * sizeof (struct GNUNET_PeerIdentity))
3107   {
3108     GNUNET_break (0);
3109     return GNUNET_SYSERR;
3110   }
3111   return GNUNET_OK;
3112 }
3113
3114
3115 /**
3116  * Called when a client wants to join a consensus session.
3117  *
3118  * @param cls session of client that sent the message
3119  * @param m message sent by the client
3120  */
3121 static void
3122 handle_client_join (void *cls,
3123                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3124 {
3125   struct ConsensusSession *session = cls;
3126   struct ConsensusSession *other_session;
3127
3128   initialize_session_peer_list (session,
3129                                 m);
3130   compute_global_id (session,
3131                      &m->session_id);
3132
3133   /* Check if some local client already owns the session.
3134      It is only legal to have a session with an existing global id
3135      if all other sessions with this global id are finished.*/
3136   for (other_session = sessions_head;
3137        NULL != other_session;
3138        other_session = other_session->next)
3139   {
3140     if ( (other_session != session) &&
3141          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3142                                        &other_session->global_id)) )
3143       break;
3144   }
3145
3146   session->conclude_deadline
3147     = GNUNET_TIME_absolute_ntoh (m->deadline);
3148   session->conclude_start
3149     = GNUNET_TIME_absolute_ntoh (m->start);
3150   session->local_peer_idx = get_peer_idx (&my_peer,
3151                                           session);
3152   GNUNET_assert (-1 != session->local_peer_idx);
3153
3154   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3155               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3156               GNUNET_h2s (&m->session_id),
3157               session->num_peers,
3158               session->local_peer_idx,
3159               GNUNET_STRINGS_relative_time_to_string
3160               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3161                                                     session->conclude_deadline),
3162                GNUNET_YES));
3163
3164   session->set_listener
3165     = GNUNET_SET_listen (cfg,
3166                          GNUNET_SET_OPERATION_UNION,
3167                          &session->global_id,
3168                          &set_listen_cb,
3169                          session);
3170
3171   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3172                                                           GNUNET_NO);
3173   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3174                                                            GNUNET_NO);
3175   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3176                                                            GNUNET_NO);
3177   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3178                                                           GNUNET_NO);
3179
3180   {
3181     struct SetEntry *client_set;
3182
3183     client_set = GNUNET_new (struct SetEntry);
3184     client_set->h = GNUNET_SET_create (cfg,
3185                                        GNUNET_SET_OPERATION_UNION);
3186     struct SetHandle *sh = GNUNET_new (struct SetHandle);
3187     sh->h = client_set->h;
3188     GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3189                                  session->set_handles_tail,
3190                                  sh);
3191     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3192     put_set (session,
3193              client_set);
3194   }
3195
3196   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3197                                                  int);
3198
3199   /* Just construct the task graph,
3200      but don't run anything until the client calls conclude. */
3201   construct_task_graph (session);
3202   GNUNET_SERVICE_client_continue (session->client);
3203 }
3204
3205
3206 static void
3207 client_insert_done (void *cls)
3208 {
3209   // FIXME: implement
3210 }
3211
3212
3213 /**
3214  * Called when a client performs an insert operation.
3215  *
3216  * @param cls client handle
3217  * @param msg message sent by the client
3218  * @return #GNUNET_OK (always well-formed)
3219  */
3220 static int
3221 check_client_insert (void *cls,
3222                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3223 {
3224   return GNUNET_OK;
3225 }
3226
3227
3228 /**
3229  * Called when a client performs an insert operation.
3230  *
3231  * @param cls client handle
3232  * @param msg message sent by the client
3233  */
3234 static void
3235 handle_client_insert (void *cls,
3236                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3237 {
3238   struct ConsensusSession *session = cls;
3239   ssize_t element_size;
3240   struct GNUNET_SET_Handle *initial_set;
3241   struct ConsensusElement *ce;
3242
3243   if (GNUNET_YES == session->conclude_started)
3244   {
3245     GNUNET_break (0);
3246     GNUNET_SERVICE_client_drop (session->client);
3247     return;
3248   }
3249
3250   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3251   ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3252   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3253   ce->payload_type = msg->element_type;
3254
3255   struct GNUNET_SET_Element element = {
3256     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3257     .size = sizeof (struct ConsensusElement) + element_size,
3258     .data = ce,
3259   };
3260
3261   {
3262     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3263     struct SetEntry *entry;
3264
3265     entry = lookup_set (session,
3266                         &key);
3267     GNUNET_assert (NULL != entry);
3268     initial_set = entry->h;
3269   }
3270
3271   session->num_client_insert_pending++;
3272   GNUNET_SET_add_element (initial_set,
3273                           &element,
3274                           &client_insert_done,
3275                           session);
3276
3277 #ifdef GNUNET_EXTRA_LOGGING
3278   {
3279     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3280                 "P%u: element %s added\n",
3281                 session->local_peer_idx,
3282                 debug_str_element (&element));
3283   }
3284 #endif
3285   GNUNET_free (ce);
3286   GNUNET_SERVICE_client_continue (session->client);
3287 }
3288
3289
3290 /**
3291  * Called when a client performs the conclude operation.
3292  *
3293  * @param cls client handle
3294  * @param message message sent by the client
3295  */
3296 static void
3297 handle_client_conclude (void *cls,
3298                         const struct GNUNET_MessageHeader *message)
3299 {
3300   struct ConsensusSession *session = cls;
3301
3302   if (GNUNET_YES == session->conclude_started)
3303   {
3304     /* conclude started twice */
3305     GNUNET_break (0);
3306     GNUNET_SERVICE_client_drop (session->client);
3307     return;
3308   }
3309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3310               "conclude requested\n");
3311   session->conclude_started = GNUNET_YES;
3312   install_step_timeouts (session);
3313   run_ready_steps (session);
3314   GNUNET_SERVICE_client_continue (session->client);
3315 }
3316
3317
3318 /**
3319  * Called to clean up, after a shutdown has been requested.
3320  *
3321  * @param cls closure
3322  */
3323 static void
3324 shutdown_task (void *cls)
3325 {
3326   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3327               "shutting down\n");
3328   GNUNET_STATISTICS_destroy (statistics,
3329                              GNUNET_NO);
3330   statistics = NULL;
3331 }
3332
3333
3334 /**
3335  * Start processing consensus requests.
3336  *
3337  * @param cls closure
3338  * @param c configuration to use
3339  * @param service the initialized service
3340  */
3341 static void
3342 run (void *cls,
3343      const struct GNUNET_CONFIGURATION_Handle *c,
3344      struct GNUNET_SERVICE_Handle *service)
3345 {
3346   cfg = c;
3347   if (GNUNET_OK !=
3348       GNUNET_CRYPTO_get_peer_identity (cfg,
3349                                        &my_peer))
3350   {
3351     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3352                 "Could not retrieve host identity\n");
3353     GNUNET_SCHEDULER_shutdown ();
3354     return;
3355   }
3356   statistics = GNUNET_STATISTICS_create ("consensus",
3357                                          cfg);
3358   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3359                                  NULL);
3360 }
3361
3362
3363 /**
3364  * Callback called when a client connects to the service.
3365  *
3366  * @param cls closure for the service
3367  * @param c the new client that connected to the service
3368  * @param mq the message queue used to send messages to the client
3369  * @return @a c
3370  */
3371 static void *
3372 client_connect_cb (void *cls,
3373                    struct GNUNET_SERVICE_Client *c,
3374                    struct GNUNET_MQ_Handle *mq)
3375 {
3376   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3377
3378   session->client = c;
3379   session->client_mq = mq;
3380   GNUNET_CONTAINER_DLL_insert (sessions_head,
3381                                sessions_tail,
3382                                session);
3383   return session;
3384 }
3385
3386
3387 /**
3388  * Callback called when a client disconnected from the service
3389  *
3390  * @param cls closure for the service
3391  * @param c the client that disconnected
3392  * @param internal_cls should be equal to @a c
3393  */
3394 static void
3395 client_disconnect_cb (void *cls,
3396                       struct GNUNET_SERVICE_Client *c,
3397                       void *internal_cls)
3398 {
3399   struct ConsensusSession *session = internal_cls;
3400
3401   if (NULL != session->set_listener)
3402   {
3403     GNUNET_SET_listen_cancel (session->set_listener);
3404     session->set_listener = NULL;
3405   }
3406   GNUNET_CONTAINER_DLL_remove (sessions_head,
3407                                sessions_tail,
3408                                session);
3409
3410   while (session->set_handles_head)
3411   {
3412     struct SetHandle *sh = session->set_handles_head;
3413     session->set_handles_head = sh->next;
3414     GNUNET_SET_destroy (sh->h);
3415     GNUNET_free (sh);
3416   }
3417   GNUNET_free (session);
3418 }
3419
3420
3421 /**
3422  * Define "main" method using service macro.
3423  */
3424 GNUNET_SERVICE_MAIN
3425 ("consensus",
3426  GNUNET_SERVICE_OPTION_NONE,
3427  &run,
3428  &client_connect_cb,
3429  &client_disconnect_cb,
3430  NULL,
3431  GNUNET_MQ_hd_fixed_size (client_conclude,
3432                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3433                           struct GNUNET_MessageHeader,
3434                           NULL),
3435  GNUNET_MQ_hd_var_size (client_insert,
3436                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3437                         struct GNUNET_CONSENSUS_ElementMessage,
3438                         NULL),
3439  GNUNET_MQ_hd_var_size (client_join,
3440                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3441                         struct GNUNET_CONSENSUS_JoinMessage,
3442                         NULL),
3443  GNUNET_MQ_handler_end ());
3444
3445 /* end of gnunet-service-consensus.c */