Merge remote-tracking branch 'origin/master' into credentials
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37
38
39 enum ReferendumVote
40 {
41   /**
42    * Vote that nothing should change.
43    * This option is never voted explicitly.
44    */
45   VOTE_STAY = 0,
46   /**
47    * Vote that an element should be added.
48    */
49   VOTE_ADD = 1,
50   /**
51    * Vote that an element should be removed.
52    */
53   VOTE_REMOVE = 2,
54 };
55
56
57 enum EarlyStoppingPhase
58 {
59   EARLY_STOPPING_NONE = 0,
60   EARLY_STOPPING_ONE_MORE = 1,
61   EARLY_STOPPING_DONE = 2,
62 };
63
64
65 GNUNET_NETWORK_STRUCT_BEGIN
66
67 /**
68  * Tuple of integers that together
69  * identify a task uniquely.
70  */
71 struct TaskKey {
72   /**
73    * A value from 'enum PhaseKind'.
74    */
75   uint16_t kind GNUNET_PACKED;
76
77   /**
78    * Number of the first peer
79    * in canonical order.
80    */
81   int16_t peer1 GNUNET_PACKED;
82
83   /**
84    * Number of the second peer in canonical order.
85    */
86   int16_t peer2 GNUNET_PACKED;
87
88   /**
89    * Repetition of the gradecast phase.
90    */
91   int16_t repetition GNUNET_PACKED;
92
93   /**
94    * Leader in the gradecast phase.
95    *
96    * Can be different from both peer1 and peer2.
97    */
98   int16_t leader GNUNET_PACKED;
99 };
100
101
102
103 struct SetKey
104 {
105   int set_kind GNUNET_PACKED;
106   int k1 GNUNET_PACKED;
107   int k2 GNUNET_PACKED;
108 };
109
110
111 struct SetEntry
112 {
113   struct SetKey key;
114   struct GNUNET_SET_Handle *h;
115   /**
116    * GNUNET_YES if the set resulted
117    * from applying a referendum with contested
118    * elements.
119    */
120   int is_contested;
121 };
122
123
124 struct DiffKey
125 {
126   int diff_kind GNUNET_PACKED;
127   int k1 GNUNET_PACKED;
128   int k2 GNUNET_PACKED;
129 };
130
131 struct RfnKey
132 {
133   int rfn_kind GNUNET_PACKED;
134   int k1 GNUNET_PACKED;
135   int k2 GNUNET_PACKED;
136 };
137
138
139 GNUNET_NETWORK_STRUCT_END
140
141 enum PhaseKind
142 {
143   PHASE_KIND_ALL_TO_ALL,
144   PHASE_KIND_ALL_TO_ALL_2,
145   PHASE_KIND_GRADECAST_LEADER,
146   PHASE_KIND_GRADECAST_ECHO,
147   PHASE_KIND_GRADECAST_ECHO_GRADE,
148   PHASE_KIND_GRADECAST_CONFIRM,
149   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
150   /**
151    * Apply a repetition of the all-to-all
152    * gradecast to the current set.
153    */
154   PHASE_KIND_APPLY_REP,
155   PHASE_KIND_FINISH,
156 };
157
158
159 enum SetKind
160 {
161   SET_KIND_NONE = 0,
162   SET_KIND_CURRENT,
163   /**
164    * Last result set from a gradecast
165    */
166   SET_KIND_LAST_GRADECAST,
167   SET_KIND_LEADER_PROPOSAL,
168   SET_KIND_ECHO_RESULT,
169 };
170
171 enum DiffKind
172 {
173   DIFF_KIND_NONE = 0,
174   DIFF_KIND_LEADER_PROPOSAL,
175   DIFF_KIND_LEADER_CONSENSUS,
176   DIFF_KIND_GRADECAST_RESULT,
177 };
178
179 enum RfnKind
180 {
181   RFN_KIND_NONE = 0,
182   RFN_KIND_ECHO,
183   RFN_KIND_CONFIRM,
184   RFN_KIND_GRADECAST_RESULT
185 };
186
187
188 struct SetOpCls
189 {
190   struct SetKey input_set;
191
192   struct SetKey output_set;
193   struct RfnKey output_rfn;
194   struct DiffKey output_diff;
195
196   int do_not_remove;
197
198   int transceive_contested;
199
200   struct GNUNET_SET_OperationHandle *op;
201 };
202
203
204 struct FinishCls
205 {
206   struct SetKey input_set;
207 };
208
209 /**
210  * Closure for both @a start_task
211  * and @a cancel_task.
212  */
213 union TaskFuncCls
214 {
215   struct SetOpCls setop;
216   struct FinishCls finish;
217 };
218
219 struct TaskEntry;
220
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228   struct TaskKey key;
229
230   struct Step *step;
231
232   int is_started;
233
234   int is_finished;
235
236   TaskFunc start;
237   TaskFunc cancel;
238
239   union TaskFuncCls cls;
240 };
241
242
243 struct Step
244 {
245   /**
246    * All steps of one session are in a
247    * linked list for easier deallocation.
248    */
249   struct Step *prev;
250
251   /**
252    * All steps of one session are in a
253    * linked list for easier deallocation.
254    */
255   struct Step *next;
256
257   struct ConsensusSession *session;
258
259   /**
260    * Tasks that this step is composed of.
261    */
262   struct TaskEntry **tasks;
263   unsigned int tasks_len;
264   unsigned int tasks_cap;
265
266   unsigned int finished_tasks;
267
268   /*
269    * Tasks that have this task as dependency.
270    *
271    * We store pointers to subordinates rather
272    * than to prerequisites since it makes
273    * tracking the readiness of a task easier.
274    */
275   struct Step **subordinates;
276   unsigned int subordinates_len;
277   unsigned int subordinates_cap;
278
279   /**
280    * Counter for the prerequisites of
281    * this step.
282    */
283   size_t pending_prereq;
284
285   /*
286    * Task that will run this step despite
287    * any pending prerequisites.
288    */
289   struct GNUNET_SCHEDULER_Task *timeout_task;
290
291   unsigned int is_running;
292
293   unsigned int is_finished;
294
295   /*
296    * Synchrony round of the task.
297    * Determines the deadline for the task.
298    */
299   unsigned int round;
300
301   /**
302    * Human-readable name for
303    * the task, used for debugging.
304    */
305   char *debug_name;
306
307   /**
308    * When we're doing an early finish, how should this step be
309    * treated?
310    * If GNUNET_YES, the step will be marked as finished
311    * without actually running its tasks.
312    * Otherwise, the step will still be run even after
313    * an early finish.
314    *
315    * Note that a task may never be finished early if
316    * it is already running.
317    */
318   int early_finishable;
319 };
320
321
322 struct RfnElementInfo
323 {
324   const struct GNUNET_SET_Element *element;
325
326   /*
327    * GNUNET_YES if the peer votes for the proposal.
328    */
329   int *votes;
330
331   /**
332    * Proposal for this element,
333    * can only be VOTE_ADD or VOTE_REMOVE.
334    */
335   enum ReferendumVote proposal;
336 };
337
338
339 struct ReferendumEntry
340 {
341   struct RfnKey key;
342
343   /*
344    * Elements where there is at least one proposed change.
345    *
346    * Maps the hash of the GNUNET_SET_Element
347    * to 'struct RfnElementInfo'.
348    */
349   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
350
351   unsigned int num_peers;
352
353   /**
354    * Stores, for every peer in the session,
355    * whether the peer finished the whole referendum.
356    *
357    * Votes from peers are only counted if they're
358    * marked as commited (#GNUNET_YES) in the referendum.
359    *
360    * Otherwise (#GNUNET_NO), the requested changes are
361    * not counted for majority votes or thresholds.
362    */
363   int *peer_commited;
364
365
366   /**
367    * Contestation state of the peer.  If a peer is contested, the values it
368    * contributed are still counted for applying changes, but the grading is
369    * affected.
370    */
371   int *peer_contested;
372 };
373
374
375 struct DiffElementInfo
376 {
377   const struct GNUNET_SET_Element *element;
378
379   /**
380    * Positive weight for 'add', negative
381    * weights for 'remove'.
382    */
383   int weight;
384 };
385
386
387 /**
388  * Weighted diff.
389  */
390 struct DiffEntry
391 {
392   struct DiffKey key;
393   struct GNUNET_CONTAINER_MultiHashMap *changes;
394 };
395
396 struct SetHandle
397 {
398   struct SetHandle *prev;
399   struct SetHandle *next;
400
401   struct GNUNET_SET_Handle *h;
402 };
403
404
405
406 /**
407  * A consensus session consists of one local client and the remote authorities.
408  */
409 struct ConsensusSession
410 {
411   /**
412    * Consensus sessions are kept in a DLL.
413    */
414   struct ConsensusSession *next;
415
416   /**
417    * Consensus sessions are kept in a DLL.
418    */
419   struct ConsensusSession *prev;
420
421   unsigned int num_client_insert_pending;
422
423   struct GNUNET_CONTAINER_MultiHashMap *setmap;
424   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
425   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
426
427   /**
428    * Array of peers with length 'num_peers'.
429    */
430   int *peers_blacklisted;
431
432   /*
433    * Mapping from (hashed) TaskKey to TaskEntry.
434    *
435    * We map the application_id for a round to the task that should be
436    * executed, so we don't have to go through all task whenever we get
437    * an incoming set op request.
438    */
439   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
440
441   struct Step *steps_head;
442   struct Step *steps_tail;
443
444   int conclude_started;
445
446   int conclude_done;
447
448   /**
449   * Global consensus identification, computed
450   * from the session id and participating authorities.
451   */
452   struct GNUNET_HashCode global_id;
453
454   /**
455    * Client that inhabits the session
456    */
457   struct GNUNET_SERVICE_Client *client;
458
459   /**
460    * Queued messages to the client.
461    */
462   struct GNUNET_MQ_Handle *client_mq;
463
464   /**
465    * Time when the conclusion of the consensus should begin.
466    */
467   struct GNUNET_TIME_Absolute conclude_start;
468
469   /**
470    * Timeout for all rounds together, single rounds will schedule a timeout task
471    * with a fraction of the conclude timeout.
472    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
473    */
474   struct GNUNET_TIME_Absolute conclude_deadline;
475
476   struct GNUNET_PeerIdentity *peers;
477
478   /**
479    * Number of other peers in the consensus.
480    */
481   unsigned int num_peers;
482
483   /**
484    * Index of the local peer in the peers array
485    */
486   unsigned int local_peer_idx;
487
488   /**
489    * Listener for requests from other peers.
490    * Uses the session's global id as app id.
491    */
492   struct GNUNET_SET_ListenHandle *set_listener;
493
494   /**
495    * State of our early stopping scheme.
496    */
497   int early_stopping;
498
499   /**
500    * Our set size from the first round.
501    */
502   uint64_t first_size;
503
504   uint64_t *first_sizes_received;
505
506   /**
507    * Bounded Eppstein lower bound.
508    */
509   uint64_t lower_bound;
510
511   struct SetHandle *set_handles_head;
512   struct SetHandle *set_handles_tail;
513 };
514
515 /**
516  * Linked list of sessions this peer participates in.
517  */
518 static struct ConsensusSession *sessions_head;
519
520 /**
521  * Linked list of sessions this peer participates in.
522  */
523 static struct ConsensusSession *sessions_tail;
524
525 /**
526  * Configuration of the consensus service.
527  */
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
529
530 /**
531  * Peer that runs this service.
532  */
533 static struct GNUNET_PeerIdentity my_peer;
534
535 /**
536  * Statistics handle.
537  */
538 struct GNUNET_STATISTICS_Handle *statistics;
539
540
541 static void
542 finish_task (struct TaskEntry *task);
543
544
545 static void
546 run_ready_steps (struct ConsensusSession *session);
547
548
549 static const char *
550 phasename (uint16_t phase)
551 {
552   switch (phase)
553   {
554     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555     case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556     case PHASE_KIND_FINISH: return "FINISH";
557     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563     default: return "(unknown)";
564   }
565 }
566
567
568 static const char *
569 setname (uint16_t kind)
570 {
571   switch (kind)
572   {
573     case SET_KIND_CURRENT: return "CURRENT";
574     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575     case SET_KIND_NONE: return "NONE";
576     default: return "(unknown)";
577   }
578 }
579
580 static const char *
581 rfnname (uint16_t kind)
582 {
583   switch (kind)
584   {
585     case RFN_KIND_NONE: return "NONE";
586     case RFN_KIND_ECHO: return "ECHO";
587     case RFN_KIND_CONFIRM: return "CONFIRM";
588     default: return "(unknown)";
589   }
590 }
591
592 static const char *
593 diffname (uint16_t kind)
594 {
595   switch (kind)
596   {
597     case DIFF_KIND_NONE: return "NONE";
598     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601     default: return "(unknown)";
602   }
603 }
604
605 #ifdef GNUNET_EXTRA_LOGGING
606
607
608 static const char *
609 debug_str_element (const struct GNUNET_SET_Element *el)
610 {
611   struct GNUNET_HashCode hash;
612
613   GNUNET_SET_element_hash (el, &hash);
614
615   return GNUNET_h2s (&hash);
616 }
617
618 static const char *
619 debug_str_task_key (struct TaskKey *tk)
620 {
621   static char buf[256];
622
623   snprintf (buf, sizeof (buf),
624             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625             phasename (tk->kind), tk->peer1, tk->peer2,
626             tk->leader, tk->repetition);
627
628   return buf;
629 }
630
631 static const char *
632 debug_str_diff_key (struct DiffKey *dk)
633 {
634   static char buf[256];
635
636   snprintf (buf, sizeof (buf),
637             "DiffKey kind=%s, k1=%d, k2=%d",
638             diffname (dk->diff_kind), dk->k1, dk->k2);
639
640   return buf;
641 }
642
643 static const char *
644 debug_str_set_key (const struct SetKey *sk)
645 {
646   static char buf[256];
647
648   snprintf (buf, sizeof (buf),
649             "SetKey kind=%s, k1=%d, k2=%d",
650             setname (sk->set_kind), sk->k1, sk->k2);
651
652   return buf;
653 }
654
655
656 static const char *
657 debug_str_rfn_key (const struct RfnKey *rk)
658 {
659   static char buf[256];
660
661   snprintf (buf, sizeof (buf),
662             "RfnKey kind=%s, k1=%d, k2=%d",
663             rfnname (rk->rfn_kind), rk->k1, rk->k2);
664
665   return buf;
666 }
667
668 #endif /* GNUNET_EXTRA_LOGGING */
669
670
671 /**
672  * Send the final result set of the consensus to the client, element by
673  * element.
674  *
675  * @param cls closure
676  * @param element the current element, NULL if all elements have been
677  *        iterated over
678  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
679  */
680 static int
681 send_to_client_iter (void *cls,
682                      const struct GNUNET_SET_Element *element)
683 {
684   struct TaskEntry *task = (struct TaskEntry *) cls;
685   struct ConsensusSession *session = task->step->session;
686   struct GNUNET_MQ_Envelope *ev;
687
688   if (NULL != element)
689   {
690     struct GNUNET_CONSENSUS_ElementMessage *m;
691     const struct ConsensusElement *ce;
692
693     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
694     ce = element->data;
695
696     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
697
698     if (0 != ce->marker)
699       return GNUNET_YES;
700
701     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702                 "P%d: sending element %s to client\n",
703                 session->local_peer_idx,
704                 debug_str_element (element));
705
706     ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
707                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
708     m->element_type = ce->payload_type;
709     GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710     GNUNET_MQ_send (session->client_mq, ev);
711   }
712   else
713   {
714     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715                 "P%d: finished iterating elements for client\n",
716                 session->local_peer_idx);
717     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
718     GNUNET_MQ_send (session->client_mq, ev);
719   }
720   return GNUNET_YES;
721 }
722
723
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
726 {
727   struct GNUNET_HashCode hash;
728
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "P%u: looking up set {%s}\n",
731               session->local_peer_idx,
732               debug_str_set_key (key));
733
734   GNUNET_assert (SET_KIND_NONE != key->set_kind);
735   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
737 }
738
739
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
742 {
743   struct GNUNET_HashCode hash;
744
745   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746               "P%u: looking up diff {%s}\n",
747               session->local_peer_idx,
748               debug_str_diff_key (key));
749
750   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
751   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
753 }
754
755
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
758 {
759   struct GNUNET_HashCode hash;
760
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "P%u: looking up rfn {%s}\n",
763               session->local_peer_idx,
764               debug_str_rfn_key (key));
765
766   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
767   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
769 }
770
771
772 static void
773 diff_insert (struct DiffEntry *diff,
774              int weight,
775              const struct GNUNET_SET_Element *element)
776 {
777   struct DiffElementInfo *di;
778   struct GNUNET_HashCode hash;
779
780   GNUNET_assert ( (1 == weight) || (-1 == weight));
781
782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783               "diff_insert with element size %u\n",
784               element->size);
785
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "hashing element\n");
788
789   GNUNET_SET_element_hash (element, &hash);
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "hashed element\n");
793
794   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
795
796   if (NULL == di)
797   {
798     di = GNUNET_new (struct DiffElementInfo);
799     di->element = GNUNET_SET_element_dup (element);
800     GNUNET_assert (GNUNET_OK ==
801                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
802                                                       &hash, di,
803                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
804   }
805
806   di->weight = weight;
807 }
808
809
810 static void
811 rfn_commit (struct ReferendumEntry *rfn,
812             uint16_t commit_peer)
813 {
814   GNUNET_assert (commit_peer < rfn->num_peers);
815
816   rfn->peer_commited[commit_peer] = GNUNET_YES;
817 }
818
819
820 static void
821 rfn_contest (struct ReferendumEntry *rfn,
822              uint16_t contested_peer)
823 {
824   GNUNET_assert (contested_peer < rfn->num_peers);
825
826   rfn->peer_contested[contested_peer] = GNUNET_YES;
827 }
828
829
830 static uint16_t
831 rfn_noncontested (struct ReferendumEntry *rfn)
832 {
833   uint16_t i;
834   uint16_t ret;
835
836   ret = 0;
837   for (i = 0; i < rfn->num_peers; i++)
838     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
839       ret++;
840
841   return ret;
842 }
843
844
845 static void
846 rfn_vote (struct ReferendumEntry *rfn,
847           uint16_t voting_peer,
848           enum ReferendumVote vote,
849           const struct GNUNET_SET_Element *element)
850 {
851   struct RfnElementInfo *ri;
852   struct GNUNET_HashCode hash;
853
854   GNUNET_assert (voting_peer < rfn->num_peers);
855
856   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
857      since VOTE_KEEP is implicit in not voting. */
858   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
859
860   GNUNET_SET_element_hash (element, &hash);
861   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
862
863   if (NULL == ri)
864   {
865     ri = GNUNET_new (struct RfnElementInfo);
866     ri->element = GNUNET_SET_element_dup (element);
867     ri->votes = GNUNET_new_array (rfn->num_peers, int);
868     GNUNET_assert (GNUNET_OK ==
869                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
870                                                       &hash, ri,
871                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
872   }
873
874   ri->votes[voting_peer] = GNUNET_YES;
875   ri->proposal = vote;
876 }
877
878
879 static uint16_t
880 task_other_peer (struct TaskEntry *task)
881 {
882   uint16_t me = task->step->session->local_peer_idx;
883   if (task->key.peer1 == me)
884     return task->key.peer2;
885   return task->key.peer1;
886 }
887
888
889 static int
890 cmp_uint64_t (const void *pa, const void *pb)
891 {
892   uint64_t a = *(uint64_t *) pa;
893   uint64_t b = *(uint64_t *) pb;
894
895   if (a == b)
896     return 0;
897   if (a < b)
898     return -1;
899   return 1;
900 }
901
902
903 /**
904  * Callback for set operation results. Called for each element
905  * in the result set.
906  *
907  * @param cls closure
908  * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
909  * @param current_size current set size
910  * @param status see enum GNUNET_SET_Status
911  */
912 static void
913 set_result_cb (void *cls,
914                const struct GNUNET_SET_Element *element,
915                uint64_t current_size,
916                enum GNUNET_SET_Status status)
917 {
918   struct TaskEntry *task = cls;
919   struct ConsensusSession *session = task->step->session;
920   struct SetEntry *output_set = NULL;
921   struct DiffEntry *output_diff = NULL;
922   struct ReferendumEntry *output_rfn = NULL;
923   unsigned int other_idx;
924   struct SetOpCls *setop;
925   const struct ConsensusElement *consensus_element = NULL;
926
927   if (NULL != element)
928   {
929     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930                 "P%u: got element of type %u, status %u\n",
931                 session->local_peer_idx,
932                 (unsigned) element->element_type,
933                 (unsigned) status);
934     GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
935     consensus_element = element->data;
936   }
937
938   setop = &task->cls.setop;
939
940
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942               "P%u: got set result for {%s}, status %u\n",
943               session->local_peer_idx,
944               debug_str_task_key (&task->key),
945               status);
946
947   if (GNUNET_NO == task->is_started)
948   {
949     GNUNET_break_op (0);
950     return;
951   }
952
953   if (GNUNET_YES == task->is_finished)
954   {
955     GNUNET_break_op (0);
956     return;
957   }
958
959   other_idx = task_other_peer (task);
960
961   if (SET_KIND_NONE != setop->output_set.set_kind)
962   {
963     output_set = lookup_set (session, &setop->output_set);
964     GNUNET_assert (NULL != output_set);
965   }
966
967   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
968   {
969     output_diff = lookup_diff (session, &setop->output_diff);
970     GNUNET_assert (NULL != output_diff);
971   }
972
973   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
974   {
975     output_rfn = lookup_rfn (session, &setop->output_rfn);
976     GNUNET_assert (NULL != output_rfn);
977   }
978
979   if (GNUNET_YES == session->peers_blacklisted[other_idx])
980   {
981     /* Peer might have been blacklisted
982        by a gradecast running in parallel, ignore elements from now */
983     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
984       return;
985     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
986       return;
987   }
988
989   if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
990   {
991     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992                 "P%u: got some marker\n",
993                   session->local_peer_idx);
994     if ( (GNUNET_YES == setop->transceive_contested) &&
995          (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
996     {
997       GNUNET_assert (NULL != output_rfn);
998       rfn_contest (output_rfn, task_other_peer (task));
999       return;
1000     }
1001
1002     if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1003     {
1004
1005       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006                   "P%u: got size marker\n",
1007                   session->local_peer_idx);
1008
1009
1010       struct ConsensusSizeElement *cse = (void *) consensus_element;
1011
1012       if (cse->sender_index == other_idx)
1013       {
1014         if (NULL == session->first_sizes_received)
1015           session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016         session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1017
1018         uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019         qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020         session->lower_bound = copy[session->num_peers / 3 + 1];
1021         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022                     "P%u: lower bound %llu\n",
1023                     session->local_peer_idx,
1024                     (long long) session->lower_bound);
1025         GNUNET_free (copy);
1026       }
1027       return;
1028     }
1029
1030     return;
1031   }
1032
1033   switch (status)
1034   {
1035     case GNUNET_SET_STATUS_ADD_LOCAL:
1036       GNUNET_assert (NULL != consensus_element);
1037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                   "Adding element in Task {%s}\n",
1039                   debug_str_task_key (&task->key));
1040       if (NULL != output_set)
1041       {
1042         // FIXME: record pending adds, use callback
1043         GNUNET_SET_add_element (output_set->h,
1044                                 element,
1045                                 NULL,
1046                                 NULL);
1047 #ifdef GNUNET_EXTRA_LOGGING
1048         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049                     "P%u: adding element %s into set {%s} of task {%s}\n",
1050                     session->local_peer_idx,
1051                     debug_str_element (element),
1052                     debug_str_set_key (&setop->output_set),
1053                     debug_str_task_key (&task->key));
1054 #endif
1055       }
1056       if (NULL != output_diff)
1057       {
1058         diff_insert (output_diff, 1, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1060         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1062                     session->local_peer_idx,
1063                     debug_str_element (element),
1064                     debug_str_diff_key (&setop->output_diff),
1065                     debug_str_task_key (&task->key));
1066 #endif
1067       }
1068       if (NULL != output_rfn)
1069       {
1070         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1071 #ifdef GNUNET_EXTRA_LOGGING
1072         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1074                     session->local_peer_idx,
1075                     debug_str_element (element),
1076                     debug_str_rfn_key (&setop->output_rfn),
1077                     debug_str_task_key (&task->key));
1078 #endif
1079       }
1080       // XXX: add result to structures in task
1081       break;
1082     case GNUNET_SET_STATUS_ADD_REMOTE:
1083       GNUNET_assert (NULL != consensus_element);
1084       if (GNUNET_YES == setop->do_not_remove)
1085         break;
1086       if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1087         break;
1088       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1089                   "Removing element in Task {%s}\n",
1090                   debug_str_task_key (&task->key));
1091       if (NULL != output_set)
1092       {
1093         // FIXME: record pending adds, use callback
1094         GNUNET_SET_remove_element (output_set->h,
1095                                    element,
1096                                    NULL,
1097                                    NULL);
1098 #ifdef GNUNET_EXTRA_LOGGING
1099         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1100                     "P%u: removing element %s from set {%s} of task {%s}\n",
1101                     session->local_peer_idx,
1102                     debug_str_element (element),
1103                     debug_str_set_key (&setop->output_set),
1104                     debug_str_task_key (&task->key));
1105 #endif
1106       }
1107       if (NULL != output_diff)
1108       {
1109         diff_insert (output_diff, -1, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1111         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1113                     session->local_peer_idx,
1114                     debug_str_element (element),
1115                     debug_str_diff_key (&setop->output_diff),
1116                     debug_str_task_key (&task->key));
1117 #endif
1118       }
1119       if (NULL != output_rfn)
1120       {
1121         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1122 #ifdef GNUNET_EXTRA_LOGGING
1123         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1125                     session->local_peer_idx,
1126                     debug_str_element (element),
1127                     debug_str_rfn_key (&setop->output_rfn),
1128                     debug_str_task_key (&task->key));
1129 #endif
1130       }
1131       break;
1132     case GNUNET_SET_STATUS_DONE:
1133       // XXX: check first if any changes to the underlying
1134       // set are still pending
1135       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136                   "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1137                   session->local_peer_idx,
1138                   debug_str_task_key (&task->key),
1139                   (unsigned int) task->step->finished_tasks,
1140                   (unsigned int) task->step->tasks_len);
1141       if (NULL != output_rfn)
1142       {
1143         rfn_commit (output_rfn, task_other_peer (task));
1144       }
1145       if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1146       {
1147         session->first_size = current_size;
1148       }
1149       finish_task (task);
1150       break;
1151     case GNUNET_SET_STATUS_FAILURE:
1152       // XXX: cleanup
1153       GNUNET_break_op (0);
1154       finish_task (task);
1155       return;
1156     default:
1157       /* not reached */
1158       GNUNET_assert (0);
1159   }
1160 }
1161
1162 #ifdef EVIL
1163
1164 enum EvilnessType
1165 {
1166   EVILNESS_NONE,
1167   EVILNESS_CRAM_ALL,
1168   EVILNESS_CRAM_LEAD,
1169   EVILNESS_CRAM_ECHO,
1170   EVILNESS_SLACK,
1171   EVILNESS_SLACK_A2A,
1172 };
1173
1174 enum EvilnessSubType
1175 {
1176   EVILNESS_SUB_NONE,
1177   EVILNESS_SUB_REPLACEMENT,
1178   EVILNESS_SUB_NO_REPLACEMENT,
1179 };
1180
1181 struct Evilness
1182 {
1183   enum EvilnessType type;
1184   enum EvilnessSubType subtype;
1185   unsigned int num;
1186 };
1187
1188
1189 static int
1190 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1191 {
1192   if (0 == strcmp ("replace", evil_subtype_str))
1193   {
1194     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1195   }
1196   else if (0 == strcmp ("noreplace", evil_subtype_str))
1197   {
1198     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1199   }
1200   else
1201   {
1202     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1203                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1204                 evil_subtype_str);
1205     return GNUNET_SYSERR;
1206   }
1207   return GNUNET_OK;
1208 }
1209
1210
1211 static void
1212 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1213 {
1214   char *evil_spec;
1215   char *field;
1216   char *evil_type_str = NULL;
1217   char *evil_subtype_str = NULL;
1218
1219   GNUNET_assert (NULL != evil);
1220
1221   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1222   {
1223     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224                 "P%u: no evilness\n",
1225                 session->local_peer_idx);
1226     evil->type = EVILNESS_NONE;
1227     return;
1228   }
1229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230               "P%u: got evilness spec\n",
1231               session->local_peer_idx);
1232
1233   for (field = strtok (evil_spec, "/");
1234        NULL != field;
1235        field = strtok (NULL, "/"))
1236   {
1237     unsigned int peer_num;
1238     unsigned int evil_num;
1239     int ret;
1240
1241     evil_type_str = NULL;
1242     evil_subtype_str = NULL;
1243
1244     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1245
1246     if (ret != 4)
1247     {
1248       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1249                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1250                   field,
1251                   ret);
1252       goto not_evil;
1253     }
1254
1255     GNUNET_assert (NULL != evil_type_str);
1256     GNUNET_assert (NULL != evil_subtype_str);
1257
1258     if (peer_num == session->local_peer_idx)
1259     {
1260       if (0 == strcmp ("slack", evil_type_str))
1261       {
1262         evil->type = EVILNESS_SLACK;
1263       }
1264       if (0 == strcmp ("slack-a2a", evil_type_str))
1265       {
1266         evil->type = EVILNESS_SLACK_A2A;
1267       }
1268       else if (0 == strcmp ("cram-all", evil_type_str))
1269       {
1270         evil->type = EVILNESS_CRAM_ALL;
1271         evil->num = evil_num;
1272         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1273           goto not_evil;
1274       }
1275       else if (0 == strcmp ("cram-lead", evil_type_str))
1276       {
1277         evil->type = EVILNESS_CRAM_LEAD;
1278         evil->num = evil_num;
1279         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1280           goto not_evil;
1281       }
1282       else if (0 == strcmp ("cram-echo", evil_type_str))
1283       {
1284         evil->type = EVILNESS_CRAM_ECHO;
1285         evil->num = evil_num;
1286         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1287           goto not_evil;
1288       }
1289       else
1290       {
1291         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1292                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1293                     evil_type_str);
1294         goto not_evil;
1295       }
1296       goto cleanup;
1297     }
1298     /* No GNUNET_free since memory was allocated by libc */
1299     free (evil_type_str);
1300     evil_type_str = NULL;
1301     evil_subtype_str = NULL;
1302   }
1303 not_evil:
1304   evil->type = EVILNESS_NONE;
1305 cleanup:
1306   GNUNET_free (evil_spec);
1307   /* no GNUNET_free_non_null since it wasn't
1308    * allocated with GNUNET_malloc */
1309   if (NULL != evil_type_str)
1310     free (evil_type_str);
1311   if (NULL != evil_subtype_str)
1312     free (evil_subtype_str);
1313 }
1314
1315 #endif
1316
1317
1318 /**
1319  * Commit the appropriate set for a
1320  * task.
1321  */
1322 static void
1323 commit_set (struct ConsensusSession *session,
1324             struct TaskEntry *task)
1325 {
1326   struct SetEntry *set;
1327   struct SetOpCls *setop = &task->cls.setop;
1328
1329   GNUNET_assert (NULL != setop->op);
1330   set = lookup_set (session, &setop->input_set);
1331   GNUNET_assert (NULL != set);
1332
1333   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1334   {
1335     struct GNUNET_SET_Element element;
1336     struct ConsensusElement ce = { 0 };
1337     ce.marker = CONSENSUS_MARKER_CONTESTED;
1338     element.data = &ce;
1339     element.size = sizeof (struct ConsensusElement);
1340     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1341     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1342   }
1343
1344   if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1345   {
1346     struct GNUNET_SET_Element element;
1347     struct ConsensusSizeElement cse = {
1348       .size = 0,
1349       .sender_index = 0
1350     };
1351     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1352     cse.ce.marker = CONSENSUS_MARKER_SIZE;
1353     cse.size = GNUNET_htonll (session->first_size);
1354     cse.sender_index = session->local_peer_idx;
1355     element.data = &cse;
1356     element.size = sizeof (struct ConsensusSizeElement);
1357     element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1358     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1359   }
1360
1361 #ifdef EVIL
1362   {
1363     unsigned int i;
1364     struct Evilness evil;
1365
1366     get_evilness (session, &evil);
1367     if (EVILNESS_NONE != evil.type)
1368     {
1369       /* Useful for evaluation */
1370       GNUNET_STATISTICS_set (statistics,
1371                              "is evil",
1372                              1,
1373                              GNUNET_NO);
1374     }
1375     switch (evil.type)
1376     {
1377       case EVILNESS_CRAM_ALL:
1378       case EVILNESS_CRAM_LEAD:
1379       case EVILNESS_CRAM_ECHO:
1380         /* We're not cramming elements in the
1381            all-to-all round, since that would just
1382            add more elements to the result set, but
1383            wouldn't test robustness. */
1384         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1385         {
1386           GNUNET_SET_commit (setop->op, set->h);
1387           break;
1388         }
1389         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1390             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1391         {
1392           GNUNET_SET_commit (setop->op, set->h);
1393           break;
1394         }
1395         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1396         {
1397           GNUNET_SET_commit (setop->op, set->h);
1398           break;
1399         }
1400         for (i = 0; i < evil.num; i++)
1401         {
1402           struct GNUNET_SET_Element element;
1403           struct ConsensusStuffedElement se = {
1404             .ce.payload_type = 0,
1405             .ce.marker = 0,
1406           };
1407           element.data = &se;
1408           element.size = sizeof (struct ConsensusStuffedElement);
1409           element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1410
1411           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1412           {
1413             /* Always generate a new element. */
1414             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1415           }
1416           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1417           {
1418             /* Always cram the same elements, derived from counter. */
1419             GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1420           }
1421           else
1422           {
1423             GNUNET_assert (0);
1424           }
1425           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1426 #ifdef GNUNET_EXTRA_LOGGING
1427           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1428                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1429                       session->local_peer_idx,
1430                       debug_str_element (&element),
1431                       debug_str_set_key (&setop->input_set),
1432                       debug_str_task_key (&task->key));
1433 #endif
1434         }
1435         GNUNET_STATISTICS_update (statistics,
1436                                   "# stuffed elements",
1437                                   evil.num,
1438                                   GNUNET_NO);
1439         GNUNET_SET_commit (setop->op, set->h);
1440         break;
1441       case EVILNESS_SLACK:
1442         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443                     "P%u: evil peer: slacking\n",
1444                     (unsigned int) session->local_peer_idx);
1445         /* Do nothing. */
1446       case EVILNESS_SLACK_A2A:
1447         if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1448              (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1449         {
1450           struct GNUNET_SET_Handle *empty_set;
1451           empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1452           GNUNET_SET_commit (setop->op, empty_set);
1453           GNUNET_SET_destroy (empty_set);
1454         }
1455         else
1456         {
1457           GNUNET_SET_commit (setop->op, set->h);
1458         }
1459         break;
1460       case EVILNESS_NONE:
1461         GNUNET_SET_commit (setop->op, set->h);
1462         break;
1463     }
1464   }
1465 #else
1466   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1467   {
1468     GNUNET_SET_commit (setop->op, set->h);
1469   }
1470   else
1471   {
1472     /* For our testcases, we don't want the blacklisted
1473        peers to wait. */
1474     GNUNET_SET_operation_cancel (setop->op);
1475     setop->op = NULL;
1476     finish_task (task);
1477   }
1478 #endif
1479 }
1480
1481
1482 static void
1483 put_diff (struct ConsensusSession *session,
1484          struct DiffEntry *diff)
1485 {
1486   struct GNUNET_HashCode hash;
1487
1488   GNUNET_assert (NULL != diff);
1489
1490   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1491   GNUNET_assert (GNUNET_OK ==
1492                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1493                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1494 }
1495
1496 static void
1497 put_set (struct ConsensusSession *session,
1498          struct SetEntry *set)
1499 {
1500   struct GNUNET_HashCode hash;
1501
1502   GNUNET_assert (NULL != set->h);
1503
1504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505               "Putting set %s\n",
1506               debug_str_set_key (&set->key));
1507
1508   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1509   GNUNET_assert (GNUNET_SYSERR !=
1510                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1511                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1512 }
1513
1514
1515 static void
1516 put_rfn (struct ConsensusSession *session,
1517          struct ReferendumEntry *rfn)
1518 {
1519   struct GNUNET_HashCode hash;
1520
1521   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1522   GNUNET_assert (GNUNET_OK ==
1523                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1524                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1525 }
1526
1527
1528
1529 static void
1530 task_cancel_reconcile (struct TaskEntry *task)
1531 {
1532   /* not implemented yet */
1533   GNUNET_assert (0);
1534 }
1535
1536
1537 static void
1538 apply_diff_to_rfn (struct DiffEntry *diff,
1539                    struct ReferendumEntry *rfn,
1540                    uint16_t voting_peer,
1541                    uint16_t num_peers)
1542 {
1543   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1544   struct DiffElementInfo *di;
1545
1546   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1547
1548   while (GNUNET_YES ==
1549          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1550                                                       NULL,
1551                                                       (const void **) &di))
1552   {
1553     if (di->weight > 0)
1554     {
1555       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1556     }
1557     if (di->weight < 0)
1558     {
1559       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1560     }
1561   }
1562
1563   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1564 }
1565
1566
1567 struct DiffEntry *
1568 diff_create ()
1569 {
1570   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1571
1572   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1573
1574   return d;
1575 }
1576
1577
1578 struct DiffEntry *
1579 diff_compose (struct DiffEntry *diff_1,
1580               struct DiffEntry *diff_2)
1581 {
1582   struct DiffEntry *diff_new;
1583   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1584   struct DiffElementInfo *di;
1585
1586   diff_new = diff_create ();
1587
1588   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1589   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1590   {
1591     diff_insert (diff_new, di->weight, di->element);
1592   }
1593   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1594
1595   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1596   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1597   {
1598     diff_insert (diff_new, di->weight, di->element);
1599   }
1600   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1601
1602   return diff_new;
1603 }
1604
1605
1606 struct ReferendumEntry *
1607 rfn_create (uint16_t size)
1608 {
1609   struct ReferendumEntry *rfn;
1610
1611   rfn = GNUNET_new (struct ReferendumEntry);
1612   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1613   rfn->peer_commited = GNUNET_new_array (size, int);
1614   rfn->peer_contested = GNUNET_new_array (size, int);
1615   rfn->num_peers = size;
1616
1617   return rfn;
1618 }
1619
1620
1621 #if UNUSED
1622 static void
1623 diff_destroy (struct DiffEntry *diff)
1624 {
1625   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1626   GNUNET_free (diff);
1627 }
1628 #endif
1629
1630
1631 /**
1632  * For a given majority, count what the outcome
1633  * is (add/remove/keep), and give the number
1634  * of peers that voted for this outcome.
1635  */
1636 static void
1637 rfn_majority (const struct ReferendumEntry *rfn,
1638               const struct RfnElementInfo *ri,
1639               uint16_t *ret_majority,
1640               enum ReferendumVote *ret_vote)
1641 {
1642   uint16_t votes_yes = 0;
1643   uint16_t num_commited = 0;
1644   uint16_t i;
1645
1646   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1647               "Computing rfn majority for element %s of rfn {%s}\n",
1648               debug_str_element (ri->element),
1649               debug_str_rfn_key (&rfn->key));
1650
1651   for (i = 0; i < rfn->num_peers; i++)
1652   {
1653     if (GNUNET_NO == rfn->peer_commited[i])
1654       continue;
1655     num_commited++;
1656
1657     if (GNUNET_YES == ri->votes[i])
1658       votes_yes++;
1659   }
1660
1661   if (votes_yes > (num_commited) / 2)
1662   {
1663     *ret_vote = ri->proposal;
1664     *ret_majority = votes_yes;
1665   }
1666   else
1667   {
1668     *ret_vote = VOTE_STAY;
1669     *ret_majority = num_commited - votes_yes;
1670   }
1671 }
1672
1673
1674 struct SetCopyCls
1675 {
1676   struct TaskEntry *task;
1677   struct SetKey dst_set_key;
1678 };
1679
1680
1681 static void
1682 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1683 {
1684   struct SetCopyCls *scc = cls;
1685   struct TaskEntry *task = scc->task;
1686   struct SetKey dst_set_key = scc->dst_set_key;
1687   struct SetEntry *set;
1688   struct SetHandle *sh = GNUNET_new (struct SetHandle);
1689
1690   sh->h = copy;
1691   GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1692                                task->step->session->set_handles_tail,
1693                                sh);
1694
1695   GNUNET_free (scc);
1696   set = GNUNET_new (struct SetEntry);
1697   set->h = copy;
1698   set->key = dst_set_key;
1699   put_set (task->step->session, set);
1700
1701   task->start (task);
1702 }
1703
1704
1705 /**
1706  * Call the start function of the given
1707  * task again after we created a copy of the given set.
1708  */
1709 static void
1710 create_set_copy_for_task (struct TaskEntry *task,
1711                           struct SetKey *src_set_key,
1712                           struct SetKey *dst_set_key)
1713 {
1714   struct SetEntry *src_set;
1715   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1716
1717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718               "Copying set {%s} to {%s} for task {%s}\n",
1719               debug_str_set_key (src_set_key),
1720               debug_str_set_key (dst_set_key),
1721               debug_str_task_key (&task->key));
1722
1723   scc->task = task;
1724   scc->dst_set_key = *dst_set_key;
1725   src_set = lookup_set (task->step->session, src_set_key);
1726   GNUNET_assert (NULL != src_set);
1727   GNUNET_SET_copy_lazy (src_set->h,
1728                         set_copy_cb,
1729                         scc);
1730 }
1731
1732
1733 struct SetMutationProgressCls
1734 {
1735   int num_pending;
1736   /**
1737    * Task to finish once all changes are through.
1738    */
1739   struct TaskEntry *task;
1740 };
1741
1742
1743 static void
1744 set_mutation_done (void *cls)
1745 {
1746   struct SetMutationProgressCls *pc = cls;
1747
1748   GNUNET_assert (pc->num_pending > 0);
1749
1750   pc->num_pending--;
1751
1752   if (0 == pc->num_pending)
1753   {
1754     struct TaskEntry *task = pc->task;
1755     GNUNET_free (pc);
1756     finish_task (task);
1757   }
1758 }
1759
1760
1761 static void
1762 try_finish_step_early (struct Step *step)
1763 {
1764   unsigned int i;
1765
1766   if (GNUNET_YES == step->is_running)
1767     return;
1768   if (GNUNET_YES == step->is_finished)
1769     return;
1770   if (GNUNET_NO == step->early_finishable)
1771     return;
1772
1773   step->is_finished = GNUNET_YES;
1774
1775 #ifdef GNUNET_EXTRA_LOGGING
1776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1777               "Finishing step `%s' early.\n",
1778               step->debug_name);
1779 #endif
1780
1781   for (i = 0; i < step->subordinates_len; i++)
1782   {
1783     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1784     step->subordinates[i]->pending_prereq--;
1785 #ifdef GNUNET_EXTRA_LOGGING
1786     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1787                 "Decreased pending_prereq to %u for step `%s'.\n",
1788                 (unsigned int) step->subordinates[i]->pending_prereq,
1789                 step->subordinates[i]->debug_name);
1790
1791 #endif
1792     try_finish_step_early (step->subordinates[i]);
1793   }
1794
1795   // XXX: maybe schedule as task to avoid recursion?
1796   run_ready_steps (step->session);
1797 }
1798
1799
1800 static void
1801 finish_step (struct Step *step)
1802 {
1803   unsigned int i;
1804
1805   GNUNET_assert (step->finished_tasks == step->tasks_len);
1806   GNUNET_assert (GNUNET_YES == step->is_running);
1807   GNUNET_assert (GNUNET_NO == step->is_finished);
1808
1809 #ifdef GNUNET_EXTRA_LOGGING
1810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1811               "All tasks of step `%s' with %u subordinates finished.\n",
1812               step->debug_name,
1813               step->subordinates_len);
1814 #endif
1815
1816   for (i = 0; i < step->subordinates_len; i++)
1817   {
1818     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1819     step->subordinates[i]->pending_prereq--;
1820 #ifdef GNUNET_EXTRA_LOGGING
1821     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1822                 "Decreased pending_prereq to %u for step `%s'.\n",
1823                 (unsigned int) step->subordinates[i]->pending_prereq,
1824                 step->subordinates[i]->debug_name);
1825
1826 #endif
1827   }
1828
1829   step->is_finished = GNUNET_YES;
1830
1831   // XXX: maybe schedule as task to avoid recursion?
1832   run_ready_steps (step->session);
1833 }
1834
1835
1836
1837 /**
1838  * Apply the result from one round of gradecasts (i.e. every peer
1839  * should have gradecasted) to the peer's current set.
1840  *
1841  * @param task the task with context information
1842  */
1843 static void
1844 task_start_apply_round (struct TaskEntry *task)
1845 {
1846   struct ConsensusSession *session = task->step->session;
1847   struct SetKey sk_in;
1848   struct SetKey sk_out;
1849   struct RfnKey rk_in;
1850   struct SetEntry *set_out;
1851   struct ReferendumEntry *rfn_in;
1852   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1853   struct RfnElementInfo *ri;
1854   struct SetMutationProgressCls *progress_cls;
1855   uint16_t worst_majority = UINT16_MAX;
1856
1857   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1858   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1859   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1860
1861   set_out = lookup_set (session, &sk_out);
1862   if (NULL == set_out)
1863   {
1864     create_set_copy_for_task (task, &sk_in, &sk_out);
1865     return;
1866   }
1867
1868   rfn_in = lookup_rfn (session, &rk_in);
1869   GNUNET_assert (NULL != rfn_in);
1870
1871   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1872   progress_cls->task = task;
1873
1874   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1875
1876   while (GNUNET_YES ==
1877          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1878                                                       NULL,
1879                                                       (const void **) &ri))
1880   {
1881     uint16_t majority_num;
1882     enum ReferendumVote majority_vote;
1883
1884     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1885
1886     if (worst_majority > majority_num)
1887       worst_majority = majority_num;
1888
1889     switch (majority_vote)
1890     {
1891       case VOTE_ADD:
1892         progress_cls->num_pending++;
1893         GNUNET_assert (GNUNET_OK ==
1894                        GNUNET_SET_add_element (set_out->h,
1895                                                ri->element,
1896                                                &set_mutation_done,
1897                                                progress_cls));
1898         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1899                     "P%u: apply round: adding element %s with %u-majority.\n",
1900                     session->local_peer_idx,
1901                     debug_str_element (ri->element), majority_num);
1902         break;
1903       case VOTE_REMOVE:
1904         progress_cls->num_pending++;
1905         GNUNET_assert (GNUNET_OK ==
1906                        GNUNET_SET_remove_element (set_out->h,
1907                                                   ri->element,
1908                                                   &set_mutation_done,
1909                                                   progress_cls));
1910         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911                     "P%u: apply round: deleting element %s with %u-majority.\n",
1912                     session->local_peer_idx,
1913                     debug_str_element (ri->element), majority_num);
1914         break;
1915       case VOTE_STAY:
1916         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                     "P%u: apply round: keeping element %s with %u-majority.\n",
1918                     session->local_peer_idx,
1919                     debug_str_element (ri->element), majority_num);
1920         // do nothing
1921         break;
1922       default:
1923         GNUNET_assert (0);
1924         break;
1925     }
1926   }
1927
1928   if (0 == progress_cls->num_pending)
1929   {
1930     // call closure right now, no pending ops
1931     GNUNET_free (progress_cls);
1932     finish_task (task);
1933   }
1934
1935   {
1936     uint16_t thresh = (session->num_peers / 3) * 2;
1937
1938     if (worst_majority >= thresh)
1939     {
1940       switch (session->early_stopping)
1941       {
1942         case EARLY_STOPPING_NONE:
1943           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1944           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1945                       "P%u: Stopping early (after one more superround)\n",
1946                       session->local_peer_idx);
1947           break;
1948         case EARLY_STOPPING_ONE_MORE:
1949           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1950                       session->local_peer_idx);
1951           session->early_stopping = EARLY_STOPPING_DONE;
1952           {
1953             struct Step *step;
1954             for (step = session->steps_head; NULL != step; step = step->next)
1955               try_finish_step_early (step);
1956           }
1957           break;
1958         case EARLY_STOPPING_DONE:
1959           /* We shouldn't be here anymore after early stopping */
1960           GNUNET_break (0);
1961           break;
1962         default:
1963           GNUNET_assert (0);
1964           break;
1965       }
1966     }
1967     else if (EARLY_STOPPING_NONE != session->early_stopping)
1968     {
1969       // Our assumption about the number of bad peers
1970       // has been broken.
1971       GNUNET_break_op (0);
1972     }
1973     else
1974     {
1975       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1976                   session->local_peer_idx);
1977     }
1978   }
1979   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1980 }
1981
1982
1983 static void
1984 task_start_grade (struct TaskEntry *task)
1985 {
1986   struct ConsensusSession *session = task->step->session;
1987   struct ReferendumEntry *output_rfn;
1988   struct ReferendumEntry *input_rfn;
1989   struct DiffEntry *input_diff;
1990   struct RfnKey rfn_key;
1991   struct DiffKey diff_key;
1992   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1993   struct RfnElementInfo *ri;
1994   unsigned int gradecast_confidence = 2;
1995
1996   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1997   output_rfn = lookup_rfn (session, &rfn_key);
1998   if (NULL == output_rfn)
1999   {
2000     output_rfn = rfn_create (session->num_peers);
2001     output_rfn->key = rfn_key;
2002     put_rfn (session, output_rfn);
2003   }
2004
2005   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2006   input_diff = lookup_diff (session, &diff_key);
2007   GNUNET_assert (NULL != input_diff);
2008
2009   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2010   input_rfn = lookup_rfn (session, &rfn_key);
2011   GNUNET_assert (NULL != input_rfn);
2012
2013   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2014
2015   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2016
2017   while (GNUNET_YES ==
2018          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2019                                                       NULL,
2020                                                       (const void **) &ri))
2021   {
2022     uint16_t majority_num;
2023     enum ReferendumVote majority_vote;
2024
2025     // XXX: we need contested votes and non-contested votes here
2026     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2027
2028     if (majority_num <= session->num_peers / 3)
2029       majority_vote = VOTE_REMOVE;
2030
2031     switch (majority_vote)
2032     {
2033       case VOTE_STAY:
2034         break;
2035       case VOTE_ADD:
2036         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2037         break;
2038       case VOTE_REMOVE:
2039         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2040         break;
2041       default:
2042         GNUNET_assert (0);
2043         break;
2044     }
2045   }
2046   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2047
2048   {
2049     uint16_t noncontested;
2050     noncontested = rfn_noncontested (input_rfn);
2051     if (noncontested < (session->num_peers / 3) * 2)
2052     {
2053       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2054     }
2055     if (noncontested < (session->num_peers / 3) + 1)
2056     {
2057       gradecast_confidence = 0;
2058     }
2059   }
2060
2061   if (gradecast_confidence >= 1)
2062     rfn_commit (output_rfn, task->key.leader);
2063
2064   if (gradecast_confidence <= 1)
2065     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2066
2067   finish_task (task);
2068 }
2069
2070
2071 static void
2072 task_start_reconcile (struct TaskEntry *task)
2073 {
2074   struct SetEntry *input;
2075   struct SetOpCls *setop = &task->cls.setop;
2076   struct ConsensusSession *session = task->step->session;
2077
2078   input = lookup_set (session, &setop->input_set);
2079   GNUNET_assert (NULL != input);
2080   GNUNET_assert (NULL != input->h);
2081
2082   /* We create the outputs for the operation here
2083      (rather than in the set operation callback)
2084      because we want something valid in there, even
2085      if the other peer doesn't talk to us */
2086
2087   if (SET_KIND_NONE != setop->output_set.set_kind)
2088   {
2089     /* If we don't have an existing output set,
2090        we clone the input set. */
2091     if (NULL == lookup_set (session, &setop->output_set))
2092     {
2093       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2094       return;
2095     }
2096   }
2097
2098   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2099   {
2100     if (NULL == lookup_rfn (session, &setop->output_rfn))
2101     {
2102       struct ReferendumEntry *rfn;
2103
2104       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105                   "P%u: output rfn <%s> missing, creating.\n",
2106                   session->local_peer_idx,
2107                   debug_str_rfn_key (&setop->output_rfn));
2108
2109       rfn = rfn_create (session->num_peers);
2110       rfn->key = setop->output_rfn;
2111       put_rfn (session, rfn);
2112     }
2113   }
2114
2115   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2116   {
2117     if (NULL == lookup_diff (session, &setop->output_diff))
2118     {
2119       struct DiffEntry *diff;
2120
2121       diff = diff_create ();
2122       diff->key = setop->output_diff;
2123       put_diff (session, diff);
2124     }
2125   }
2126
2127   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2128   {
2129     /* XXX: mark the corresponding rfn as commited if necessary */
2130     finish_task (task);
2131     return;
2132   }
2133
2134   if (task->key.peer1 == session->local_peer_idx)
2135   {
2136     struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2137
2138     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2139                 "P%u: Looking up set {%s} to run remote union\n",
2140                 session->local_peer_idx,
2141                 debug_str_set_key (&setop->input_set));
2142
2143     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2144     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2145
2146     rcm.kind = htons (task->key.kind);
2147     rcm.peer1 = htons (task->key.peer1);
2148     rcm.peer2 = htons (task->key.peer2);
2149     rcm.leader = htons (task->key.leader);
2150     rcm.repetition = htons (task->key.repetition);
2151     rcm.is_contested = htons (0);
2152
2153     GNUNET_assert (NULL == setop->op);
2154     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2155                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2156
2157     struct GNUNET_SET_Option opts[] = {
2158       { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2159       { GNUNET_SET_OPTION_END },
2160     };
2161
2162     // XXX: maybe this should be done while
2163     // setting up tasks alreays?
2164     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2165                                     &session->global_id,
2166                                     &rcm.header,
2167                                     GNUNET_SET_RESULT_SYMMETRIC,
2168                                     opts,
2169                                     set_result_cb,
2170                                     task);
2171
2172     commit_set (session, task);
2173   }
2174   else if (task->key.peer2 == session->local_peer_idx)
2175   {
2176     /* Wait for the other peer to contact us */
2177     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2178                 session->local_peer_idx, task->key.peer1);
2179
2180     if (NULL != setop->op)
2181     {
2182       commit_set (session, task);
2183     }
2184   }
2185   else
2186   {
2187     /* We made an error while constructing the task graph. */
2188     GNUNET_assert (0);
2189   }
2190 }
2191
2192
2193 static void
2194 task_start_eval_echo (struct TaskEntry *task)
2195 {
2196   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2197   struct ReferendumEntry *input_rfn;
2198   struct RfnElementInfo *ri;
2199   struct SetEntry *output_set;
2200   struct SetMutationProgressCls *progress_cls;
2201   struct ConsensusSession *session = task->step->session;
2202   struct SetKey sk_in;
2203   struct SetKey sk_out;
2204   struct RfnKey rk_in;
2205
2206   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2207   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2208   output_set = lookup_set (session, &sk_out);
2209   if (NULL == output_set)
2210   {
2211     create_set_copy_for_task (task, &sk_in, &sk_out);
2212     return;
2213   }
2214
2215
2216   {
2217     // FIXME: should be marked as a shallow copy, so
2218     // we can destroy everything correctly
2219     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2220     last_set->h = output_set->h;
2221     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2222     put_set (session, last_set);
2223   }
2224
2225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226               "Evaluating referendum in Task {%s}\n",
2227               debug_str_task_key (&task->key));
2228
2229   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2230   progress_cls->task = task;
2231
2232   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2233   input_rfn = lookup_rfn (session, &rk_in);
2234
2235   GNUNET_assert (NULL != input_rfn);
2236
2237   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2238   GNUNET_assert (NULL != iter);
2239
2240   while (GNUNET_YES ==
2241          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2242                                                       NULL,
2243                                                       (const void **) &ri))
2244   {
2245     enum ReferendumVote majority_vote;
2246     uint16_t majority_num;
2247
2248     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2249
2250     if (majority_num < session->num_peers / 3)
2251     {
2252       /* It is not the case that all nonfaulty peers
2253          echoed the same value.  Since we're doing a set reconciliation, we
2254          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2255          reconciliation as contested.  Other peers might not know that the
2256          leader is faulty, thus we still re-distribute in the confirmation
2257          round. */
2258       output_set->is_contested = GNUNET_YES;
2259     }
2260
2261     switch (majority_vote)
2262     {
2263       case VOTE_ADD:
2264         progress_cls->num_pending++;
2265         GNUNET_assert (GNUNET_OK ==
2266                        GNUNET_SET_add_element (output_set->h,
2267                                                ri->element,
2268                                                set_mutation_done,
2269                                                progress_cls));
2270         break;
2271       case VOTE_REMOVE:
2272         progress_cls->num_pending++;
2273         GNUNET_assert (GNUNET_OK ==
2274                        GNUNET_SET_remove_element (output_set->h,
2275                                                   ri->element,
2276                                                   set_mutation_done,
2277                                                   progress_cls));
2278         break;
2279       case VOTE_STAY:
2280         /* Nothing to do. */
2281         break;
2282       default:
2283         /* not reached */
2284         GNUNET_assert (0);
2285     }
2286   }
2287
2288   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2289
2290   if (0 == progress_cls->num_pending)
2291   {
2292     // call closure right now, no pending ops
2293     GNUNET_free (progress_cls);
2294     finish_task (task);
2295   }
2296 }
2297
2298
2299 static void
2300 task_start_finish (struct TaskEntry *task)
2301 {
2302   struct SetEntry *final_set;
2303   struct ConsensusSession *session = task->step->session;
2304
2305   final_set = lookup_set (session, &task->cls.finish.input_set);
2306
2307   GNUNET_assert (NULL != final_set);
2308
2309
2310   GNUNET_SET_iterate (final_set->h,
2311                       send_to_client_iter,
2312                       task);
2313 }
2314
2315 static void
2316 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2317 {
2318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2319
2320   GNUNET_assert (GNUNET_NO == task->is_started);
2321   GNUNET_assert (GNUNET_NO == task->is_finished);
2322   GNUNET_assert (NULL != task->start);
2323
2324   task->start (task);
2325
2326   task->is_started = GNUNET_YES;
2327 }
2328
2329
2330
2331
2332 /*
2333  * Run all steps of the session that don't any
2334  * more dependencies.
2335  */
2336 static void
2337 run_ready_steps (struct ConsensusSession *session)
2338 {
2339   struct Step *step;
2340
2341   step = session->steps_head;
2342
2343   while (NULL != step)
2344   {
2345     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2346     {
2347       size_t i;
2348
2349       GNUNET_assert (0 == step->finished_tasks);
2350
2351 #ifdef GNUNET_EXTRA_LOGGING
2352       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2353                   session->local_peer_idx,
2354                   step->debug_name,
2355                   step->round, step->tasks_len, step->subordinates_len);
2356 #endif
2357
2358       step->is_running = GNUNET_YES;
2359       for (i = 0; i < step->tasks_len; i++)
2360         start_task (session, step->tasks[i]);
2361
2362       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2363       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2364         finish_step (step);
2365
2366       /* Running the next ready steps will be triggered by task completion */
2367       return;
2368     }
2369     step = step->next;
2370   }
2371
2372   return;
2373 }
2374
2375
2376
2377 static void
2378 finish_task (struct TaskEntry *task)
2379 {
2380   GNUNET_assert (GNUNET_NO == task->is_finished);
2381   task->is_finished = GNUNET_YES;
2382
2383   task->step->finished_tasks++;
2384
2385   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2386               "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2387               task->step->session->local_peer_idx,
2388               debug_str_task_key (&task->key),
2389               (unsigned int) task->step->finished_tasks,
2390               (unsigned int) task->step->tasks_len);
2391
2392   if (task->step->finished_tasks == task->step->tasks_len)
2393     finish_step (task->step);
2394 }
2395
2396
2397 /**
2398  * Search peer in the list of peers in session.
2399  *
2400  * @param peer peer to find
2401  * @param session session with peer
2402  * @return index of peer, -1 if peer is not in session
2403  */
2404 static int
2405 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2406 {
2407   int i;
2408   for (i = 0; i < session->num_peers; i++)
2409     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2410       return i;
2411   return -1;
2412 }
2413
2414
2415 /**
2416  * Compute a global, (hopefully) unique consensus session id,
2417  * from the local id of the consensus session, and the identities of all participants.
2418  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2419  * exactly the same peers, the global id will be different.
2420  *
2421  * @param session session to generate the global id for
2422  * @param local_session_id local id of the consensus session
2423  */
2424 static void
2425 compute_global_id (struct ConsensusSession *session,
2426                    const struct GNUNET_HashCode *local_session_id)
2427 {
2428   const char *salt = "gnunet-service-consensus/session_id";
2429
2430   GNUNET_assert (GNUNET_YES ==
2431                  GNUNET_CRYPTO_kdf (&session->global_id,
2432                                     sizeof (struct GNUNET_HashCode),
2433                                     salt,
2434                                     strlen (salt),
2435                                     session->peers,
2436                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2437                                     local_session_id,
2438                                     sizeof (struct GNUNET_HashCode),
2439                                     NULL));
2440 }
2441
2442
2443 /**
2444  * Compare two peer identities.
2445  *
2446  * @param h1 some peer identity
2447  * @param h2 some peer identity
2448  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2449  */
2450 static int
2451 peer_id_cmp (const void *h1, const void *h2)
2452 {
2453   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2454 }
2455
2456
2457 /**
2458  * Create the sorted list of peers for the session,
2459  * add the local peer if not in the join message.
2460  *
2461  * @param session session to initialize
2462  * @param join_msg join message with the list of peers participating at the end
2463  */
2464 static void
2465 initialize_session_peer_list (struct ConsensusSession *session,
2466                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2467 {
2468   const struct GNUNET_PeerIdentity *msg_peers
2469     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2470   int local_peer_in_list;
2471
2472   session->num_peers = ntohl (join_msg->num_peers);
2473
2474   /* Peers in the join message, may or may not include the local peer,
2475      Add it if it is missing. */
2476   local_peer_in_list = GNUNET_NO;
2477   for (unsigned int i = 0; i < session->num_peers; i++)
2478   {
2479     if (0 == memcmp (&msg_peers[i],
2480                      &my_peer,
2481                      sizeof (struct GNUNET_PeerIdentity)))
2482     {
2483       local_peer_in_list = GNUNET_YES;
2484       break;
2485     }
2486   }
2487   if (GNUNET_NO == local_peer_in_list)
2488     session->num_peers++;
2489
2490   session->peers = GNUNET_new_array (session->num_peers,
2491                                      struct GNUNET_PeerIdentity);
2492   if (GNUNET_NO == local_peer_in_list)
2493     session->peers[session->num_peers - 1] = my_peer;
2494
2495   GNUNET_memcpy (session->peers,
2496                  msg_peers,
2497                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2498   qsort (session->peers,
2499          session->num_peers,
2500          sizeof (struct GNUNET_PeerIdentity),
2501          &peer_id_cmp);
2502 }
2503
2504
2505 static struct TaskEntry *
2506 lookup_task (struct ConsensusSession *session,
2507              struct TaskKey *key)
2508 {
2509   struct GNUNET_HashCode hash;
2510
2511
2512   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2514               GNUNET_h2s (&hash));
2515   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2516 }
2517
2518
2519 /**
2520  * Called when another peer wants to do a set operation with the
2521  * local peer.
2522  *
2523  * @param cls closure
2524  * @param other_peer the other peer
2525  * @param context_msg message with application specific information from
2526  *        the other peer
2527  * @param request request from the other peer, use GNUNET_SET_accept
2528  *        to accept it, otherwise the request will be refused
2529  *        Note that we don't use a return value here, as it is also
2530  *        necessary to specify the set we want to do the operation with,
2531  *        whith sometimes can be derived from the context message.
2532  *        Also necessary to specify the timeout.
2533  */
2534 static void
2535 set_listen_cb (void *cls,
2536                const struct GNUNET_PeerIdentity *other_peer,
2537                const struct GNUNET_MessageHeader *context_msg,
2538                struct GNUNET_SET_Request *request)
2539 {
2540   struct ConsensusSession *session = cls;
2541   struct TaskKey tk;
2542   struct TaskEntry *task;
2543   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2544
2545   if (NULL == context_msg)
2546   {
2547     GNUNET_break_op (0);
2548     return;
2549   }
2550
2551   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2552   {
2553     GNUNET_break_op (0);
2554     return;
2555   }
2556
2557   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2558   {
2559     GNUNET_break_op (0);
2560     return;
2561   }
2562
2563   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2564
2565   tk = ((struct TaskKey) {
2566       .kind = ntohs (cm->kind),
2567       .peer1 = ntohs (cm->peer1),
2568       .peer2 = ntohs (cm->peer2),
2569       .repetition = ntohs (cm->repetition),
2570       .leader = ntohs (cm->leader),
2571   });
2572
2573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2574               session->local_peer_idx, debug_str_task_key (&tk));
2575
2576   task = lookup_task (session, &tk);
2577
2578   if (NULL == task)
2579   {
2580     GNUNET_break_op (0);
2581     return;
2582   }
2583
2584   if (GNUNET_YES == task->is_finished)
2585   {
2586     GNUNET_break_op (0);
2587     return;
2588   }
2589
2590   if (task->key.peer2 != session->local_peer_idx)
2591   {
2592     /* We're being asked, so we must be thne 2nd peer. */
2593     GNUNET_break_op (0);
2594     return;
2595   }
2596
2597   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2598                     (task->key.peer2 == session->local_peer_idx)));
2599
2600   struct GNUNET_SET_Option opts[] = {
2601     { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2602     { GNUNET_SET_OPTION_END },
2603   };
2604
2605   task->cls.setop.op = GNUNET_SET_accept (request,
2606                                           GNUNET_SET_RESULT_SYMMETRIC,
2607                                           opts,
2608                                           set_result_cb,
2609                                           task);
2610
2611   /* If the task hasn't been started yet,
2612      we wait for that until we commit. */
2613
2614   if (GNUNET_YES == task->is_started)
2615   {
2616     commit_set (session, task);
2617   }
2618 }
2619
2620
2621
2622 static void
2623 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2624           struct TaskEntry *t)
2625 {
2626   struct GNUNET_HashCode round_hash;
2627   struct Step *s;
2628
2629   GNUNET_assert (NULL != t->step);
2630
2631   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2632
2633   s = t->step;
2634
2635   if (s->tasks_len == s->tasks_cap)
2636   {
2637     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2638     GNUNET_array_grow (s->tasks,
2639                        s->tasks_cap,
2640                        target_size);
2641   }
2642
2643 #ifdef GNUNET_EXTRA_LOGGING
2644   GNUNET_assert (NULL != s->debug_name);
2645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2646               debug_str_task_key (&t->key),
2647               s->debug_name);
2648 #endif
2649
2650   s->tasks[s->tasks_len] = t;
2651   s->tasks_len++;
2652
2653   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2654   GNUNET_assert (GNUNET_OK ==
2655       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2656                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2657 }
2658
2659
2660 static void
2661 install_step_timeouts (struct ConsensusSession *session)
2662 {
2663   /* Given the fully constructed task graph
2664      with rounds for tasks, we can give the tasks timeouts. */
2665
2666   // unsigned int max_round;
2667
2668   /* XXX: implement! */
2669 }
2670
2671
2672
2673 /*
2674  * Arrange two peers in some canonical order.
2675  */
2676 static void
2677 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2678 {
2679   uint16_t a;
2680   uint16_t b;
2681
2682   GNUNET_assert (*p1 < n);
2683   GNUNET_assert (*p2 < n);
2684
2685   if (*p1 < *p2)
2686   {
2687     a = *p1;
2688     b = *p2;
2689   }
2690   else
2691   {
2692     a = *p2;
2693     b = *p1;
2694   }
2695
2696   /* For uniformly random *p1, *p2,
2697      this condition is true with 50% chance */
2698   if (((b - a) + n) % n <= n / 2)
2699   {
2700     *p1 = a;
2701     *p2 = b;
2702   }
2703   else
2704   {
2705     *p1 = b;
2706     *p2 = a;
2707   }
2708 }
2709
2710
2711 /**
2712  * Record @a dep as a dependency of @a step.
2713  */
2714 static void
2715 step_depend_on (struct Step *step, struct Step *dep)
2716 {
2717   /* We're not checking for cyclic dependencies,
2718      but this is a cheap sanity check. */
2719   GNUNET_assert (step != dep);
2720   GNUNET_assert (NULL != step);
2721   GNUNET_assert (NULL != dep);
2722   GNUNET_assert (dep->round <= step->round);
2723
2724 #ifdef GNUNET_EXTRA_LOGGING
2725   /* Make sure we have complete debugging information.
2726      Also checks that we don't screw up too badly
2727      constructing the task graph. */
2728   GNUNET_assert (NULL != step->debug_name);
2729   GNUNET_assert (NULL != dep->debug_name);
2730   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2731               "Making step `%s' depend on `%s'\n",
2732               step->debug_name,
2733               dep->debug_name);
2734 #endif
2735
2736   if (dep->subordinates_cap == dep->subordinates_len)
2737   {
2738     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2739     GNUNET_array_grow (dep->subordinates,
2740                        dep->subordinates_cap,
2741                        target_size);
2742   }
2743
2744   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2745
2746   dep->subordinates[dep->subordinates_len] = step;
2747   dep->subordinates_len++;
2748
2749   step->pending_prereq++;
2750 }
2751
2752
2753 static struct Step *
2754 create_step (struct ConsensusSession *session, int round, int early_finishable)
2755 {
2756   struct Step *step;
2757   step = GNUNET_new (struct Step);
2758   step->session = session;
2759   step->round = round;
2760   step->early_finishable = early_finishable;
2761   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2762                                     session->steps_tail,
2763                                     step);
2764   return step;
2765 }
2766
2767
2768 /**
2769  * Construct the task graph for a single
2770  * gradecast.
2771  */
2772 static void
2773 construct_task_graph_gradecast (struct ConsensusSession *session,
2774                                 uint16_t rep,
2775                                 uint16_t lead,
2776                                 struct Step *step_before,
2777                                 struct Step *step_after)
2778 {
2779   uint16_t n = session->num_peers;
2780   uint16_t me = session->local_peer_idx;
2781
2782   uint16_t p1;
2783   uint16_t p2;
2784
2785   /* The task we're currently setting up. */
2786   struct TaskEntry task;
2787
2788   struct Step *step;
2789   struct Step *prev_step;
2790
2791   uint16_t round;
2792
2793   unsigned int k;
2794
2795   round = step_before->round + 1;
2796
2797   /* gcast step 1: leader disseminates */
2798
2799   step = create_step (session, round, GNUNET_YES);
2800
2801 #ifdef GNUNET_EXTRA_LOGGING
2802   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2803 #endif
2804   step_depend_on (step, step_before);
2805
2806   if (lead == me)
2807   {
2808     for (k = 0; k < n; k++)
2809     {
2810       if (k == me)
2811         continue;
2812       p1 = me;
2813       p2 = k;
2814       arrange_peers (&p1, &p2, n);
2815       task = ((struct TaskEntry) {
2816         .step = step,
2817         .start = task_start_reconcile,
2818         .cancel = task_cancel_reconcile,
2819         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2820       });
2821       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2822       put_task (session->taskmap, &task);
2823     }
2824     /* We run this task to make sure that the leader
2825        has the stored the SET_KIND_LEADER set of himself,
2826        so he can participate in the rest of the gradecast
2827        without the code having to handle any special cases. */
2828     task = ((struct TaskEntry) {
2829       .step = step,
2830       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2831       .start = task_start_reconcile,
2832       .cancel = task_cancel_reconcile,
2833     });
2834     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2835     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2836     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2837     put_task (session->taskmap, &task);
2838   }
2839   else
2840   {
2841     p1 = me;
2842     p2 = lead;
2843     arrange_peers (&p1, &p2, n);
2844     task = ((struct TaskEntry) {
2845       .step = step,
2846       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2847       .start = task_start_reconcile,
2848       .cancel = task_cancel_reconcile,
2849     });
2850     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2851     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2852     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2853     put_task (session->taskmap, &task);
2854   }
2855
2856   /* gcast phase 2: echo */
2857   prev_step = step;
2858   round += 1;
2859   step = create_step (session, round, GNUNET_YES);
2860 #ifdef GNUNET_EXTRA_LOGGING
2861   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2862 #endif
2863   step_depend_on (step, prev_step);
2864
2865   for (k = 0; k < n; k++)
2866   {
2867     p1 = k;
2868     p2 = me;
2869     arrange_peers (&p1, &p2, n);
2870     task = ((struct TaskEntry) {
2871       .step = step,
2872       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2873       .start = task_start_reconcile,
2874       .cancel = task_cancel_reconcile,
2875     });
2876     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2877     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2878     put_task (session->taskmap, &task);
2879   }
2880
2881   prev_step = step;
2882   /* Same round, since step only has local tasks */
2883   step = create_step (session, round, GNUNET_YES);
2884 #ifdef GNUNET_EXTRA_LOGGING
2885   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2886 #endif
2887   step_depend_on (step, prev_step);
2888
2889   arrange_peers (&p1, &p2, n);
2890   task = ((struct TaskEntry) {
2891     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2892     .step = step,
2893     .start = task_start_eval_echo
2894   });
2895   put_task (session->taskmap, &task);
2896
2897   prev_step = step;
2898   round += 1;
2899   step = create_step (session, round, GNUNET_YES);
2900 #ifdef GNUNET_EXTRA_LOGGING
2901   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2902 #endif
2903   step_depend_on (step, prev_step);
2904
2905   /* gcast phase 3: confirmation and grading */
2906   for (k = 0; k < n; k++)
2907   {
2908     p1 = k;
2909     p2 = me;
2910     arrange_peers (&p1, &p2, n);
2911     task = ((struct TaskEntry) {
2912       .step = step,
2913       .start = task_start_reconcile,
2914       .cancel = task_cancel_reconcile,
2915       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2916     });
2917     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2918     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2919     /* If there was at least one element in the echo round that was
2920        contested (i.e. it had no n-t majority), then we let the other peers
2921        know, and other peers let us know.  The contested flag for each peer is
2922        stored in the rfn. */
2923     task.cls.setop.transceive_contested = GNUNET_YES;
2924     put_task (session->taskmap, &task);
2925   }
2926
2927   prev_step = step;
2928   /* Same round, since step only has local tasks */
2929   step = create_step (session, round, GNUNET_YES);
2930 #ifdef GNUNET_EXTRA_LOGGING
2931   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2932 #endif
2933   step_depend_on (step, prev_step);
2934
2935   task = ((struct TaskEntry) {
2936     .step = step,
2937     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2938     .start = task_start_grade,
2939   });
2940   put_task (session->taskmap, &task);
2941
2942   step_depend_on (step_after, step);
2943 }
2944
2945
2946 static void
2947 construct_task_graph (struct ConsensusSession *session)
2948 {
2949   uint16_t n = session->num_peers;
2950   uint16_t t = n / 3;
2951
2952   uint16_t me = session->local_peer_idx;
2953
2954   /* The task we're currently setting up. */
2955   struct TaskEntry task;
2956
2957   /* Current leader */
2958   unsigned int lead;
2959
2960   struct Step *step;
2961   struct Step *prev_step;
2962
2963   unsigned int round = 0;
2964
2965   unsigned int i;
2966
2967   // XXX: introduce first step,
2968   // where we wait for all insert acks
2969   // from the set service
2970
2971   /* faster but brittle all-to-all */
2972
2973   // XXX: Not implemented yet
2974
2975   /* all-to-all step */
2976
2977   step = create_step (session, round, GNUNET_NO);
2978
2979 #ifdef GNUNET_EXTRA_LOGGING
2980   step->debug_name = GNUNET_strdup ("all to all");
2981 #endif
2982
2983   for (i = 0; i < n; i++)
2984   {
2985     uint16_t p1;
2986     uint16_t p2;
2987
2988     p1 = me;
2989     p2 = i;
2990     arrange_peers (&p1, &p2, n);
2991     task = ((struct TaskEntry) {
2992       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2993       .step = step,
2994       .start = task_start_reconcile,
2995       .cancel = task_cancel_reconcile,
2996     });
2997     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2998     task.cls.setop.output_set = task.cls.setop.input_set;
2999     task.cls.setop.do_not_remove = GNUNET_YES;
3000     put_task (session->taskmap, &task);
3001   }
3002
3003   round += 1;
3004   prev_step = step;
3005   step = create_step (session, round, GNUNET_NO);;
3006 #ifdef GNUNET_EXTRA_LOGGING
3007   step->debug_name = GNUNET_strdup ("all to all 2");
3008 #endif
3009   step_depend_on (step, prev_step);
3010
3011
3012   for (i = 0; i < n; i++)
3013   {
3014     uint16_t p1;
3015     uint16_t p2;
3016
3017     p1 = me;
3018     p2 = i;
3019     arrange_peers (&p1, &p2, n);
3020     task = ((struct TaskEntry) {
3021       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3022       .step = step,
3023       .start = task_start_reconcile,
3024       .cancel = task_cancel_reconcile,
3025     });
3026     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3027     task.cls.setop.output_set = task.cls.setop.input_set;
3028     task.cls.setop.do_not_remove = GNUNET_YES;
3029     put_task (session->taskmap, &task);
3030   }
3031
3032   round += 1;
3033
3034   prev_step = step;
3035   step = NULL;
3036
3037
3038
3039   /* Byzantine union */
3040
3041   /* sequential repetitions of the gradecasts */
3042   for (i = 0; i < t + 1; i++)
3043   {
3044     struct Step *step_rep_start;
3045     struct Step *step_rep_end;
3046
3047     /* Every repetition is in a separate round. */
3048     step_rep_start = create_step (session, round, GNUNET_YES);
3049 #ifdef GNUNET_EXTRA_LOGGING
3050     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3051 #endif
3052
3053     step_depend_on (step_rep_start, prev_step);
3054
3055     /* gradecast has three rounds */
3056     round += 3;
3057     step_rep_end = create_step (session, round, GNUNET_YES);
3058 #ifdef GNUNET_EXTRA_LOGGING
3059     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3060 #endif
3061
3062     /* parallel gradecasts */
3063     for (lead = 0; lead < n; lead++)
3064       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3065
3066     task = ((struct TaskEntry) {
3067       .step = step_rep_end,
3068       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3069       .start = task_start_apply_round,
3070     });
3071     put_task (session->taskmap, &task);
3072
3073     prev_step = step_rep_end;
3074   }
3075
3076  /* There is no next gradecast round, thus the final
3077     start step is the overall end step of the gradecasts */
3078   round += 1;
3079   step = create_step (session, round, GNUNET_NO);
3080 #ifdef GNUNET_EXTRA_LOGGING
3081   GNUNET_asprintf (&step->debug_name, "finish");
3082 #endif
3083   step_depend_on (step, prev_step);
3084
3085   task = ((struct TaskEntry) {
3086     .step = step,
3087     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3088     .start = task_start_finish,
3089   });
3090   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3091
3092   put_task (session->taskmap, &task);
3093 }
3094
3095
3096
3097 /**
3098  * Check join message.
3099  *
3100  * @param cls session of client that sent the message
3101  * @param m message sent by the client
3102  * @return #GNUNET_OK if @a m is well-formed
3103  */
3104 static int
3105 check_client_join (void *cls,
3106                    const struct GNUNET_CONSENSUS_JoinMessage *m)
3107 {
3108   uint32_t listed_peers = ntohl (m->num_peers);
3109
3110   if ( (ntohs (m->header.size) - sizeof (*m)) !=
3111        listed_peers * sizeof (struct GNUNET_PeerIdentity))
3112   {
3113     GNUNET_break (0);
3114     return GNUNET_SYSERR;
3115   }
3116   return GNUNET_OK;
3117 }
3118
3119
3120 /**
3121  * Called when a client wants to join a consensus session.
3122  *
3123  * @param cls session of client that sent the message
3124  * @param m message sent by the client
3125  */
3126 static void
3127 handle_client_join (void *cls,
3128                     const struct GNUNET_CONSENSUS_JoinMessage *m)
3129 {
3130   struct ConsensusSession *session = cls;
3131   struct ConsensusSession *other_session;
3132
3133   initialize_session_peer_list (session,
3134                                 m);
3135   compute_global_id (session,
3136                      &m->session_id);
3137
3138   /* Check if some local client already owns the session.
3139      It is only legal to have a session with an existing global id
3140      if all other sessions with this global id are finished.*/
3141   for (other_session = sessions_head;
3142        NULL != other_session;
3143        other_session = other_session->next)
3144   {
3145     if ( (other_session != session) &&
3146          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3147                                        &other_session->global_id)) )
3148       break;
3149   }
3150
3151   session->conclude_deadline
3152     = GNUNET_TIME_absolute_ntoh (m->deadline);
3153   session->conclude_start
3154     = GNUNET_TIME_absolute_ntoh (m->start);
3155   session->local_peer_idx = get_peer_idx (&my_peer,
3156                                           session);
3157   GNUNET_assert (-1 != session->local_peer_idx);
3158
3159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3160               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3161               GNUNET_h2s (&m->session_id),
3162               session->num_peers,
3163               session->local_peer_idx,
3164               GNUNET_STRINGS_relative_time_to_string
3165               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3166                                                     session->conclude_deadline),
3167                GNUNET_YES));
3168
3169   session->set_listener
3170     = GNUNET_SET_listen (cfg,
3171                          GNUNET_SET_OPERATION_UNION,
3172                          &session->global_id,
3173                          &set_listen_cb,
3174                          session);
3175
3176   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3177                                                           GNUNET_NO);
3178   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3179                                                            GNUNET_NO);
3180   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3181                                                            GNUNET_NO);
3182   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3183                                                           GNUNET_NO);
3184
3185   {
3186     struct SetEntry *client_set;
3187
3188     client_set = GNUNET_new (struct SetEntry);
3189     client_set->h = GNUNET_SET_create (cfg,
3190                                        GNUNET_SET_OPERATION_UNION);
3191     struct SetHandle *sh = GNUNET_new (struct SetHandle);
3192     sh->h = client_set->h;
3193     GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3194                                  session->set_handles_tail,
3195                                  sh);
3196     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3197     put_set (session,
3198              client_set);
3199   }
3200
3201   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3202                                                  int);
3203
3204   /* Just construct the task graph,
3205      but don't run anything until the client calls conclude. */
3206   construct_task_graph (session);
3207   GNUNET_SERVICE_client_continue (session->client);
3208 }
3209
3210
3211 static void
3212 client_insert_done (void *cls)
3213 {
3214   // FIXME: implement
3215 }
3216
3217
3218 /**
3219  * Called when a client performs an insert operation.
3220  *
3221  * @param cls client handle
3222  * @param msg message sent by the client
3223  * @return #GNUNET_OK (always well-formed)
3224  */
3225 static int
3226 check_client_insert (void *cls,
3227                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3228 {
3229   return GNUNET_OK;
3230 }
3231
3232
3233 /**
3234  * Called when a client performs an insert operation.
3235  *
3236  * @param cls client handle
3237  * @param msg message sent by the client
3238  */
3239 static void
3240 handle_client_insert (void *cls,
3241                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3242 {
3243   struct ConsensusSession *session = cls;
3244   ssize_t element_size;
3245   struct GNUNET_SET_Handle *initial_set;
3246   struct ConsensusElement *ce;
3247
3248   if (GNUNET_YES == session->conclude_started)
3249   {
3250     GNUNET_break (0);
3251     GNUNET_SERVICE_client_drop (session->client);
3252     return;
3253   }
3254
3255   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3256   ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3257   GNUNET_memcpy (&ce[1], &msg[1], element_size);
3258   ce->payload_type = msg->element_type;
3259
3260   struct GNUNET_SET_Element element = {
3261     .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3262     .size = sizeof (struct ConsensusElement) + element_size,
3263     .data = ce,
3264   };
3265
3266   {
3267     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3268     struct SetEntry *entry;
3269
3270     entry = lookup_set (session,
3271                         &key);
3272     GNUNET_assert (NULL != entry);
3273     initial_set = entry->h;
3274   }
3275
3276   session->num_client_insert_pending++;
3277   GNUNET_SET_add_element (initial_set,
3278                           &element,
3279                           &client_insert_done,
3280                           session);
3281
3282 #ifdef GNUNET_EXTRA_LOGGING
3283   {
3284     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3285                 "P%u: element %s added\n",
3286                 session->local_peer_idx,
3287                 debug_str_element (&element));
3288   }
3289 #endif
3290   GNUNET_free (ce);
3291   GNUNET_SERVICE_client_continue (session->client);
3292 }
3293
3294
3295 /**
3296  * Called when a client performs the conclude operation.
3297  *
3298  * @param cls client handle
3299  * @param message message sent by the client
3300  */
3301 static void
3302 handle_client_conclude (void *cls,
3303                         const struct GNUNET_MessageHeader *message)
3304 {
3305   struct ConsensusSession *session = cls;
3306
3307   if (GNUNET_YES == session->conclude_started)
3308   {
3309     /* conclude started twice */
3310     GNUNET_break (0);
3311     GNUNET_SERVICE_client_drop (session->client);
3312     return;
3313   }
3314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3315               "conclude requested\n");
3316   session->conclude_started = GNUNET_YES;
3317   install_step_timeouts (session);
3318   run_ready_steps (session);
3319   GNUNET_SERVICE_client_continue (session->client);
3320 }
3321
3322
3323 /**
3324  * Called to clean up, after a shutdown has been requested.
3325  *
3326  * @param cls closure
3327  */
3328 static void
3329 shutdown_task (void *cls)
3330 {
3331   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3332               "shutting down\n");
3333   GNUNET_STATISTICS_destroy (statistics,
3334                              GNUNET_NO);
3335   statistics = NULL;
3336 }
3337
3338
3339 /**
3340  * Start processing consensus requests.
3341  *
3342  * @param cls closure
3343  * @param c configuration to use
3344  * @param service the initialized service
3345  */
3346 static void
3347 run (void *cls,
3348      const struct GNUNET_CONFIGURATION_Handle *c,
3349      struct GNUNET_SERVICE_Handle *service)
3350 {
3351   cfg = c;
3352   if (GNUNET_OK !=
3353       GNUNET_CRYPTO_get_peer_identity (cfg,
3354                                        &my_peer))
3355   {
3356     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3357                 "Could not retrieve host identity\n");
3358     GNUNET_SCHEDULER_shutdown ();
3359     return;
3360   }
3361   statistics = GNUNET_STATISTICS_create ("consensus",
3362                                          cfg);
3363   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3364                                  NULL);
3365 }
3366
3367
3368 /**
3369  * Callback called when a client connects to the service.
3370  *
3371  * @param cls closure for the service
3372  * @param c the new client that connected to the service
3373  * @param mq the message queue used to send messages to the client
3374  * @return @a c
3375  */
3376 static void *
3377 client_connect_cb (void *cls,
3378                    struct GNUNET_SERVICE_Client *c,
3379                    struct GNUNET_MQ_Handle *mq)
3380 {
3381   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3382
3383   session->client = c;
3384   session->client_mq = mq;
3385   GNUNET_CONTAINER_DLL_insert (sessions_head,
3386                                sessions_tail,
3387                                session);
3388   return session;
3389 }
3390
3391
3392 /**
3393  * Callback called when a client disconnected from the service
3394  *
3395  * @param cls closure for the service
3396  * @param c the client that disconnected
3397  * @param internal_cls should be equal to @a c
3398  */
3399 static void
3400 client_disconnect_cb (void *cls,
3401                       struct GNUNET_SERVICE_Client *c,
3402                       void *internal_cls)
3403 {
3404   struct ConsensusSession *session = internal_cls;
3405
3406   if (NULL != session->set_listener)
3407   {
3408     GNUNET_SET_listen_cancel (session->set_listener);
3409     session->set_listener = NULL;
3410   }
3411   GNUNET_CONTAINER_DLL_remove (sessions_head,
3412                                sessions_tail,
3413                                session);
3414
3415   while (session->set_handles_head)
3416   {
3417     struct SetHandle *sh = session->set_handles_head;
3418     session->set_handles_head = sh->next;
3419     GNUNET_SET_destroy (sh->h);
3420     GNUNET_free (sh);
3421   }
3422   GNUNET_free (session);
3423 }
3424
3425
3426 /**
3427  * Define "main" method using service macro.
3428  */
3429 GNUNET_SERVICE_MAIN
3430 ("consensus",
3431  GNUNET_SERVICE_OPTION_NONE,
3432  &run,
3433  &client_connect_cb,
3434  &client_disconnect_cb,
3435  NULL,
3436  GNUNET_MQ_hd_fixed_size (client_conclude,
3437                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3438                           struct GNUNET_MessageHeader,
3439                           NULL),
3440  GNUNET_MQ_hd_var_size (client_insert,
3441                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3442                         struct GNUNET_CONSENSUS_ElementMessage,
3443                         NULL),
3444  GNUNET_MQ_hd_var_size (client_join,
3445                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3446                         struct GNUNET_CONSENSUS_JoinMessage,
3447                         NULL),
3448  GNUNET_MQ_handler_end ());
3449
3450 /* end of gnunet-service-consensus.c */