add function conv param string
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40 enum ReferendumVote
41 {
42   /**
43    * Vote that nothing should change.
44    * This option is never voted explicitly.
45    */
46   VOTE_STAY = 0,
47   /**
48    * Vote that an element should be added.
49    */
50   VOTE_ADD = 1,
51   /**
52    * Vote that an element should be removed.
53    */
54   VOTE_REMOVE = 2,
55 };
56
57
58 enum EarlyStoppingPhase
59 {
60   EARLY_STOPPING_NONE = 0,
61   EARLY_STOPPING_ONE_MORE = 1,
62   EARLY_STOPPING_DONE = 2,
63 };
64
65
66 GNUNET_NETWORK_STRUCT_BEGIN
67
68
69 struct ContestedPayload
70 {
71 };
72
73 /**
74  * Tuple of integers that together
75  * identify a task uniquely.
76  */
77 struct TaskKey {
78   /**
79    * A value from 'enum PhaseKind'.
80    */
81   uint16_t kind GNUNET_PACKED;
82
83   /**
84    * Number of the first peer
85    * in canonical order.
86    */
87   int16_t peer1 GNUNET_PACKED;
88
89   /**
90    * Number of the second peer in canonical order.
91    */
92   int16_t peer2 GNUNET_PACKED;
93
94   /**
95    * Repetition of the gradecast phase.
96    */
97   int16_t repetition GNUNET_PACKED;
98
99   /**
100    * Leader in the gradecast phase.
101    *
102    * Can be different from both peer1 and peer2.
103    */
104   int16_t leader GNUNET_PACKED;
105 };
106
107
108
109 struct SetKey
110 {
111   int set_kind GNUNET_PACKED;
112   int k1 GNUNET_PACKED;
113   int k2 GNUNET_PACKED;
114 };
115
116
117 struct SetEntry
118 {
119   struct SetKey key;
120   struct GNUNET_SET_Handle *h;
121   /**
122    * GNUNET_YES if the set resulted
123    * from applying a referendum with contested
124    * elements.
125    */
126   int is_contested;
127 };
128
129
130 struct DiffKey
131 {
132   int diff_kind GNUNET_PACKED;
133   int k1 GNUNET_PACKED;
134   int k2 GNUNET_PACKED;
135 };
136
137 struct RfnKey
138 {
139   int rfn_kind GNUNET_PACKED;
140   int k1 GNUNET_PACKED;
141   int k2 GNUNET_PACKED;
142 };
143
144
145 GNUNET_NETWORK_STRUCT_END
146
147 enum PhaseKind
148 {
149   PHASE_KIND_ALL_TO_ALL,
150   PHASE_KIND_GRADECAST_LEADER,
151   PHASE_KIND_GRADECAST_ECHO,
152   PHASE_KIND_GRADECAST_ECHO_GRADE,
153   PHASE_KIND_GRADECAST_CONFIRM,
154   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
155   /**
156    * Apply a repetition of the all-to-all
157    * gradecast to the current set.
158    */
159   PHASE_KIND_APPLY_REP,
160   PHASE_KIND_FINISH,
161 };
162
163
164 enum SetKind
165 {
166   SET_KIND_NONE = 0,
167   SET_KIND_CURRENT,
168   /**
169    * Last result set from a gradecast
170    */
171   SET_KIND_LAST_GRADECAST,
172   SET_KIND_LEADER_PROPOSAL,
173   SET_KIND_ECHO_RESULT,
174 };
175
176 enum DiffKind
177 {
178   DIFF_KIND_NONE = 0,
179   DIFF_KIND_LEADER_PROPOSAL,
180   DIFF_KIND_LEADER_CONSENSUS,
181   DIFF_KIND_GRADECAST_RESULT,
182 };
183
184 enum RfnKind
185 {
186   RFN_KIND_NONE = 0,
187   RFN_KIND_ECHO,
188   RFN_KIND_CONFIRM,
189   RFN_KIND_GRADECAST_RESULT
190 };
191
192
193 struct SetOpCls
194 {
195   struct SetKey input_set;
196
197   struct SetKey output_set;
198   struct RfnKey output_rfn;
199   struct DiffKey output_diff;
200
201   int do_not_remove;
202
203   int transceive_contested;
204
205   struct GNUNET_SET_OperationHandle *op;
206 };
207
208
209 struct FinishCls
210 {
211   struct SetKey input_set;
212 };
213
214 /**
215  * Closure for both @a start_task
216  * and @a cancel_task.
217  */
218 union TaskFuncCls
219 {
220   struct SetOpCls setop;
221   struct FinishCls finish;
222 };
223
224 struct TaskEntry;
225
226 typedef void (*TaskFunc) (struct TaskEntry *task);
227
228 /*
229  * Node in the consensus task graph.
230  */
231 struct TaskEntry
232 {
233   struct TaskKey key;
234
235   struct Step *step;
236
237   int is_started;
238
239   int is_finished;
240
241   TaskFunc start;
242   TaskFunc cancel;
243
244   union TaskFuncCls cls;
245 };
246
247
248 struct Step
249 {
250   /**
251    * All steps of one session are in a
252    * linked list for easier deallocation.
253    */
254   struct Step *prev;
255
256   /**
257    * All steps of one session are in a
258    * linked list for easier deallocation.
259    */
260   struct Step *next;
261
262   struct ConsensusSession *session;
263
264   /**
265    * Tasks that this step is composed of.
266    */
267   struct TaskEntry **tasks;
268   unsigned int tasks_len;
269   unsigned int tasks_cap;
270
271   unsigned int finished_tasks;
272
273   /*
274    * Tasks that have this task as dependency.
275    *
276    * We store pointers to subordinates rather
277    * than to prerequisites since it makes
278    * tracking the readiness of a task easier.
279    */
280   struct Step **subordinates;
281   unsigned int subordinates_len;
282   unsigned int subordinates_cap;
283
284   /**
285    * Counter for the prerequisites of
286    * this step.
287    */
288   size_t pending_prereq;
289
290   /*
291    * Task that will run this step despite
292    * any pending prerequisites.
293    */
294   struct GNUNET_SCHEDULER_Task *timeout_task;
295
296   unsigned int is_running;
297
298   unsigned int is_finished;
299
300   /*
301    * Synchrony round of the task.
302    * Determines the deadline for the task.
303    */
304   unsigned int round;
305
306   /**
307    * Human-readable name for
308    * the task, used for debugging.
309    */
310   char *debug_name;
311
312   /**
313    * When we're doing an early finish, how should this step be
314    * treated?
315    * If GNUNET_YES, the step will be marked as finished
316    * without actually running its tasks.
317    * Otherwise, the step will still be run even after
318    * an early finish.
319    *
320    * Note that a task may never be finished early if
321    * it is already running.
322    */
323   int early_finishable;
324 };
325
326
327 struct RfnElementInfo
328 {
329   const struct GNUNET_SET_Element *element;
330
331   /*
332    * GNUNET_YES if the peer votes for the proposal.
333    */
334   int *votes;
335
336   /**
337    * Proposal for this element,
338    * can only be VOTE_ADD or VOTE_REMOVE.
339    */
340   enum ReferendumVote proposal;
341 };
342
343
344 struct ReferendumEntry
345 {
346   struct RfnKey key;
347
348   /*
349    * Elements where there is at least one proposed change.
350    *
351    * Maps the hash of the GNUNET_SET_Element
352    * to 'struct RfnElementInfo'.
353    */
354   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
355
356   unsigned int num_peers;
357
358   /**
359    * Stores, for every peer in the session,
360    * whether the peer finished the whole referendum.
361    *
362    * Votes from peers are only counted if they're
363    * marked as commited (#GNUNET_YES) in the referendum.
364    *
365    * Otherwise (#GNUNET_NO), the requested changes are
366    * not counted for majority votes or thresholds.
367    */
368   int *peer_commited;
369
370
371   /**
372    * Contestation state of the peer.  If a peer is contested, the values it
373    * contributed are still counted for applying changes, but the grading is
374    * affected.
375    */
376   int *peer_contested;
377 };
378
379
380 struct DiffElementInfo
381 {
382   const struct GNUNET_SET_Element *element;
383
384   /**
385    * Positive weight for 'add', negative
386    * weights for 'remove'.
387    */
388   int weight;
389 };
390
391
392 /**
393  * Weighted diff.
394  */
395 struct DiffEntry
396 {
397   struct DiffKey key;
398   struct GNUNET_CONTAINER_MultiHashMap *changes;
399 };
400
401
402
403 /**
404  * A consensus session consists of one local client and the remote authorities.
405  */
406 struct ConsensusSession
407 {
408   /**
409    * Consensus sessions are kept in a DLL.
410    */
411   struct ConsensusSession *next;
412
413   /**
414    * Consensus sessions are kept in a DLL.
415    */
416   struct ConsensusSession *prev;
417
418   unsigned int num_client_insert_pending;
419
420   struct GNUNET_CONTAINER_MultiHashMap *setmap;
421   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
423
424   /**
425    * Array of peers with length 'num_peers'.
426    */
427   int *peers_blacklisted;
428
429   /*
430    * Mapping from (hashed) TaskKey to TaskEntry.
431    *
432    * We map the application_id for a round to the task that should be
433    * executed, so we don't have to go through all task whenever we get
434    * an incoming set op request.
435    */
436   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
437
438   struct Step *steps_head;
439   struct Step *steps_tail;
440
441   int conclude_started;
442
443   int conclude_done;
444
445   /**
446   * Global consensus identification, computed
447   * from the session id and participating authorities.
448   */
449   struct GNUNET_HashCode global_id;
450
451   /**
452    * Client that inhabits the session
453    */
454   struct GNUNET_SERVER_Client *client;
455
456   /**
457    * Queued messages to the client.
458    */
459   struct GNUNET_MQ_Handle *client_mq;
460
461   /**
462    * Time when the conclusion of the consensus should begin.
463    */
464   struct GNUNET_TIME_Absolute conclude_start;
465
466   /**
467    * Timeout for all rounds together, single rounds will schedule a timeout task
468    * with a fraction of the conclude timeout.
469    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
470    */
471   struct GNUNET_TIME_Absolute conclude_deadline;
472
473   struct GNUNET_PeerIdentity *peers;
474
475   /**
476    * Number of other peers in the consensus.
477    */
478   unsigned int num_peers;
479
480   /**
481    * Index of the local peer in the peers array
482    */
483   unsigned int local_peer_idx;
484
485   /**
486    * Listener for requests from other peers.
487    * Uses the session's global id as app id.
488    */
489   struct GNUNET_SET_ListenHandle *set_listener;
490
491   /**
492    * State of our early stopping scheme.
493    */
494   int early_stopping;
495 };
496
497 /**
498  * Linked list of sessions this peer participates in.
499  */
500 static struct ConsensusSession *sessions_head;
501
502 /**
503  * Linked list of sessions this peer participates in.
504  */
505 static struct ConsensusSession *sessions_tail;
506
507 /**
508  * Configuration of the consensus service.
509  */
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
511
512 /**
513  * Handle to the server for this service.
514  */
515 static struct GNUNET_SERVER_Handle *srv;
516
517 /**
518  * Peer that runs this service.
519  */
520 static struct GNUNET_PeerIdentity my_peer;
521
522 /**
523  * Statistics handle.
524  */
525 struct GNUNET_STATISTICS_Handle *statistics;
526
527
528 static void
529 finish_task (struct TaskEntry *task);
530
531 static void
532 run_ready_steps (struct ConsensusSession *session);
533
534 static const char *
535 phasename (uint16_t phase)
536 {
537   switch (phase)
538   {
539     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
540     case PHASE_KIND_FINISH: return "FINISH";
541     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
542     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
543     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
545     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
546     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
547     default: return "(unknown)";
548   }
549 }
550
551
552 static const char *
553 setname (uint16_t kind)
554 {
555   switch (kind)
556   {
557     case SET_KIND_CURRENT: return "CURRENT";
558     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
559     case SET_KIND_NONE: return "NONE";
560     default: return "(unknown)";
561   }
562 }
563
564 static const char *
565 rfnname (uint16_t kind)
566 {
567   switch (kind)
568   {
569     case RFN_KIND_NONE: return "NONE";
570     case RFN_KIND_ECHO: return "ECHO";
571     case RFN_KIND_CONFIRM: return "CONFIRM";
572     default: return "(unknown)";
573   }
574 }
575
576 static const char *
577 diffname (uint16_t kind)
578 {
579   switch (kind)
580   {
581     case DIFF_KIND_NONE: return "NONE";
582     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
583     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
584     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585     default: return "(unknown)";
586   }
587 }
588
589 #ifdef GNUNET_EXTRA_LOGGING
590
591
592 static const char *
593 debug_str_element (const struct GNUNET_SET_Element *el)
594 {
595   struct GNUNET_HashCode hash;
596
597   GNUNET_SET_element_hash (el, &hash);
598
599   return GNUNET_h2s (&hash);
600 }
601
602 static const char *
603 debug_str_task_key (struct TaskKey *tk)
604 {
605   static char buf[256];
606
607   snprintf (buf, sizeof (buf),
608             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
609             phasename (tk->kind), tk->peer1, tk->peer2,
610             tk->leader, tk->repetition);
611
612   return buf;
613 }
614
615 static const char *
616 debug_str_diff_key (struct DiffKey *dk)
617 {
618   static char buf[256];
619
620   snprintf (buf, sizeof (buf),
621             "DiffKey kind=%s, k1=%d, k2=%d",
622             diffname (dk->diff_kind), dk->k1, dk->k2);
623
624   return buf;
625 }
626
627 static const char *
628 debug_str_set_key (const struct SetKey *sk)
629 {
630   static char buf[256];
631
632   snprintf (buf, sizeof (buf),
633             "SetKey kind=%s, k1=%d, k2=%d",
634             setname (sk->set_kind), sk->k1, sk->k2);
635
636   return buf;
637 }
638
639
640 static const char *
641 debug_str_rfn_key (const struct RfnKey *rk)
642 {
643   static char buf[256];
644
645   snprintf (buf, sizeof (buf),
646             "RfnKey kind=%s, k1=%d, k2=%d",
647             rfnname (rk->rfn_kind), rk->k1, rk->k2);
648
649   return buf;
650 }
651
652 #endif /* GNUNET_EXTRA_LOGGING */
653
654
655 /**
656  * Destroy a session, free all resources associated with it.
657  *
658  * @param session the session to destroy
659  */
660 static void
661 destroy_session (struct ConsensusSession *session)
662 {
663   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664   if (NULL != session->set_listener)
665   {
666     GNUNET_SET_listen_cancel (session->set_listener);
667     session->set_listener = NULL;
668   }
669   if (NULL != session->client_mq)
670   {
671     GNUNET_MQ_destroy (session->client_mq);
672     session->client_mq = NULL;
673     /* The MQ cleanup will also disconnect the underlying client. */
674     session->client = NULL;
675   }
676   if (NULL != session->client)
677   {
678     GNUNET_SERVER_client_disconnect (session->client);
679     session->client = NULL;
680   }
681   GNUNET_free (session);
682 }
683
684
685 /**
686  * Send the final result set of the consensus to the client, element by
687  * element.
688  *
689  * @param cls closure
690  * @param element the current element, NULL if all elements have been
691  *        iterated over
692  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
693  */
694 static int
695 send_to_client_iter (void *cls,
696                      const struct GNUNET_SET_Element *element)
697 {
698   struct TaskEntry *task = (struct TaskEntry *) cls;
699   struct ConsensusSession *session = task->step->session;
700   struct GNUNET_MQ_Envelope *ev;
701
702   if (NULL != element)
703   {
704     struct GNUNET_CONSENSUS_ElementMessage *m;
705
706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707                 "P%d: sending element %s to client\n",
708                 session->local_peer_idx,
709                 debug_str_element (element));
710
711     ev = GNUNET_MQ_msg_extra (m, element->size,
712                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
713     m->element_type = htons (element->element_type);
714     memcpy (&m[1], element->data, element->size);
715     GNUNET_MQ_send (session->client_mq, ev);
716   }
717   else
718   {
719     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720                 "P%d: finished iterating elements for client\n",
721                 session->local_peer_idx);
722     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
723     GNUNET_MQ_send (session->client_mq, ev);
724   }
725   return GNUNET_YES;
726 }
727
728
729 static struct SetEntry *
730 lookup_set (struct ConsensusSession *session, struct SetKey *key)
731 {
732   struct GNUNET_HashCode hash;
733
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "P%u: looking up set {%s}\n",
736               session->local_peer_idx,
737               debug_str_set_key (key));
738
739   GNUNET_assert (SET_KIND_NONE != key->set_kind);
740   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
741   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
742 }
743
744
745 static struct DiffEntry *
746 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
747 {
748   struct GNUNET_HashCode hash;
749
750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751               "P%u: looking up diff {%s}\n",
752               session->local_peer_idx,
753               debug_str_diff_key (key));
754
755   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
756   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
757   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
758 }
759
760
761 static struct ReferendumEntry *
762 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
763 {
764   struct GNUNET_HashCode hash;
765
766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767               "P%u: looking up rfn {%s}\n",
768               session->local_peer_idx,
769               debug_str_rfn_key (key));
770
771   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
772   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
773   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
774 }
775
776
777 static void
778 diff_insert (struct DiffEntry *diff,
779              int weight,
780              const struct GNUNET_SET_Element *element)
781 {
782   struct DiffElementInfo *di;
783   struct GNUNET_HashCode hash;
784
785   GNUNET_assert ( (1 == weight) || (-1 == weight));
786
787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788               "diff_insert with element size %u\n",
789               element->size);
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "hashing element\n");
793
794   GNUNET_SET_element_hash (element, &hash);
795
796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
797               "hashed element\n");
798
799   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
800
801   if (NULL == di)
802   {
803     di = GNUNET_new (struct DiffElementInfo);
804     di->element = GNUNET_SET_element_dup (element);
805     GNUNET_assert (GNUNET_OK ==
806                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
807                                                       &hash, di,
808                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
809   }
810
811   di->weight = weight;
812 }
813
814
815 static void
816 rfn_commit (struct ReferendumEntry *rfn,
817             uint16_t commit_peer)
818 {
819   GNUNET_assert (commit_peer < rfn->num_peers);
820
821   rfn->peer_commited[commit_peer] = GNUNET_YES;
822 }
823
824
825 static void
826 rfn_contest (struct ReferendumEntry *rfn,
827              uint16_t contested_peer)
828 {
829   GNUNET_assert (contested_peer < rfn->num_peers);
830
831   rfn->peer_contested[contested_peer] = GNUNET_YES;
832 }
833
834
835 static uint16_t
836 rfn_noncontested (struct ReferendumEntry *rfn)
837 {
838   uint16_t i;
839   uint16_t ret;
840
841   ret = 0;
842   for (i = 0; i < rfn->num_peers; i++)
843     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
844       ret++;
845
846   return ret;
847 }
848
849
850 static void
851 rfn_vote (struct ReferendumEntry *rfn,
852           uint16_t voting_peer,
853           enum ReferendumVote vote,
854           const struct GNUNET_SET_Element *element)
855 {
856   struct RfnElementInfo *ri;
857   struct GNUNET_HashCode hash;
858
859   GNUNET_assert (voting_peer < rfn->num_peers);
860
861   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
862      since VOTE_KEEP is implicit in not voting. */
863   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
864
865   GNUNET_SET_element_hash (element, &hash);
866   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
867
868   if (NULL == ri)
869   {
870     ri = GNUNET_new (struct RfnElementInfo);
871     ri->element = GNUNET_SET_element_dup (element);
872     ri->votes = GNUNET_new_array (rfn->num_peers, int);
873     GNUNET_assert (GNUNET_OK ==
874                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
875                                                       &hash, ri,
876                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
877   }
878
879   ri->votes[voting_peer] = GNUNET_YES;
880   ri->proposal = vote;
881 }
882
883
884 uint16_t
885 task_other_peer (struct TaskEntry *task)
886 {
887   uint16_t me = task->step->session->local_peer_idx;
888   if (task->key.peer1 == me)
889     return task->key.peer2;
890   return task->key.peer1;
891 }
892
893
894 /**
895  * Callback for set operation results. Called for each element
896  * in the result set.
897  *
898  * @param cls closure
899  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
900  * @param status see enum GNUNET_SET_Status
901  */
902 static void
903 set_result_cb (void *cls,
904                const struct GNUNET_SET_Element *element,
905                enum GNUNET_SET_Status status)
906 {
907   struct TaskEntry *task = cls;
908   struct ConsensusSession *session = task->step->session;
909   struct SetEntry *output_set = NULL;
910   struct DiffEntry *output_diff = NULL;
911   struct ReferendumEntry *output_rfn = NULL;
912   unsigned int other_idx;
913   struct SetOpCls *setop;
914
915   setop = &task->cls.setop;
916
917
918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919               "P%u: got set result for {%s}, status %u\n",
920               session->local_peer_idx,
921               debug_str_task_key (&task->key),
922               status);
923
924   if (GNUNET_NO == task->is_started)
925   {
926     GNUNET_break_op (0);
927     return;
928   }
929
930   if (GNUNET_YES == task->is_finished)
931   {
932     GNUNET_break_op (0);
933     return;
934   }
935
936   other_idx = task_other_peer (task);
937
938   if (SET_KIND_NONE != setop->output_set.set_kind)
939   {
940     output_set = lookup_set (session, &setop->output_set);
941     GNUNET_assert (NULL != output_set);
942   }
943
944   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
945   {
946     output_diff = lookup_diff (session, &setop->output_diff);
947     GNUNET_assert (NULL != output_diff);
948   }
949
950   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
951   {
952     output_rfn = lookup_rfn (session, &setop->output_rfn);
953     GNUNET_assert (NULL != output_rfn);
954   }
955
956   if (GNUNET_YES == session->peers_blacklisted[other_idx])
957   {
958     /* Peer might have been blacklisted
959        by a gradecast running in parallel, ignore elements from now */
960     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
961       return;
962     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
963       return;
964   }
965
966   if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
967   {
968     if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
969     {
970       GNUNET_assert (NULL != output_rfn);
971       rfn_contest (output_rfn, task_other_peer (task));
972       return;
973     }
974   }
975
976   switch (status)
977   {
978     case GNUNET_SET_STATUS_ADD_LOCAL:
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Adding element in Task {%s}\n",
981                   debug_str_task_key (&task->key));
982       if (NULL != output_set)
983       {
984         // FIXME: record pending adds, use callback
985         GNUNET_SET_add_element (output_set->h,
986                                 element,
987                                 NULL,
988                                 NULL);
989 #ifdef GNUNET_EXTRA_LOGGING
990         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
991                     "P%u: adding element %s into set {%s} of task {%s}\n",
992                     session->local_peer_idx,
993                     debug_str_element (element),
994                     debug_str_set_key (&setop->output_set),
995                     debug_str_task_key (&task->key));
996 #endif
997       }
998       if (NULL != output_diff)
999       {
1000         diff_insert (output_diff, 1, element);
1001 #ifdef GNUNET_EXTRA_LOGGING
1002         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1004                     session->local_peer_idx,
1005                     debug_str_element (element),
1006                     debug_str_diff_key (&setop->output_diff),
1007                     debug_str_task_key (&task->key));
1008 #endif
1009       }
1010       if (NULL != output_rfn)
1011       {
1012         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1013 #ifdef GNUNET_EXTRA_LOGGING
1014         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1015                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1016                     session->local_peer_idx,
1017                     debug_str_element (element),
1018                     debug_str_rfn_key (&setop->output_rfn),
1019                     debug_str_task_key (&task->key));
1020 #endif
1021       }
1022       // XXX: add result to structures in task
1023       break;
1024     case GNUNET_SET_STATUS_ADD_REMOTE:
1025       if (GNUNET_YES == setop->do_not_remove)
1026         break;
1027       if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
1028         break;
1029       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030                   "Removing element in Task {%s}\n",
1031                   debug_str_task_key (&task->key));
1032       if (NULL != output_set)
1033       {
1034         // FIXME: record pending adds, use callback
1035         GNUNET_SET_remove_element (output_set->h,
1036                                    element,
1037                                    NULL,
1038                                    NULL);
1039 #ifdef GNUNET_EXTRA_LOGGING
1040         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1041                     "P%u: removing element %s from set {%s} of task {%s}\n",
1042                     session->local_peer_idx,
1043                     debug_str_element (element),
1044                     debug_str_set_key (&setop->output_set),
1045                     debug_str_task_key (&task->key));
1046 #endif
1047       }
1048       if (NULL != output_diff)
1049       {
1050         diff_insert (output_diff, -1, element);
1051 #ifdef GNUNET_EXTRA_LOGGING
1052         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1053                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1054                     session->local_peer_idx,
1055                     debug_str_element (element),
1056                     debug_str_diff_key (&setop->output_diff),
1057                     debug_str_task_key (&task->key));
1058 #endif
1059       }
1060       if (NULL != output_rfn)
1061       {
1062         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1063 #ifdef GNUNET_EXTRA_LOGGING
1064         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1065                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1066                     session->local_peer_idx,
1067                     debug_str_element (element),
1068                     debug_str_rfn_key (&setop->output_rfn),
1069                     debug_str_task_key (&task->key));
1070 #endif
1071       }
1072       break;
1073     case GNUNET_SET_STATUS_DONE:
1074       // XXX: check first if any changes to the underlying
1075       // set are still pending
1076       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077                   "Finishing setop in Task {%s}\n",
1078                   debug_str_task_key (&task->key));
1079       if (NULL != output_rfn)
1080       {
1081         rfn_commit (output_rfn, task_other_peer (task));
1082       }
1083       finish_task (task);
1084       break;
1085     case GNUNET_SET_STATUS_FAILURE:
1086       // XXX: cleanup
1087       GNUNET_break_op (0);
1088       finish_task (task);
1089       return;
1090     default:
1091       /* not reached */
1092       GNUNET_assert (0);
1093   }
1094 }
1095
1096 #ifdef EVIL
1097
1098 enum EvilnessType
1099 {
1100   EVILNESS_NONE,
1101   EVILNESS_CRAM_ALL,
1102   EVILNESS_CRAM_LEAD,
1103   EVILNESS_CRAM_ECHO,
1104   EVILNESS_SLACK,
1105 };
1106
1107 enum EvilnessSubType
1108 {
1109   EVILNESS_SUB_NONE,
1110   EVILNESS_SUB_REPLACEMENT,
1111   EVILNESS_SUB_NO_REPLACEMENT,
1112 };
1113
1114 struct Evilness
1115 {
1116   enum EvilnessType type;
1117   enum EvilnessSubType subtype;
1118   unsigned int num;
1119 };
1120
1121
1122 static int
1123 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1124 {
1125   if (0 == strcmp ("replace", evil_subtype_str))
1126   {
1127     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1128   }
1129   else if (0 == strcmp ("noreplace", evil_subtype_str))
1130   {
1131     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1132   }
1133   else
1134   {
1135     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1137                 evil_subtype_str);
1138     return GNUNET_SYSERR;
1139   }
1140   return GNUNET_OK;
1141 }
1142
1143
1144 static void
1145 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1146 {
1147   char *evil_spec;
1148   char *field;
1149   char *evil_type_str = NULL;
1150   char *evil_subtype_str = NULL;
1151
1152   GNUNET_assert (NULL != evil);
1153
1154   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1155   {
1156     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157                 "P%u: no evilness\n",
1158                 session->local_peer_idx);
1159     evil->type = EVILNESS_NONE;
1160     return;
1161   }
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "P%u: got evilness spec\n",
1164               session->local_peer_idx);
1165
1166   for (field = strtok (evil_spec, "/");
1167        NULL != field;
1168        field = strtok (NULL, "/"))
1169   {
1170     unsigned int peer_num;
1171     unsigned int evil_num;
1172     int ret;
1173
1174     evil_type_str = NULL;
1175     evil_subtype_str = NULL;
1176
1177     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1178
1179     if (ret != 4)
1180     {
1181       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1182                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1183                   field,
1184                   ret);
1185       goto not_evil;
1186     }
1187
1188     GNUNET_assert (NULL != evil_type_str);
1189     GNUNET_assert (NULL != evil_subtype_str);
1190
1191     if (peer_num == session->local_peer_idx)
1192     {
1193       if (0 == strcmp ("slack", evil_type_str))
1194       {
1195         evil->type = EVILNESS_SLACK;
1196       }
1197       else if (0 == strcmp ("cram-all", evil_type_str))
1198       {
1199         evil->type = EVILNESS_CRAM_ALL;
1200         evil->num = evil_num;
1201         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1202           goto not_evil;
1203       }
1204       else if (0 == strcmp ("cram-lead", evil_type_str))
1205       {
1206         evil->type = EVILNESS_CRAM_LEAD;
1207         evil->num = evil_num;
1208         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1209           goto not_evil;
1210       }
1211       else if (0 == strcmp ("cram-echo", evil_type_str))
1212       {
1213         evil->type = EVILNESS_CRAM_ECHO;
1214         evil->num = evil_num;
1215         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1216           goto not_evil;
1217       }
1218       else
1219       {
1220         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1221                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1222                     evil_type_str);
1223         goto not_evil;
1224       }
1225       goto cleanup;
1226     }
1227     /* No GNUNET_free since memory was allocated by libc */
1228     free (evil_type_str);
1229     evil_type_str = NULL;
1230     evil_subtype_str = NULL;
1231   }
1232 not_evil:
1233   evil->type = EVILNESS_NONE;
1234 cleanup:
1235   GNUNET_free (evil_spec);
1236   /* no GNUNET_free_non_null since it wasn't
1237    * allocated with GNUNET_malloc */
1238   if (NULL != evil_type_str)
1239     free (evil_type_str);
1240   if (NULL != evil_subtype_str)
1241     free (evil_subtype_str);
1242 }
1243
1244 #endif
1245
1246
1247 /**
1248  * Commit the appropriate set for a
1249  * task.
1250  */
1251 static void
1252 commit_set (struct ConsensusSession *session,
1253             struct TaskEntry *task)
1254 {
1255   struct SetEntry *set;
1256   struct SetOpCls *setop = &task->cls.setop;
1257
1258   GNUNET_assert (NULL != setop->op);
1259   set = lookup_set (session, &setop->input_set);
1260   GNUNET_assert (NULL != set);
1261
1262 #ifdef EVIL
1263   {
1264     unsigned int i;
1265     struct Evilness evil;
1266
1267     get_evilness (session, &evil);
1268     if (EVILNESS_NONE != evil.type)
1269     {
1270       /* Useful for evaluation */
1271       GNUNET_STATISTICS_set (statistics,
1272                              "is evil",
1273                              1,
1274                              GNUNET_NO);
1275     }
1276     switch (evil.type)
1277     {
1278       case EVILNESS_CRAM_ALL:
1279       case EVILNESS_CRAM_LEAD:
1280       case EVILNESS_CRAM_ECHO:
1281         /* We're not cramming elements in the
1282            all-to-all round, since that would just
1283            add more elements to the result set, but
1284            wouldn't test robustness. */
1285         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1286         {
1287           GNUNET_SET_commit (setop->op, set->h);
1288           break;
1289         }
1290         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1291             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1292         {
1293           GNUNET_SET_commit (setop->op, set->h);
1294           break;
1295         }
1296         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1297         {
1298           GNUNET_SET_commit (setop->op, set->h);
1299           break;
1300         }
1301         for (i = 0; i < evil.num; i++)
1302         {
1303           struct GNUNET_HashCode hash;
1304           struct GNUNET_SET_Element element;
1305           element.data = &hash;
1306           element.size = sizeof (struct GNUNET_HashCode);
1307           element.element_type = 0;
1308
1309           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1310           {
1311             /* Always generate a new element. */
1312             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1313           }
1314           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1315           {
1316             /* Always cram the same elements, derived from counter. */
1317             GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1318           }
1319           else
1320           {
1321             GNUNET_assert (0);
1322           }
1323           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1324 #ifdef GNUNET_EXTRA_LOGGING
1325           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1327                       session->local_peer_idx,
1328                       debug_str_element (&element),
1329                       debug_str_set_key (&setop->input_set),
1330                       debug_str_task_key (&task->key));
1331 #endif
1332         }
1333         GNUNET_STATISTICS_update (statistics,
1334                                   "# stuffed elements",
1335                                   evil.num,
1336                                   GNUNET_NO);
1337         GNUNET_SET_commit (setop->op, set->h);
1338         break;
1339       case EVILNESS_SLACK:
1340         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1341                     "P%u: evil peer: slacking\n",
1342                     (unsigned int) session->local_peer_idx);
1343         /* Do nothing. */
1344         break;
1345       case EVILNESS_NONE:
1346         GNUNET_SET_commit (setop->op, set->h);
1347         break;
1348     }
1349   }
1350 #else
1351   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1352   {
1353     struct GNUNET_SET_Element element;
1354     struct ContestedPayload payload;
1355     element.data = &payload;
1356     element.size = sizeof (struct ContestedPayload);
1357     element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1358     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1359   }
1360   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1361   {
1362     GNUNET_SET_commit (setop->op, set->h);
1363   }
1364   else
1365   {
1366     /* For our testcases, we don't want the blacklisted
1367        peers to wait. */
1368     GNUNET_SET_operation_cancel (setop->op);
1369     setop->op = NULL;
1370   }
1371 #endif
1372 }
1373
1374
1375 static void
1376 put_diff (struct ConsensusSession *session,
1377          struct DiffEntry *diff)
1378 {
1379   struct GNUNET_HashCode hash;
1380
1381   GNUNET_assert (NULL != diff);
1382
1383   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1384   GNUNET_assert (GNUNET_OK ==
1385                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1386                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1387 }
1388
1389 static void
1390 put_set (struct ConsensusSession *session,
1391          struct SetEntry *set)
1392 {
1393   struct GNUNET_HashCode hash;
1394
1395   GNUNET_assert (NULL != set->h);
1396
1397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398               "Putting set %s\n",
1399               debug_str_set_key (&set->key));
1400
1401   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1402   GNUNET_assert (GNUNET_SYSERR !=
1403                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1404                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1405 }
1406
1407
1408 static void
1409 put_rfn (struct ConsensusSession *session,
1410          struct ReferendumEntry *rfn)
1411 {
1412   struct GNUNET_HashCode hash;
1413
1414   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1415   GNUNET_assert (GNUNET_OK ==
1416                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1417                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1418 }
1419
1420
1421
1422 static void
1423 task_cancel_reconcile (struct TaskEntry *task)
1424 {
1425   /* not implemented yet */
1426   GNUNET_assert (0);
1427 }
1428
1429
1430 static void
1431 apply_diff_to_rfn (struct DiffEntry *diff,
1432                    struct ReferendumEntry *rfn,
1433                    uint16_t voting_peer,
1434                    uint16_t num_peers)
1435 {
1436   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1437   struct DiffElementInfo *di;
1438
1439   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1440
1441   while (GNUNET_YES ==
1442          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1443                                                       NULL,
1444                                                       (const void **) &di))
1445   {
1446     if (di->weight > 0)
1447     {
1448       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1449     }
1450     if (di->weight < 0)
1451     {
1452       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1453     }
1454   }
1455
1456   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1457 }
1458
1459
1460 struct DiffEntry *
1461 diff_create ()
1462 {
1463   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1464
1465   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1466
1467   return d;
1468 }
1469
1470
1471 struct DiffEntry *
1472 diff_compose (struct DiffEntry *diff_1,
1473               struct DiffEntry *diff_2)
1474 {
1475   struct DiffEntry *diff_new;
1476   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1477   struct DiffElementInfo *di;
1478
1479   diff_new = diff_create ();
1480
1481   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1482   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1483   {
1484     diff_insert (diff_new, di->weight, di->element);
1485   }
1486   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1487
1488   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1489   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1490   {
1491     diff_insert (diff_new, di->weight, di->element);
1492   }
1493   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1494
1495   return diff_new;
1496 }
1497
1498
1499 struct ReferendumEntry *
1500 rfn_create (uint16_t size)
1501 {
1502   struct ReferendumEntry *rfn;
1503
1504   rfn = GNUNET_new (struct ReferendumEntry);
1505   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1506   rfn->peer_commited = GNUNET_new_array (size, int);
1507   rfn->peer_contested = GNUNET_new_array (size, int);
1508   rfn->num_peers = size;
1509
1510   return rfn;
1511 }
1512
1513
1514 void
1515 diff_destroy (struct DiffEntry *diff)
1516 {
1517   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1518   GNUNET_free (diff);
1519 }
1520
1521
1522 /**
1523  * For a given majority, count what the outcome
1524  * is (add/remove/keep), and give the number
1525  * of peers that voted for this outcome.
1526  */
1527 static void
1528 rfn_majority (const struct ReferendumEntry *rfn,
1529               const struct RfnElementInfo *ri,
1530               uint16_t *ret_majority,
1531               enum ReferendumVote *ret_vote)
1532 {
1533   uint16_t votes_yes = 0;
1534   uint16_t num_commited = 0;
1535   uint16_t i;
1536
1537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538               "Computing rfn majority for element %s of rfn {%s}\n",
1539               debug_str_element (ri->element),
1540               debug_str_rfn_key (&rfn->key));
1541
1542   for (i = 0; i < rfn->num_peers; i++)
1543   {
1544     if (GNUNET_NO == rfn->peer_commited[i])
1545       continue;
1546     num_commited++;
1547
1548     if (GNUNET_YES == ri->votes[i])
1549       votes_yes++;
1550   }
1551
1552   if (votes_yes > (num_commited) / 2)
1553   {
1554     *ret_vote = ri->proposal;
1555     *ret_majority = votes_yes;
1556   }
1557   else
1558   {
1559     *ret_vote = VOTE_STAY;
1560     *ret_majority = num_commited - votes_yes;
1561   }
1562 }
1563
1564
1565 struct SetCopyCls
1566 {
1567   struct TaskEntry *task;
1568   struct SetKey dst_set_key;
1569 };
1570
1571
1572 static void
1573 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1574 {
1575   struct SetCopyCls *scc = cls;
1576   struct TaskEntry *task = scc->task;
1577   struct SetKey dst_set_key = scc->dst_set_key;
1578   struct SetEntry *set;
1579
1580   GNUNET_free (scc);
1581   set = GNUNET_new (struct SetEntry);
1582   set->h = copy;
1583   set->key = dst_set_key;
1584   put_set (task->step->session, set);
1585
1586   task->start (task);
1587 }
1588
1589
1590 /**
1591  * Call the start function of the given
1592  * task again after we created a copy of the given set.
1593  */
1594 static void
1595 create_set_copy_for_task (struct TaskEntry *task,
1596                           struct SetKey *src_set_key,
1597                           struct SetKey *dst_set_key)
1598 {
1599   struct SetEntry *src_set;
1600   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1601
1602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603               "Copying set {%s} to {%s} for task {%s}\n",
1604               debug_str_set_key (src_set_key),
1605               debug_str_set_key (dst_set_key),
1606               debug_str_task_key (&task->key));
1607
1608   scc->task = task;
1609   scc->dst_set_key = *dst_set_key;
1610   src_set = lookup_set (task->step->session, src_set_key);
1611   GNUNET_assert (NULL != src_set);
1612   GNUNET_SET_copy_lazy (src_set->h,
1613                         set_copy_cb,
1614                         scc);
1615 }
1616
1617
1618 struct SetMutationProgressCls
1619 {
1620   int num_pending;
1621   /**
1622    * Task to finish once all changes are through.
1623    */
1624   struct TaskEntry *task;
1625 };
1626
1627
1628 static void
1629 set_mutation_done (void *cls)
1630 {
1631   struct SetMutationProgressCls *pc = cls;
1632
1633   GNUNET_assert (pc->num_pending > 0);
1634
1635   pc->num_pending--;
1636
1637   if (0 == pc->num_pending)
1638   {
1639     struct TaskEntry *task = pc->task;
1640     GNUNET_free (pc);
1641     finish_task (task);
1642   }
1643 }
1644
1645
1646 static void
1647 try_finish_step_early (struct Step *step)
1648 {
1649   unsigned int i;
1650
1651   if (GNUNET_YES == step->is_running)
1652     return;
1653   if (GNUNET_YES == step->is_finished)
1654     return;
1655   if (GNUNET_NO == step->early_finishable)
1656     return;
1657
1658   step->is_finished = GNUNET_YES;
1659
1660 #ifdef GNUNET_EXTRA_LOGGING
1661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662               "Finishing step `%s' early.\n",
1663               step->debug_name);
1664 #endif
1665
1666   for (i = 0; i < step->subordinates_len; i++)
1667   {
1668     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1669     step->subordinates[i]->pending_prereq--;
1670 #ifdef GNUNET_EXTRA_LOGGING
1671     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                 "Decreased pending_prereq to %u for step `%s'.\n",
1673                 (unsigned int) step->subordinates[i]->pending_prereq,
1674                 step->subordinates[i]->debug_name);
1675
1676 #endif
1677     try_finish_step_early (step->subordinates[i]);
1678   }
1679
1680   // XXX: maybe schedule as task to avoid recursion?
1681   run_ready_steps (step->session);
1682 }
1683
1684
1685 static void
1686 finish_step (struct Step *step)
1687 {
1688   unsigned int i;
1689
1690   GNUNET_assert (step->finished_tasks == step->tasks_len);
1691   GNUNET_assert (GNUNET_YES == step->is_running);
1692   GNUNET_assert (GNUNET_NO == step->is_finished);
1693
1694 #ifdef GNUNET_EXTRA_LOGGING
1695   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696               "All tasks of step `%s' with %u subordinates finished.\n",
1697               step->debug_name,
1698               step->subordinates_len);
1699 #endif
1700
1701   for (i = 0; i < step->subordinates_len; i++)
1702   {
1703     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1704     step->subordinates[i]->pending_prereq--;
1705 #ifdef GNUNET_EXTRA_LOGGING
1706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707                 "Decreased pending_prereq to %u for step `%s'.\n",
1708                 (unsigned int) step->subordinates[i]->pending_prereq,
1709                 step->subordinates[i]->debug_name);
1710
1711 #endif
1712   }
1713
1714   step->is_finished = GNUNET_YES;
1715
1716   // XXX: maybe schedule as task to avoid recursion?
1717   run_ready_steps (step->session);
1718 }
1719
1720
1721
1722 /**
1723  * Apply the result from one round of gradecasts (i.e. every peer
1724  * should have gradecasted) to the peer's current set.
1725  *
1726  * @param task the task with context information
1727  */
1728 static void
1729 task_start_apply_round (struct TaskEntry *task)
1730 {
1731   struct ConsensusSession *session = task->step->session;
1732   struct SetKey sk_in;
1733   struct SetKey sk_out;
1734   struct RfnKey rk_in;
1735   struct SetEntry *set_out;
1736   struct ReferendumEntry *rfn_in;
1737   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1738   struct RfnElementInfo *ri;
1739   struct SetMutationProgressCls *progress_cls;
1740   uint16_t worst_majority = UINT16_MAX;
1741
1742   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1743   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1744   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1745
1746   set_out = lookup_set (session, &sk_out);
1747   if (NULL == set_out)
1748   {
1749     create_set_copy_for_task (task, &sk_in, &sk_out);
1750     return;
1751   }
1752
1753   rfn_in = lookup_rfn (session, &rk_in);
1754   GNUNET_assert (NULL != rfn_in);
1755
1756   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1757   progress_cls->task = task;
1758
1759   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1760
1761   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1762   {
1763     uint16_t majority_num;
1764     enum ReferendumVote majority_vote;
1765
1766     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1767
1768     if (worst_majority > majority_num)
1769       worst_majority = majority_num;
1770
1771     switch (majority_vote)
1772     {
1773       case VOTE_ADD:
1774         progress_cls->num_pending++;
1775         GNUNET_assert (GNUNET_OK ==
1776                        GNUNET_SET_add_element (set_out->h,
1777                                                ri->element,
1778                                                set_mutation_done,
1779                                                progress_cls));
1780         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1781                     "P%u: apply round: adding element %s with %u-majority.\n",
1782                     session->local_peer_idx,
1783                     debug_str_element (ri->element), majority_num);
1784         break;
1785       case VOTE_REMOVE:
1786         progress_cls->num_pending++;
1787         GNUNET_assert (GNUNET_OK ==
1788                        GNUNET_SET_remove_element (set_out->h,
1789                                                   ri->element,
1790                                                   set_mutation_done,
1791                                                   progress_cls));
1792         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1793                     "P%u: apply round: deleting element %s with %u-majority.\n",
1794                     session->local_peer_idx,
1795                     debug_str_element (ri->element), majority_num);
1796         break;
1797       case VOTE_STAY:
1798         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1799                     "P%u: apply round: keeping element %s with %u-majority.\n",
1800                     session->local_peer_idx,
1801                     debug_str_element (ri->element), majority_num);
1802         // do nothing
1803         break;
1804       default:
1805         GNUNET_assert (0);
1806         break;
1807     }
1808   }
1809
1810   if (progress_cls->num_pending == 0)
1811   {
1812     // call closure right now, no pending ops
1813     GNUNET_free (progress_cls);
1814     finish_task (task);
1815   }
1816
1817   {
1818     uint16_t thresh = (session->num_peers / 3) * 2;
1819
1820     if (worst_majority >= thresh)
1821     {
1822       switch (session->early_stopping)
1823       {
1824         case EARLY_STOPPING_NONE:
1825           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1826           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1827                       "P%u: Stopping early (after one more superround)\n",
1828                       session->local_peer_idx);
1829           break;
1830         case EARLY_STOPPING_ONE_MORE:
1831           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1832                       session->local_peer_idx);
1833           session->early_stopping = EARLY_STOPPING_DONE;
1834           {
1835             struct Step *step;
1836             for (step = session->steps_head; NULL != step; step = step->next)
1837               try_finish_step_early (step);
1838           }
1839           break;
1840         case EARLY_STOPPING_DONE:
1841           /* We shouldn't be here anymore after early stopping */
1842           GNUNET_break (0);
1843           break;
1844         default:
1845           GNUNET_assert (0);
1846           break;
1847       }
1848     }
1849     else if (EARLY_STOPPING_NONE != session->early_stopping)
1850     {
1851       // Our assumption about the number of bad peers
1852       // has been broken.
1853       GNUNET_break_op (0);
1854     }
1855     else
1856     {
1857       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1858                   session->local_peer_idx);
1859     }
1860   }
1861
1862 }
1863
1864
1865 static void
1866 task_start_grade (struct TaskEntry *task)
1867 {
1868   struct ConsensusSession *session = task->step->session;
1869   struct ReferendumEntry *output_rfn;
1870   struct ReferendumEntry *input_rfn;
1871   struct DiffEntry *input_diff;
1872   struct RfnKey rfn_key;
1873   struct DiffKey diff_key;
1874   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1875   struct RfnElementInfo *ri;
1876   unsigned int gradecast_confidence = 2;
1877
1878   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1879   output_rfn = lookup_rfn (session, &rfn_key);
1880   if (NULL == output_rfn)
1881   {
1882     output_rfn = rfn_create (session->num_peers);
1883     output_rfn->key = rfn_key;
1884     put_rfn (session, output_rfn);
1885   }
1886
1887   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1888   input_diff = lookup_diff (session, &diff_key);
1889   GNUNET_assert (NULL != input_diff);
1890
1891   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1892   input_rfn = lookup_rfn (session, &rfn_key);
1893   GNUNET_assert (NULL != input_rfn);
1894
1895   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1896
1897   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1898
1899   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1900   {
1901     uint16_t majority_num;
1902     enum ReferendumVote majority_vote;
1903
1904     // XXX: we need contested votes and non-contested votes here
1905     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1906
1907     if (majority_num <= session->num_peers / 3)
1908       majority_vote = VOTE_REMOVE;
1909
1910     switch (majority_vote)
1911     {
1912       case VOTE_STAY:
1913         break;
1914       case VOTE_ADD:
1915         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1916         break;
1917       case VOTE_REMOVE:
1918         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1919         break;
1920       default:
1921         GNUNET_assert (0);
1922         break;
1923     }
1924   }
1925
1926   {
1927     uint16_t noncontested;
1928     noncontested = rfn_noncontested (input_rfn);
1929     if (noncontested < (session->num_peers / 3) * 2)
1930     {
1931       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1932     }
1933     if (noncontested < (session->num_peers / 3) + 1)
1934     {
1935       gradecast_confidence = 0;
1936     }
1937   }
1938
1939   if (gradecast_confidence >= 1)
1940     rfn_commit (output_rfn, task->key.leader);
1941
1942   if (gradecast_confidence <= 1)
1943     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1944
1945   finish_task (task);
1946 }
1947
1948
1949 static void
1950 task_start_reconcile (struct TaskEntry *task)
1951 {
1952   struct SetEntry *input;
1953   struct SetOpCls *setop = &task->cls.setop;
1954   struct ConsensusSession *session = task->step->session;
1955
1956   input = lookup_set (session, &setop->input_set);
1957   GNUNET_assert (NULL != input);
1958   GNUNET_assert (NULL != input->h);
1959
1960   /* We create the outputs for the operation here
1961      (rather than in the set operation callback)
1962      because we want something valid in there, even
1963      if the other peer doesn't talk to us */
1964
1965   if (SET_KIND_NONE != setop->output_set.set_kind)
1966   {
1967     /* If we don't have an existing output set,
1968        we clone the input set. */
1969     if (NULL == lookup_set (session, &setop->output_set))
1970     {
1971       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1972       return;
1973     }
1974   }
1975
1976   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1977   {
1978     if (NULL == lookup_rfn (session, &setop->output_rfn))
1979     {
1980       struct ReferendumEntry *rfn;
1981
1982       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1983                   "P%u: output rfn <%s> missing, creating.\n",
1984                   session->local_peer_idx,
1985                   debug_str_rfn_key (&setop->output_rfn));
1986
1987       rfn = rfn_create (session->num_peers);
1988       rfn->key = setop->output_rfn;
1989       put_rfn (session, rfn);
1990     }
1991   }
1992
1993   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1994   {
1995     if (NULL == lookup_diff (session, &setop->output_diff))
1996     {
1997       struct DiffEntry *diff;
1998
1999       diff = diff_create ();
2000       diff->key = setop->output_diff;
2001       put_diff (session, diff);
2002     }
2003   }
2004
2005   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2006   {
2007     /* XXX: mark the corresponding rfn as commited if necessary */
2008     finish_task (task);
2009     return;
2010   }
2011
2012   if (task->key.peer1 == session->local_peer_idx)
2013   {
2014     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2015
2016     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2017                 "P%u: Looking up set {%s} to run remote union\n",
2018                 session->local_peer_idx,
2019                 debug_str_set_key (&setop->input_set));
2020
2021     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2022     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2023
2024     rcm.kind = htons (task->key.kind);
2025     rcm.peer1 = htons (task->key.peer1);
2026     rcm.peer2 = htons (task->key.peer2);
2027     rcm.leader = htons (task->key.leader);
2028     rcm.repetition = htons (task->key.repetition);
2029
2030     GNUNET_assert (NULL == setop->op);
2031     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2032                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2033
2034     // XXX: maybe this should be done while
2035     // setting up tasks alreays?
2036     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2037                                     &session->global_id,
2038                                     &rcm.header,
2039                                     GNUNET_SET_RESULT_SYMMETRIC,
2040                                     set_result_cb,
2041                                     task);
2042
2043     commit_set (session, task);
2044   }
2045   else if (task->key.peer2 == session->local_peer_idx)
2046   {
2047     /* Wait for the other peer to contact us */
2048     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2049                 session->local_peer_idx, task->key.peer1);
2050
2051     if (NULL != setop->op)
2052     {
2053       commit_set (session, task);
2054     }
2055   }
2056   else
2057   {
2058     /* We made an error while constructing the task graph. */
2059     GNUNET_assert (0);
2060   }
2061 }
2062
2063
2064 static void
2065 task_start_eval_echo (struct TaskEntry *task)
2066 {
2067   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2068   struct ReferendumEntry *input_rfn;
2069   struct RfnElementInfo *ri;
2070   struct SetEntry *output_set;
2071   struct SetMutationProgressCls *progress_cls;
2072   struct ConsensusSession *session = task->step->session;
2073   struct SetKey sk_in;
2074   struct SetKey sk_out;
2075   struct RfnKey rk_in;
2076
2077   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2078   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2079   output_set = lookup_set (session, &sk_out);
2080   if (NULL == output_set)
2081   {
2082     create_set_copy_for_task (task, &sk_in, &sk_out);
2083     return;
2084   }
2085
2086
2087   {
2088     // FIXME: should be marked as a shallow copy, so
2089     // we can destroy everything correctly
2090     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2091     last_set->h = output_set->h;
2092     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2093     put_set (session, last_set);
2094   }
2095
2096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2097               "Evaluating referendum in Task {%s}\n",
2098               debug_str_task_key (&task->key));
2099
2100   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2101   progress_cls->task = task;
2102
2103   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2104   input_rfn = lookup_rfn (session, &rk_in);
2105
2106   GNUNET_assert (NULL != input_rfn);
2107
2108   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2109   GNUNET_assert (NULL != iter);
2110
2111   while (GNUNET_YES ==
2112          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2113                                                       NULL,
2114                                                       (const void **) &ri))
2115   {
2116     enum ReferendumVote majority_vote;
2117     uint16_t majority_num;
2118
2119     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2120
2121     if (majority_num < session->num_peers / 3)
2122     {
2123       /* It is not the case that all nonfaulty peers
2124          echoed the same value.  Since we're doing a set reconciliation, we
2125          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2126          reconciliation as contested.  Other peers might not know that the
2127          leader is faulty, thus we still re-distribute in the confirmation
2128          round. */
2129       output_set->is_contested = GNUNET_YES;
2130     }
2131
2132     switch (majority_vote)
2133     {
2134       case VOTE_ADD:
2135         progress_cls->num_pending++;
2136         GNUNET_assert (GNUNET_OK ==
2137                        GNUNET_SET_add_element (output_set->h,
2138                                                ri->element,
2139                                                set_mutation_done,
2140                                                progress_cls));
2141         break;
2142       case VOTE_REMOVE:
2143         progress_cls->num_pending++;
2144         GNUNET_assert (GNUNET_OK ==
2145                        GNUNET_SET_remove_element (output_set->h,
2146                                                   ri->element,
2147                                                   set_mutation_done,
2148                                                   progress_cls));
2149         break;
2150       case VOTE_STAY:
2151         /* Nothing to do. */
2152         break;
2153       default:
2154         /* not reached */
2155         GNUNET_assert (0);
2156     }
2157   }
2158
2159   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2160
2161   if (progress_cls->num_pending == 0)
2162   {
2163     // call closure right now, no pending ops
2164     GNUNET_free (progress_cls);
2165     finish_task (task);
2166   }
2167 }
2168
2169
2170 static void
2171 task_start_finish (struct TaskEntry *task)
2172 {
2173   struct SetEntry *final_set;
2174   struct ConsensusSession *session = task->step->session;
2175
2176   final_set = lookup_set (session, &task->cls.finish.input_set);
2177
2178   GNUNET_assert (NULL != final_set);
2179
2180
2181   GNUNET_SET_iterate (final_set->h,
2182                       send_to_client_iter,
2183                       task);
2184 }
2185
2186 static void
2187 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2188 {
2189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2190
2191   GNUNET_assert (GNUNET_NO == task->is_started);
2192   GNUNET_assert (GNUNET_NO == task->is_finished);
2193   GNUNET_assert (NULL != task->start);
2194
2195   task->start (task);
2196
2197   task->is_started = GNUNET_YES;
2198 }
2199
2200
2201
2202
2203 /*
2204  * Run all steps of the session that don't any
2205  * more dependencies.
2206  */
2207 static void
2208 run_ready_steps (struct ConsensusSession *session)
2209 {
2210   struct Step *step;
2211
2212   step = session->steps_head;
2213
2214   while (NULL != step)
2215   {
2216     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2217     {
2218       size_t i;
2219
2220       GNUNET_assert (0 == step->finished_tasks);
2221
2222 #ifdef GNUNET_EXTRA_LOGGING
2223       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2224                   session->local_peer_idx,
2225                   step->debug_name,
2226                   step->round, step->tasks_len, step->subordinates_len);
2227 #endif
2228
2229       step->is_running = GNUNET_YES;
2230       for (i = 0; i < step->tasks_len; i++)
2231         start_task (session, step->tasks[i]);
2232
2233       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2234       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2235         finish_step (step);
2236
2237       /* Running the next ready steps will be triggered by task completion */
2238       return;
2239     }
2240     step = step->next;
2241   }
2242
2243   return;
2244 }
2245
2246
2247
2248 static void
2249 finish_task (struct TaskEntry *task)
2250 {
2251   GNUNET_assert (GNUNET_NO == task->is_finished);
2252   task->is_finished = GNUNET_YES;
2253
2254   task->step->finished_tasks++;
2255
2256   if (task->step->finished_tasks == task->step->tasks_len)
2257     finish_step (task->step);
2258 }
2259
2260
2261 /**
2262  * Search peer in the list of peers in session.
2263  *
2264  * @param peer peer to find
2265  * @param session session with peer
2266  * @return index of peer, -1 if peer is not in session
2267  */
2268 static int
2269 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2270 {
2271   int i;
2272   for (i = 0; i < session->num_peers; i++)
2273     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2274       return i;
2275   return -1;
2276 }
2277
2278
2279 /**
2280  * Compute a global, (hopefully) unique consensus session id,
2281  * from the local id of the consensus session, and the identities of all participants.
2282  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2283  * exactly the same peers, the global id will be different.
2284  *
2285  * @param session session to generate the global id for
2286  * @param local_session_id local id of the consensus session
2287  */
2288 static void
2289 compute_global_id (struct ConsensusSession *session,
2290                    const struct GNUNET_HashCode *local_session_id)
2291 {
2292   const char *salt = "gnunet-service-consensus/session_id";
2293
2294   GNUNET_assert (GNUNET_YES ==
2295                  GNUNET_CRYPTO_kdf (&session->global_id,
2296                                     sizeof (struct GNUNET_HashCode),
2297                                     salt,
2298                                     strlen (salt),
2299                                     session->peers,
2300                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2301                                     local_session_id,
2302                                     sizeof (struct GNUNET_HashCode),
2303                                     NULL));
2304 }
2305
2306
2307 /**
2308  * Compare two peer identities.
2309  *
2310  * @param h1 some peer identity
2311  * @param h2 some peer identity
2312  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2313  */
2314 static int
2315 peer_id_cmp (const void *h1, const void *h2)
2316 {
2317   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2318 }
2319
2320
2321 /**
2322  * Create the sorted list of peers for the session,
2323  * add the local peer if not in the join message.
2324  */
2325 static void
2326 initialize_session_peer_list (struct ConsensusSession *session,
2327                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2328 {
2329   unsigned int local_peer_in_list;
2330   uint32_t listed_peers;
2331   const struct GNUNET_PeerIdentity *msg_peers;
2332   unsigned int i;
2333
2334   GNUNET_assert (NULL != join_msg);
2335
2336   /* peers in the join message, may or may not include the local peer */
2337   listed_peers = ntohl (join_msg->num_peers);
2338
2339   session->num_peers = listed_peers;
2340
2341   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2342
2343   local_peer_in_list = GNUNET_NO;
2344   for (i = 0; i < listed_peers; i++)
2345   {
2346     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2347     {
2348       local_peer_in_list = GNUNET_YES;
2349       break;
2350     }
2351   }
2352
2353   if (GNUNET_NO == local_peer_in_list)
2354     session->num_peers++;
2355
2356   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2357
2358   if (GNUNET_NO == local_peer_in_list)
2359     session->peers[session->num_peers - 1] = my_peer;
2360
2361   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2362   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2363 }
2364
2365
2366 static struct TaskEntry *
2367 lookup_task (struct ConsensusSession *session,
2368              struct TaskKey *key)
2369 {
2370   struct GNUNET_HashCode hash;
2371
2372
2373   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2375               GNUNET_h2s (&hash));
2376   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2377 }
2378
2379
2380 /**
2381  * Called when another peer wants to do a set operation with the
2382  * local peer.
2383  *
2384  * @param cls closure
2385  * @param other_peer the other peer
2386  * @param context_msg message with application specific information from
2387  *        the other peer
2388  * @param request request from the other peer, use GNUNET_SET_accept
2389  *        to accept it, otherwise the request will be refused
2390  *        Note that we don't use a return value here, as it is also
2391  *        necessary to specify the set we want to do the operation with,
2392  *        whith sometimes can be derived from the context message.
2393  *        Also necessary to specify the timeout.
2394  */
2395 static void
2396 set_listen_cb (void *cls,
2397                const struct GNUNET_PeerIdentity *other_peer,
2398                const struct GNUNET_MessageHeader *context_msg,
2399                struct GNUNET_SET_Request *request)
2400 {
2401   struct ConsensusSession *session = cls;
2402   struct TaskKey tk;
2403   struct TaskEntry *task;
2404   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2405
2406   if (NULL == context_msg)
2407   {
2408     GNUNET_break_op (0);
2409     return;
2410   }
2411
2412   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2413   {
2414     GNUNET_break_op (0);
2415     return;
2416   }
2417
2418   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2419   {
2420     GNUNET_break_op (0);
2421     return;
2422   }
2423
2424   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2425
2426   tk = ((struct TaskKey) {
2427       .kind = ntohs (cm->kind),
2428       .peer1 = ntohs (cm->peer1),
2429       .peer2 = ntohs (cm->peer2),
2430       .repetition = ntohs (cm->repetition),
2431       .leader = ntohs (cm->leader),
2432   });
2433
2434   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2435               session->local_peer_idx, debug_str_task_key (&tk));
2436
2437   task = lookup_task (session, &tk);
2438
2439   if (NULL == task)
2440   {
2441     GNUNET_break_op (0);
2442     return;
2443   }
2444
2445   if (GNUNET_YES == task->is_finished)
2446   {
2447     GNUNET_break_op (0);
2448     return;
2449   }
2450
2451   if (task->key.peer2 != session->local_peer_idx)
2452   {
2453     /* We're being asked, so we must be thne 2nd peer. */
2454     GNUNET_break_op (0);
2455     return;
2456   }
2457
2458   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2459                     (task->key.peer2 == session->local_peer_idx)));
2460
2461   task->cls.setop.op = GNUNET_SET_accept (request,
2462                                           GNUNET_SET_RESULT_SYMMETRIC,
2463                                           set_result_cb,
2464                                           task);
2465
2466   /* If the task hasn't been started yet,
2467      we wait for that until we commit. */
2468
2469   if (GNUNET_YES == task->is_started)
2470   {
2471     commit_set (session, task);
2472   }
2473 }
2474
2475
2476
2477 static void
2478 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2479           struct TaskEntry *t)
2480 {
2481   struct GNUNET_HashCode round_hash;
2482   struct Step *s;
2483
2484   GNUNET_assert (NULL != t->step);
2485
2486   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2487
2488   s = t->step;
2489
2490   if (s->tasks_len == s->tasks_cap)
2491   {
2492     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2493     GNUNET_array_grow (s->tasks,
2494                        s->tasks_cap,
2495                        target_size);
2496   }
2497
2498 #ifdef GNUNET_EXTRA_LOGGING
2499   GNUNET_assert (NULL != s->debug_name);
2500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2501               debug_str_task_key (&t->key),
2502               s->debug_name);
2503 #endif
2504
2505   s->tasks[s->tasks_len] = t;
2506   s->tasks_len++;
2507
2508   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2509   GNUNET_assert (GNUNET_OK ==
2510       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2511                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2512 }
2513
2514
2515 static void
2516 install_step_timeouts (struct ConsensusSession *session)
2517 {
2518   /* Given the fully constructed task graph
2519      with rounds for tasks, we can give the tasks timeouts. */
2520
2521   // unsigned int max_round;
2522
2523   /* XXX: implement! */
2524 }
2525
2526
2527
2528 /*
2529  * Arrange two peers in some canonical order.
2530  */
2531 static void
2532 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2533 {
2534   uint16_t a;
2535   uint16_t b;
2536
2537   GNUNET_assert (*p1 < n);
2538   GNUNET_assert (*p2 < n);
2539
2540   if (*p1 < *p2)
2541   {
2542     a = *p1;
2543     b = *p2;
2544   }
2545   else
2546   {
2547     a = *p2;
2548     b = *p1;
2549   }
2550
2551   /* For uniformly random *p1, *p2,
2552      this condition is true with 50% chance */
2553   if (((b - a) + n) % n <= n / 2)
2554   {
2555     *p1 = a;
2556     *p2 = b;
2557   }
2558   else
2559   {
2560     *p1 = b;
2561     *p2 = a;
2562   }
2563 }
2564
2565
2566 /**
2567  * Record @a dep as a dependency of @a step.
2568  */
2569 static void
2570 step_depend_on (struct Step *step, struct Step *dep)
2571 {
2572   /* We're not checking for cyclic dependencies,
2573      but this is a cheap sanity check. */
2574   GNUNET_assert (step != dep);
2575   GNUNET_assert (NULL != step);
2576   GNUNET_assert (NULL != dep);
2577   GNUNET_assert (dep->round <= step->round);
2578
2579 #ifdef GNUNET_EXTRA_LOGGING
2580   /* Make sure we have complete debugging information.
2581      Also checks that we don't screw up too badly
2582      constructing the task graph. */
2583   GNUNET_assert (NULL != step->debug_name);
2584   GNUNET_assert (NULL != dep->debug_name);
2585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2586               "Making step `%s' depend on `%s'\n",
2587               step->debug_name,
2588               dep->debug_name);
2589 #endif
2590
2591   if (dep->subordinates_cap == dep->subordinates_len)
2592   {
2593     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2594     GNUNET_array_grow (dep->subordinates,
2595                        dep->subordinates_cap,
2596                        target_size);
2597   }
2598
2599   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2600
2601   dep->subordinates[dep->subordinates_len] = step;
2602   dep->subordinates_len++;
2603
2604   step->pending_prereq++;
2605 }
2606
2607
2608 static struct Step *
2609 create_step (struct ConsensusSession *session, int round, int early_finishable)
2610 {
2611   struct Step *step;
2612   step = GNUNET_new (struct Step);
2613   step->session = session;
2614   step->round = round;
2615   step->early_finishable = early_finishable;
2616   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2617                                     session->steps_tail,
2618                                     step);
2619   return step;
2620 }
2621
2622
2623 /**
2624  * Construct the task graph for a single
2625  * gradecast.
2626  */
2627 static void
2628 construct_task_graph_gradecast (struct ConsensusSession *session,
2629                                 uint16_t rep,
2630                                 uint16_t lead,
2631                                 struct Step *step_before,
2632                                 struct Step *step_after)
2633 {
2634   uint16_t n = session->num_peers;
2635   uint16_t me = session->local_peer_idx;
2636
2637   uint16_t p1;
2638   uint16_t p2;
2639
2640   /* The task we're currently setting up. */
2641   struct TaskEntry task;
2642
2643   struct Step *step;
2644   struct Step *prev_step;
2645
2646   uint16_t round;
2647
2648   unsigned int k;
2649
2650   round = step_before->round + 1;
2651
2652   /* gcast step 1: leader disseminates */
2653
2654   step = create_step (session, round, GNUNET_YES);
2655
2656 #ifdef GNUNET_EXTRA_LOGGING
2657   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2658 #endif
2659   step_depend_on (step, step_before);
2660
2661   if (lead == me)
2662   {
2663     for (k = 0; k < n; k++)
2664     {
2665       if (k == me)
2666         continue;
2667       p1 = me;
2668       p2 = k;
2669       arrange_peers (&p1, &p2, n);
2670       task = ((struct TaskEntry) {
2671         .step = step,
2672         .start = task_start_reconcile,
2673         .cancel = task_cancel_reconcile,
2674         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2675       });
2676       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2677       put_task (session->taskmap, &task);
2678     }
2679     /* We run this task to make sure that the leader
2680        has the stored the SET_KIND_LEADER set of himself,
2681        so he can participate in the rest of the gradecast
2682        without the code having to handle any special cases. */
2683     task = ((struct TaskEntry) {
2684       .step = step,
2685       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2686       .start = task_start_reconcile,
2687       .cancel = task_cancel_reconcile,
2688     });
2689     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2690     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2691     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2692     put_task (session->taskmap, &task);
2693   }
2694   else
2695   {
2696     p1 = me;
2697     p2 = lead;
2698     arrange_peers (&p1, &p2, n);
2699     task = ((struct TaskEntry) {
2700       .step = step,
2701       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2702       .start = task_start_reconcile,
2703       .cancel = task_cancel_reconcile,
2704     });
2705     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2706     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2707     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2708     put_task (session->taskmap, &task);
2709   }
2710
2711   /* gcast phase 2: echo */
2712   prev_step = step;
2713   round += 1;
2714   step = create_step (session, round, GNUNET_YES);
2715 #ifdef GNUNET_EXTRA_LOGGING
2716   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2717 #endif
2718   step_depend_on (step, prev_step);
2719
2720   for (k = 0; k < n; k++)
2721   {
2722     p1 = k;
2723     p2 = me;
2724     arrange_peers (&p1, &p2, n);
2725     task = ((struct TaskEntry) {
2726       .step = step,
2727       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2728       .start = task_start_reconcile,
2729       .cancel = task_cancel_reconcile,
2730     });
2731     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2732     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2733     put_task (session->taskmap, &task);
2734   }
2735
2736   prev_step = step;
2737   /* Same round, since step only has local tasks */
2738   step = create_step (session, round, GNUNET_YES);
2739 #ifdef GNUNET_EXTRA_LOGGING
2740   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2741 #endif
2742   step_depend_on (step, prev_step);
2743
2744   arrange_peers (&p1, &p2, n);
2745   task = ((struct TaskEntry) {
2746     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2747     .step = step,
2748     .start = task_start_eval_echo
2749   });
2750   put_task (session->taskmap, &task);
2751
2752   prev_step = step;
2753   round += 1;
2754   step = create_step (session, round, GNUNET_YES);
2755 #ifdef GNUNET_EXTRA_LOGGING
2756   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2757 #endif
2758   step_depend_on (step, prev_step);
2759
2760   /* gcast phase 3: confirmation and grading */
2761   for (k = 0; k < n; k++)
2762   {
2763     p1 = k;
2764     p2 = me;
2765     arrange_peers (&p1, &p2, n);
2766     task = ((struct TaskEntry) {
2767       .step = step,
2768       .start = task_start_reconcile,
2769       .cancel = task_cancel_reconcile,
2770       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2771     });
2772     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2773     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2774     /* If there was at least one element in the echo round that was
2775        contested (i.e. it had no n-t majority), then we let the other peers
2776        know, and other peers let us know.  The contested flag for each peer is
2777        stored in the rfn. */
2778     task.cls.setop.transceive_contested = GNUNET_YES;
2779     put_task (session->taskmap, &task);
2780   }
2781
2782   prev_step = step;
2783   /* Same round, since step only has local tasks */
2784   step = create_step (session, round, GNUNET_YES);
2785 #ifdef GNUNET_EXTRA_LOGGING
2786   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2787 #endif
2788   step_depend_on (step, prev_step);
2789
2790   task = ((struct TaskEntry) {
2791     .step = step,
2792     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2793     .start = task_start_grade,
2794   });
2795   put_task (session->taskmap, &task);
2796
2797   step_depend_on (step_after, step);
2798 }
2799
2800
2801 static void
2802 construct_task_graph (struct ConsensusSession *session)
2803 {
2804   uint16_t n = session->num_peers;
2805   uint16_t t = n / 3;
2806
2807   uint16_t me = session->local_peer_idx;
2808
2809   /* The task we're currently setting up. */
2810   struct TaskEntry task;
2811
2812   /* Current leader */
2813   unsigned int lead;
2814
2815   struct Step *step;
2816   struct Step *prev_step;
2817
2818   unsigned int round = 0;
2819
2820   unsigned int i;
2821
2822   // XXX: introduce first step,
2823   // where we wait for all insert acks
2824   // from the set service
2825
2826   /* faster but brittle all-to-all */
2827
2828   // XXX: Not implemented yet
2829
2830   /* all-to-all step */
2831
2832   step = create_step (session, round, GNUNET_NO);
2833
2834 #ifdef GNUNET_EXTRA_LOGGING
2835   step->debug_name = GNUNET_strdup ("all to all");
2836 #endif
2837
2838   for (i = 0; i < n; i++)
2839   {
2840     uint16_t p1;
2841     uint16_t p2;
2842
2843     p1 = me;
2844     p2 = i;
2845     arrange_peers (&p1, &p2, n);
2846     task = ((struct TaskEntry) {
2847       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2848       .step = step,
2849       .start = task_start_reconcile,
2850       .cancel = task_cancel_reconcile,
2851     });
2852     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2853     task.cls.setop.output_set = task.cls.setop.input_set;
2854     task.cls.setop.do_not_remove = GNUNET_YES;
2855     put_task (session->taskmap, &task);
2856   }
2857
2858   prev_step = step;
2859   step = NULL;
2860
2861   round += 1;
2862
2863   /* Byzantine union */
2864
2865   /* sequential repetitions of the gradecasts */
2866   for (i = 0; i < t + 1; i++)
2867   {
2868     struct Step *step_rep_start;
2869     struct Step *step_rep_end;
2870
2871     /* Every repetition is in a separate round. */
2872     step_rep_start = create_step (session, round, GNUNET_YES);
2873 #ifdef GNUNET_EXTRA_LOGGING
2874     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2875 #endif
2876
2877     step_depend_on (step_rep_start, prev_step);
2878
2879     /* gradecast has three rounds */
2880     round += 3;
2881     step_rep_end = create_step (session, round, GNUNET_YES);
2882 #ifdef GNUNET_EXTRA_LOGGING
2883     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2884 #endif
2885
2886     /* parallel gradecasts */
2887     for (lead = 0; lead < n; lead++)
2888       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2889
2890     task = ((struct TaskEntry) {
2891       .step = step_rep_end,
2892       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2893       .start = task_start_apply_round,
2894     });
2895     put_task (session->taskmap, &task);
2896
2897     prev_step = step_rep_end;
2898   }
2899
2900  /* There is no next gradecast round, thus the final
2901     start step is the overall end step of the gradecasts */
2902   round += 1;
2903   step = create_step (session, round, GNUNET_NO);
2904 #ifdef GNUNET_EXTRA_LOGGING
2905   GNUNET_asprintf (&step->debug_name, "finish");
2906 #endif
2907   step_depend_on (step, prev_step);
2908
2909   task = ((struct TaskEntry) {
2910     .step = step,
2911     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2912     .start = task_start_finish,
2913   });
2914   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2915
2916   put_task (session->taskmap, &task);
2917 }
2918
2919
2920 /**
2921  * Initialize the session, continue receiving messages from the owning client
2922  *
2923  * @param session the session to initialize
2924  * @param join_msg the join message from the client
2925  */
2926 static void
2927 initialize_session (struct ConsensusSession *session,
2928                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2929 {
2930   struct ConsensusSession *other_session;
2931
2932   initialize_session_peer_list (session, join_msg);
2933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2934   compute_global_id (session, &join_msg->session_id);
2935
2936   /* Check if some local client already owns the session.
2937      It is only legal to have a session with an existing global id
2938      if all other sessions with this global id are finished.*/
2939   other_session = sessions_head;
2940   while (NULL != other_session)
2941   {
2942     if ((other_session != session) &&
2943         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2944     {
2945       //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2946       //{
2947       //  GNUNET_break (0);
2948       //  destroy_session (session);
2949       //  return;
2950       //}
2951       break;
2952     }
2953     other_session = other_session->next;
2954   }
2955
2956   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2957   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2958
2959   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n",
2960               (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2961
2962   session->local_peer_idx = get_peer_idx (&my_peer, session);
2963   GNUNET_assert (-1 != session->local_peer_idx);
2964   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2965                                              &session->global_id,
2966                                              set_listen_cb, session);
2967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2968
2969   session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2970   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2971   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2972   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2973
2974   {
2975     struct SetEntry *client_set;
2976     client_set = GNUNET_new (struct SetEntry);
2977     client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2978     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2979     put_set (session, client_set);
2980   }
2981
2982   session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2983
2984   /* Just construct the task graph,
2985      but don't run anything until the client calls conclude. */
2986   construct_task_graph (session);
2987
2988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2989 }
2990
2991
2992 static struct ConsensusSession *
2993 get_session_by_client (struct GNUNET_SERVER_Client *client)
2994 {
2995   struct ConsensusSession *session;
2996
2997   session = sessions_head;
2998   while (NULL != session)
2999   {
3000     if (session->client == client)
3001       return session;
3002     session = session->next;
3003   }
3004   return NULL;
3005 }
3006
3007
3008 /**
3009  * Called when a client wants to join a consensus session.
3010  *
3011  * @param cls unused
3012  * @param client client that sent the message
3013  * @param m message sent by the client
3014  */
3015 static void
3016 client_join (void *cls,
3017              struct GNUNET_SERVER_Client *client,
3018              const struct GNUNET_MessageHeader *m)
3019 {
3020   struct ConsensusSession *session;
3021
3022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3023
3024   session = get_session_by_client (client);
3025   if (NULL != session)
3026   {
3027     GNUNET_break (0);
3028     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3029     return;
3030   }
3031   session = GNUNET_new (struct ConsensusSession);
3032   session->client = client;
3033   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3034   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3035   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3036   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3037
3038   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
3039 }
3040
3041
3042 static void
3043 client_insert_done (void *cls)
3044 {
3045   // FIXME: implement
3046 }
3047
3048
3049 /**
3050  * Called when a client performs an insert operation.
3051  *
3052  * @param cls (unused)
3053  * @param client client handle
3054  * @param m message sent by the client
3055  */
3056 void
3057 client_insert (void *cls,
3058                struct GNUNET_SERVER_Client *client,
3059                const struct GNUNET_MessageHeader *m)
3060 {
3061   struct ConsensusSession *session;
3062   struct GNUNET_CONSENSUS_ElementMessage *msg;
3063   struct GNUNET_SET_Element *element;
3064   ssize_t element_size;
3065   struct GNUNET_SET_Handle *initial_set;
3066
3067   session = get_session_by_client (client);
3068
3069   if (NULL == session)
3070   {
3071     GNUNET_break (0);
3072     GNUNET_SERVER_client_disconnect (client);
3073     return;
3074   }
3075
3076   if (GNUNET_YES == session->conclude_started)
3077   {
3078     GNUNET_break (0);
3079     GNUNET_SERVER_client_disconnect (client);
3080     return;
3081   }
3082
3083   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3084   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3085   if (element_size < 0)
3086   {
3087     GNUNET_break (0);
3088     return;
3089   }
3090
3091   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3092   element->element_type = msg->element_type;
3093   element->size = element_size;
3094   memcpy (&element[1], &msg[1], element_size);
3095   element->data = &element[1];
3096   {
3097     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3098     struct SetEntry *entry;
3099     entry = lookup_set (session, &key);
3100     GNUNET_assert (NULL != entry);
3101     initial_set = entry->h;
3102   }
3103   session->num_client_insert_pending++;
3104   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
3105
3106 #ifdef GNUNET_EXTRA_LOGGING
3107   {
3108     struct GNUNET_HashCode hash;
3109
3110     GNUNET_SET_element_hash (element, &hash);
3111
3112     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
3113                 session->local_peer_idx,
3114                 GNUNET_h2s (&hash));
3115   }
3116 #endif
3117
3118   GNUNET_free (element);
3119   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3120 }
3121
3122
3123 /**
3124  * Called when a client performs the conclude operation.
3125  *
3126  * @param cls (unused)
3127  * @param client client handle
3128  * @param message message sent by the client
3129  */
3130 static void
3131 client_conclude (void *cls,
3132                  struct GNUNET_SERVER_Client *client,
3133                  const struct GNUNET_MessageHeader *message)
3134 {
3135   struct ConsensusSession *session;
3136
3137   session = get_session_by_client (client);
3138   if (NULL == session)
3139   {
3140     /* client not found */
3141     GNUNET_break (0);
3142     GNUNET_SERVER_client_disconnect (client);
3143     return;
3144   }
3145
3146   if (GNUNET_YES == session->conclude_started)
3147   {
3148     /* conclude started twice */
3149     GNUNET_break (0);
3150     GNUNET_SERVER_client_disconnect (client);
3151     destroy_session (session);
3152     return;
3153   }
3154
3155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
3156
3157   session->conclude_started = GNUNET_YES;
3158
3159   install_step_timeouts (session);
3160   run_ready_steps (session);
3161
3162
3163   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3164 }
3165
3166
3167 /**
3168  * Called to clean up, after a shutdown has been requested.
3169  *
3170  * @param cls closure
3171  */
3172 static void
3173 shutdown_task (void *cls)
3174 {
3175   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
3176   while (NULL != sessions_head)
3177     destroy_session (sessions_head);
3178
3179   GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
3180 }
3181
3182
3183 /**
3184  * Clean up after a client after it is
3185  * disconnected (either by us or by itself)
3186  *
3187  * @param cls closure, unused
3188  * @param client the client to clean up after
3189  */
3190 void
3191 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3192 {
3193   struct ConsensusSession *session;
3194
3195   session = get_session_by_client (client);
3196   if (NULL == session)
3197     return;
3198   // FIXME: destroy if we can
3199 }
3200
3201
3202
3203 /**
3204  * Start processing consensus requests.
3205  *
3206  * @param cls closure
3207  * @param server the initialized server
3208  * @param c configuration to use
3209  */
3210 static void
3211 run (void *cls, struct GNUNET_SERVER_Handle *server,
3212      const struct GNUNET_CONFIGURATION_Handle *c)
3213 {
3214   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3215     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3216         sizeof (struct GNUNET_MessageHeader)},
3217     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3218     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3219     {NULL, NULL, 0, 0}
3220   };
3221
3222   cfg = c;
3223   srv = server;
3224   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3225   {
3226     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3227     GNUNET_break (0);
3228     GNUNET_SCHEDULER_shutdown ();
3229     return;
3230   }
3231   statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3232   GNUNET_SERVER_add_handlers (server, server_handlers);
3233   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
3234   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3235   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3236 }
3237
3238
3239 /**
3240  * The main function for the consensus service.
3241  *
3242  * @param argc number of arguments from the command line
3243  * @param argv command line arguments
3244  * @return 0 ok, 1 on error
3245  */
3246 int
3247 main (int argc, char *const *argv)
3248 {
3249   int ret;
3250   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3251   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3252   return (GNUNET_OK == ret) ? 0 : 1;
3253 }