RPS: Change cosmetics - fix indentation
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14      
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file consensus/gnunet-service-consensus.c
21  * @brief multi-peer set reconciliation
22  * @author Florian Dold <flo@dold.me>
23  */
24
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_block_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_set_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
35
36
37 enum ReferendumVote
38 {
39   /**
40    * Vote that nothing should change.
41    * This option is never voted explicitly.
42    */
43   VOTE_STAY = 0,
44   /**
45    * Vote that an element should be added.
46    */
47   VOTE_ADD = 1,
48   /**
49    * Vote that an element should be removed.
50    */
51   VOTE_REMOVE = 2,
52 };
53
54
55 enum EarlyStoppingPhase
56 {
57   EARLY_STOPPING_NONE = 0,
58   EARLY_STOPPING_ONE_MORE = 1,
59   EARLY_STOPPING_DONE = 2,
60 };
61
62
63 GNUNET_NETWORK_STRUCT_BEGIN
64
65 /**
66  * Tuple of integers that together
67  * identify a task uniquely.
68  */
69 struct TaskKey {
70   /**
71    * A value from 'enum PhaseKind'.
72    */
73   uint16_t kind GNUNET_PACKED;
74
75   /**
76    * Number of the first peer
77    * in canonical order.
78    */
79   int16_t peer1 GNUNET_PACKED;
80
81   /**
82    * Number of the second peer in canonical order.
83    */
84   int16_t peer2 GNUNET_PACKED;
85
86   /**
87    * Repetition of the gradecast phase.
88    */
89   int16_t repetition GNUNET_PACKED;
90
91   /**
92    * Leader in the gradecast phase.
93    *
94    * Can be different from both peer1 and peer2.
95    */
96   int16_t leader GNUNET_PACKED;
97 };
98
99
100
101 struct SetKey
102 {
103   int set_kind GNUNET_PACKED;
104   int k1 GNUNET_PACKED;
105   int k2 GNUNET_PACKED;
106 };
107
108
109 struct SetEntry
110 {
111   struct SetKey key;
112   struct GNUNET_SET_Handle *h;
113   /**
114    * GNUNET_YES if the set resulted
115    * from applying a referendum with contested
116    * elements.
117    */
118   int is_contested;
119 };
120
121
122 struct DiffKey
123 {
124   int diff_kind GNUNET_PACKED;
125   int k1 GNUNET_PACKED;
126   int k2 GNUNET_PACKED;
127 };
128
129 struct RfnKey
130 {
131   int rfn_kind GNUNET_PACKED;
132   int k1 GNUNET_PACKED;
133   int k2 GNUNET_PACKED;
134 };
135
136
137 GNUNET_NETWORK_STRUCT_END
138
139 enum PhaseKind
140 {
141   PHASE_KIND_ALL_TO_ALL,
142   PHASE_KIND_ALL_TO_ALL_2,
143   PHASE_KIND_GRADECAST_LEADER,
144   PHASE_KIND_GRADECAST_ECHO,
145   PHASE_KIND_GRADECAST_ECHO_GRADE,
146   PHASE_KIND_GRADECAST_CONFIRM,
147   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
148   /**
149    * Apply a repetition of the all-to-all
150    * gradecast to the current set.
151    */
152   PHASE_KIND_APPLY_REP,
153   PHASE_KIND_FINISH,
154 };
155
156
157 enum SetKind
158 {
159   SET_KIND_NONE = 0,
160   SET_KIND_CURRENT,
161   /**
162    * Last result set from a gradecast
163    */
164   SET_KIND_LAST_GRADECAST,
165   SET_KIND_LEADER_PROPOSAL,
166   SET_KIND_ECHO_RESULT,
167 };
168
169 enum DiffKind
170 {
171   DIFF_KIND_NONE = 0,
172   DIFF_KIND_LEADER_PROPOSAL,
173   DIFF_KIND_LEADER_CONSENSUS,
174   DIFF_KIND_GRADECAST_RESULT,
175 };
176
177 enum RfnKind
178 {
179   RFN_KIND_NONE = 0,
180   RFN_KIND_ECHO,
181   RFN_KIND_CONFIRM,
182   RFN_KIND_GRADECAST_RESULT
183 };
184
185
186 struct SetOpCls
187 {
188   struct SetKey input_set;
189
190   struct SetKey output_set;
191   struct RfnKey output_rfn;
192   struct DiffKey output_diff;
193
194   int do_not_remove;
195
196   int transceive_contested;
197
198   struct GNUNET_SET_OperationHandle *op;
199 };
200
201
202 struct FinishCls
203 {
204   struct SetKey input_set;
205 };
206
207 /**
208  * Closure for both @a start_task
209  * and @a cancel_task.
210  */
211 union TaskFuncCls
212 {
213   struct SetOpCls setop;
214   struct FinishCls finish;
215 };
216
217 struct TaskEntry;
218
219 typedef void (*TaskFunc) (struct TaskEntry *task);
220
221 /*
222  * Node in the consensus task graph.
223  */
224 struct TaskEntry
225 {
226   struct TaskKey key;
227
228   struct Step *step;
229
230   int is_started;
231
232   int is_finished;
233
234   TaskFunc start;
235   TaskFunc cancel;
236
237   union TaskFuncCls cls;
238 };
239
240
241 struct Step
242 {
243   /**
244    * All steps of one session are in a
245    * linked list for easier deallocation.
246    */
247   struct Step *prev;
248
249   /**
250    * All steps of one session are in a
251    * linked list for easier deallocation.
252    */
253   struct Step *next;
254
255   struct ConsensusSession *session;
256
257   /**
258    * Tasks that this step is composed of.
259    */
260   struct TaskEntry **tasks;
261   unsigned int tasks_len;
262   unsigned int tasks_cap;
263
264   unsigned int finished_tasks;
265
266   /*
267    * Tasks that have this task as dependency.
268    *
269    * We store pointers to subordinates rather
270    * than to prerequisites since it makes
271    * tracking the readiness of a task easier.
272    */
273   struct Step **subordinates;
274   unsigned int subordinates_len;
275   unsigned int subordinates_cap;
276
277   /**
278    * Counter for the prerequisites of
279    * this step.
280    */
281   size_t pending_prereq;
282
283   /*
284    * Task that will run this step despite
285    * any pending prerequisites.
286    */
287   struct GNUNET_SCHEDULER_Task *timeout_task;
288
289   unsigned int is_running;
290
291   unsigned int is_finished;
292
293   /*
294    * Synchrony round of the task.
295    * Determines the deadline for the task.
296    */
297   unsigned int round;
298
299   /**
300    * Human-readable name for
301    * the task, used for debugging.
302    */
303   char *debug_name;
304
305   /**
306    * When we're doing an early finish, how should this step be
307    * treated?
308    * If GNUNET_YES, the step will be marked as finished
309    * without actually running its tasks.
310    * Otherwise, the step will still be run even after
311    * an early finish.
312    *
313    * Note that a task may never be finished early if
314    * it is already running.
315    */
316   int early_finishable;
317 };
318
319
320 struct RfnElementInfo
321 {
322   const struct GNUNET_SET_Element *element;
323
324   /*
325    * GNUNET_YES if the peer votes for the proposal.
326    */
327   int *votes;
328
329   /**
330    * Proposal for this element,
331    * can only be VOTE_ADD or VOTE_REMOVE.
332    */
333   enum ReferendumVote proposal;
334 };
335
336
337 struct ReferendumEntry
338 {
339   struct RfnKey key;
340
341   /*
342    * Elements where there is at least one proposed change.
343    *
344    * Maps the hash of the GNUNET_SET_Element
345    * to 'struct RfnElementInfo'.
346    */
347   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
348
349   unsigned int num_peers;
350
351   /**
352    * Stores, for every peer in the session,
353    * whether the peer finished the whole referendum.
354    *
355    * Votes from peers are only counted if they're
356    * marked as commited (#GNUNET_YES) in the referendum.
357    *
358    * Otherwise (#GNUNET_NO), the requested changes are
359    * not counted for majority votes or thresholds.
360    */
361   int *peer_commited;
362
363
364   /**
365    * Contestation state of the peer.  If a peer is contested, the values it
366    * contributed are still counted for applying changes, but the grading is
367    * affected.
368    */
369   int *peer_contested;
370 };
371
372
373 struct DiffElementInfo
374 {
375   const struct GNUNET_SET_Element *element;
376
377   /**
378    * Positive weight for 'add', negative
379    * weights for 'remove'.
380    */
381   int weight;
382 };
383
384
385 /**
386  * Weighted diff.
387  */
388 struct DiffEntry
389 {
390   struct DiffKey key;
391   struct GNUNET_CONTAINER_MultiHashMap *changes;
392 };
393
394 struct SetHandle
395 {
396   struct SetHandle *prev;
397   struct SetHandle *next;
398
399   struct GNUNET_SET_Handle *h;
400 };
401
402
403
404 /**
405  * A consensus session consists of one local client and the remote authorities.
406  */
407 struct ConsensusSession
408 {
409   /**
410    * Consensus sessions are kept in a DLL.
411    */
412   struct ConsensusSession *next;
413
414   /**
415    * Consensus sessions are kept in a DLL.
416    */
417   struct ConsensusSession *prev;
418
419   unsigned int num_client_insert_pending;
420
421   struct GNUNET_CONTAINER_MultiHashMap *setmap;
422   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
423   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
424
425   /**
426    * Array of peers with length 'num_peers'.
427    */
428   int *peers_blacklisted;
429
430   /*
431    * Mapping from (hashed) TaskKey to TaskEntry.
432    *
433    * We map the application_id for a round to the task that should be
434    * executed, so we don't have to go through all task whenever we get
435    * an incoming set op request.
436    */
437   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
438
439   struct Step *steps_head;
440   struct Step *steps_tail;
441
442   int conclude_started;
443
444   int conclude_done;
445
446   /**
447   * Global consensus identification, computed
448   * from the session id and participating authorities.
449   */
450   struct GNUNET_HashCode global_id;
451
452   /**
453    * Client that inhabits the session
454    */
455   struct GNUNET_SERVICE_Client *client;
456
457   /**
458    * Queued messages to the client.
459    */
460   struct GNUNET_MQ_Handle *client_mq;
461
462   /**
463    * Time when the conclusion of the consensus should begin.
464    */
465   struct GNUNET_TIME_Absolute conclude_start;
466
467   /**
468    * Timeout for all rounds together, single rounds will schedule a timeout task
469    * with a fraction of the conclude timeout.
470    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
471    */
472   struct GNUNET_TIME_Absolute conclude_deadline;
473
474   struct GNUNET_PeerIdentity *peers;
475
476   /**
477    * Number of other peers in the consensus.
478    */
479   unsigned int num_peers;
480
481   /**
482    * Index of the local peer in the peers array
483    */
484   unsigned int local_peer_idx;
485
486   /**
487    * Listener for requests from other peers.
488    * Uses the session's global id as app id.
489    */
490   struct GNUNET_SET_ListenHandle *set_listener;
491
492   /**
493    * State of our early stopping scheme.
494    */
495   int early_stopping;
496
497   /**
498    * Our set size from the first round.
499    */
500   uint64_t first_size;
501
502   uint64_t *first_sizes_received;
503
504   /**
505    * Bounded Eppstein lower bound.
506    */
507   uint64_t lower_bound;
508
509   struct SetHandle *set_handles_head;
510   struct SetHandle *set_handles_tail;
511 };
512
513 /**
514  * Linked list of sessions this peer participates in.
515  */
516 static struct ConsensusSession *sessions_head;
517
518 /**
519  * Linked list of sessions this peer participates in.
520  */
521 static struct ConsensusSession *sessions_tail;
522
523 /**
524  * Configuration of the consensus service.
525  */
526 static const struct GNUNET_CONFIGURATION_Handle *cfg;
527
528 /**
529  * Peer that runs this service.
530  */
531 static struct GNUNET_PeerIdentity my_peer;
532
533 /**
534  * Statistics handle.
535  */
536 struct GNUNET_STATISTICS_Handle *statistics;
537
538
539 static void
540 finish_task (struct TaskEntry *task);
541
542
543 static void
544 run_ready_steps (struct ConsensusSession *session);
545
546
547 static const char *
548 phasename (uint16_t phase)
549 {
550   switch (phase)
551   {
552     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
553     case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
554     case PHASE_KIND_FINISH: return "FINISH";
555     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
556     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
557     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
558     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
559     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
560     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
561     default: return "(unknown)";
562   }
563 }
564
565
566 static const char *
567 setname (uint16_t kind)
568 {
569   switch (kind)
570   {
571     case SET_KIND_CURRENT: return "CURRENT";
572     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
573     case SET_KIND_NONE: return "NONE";
574     default: return "(unknown)";
575   }
576 }
577
578 static const char *
579 rfnname (uint16_t kind)
580 {
581   switch (kind)
582   {
583     case RFN_KIND_NONE: return "NONE";
584     case RFN_KIND_ECHO: return "ECHO";
585     case RFN_KIND_CONFIRM: return "CONFIRM";
586     default: return "(unknown)";
587   }
588 }
589
590 static const char *
591 diffname (uint16_t kind)
592 {
593   switch (kind)
594   {
595     case DIFF_KIND_NONE: return "NONE";
596     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
597     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
598     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
599     default: return "(unknown)";
600   }
601 }
602
603 #ifdef GNUNET_EXTRA_LOGGING
604
605
606 static const char *
607 debug_str_element (const struct GNUNET_SET_Element *el)
608 {
609   struct GNUNET_HashCode hash;
610
611   GNUNET_SET_element_hash (el, &hash);
612
613   return GNUNET_h2s (&hash);
614 }
615
616 static const char *
617 debug_str_task_key (struct TaskKey *tk)
618 {
619   static char buf[256];
620
621   snprintf (buf, sizeof (buf),
622             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
623             phasename (tk->kind), tk->peer1, tk->peer2,
624             tk->leader, tk->repetition);
625
626   return buf;
627 }
628
629 static const char *
630 debug_str_diff_key (struct DiffKey *dk)
631 {
632   static char buf[256];
633
634   snprintf (buf, sizeof (buf),
635             "DiffKey kind=%s, k1=%d, k2=%d",
636             diffname (dk->diff_kind), dk->k1, dk->k2);
637
638   return buf;
639 }
640
641 static const char *
642 debug_str_set_key (const struct SetKey *sk)
643 {
644   static char buf[256];
645
646   snprintf (buf, sizeof (buf),
647             "SetKey kind=%s, k1=%d, k2=%d",
648             setname (sk->set_kind), sk->k1, sk->k2);
649
650   return buf;
651 }
652
653
654 static const char *
655 debug_str_rfn_key (const struct RfnKey *rk)
656 {
657   static char buf[256];
658
659   snprintf (buf, sizeof (buf),
660             "RfnKey kind=%s, k1=%d, k2=%d",
661             rfnname (rk->rfn_kind), rk->k1, rk->k2);
662
663   return buf;
664 }
665
666 #endif /* GNUNET_EXTRA_LOGGING */
667
668
669 /**
670  * Send the final result set of the consensus to the client, element by
671  * element.
672  *
673  * @param cls closure
674  * @param element the current element, NULL if all elements have been
675  *        iterated over
676  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
677  */
678 static int
679 send_to_client_iter (void *cls,
680                      const struct GNUNET_SET_Element *element)
681 {
682   struct TaskEntry *task = (struct TaskEntry *) cls;
683   struct ConsensusSession *session = task->step->session;
684   struct GNUNET_MQ_Envelope *ev;
685
686   if (NULL != element)
687   {
688     struct GNUNET_CONSENSUS_ElementMessage *m;
689     const struct ConsensusElement *ce;
690
691     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
692     ce = element->data;
693
694     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
695
696     if (0 != ce->marker)
697       return GNUNET_YES;
698
699     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
700                 "P%d: sending element %s to client\n",
701                 session->local_peer_idx,
702                 debug_str_element (element));
703
704     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
705                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
706     m->element_type = ce->payload_type;
707     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
708     GNUNET_MQ_send (session->client_mq, ev);
709   }
710   else
711   {
712     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713                 "P%d: finished iterating elements for client\n",
714                 session->local_peer_idx);
715     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
716     GNUNET_MQ_send (session->client_mq, ev);
717   }
718   return GNUNET_YES;
719 }
720
721
722 static struct SetEntry *
723 lookup_set (struct ConsensusSession *session, struct SetKey *key)
724 {
725   struct GNUNET_HashCode hash;
726
727   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728               "P%u: looking up set {%s}\n",
729               session->local_peer_idx,
730               debug_str_set_key (key));
731
732   GNUNET_assert (SET_KIND_NONE != key->set_kind);
733   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
734   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
735 }
736
737
738 static struct DiffEntry *
739 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
740 {
741   struct GNUNET_HashCode hash;
742
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "P%u: looking up diff {%s}\n",
745               session->local_peer_idx,
746               debug_str_diff_key (key));
747
748   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
749   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
750   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
751 }
752
753
754 static struct ReferendumEntry *
755 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
756 {
757   struct GNUNET_HashCode hash;
758
759   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760               "P%u: looking up rfn {%s}\n",
761               session->local_peer_idx,
762               debug_str_rfn_key (key));
763
764   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
765   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
766   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
767 }
768
769
770 static void
771 diff_insert (struct DiffEntry *diff,
772              int weight,
773              const struct GNUNET_SET_Element *element)
774 {
775   struct DiffElementInfo *di;
776   struct GNUNET_HashCode hash;
777
778   GNUNET_assert ( (1 == weight) || (-1 == weight));
779
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781               "diff_insert with element size %u\n",
782               element->size);
783
784   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785               "hashing element\n");
786
787   GNUNET_SET_element_hash (element, &hash);
788
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "hashed element\n");
791
792   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
793
794   if (NULL == di)
795   {
796     di = GNUNET_new (struct DiffElementInfo);
797     di->element = GNUNET_SET_element_dup (element);
798     GNUNET_assert (GNUNET_OK ==
799                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
800                                                       &hash, di,
801                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
802   }
803
804   di->weight = weight;
805 }
806
807
808 static void
809 rfn_commit (struct ReferendumEntry *rfn,
810             uint16_t commit_peer)
811 {
812   GNUNET_assert (commit_peer < rfn->num_peers);
813
814   rfn->peer_commited[commit_peer] = GNUNET_YES;
815 }
816
817
818 static void
819 rfn_contest (struct ReferendumEntry *rfn,
820              uint16_t contested_peer)
821 {
822   GNUNET_assert (contested_peer < rfn->num_peers);
823
824   rfn->peer_contested[contested_peer] = GNUNET_YES;
825 }
826
827
828 static uint16_t
829 rfn_noncontested (struct ReferendumEntry *rfn)
830 {
831   uint16_t i;
832   uint16_t ret;
833
834   ret = 0;
835   for (i = 0; i < rfn->num_peers; i++)
836     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
837       ret++;
838
839   return ret;
840 }
841
842
843 static void
844 rfn_vote (struct ReferendumEntry *rfn,
845           uint16_t voting_peer,
846           enum ReferendumVote vote,
847           const struct GNUNET_SET_Element *element)
848 {
849   struct RfnElementInfo *ri;
850   struct GNUNET_HashCode hash;
851
852   GNUNET_assert (voting_peer < rfn->num_peers);
853
854   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
855      since VOTE_KEEP is implicit in not voting. */
856   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
857
858   GNUNET_SET_element_hash (element, &hash);
859   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
860
861   if (NULL == ri)
862   {
863     ri = GNUNET_new (struct RfnElementInfo);
864     ri->element = GNUNET_SET_element_dup (element);
865     ri->votes = GNUNET_new_array (rfn->num_peers, int);
866     GNUNET_assert (GNUNET_OK ==
867                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
868                                                       &hash, ri,
869                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
870   }
871
872   ri->votes[voting_peer] = GNUNET_YES;
873   ri->proposal = vote;
874 }
875
876
877 static uint16_t
878 task_other_peer (struct TaskEntry *task)
879 {
880   uint16_t me = task->step->session->local_peer_idx;
881   if (task->key.peer1 == me)
882     return task->key.peer2;
883   return task->key.peer1;
884 }
885
886
887 static int
888 cmp_uint64_t (const void *pa, const void *pb)
889 {
890   uint64_t a = *(uint64_t *) pa;
891   uint64_t b = *(uint64_t *) pb;
892
893   if (a == b)
894     return 0;
895   if (a < b)
896     return -1;
897   return 1;
898 }
899
900
901 /**
902  * Callback for set operation results. Called for each element
903  * in the result set.
904  *
905  * @param cls closure
906  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
907  * @param current_size current set size
908  * @param status see enum GNUNET_SET_Status
909  */
910 static void
911 set_result_cb (void *cls,
912                const struct GNUNET_SET_Element *element,
913                uint64_t current_size,
914                enum GNUNET_SET_Status status)
915 {
916   struct TaskEntry *task = cls;
917   struct ConsensusSession *session = task->step->session;
918   struct SetEntry *output_set = NULL;
919   struct DiffEntry *output_diff = NULL;
920   struct ReferendumEntry *output_rfn = NULL;
921   unsigned int other_idx;
922   struct SetOpCls *setop;
923   const struct ConsensusElement *consensus_element = NULL;
924
925   if (NULL != element)
926   {
927     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                 "P%u: got element of type %u, status %u\n",
929                 session->local_peer_idx,
930                 (unsigned) element->element_type,
931                 (unsigned) status);
932     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
933     consensus_element = element->data;
934   }
935
936   setop = &task->cls.setop;
937
938
939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940               "P%u: got set result for {%s}, status %u\n",
941               session->local_peer_idx,
942               debug_str_task_key (&task->key),
943               status);
944
945   if (GNUNET_NO == task->is_started)
946   {
947     GNUNET_break_op (0);
948     return;
949   }
950
951   if (GNUNET_YES == task->is_finished)
952   {
953     GNUNET_break_op (0);
954     return;
955   }
956
957   other_idx = task_other_peer (task);
958
959   if (SET_KIND_NONE != setop->output_set.set_kind)
960   {
961     output_set = lookup_set (session, &setop->output_set);
962     GNUNET_assert (NULL != output_set);
963   }
964
965   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
966   {
967     output_diff = lookup_diff (session, &setop->output_diff);
968     GNUNET_assert (NULL != output_diff);
969   }
970
971   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
972   {
973     output_rfn = lookup_rfn (session, &setop->output_rfn);
974     GNUNET_assert (NULL != output_rfn);
975   }
976
977   if (GNUNET_YES == session->peers_blacklisted[other_idx])
978   {
979     /* Peer might have been blacklisted
980        by a gradecast running in parallel, ignore elements from now */
981     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
982       return;
983     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
984       return;
985   }
986
987   if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
988   {
989     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990                 "P%u: got some marker\n",
991                   session->local_peer_idx);
992     if ( (GNUNET_YES == setop->transceive_contested) &&
993          (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
994     {
995       GNUNET_assert (NULL != output_rfn);
996       rfn_contest (output_rfn, task_other_peer (task));
997       return;
998     }
999
1000     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1001     {
1002
1003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004                   "P%u: got size marker\n",
1005                   session->local_peer_idx);
1006
1007
1008       struct ConsensusSizeElement *cse = (void *) consensus_element;
1009
1010       if (cse->sender_index == other_idx)
1011       {
1012         if (NULL == session->first_sizes_received)
1013           session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1014         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1015
1016         uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1017         qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1018         session->lower_bound = copy[session->num_peers / 3 + 1];
1019         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020                     "P%u: lower bound %llu\n",
1021                     session->local_peer_idx,
1022                     (long long) session->lower_bound);
1023         GNUNET_free (copy);
1024       }
1025       return;
1026     }
1027
1028     return;
1029   }
1030
1031   switch (status)
1032   {
1033     case GNUNET_SET_STATUS_ADD_LOCAL:
1034       GNUNET_assert (NULL != consensus_element);
1035       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036                   "Adding element in Task {%s}\n",
1037                   debug_str_task_key (&task->key));
1038       if (NULL != output_set)
1039       {
1040         // FIXME: record pending adds, use callback
1041         GNUNET_SET_add_element (output_set->h,
1042                                 element,
1043                                 NULL,
1044                                 NULL);
1045 #ifdef GNUNET_EXTRA_LOGGING
1046         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                     "P%u: adding element %s into set {%s} of task {%s}\n",
1048                     session->local_peer_idx,
1049                     debug_str_element (element),
1050                     debug_str_set_key (&setop->output_set),
1051                     debug_str_task_key (&task->key));
1052 #endif
1053       }
1054       if (NULL != output_diff)
1055       {
1056         diff_insert (output_diff, 1, element);
1057 #ifdef GNUNET_EXTRA_LOGGING
1058         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1060                     session->local_peer_idx,
1061                     debug_str_element (element),
1062                     debug_str_diff_key (&setop->output_diff),
1063                     debug_str_task_key (&task->key));
1064 #endif
1065       }
1066       if (NULL != output_rfn)
1067       {
1068         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1069 #ifdef GNUNET_EXTRA_LOGGING
1070         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1072                     session->local_peer_idx,
1073                     debug_str_element (element),
1074                     debug_str_rfn_key (&setop->output_rfn),
1075                     debug_str_task_key (&task->key));
1076 #endif
1077       }
1078       // XXX: add result to structures in task
1079       break;
1080     case GNUNET_SET_STATUS_ADD_REMOTE:
1081       GNUNET_assert (NULL != consensus_element);
1082       if (GNUNET_YES == setop->do_not_remove)
1083         break;
1084       if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1085         break;
1086       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087                   "Removing element in Task {%s}\n",
1088                   debug_str_task_key (&task->key));
1089       if (NULL != output_set)
1090       {
1091         // FIXME: record pending adds, use callback
1092         GNUNET_SET_remove_element (output_set->h,
1093                                    element,
1094                                    NULL,
1095                                    NULL);
1096 #ifdef GNUNET_EXTRA_LOGGING
1097         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098                     "P%u: removing element %s from set {%s} of task {%s}\n",
1099                     session->local_peer_idx,
1100                     debug_str_element (element),
1101                     debug_str_set_key (&setop->output_set),
1102                     debug_str_task_key (&task->key));
1103 #endif
1104       }
1105       if (NULL != output_diff)
1106       {
1107         diff_insert (output_diff, -1, element);
1108 #ifdef GNUNET_EXTRA_LOGGING
1109         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1111                     session->local_peer_idx,
1112                     debug_str_element (element),
1113                     debug_str_diff_key (&setop->output_diff),
1114                     debug_str_task_key (&task->key));
1115 #endif
1116       }
1117       if (NULL != output_rfn)
1118       {
1119         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1120 #ifdef GNUNET_EXTRA_LOGGING
1121         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1123                     session->local_peer_idx,
1124                     debug_str_element (element),
1125                     debug_str_rfn_key (&setop->output_rfn),
1126                     debug_str_task_key (&task->key));
1127 #endif
1128       }
1129       break;
1130     case GNUNET_SET_STATUS_DONE:
1131       // XXX: check first if any changes to the underlying
1132       // set are still pending
1133       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134                   "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1135                   session->local_peer_idx,
1136                   debug_str_task_key (&task->key),
1137                   (unsigned int) task->step->finished_tasks,
1138                   (unsigned int) task->step->tasks_len);
1139       if (NULL != output_rfn)
1140       {
1141         rfn_commit (output_rfn, task_other_peer (task));
1142       }
1143       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1144       {
1145         session->first_size = current_size;
1146       }
1147       finish_task (task);
1148       break;
1149     case GNUNET_SET_STATUS_FAILURE:
1150       // XXX: cleanup
1151       GNUNET_break_op (0);
1152       finish_task (task);
1153       return;
1154     default:
1155       /* not reached */
1156       GNUNET_assert (0);
1157   }
1158 }
1159
1160 #ifdef EVIL
1161
1162 enum EvilnessType
1163 {
1164   EVILNESS_NONE,
1165   EVILNESS_CRAM_ALL,
1166   EVILNESS_CRAM_LEAD,
1167   EVILNESS_CRAM_ECHO,
1168   EVILNESS_SLACK,
1169   EVILNESS_SLACK_A2A,
1170 };
1171
1172 enum EvilnessSubType
1173 {
1174   EVILNESS_SUB_NONE,
1175   EVILNESS_SUB_REPLACEMENT,
1176   EVILNESS_SUB_NO_REPLACEMENT,
1177 };
1178
1179 struct Evilness
1180 {
1181   enum EvilnessType type;
1182   enum EvilnessSubType subtype;
1183   unsigned int num;
1184 };
1185
1186
1187 static int
1188 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1189 {
1190   if (0 == strcmp ("replace", evil_subtype_str))
1191   {
1192     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1193   }
1194   else if (0 == strcmp ("noreplace", evil_subtype_str))
1195   {
1196     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1197   }
1198   else
1199   {
1200     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1201                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1202                 evil_subtype_str);
1203     return GNUNET_SYSERR;
1204   }
1205   return GNUNET_OK;
1206 }
1207
1208
1209 static void
1210 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1211 {
1212   char *evil_spec;
1213   char *field;
1214   char *evil_type_str = NULL;
1215   char *evil_subtype_str = NULL;
1216
1217   GNUNET_assert (NULL != evil);
1218
1219   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1220   {
1221     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222                 "P%u: no evilness\n",
1223                 session->local_peer_idx);
1224     evil->type = EVILNESS_NONE;
1225     return;
1226   }
1227   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228               "P%u: got evilness spec\n",
1229               session->local_peer_idx);
1230
1231   for (field = strtok (evil_spec, "/");
1232        NULL != field;
1233        field = strtok (NULL, "/"))
1234   {
1235     unsigned int peer_num;
1236     unsigned int evil_num;
1237     int ret;
1238
1239     evil_type_str = NULL;
1240     evil_subtype_str = NULL;
1241
1242     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1243
1244     if (ret != 4)
1245     {
1246       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1247                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1248                   field,
1249                   ret);
1250       goto not_evil;
1251     }
1252
1253     GNUNET_assert (NULL != evil_type_str);
1254     GNUNET_assert (NULL != evil_subtype_str);
1255
1256     if (peer_num == session->local_peer_idx)
1257     {
1258       if (0 == strcmp ("slack", evil_type_str))
1259       {
1260         evil->type = EVILNESS_SLACK;
1261       }
1262       if (0 == strcmp ("slack-a2a", evil_type_str))
1263       {
1264         evil->type = EVILNESS_SLACK_A2A;
1265       }
1266       else if (0 == strcmp ("cram-all", evil_type_str))
1267       {
1268         evil->type = EVILNESS_CRAM_ALL;
1269         evil->num = evil_num;
1270         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1271           goto not_evil;
1272       }
1273       else if (0 == strcmp ("cram-lead", evil_type_str))
1274       {
1275         evil->type = EVILNESS_CRAM_LEAD;
1276         evil->num = evil_num;
1277         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1278           goto not_evil;
1279       }
1280       else if (0 == strcmp ("cram-echo", evil_type_str))
1281       {
1282         evil->type = EVILNESS_CRAM_ECHO;
1283         evil->num = evil_num;
1284         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1285           goto not_evil;
1286       }
1287       else
1288       {
1289         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1290                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1291                     evil_type_str);
1292         goto not_evil;
1293       }
1294       goto cleanup;
1295     }
1296     /* No GNUNET_free since memory was allocated by libc */
1297     free (evil_type_str);
1298     evil_type_str = NULL;
1299     evil_subtype_str = NULL;
1300   }
1301 not_evil:
1302   evil->type = EVILNESS_NONE;
1303 cleanup:
1304   GNUNET_free (evil_spec);
1305   /* no GNUNET_free_non_null since it wasn't
1306    * allocated with GNUNET_malloc */
1307   if (NULL != evil_type_str)
1308     free (evil_type_str);
1309   if (NULL != evil_subtype_str)
1310     free (evil_subtype_str);
1311 }
1312
1313 #endif
1314
1315
1316 /**
1317  * Commit the appropriate set for a
1318  * task.
1319  */
1320 static void
1321 commit_set (struct ConsensusSession *session,
1322             struct TaskEntry *task)
1323 {
1324   struct SetEntry *set;
1325   struct SetOpCls *setop = &task->cls.setop;
1326
1327   GNUNET_assert (NULL != setop->op);
1328   set = lookup_set (session, &setop->input_set);
1329   GNUNET_assert (NULL != set);
1330
1331   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1332   {
1333     struct GNUNET_SET_Element element;
1334     struct ConsensusElement ce = { 0 };
1335     ce.marker = CONSENSUS_MARKER_CONTESTED;
1336     element.data = &ce;
1337     element.size = sizeof (struct ConsensusElement);
1338     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1339     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1340   }
1341
1342   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1343   {
1344     struct GNUNET_SET_Element element;
1345     struct ConsensusSizeElement cse = {
1346       .size = 0,
1347       .sender_index = 0
1348     };
1349     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1350     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1351     cse.size = GNUNET_htonll (session->first_size);
1352     cse.sender_index = session->local_peer_idx;
1353     element.data = &cse;
1354     element.size = sizeof (struct ConsensusSizeElement);
1355     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1356     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1357   }
1358
1359 #ifdef EVIL
1360   {
1361     unsigned int i;
1362     struct Evilness evil;
1363
1364     get_evilness (session, &evil);
1365     if (EVILNESS_NONE != evil.type)
1366     {
1367       /* Useful for evaluation */
1368       GNUNET_STATISTICS_set (statistics,
1369                              "is evil",
1370                              1,
1371                              GNUNET_NO);
1372     }
1373     switch (evil.type)
1374     {
1375       case EVILNESS_CRAM_ALL:
1376       case EVILNESS_CRAM_LEAD:
1377       case EVILNESS_CRAM_ECHO:
1378         /* We're not cramming elements in the
1379            all-to-all round, since that would just
1380            add more elements to the result set, but
1381            wouldn't test robustness. */
1382         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1383         {
1384           GNUNET_SET_commit (setop->op, set->h);
1385           break;
1386         }
1387         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1388             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1389         {
1390           GNUNET_SET_commit (setop->op, set->h);
1391           break;
1392         }
1393         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1394         {
1395           GNUNET_SET_commit (setop->op, set->h);
1396           break;
1397         }
1398         for (i = 0; i < evil.num; i++)
1399         {
1400           struct GNUNET_SET_Element element;
1401           struct ConsensusStuffedElement se = {
1402             .ce.payload_type = 0,
1403             .ce.marker = 0,
1404           };
1405           element.data = &se;
1406           element.size = sizeof (struct ConsensusStuffedElement);
1407           element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1408
1409           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1410           {
1411             /* Always generate a new element. */
1412             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1413           }
1414           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1415           {
1416             /* Always cram the same elements, derived from counter. */
1417             GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1418           }
1419           else
1420           {
1421             GNUNET_assert (0);
1422           }
1423           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1424 #ifdef GNUNET_EXTRA_LOGGING
1425           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1427                       session->local_peer_idx,
1428                       debug_str_element (&element),
1429                       debug_str_set_key (&setop->input_set),
1430                       debug_str_task_key (&task->key));
1431 #endif
1432         }
1433         GNUNET_STATISTICS_update (statistics,
1434                                   "# stuffed elements",
1435                                   evil.num,
1436                                   GNUNET_NO);
1437         GNUNET_SET_commit (setop->op, set->h);
1438         break;
1439       case EVILNESS_SLACK:
1440         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441                     "P%u: evil peer: slacking\n",
1442                     (unsigned int) session->local_peer_idx);
1443         /* Do nothing. */
1444       case EVILNESS_SLACK_A2A:
1445         if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1446              (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1447         {
1448           struct GNUNET_SET_Handle *empty_set;
1449           empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1450           GNUNET_SET_commit (setop->op, empty_set);
1451           GNUNET_SET_destroy (empty_set);
1452         }
1453         else
1454         {
1455           GNUNET_SET_commit (setop->op, set->h);
1456         }
1457         break;
1458       case EVILNESS_NONE:
1459         GNUNET_SET_commit (setop->op, set->h);
1460         break;
1461     }
1462   }
1463 #else
1464   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1465   {
1466     GNUNET_SET_commit (setop->op, set->h);
1467   }
1468   else
1469   {
1470     /* For our testcases, we don't want the blacklisted
1471        peers to wait. */
1472     GNUNET_SET_operation_cancel (setop->op);
1473     setop->op = NULL;
1474     finish_task (task);
1475   }
1476 #endif
1477 }
1478
1479
1480 static void
1481 put_diff (struct ConsensusSession *session,
1482          struct DiffEntry *diff)
1483 {
1484   struct GNUNET_HashCode hash;
1485
1486   GNUNET_assert (NULL != diff);
1487
1488   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1489   GNUNET_assert (GNUNET_OK ==
1490                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1491                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1492 }
1493
1494 static void
1495 put_set (struct ConsensusSession *session,
1496          struct SetEntry *set)
1497 {
1498   struct GNUNET_HashCode hash;
1499
1500   GNUNET_assert (NULL != set->h);
1501
1502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1503               "Putting set %s\n",
1504               debug_str_set_key (&set->key));
1505
1506   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1507   GNUNET_assert (GNUNET_SYSERR !=
1508                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1509                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1510 }
1511
1512
1513 static void
1514 put_rfn (struct ConsensusSession *session,
1515          struct ReferendumEntry *rfn)
1516 {
1517   struct GNUNET_HashCode hash;
1518
1519   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1520   GNUNET_assert (GNUNET_OK ==
1521                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1522                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1523 }
1524
1525
1526
1527 static void
1528 task_cancel_reconcile (struct TaskEntry *task)
1529 {
1530   /* not implemented yet */
1531   GNUNET_assert (0);
1532 }
1533
1534
1535 static void
1536 apply_diff_to_rfn (struct DiffEntry *diff,
1537                    struct ReferendumEntry *rfn,
1538                    uint16_t voting_peer,
1539                    uint16_t num_peers)
1540 {
1541   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1542   struct DiffElementInfo *di;
1543
1544   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1545
1546   while (GNUNET_YES ==
1547          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1548                                                       NULL,
1549                                                       (const void **) &di))
1550   {
1551     if (di->weight > 0)
1552     {
1553       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1554     }
1555     if (di->weight < 0)
1556     {
1557       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1558     }
1559   }
1560
1561   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1562 }
1563
1564
1565 struct DiffEntry *
1566 diff_create ()
1567 {
1568   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1569
1570   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1571
1572   return d;
1573 }
1574
1575
1576 struct DiffEntry *
1577 diff_compose (struct DiffEntry *diff_1,
1578               struct DiffEntry *diff_2)
1579 {
1580   struct DiffEntry *diff_new;
1581   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1582   struct DiffElementInfo *di;
1583
1584   diff_new = diff_create ();
1585
1586   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1587   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1588   {
1589     diff_insert (diff_new, di->weight, di->element);
1590   }
1591   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1592
1593   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1594   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1595   {
1596     diff_insert (diff_new, di->weight, di->element);
1597   }
1598   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1599
1600   return diff_new;
1601 }
1602
1603
1604 struct ReferendumEntry *
1605 rfn_create (uint16_t size)
1606 {
1607   struct ReferendumEntry *rfn;
1608
1609   rfn = GNUNET_new (struct ReferendumEntry);
1610   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1611   rfn->peer_commited = GNUNET_new_array (size, int);
1612   rfn->peer_contested = GNUNET_new_array (size, int);
1613   rfn->num_peers = size;
1614
1615   return rfn;
1616 }
1617
1618
1619 #if UNUSED
1620 static void
1621 diff_destroy (struct DiffEntry *diff)
1622 {
1623   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1624   GNUNET_free (diff);
1625 }
1626 #endif
1627
1628
1629 /**
1630  * For a given majority, count what the outcome
1631  * is (add/remove/keep), and give the number
1632  * of peers that voted for this outcome.
1633  */
1634 static void
1635 rfn_majority (const struct ReferendumEntry *rfn,
1636               const struct RfnElementInfo *ri,
1637               uint16_t *ret_majority,
1638               enum ReferendumVote *ret_vote)
1639 {
1640   uint16_t votes_yes = 0;
1641   uint16_t num_commited = 0;
1642   uint16_t i;
1643
1644   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645               "Computing rfn majority for element %s of rfn {%s}\n",
1646               debug_str_element (ri->element),
1647               debug_str_rfn_key (&rfn->key));
1648
1649   for (i = 0; i < rfn->num_peers; i++)
1650   {
1651     if (GNUNET_NO == rfn->peer_commited[i])
1652       continue;
1653     num_commited++;
1654
1655     if (GNUNET_YES == ri->votes[i])
1656       votes_yes++;
1657   }
1658
1659   if (votes_yes > (num_commited) / 2)
1660   {
1661     *ret_vote = ri->proposal;
1662     *ret_majority = votes_yes;
1663   }
1664   else
1665   {
1666     *ret_vote = VOTE_STAY;
1667     *ret_majority = num_commited - votes_yes;
1668   }
1669 }
1670
1671
1672 struct SetCopyCls
1673 {
1674   struct TaskEntry *task;
1675   struct SetKey dst_set_key;
1676 };
1677
1678
1679 static void
1680 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1681 {
1682   struct SetCopyCls *scc = cls;
1683   struct TaskEntry *task = scc->task;
1684   struct SetKey dst_set_key = scc->dst_set_key;
1685   struct SetEntry *set;
1686   struct SetHandle *sh = GNUNET_new (struct SetHandle);
1687
1688   sh->h = copy;
1689   GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1690                                task->step->session->set_handles_tail,
1691                                sh);
1692
1693   GNUNET_free (scc);
1694   set = GNUNET_new (struct SetEntry);
1695   set->h = copy;
1696   set->key = dst_set_key;
1697   put_set (task->step->session, set);
1698
1699   task->start (task);
1700 }
1701
1702
1703 /**
1704  * Call the start function of the given
1705  * task again after we created a copy of the given set.
1706  */
1707 static void
1708 create_set_copy_for_task (struct TaskEntry *task,
1709                           struct SetKey *src_set_key,
1710                           struct SetKey *dst_set_key)
1711 {
1712   struct SetEntry *src_set;
1713   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1714
1715   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716               "Copying set {%s} to {%s} for task {%s}\n",
1717               debug_str_set_key (src_set_key),
1718               debug_str_set_key (dst_set_key),
1719               debug_str_task_key (&task->key));
1720
1721   scc->task = task;
1722   scc->dst_set_key = *dst_set_key;
1723   src_set = lookup_set (task->step->session, src_set_key);
1724   GNUNET_assert (NULL != src_set);
1725   GNUNET_SET_copy_lazy (src_set->h,
1726                         set_copy_cb,
1727                         scc);
1728 }
1729
1730
1731 struct SetMutationProgressCls
1732 {
1733   int num_pending;
1734   /**
1735    * Task to finish once all changes are through.
1736    */
1737   struct TaskEntry *task;
1738 };
1739
1740
1741 static void
1742 set_mutation_done (void *cls)
1743 {
1744   struct SetMutationProgressCls *pc = cls;
1745
1746   GNUNET_assert (pc->num_pending > 0);
1747
1748   pc->num_pending--;
1749
1750   if (0 == pc->num_pending)
1751   {
1752     struct TaskEntry *task = pc->task;
1753     GNUNET_free (pc);
1754     finish_task (task);
1755   }
1756 }
1757
1758
1759 static void
1760 try_finish_step_early (struct Step *step)
1761 {
1762   unsigned int i;
1763
1764   if (GNUNET_YES == step->is_running)
1765     return;
1766   if (GNUNET_YES == step->is_finished)
1767     return;
1768   if (GNUNET_NO == step->early_finishable)
1769     return;
1770
1771   step->is_finished = GNUNET_YES;
1772
1773 #ifdef GNUNET_EXTRA_LOGGING
1774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1775               "Finishing step `%s' early.\n",
1776               step->debug_name);
1777 #endif
1778
1779   for (i = 0; i < step->subordinates_len; i++)
1780   {
1781     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1782     step->subordinates[i]->pending_prereq--;
1783 #ifdef GNUNET_EXTRA_LOGGING
1784     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785                 "Decreased pending_prereq to %u for step `%s'.\n",
1786                 (unsigned int) step->subordinates[i]->pending_prereq,
1787                 step->subordinates[i]->debug_name);
1788
1789 #endif
1790     try_finish_step_early (step->subordinates[i]);
1791   }
1792
1793   // XXX: maybe schedule as task to avoid recursion?
1794   run_ready_steps (step->session);
1795 }
1796
1797
1798 static void
1799 finish_step (struct Step *step)
1800 {
1801   unsigned int i;
1802
1803   GNUNET_assert (step->finished_tasks == step->tasks_len);
1804   GNUNET_assert (GNUNET_YES == step->is_running);
1805   GNUNET_assert (GNUNET_NO == step->is_finished);
1806
1807 #ifdef GNUNET_EXTRA_LOGGING
1808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809               "All tasks of step `%s' with %u subordinates finished.\n",
1810               step->debug_name,
1811               step->subordinates_len);
1812 #endif
1813
1814   for (i = 0; i < step->subordinates_len; i++)
1815   {
1816     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1817     step->subordinates[i]->pending_prereq--;
1818 #ifdef GNUNET_EXTRA_LOGGING
1819     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1820                 "Decreased pending_prereq to %u for step `%s'.\n",
1821                 (unsigned int) step->subordinates[i]->pending_prereq,
1822                 step->subordinates[i]->debug_name);
1823
1824 #endif
1825   }
1826
1827   step->is_finished = GNUNET_YES;
1828
1829   // XXX: maybe schedule as task to avoid recursion?
1830   run_ready_steps (step->session);
1831 }
1832
1833
1834
1835 /**
1836  * Apply the result from one round of gradecasts (i.e. every peer
1837  * should have gradecasted) to the peer's current set.
1838  *
1839  * @param task the task with context information
1840  */
1841 static void
1842 task_start_apply_round (struct TaskEntry *task)
1843 {
1844   struct ConsensusSession *session = task->step->session;
1845   struct SetKey sk_in;
1846   struct SetKey sk_out;
1847   struct RfnKey rk_in;
1848   struct SetEntry *set_out;
1849   struct ReferendumEntry *rfn_in;
1850   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1851   struct RfnElementInfo *ri;
1852   struct SetMutationProgressCls *progress_cls;
1853   uint16_t worst_majority = UINT16_MAX;
1854
1855   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1856   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1857   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1858
1859   set_out = lookup_set (session, &sk_out);
1860   if (NULL == set_out)
1861   {
1862     create_set_copy_for_task (task, &sk_in, &sk_out);
1863     return;
1864   }
1865
1866   rfn_in = lookup_rfn (session, &rk_in);
1867   GNUNET_assert (NULL != rfn_in);
1868
1869   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1870   progress_cls->task = task;
1871
1872   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1873
1874   while (GNUNET_YES ==
1875          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1876                                                       NULL,
1877                                                       (const void **) &ri))
1878   {
1879     uint16_t majority_num;
1880     enum ReferendumVote majority_vote;
1881
1882     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1883
1884     if (worst_majority > majority_num)
1885       worst_majority = majority_num;
1886
1887     switch (majority_vote)
1888     {
1889       case VOTE_ADD:
1890         progress_cls->num_pending++;
1891         GNUNET_assert (GNUNET_OK ==
1892                        GNUNET_SET_add_element (set_out->h,
1893                                                ri->element,
1894                                                &set_mutation_done,
1895                                                progress_cls));
1896         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1897                     "P%u: apply round: adding element %s with %u-majority.\n",
1898                     session->local_peer_idx,
1899                     debug_str_element (ri->element), majority_num);
1900         break;
1901       case VOTE_REMOVE:
1902         progress_cls->num_pending++;
1903         GNUNET_assert (GNUNET_OK ==
1904                        GNUNET_SET_remove_element (set_out->h,
1905                                                   ri->element,
1906                                                   &set_mutation_done,
1907                                                   progress_cls));
1908         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1909                     "P%u: apply round: deleting element %s with %u-majority.\n",
1910                     session->local_peer_idx,
1911                     debug_str_element (ri->element), majority_num);
1912         break;
1913       case VOTE_STAY:
1914         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1915                     "P%u: apply round: keeping element %s with %u-majority.\n",
1916                     session->local_peer_idx,
1917                     debug_str_element (ri->element), majority_num);
1918         // do nothing
1919         break;
1920       default:
1921         GNUNET_assert (0);
1922         break;
1923     }
1924   }
1925
1926   if (0 == progress_cls->num_pending)
1927   {
1928     // call closure right now, no pending ops
1929     GNUNET_free (progress_cls);
1930     finish_task (task);
1931   }
1932
1933   {
1934     uint16_t thresh = (session->num_peers / 3) * 2;
1935
1936     if (worst_majority >= thresh)
1937     {
1938       switch (session->early_stopping)
1939       {
1940         case EARLY_STOPPING_NONE:
1941           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1942           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1943                       "P%u: Stopping early (after one more superround)\n",
1944                       session->local_peer_idx);
1945           break;
1946         case EARLY_STOPPING_ONE_MORE:
1947           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1948                       session->local_peer_idx);
1949           session->early_stopping = EARLY_STOPPING_DONE;
1950           {
1951             struct Step *step;
1952             for (step = session->steps_head; NULL != step; step = step->next)
1953               try_finish_step_early (step);
1954           }
1955           break;
1956         case EARLY_STOPPING_DONE:
1957           /* We shouldn't be here anymore after early stopping */
1958           GNUNET_break (0);
1959           break;
1960         default:
1961           GNUNET_assert (0);
1962           break;
1963       }
1964     }
1965     else if (EARLY_STOPPING_NONE != session->early_stopping)
1966     {
1967       // Our assumption about the number of bad peers
1968       // has been broken.
1969       GNUNET_break_op (0);
1970     }
1971     else
1972     {
1973       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1974                   session->local_peer_idx);
1975     }
1976   }
1977   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1978 }
1979
1980
1981 static void
1982 task_start_grade (struct TaskEntry *task)
1983 {
1984   struct ConsensusSession *session = task->step->session;
1985   struct ReferendumEntry *output_rfn;
1986   struct ReferendumEntry *input_rfn;
1987   struct DiffEntry *input_diff;
1988   struct RfnKey rfn_key;
1989   struct DiffKey diff_key;
1990   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1991   struct RfnElementInfo *ri;
1992   unsigned int gradecast_confidence = 2;
1993
1994   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1995   output_rfn = lookup_rfn (session, &rfn_key);
1996   if (NULL == output_rfn)
1997   {
1998     output_rfn = rfn_create (session->num_peers);
1999     output_rfn->key = rfn_key;
2000     put_rfn (session, output_rfn);
2001   }
2002
2003   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2004   input_diff = lookup_diff (session, &diff_key);
2005   GNUNET_assert (NULL != input_diff);
2006
2007   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2008   input_rfn = lookup_rfn (session, &rfn_key);
2009   GNUNET_assert (NULL != input_rfn);
2010
2011   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2012
2013   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2014
2015   while (GNUNET_YES ==
2016          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2017                                                       NULL,
2018                                                       (const void **) &ri))
2019   {
2020     uint16_t majority_num;
2021     enum ReferendumVote majority_vote;
2022
2023     // XXX: we need contested votes and non-contested votes here
2024     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2025
2026     if (majority_num <= session->num_peers / 3)
2027       majority_vote = VOTE_REMOVE;
2028
2029     switch (majority_vote)
2030     {
2031       case VOTE_STAY:
2032         break;
2033       case VOTE_ADD:
2034         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2035         break;
2036       case VOTE_REMOVE:
2037         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2038         break;
2039       default:
2040         GNUNET_assert (0);
2041         break;
2042     }
2043   }
2044   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2045
2046   {
2047     uint16_t noncontested;
2048     noncontested = rfn_noncontested (input_rfn);
2049     if (noncontested < (session->num_peers / 3) * 2)
2050     {
2051       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2052     }
2053     if (noncontested < (session->num_peers / 3) + 1)
2054     {
2055       gradecast_confidence = 0;
2056     }
2057   }
2058
2059   if (gradecast_confidence >= 1)
2060     rfn_commit (output_rfn, task->key.leader);
2061
2062   if (gradecast_confidence <= 1)
2063     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2064
2065   finish_task (task);
2066 }
2067
2068
2069 static void
2070 task_start_reconcile (struct TaskEntry *task)
2071 {
2072   struct SetEntry *input;
2073   struct SetOpCls *setop = &task->cls.setop;
2074   struct ConsensusSession *session = task->step->session;
2075
2076   input = lookup_set (session, &setop->input_set);
2077   GNUNET_assert (NULL != input);
2078   GNUNET_assert (NULL != input->h);
2079
2080   /* We create the outputs for the operation here
2081      (rather than in the set operation callback)
2082      because we want something valid in there, even
2083      if the other peer doesn't talk to us */
2084
2085   if (SET_KIND_NONE != setop->output_set.set_kind)
2086   {
2087     /* If we don't have an existing output set,
2088        we clone the input set. */
2089     if (NULL == lookup_set (session, &setop->output_set))
2090     {
2091       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2092       return;
2093     }
2094   }
2095
2096   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2097   {
2098     if (NULL == lookup_rfn (session, &setop->output_rfn))
2099     {
2100       struct ReferendumEntry *rfn;
2101
2102       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2103                   "P%u: output rfn <%s> missing, creating.\n",
2104                   session->local_peer_idx,
2105                   debug_str_rfn_key (&setop->output_rfn));
2106
2107       rfn = rfn_create (session->num_peers);
2108       rfn->key = setop->output_rfn;
2109       put_rfn (session, rfn);
2110     }
2111   }
2112
2113   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2114   {
2115     if (NULL == lookup_diff (session, &setop->output_diff))
2116     {
2117       struct DiffEntry *diff;
2118
2119       diff = diff_create ();
2120       diff->key = setop->output_diff;
2121       put_diff (session, diff);
2122     }
2123   }
2124
2125   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2126   {
2127     /* XXX: mark the corresponding rfn as commited if necessary */
2128     finish_task (task);
2129     return;
2130   }
2131
2132   if (task->key.peer1 == session->local_peer_idx)
2133   {
2134     struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2135
2136     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2137                 "P%u: Looking up set {%s} to run remote union\n",
2138                 session->local_peer_idx,
2139                 debug_str_set_key (&setop->input_set));
2140
2141     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2142     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2143
2144     rcm.kind = htons (task->key.kind);
2145     rcm.peer1 = htons (task->key.peer1);
2146     rcm.peer2 = htons (task->key.peer2);
2147     rcm.leader = htons (task->key.leader);
2148     rcm.repetition = htons (task->key.repetition);
2149     rcm.is_contested = htons (0);
2150
2151     GNUNET_assert (NULL == setop->op);
2152     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2153                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2154
2155     struct GNUNET_SET_Option opts[] = {
2156       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2157       { GNUNET_SET_OPTION_END },
2158     };
2159
2160     // XXX: maybe this should be done while
2161     // setting up tasks alreays?
2162     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2163                                     &session->global_id,
2164                                     &rcm.header,
2165                                     GNUNET_SET_RESULT_SYMMETRIC,
2166                                     opts,
2167                                     set_result_cb,
2168                                     task);
2169
2170     commit_set (session, task);
2171   }
2172   else if (task->key.peer2 == session->local_peer_idx)
2173   {
2174     /* Wait for the other peer to contact us */
2175     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2176                 session->local_peer_idx, task->key.peer1);
2177
2178     if (NULL != setop->op)
2179     {
2180       commit_set (session, task);
2181     }
2182   }
2183   else
2184   {
2185     /* We made an error while constructing the task graph. */
2186     GNUNET_assert (0);
2187   }
2188 }
2189
2190
2191 static void
2192 task_start_eval_echo (struct TaskEntry *task)
2193 {
2194   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2195   struct ReferendumEntry *input_rfn;
2196   struct RfnElementInfo *ri;
2197   struct SetEntry *output_set;
2198   struct SetMutationProgressCls *progress_cls;
2199   struct ConsensusSession *session = task->step->session;
2200   struct SetKey sk_in;
2201   struct SetKey sk_out;
2202   struct RfnKey rk_in;
2203
2204   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2205   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2206   output_set = lookup_set (session, &sk_out);
2207   if (NULL == output_set)
2208   {
2209     create_set_copy_for_task (task, &sk_in, &sk_out);
2210     return;
2211   }
2212
2213
2214   {
2215     // FIXME: should be marked as a shallow copy, so
2216     // we can destroy everything correctly
2217     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2218     last_set->h = output_set->h;
2219     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2220     put_set (session, last_set);
2221   }
2222
2223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2224               "Evaluating referendum in Task {%s}\n",
2225               debug_str_task_key (&task->key));
2226
2227   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2228   progress_cls->task = task;
2229
2230   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2231   input_rfn = lookup_rfn (session, &rk_in);
2232
2233   GNUNET_assert (NULL != input_rfn);
2234
2235   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2236   GNUNET_assert (NULL != iter);
2237
2238   while (GNUNET_YES ==
2239          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2240                                                       NULL,
2241                                                       (const void **) &ri))
2242   {
2243     enum ReferendumVote majority_vote;
2244     uint16_t majority_num;
2245
2246     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2247
2248     if (majority_num < session->num_peers / 3)
2249     {
2250       /* It is not the case that all nonfaulty peers
2251          echoed the same value.  Since we're doing a set reconciliation, we
2252          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2253          reconciliation as contested.  Other peers might not know that the
2254          leader is faulty, thus we still re-distribute in the confirmation
2255          round. */
2256       output_set->is_contested = GNUNET_YES;
2257     }
2258
2259     switch (majority_vote)
2260     {
2261       case VOTE_ADD:
2262         progress_cls->num_pending++;
2263         GNUNET_assert (GNUNET_OK ==
2264                        GNUNET_SET_add_element (output_set->h,
2265                                                ri->element,
2266                                                set_mutation_done,
2267                                                progress_cls));
2268         break;
2269       case VOTE_REMOVE:
2270         progress_cls->num_pending++;
2271         GNUNET_assert (GNUNET_OK ==
2272                        GNUNET_SET_remove_element (output_set->h,
2273                                                   ri->element,
2274                                                   set_mutation_done,
2275                                                   progress_cls));
2276         break;
2277       case VOTE_STAY:
2278         /* Nothing to do. */
2279         break;
2280       default:
2281         /* not reached */
2282         GNUNET_assert (0);
2283     }
2284   }
2285
2286   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2287
2288   if (0 == progress_cls->num_pending)
2289   {
2290     // call closure right now, no pending ops
2291     GNUNET_free (progress_cls);
2292     finish_task (task);
2293   }
2294 }
2295
2296
2297 static void
2298 task_start_finish (struct TaskEntry *task)
2299 {
2300   struct SetEntry *final_set;
2301   struct ConsensusSession *session = task->step->session;
2302
2303   final_set = lookup_set (session, &task->cls.finish.input_set);
2304
2305   GNUNET_assert (NULL != final_set);
2306
2307
2308   GNUNET_SET_iterate (final_set->h,
2309                       send_to_client_iter,
2310                       task);
2311 }
2312
2313 static void
2314 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2315 {
2316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2317
2318   GNUNET_assert (GNUNET_NO == task->is_started);
2319   GNUNET_assert (GNUNET_NO == task->is_finished);
2320   GNUNET_assert (NULL != task->start);
2321
2322   task->start (task);
2323
2324   task->is_started = GNUNET_YES;
2325 }
2326
2327
2328
2329
2330 /*
2331  * Run all steps of the session that don't any
2332  * more dependencies.
2333  */
2334 static void
2335 run_ready_steps (struct ConsensusSession *session)
2336 {
2337   struct Step *step;
2338
2339   step = session->steps_head;
2340
2341   while (NULL != step)
2342   {
2343     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2344     {
2345       size_t i;
2346
2347       GNUNET_assert (0 == step->finished_tasks);
2348
2349 #ifdef GNUNET_EXTRA_LOGGING
2350       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2351                   session->local_peer_idx,
2352                   step->debug_name,
2353                   step->round, step->tasks_len, step->subordinates_len);
2354 #endif
2355
2356       step->is_running = GNUNET_YES;
2357       for (i = 0; i < step->tasks_len; i++)
2358         start_task (session, step->tasks[i]);
2359
2360       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2361       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2362         finish_step (step);
2363
2364       /* Running the next ready steps will be triggered by task completion */
2365       return;
2366     }
2367     step = step->next;
2368   }
2369
2370   return;
2371 }
2372
2373
2374
2375 static void
2376 finish_task (struct TaskEntry *task)
2377 {
2378   GNUNET_assert (GNUNET_NO == task->is_finished);
2379   task->is_finished = GNUNET_YES;
2380
2381   task->step->finished_tasks++;
2382
2383   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2384               "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2385               task->step->session->local_peer_idx,
2386               debug_str_task_key (&task->key),
2387               (unsigned int) task->step->finished_tasks,
2388               (unsigned int) task->step->tasks_len);
2389
2390   if (task->step->finished_tasks == task->step->tasks_len)
2391     finish_step (task->step);
2392 }
2393
2394
2395 /**
2396  * Search peer in the list of peers in session.
2397  *
2398  * @param peer peer to find
2399  * @param session session with peer
2400  * @return index of peer, -1 if peer is not in session
2401  */
2402 static int
2403 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2404 {
2405   int i;
2406   for (i = 0; i < session->num_peers; i++)
2407     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2408       return i;
2409   return -1;
2410 }
2411
2412
2413 /**
2414  * Compute a global, (hopefully) unique consensus session id,
2415  * from the local id of the consensus session, and the identities of all participants.
2416  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2417  * exactly the same peers, the global id will be different.
2418  *
2419  * @param session session to generate the global id for
2420  * @param local_session_id local id of the consensus session
2421  */
2422 static void
2423 compute_global_id (struct ConsensusSession *session,
2424                    const struct GNUNET_HashCode *local_session_id)
2425 {
2426   const char *salt = "gnunet-service-consensus/session_id";
2427
2428   GNUNET_assert (GNUNET_YES ==
2429                  GNUNET_CRYPTO_kdf (&session->global_id,
2430                                     sizeof (struct GNUNET_HashCode),
2431                                     salt,
2432                                     strlen (salt),
2433                                     session->peers,
2434                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2435                                     local_session_id,
2436                                     sizeof (struct GNUNET_HashCode),
2437                                     NULL));
2438 }
2439
2440
2441 /**
2442  * Compare two peer identities.
2443  *
2444  * @param h1 some peer identity
2445  * @param h2 some peer identity
2446  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2447  */
2448 static int
2449 peer_id_cmp (const void *h1, const void *h2)
2450 {
2451   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2452 }
2453
2454
2455 /**
2456  * Create the sorted list of peers for the session,
2457  * add the local peer if not in the join message.
2458  *
2459  * @param session session to initialize
2460  * @param join_msg join message with the list of peers participating at the end
2461  */
2462 static void
2463 initialize_session_peer_list (struct ConsensusSession *session,
2464                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2465 {
2466   const struct GNUNET_PeerIdentity *msg_peers
2467     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2468   int local_peer_in_list;
2469
2470   session->num_peers = ntohl (join_msg->num_peers);
2471
2472   /* Peers in the join message, may or may not include the local peer,
2473      Add it if it is missing. */
2474   local_peer_in_list = GNUNET_NO;
2475   for (unsigned int i = 0; i < session->num_peers; i++)
2476   {
2477     if (0 == memcmp (&msg_peers[i],
2478                      &my_peer,
2479                      sizeof (struct GNUNET_PeerIdentity)))
2480     {
2481       local_peer_in_list = GNUNET_YES;
2482       break;
2483     }
2484   }
2485   if (GNUNET_NO == local_peer_in_list)
2486     session->num_peers++;
2487
2488   session->peers = GNUNET_new_array (session->num_peers,
2489                                      struct GNUNET_PeerIdentity);
2490   if (GNUNET_NO == local_peer_in_list)
2491     session->peers[session->num_peers - 1] = my_peer;
2492
2493   GNUNET_memcpy (session->peers,
2494                  msg_peers,
2495                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2496   qsort (session->peers,
2497          session->num_peers,
2498          sizeof (struct GNUNET_PeerIdentity),
2499          &peer_id_cmp);
2500 }
2501
2502
2503 static struct TaskEntry *
2504 lookup_task (struct ConsensusSession *session,
2505              struct TaskKey *key)
2506 {
2507   struct GNUNET_HashCode hash;
2508
2509
2510   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2512               GNUNET_h2s (&hash));
2513   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2514 }
2515
2516
2517 /**
2518  * Called when another peer wants to do a set operation with the
2519  * local peer.
2520  *
2521  * @param cls closure
2522  * @param other_peer the other peer
2523  * @param context_msg message with application specific information from
2524  *        the other peer
2525  * @param request request from the other peer, use GNUNET_SET_accept
2526  *        to accept it, otherwise the request will be refused
2527  *        Note that we don't use a return value here, as it is also
2528  *        necessary to specify the set we want to do the operation with,
2529  *        whith sometimes can be derived from the context message.
2530  *        Also necessary to specify the timeout.
2531  */
2532 static void
2533 set_listen_cb (void *cls,
2534                const struct GNUNET_PeerIdentity *other_peer,
2535                const struct GNUNET_MessageHeader *context_msg,
2536                struct GNUNET_SET_Request *request)
2537 {
2538   struct ConsensusSession *session = cls;
2539   struct TaskKey tk;
2540   struct TaskEntry *task;
2541   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2542
2543   if (NULL == context_msg)
2544   {
2545     GNUNET_break_op (0);
2546     return;
2547   }
2548
2549   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2550   {
2551     GNUNET_break_op (0);
2552     return;
2553   }
2554
2555   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2556   {
2557     GNUNET_break_op (0);
2558     return;
2559   }
2560
2561   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2562
2563   tk = ((struct TaskKey) {
2564       .kind = ntohs (cm->kind),
2565       .peer1 = ntohs (cm->peer1),
2566       .peer2 = ntohs (cm->peer2),
2567       .repetition = ntohs (cm->repetition),
2568       .leader = ntohs (cm->leader),
2569   });
2570
2571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2572               session->local_peer_idx, debug_str_task_key (&tk));
2573
2574   task = lookup_task (session, &tk);
2575
2576   if (NULL == task)
2577   {
2578     GNUNET_break_op (0);
2579     return;
2580   }
2581
2582   if (GNUNET_YES == task->is_finished)
2583   {
2584     GNUNET_break_op (0);
2585     return;
2586   }
2587
2588   if (task->key.peer2 != session->local_peer_idx)
2589   {
2590     /* We're being asked, so we must be thne 2nd peer. */
2591     GNUNET_break_op (0);
2592     return;
2593   }
2594
2595   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2596                     (task->key.peer2 == session->local_peer_idx)));
2597
2598   struct GNUNET_SET_Option opts[] = {
2599     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2600     { GNUNET_SET_OPTION_END },
2601   };
2602
2603   task->cls.setop.op = GNUNET_SET_accept (request,
2604                                           GNUNET_SET_RESULT_SYMMETRIC,
2605                                           opts,
2606                                           set_result_cb,
2607                                           task);
2608
2609   /* If the task hasn't been started yet,
2610      we wait for that until we commit. */
2611
2612   if (GNUNET_YES == task->is_started)
2613   {
2614     commit_set (session, task);
2615   }
2616 }
2617
2618
2619
2620 static void
2621 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2622           struct TaskEntry *t)
2623 {
2624   struct GNUNET_HashCode round_hash;
2625   struct Step *s;
2626
2627   GNUNET_assert (NULL != t->step);
2628
2629   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2630
2631   s = t->step;
2632
2633   if (s->tasks_len == s->tasks_cap)
2634   {
2635     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2636     GNUNET_array_grow (s->tasks,
2637                        s->tasks_cap,
2638                        target_size);
2639   }
2640
2641 #ifdef GNUNET_EXTRA_LOGGING
2642   GNUNET_assert (NULL != s->debug_name);
2643   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2644               debug_str_task_key (&t->key),
2645               s->debug_name);
2646 #endif
2647
2648   s->tasks[s->tasks_len] = t;
2649   s->tasks_len++;
2650
2651   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2652   GNUNET_assert (GNUNET_OK ==
2653       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2654                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2655 }
2656
2657
2658 static void
2659 install_step_timeouts (struct ConsensusSession *session)
2660 {
2661   /* Given the fully constructed task graph
2662      with rounds for tasks, we can give the tasks timeouts. */
2663
2664   // unsigned int max_round;
2665
2666   /* XXX: implement! */
2667 }
2668
2669
2670
2671 /*
2672  * Arrange two peers in some canonical order.
2673  */
2674 static void
2675 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2676 {
2677   uint16_t a;
2678   uint16_t b;
2679
2680   GNUNET_assert (*p1 < n);
2681   GNUNET_assert (*p2 < n);
2682
2683   if (*p1 < *p2)
2684   {
2685     a = *p1;
2686     b = *p2;
2687   }
2688   else
2689   {
2690     a = *p2;
2691     b = *p1;
2692   }
2693
2694   /* For uniformly random *p1, *p2,
2695      this condition is true with 50% chance */
2696   if (((b - a) + n) % n <= n / 2)
2697   {
2698     *p1 = a;
2699     *p2 = b;
2700   }
2701   else
2702   {
2703     *p1 = b;
2704     *p2 = a;
2705   }
2706 }
2707
2708
2709 /**
2710  * Record @a dep as a dependency of @a step.
2711  */
2712 static void
2713 step_depend_on (struct Step *step, struct Step *dep)
2714 {
2715   /* We're not checking for cyclic dependencies,
2716      but this is a cheap sanity check. */
2717   GNUNET_assert (step != dep);
2718   GNUNET_assert (NULL != step);
2719   GNUNET_assert (NULL != dep);
2720   GNUNET_assert (dep->round <= step->round);
2721
2722 #ifdef GNUNET_EXTRA_LOGGING
2723   /* Make sure we have complete debugging information.
2724      Also checks that we don't screw up too badly
2725      constructing the task graph. */
2726   GNUNET_assert (NULL != step->debug_name);
2727   GNUNET_assert (NULL != dep->debug_name);
2728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2729               "Making step `%s' depend on `%s'\n",
2730               step->debug_name,
2731               dep->debug_name);
2732 #endif
2733
2734   if (dep->subordinates_cap == dep->subordinates_len)
2735   {
2736     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2737     GNUNET_array_grow (dep->subordinates,
2738                        dep->subordinates_cap,
2739                        target_size);
2740   }
2741
2742   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2743
2744   dep->subordinates[dep->subordinates_len] = step;
2745   dep->subordinates_len++;
2746
2747   step->pending_prereq++;
2748 }
2749
2750
2751 static struct Step *
2752 create_step (struct ConsensusSession *session, int round, int early_finishable)
2753 {
2754   struct Step *step;
2755   step = GNUNET_new (struct Step);
2756   step->session = session;
2757   step->round = round;
2758   step->early_finishable = early_finishable;
2759   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2760                                     session->steps_tail,
2761                                     step);
2762   return step;
2763 }
2764
2765
2766 /**
2767  * Construct the task graph for a single
2768  * gradecast.
2769  */
2770 static void
2771 construct_task_graph_gradecast (struct ConsensusSession *session,
2772                                 uint16_t rep,
2773                                 uint16_t lead,
2774                                 struct Step *step_before,
2775                                 struct Step *step_after)
2776 {
2777   uint16_t n = session->num_peers;
2778   uint16_t me = session->local_peer_idx;
2779
2780   uint16_t p1;
2781   uint16_t p2;
2782
2783   /* The task we're currently setting up. */
2784   struct TaskEntry task;
2785
2786   struct Step *step;
2787   struct Step *prev_step;
2788
2789   uint16_t round;
2790
2791   unsigned int k;
2792
2793   round = step_before->round + 1;
2794
2795   /* gcast step 1: leader disseminates */
2796
2797   step = create_step (session, round, GNUNET_YES);
2798
2799 #ifdef GNUNET_EXTRA_LOGGING
2800   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2801 #endif
2802   step_depend_on (step, step_before);
2803
2804   if (lead == me)
2805   {
2806     for (k = 0; k < n; k++)
2807     {
2808       if (k == me)
2809         continue;
2810       p1 = me;
2811       p2 = k;
2812       arrange_peers (&p1, &p2, n);
2813       task = ((struct TaskEntry) {
2814         .step = step,
2815         .start = task_start_reconcile,
2816         .cancel = task_cancel_reconcile,
2817         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2818       });
2819       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2820       put_task (session->taskmap, &task);
2821     }
2822     /* We run this task to make sure that the leader
2823        has the stored the SET_KIND_LEADER set of himself,
2824        so it can participate in the rest of the gradecast
2825        without the code having to handle any special cases. */
2826     task = ((struct TaskEntry) {
2827       .step = step,
2828       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2829       .start = task_start_reconcile,
2830       .cancel = task_cancel_reconcile,
2831     });
2832     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2833     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2834     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2835     put_task (session->taskmap, &task);
2836   }
2837   else
2838   {
2839     p1 = me;
2840     p2 = lead;
2841     arrange_peers (&p1, &p2, n);
2842     task = ((struct TaskEntry) {
2843       .step = step,
2844       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2845       .start = task_start_reconcile,
2846       .cancel = task_cancel_reconcile,
2847     });
2848     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2849     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2850     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2851     put_task (session->taskmap, &task);
2852   }
2853
2854   /* gcast phase 2: echo */
2855   prev_step = step;
2856   round += 1;
2857   step = create_step (session, round, GNUNET_YES);
2858 #ifdef GNUNET_EXTRA_LOGGING
2859   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2860 #endif
2861   step_depend_on (step, prev_step);
2862
2863   for (k = 0; k < n; k++)
2864   {
2865     p1 = k;
2866     p2 = me;
2867     arrange_peers (&p1, &p2, n);
2868     task = ((struct TaskEntry) {
2869       .step = step,
2870       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2871       .start = task_start_reconcile,
2872       .cancel = task_cancel_reconcile,
2873     });
2874     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2875     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2876     put_task (session->taskmap, &task);
2877   }
2878
2879   prev_step = step;
2880   /* Same round, since step only has local tasks */
2881   step = create_step (session, round, GNUNET_YES);
2882 #ifdef GNUNET_EXTRA_LOGGING
2883   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2884 #endif
2885   step_depend_on (step, prev_step);
2886
2887   arrange_peers (&p1, &p2, n);
2888   task = ((struct TaskEntry) {
2889     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2890     .step = step,
2891     .start = task_start_eval_echo
2892   });
2893   put_task (session->taskmap, &task);
2894
2895   prev_step = step;
2896   round += 1;
2897   step = create_step (session, round, GNUNET_YES);
2898 #ifdef GNUNET_EXTRA_LOGGING
2899   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2900 #endif
2901   step_depend_on (step, prev_step);
2902
2903   /* gcast phase 3: confirmation and grading */
2904   for (k = 0; k < n; k++)
2905   {
2906     p1 = k;
2907     p2 = me;
2908     arrange_peers (&p1, &p2, n);
2909     task = ((struct TaskEntry) {
2910       .step = step,
2911       .start = task_start_reconcile,
2912       .cancel = task_cancel_reconcile,
2913       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2914     });
2915     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2916     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2917     /* If there was at least one element in the echo round that was
2918        contested (i.e. it had no n-t majority), then we let the other peers
2919        know, and other peers let us know.  The contested flag for each peer is
2920        stored in the rfn. */
2921     task.cls.setop.transceive_contested = GNUNET_YES;
2922     put_task (session->taskmap, &task);
2923   }
2924
2925   prev_step = step;
2926   /* Same round, since step only has local tasks */
2927   step = create_step (session, round, GNUNET_YES);
2928 #ifdef GNUNET_EXTRA_LOGGING
2929   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2930 #endif
2931   step_depend_on (step, prev_step);
2932
2933   task = ((struct TaskEntry) {
2934     .step = step,
2935     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2936     .start = task_start_grade,
2937   });
2938   put_task (session->taskmap, &task);
2939
2940   step_depend_on (step_after, step);
2941 }
2942
2943
2944 static void
2945 construct_task_graph (struct ConsensusSession *session)
2946 {
2947   uint16_t n = session->num_peers;
2948   uint16_t t = n / 3;
2949
2950   uint16_t me = session->local_peer_idx;
2951
2952   /* The task we're currently setting up. */
2953   struct TaskEntry task;
2954
2955   /* Current leader */
2956   unsigned int lead;
2957
2958   struct Step *step;
2959   struct Step *prev_step;
2960
2961   unsigned int round = 0;
2962
2963   unsigned int i;
2964
2965   // XXX: introduce first step,
2966   // where we wait for all insert acks
2967   // from the set service
2968
2969   /* faster but brittle all-to-all */
2970
2971   // XXX: Not implemented yet
2972
2973   /* all-to-all step */
2974
2975   step = create_step (session, round, GNUNET_NO);
2976
2977 #ifdef GNUNET_EXTRA_LOGGING
2978   step->debug_name = GNUNET_strdup ("all to all");
2979 #endif
2980
2981   for (i = 0; i < n; i++)
2982   {
2983     uint16_t p1;
2984     uint16_t p2;
2985
2986     p1 = me;
2987     p2 = i;
2988     arrange_peers (&p1, &p2, n);
2989     task = ((struct TaskEntry) {
2990       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2991       .step = step,
2992       .start = task_start_reconcile,
2993       .cancel = task_cancel_reconcile,
2994     });
2995     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2996     task.cls.setop.output_set = task.cls.setop.input_set;
2997     task.cls.setop.do_not_remove = GNUNET_YES;
2998     put_task (session->taskmap, &task);
2999   }
3000
3001   round += 1;
3002   prev_step = step;
3003   step = create_step (session, round, GNUNET_NO);;
3004 #ifdef GNUNET_EXTRA_LOGGING
3005   step->debug_name = GNUNET_strdup ("all to all 2");
3006 #endif
3007   step_depend_on (step, prev_step);
3008
3009
3010   for (i = 0; i < n; i++)
3011   {
3012     uint16_t p1;
3013     uint16_t p2;
3014
3015     p1 = me;
3016     p2 = i;
3017     arrange_peers (&p1, &p2, n);
3018     task = ((struct TaskEntry) {
3019       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3020       .step = step,
3021       .start = task_start_reconcile,
3022       .cancel = task_cancel_reconcile,
3023     });
3024     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3025     task.cls.setop.output_set = task.cls.setop.input_set;
3026     task.cls.setop.do_not_remove = GNUNET_YES;
3027     put_task (session->taskmap, &task);
3028   }
3029
3030   round += 1;
3031
3032   prev_step = step;
3033   step = NULL;
3034
3035
3036
3037   /* Byzantine union */
3038
3039   /* sequential repetitions of the gradecasts */
3040   for (i = 0; i < t + 1; i++)
3041   {
3042     struct Step *step_rep_start;
3043     struct Step *step_rep_end;
3044
3045     /* Every repetition is in a separate round. */
3046     step_rep_start = create_step (session, round, GNUNET_YES);
3047 #ifdef GNUNET_EXTRA_LOGGING
3048     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3049 #endif
3050
3051     step_depend_on (step_rep_start, prev_step);
3052
3053     /* gradecast has three rounds */
3054     round += 3;
3055     step_rep_end = create_step (session, round, GNUNET_YES);
3056 #ifdef GNUNET_EXTRA_LOGGING
3057     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3058 #endif
3059
3060     /* parallel gradecasts */
3061     for (lead = 0; lead < n; lead++)
3062       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3063
3064     task = ((struct TaskEntry) {
3065       .step = step_rep_end,
3066       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3067       .start = task_start_apply_round,
3068     });
3069     put_task (session->taskmap, &task);
3070
3071     prev_step = step_rep_end;
3072   }
3073
3074  /* There is no next gradecast round, thus the final
3075     start step is the overall end step of the gradecasts */
3076   round += 1;
3077   step = create_step (session, round, GNUNET_NO);
3078 #ifdef GNUNET_EXTRA_LOGGING
3079   GNUNET_asprintf (&step->debug_name, "finish");
3080 #endif
3081   step_depend_on (step, prev_step);
3082
3083   task = ((struct TaskEntry) {
3084     .step = step,
3085     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3086     .start = task_start_finish,
3087   });
3088   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3089
3090   put_task (session->taskmap, &task);
3091 }
3092
3093
3094
3095 /**
3096  * Check join message.
3097  *
3098  * @param cls session of client that sent the message
3099  * @param m message sent by the client
3100  * @return #GNUNET_OK if @a m is well-formed
3101  */
3102 static int
3103 check_client_join (void *cls,
3104                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3105 {
3106   uint32_t listed_peers = ntohl (m->num_peers);
3107
3108   if ( (ntohs (m->header.size) - sizeof (*m)) !=
3109        listed_peers * sizeof (struct GNUNET_PeerIdentity))
3110   {
3111     GNUNET_break (0);
3112     return GNUNET_SYSERR;
3113   }
3114   return GNUNET_OK;
3115 }
3116
3117
3118 /**
3119  * Called when a client wants to join a consensus session.
3120  *
3121  * @param cls session of client that sent the message
3122  * @param m message sent by the client
3123  */
3124 static void
3125 handle_client_join (void *cls,
3126                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3127 {
3128   struct ConsensusSession *session = cls;
3129   struct ConsensusSession *other_session;
3130
3131   initialize_session_peer_list (session,
3132                                 m);
3133   compute_global_id (session,
3134                      &m->session_id);
3135
3136   /* Check if some local client already owns the session.
3137      It is only legal to have a session with an existing global id
3138      if all other sessions with this global id are finished.*/
3139   for (other_session = sessions_head;
3140        NULL != other_session;
3141        other_session = other_session->next)
3142   {
3143     if ( (other_session != session) &&
3144          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3145                                        &other_session->global_id)) )
3146       break;
3147   }
3148
3149   session->conclude_deadline
3150     = GNUNET_TIME_absolute_ntoh (m->deadline);
3151   session->conclude_start
3152     = GNUNET_TIME_absolute_ntoh (m->start);
3153   session->local_peer_idx = get_peer_idx (&my_peer,
3154                                           session);
3155   GNUNET_assert (-1 != session->local_peer_idx);
3156
3157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3158               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3159               GNUNET_h2s (&m->session_id),
3160               session->num_peers,
3161               session->local_peer_idx,
3162               GNUNET_STRINGS_relative_time_to_string
3163               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3164                                                     session->conclude_deadline),
3165                GNUNET_YES));
3166
3167   session->set_listener
3168     = GNUNET_SET_listen (cfg,
3169                          GNUNET_SET_OPERATION_UNION,
3170                          &session->global_id,
3171                          &set_listen_cb,
3172                          session);
3173
3174   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3175                                                           GNUNET_NO);
3176   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3177                                                            GNUNET_NO);
3178   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3179                                                            GNUNET_NO);
3180   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3181                                                           GNUNET_NO);
3182
3183   {
3184     struct SetEntry *client_set;
3185
3186     client_set = GNUNET_new (struct SetEntry);
3187     client_set->h = GNUNET_SET_create (cfg,
3188                                        GNUNET_SET_OPERATION_UNION);
3189     struct SetHandle *sh = GNUNET_new (struct SetHandle);
3190     sh->h = client_set->h;
3191     GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3192                                  session->set_handles_tail,
3193                                  sh);
3194     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3195     put_set (session,
3196              client_set);
3197   }
3198
3199   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3200                                                  int);
3201
3202   /* Just construct the task graph,
3203      but don't run anything until the client calls conclude. */
3204   construct_task_graph (session);
3205   GNUNET_SERVICE_client_continue (session->client);
3206 }
3207
3208
3209 static void
3210 client_insert_done (void *cls)
3211 {
3212   // FIXME: implement
3213 }
3214
3215
3216 /**
3217  * Called when a client performs an insert operation.
3218  *
3219  * @param cls client handle
3220  * @param msg message sent by the client
3221  * @return #GNUNET_OK (always well-formed)
3222  */
3223 static int
3224 check_client_insert (void *cls,
3225                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3226 {
3227   return GNUNET_OK;
3228 }
3229
3230
3231 /**
3232  * Called when a client performs an insert operation.
3233  *
3234  * @param cls client handle
3235  * @param msg message sent by the client
3236  */
3237 static void
3238 handle_client_insert (void *cls,
3239                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3240 {
3241   struct ConsensusSession *session = cls;
3242   ssize_t element_size;
3243   struct GNUNET_SET_Handle *initial_set;
3244   struct ConsensusElement *ce;
3245
3246   if (GNUNET_YES == session->conclude_started)
3247   {
3248     GNUNET_break (0);
3249     GNUNET_SERVICE_client_drop (session->client);
3250     return;
3251   }
3252
3253   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3254   ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3255   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3256   ce->payload_type = msg->element_type;
3257
3258   struct GNUNET_SET_Element element = {
3259     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3260     .size = sizeof (struct ConsensusElement) + element_size,
3261     .data = ce,
3262   };
3263
3264   {
3265     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3266     struct SetEntry *entry;
3267
3268     entry = lookup_set (session,
3269                         &key);
3270     GNUNET_assert (NULL != entry);
3271     initial_set = entry->h;
3272   }
3273
3274   session->num_client_insert_pending++;
3275   GNUNET_SET_add_element (initial_set,
3276                           &element,
3277                           &client_insert_done,
3278                           session);
3279
3280 #ifdef GNUNET_EXTRA_LOGGING
3281   {
3282     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3283                 "P%u: element %s added\n",
3284                 session->local_peer_idx,
3285                 debug_str_element (&element));
3286   }
3287 #endif
3288   GNUNET_free (ce);
3289   GNUNET_SERVICE_client_continue (session->client);
3290 }
3291
3292
3293 /**
3294  * Called when a client performs the conclude operation.
3295  *
3296  * @param cls client handle
3297  * @param message message sent by the client
3298  */
3299 static void
3300 handle_client_conclude (void *cls,
3301                         const struct GNUNET_MessageHeader *message)
3302 {
3303   struct ConsensusSession *session = cls;
3304
3305   if (GNUNET_YES == session->conclude_started)
3306   {
3307     /* conclude started twice */
3308     GNUNET_break (0);
3309     GNUNET_SERVICE_client_drop (session->client);
3310     return;
3311   }
3312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3313               "conclude requested\n");
3314   session->conclude_started = GNUNET_YES;
3315   install_step_timeouts (session);
3316   run_ready_steps (session);
3317   GNUNET_SERVICE_client_continue (session->client);
3318 }
3319
3320
3321 /**
3322  * Called to clean up, after a shutdown has been requested.
3323  *
3324  * @param cls closure
3325  */
3326 static void
3327 shutdown_task (void *cls)
3328 {
3329   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3330               "shutting down\n");
3331   GNUNET_STATISTICS_destroy (statistics,
3332                              GNUNET_NO);
3333   statistics = NULL;
3334 }
3335
3336
3337 /**
3338  * Start processing consensus requests.
3339  *
3340  * @param cls closure
3341  * @param c configuration to use
3342  * @param service the initialized service
3343  */
3344 static void
3345 run (void *cls,
3346      const struct GNUNET_CONFIGURATION_Handle *c,
3347      struct GNUNET_SERVICE_Handle *service)
3348 {
3349   cfg = c;
3350   if (GNUNET_OK !=
3351       GNUNET_CRYPTO_get_peer_identity (cfg,
3352                                        &my_peer))
3353   {
3354     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3355                 "Could not retrieve host identity\n");
3356     GNUNET_SCHEDULER_shutdown ();
3357     return;
3358   }
3359   statistics = GNUNET_STATISTICS_create ("consensus",
3360                                          cfg);
3361   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3362                                  NULL);
3363 }
3364
3365
3366 /**
3367  * Callback called when a client connects to the service.
3368  *
3369  * @param cls closure for the service
3370  * @param c the new client that connected to the service
3371  * @param mq the message queue used to send messages to the client
3372  * @return @a c
3373  */
3374 static void *
3375 client_connect_cb (void *cls,
3376                    struct GNUNET_SERVICE_Client *c,
3377                    struct GNUNET_MQ_Handle *mq)
3378 {
3379   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3380
3381   session->client = c;
3382   session->client_mq = mq;
3383   GNUNET_CONTAINER_DLL_insert (sessions_head,
3384                                sessions_tail,
3385                                session);
3386   return session;
3387 }
3388
3389
3390 /**
3391  * Callback called when a client disconnected from the service
3392  *
3393  * @param cls closure for the service
3394  * @param c the client that disconnected
3395  * @param internal_cls should be equal to @a c
3396  */
3397 static void
3398 client_disconnect_cb (void *cls,
3399                       struct GNUNET_SERVICE_Client *c,
3400                       void *internal_cls)
3401 {
3402   struct ConsensusSession *session = internal_cls;
3403
3404   if (NULL != session->set_listener)
3405   {
3406     GNUNET_SET_listen_cancel (session->set_listener);
3407     session->set_listener = NULL;
3408   }
3409   GNUNET_CONTAINER_DLL_remove (sessions_head,
3410                                sessions_tail,
3411                                session);
3412
3413   while (session->set_handles_head)
3414   {
3415     struct SetHandle *sh = session->set_handles_head;
3416     session->set_handles_head = sh->next;
3417     GNUNET_SET_destroy (sh->h);
3418     GNUNET_free (sh);
3419   }
3420   GNUNET_free (session);
3421 }
3422
3423
3424 /**
3425  * Define "main" method using service macro.
3426  */
3427 GNUNET_SERVICE_MAIN
3428 ("consensus",
3429  GNUNET_SERVICE_OPTION_NONE,
3430  &run,
3431  &client_connect_cb,
3432  &client_disconnect_cb,
3433  NULL,
3434  GNUNET_MQ_hd_fixed_size (client_conclude,
3435                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3436                           struct GNUNET_MessageHeader,
3437                           NULL),
3438  GNUNET_MQ_hd_var_size (client_insert,
3439                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3440                         struct GNUNET_CONSENSUS_ElementMessage,
3441                         NULL),
3442  GNUNET_MQ_hd_var_size (client_join,
3443                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3444                         struct GNUNET_CONSENSUS_JoinMessage,
3445                         NULL),
3446  GNUNET_MQ_handler_end ());
3447
3448 /* end of gnunet-service-consensus.c */