e8385a6bb93dc28988591718d489063252bb6fa2
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40 enum ReferendumVote
41 {
42   /**
43    * Vote that nothing should change.
44    * This option is never voted explicitly.
45    */
46   VOTE_STAY = 0,
47   /**
48    * Vote that an element should be added.
49    */
50   VOTE_ADD = 1,
51   /**
52    * Vote that an element should be removed.
53    */
54   VOTE_REMOVE = 2,
55 };
56
57
58 enum EarlyStoppingPhase
59 {
60   EARLY_STOPPING_NONE = 0,
61   EARLY_STOPPING_ONE_MORE = 1,
62   EARLY_STOPPING_DONE = 2,
63 };
64
65
66 GNUNET_NETWORK_STRUCT_BEGIN
67
68
69 struct ContestedPayload
70 {
71 };
72
73 /**
74  * Tuple of integers that together
75  * identify a task uniquely.
76  */
77 struct TaskKey {
78   /**
79    * A value from 'enum PhaseKind'.
80    */
81   uint16_t kind GNUNET_PACKED;
82
83   /**
84    * Number of the first peer
85    * in canonical order.
86    */
87   int16_t peer1 GNUNET_PACKED;
88
89   /**
90    * Number of the second peer in canonical order.
91    */
92   int16_t peer2 GNUNET_PACKED;
93
94   /**
95    * Repetition of the gradecast phase.
96    */
97   int16_t repetition GNUNET_PACKED;
98
99   /**
100    * Leader in the gradecast phase.
101    *
102    * Can be different from both peer1 and peer2.
103    */
104   int16_t leader GNUNET_PACKED;
105 };
106
107
108
109 struct SetKey
110 {
111   int set_kind GNUNET_PACKED;
112   int k1 GNUNET_PACKED;
113   int k2 GNUNET_PACKED;
114 };
115
116
117 struct SetEntry
118 {
119   struct SetKey key;
120   struct GNUNET_SET_Handle *h;
121   /**
122    * GNUNET_YES if the set resulted
123    * from applying a referendum with contested
124    * elements.
125    */
126   int is_contested;
127 };
128
129
130 struct DiffKey
131 {
132   int diff_kind GNUNET_PACKED;
133   int k1 GNUNET_PACKED;
134   int k2 GNUNET_PACKED;
135 };
136
137 struct RfnKey
138 {
139   int rfn_kind GNUNET_PACKED;
140   int k1 GNUNET_PACKED;
141   int k2 GNUNET_PACKED;
142 };
143
144
145 GNUNET_NETWORK_STRUCT_END
146
147 enum PhaseKind
148 {
149   PHASE_KIND_ALL_TO_ALL,
150   PHASE_KIND_GRADECAST_LEADER,
151   PHASE_KIND_GRADECAST_ECHO,
152   PHASE_KIND_GRADECAST_ECHO_GRADE,
153   PHASE_KIND_GRADECAST_CONFIRM,
154   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
155   /**
156    * Apply a repetition of the all-to-all
157    * gradecast to the current set.
158    */
159   PHASE_KIND_APPLY_REP,
160   PHASE_KIND_FINISH,
161 };
162
163
164 enum SetKind
165 {
166   SET_KIND_NONE = 0,
167   SET_KIND_CURRENT,
168   /**
169    * Last result set from a gradecast
170    */
171   SET_KIND_LAST_GRADECAST,
172   SET_KIND_LEADER_PROPOSAL,
173   SET_KIND_ECHO_RESULT,
174 };
175
176 enum DiffKind
177 {
178   DIFF_KIND_NONE = 0,
179   DIFF_KIND_LEADER_PROPOSAL,
180   DIFF_KIND_LEADER_CONSENSUS,
181   DIFF_KIND_GRADECAST_RESULT,
182 };
183
184 enum RfnKind
185 {
186   RFN_KIND_NONE = 0,
187   RFN_KIND_ECHO,
188   RFN_KIND_CONFIRM,
189   RFN_KIND_GRADECAST_RESULT
190 };
191
192
193 struct SetOpCls
194 {
195   struct SetKey input_set;
196
197   struct SetKey output_set;
198   struct RfnKey output_rfn;
199   struct DiffKey output_diff;
200
201   int do_not_remove;
202
203   int transceive_contested;
204
205   struct GNUNET_SET_OperationHandle *op;
206 };
207
208
209 struct FinishCls
210 {
211   struct SetKey input_set;
212 };
213
214 /**
215  * Closure for both @a start_task
216  * and @a cancel_task.
217  */
218 union TaskFuncCls
219 {
220   struct SetOpCls setop;
221   struct FinishCls finish;
222 };
223
224 struct TaskEntry;
225
226 typedef void (*TaskFunc) (struct TaskEntry *task);
227
228 /*
229  * Node in the consensus task graph.
230  */
231 struct TaskEntry
232 {
233   struct TaskKey key;
234
235   struct Step *step;
236
237   int is_started;
238
239   int is_finished;
240
241   TaskFunc start;
242   TaskFunc cancel;
243
244   union TaskFuncCls cls;
245 };
246
247
248 struct Step
249 {
250   /**
251    * All steps of one session are in a
252    * linked list for easier deallocation.
253    */
254   struct Step *prev;
255
256   /**
257    * All steps of one session are in a
258    * linked list for easier deallocation.
259    */
260   struct Step *next;
261
262   struct ConsensusSession *session;
263
264   /**
265    * Tasks that this step is composed of.
266    */
267   struct TaskEntry **tasks;
268   unsigned int tasks_len;
269   unsigned int tasks_cap;
270
271   unsigned int finished_tasks;
272
273   /*
274    * Tasks that have this task as dependency.
275    *
276    * We store pointers to subordinates rather
277    * than to prerequisites since it makes
278    * tracking the readiness of a task easier.
279    */
280   struct Step **subordinates;
281   unsigned int subordinates_len;
282   unsigned int subordinates_cap;
283
284   /**
285    * Counter for the prerequisites of
286    * this step.
287    */
288   size_t pending_prereq;
289
290   /*
291    * Task that will run this step despite
292    * any pending prerequisites.
293    */
294   struct GNUNET_SCHEDULER_Task *timeout_task;
295
296   unsigned int is_running;
297
298   unsigned int is_finished;
299
300   /*
301    * Synchrony round of the task.
302    * Determines the deadline for the task.
303    */
304   unsigned int round;
305
306   /**
307    * Human-readable name for
308    * the task, used for debugging.
309    */
310   char *debug_name;
311
312   /**
313    * When we're doing an early finish, how should this step be
314    * treated?
315    * If GNUNET_YES, the step will be marked as finished
316    * without actually running its tasks.
317    * Otherwise, the step will still be run even after
318    * an early finish.
319    *
320    * Note that a task may never be finished early if
321    * it is already running.
322    */
323   int early_finishable;
324 };
325
326
327 struct RfnElementInfo
328 {
329   const struct GNUNET_SET_Element *element;
330
331   /*
332    * GNUNET_YES if the peer votes for the proposal.
333    */
334   int *votes;
335
336   /**
337    * Proposal for this element,
338    * can only be VOTE_ADD or VOTE_REMOVE.
339    */
340   enum ReferendumVote proposal;
341 };
342
343
344 struct ReferendumEntry
345 {
346   struct RfnKey key;
347
348   /*
349    * Elements where there is at least one proposed change.
350    *
351    * Maps the hash of the GNUNET_SET_Element
352    * to 'struct RfnElementInfo'.
353    */
354   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
355
356   unsigned int num_peers;
357
358   /**
359    * Stores, for every peer in the session,
360    * whether the peer finished the whole referendum.
361    *
362    * Votes from peers are only counted if they're
363    * marked as commited (#GNUNET_YES) in the referendum.
364    *
365    * Otherwise (#GNUNET_NO), the requested changes are
366    * not counted for majority votes or thresholds.
367    */
368   int *peer_commited;
369
370
371   /**
372    * Contestation state of the peer.  If a peer is contested, the values it
373    * contributed are still counted for applying changes, but the grading is
374    * affected.
375    */
376   int *peer_contested;
377 };
378
379
380 struct DiffElementInfo
381 {
382   const struct GNUNET_SET_Element *element;
383
384   /**
385    * Positive weight for 'add', negative
386    * weights for 'remove'.
387    */
388   int weight;
389 };
390
391
392 /**
393  * Weighted diff.
394  */
395 struct DiffEntry
396 {
397   struct DiffKey key;
398   struct GNUNET_CONTAINER_MultiHashMap *changes;
399 };
400
401
402
403 /**
404  * A consensus session consists of one local client and the remote authorities.
405  */
406 struct ConsensusSession
407 {
408   /**
409    * Consensus sessions are kept in a DLL.
410    */
411   struct ConsensusSession *next;
412
413   /**
414    * Consensus sessions are kept in a DLL.
415    */
416   struct ConsensusSession *prev;
417
418   unsigned int num_client_insert_pending;
419
420   struct GNUNET_CONTAINER_MultiHashMap *setmap;
421   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
423
424   /**
425    * Array of peers with length 'num_peers'.
426    */
427   int *peers_blacklisted;
428
429   /*
430    * Mapping from (hashed) TaskKey to TaskEntry.
431    *
432    * We map the application_id for a round to the task that should be
433    * executed, so we don't have to go through all task whenever we get
434    * an incoming set op request.
435    */
436   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
437
438   struct Step *steps_head;
439   struct Step *steps_tail;
440
441   int conclude_started;
442
443   int conclude_done;
444
445   /**
446   * Global consensus identification, computed
447   * from the session id and participating authorities.
448   */
449   struct GNUNET_HashCode global_id;
450
451   /**
452    * Client that inhabits the session
453    */
454   struct GNUNET_SERVER_Client *client;
455
456   /**
457    * Queued messages to the client.
458    */
459   struct GNUNET_MQ_Handle *client_mq;
460
461   /**
462    * Time when the conclusion of the consensus should begin.
463    */
464   struct GNUNET_TIME_Absolute conclude_start;
465
466   /**
467    * Timeout for all rounds together, single rounds will schedule a timeout task
468    * with a fraction of the conclude timeout.
469    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
470    */
471   struct GNUNET_TIME_Absolute conclude_deadline;
472
473   struct GNUNET_PeerIdentity *peers;
474
475   /**
476    * Number of other peers in the consensus.
477    */
478   unsigned int num_peers;
479
480   /**
481    * Index of the local peer in the peers array
482    */
483   unsigned int local_peer_idx;
484
485   /**
486    * Listener for requests from other peers.
487    * Uses the session's global id as app id.
488    */
489   struct GNUNET_SET_ListenHandle *set_listener;
490
491   /**
492    * State of our early stopping scheme.
493    */
494   int early_stopping;
495 };
496
497 /**
498  * Linked list of sessions this peer participates in.
499  */
500 static struct ConsensusSession *sessions_head;
501
502 /**
503  * Linked list of sessions this peer participates in.
504  */
505 static struct ConsensusSession *sessions_tail;
506
507 /**
508  * Configuration of the consensus service.
509  */
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
511
512 /**
513  * Handle to the server for this service.
514  */
515 static struct GNUNET_SERVER_Handle *srv;
516
517 /**
518  * Peer that runs this service.
519  */
520 static struct GNUNET_PeerIdentity my_peer;
521
522 /**
523  * Statistics handle.
524  */
525 struct GNUNET_STATISTICS_Handle *statistics;
526
527
528 static void
529 finish_task (struct TaskEntry *task);
530
531 static void
532 run_ready_steps (struct ConsensusSession *session);
533
534 static const char *
535 phasename (uint16_t phase)
536 {
537   switch (phase)
538   {
539     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
540     case PHASE_KIND_FINISH: return "FINISH";
541     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
542     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
543     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
545     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
546     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
547     default: return "(unknown)";
548   }
549 }
550
551
552 static const char *
553 setname (uint16_t kind)
554 {
555   switch (kind)
556   {
557     case SET_KIND_CURRENT: return "CURRENT";
558     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
559     case SET_KIND_NONE: return "NONE";
560     default: return "(unknown)";
561   }
562 }
563
564 static const char *
565 rfnname (uint16_t kind)
566 {
567   switch (kind)
568   {
569     case RFN_KIND_NONE: return "NONE";
570     case RFN_KIND_ECHO: return "ECHO";
571     case RFN_KIND_CONFIRM: return "CONFIRM";
572     default: return "(unknown)";
573   }
574 }
575
576 static const char *
577 diffname (uint16_t kind)
578 {
579   switch (kind)
580   {
581     case DIFF_KIND_NONE: return "NONE";
582     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
583     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
584     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585     default: return "(unknown)";
586   }
587 }
588
589 #ifdef GNUNET_EXTRA_LOGGING
590
591
592 static const char *
593 debug_str_element (const struct GNUNET_SET_Element *el)
594 {
595   struct GNUNET_HashCode hash;
596
597   GNUNET_SET_element_hash (el, &hash);
598
599   return GNUNET_h2s (&hash);
600 }
601
602 static const char *
603 debug_str_task_key (struct TaskKey *tk)
604 {
605   static char buf[256];
606
607   snprintf (buf, sizeof (buf),
608             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
609             phasename (tk->kind), tk->peer1, tk->peer2,
610             tk->leader, tk->repetition);
611
612   return buf;
613 }
614
615 static const char *
616 debug_str_diff_key (struct DiffKey *dk)
617 {
618   static char buf[256];
619
620   snprintf (buf, sizeof (buf),
621             "DiffKey kind=%s, k1=%d, k2=%d",
622             diffname (dk->diff_kind), dk->k1, dk->k2);
623
624   return buf;
625 }
626
627 static const char *
628 debug_str_set_key (const struct SetKey *sk)
629 {
630   static char buf[256];
631
632   snprintf (buf, sizeof (buf),
633             "SetKey kind=%s, k1=%d, k2=%d",
634             setname (sk->set_kind), sk->k1, sk->k2);
635
636   return buf;
637 }
638
639
640 static const char *
641 debug_str_rfn_key (const struct RfnKey *rk)
642 {
643   static char buf[256];
644
645   snprintf (buf, sizeof (buf),
646             "RfnKey kind=%s, k1=%d, k2=%d",
647             rfnname (rk->rfn_kind), rk->k1, rk->k2);
648
649   return buf;
650 }
651
652 #endif /* GNUNET_EXTRA_LOGGING */
653
654
655 /**
656  * Destroy a session, free all resources associated with it.
657  *
658  * @param session the session to destroy
659  */
660 static void
661 destroy_session (struct ConsensusSession *session)
662 {
663   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664   if (NULL != session->set_listener)
665   {
666     GNUNET_SET_listen_cancel (session->set_listener);
667     session->set_listener = NULL;
668   }
669   if (NULL != session->client_mq)
670   {
671     GNUNET_MQ_destroy (session->client_mq);
672     session->client_mq = NULL;
673     /* The MQ cleanup will also disconnect the underlying client. */
674     session->client = NULL;
675   }
676   if (NULL != session->client)
677   {
678     GNUNET_SERVER_client_disconnect (session->client);
679     session->client = NULL;
680   }
681   GNUNET_free (session);
682 }
683
684
685 /**
686  * Send the final result set of the consensus to the client, element by
687  * element.
688  *
689  * @param cls closure
690  * @param element the current element, NULL if all elements have been
691  *        iterated over
692  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
693  */
694 static int
695 send_to_client_iter (void *cls,
696                      const struct GNUNET_SET_Element *element)
697 {
698   struct TaskEntry *task = (struct TaskEntry *) cls;
699   struct ConsensusSession *session = task->step->session;
700   struct GNUNET_MQ_Envelope *ev;
701
702   if (NULL != element)
703   {
704     struct GNUNET_CONSENSUS_ElementMessage *m;
705
706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707                 "P%d: sending element %s to client\n",
708                 session->local_peer_idx,
709                 debug_str_element (element));
710
711     ev = GNUNET_MQ_msg_extra (m, element->size,
712                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
713     m->element_type = htons (element->element_type);
714     memcpy (&m[1], element->data, element->size);
715     GNUNET_MQ_send (session->client_mq, ev);
716   }
717   else
718   {
719     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720                 "P%d: finished iterating elements for client\n",
721                 session->local_peer_idx);
722     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
723     GNUNET_MQ_send (session->client_mq, ev);
724   }
725   return GNUNET_YES;
726 }
727
728
729 static struct SetEntry *
730 lookup_set (struct ConsensusSession *session, struct SetKey *key)
731 {
732   struct GNUNET_HashCode hash;
733
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "P%u: looking up set {%s}\n",
736               session->local_peer_idx,
737               debug_str_set_key (key));
738
739   GNUNET_assert (SET_KIND_NONE != key->set_kind);
740   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
741   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
742 }
743
744
745 static struct DiffEntry *
746 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
747 {
748   struct GNUNET_HashCode hash;
749
750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751               "P%u: looking up diff {%s}\n",
752               session->local_peer_idx,
753               debug_str_diff_key (key));
754
755   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
756   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
757   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
758 }
759
760
761 static struct ReferendumEntry *
762 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
763 {
764   struct GNUNET_HashCode hash;
765
766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767               "P%u: looking up rfn {%s}\n",
768               session->local_peer_idx,
769               debug_str_rfn_key (key));
770
771   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
772   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
773   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
774 }
775
776
777 static void
778 diff_insert (struct DiffEntry *diff,
779              int weight,
780              const struct GNUNET_SET_Element *element)
781 {
782   struct DiffElementInfo *di;
783   struct GNUNET_HashCode hash;
784
785   GNUNET_assert ( (1 == weight) || (-1 == weight));
786
787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788               "diff_insert with element size %u\n",
789               element->size);
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "hashing element\n");
793
794   GNUNET_SET_element_hash (element, &hash);
795
796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
797               "hashed element\n");
798
799   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
800
801   if (NULL == di)
802   {
803     di = GNUNET_new (struct DiffElementInfo);
804     di->element = GNUNET_SET_element_dup (element);
805     GNUNET_assert (GNUNET_OK ==
806                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
807                                                       &hash, di,
808                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
809   }
810
811   di->weight = weight;
812 }
813
814
815 static void
816 rfn_commit (struct ReferendumEntry *rfn,
817             uint16_t commit_peer)
818 {
819   GNUNET_assert (commit_peer < rfn->num_peers);
820
821   rfn->peer_commited[commit_peer] = GNUNET_YES;
822 }
823
824
825 static void
826 rfn_contest (struct ReferendumEntry *rfn,
827              uint16_t contested_peer)
828 {
829   GNUNET_assert (contested_peer < rfn->num_peers);
830
831   rfn->peer_contested[contested_peer] = GNUNET_YES;
832 }
833
834
835 static uint16_t
836 rfn_noncontested (struct ReferendumEntry *rfn)
837 {
838   uint16_t i;
839   uint16_t ret;
840
841   ret = 0;
842   for (i = 0; i < rfn->num_peers; i++)
843     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
844       ret++;
845
846   return ret;
847 }
848
849
850 static void
851 rfn_vote (struct ReferendumEntry *rfn,
852           uint16_t voting_peer,
853           enum ReferendumVote vote,
854           const struct GNUNET_SET_Element *element)
855 {
856   struct RfnElementInfo *ri;
857   struct GNUNET_HashCode hash;
858
859   GNUNET_assert (voting_peer < rfn->num_peers);
860
861   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
862      since VOTE_KEEP is implicit in not voting. */
863   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
864
865   GNUNET_SET_element_hash (element, &hash);
866   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
867
868   if (NULL == ri)
869   {
870     ri = GNUNET_new (struct RfnElementInfo);
871     ri->element = GNUNET_SET_element_dup (element);
872     ri->votes = GNUNET_new_array (rfn->num_peers, int);
873     GNUNET_assert (GNUNET_OK ==
874                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
875                                                       &hash, ri,
876                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
877   }
878
879   ri->votes[voting_peer] = GNUNET_YES;
880   ri->proposal = vote;
881 }
882
883
884 uint16_t
885 task_other_peer (struct TaskEntry *task)
886 {
887   uint16_t me = task->step->session->local_peer_idx;
888   if (task->key.peer1 == me)
889     return task->key.peer2;
890   return task->key.peer1;
891 }
892
893
894 /**
895  * Callback for set operation results. Called for each element
896  * in the result set.
897  *
898  * @param cls closure
899  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
900  * @param status see enum GNUNET_SET_Status
901  */
902 static void
903 set_result_cb (void *cls,
904                const struct GNUNET_SET_Element *element,
905                enum GNUNET_SET_Status status)
906 {
907   struct TaskEntry *task = cls;
908   struct ConsensusSession *session = task->step->session;
909   struct SetEntry *output_set = NULL;
910   struct DiffEntry *output_diff = NULL;
911   struct ReferendumEntry *output_rfn = NULL;
912   unsigned int other_idx;
913   struct SetOpCls *setop;
914
915   setop = &task->cls.setop;
916
917
918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919               "P%u: got set result for {%s}, status %u\n",
920               session->local_peer_idx,
921               debug_str_task_key (&task->key),
922               status);
923
924   if (GNUNET_NO == task->is_started)
925   {
926     GNUNET_break_op (0);
927     return;
928   }
929
930   if (GNUNET_YES == task->is_finished)
931   {
932     GNUNET_break_op (0);
933     return;
934   }
935
936   other_idx = task_other_peer (task);
937
938   if (SET_KIND_NONE != setop->output_set.set_kind)
939   {
940     output_set = lookup_set (session, &setop->output_set);
941     GNUNET_assert (NULL != output_set);
942   }
943
944   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
945   {
946     output_diff = lookup_diff (session, &setop->output_diff);
947     GNUNET_assert (NULL != output_diff);
948   }
949
950   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
951   {
952     output_rfn = lookup_rfn (session, &setop->output_rfn);
953     GNUNET_assert (NULL != output_rfn);
954   }
955
956   if (GNUNET_YES == session->peers_blacklisted[other_idx])
957   {
958     /* Peer might have been blacklisted
959        by a gradecast running in parallel, ignore elements from now */
960     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
961       return;
962     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
963       return;
964   }
965
966   if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
967   {
968     if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
969     {
970       GNUNET_assert (NULL != output_rfn);
971       rfn_contest (output_rfn, task_other_peer (task));
972       return;
973     }
974   }
975
976   switch (status)
977   {
978     case GNUNET_SET_STATUS_ADD_LOCAL:
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Adding element in Task {%s}\n",
981                   debug_str_task_key (&task->key));
982       if (NULL != output_set)
983       {
984         // FIXME: record pending adds, use callback
985         GNUNET_SET_add_element (output_set->h,
986                                 element,
987                                 NULL,
988                                 NULL);
989 #ifdef GNUNET_EXTRA_LOGGING
990         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
991                     "P%u: adding element %s into set {%s} of task {%s}\n",
992                     session->local_peer_idx,
993                     debug_str_element (element),
994                     debug_str_set_key (&setop->output_set),
995                     debug_str_task_key (&task->key));
996 #endif
997       }
998       if (NULL != output_diff)
999       {
1000         diff_insert (output_diff, 1, element);
1001 #ifdef GNUNET_EXTRA_LOGGING
1002         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1004                     session->local_peer_idx,
1005                     debug_str_element (element),
1006                     debug_str_diff_key (&setop->output_diff),
1007                     debug_str_task_key (&task->key));
1008 #endif
1009       }
1010       if (NULL != output_rfn)
1011       {
1012         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1013 #ifdef GNUNET_EXTRA_LOGGING
1014         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1015                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1016                     session->local_peer_idx,
1017                     debug_str_element (element),
1018                     debug_str_rfn_key (&setop->output_rfn),
1019                     debug_str_task_key (&task->key));
1020 #endif
1021       }
1022       // XXX: add result to structures in task
1023       break;
1024     case GNUNET_SET_STATUS_ADD_REMOTE:
1025       if (GNUNET_YES == setop->do_not_remove)
1026         break;
1027       if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
1028         break;
1029       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030                   "Removing element in Task {%s}\n",
1031                   debug_str_task_key (&task->key));
1032       if (NULL != output_set)
1033       {
1034         // FIXME: record pending adds, use callback
1035         GNUNET_SET_remove_element (output_set->h,
1036                                    element,
1037                                    NULL,
1038                                    NULL);
1039 #ifdef GNUNET_EXTRA_LOGGING
1040         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1041                     "P%u: removing element %s from set {%s} of task {%s}\n",
1042                     session->local_peer_idx,
1043                     debug_str_element (element),
1044                     debug_str_set_key (&setop->output_set),
1045                     debug_str_task_key (&task->key));
1046 #endif
1047       }
1048       if (NULL != output_diff)
1049       {
1050         diff_insert (output_diff, -1, element);
1051 #ifdef GNUNET_EXTRA_LOGGING
1052         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1053                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1054                     session->local_peer_idx,
1055                     debug_str_element (element),
1056                     debug_str_diff_key (&setop->output_diff),
1057                     debug_str_task_key (&task->key));
1058 #endif
1059       }
1060       if (NULL != output_rfn)
1061       {
1062         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1063 #ifdef GNUNET_EXTRA_LOGGING
1064         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1065                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1066                     session->local_peer_idx,
1067                     debug_str_element (element),
1068                     debug_str_rfn_key (&setop->output_rfn),
1069                     debug_str_task_key (&task->key));
1070 #endif
1071       }
1072       break;
1073     case GNUNET_SET_STATUS_DONE:
1074       // XXX: check first if any changes to the underlying
1075       // set are still pending
1076       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077                   "Finishing setop in Task {%s}\n",
1078                   debug_str_task_key (&task->key));
1079       if (NULL != output_rfn)
1080       {
1081         rfn_commit (output_rfn, task_other_peer (task));
1082       }
1083       finish_task (task);
1084       break;
1085     case GNUNET_SET_STATUS_FAILURE:
1086       // XXX: cleanup
1087       GNUNET_break_op (0);
1088       finish_task (task);
1089       return;
1090     default:
1091       /* not reached */
1092       GNUNET_assert (0);
1093   }
1094 }
1095
1096 #ifdef EVIL
1097
1098 enum EvilnessType
1099 {
1100   EVILNESS_NONE,
1101   EVILNESS_CRAM_ALL,
1102   EVILNESS_CRAM_LEAD,
1103   EVILNESS_CRAM_ECHO,
1104   EVILNESS_SLACK,
1105 };
1106
1107 enum EvilnessSubType
1108 {
1109   EVILNESS_SUB_NONE,
1110   EVILNESS_SUB_REPLACEMENT,
1111   EVILNESS_SUB_NO_REPLACEMENT,
1112 };
1113
1114 struct Evilness
1115 {
1116   enum EvilnessType type;
1117   enum EvilnessSubType subtype;
1118   unsigned int num;
1119 };
1120
1121
1122 static int
1123 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1124 {
1125   if (0 == strcmp ("replace", evil_subtype_str))
1126   {
1127     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1128   }
1129   else if (0 == strcmp ("noreplace", evil_subtype_str))
1130   {
1131     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1132   }
1133   else
1134   {
1135     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1137                 evil_subtype_str);
1138     return GNUNET_SYSERR;
1139   }
1140   return GNUNET_OK;
1141 }
1142
1143
1144 static void
1145 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1146 {
1147   char *evil_spec;
1148   char *field;
1149   char *evil_type_str = NULL;
1150   char *evil_subtype_str = NULL;
1151
1152   GNUNET_assert (NULL != evil);
1153
1154   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1155   {
1156     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157                 "P%u: no evilness\n",
1158                 session->local_peer_idx);
1159     evil->type = EVILNESS_NONE;
1160     return;
1161   }
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "P%u: got evilness spec\n",
1164               session->local_peer_idx);
1165
1166   for (field = strtok (evil_spec, "/");
1167        NULL != field;
1168        field = strtok (NULL, "/"))
1169   {
1170     unsigned int peer_num;
1171     unsigned int evil_num;
1172     int ret;
1173
1174     evil_type_str = NULL;
1175     evil_subtype_str = NULL;
1176
1177     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1178
1179     if (ret != 4)
1180     {
1181       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1182                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1183                   field,
1184                   ret);
1185       goto not_evil;
1186     }
1187
1188     GNUNET_assert (NULL != evil_type_str);
1189     GNUNET_assert (NULL != evil_subtype_str);
1190
1191     if (peer_num == session->local_peer_idx)
1192     {
1193       if (0 == strcmp ("slack", evil_type_str))
1194       {
1195         evil->type = EVILNESS_SLACK;
1196       }
1197       else if (0 == strcmp ("cram-all", evil_type_str))
1198       {
1199         evil->type = EVILNESS_CRAM_ALL;
1200         evil->num = evil_num;
1201         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1202           goto not_evil;
1203       }
1204       else if (0 == strcmp ("cram-lead", evil_type_str))
1205       {
1206         evil->type = EVILNESS_CRAM_LEAD;
1207         evil->num = evil_num;
1208         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1209           goto not_evil;
1210       }
1211       else if (0 == strcmp ("cram-echo", evil_type_str))
1212       {
1213         evil->type = EVILNESS_CRAM_ECHO;
1214         evil->num = evil_num;
1215         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1216           goto not_evil;
1217       }
1218       else
1219       {
1220         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1221                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1222                     evil_type_str);
1223         goto not_evil;
1224       }
1225       goto cleanup;
1226     }
1227     /* No GNUNET_free since memory was allocated by libc */
1228     free (evil_type_str);
1229     evil_type_str = NULL;
1230     evil_subtype_str = NULL;
1231   }
1232 not_evil:
1233   evil->type = EVILNESS_NONE;
1234 cleanup:
1235   GNUNET_free (evil_spec);
1236   /* no GNUNET_free_non_null since it wasn't
1237    * allocated with GNUNET_malloc */
1238   if (NULL != evil_type_str)
1239     free (evil_type_str);
1240   if (NULL != evil_subtype_str)
1241     free (evil_subtype_str);
1242 }
1243
1244 #endif
1245
1246
1247 /**
1248  * Commit the appropriate set for a
1249  * task.
1250  */
1251 static void
1252 commit_set (struct ConsensusSession *session,
1253             struct TaskEntry *task)
1254 {
1255   struct SetEntry *set;
1256   struct SetOpCls *setop = &task->cls.setop;
1257
1258   GNUNET_assert (NULL != setop->op);
1259   set = lookup_set (session, &setop->input_set);
1260   GNUNET_assert (NULL != set);
1261
1262 #ifdef EVIL
1263   {
1264     unsigned int i;
1265     struct Evilness evil;
1266
1267     get_evilness (session, &evil);
1268     if (EVILNESS_NONE != evil.type)
1269     {
1270       /* Useful for evaluation */
1271       GNUNET_STATISTICS_set (statistics,
1272                              "is evil",
1273                              1,
1274                              GNUNET_NO);
1275     }
1276     switch (evil.type)
1277     {
1278       case EVILNESS_CRAM_ALL:
1279       case EVILNESS_CRAM_LEAD:
1280       case EVILNESS_CRAM_ECHO:
1281         /* We're not cramming elements in the
1282            all-to-all round, since that would just
1283            add more elements to the result set, but
1284            wouldn't test robustness. */
1285         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1286         {
1287           GNUNET_SET_commit (setop->op, set->h);
1288           break;
1289         }
1290         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1291             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1292         {
1293           GNUNET_SET_commit (setop->op, set->h);
1294           break;
1295         }
1296         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1297         {
1298           GNUNET_SET_commit (setop->op, set->h);
1299           break;
1300         }
1301         for (i = 0; i < evil.num; i++)
1302         {
1303           struct GNUNET_HashCode hash;
1304           struct GNUNET_SET_Element element;
1305           element.data = &hash;
1306           element.size = sizeof (struct GNUNET_HashCode);
1307           element.element_type = 0;
1308
1309           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1310           {
1311             /* Always generate a new element. */
1312             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1313           }
1314           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1315           {
1316             /* Always cram the same elements, derived from counter. */
1317             GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1318           }
1319           else
1320           {
1321             GNUNET_assert (0);
1322           }
1323           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1324 #ifdef GNUNET_EXTRA_LOGGING
1325           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1327                       session->local_peer_idx,
1328                       debug_str_element (&element),
1329                       debug_str_set_key (&setop->input_set),
1330                       debug_str_task_key (&task->key));
1331 #endif
1332         }
1333         GNUNET_STATISTICS_update (statistics,
1334                                   "# stuffed elements",
1335                                   evil.num,
1336                                   GNUNET_NO);
1337         GNUNET_SET_commit (setop->op, set->h);
1338         break;
1339       case EVILNESS_SLACK:
1340         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1341                     "P%u: evil peer: slacking\n",
1342                     session->local_peer_idx,
1343                     evil.num);
1344         /* Do nothing. */
1345         break;
1346       case EVILNESS_NONE:
1347         GNUNET_SET_commit (setop->op, set->h);
1348         break;
1349     }
1350   }
1351 #else
1352   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1353   {
1354     struct GNUNET_SET_Element element;
1355     struct ContestedPayload payload;
1356     element.data = &payload;
1357     element.size = sizeof (struct ContestedPayload);
1358     element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1359     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1360   }
1361   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1362   {
1363     GNUNET_SET_commit (setop->op, set->h);
1364   }
1365   else
1366   {
1367     /* For our testcases, we don't want the blacklisted
1368        peers to wait. */
1369     GNUNET_SET_operation_cancel (setop->op);
1370     setop->op = NULL;
1371   }
1372 #endif
1373 }
1374
1375
1376 static void
1377 put_diff (struct ConsensusSession *session,
1378          struct DiffEntry *diff)
1379 {
1380   struct GNUNET_HashCode hash;
1381
1382   GNUNET_assert (NULL != diff);
1383
1384   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1385   GNUNET_assert (GNUNET_OK ==
1386                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1387                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1388 }
1389
1390 static void
1391 put_set (struct ConsensusSession *session,
1392          struct SetEntry *set)
1393 {
1394   struct GNUNET_HashCode hash;
1395
1396   GNUNET_assert (NULL != set->h);
1397
1398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1399               "Putting set %s\n",
1400               debug_str_set_key (&set->key));
1401
1402   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1403   GNUNET_assert (GNUNET_SYSERR !=
1404                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1405                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1406 }
1407
1408
1409 static void
1410 put_rfn (struct ConsensusSession *session,
1411          struct ReferendumEntry *rfn)
1412 {
1413   struct GNUNET_HashCode hash;
1414
1415   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1416   GNUNET_assert (GNUNET_OK ==
1417                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1418                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1419 }
1420
1421
1422
1423 static void
1424 task_cancel_reconcile (struct TaskEntry *task)
1425 {
1426   /* not implemented yet */
1427   GNUNET_assert (0);
1428 }
1429
1430
1431 static void
1432 apply_diff_to_rfn (struct DiffEntry *diff,
1433                    struct ReferendumEntry *rfn,
1434                    uint16_t voting_peer,
1435                    uint16_t num_peers)
1436 {
1437   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1438   struct DiffElementInfo *di;
1439
1440   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1441
1442   while (GNUNET_YES ==
1443          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1444                                                       NULL,
1445                                                       (const void **) &di))
1446   {
1447     if (di->weight > 0)
1448     {
1449       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1450     }
1451     if (di->weight < 0)
1452     {
1453       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1454     }
1455   }
1456
1457   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1458 }
1459
1460
1461 struct DiffEntry *
1462 diff_create ()
1463 {
1464   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1465
1466   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1467
1468   return d;
1469 }
1470
1471
1472 struct DiffEntry *
1473 diff_compose (struct DiffEntry *diff_1,
1474               struct DiffEntry *diff_2)
1475 {
1476   struct DiffEntry *diff_new;
1477   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1478   struct DiffElementInfo *di;
1479
1480   diff_new = diff_create ();
1481
1482   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1483   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1484   {
1485     diff_insert (diff_new, di->weight, di->element);
1486   }
1487   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1488
1489   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1490   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1491   {
1492     diff_insert (diff_new, di->weight, di->element);
1493   }
1494   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1495
1496   return diff_new;
1497 }
1498
1499
1500 struct ReferendumEntry *
1501 rfn_create (uint16_t size)
1502 {
1503   struct ReferendumEntry *rfn;
1504
1505   rfn = GNUNET_new (struct ReferendumEntry);
1506   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1507   rfn->peer_commited = GNUNET_new_array (size, int);
1508   rfn->peer_contested = GNUNET_new_array (size, int);
1509   rfn->num_peers = size;
1510
1511   return rfn;
1512 }
1513
1514
1515 void
1516 diff_destroy (struct DiffEntry *diff)
1517 {
1518   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1519   GNUNET_free (diff);
1520 }
1521
1522
1523 /**
1524  * For a given majority, count what the outcome
1525  * is (add/remove/keep), and give the number
1526  * of peers that voted for this outcome.
1527  */
1528 static void
1529 rfn_majority (const struct ReferendumEntry *rfn,
1530               const struct RfnElementInfo *ri,
1531               uint16_t *ret_majority,
1532               enum ReferendumVote *ret_vote)
1533 {
1534   uint16_t votes_yes = 0;
1535   uint16_t num_commited = 0;
1536   uint16_t i;
1537
1538   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1539               "Computing rfn majority for element %s of rfn {%s}\n",
1540               debug_str_element (ri->element),
1541               debug_str_rfn_key (&rfn->key));
1542
1543   for (i = 0; i < rfn->num_peers; i++)
1544   {
1545     if (GNUNET_NO == rfn->peer_commited[i])
1546       continue;
1547     num_commited++;
1548
1549     if (GNUNET_YES == ri->votes[i])
1550       votes_yes++;
1551   }
1552
1553   if (votes_yes > (num_commited) / 2)
1554   {
1555     *ret_vote = ri->proposal;
1556     *ret_majority = votes_yes;
1557   }
1558   else
1559   {
1560     *ret_vote = VOTE_STAY;
1561     *ret_majority = num_commited - votes_yes;
1562   }
1563 }
1564
1565
1566 struct SetCopyCls
1567 {
1568   struct TaskEntry *task;
1569   struct SetKey dst_set_key;
1570 };
1571
1572
1573 static void
1574 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1575 {
1576   struct SetCopyCls *scc = cls;
1577   struct TaskEntry *task = scc->task;
1578   struct SetKey dst_set_key = scc->dst_set_key;
1579   struct SetEntry *set;
1580
1581   GNUNET_free (scc);
1582   set = GNUNET_new (struct SetEntry);
1583   set->h = copy;
1584   set->key = dst_set_key;
1585   put_set (task->step->session, set);
1586
1587   task->start (task);
1588 }
1589
1590
1591 /**
1592  * Call the start function of the given
1593  * task again after we created a copy of the given set.
1594  */
1595 static void
1596 create_set_copy_for_task (struct TaskEntry *task,
1597                           struct SetKey *src_set_key,
1598                           struct SetKey *dst_set_key)
1599 {
1600   struct SetEntry *src_set;
1601   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1602
1603   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604               "Copying set {%s} to {%s} for task {%s}\n",
1605               debug_str_set_key (src_set_key),
1606               debug_str_set_key (dst_set_key),
1607               debug_str_task_key (&task->key));
1608
1609   scc->task = task;
1610   scc->dst_set_key = *dst_set_key;
1611   src_set = lookup_set (task->step->session, src_set_key);
1612   GNUNET_assert (NULL != src_set);
1613   GNUNET_SET_copy_lazy (src_set->h,
1614                         set_copy_cb,
1615                         scc);
1616 }
1617
1618
1619 struct SetMutationProgressCls
1620 {
1621   int num_pending;
1622   /**
1623    * Task to finish once all changes are through.
1624    */
1625   struct TaskEntry *task;
1626 };
1627
1628
1629 static void
1630 set_mutation_done (void *cls)
1631 {
1632   struct SetMutationProgressCls *pc = cls;
1633
1634   GNUNET_assert (pc->num_pending > 0);
1635
1636   pc->num_pending--;
1637
1638   if (0 == pc->num_pending)
1639   {
1640     struct TaskEntry *task = pc->task;
1641     GNUNET_free (pc);
1642     finish_task (task);
1643   }
1644 }
1645
1646
1647 static void
1648 try_finish_step_early (struct Step *step)
1649 {
1650   unsigned int i;
1651
1652   if (GNUNET_YES == step->is_running)
1653     return;
1654   if (GNUNET_YES == step->is_finished)
1655     return;
1656   if (GNUNET_NO == step->early_finishable)
1657     return;
1658
1659   step->is_finished = GNUNET_YES;
1660
1661 #ifdef GNUNET_EXTRA_LOGGING
1662   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663               "Finishing step `%s' early.\n",
1664               step->debug_name);
1665 #endif
1666
1667   for (i = 0; i < step->subordinates_len; i++)
1668   {
1669     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1670     step->subordinates[i]->pending_prereq--;
1671 #ifdef GNUNET_EXTRA_LOGGING
1672     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                 "Decreased pending_prereq to %u for step `%s'.\n",
1674                 step->subordinates[i]->pending_prereq,
1675                 step->subordinates[i]->debug_name);
1676
1677 #endif
1678     try_finish_step_early (step->subordinates[i]);
1679   }
1680
1681   // XXX: maybe schedule as task to avoid recursion?
1682   run_ready_steps (step->session);
1683 }
1684
1685
1686 static void
1687 finish_step (struct Step *step)
1688 {
1689   unsigned int i;
1690
1691   GNUNET_assert (step->finished_tasks == step->tasks_len);
1692   GNUNET_assert (GNUNET_YES == step->is_running);
1693   GNUNET_assert (GNUNET_NO == step->is_finished);
1694
1695 #ifdef GNUNET_EXTRA_LOGGING
1696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697               "All tasks of step `%s' with %u subordinates finished.\n",
1698               step->debug_name,
1699               step->subordinates_len);
1700 #endif
1701
1702   for (i = 0; i < step->subordinates_len; i++)
1703   {
1704     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1705     step->subordinates[i]->pending_prereq--;
1706 #ifdef GNUNET_EXTRA_LOGGING
1707     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1708                 "Decreased pending_prereq to %u for step `%s'.\n",
1709                 step->subordinates[i]->pending_prereq,
1710                 step->subordinates[i]->debug_name);
1711
1712 #endif
1713   }
1714
1715   step->is_finished = GNUNET_YES;
1716
1717   // XXX: maybe schedule as task to avoid recursion?
1718   run_ready_steps (step->session);
1719 }
1720
1721
1722
1723 /**
1724  * Apply the result from one round of gradecasts (i.e. every peer
1725  * should have gradecasted) to the peer's current set.
1726  *
1727  * @param task the task with context information
1728  */
1729 static void
1730 task_start_apply_round (struct TaskEntry *task)
1731 {
1732   struct ConsensusSession *session = task->step->session;
1733   struct SetKey sk_in;
1734   struct SetKey sk_out;
1735   struct RfnKey rk_in;
1736   struct SetEntry *set_out;
1737   struct ReferendumEntry *rfn_in;
1738   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1739   struct RfnElementInfo *ri;
1740   struct SetMutationProgressCls *progress_cls;
1741   uint16_t worst_majority = UINT16_MAX;
1742
1743   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1744   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1745   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1746
1747   set_out = lookup_set (session, &sk_out);
1748   if (NULL == set_out)
1749   {
1750     create_set_copy_for_task (task, &sk_in, &sk_out);
1751     return;
1752   }
1753
1754   rfn_in = lookup_rfn (session, &rk_in);
1755   GNUNET_assert (NULL != rfn_in);
1756
1757   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1758   progress_cls->task = task;
1759
1760   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1761
1762   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1763   {
1764     uint16_t majority_num;
1765     enum ReferendumVote majority_vote;
1766
1767     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1768
1769     if (worst_majority > majority_num)
1770       worst_majority = majority_num;
1771
1772     switch (majority_vote)
1773     {
1774       case VOTE_ADD:
1775         progress_cls->num_pending++;
1776         GNUNET_assert (GNUNET_OK ==
1777                        GNUNET_SET_add_element (set_out->h,
1778                                                ri->element,
1779                                                set_mutation_done,
1780                                                progress_cls));
1781         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1782                     "P%u: apply round: adding element %s with %u-majority.\n",
1783                     session->local_peer_idx,
1784                     debug_str_element (ri->element), majority_num);
1785         break;
1786       case VOTE_REMOVE:
1787         progress_cls->num_pending++;
1788         GNUNET_assert (GNUNET_OK ==
1789                        GNUNET_SET_remove_element (set_out->h,
1790                                                   ri->element,
1791                                                   set_mutation_done,
1792                                                   progress_cls));
1793         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1794                     "P%u: apply round: deleting element %s with %u-majority.\n",
1795                     session->local_peer_idx,
1796                     debug_str_element (ri->element), majority_num);
1797         break;
1798       case VOTE_STAY:
1799         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1800                     "P%u: apply round: keeping element %s with %u-majority.\n",
1801                     session->local_peer_idx,
1802                     debug_str_element (ri->element), majority_num);
1803         // do nothing
1804         break;
1805       default:
1806         GNUNET_assert (0);
1807         break;
1808     }
1809   }
1810
1811   if (progress_cls->num_pending == 0)
1812   {
1813     // call closure right now, no pending ops
1814     GNUNET_free (progress_cls);
1815     finish_task (task);
1816   }
1817
1818   {
1819     uint16_t thresh = (session->num_peers / 3) * 2;
1820
1821     if (worst_majority >= thresh)
1822     {
1823       switch (session->early_stopping)
1824       {
1825         case EARLY_STOPPING_NONE:
1826           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1827           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1828                       "P%u: Stopping early (after one more superround)\n",
1829                       session->local_peer_idx);
1830           break;
1831         case EARLY_STOPPING_ONE_MORE:
1832           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1833                       session->local_peer_idx);
1834           session->early_stopping = EARLY_STOPPING_DONE;
1835           {
1836             struct Step *step;
1837             for (step = session->steps_head; NULL != step; step = step->next)
1838               try_finish_step_early (step);
1839           }
1840           break;
1841         case EARLY_STOPPING_DONE:
1842           /* We shouldn't be here anymore after early stopping */
1843           GNUNET_break (0);
1844           break;
1845         default:
1846           GNUNET_assert (0);
1847           break;
1848       }
1849     }
1850     else if (EARLY_STOPPING_NONE != session->early_stopping)
1851     {
1852       // Our assumption about the number of bad peers
1853       // has been broken.
1854       GNUNET_break_op (0);
1855     }
1856     else
1857     {
1858       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1859                   session->local_peer_idx);
1860     }
1861   }
1862
1863 }
1864
1865
1866 static void
1867 task_start_grade (struct TaskEntry *task)
1868 {
1869   struct ConsensusSession *session = task->step->session;
1870   struct ReferendumEntry *output_rfn;
1871   struct ReferendumEntry *input_rfn;
1872   struct DiffEntry *input_diff;
1873   struct RfnKey rfn_key;
1874   struct DiffKey diff_key;
1875   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1876   struct RfnElementInfo *ri;
1877   unsigned int gradecast_confidence = 2;
1878
1879   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1880   output_rfn = lookup_rfn (session, &rfn_key);
1881   if (NULL == output_rfn)
1882   {
1883     output_rfn = rfn_create (session->num_peers);
1884     output_rfn->key = rfn_key;
1885     put_rfn (session, output_rfn);
1886   }
1887
1888   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1889   input_diff = lookup_diff (session, &diff_key);
1890   GNUNET_assert (NULL != input_diff);
1891
1892   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1893   input_rfn = lookup_rfn (session, &rfn_key);
1894   GNUNET_assert (NULL != input_rfn);
1895
1896   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1897
1898   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1899
1900   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1901   {
1902     uint16_t majority_num;
1903     enum ReferendumVote majority_vote;
1904
1905     // XXX: we need contested votes and non-contested votes here
1906     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1907
1908     if (majority_num <= session->num_peers / 3)
1909       majority_vote = VOTE_REMOVE;
1910
1911     switch (majority_vote)
1912     {
1913       case VOTE_STAY:
1914         break;
1915       case VOTE_ADD:
1916         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1917         break;
1918       case VOTE_REMOVE:
1919         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1920         break;
1921       default:
1922         GNUNET_assert (0);
1923         break;
1924     }
1925   }
1926
1927   {
1928     uint16_t noncontested;
1929     noncontested = rfn_noncontested (input_rfn);
1930     if (noncontested < (session->num_peers / 3) * 2)
1931     {
1932       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1933     }
1934     if (noncontested < (session->num_peers / 3) + 1)
1935     {
1936       gradecast_confidence = 0;
1937     }
1938   }
1939
1940   if (gradecast_confidence >= 1)
1941     rfn_commit (output_rfn, task->key.leader);
1942
1943   if (gradecast_confidence <= 1)
1944     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1945
1946   finish_task (task);
1947 }
1948
1949
1950 static void
1951 task_start_reconcile (struct TaskEntry *task)
1952 {
1953   struct SetEntry *input;
1954   struct SetOpCls *setop = &task->cls.setop;
1955   struct ConsensusSession *session = task->step->session;
1956
1957   input = lookup_set (session, &setop->input_set);
1958   GNUNET_assert (NULL != input);
1959   GNUNET_assert (NULL != input->h);
1960
1961   /* We create the outputs for the operation here
1962      (rather than in the set operation callback)
1963      because we want something valid in there, even
1964      if the other peer doesn't talk to us */
1965
1966   if (SET_KIND_NONE != setop->output_set.set_kind)
1967   {
1968     /* If we don't have an existing output set,
1969        we clone the input set. */
1970     if (NULL == lookup_set (session, &setop->output_set))
1971     {
1972       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1973       return;
1974     }
1975   }
1976
1977   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1978   {
1979     if (NULL == lookup_rfn (session, &setop->output_rfn))
1980     {
1981       struct ReferendumEntry *rfn;
1982
1983       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984                   "P%u: output rfn <%s> missing, creating.\n",
1985                   session->local_peer_idx,
1986                   debug_str_rfn_key (&setop->output_rfn));
1987
1988       rfn = rfn_create (session->num_peers);
1989       rfn->key = setop->output_rfn;
1990       put_rfn (session, rfn);
1991     }
1992   }
1993
1994   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1995   {
1996     if (NULL == lookup_diff (session, &setop->output_diff))
1997     {
1998       struct DiffEntry *diff;
1999
2000       diff = diff_create ();
2001       diff->key = setop->output_diff;
2002       put_diff (session, diff);
2003     }
2004   }
2005
2006   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2007   {
2008     /* XXX: mark the corresponding rfn as commited if necessary */
2009     finish_task (task);
2010     return;
2011   }
2012
2013   if (task->key.peer1 == session->local_peer_idx)
2014   {
2015     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2016
2017     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2018                 "P%u: Looking up set {%s} to run remote union\n",
2019                 session->local_peer_idx,
2020                 debug_str_set_key (&setop->input_set));
2021
2022     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2023     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2024
2025     rcm.kind = htons (task->key.kind);
2026     rcm.peer1 = htons (task->key.peer1);
2027     rcm.peer2 = htons (task->key.peer2);
2028     rcm.leader = htons (task->key.leader);
2029     rcm.repetition = htons (task->key.repetition);
2030
2031     GNUNET_assert (NULL == setop->op);
2032     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2033                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2034
2035     // XXX: maybe this should be done while
2036     // setting up tasks alreays?
2037     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2038                                     &session->global_id,
2039                                     &rcm.header,
2040                                     GNUNET_SET_RESULT_SYMMETRIC,
2041                                     set_result_cb,
2042                                     task);
2043
2044     commit_set (session, task);
2045   }
2046   else if (task->key.peer2 == session->local_peer_idx)
2047   {
2048     /* Wait for the other peer to contact us */
2049     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2050                 session->local_peer_idx, task->key.peer1);
2051
2052     if (NULL != setop->op)
2053     {
2054       commit_set (session, task);
2055     }
2056   }
2057   else
2058   {
2059     /* We made an error while constructing the task graph. */
2060     GNUNET_assert (0);
2061   }
2062 }
2063
2064
2065 static void
2066 task_start_eval_echo (struct TaskEntry *task)
2067 {
2068   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2069   struct ReferendumEntry *input_rfn;
2070   struct RfnElementInfo *ri;
2071   struct SetEntry *output_set;
2072   struct SetMutationProgressCls *progress_cls;
2073   struct ConsensusSession *session = task->step->session;
2074   struct SetKey sk_in;
2075   struct SetKey sk_out;
2076   struct RfnKey rk_in;
2077
2078   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2079   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2080   output_set = lookup_set (session, &sk_out);
2081   if (NULL == output_set)
2082   {
2083     create_set_copy_for_task (task, &sk_in, &sk_out);
2084     return;
2085   }
2086
2087
2088   {
2089     // FIXME: should be marked as a shallow copy, so
2090     // we can destroy everything correctly
2091     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2092     last_set->h = output_set->h;
2093     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2094     put_set (session, last_set);
2095   }
2096
2097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2098               "Evaluating referendum in Task {%s}\n",
2099               debug_str_task_key (&task->key));
2100
2101   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2102   progress_cls->task = task;
2103
2104   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2105   input_rfn = lookup_rfn (session, &rk_in);
2106
2107   GNUNET_assert (NULL != input_rfn);
2108
2109   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2110   GNUNET_assert (NULL != iter);
2111
2112   while (GNUNET_YES ==
2113          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2114                                                       NULL,
2115                                                       (const void **) &ri))
2116   {
2117     enum ReferendumVote majority_vote;
2118     uint16_t majority_num;
2119
2120     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2121
2122     if (majority_num < session->num_peers / 3)
2123     {
2124       /* It is not the case that all nonfaulty peers
2125          echoed the same value.  Since we're doing a set reconciliation, we
2126          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2127          reconciliation as contested.  Other peers might not know that the
2128          leader is faulty, thus we still re-distribute in the confirmation
2129          round. */
2130       output_set->is_contested = GNUNET_YES;
2131     }
2132
2133     switch (majority_vote)
2134     {
2135       case VOTE_ADD:
2136         progress_cls->num_pending++;
2137         GNUNET_assert (GNUNET_OK ==
2138                        GNUNET_SET_add_element (output_set->h,
2139                                                ri->element,
2140                                                set_mutation_done,
2141                                                progress_cls));
2142         break;
2143       case VOTE_REMOVE:
2144         progress_cls->num_pending++;
2145         GNUNET_assert (GNUNET_OK ==
2146                        GNUNET_SET_remove_element (output_set->h,
2147                                                   ri->element,
2148                                                   set_mutation_done,
2149                                                   progress_cls));
2150         break;
2151       case VOTE_STAY:
2152         /* Nothing to do. */
2153         break;
2154       default:
2155         /* not reached */
2156         GNUNET_assert (0);
2157     }
2158   }
2159
2160   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2161
2162   if (progress_cls->num_pending == 0)
2163   {
2164     // call closure right now, no pending ops
2165     GNUNET_free (progress_cls);
2166     finish_task (task);
2167   }
2168 }
2169
2170
2171 static void
2172 task_start_finish (struct TaskEntry *task)
2173 {
2174   struct SetEntry *final_set;
2175   struct ConsensusSession *session = task->step->session;
2176
2177   final_set = lookup_set (session, &task->cls.finish.input_set);
2178
2179   GNUNET_assert (NULL != final_set);
2180
2181
2182   GNUNET_SET_iterate (final_set->h,
2183                       send_to_client_iter,
2184                       task);
2185 }
2186
2187 static void
2188 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2189 {
2190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2191
2192   GNUNET_assert (GNUNET_NO == task->is_started);
2193   GNUNET_assert (GNUNET_NO == task->is_finished);
2194   GNUNET_assert (NULL != task->start);
2195
2196   task->start (task);
2197
2198   task->is_started = GNUNET_YES;
2199 }
2200
2201
2202
2203
2204 /*
2205  * Run all steps of the session that don't any
2206  * more dependencies.
2207  */
2208 static void
2209 run_ready_steps (struct ConsensusSession *session)
2210 {
2211   struct Step *step;
2212
2213   step = session->steps_head;
2214
2215   while (NULL != step)
2216   {
2217     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2218     {
2219       size_t i;
2220
2221       GNUNET_assert (0 == step->finished_tasks);
2222
2223 #ifdef GNUNET_EXTRA_LOGGING
2224       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2225                   session->local_peer_idx,
2226                   step->debug_name,
2227                   step->round, step->tasks_len, step->subordinates_len);
2228 #endif
2229
2230       step->is_running = GNUNET_YES;
2231       for (i = 0; i < step->tasks_len; i++)
2232         start_task (session, step->tasks[i]);
2233
2234       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2235       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2236         finish_step (step);
2237
2238       /* Running the next ready steps will be triggered by task completion */
2239       return;
2240     }
2241     step = step->next;
2242   }
2243
2244   return;
2245 }
2246
2247
2248
2249 static void
2250 finish_task (struct TaskEntry *task)
2251 {
2252   GNUNET_assert (GNUNET_NO == task->is_finished);
2253   task->is_finished = GNUNET_YES;
2254
2255   task->step->finished_tasks++;
2256
2257   if (task->step->finished_tasks == task->step->tasks_len)
2258     finish_step (task->step);
2259 }
2260
2261
2262 /**
2263  * Search peer in the list of peers in session.
2264  *
2265  * @param peer peer to find
2266  * @param session session with peer
2267  * @return index of peer, -1 if peer is not in session
2268  */
2269 static int
2270 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2271 {
2272   int i;
2273   for (i = 0; i < session->num_peers; i++)
2274     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2275       return i;
2276   return -1;
2277 }
2278
2279
2280 /**
2281  * Compute a global, (hopefully) unique consensus session id,
2282  * from the local id of the consensus session, and the identities of all participants.
2283  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2284  * exactly the same peers, the global id will be different.
2285  *
2286  * @param session session to generate the global id for
2287  * @param local_session_id local id of the consensus session
2288  */
2289 static void
2290 compute_global_id (struct ConsensusSession *session,
2291                    const struct GNUNET_HashCode *local_session_id)
2292 {
2293   const char *salt = "gnunet-service-consensus/session_id";
2294
2295   GNUNET_assert (GNUNET_YES ==
2296                  GNUNET_CRYPTO_kdf (&session->global_id,
2297                                     sizeof (struct GNUNET_HashCode),
2298                                     salt,
2299                                     strlen (salt),
2300                                     session->peers,
2301                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2302                                     local_session_id,
2303                                     sizeof (struct GNUNET_HashCode),
2304                                     NULL));
2305 }
2306
2307
2308 /**
2309  * Compare two peer identities.
2310  *
2311  * @param h1 some peer identity
2312  * @param h2 some peer identity
2313  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2314  */
2315 static int
2316 peer_id_cmp (const void *h1, const void *h2)
2317 {
2318   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2319 }
2320
2321
2322 /**
2323  * Create the sorted list of peers for the session,
2324  * add the local peer if not in the join message.
2325  */
2326 static void
2327 initialize_session_peer_list (struct ConsensusSession *session,
2328                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2329 {
2330   unsigned int local_peer_in_list;
2331   uint32_t listed_peers;
2332   const struct GNUNET_PeerIdentity *msg_peers;
2333   unsigned int i;
2334
2335   GNUNET_assert (NULL != join_msg);
2336
2337   /* peers in the join message, may or may not include the local peer */
2338   listed_peers = ntohl (join_msg->num_peers);
2339
2340   session->num_peers = listed_peers;
2341
2342   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2343
2344   local_peer_in_list = GNUNET_NO;
2345   for (i = 0; i < listed_peers; i++)
2346   {
2347     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2348     {
2349       local_peer_in_list = GNUNET_YES;
2350       break;
2351     }
2352   }
2353
2354   if (GNUNET_NO == local_peer_in_list)
2355     session->num_peers++;
2356
2357   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2358
2359   if (GNUNET_NO == local_peer_in_list)
2360     session->peers[session->num_peers - 1] = my_peer;
2361
2362   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2363   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2364 }
2365
2366
2367 static struct TaskEntry *
2368 lookup_task (struct ConsensusSession *session,
2369              struct TaskKey *key)
2370 {
2371   struct GNUNET_HashCode hash;
2372
2373
2374   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2376               GNUNET_h2s (&hash));
2377   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2378 }
2379
2380
2381 /**
2382  * Called when another peer wants to do a set operation with the
2383  * local peer.
2384  *
2385  * @param cls closure
2386  * @param other_peer the other peer
2387  * @param context_msg message with application specific information from
2388  *        the other peer
2389  * @param request request from the other peer, use GNUNET_SET_accept
2390  *        to accept it, otherwise the request will be refused
2391  *        Note that we don't use a return value here, as it is also
2392  *        necessary to specify the set we want to do the operation with,
2393  *        whith sometimes can be derived from the context message.
2394  *        Also necessary to specify the timeout.
2395  */
2396 static void
2397 set_listen_cb (void *cls,
2398                const struct GNUNET_PeerIdentity *other_peer,
2399                const struct GNUNET_MessageHeader *context_msg,
2400                struct GNUNET_SET_Request *request)
2401 {
2402   struct ConsensusSession *session = cls;
2403   struct TaskKey tk;
2404   struct TaskEntry *task;
2405   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2406
2407   if (NULL == context_msg)
2408   {
2409     GNUNET_break_op (0);
2410     return;
2411   }
2412
2413   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2414   {
2415     GNUNET_break_op (0);
2416     return;
2417   }
2418
2419   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2420   {
2421     GNUNET_break_op (0);
2422     return;
2423   }
2424
2425   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2426
2427   tk = ((struct TaskKey) {
2428       .kind = ntohs (cm->kind),
2429       .peer1 = ntohs (cm->peer1),
2430       .peer2 = ntohs (cm->peer2),
2431       .repetition = ntohs (cm->repetition),
2432       .leader = ntohs (cm->leader),
2433   });
2434
2435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2436               session->local_peer_idx, debug_str_task_key (&tk));
2437
2438   task = lookup_task (session, &tk);
2439
2440   if (NULL == task)
2441   {
2442     GNUNET_break_op (0);
2443     return;
2444   }
2445
2446   if (GNUNET_YES == task->is_finished)
2447   {
2448     GNUNET_break_op (0);
2449     return;
2450   }
2451
2452   if (task->key.peer2 != session->local_peer_idx)
2453   {
2454     /* We're being asked, so we must be thne 2nd peer. */
2455     GNUNET_break_op (0);
2456     return;
2457   }
2458
2459   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2460                     (task->key.peer2 == session->local_peer_idx)));
2461
2462   task->cls.setop.op = GNUNET_SET_accept (request,
2463                                           GNUNET_SET_RESULT_SYMMETRIC,
2464                                           set_result_cb,
2465                                           task);
2466
2467   /* If the task hasn't been started yet,
2468      we wait for that until we commit. */
2469
2470   if (GNUNET_YES == task->is_started)
2471   {
2472     commit_set (session, task);
2473   }
2474 }
2475
2476
2477
2478 static void
2479 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2480           struct TaskEntry *t)
2481 {
2482   struct GNUNET_HashCode round_hash;
2483   struct Step *s;
2484
2485   GNUNET_assert (NULL != t->step);
2486
2487   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2488
2489   s = t->step;
2490
2491   if (s->tasks_len == s->tasks_cap)
2492   {
2493     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2494     GNUNET_array_grow (s->tasks,
2495                        s->tasks_cap,
2496                        target_size);
2497   }
2498
2499 #ifdef GNUNET_EXTRA_LOGGING
2500   GNUNET_assert (NULL != s->debug_name);
2501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2502               debug_str_task_key (&t->key),
2503               s->debug_name);
2504 #endif
2505
2506   s->tasks[s->tasks_len] = t;
2507   s->tasks_len++;
2508
2509   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2510   GNUNET_assert (GNUNET_OK ==
2511       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2512                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2513 }
2514
2515
2516 static void
2517 install_step_timeouts (struct ConsensusSession *session)
2518 {
2519   /* Given the fully constructed task graph
2520      with rounds for tasks, we can give the tasks timeouts. */
2521
2522   // unsigned int max_round;
2523
2524   /* XXX: implement! */
2525 }
2526
2527
2528
2529 /*
2530  * Arrange two peers in some canonical order.
2531  */
2532 static void
2533 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2534 {
2535   uint16_t a;
2536   uint16_t b;
2537
2538   GNUNET_assert (*p1 < n);
2539   GNUNET_assert (*p2 < n);
2540
2541   if (*p1 < *p2)
2542   {
2543     a = *p1;
2544     b = *p2;
2545   }
2546   else
2547   {
2548     a = *p2;
2549     b = *p1;
2550   }
2551
2552   /* For uniformly random *p1, *p2,
2553      this condition is true with 50% chance */
2554   if (((b - a) + n) % n <= n / 2)
2555   {
2556     *p1 = a;
2557     *p2 = b;
2558   }
2559   else
2560   {
2561     *p1 = b;
2562     *p2 = a;
2563   }
2564 }
2565
2566
2567 /**
2568  * Record @a dep as a dependency of @a step.
2569  */
2570 static void
2571 step_depend_on (struct Step *step, struct Step *dep)
2572 {
2573   /* We're not checking for cyclic dependencies,
2574      but this is a cheap sanity check. */
2575   GNUNET_assert (step != dep);
2576   GNUNET_assert (NULL != step);
2577   GNUNET_assert (NULL != dep);
2578   GNUNET_assert (dep->round <= step->round);
2579
2580 #ifdef GNUNET_EXTRA_LOGGING
2581   /* Make sure we have complete debugging information.
2582      Also checks that we don't screw up too badly
2583      constructing the task graph. */
2584   GNUNET_assert (NULL != step->debug_name);
2585   GNUNET_assert (NULL != dep->debug_name);
2586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2587               "Making step `%s' depend on `%s'\n",
2588               step->debug_name,
2589               dep->debug_name);
2590 #endif
2591
2592   if (dep->subordinates_cap == dep->subordinates_len)
2593   {
2594     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2595     GNUNET_array_grow (dep->subordinates,
2596                        dep->subordinates_cap,
2597                        target_size);
2598   }
2599
2600   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2601
2602   dep->subordinates[dep->subordinates_len] = step;
2603   dep->subordinates_len++;
2604
2605   step->pending_prereq++;
2606 }
2607
2608
2609 static struct Step *
2610 create_step (struct ConsensusSession *session, int round, int early_finishable)
2611 {
2612   struct Step *step;
2613   step = GNUNET_new (struct Step);
2614   step->session = session;
2615   step->round = round;
2616   step->early_finishable = early_finishable;
2617   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2618                                     session->steps_tail,
2619                                     step);
2620   return step;
2621 }
2622
2623
2624 /**
2625  * Construct the task graph for a single
2626  * gradecast.
2627  */
2628 static void
2629 construct_task_graph_gradecast (struct ConsensusSession *session,
2630                                 uint16_t rep,
2631                                 uint16_t lead,
2632                                 struct Step *step_before,
2633                                 struct Step *step_after)
2634 {
2635   uint16_t n = session->num_peers;
2636   uint16_t me = session->local_peer_idx;
2637
2638   uint16_t p1;
2639   uint16_t p2;
2640
2641   /* The task we're currently setting up. */
2642   struct TaskEntry task;
2643
2644   struct Step *step;
2645   struct Step *prev_step;
2646
2647   uint16_t round;
2648
2649   unsigned int k;
2650
2651   round = step_before->round + 1;
2652
2653   /* gcast step 1: leader disseminates */
2654
2655   step = create_step (session, round, GNUNET_YES);
2656
2657 #ifdef GNUNET_EXTRA_LOGGING
2658   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2659 #endif
2660   step_depend_on (step, step_before);
2661
2662   if (lead == me)
2663   {
2664     for (k = 0; k < n; k++)
2665     {
2666       if (k == me)
2667         continue;
2668       p1 = me;
2669       p2 = k;
2670       arrange_peers (&p1, &p2, n);
2671       task = ((struct TaskEntry) {
2672         .step = step,
2673         .start = task_start_reconcile,
2674         .cancel = task_cancel_reconcile,
2675         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2676       });
2677       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2678       put_task (session->taskmap, &task);
2679     }
2680     /* We run this task to make sure that the leader
2681        has the stored the SET_KIND_LEADER set of himself,
2682        so he can participate in the rest of the gradecast
2683        without the code having to handle any special cases. */
2684     task = ((struct TaskEntry) {
2685       .step = step,
2686       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2687       .start = task_start_reconcile,
2688       .cancel = task_cancel_reconcile,
2689     });
2690     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2691     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2692     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2693     put_task (session->taskmap, &task);
2694   }
2695   else
2696   {
2697     p1 = me;
2698     p2 = lead;
2699     arrange_peers (&p1, &p2, n);
2700     task = ((struct TaskEntry) {
2701       .step = step,
2702       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2703       .start = task_start_reconcile,
2704       .cancel = task_cancel_reconcile,
2705     });
2706     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2707     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2708     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2709     put_task (session->taskmap, &task);
2710   }
2711
2712   /* gcast phase 2: echo */
2713   prev_step = step;
2714   round += 1;
2715   step = create_step (session, round, GNUNET_YES);
2716 #ifdef GNUNET_EXTRA_LOGGING
2717   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2718 #endif
2719   step_depend_on (step, prev_step);
2720
2721   for (k = 0; k < n; k++)
2722   {
2723     p1 = k;
2724     p2 = me;
2725     arrange_peers (&p1, &p2, n);
2726     task = ((struct TaskEntry) {
2727       .step = step,
2728       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2729       .start = task_start_reconcile,
2730       .cancel = task_cancel_reconcile,
2731     });
2732     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2733     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2734     put_task (session->taskmap, &task);
2735   }
2736
2737   prev_step = step;
2738   /* Same round, since step only has local tasks */
2739   step = create_step (session, round, GNUNET_YES);
2740 #ifdef GNUNET_EXTRA_LOGGING
2741   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2742 #endif
2743   step_depend_on (step, prev_step);
2744
2745   arrange_peers (&p1, &p2, n);
2746   task = ((struct TaskEntry) {
2747     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2748     .step = step,
2749     .start = task_start_eval_echo
2750   });
2751   put_task (session->taskmap, &task);
2752
2753   prev_step = step;
2754   round += 1;
2755   step = create_step (session, round, GNUNET_YES);
2756 #ifdef GNUNET_EXTRA_LOGGING
2757   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2758 #endif
2759   step_depend_on (step, prev_step);
2760
2761   /* gcast phase 3: confirmation and grading */
2762   for (k = 0; k < n; k++)
2763   {
2764     p1 = k;
2765     p2 = me;
2766     arrange_peers (&p1, &p2, n);
2767     task = ((struct TaskEntry) {
2768       .step = step,
2769       .start = task_start_reconcile,
2770       .cancel = task_cancel_reconcile,
2771       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2772     });
2773     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2774     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2775     /* If there was at least one element in the echo round that was
2776        contested (i.e. it had no n-t majority), then we let the other peers
2777        know, and other peers let us know.  The contested flag for each peer is
2778        stored in the rfn. */
2779     task.cls.setop.transceive_contested = GNUNET_YES;
2780     put_task (session->taskmap, &task);
2781   }
2782
2783   prev_step = step;
2784   /* Same round, since step only has local tasks */
2785   step = create_step (session, round, GNUNET_YES);
2786 #ifdef GNUNET_EXTRA_LOGGING
2787   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2788 #endif
2789   step_depend_on (step, prev_step);
2790
2791   task = ((struct TaskEntry) {
2792     .step = step,
2793     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2794     .start = task_start_grade,
2795   });
2796   put_task (session->taskmap, &task);
2797
2798   step_depend_on (step_after, step);
2799 }
2800
2801
2802 static void
2803 construct_task_graph (struct ConsensusSession *session)
2804 {
2805   uint16_t n = session->num_peers;
2806   uint16_t t = n / 3;
2807
2808   uint16_t me = session->local_peer_idx;
2809
2810   uint16_t p1;
2811   uint16_t p2;
2812
2813   /* The task we're currently setting up. */
2814   struct TaskEntry task;
2815
2816   /* Current leader */
2817   unsigned int lead;
2818
2819   struct Step *step;
2820   struct Step *prev_step;
2821
2822   unsigned int round = 0;
2823
2824   unsigned int i;
2825
2826   // XXX: introduce first step,
2827   // where we wait for all insert acks
2828   // from the set service
2829
2830   /* faster but brittle all-to-all */
2831
2832   // XXX: Not implemented yet
2833
2834   /* all-to-all step */
2835
2836   step = create_step (session, round, GNUNET_NO);
2837
2838 #ifdef GNUNET_EXTRA_LOGGING
2839   step->debug_name = GNUNET_strdup ("all to all");
2840 #endif
2841
2842   for (i = 0; i < n; i++)
2843   {
2844     p1 = me;
2845     p2 = i;
2846     arrange_peers (&p1, &p2, n);
2847     task = ((struct TaskEntry) {
2848       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2849       .step = step,
2850       .start = task_start_reconcile,
2851       .cancel = task_cancel_reconcile,
2852     });
2853     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2854     task.cls.setop.output_set = task.cls.setop.input_set;
2855     task.cls.setop.do_not_remove = GNUNET_YES;
2856     put_task (session->taskmap, &task);
2857   }
2858
2859   prev_step = step;
2860   step = NULL;
2861
2862   round += 1;
2863
2864   /* Byzantine union */
2865
2866   /* sequential repetitions of the gradecasts */
2867   for (i = 0; i < t + 1; i++)
2868   {
2869     struct Step *step_rep_start;
2870     struct Step *step_rep_end;
2871
2872     /* Every repetition is in a separate round. */
2873     step_rep_start = create_step (session, round, GNUNET_YES);
2874 #ifdef GNUNET_EXTRA_LOGGING
2875     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2876 #endif
2877
2878     step_depend_on (step_rep_start, prev_step);
2879
2880     /* gradecast has three rounds */
2881     round += 3;
2882     step_rep_end = create_step (session, round, GNUNET_YES);
2883 #ifdef GNUNET_EXTRA_LOGGING
2884     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2885 #endif
2886
2887     /* parallel gradecasts */
2888     for (lead = 0; lead < n; lead++)
2889       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2890
2891     task = ((struct TaskEntry) {
2892       .step = step_rep_end,
2893       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2894       .start = task_start_apply_round,
2895     });
2896     put_task (session->taskmap, &task);
2897
2898     prev_step = step_rep_end;
2899   }
2900
2901  /* There is no next gradecast round, thus the final
2902     start step is the overall end step of the gradecasts */
2903   round += 1;
2904   step = create_step (session, round, GNUNET_NO);
2905 #ifdef GNUNET_EXTRA_LOGGING
2906   GNUNET_asprintf (&step->debug_name, "finish");
2907 #endif
2908   step_depend_on (step, prev_step);
2909
2910   task = ((struct TaskEntry) {
2911     .step = step,
2912     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2913     .start = task_start_finish,
2914   });
2915   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2916
2917   put_task (session->taskmap, &task);
2918 }
2919
2920
2921 /**
2922  * Initialize the session, continue receiving messages from the owning client
2923  *
2924  * @param session the session to initialize
2925  * @param join_msg the join message from the client
2926  */
2927 static void
2928 initialize_session (struct ConsensusSession *session,
2929                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2930 {
2931   struct ConsensusSession *other_session;
2932
2933   initialize_session_peer_list (session, join_msg);
2934   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2935   compute_global_id (session, &join_msg->session_id);
2936
2937   /* Check if some local client already owns the session.
2938      It is only legal to have a session with an existing global id
2939      if all other sessions with this global id are finished.*/
2940   other_session = sessions_head;
2941   while (NULL != other_session)
2942   {
2943     if ((other_session != session) &&
2944         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2945     {
2946       //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2947       //{
2948       //  GNUNET_break (0);
2949       //  destroy_session (session);
2950       //  return;
2951       //}
2952       break;
2953     }
2954     other_session = other_session->next;
2955   }
2956
2957   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2958   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2959
2960   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
2961               (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2962
2963   session->local_peer_idx = get_peer_idx (&my_peer, session);
2964   GNUNET_assert (-1 != session->local_peer_idx);
2965   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2966                                              &session->global_id,
2967                                              set_listen_cb, session);
2968   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2969
2970   session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2971   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2972   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2973   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2974
2975   {
2976     struct SetEntry *client_set;
2977     client_set = GNUNET_new (struct SetEntry);
2978     client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2979     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2980     put_set (session, client_set);
2981   }
2982
2983   session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2984
2985   /* Just construct the task graph,
2986      but don't run anything until the client calls conclude. */
2987   construct_task_graph (session);
2988
2989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2990 }
2991
2992
2993 static struct ConsensusSession *
2994 get_session_by_client (struct GNUNET_SERVER_Client *client)
2995 {
2996   struct ConsensusSession *session;
2997
2998   session = sessions_head;
2999   while (NULL != session)
3000   {
3001     if (session->client == client)
3002       return session;
3003     session = session->next;
3004   }
3005   return NULL;
3006 }
3007
3008
3009 /**
3010  * Called when a client wants to join a consensus session.
3011  *
3012  * @param cls unused
3013  * @param client client that sent the message
3014  * @param m message sent by the client
3015  */
3016 static void
3017 client_join (void *cls,
3018              struct GNUNET_SERVER_Client *client,
3019              const struct GNUNET_MessageHeader *m)
3020 {
3021   struct ConsensusSession *session;
3022
3023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3024
3025   session = get_session_by_client (client);
3026   if (NULL != session)
3027   {
3028     GNUNET_break (0);
3029     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3030     return;
3031   }
3032   session = GNUNET_new (struct ConsensusSession);
3033   session->client = client;
3034   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3035   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3036   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3037   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3038
3039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
3040 }
3041
3042
3043 static void
3044 client_insert_done (void *cls)
3045 {
3046   // FIXME: implement
3047 }
3048
3049
3050 /**
3051  * Called when a client performs an insert operation.
3052  *
3053  * @param cls (unused)
3054  * @param client client handle
3055  * @param m message sent by the client
3056  */
3057 void
3058 client_insert (void *cls,
3059                struct GNUNET_SERVER_Client *client,
3060                const struct GNUNET_MessageHeader *m)
3061 {
3062   struct ConsensusSession *session;
3063   struct GNUNET_CONSENSUS_ElementMessage *msg;
3064   struct GNUNET_SET_Element *element;
3065   ssize_t element_size;
3066   struct GNUNET_SET_Handle *initial_set;
3067
3068   session = get_session_by_client (client);
3069
3070   if (NULL == session)
3071   {
3072     GNUNET_break (0);
3073     GNUNET_SERVER_client_disconnect (client);
3074     return;
3075   }
3076
3077   if (GNUNET_YES == session->conclude_started)
3078   {
3079     GNUNET_break (0);
3080     GNUNET_SERVER_client_disconnect (client);
3081     return;
3082   }
3083
3084   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3085   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3086   if (element_size < 0)
3087   {
3088     GNUNET_break (0);
3089     return;
3090   }
3091
3092   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3093   element->element_type = msg->element_type;
3094   element->size = element_size;
3095   memcpy (&element[1], &msg[1], element_size);
3096   element->data = &element[1];
3097   {
3098     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3099     struct SetEntry *entry;
3100     entry = lookup_set (session, &key);
3101     GNUNET_assert (NULL != entry);
3102     initial_set = entry->h;
3103   }
3104   session->num_client_insert_pending++;
3105   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
3106
3107 #ifdef GNUNET_EXTRA_LOGGING
3108   {
3109     struct GNUNET_HashCode hash;
3110
3111     GNUNET_SET_element_hash (element, &hash);
3112
3113     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
3114                 session->local_peer_idx,
3115                 GNUNET_h2s (&hash));
3116   }
3117 #endif
3118
3119   GNUNET_free (element);
3120   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3121 }
3122
3123
3124 /**
3125  * Called when a client performs the conclude operation.
3126  *
3127  * @param cls (unused)
3128  * @param client client handle
3129  * @param message message sent by the client
3130  */
3131 static void
3132 client_conclude (void *cls,
3133                  struct GNUNET_SERVER_Client *client,
3134                  const struct GNUNET_MessageHeader *message)
3135 {
3136   struct ConsensusSession *session;
3137
3138   session = get_session_by_client (client);
3139   if (NULL == session)
3140   {
3141     /* client not found */
3142     GNUNET_break (0);
3143     GNUNET_SERVER_client_disconnect (client);
3144     return;
3145   }
3146
3147   if (GNUNET_YES == session->conclude_started)
3148   {
3149     /* conclude started twice */
3150     GNUNET_break (0);
3151     GNUNET_SERVER_client_disconnect (client);
3152     destroy_session (session);
3153     return;
3154   }
3155
3156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
3157
3158   session->conclude_started = GNUNET_YES;
3159
3160   install_step_timeouts (session);
3161   run_ready_steps (session);
3162
3163
3164   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3165 }
3166
3167
3168 /**
3169  * Called to clean up, after a shutdown has been requested.
3170  *
3171  * @param cls closure
3172  * @param tc context information (why was this task triggered now)
3173  */
3174 static void
3175 shutdown_task (void *cls,
3176                const struct GNUNET_SCHEDULER_TaskContext *tc)
3177 {
3178   while (NULL != sessions_head)
3179     destroy_session (sessions_head);
3180
3181   GNUNET_STATISTICS_destroy (statistics, GNUNET_YES);
3182   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
3183 }
3184
3185
3186 /**
3187  * Clean up after a client after it is
3188  * disconnected (either by us or by itself)
3189  *
3190  * @param cls closure, unused
3191  * @param client the client to clean up after
3192  */
3193 void
3194 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3195 {
3196   struct ConsensusSession *session;
3197
3198   session = get_session_by_client (client);
3199   if (NULL == session)
3200     return;
3201   // FIXME: destroy if we can
3202 }
3203
3204
3205
3206 /**
3207  * Start processing consensus requests.
3208  *
3209  * @param cls closure
3210  * @param server the initialized server
3211  * @param c configuration to use
3212  */
3213 static void
3214 run (void *cls, struct GNUNET_SERVER_Handle *server,
3215      const struct GNUNET_CONFIGURATION_Handle *c)
3216 {
3217   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3218     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3219         sizeof (struct GNUNET_MessageHeader)},
3220     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3221     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3222     {NULL, NULL, 0, 0}
3223   };
3224
3225   cfg = c;
3226   srv = server;
3227   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3228   {
3229     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3230     GNUNET_break (0);
3231     GNUNET_SCHEDULER_shutdown ();
3232     return;
3233   }
3234   statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3235   GNUNET_SERVER_add_handlers (server, server_handlers);
3236   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
3237   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3238   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3239 }
3240
3241
3242 /**
3243  * The main function for the consensus service.
3244  *
3245  * @param argc number of arguments from the command line
3246  * @param argv command line arguments
3247  * @return 0 ok, 1 on error
3248  */
3249 int
3250 main (int argc, char *const *argv)
3251 {
3252   int ret;
3253   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3254   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3255   return (GNUNET_OK == ret) ? 0 : 1;
3256 }