- minor changes, TODO, debug message
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40 enum ReferendumVote
41 {
42   /**
43    * Vote that nothing should change.
44    * This option is never voted explicitly.
45    */
46   VOTE_STAY = 0,
47   /**
48    * Vote that an element should be added.
49    */
50   VOTE_ADD = 1,
51   /**
52    * Vote that an element should be removed.
53    */
54   VOTE_REMOVE = 2,
55 };
56
57
58 enum EarlyStoppingPhase
59 {
60   EARLY_STOPPING_NONE = 0,
61   EARLY_STOPPING_ONE_MORE = 1,
62   EARLY_STOPPING_DONE = 2,
63 };
64
65
66 GNUNET_NETWORK_STRUCT_BEGIN
67
68
69 struct ContestedPayload
70 {
71 };
72
73 /**
74  * Tuple of integers that together
75  * identify a task uniquely.
76  */
77 struct TaskKey {
78   /**
79    * A value from 'enum PhaseKind'.
80    */
81   uint16_t kind GNUNET_PACKED;
82
83   /**
84    * Number of the first peer
85    * in canonical order.
86    */
87   int16_t peer1 GNUNET_PACKED;
88
89   /**
90    * Number of the second peer in canonical order.
91    */
92   int16_t peer2 GNUNET_PACKED;
93
94   /**
95    * Repetition of the gradecast phase.
96    */
97   int16_t repetition GNUNET_PACKED;
98
99   /**
100    * Leader in the gradecast phase.
101    *
102    * Can be different from both peer1 and peer2.
103    */
104   int16_t leader GNUNET_PACKED;
105 };
106
107
108
109 struct SetKey
110 {
111   int set_kind GNUNET_PACKED;
112   int k1 GNUNET_PACKED;
113   int k2 GNUNET_PACKED;
114 };
115
116
117 struct SetEntry
118 {
119   struct SetKey key;
120   struct GNUNET_SET_Handle *h;
121   /**
122    * GNUNET_YES if the set resulted
123    * from applying a referendum with contested
124    * elements.
125    */
126   int is_contested;
127 };
128
129
130 struct DiffKey
131 {
132   int diff_kind GNUNET_PACKED;
133   int k1 GNUNET_PACKED;
134   int k2 GNUNET_PACKED;
135 };
136
137 struct RfnKey
138 {
139   int rfn_kind GNUNET_PACKED;
140   int k1 GNUNET_PACKED;
141   int k2 GNUNET_PACKED;
142 };
143
144
145 GNUNET_NETWORK_STRUCT_END
146
147 enum PhaseKind
148 {
149   PHASE_KIND_ALL_TO_ALL,
150   PHASE_KIND_GRADECAST_LEADER,
151   PHASE_KIND_GRADECAST_ECHO,
152   PHASE_KIND_GRADECAST_ECHO_GRADE,
153   PHASE_KIND_GRADECAST_CONFIRM,
154   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
155   /**
156    * Apply a repetition of the all-to-all
157    * gradecast to the current set.
158    */
159   PHASE_KIND_APPLY_REP,
160   PHASE_KIND_FINISH,
161 };
162
163
164 enum SetKind
165 {
166   SET_KIND_NONE = 0,
167   SET_KIND_CURRENT,
168   /**
169    * Last result set from a gradecast
170    */
171   SET_KIND_LAST_GRADECAST,
172   SET_KIND_LEADER_PROPOSAL,
173   SET_KIND_ECHO_RESULT,
174 };
175
176 enum DiffKind
177 {
178   DIFF_KIND_NONE = 0,
179   DIFF_KIND_LEADER_PROPOSAL,
180   DIFF_KIND_LEADER_CONSENSUS,
181   DIFF_KIND_GRADECAST_RESULT,
182 };
183
184 enum RfnKind
185 {
186   RFN_KIND_NONE = 0,
187   RFN_KIND_ECHO,
188   RFN_KIND_CONFIRM,
189   RFN_KIND_GRADECAST_RESULT
190 };
191
192
193 struct SetOpCls
194 {
195   struct SetKey input_set;
196
197   struct SetKey output_set;
198   struct RfnKey output_rfn;
199   struct DiffKey output_diff;
200
201   int do_not_remove;
202
203   int transceive_contested;
204
205   struct GNUNET_SET_OperationHandle *op;
206 };
207
208
209 struct FinishCls
210 {
211   struct SetKey input_set;
212 };
213
214 /**
215  * Closure for both @a start_task
216  * and @a cancel_task.
217  */
218 union TaskFuncCls
219 {
220   struct SetOpCls setop;
221   struct FinishCls finish;
222 };
223
224 struct TaskEntry;
225
226 typedef void (*TaskFunc) (struct TaskEntry *task);
227
228 /*
229  * Node in the consensus task graph.
230  */
231 struct TaskEntry
232 {
233   struct TaskKey key;
234
235   struct Step *step;
236
237   int is_started;
238
239   int is_finished;
240
241   TaskFunc start;
242   TaskFunc cancel;
243
244   union TaskFuncCls cls;
245 };
246
247
248 struct Step
249 {
250   /**
251    * All steps of one session are in a
252    * linked list for easier deallocation.
253    */
254   struct Step *prev;
255
256   /**
257    * All steps of one session are in a
258    * linked list for easier deallocation.
259    */
260   struct Step *next;
261
262   struct ConsensusSession *session;
263
264   /**
265    * Tasks that this step is composed of.
266    */
267   struct TaskEntry **tasks;
268   unsigned int tasks_len;
269   unsigned int tasks_cap;
270
271   unsigned int finished_tasks;
272
273   /*
274    * Tasks that have this task as dependency.
275    *
276    * We store pointers to subordinates rather
277    * than to prerequisites since it makes
278    * tracking the readiness of a task easier.
279    */
280   struct Step **subordinates;
281   unsigned int subordinates_len;
282   unsigned int subordinates_cap;
283
284   /**
285    * Counter for the prerequisites of
286    * this step.
287    */
288   size_t pending_prereq;
289
290   /*
291    * Task that will run this step despite
292    * any pending prerequisites.
293    */
294   struct GNUNET_SCHEDULER_Task *timeout_task;
295
296   unsigned int is_running;
297
298   unsigned int is_finished;
299
300   /*
301    * Synchrony round of the task.
302    * Determines the deadline for the task.
303    */
304   unsigned int round;
305
306   /**
307    * Human-readable name for
308    * the task, used for debugging.
309    */
310   char *debug_name;
311
312   /**
313    * When we're doing an early finish, how should this step be
314    * treated?
315    * If GNUNET_YES, the step will be marked as finished
316    * without actually running its tasks.
317    * Otherwise, the step will still be run even after
318    * an early finish.
319    *
320    * Note that a task may never be finished early if
321    * it is already running.
322    */
323   int early_finishable;
324 };
325
326
327 struct RfnElementInfo
328 {
329   const struct GNUNET_SET_Element *element;
330
331   /*
332    * GNUNET_YES if the peer votes for the proposal.
333    */
334   int *votes;
335
336   /**
337    * Proposal for this element,
338    * can only be VOTE_ADD or VOTE_REMOVE.
339    */
340   enum ReferendumVote proposal;
341 };
342
343
344 struct ReferendumEntry
345 {
346   struct RfnKey key;
347
348   /*
349    * Elements where there is at least one proposed change.
350    *
351    * Maps the hash of the GNUNET_SET_Element
352    * to 'struct RfnElementInfo'.
353    */
354   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
355
356   unsigned int num_peers;
357
358   /**
359    * Stores, for every peer in the session,
360    * whether the peer finished the whole referendum.
361    *
362    * Votes from peers are only counted if they're
363    * marked as commited (#GNUNET_YES) in the referendum.
364    *
365    * Otherwise (#GNUNET_NO), the requested changes are
366    * not counted for majority votes or thresholds.
367    */
368   int *peer_commited;
369
370
371   /**
372    * Contestation state of the peer.  If a peer is contested, the values it
373    * contributed are still counted for applying changes, but the grading is
374    * affected.
375    */
376   int *peer_contested;
377 };
378
379
380 struct DiffElementInfo
381 {
382   const struct GNUNET_SET_Element *element;
383
384   /**
385    * Positive weight for 'add', negative
386    * weights for 'remove'.
387    */
388   int weight;
389 };
390
391
392 /**
393  * Weighted diff.
394  */
395 struct DiffEntry
396 {
397   struct DiffKey key;
398   struct GNUNET_CONTAINER_MultiHashMap *changes;
399 };
400
401
402
403 /**
404  * A consensus session consists of one local client and the remote authorities.
405  */
406 struct ConsensusSession
407 {
408   /**
409    * Consensus sessions are kept in a DLL.
410    */
411   struct ConsensusSession *next;
412
413   /**
414    * Consensus sessions are kept in a DLL.
415    */
416   struct ConsensusSession *prev;
417
418   unsigned int num_client_insert_pending;
419
420   struct GNUNET_CONTAINER_MultiHashMap *setmap;
421   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
423
424   /**
425    * Array of peers with length 'num_peers'.
426    */
427   int *peers_blacklisted;
428
429   /*
430    * Mapping from (hashed) TaskKey to TaskEntry.
431    *
432    * We map the application_id for a round to the task that should be
433    * executed, so we don't have to go through all task whenever we get
434    * an incoming set op request.
435    */
436   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
437
438   struct Step *steps_head;
439   struct Step *steps_tail;
440
441   int conclude_started;
442
443   int conclude_done;
444
445   /**
446   * Global consensus identification, computed
447   * from the session id and participating authorities.
448   */
449   struct GNUNET_HashCode global_id;
450
451   /**
452    * Client that inhabits the session
453    */
454   struct GNUNET_SERVER_Client *client;
455
456   /**
457    * Queued messages to the client.
458    */
459   struct GNUNET_MQ_Handle *client_mq;
460
461   /**
462    * Time when the conclusion of the consensus should begin.
463    */
464   struct GNUNET_TIME_Absolute conclude_start;
465
466   /**
467    * Timeout for all rounds together, single rounds will schedule a timeout task
468    * with a fraction of the conclude timeout.
469    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
470    */
471   struct GNUNET_TIME_Absolute conclude_deadline;
472
473   struct GNUNET_PeerIdentity *peers;
474
475   /**
476    * Number of other peers in the consensus.
477    */
478   unsigned int num_peers;
479
480   /**
481    * Index of the local peer in the peers array
482    */
483   unsigned int local_peer_idx;
484
485   /**
486    * Listener for requests from other peers.
487    * Uses the session's global id as app id.
488    */
489   struct GNUNET_SET_ListenHandle *set_listener;
490
491   /**
492    * State of our early stopping scheme.
493    */
494   int early_stopping;
495 };
496
497 /**
498  * Linked list of sessions this peer participates in.
499  */
500 static struct ConsensusSession *sessions_head;
501
502 /**
503  * Linked list of sessions this peer participates in.
504  */
505 static struct ConsensusSession *sessions_tail;
506
507 /**
508  * Configuration of the consensus service.
509  */
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
511
512 /**
513  * Handle to the server for this service.
514  */
515 static struct GNUNET_SERVER_Handle *srv;
516
517 /**
518  * Peer that runs this service.
519  */
520 static struct GNUNET_PeerIdentity my_peer;
521
522 /**
523  * Statistics handle.
524  */
525 struct GNUNET_STATISTICS_Handle *statistics;
526
527
528 static void
529 finish_task (struct TaskEntry *task);
530
531 static void
532 run_ready_steps (struct ConsensusSession *session);
533
534 static const char *
535 phasename (uint16_t phase)
536 {
537   switch (phase)
538   {
539     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
540     case PHASE_KIND_FINISH: return "FINISH";
541     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
542     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
543     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
545     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
546     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
547     default: return "(unknown)";
548   }
549 }
550
551
552 static const char *
553 setname (uint16_t kind)
554 {
555   switch (kind)
556   {
557     case SET_KIND_CURRENT: return "CURRENT";
558     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
559     case SET_KIND_NONE: return "NONE";
560     default: return "(unknown)";
561   }
562 }
563
564 static const char *
565 rfnname (uint16_t kind)
566 {
567   switch (kind)
568   {
569     case RFN_KIND_NONE: return "NONE";
570     case RFN_KIND_ECHO: return "ECHO";
571     case RFN_KIND_CONFIRM: return "CONFIRM";
572     default: return "(unknown)";
573   }
574 }
575
576 static const char *
577 diffname (uint16_t kind)
578 {
579   switch (kind)
580   {
581     case DIFF_KIND_NONE: return "NONE";
582     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
583     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
584     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585     default: return "(unknown)";
586   }
587 }
588
589 #ifdef GNUNET_EXTRA_LOGGING
590
591
592 static const char *
593 debug_str_element (const struct GNUNET_SET_Element *el)
594 {
595   struct GNUNET_HashCode hash;
596
597   GNUNET_SET_element_hash (el, &hash);
598
599   return GNUNET_h2s (&hash);
600 }
601
602 static const char *
603 debug_str_task_key (struct TaskKey *tk)
604 {
605   static char buf[256];
606
607   snprintf (buf, sizeof (buf),
608             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
609             phasename (tk->kind), tk->peer1, tk->peer2,
610             tk->leader, tk->repetition);
611
612   return buf;
613 }
614
615 static const char *
616 debug_str_diff_key (struct DiffKey *dk)
617 {
618   static char buf[256];
619
620   snprintf (buf, sizeof (buf),
621             "DiffKey kind=%s, k1=%d, k2=%d",
622             diffname (dk->diff_kind), dk->k1, dk->k2);
623
624   return buf;
625 }
626
627 static const char *
628 debug_str_set_key (const struct SetKey *sk)
629 {
630   static char buf[256];
631
632   snprintf (buf, sizeof (buf),
633             "SetKey kind=%s, k1=%d, k2=%d",
634             setname (sk->set_kind), sk->k1, sk->k2);
635
636   return buf;
637 }
638
639
640 static const char *
641 debug_str_rfn_key (const struct RfnKey *rk)
642 {
643   static char buf[256];
644
645   snprintf (buf, sizeof (buf),
646             "RfnKey kind=%s, k1=%d, k2=%d",
647             rfnname (rk->rfn_kind), rk->k1, rk->k2);
648
649   return buf;
650 }
651
652 #endif /* GNUNET_EXTRA_LOGGING */
653
654
655 /**
656  * Destroy a session, free all resources associated with it.
657  *
658  * @param session the session to destroy
659  */
660 static void
661 destroy_session (struct ConsensusSession *session)
662 {
663   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664   if (NULL != session->set_listener)
665   {
666     GNUNET_SET_listen_cancel (session->set_listener);
667     session->set_listener = NULL;
668   }
669   if (NULL != session->client_mq)
670   {
671     GNUNET_MQ_destroy (session->client_mq);
672     session->client_mq = NULL;
673     /* The MQ cleanup will also disconnect the underlying client. */
674     session->client = NULL;
675   }
676   if (NULL != session->client)
677   {
678     GNUNET_SERVER_client_disconnect (session->client);
679     session->client = NULL;
680   }
681   GNUNET_free (session);
682 }
683
684
685 /**
686  * Send the final result set of the consensus to the client, element by
687  * element.
688  *
689  * @param cls closure
690  * @param element the current element, NULL if all elements have been
691  *        iterated over
692  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
693  */
694 static int
695 send_to_client_iter (void *cls,
696                      const struct GNUNET_SET_Element *element)
697 {
698   struct TaskEntry *task = (struct TaskEntry *) cls;
699   struct ConsensusSession *session = task->step->session;
700   struct GNUNET_MQ_Envelope *ev;
701
702   if (NULL != element)
703   {
704     struct GNUNET_CONSENSUS_ElementMessage *m;
705
706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707                 "P%d: sending element %s to client\n",
708                 session->local_peer_idx,
709                 debug_str_element (element));
710
711     ev = GNUNET_MQ_msg_extra (m, element->size,
712                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
713     m->element_type = htons (element->element_type);
714     GNUNET_memcpy (&m[1], element->data, element->size);
715     GNUNET_MQ_send (session->client_mq, ev);
716   }
717   else
718   {
719     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720                 "P%d: finished iterating elements for client\n",
721                 session->local_peer_idx);
722     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
723     GNUNET_MQ_send (session->client_mq, ev);
724   }
725   return GNUNET_YES;
726 }
727
728
729 static struct SetEntry *
730 lookup_set (struct ConsensusSession *session, struct SetKey *key)
731 {
732   struct GNUNET_HashCode hash;
733
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "P%u: looking up set {%s}\n",
736               session->local_peer_idx,
737               debug_str_set_key (key));
738
739   GNUNET_assert (SET_KIND_NONE != key->set_kind);
740   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
741   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
742 }
743
744
745 static struct DiffEntry *
746 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
747 {
748   struct GNUNET_HashCode hash;
749
750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751               "P%u: looking up diff {%s}\n",
752               session->local_peer_idx,
753               debug_str_diff_key (key));
754
755   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
756   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
757   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
758 }
759
760
761 static struct ReferendumEntry *
762 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
763 {
764   struct GNUNET_HashCode hash;
765
766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767               "P%u: looking up rfn {%s}\n",
768               session->local_peer_idx,
769               debug_str_rfn_key (key));
770
771   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
772   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
773   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
774 }
775
776
777 static void
778 diff_insert (struct DiffEntry *diff,
779              int weight,
780              const struct GNUNET_SET_Element *element)
781 {
782   struct DiffElementInfo *di;
783   struct GNUNET_HashCode hash;
784
785   GNUNET_assert ( (1 == weight) || (-1 == weight));
786
787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788               "diff_insert with element size %u\n",
789               element->size);
790
791   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792               "hashing element\n");
793
794   GNUNET_SET_element_hash (element, &hash);
795
796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
797               "hashed element\n");
798
799   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
800
801   if (NULL == di)
802   {
803     di = GNUNET_new (struct DiffElementInfo);
804     di->element = GNUNET_SET_element_dup (element);
805     GNUNET_assert (GNUNET_OK ==
806                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
807                                                       &hash, di,
808                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
809   }
810
811   di->weight = weight;
812 }
813
814
815 static void
816 rfn_commit (struct ReferendumEntry *rfn,
817             uint16_t commit_peer)
818 {
819   GNUNET_assert (commit_peer < rfn->num_peers);
820
821   rfn->peer_commited[commit_peer] = GNUNET_YES;
822 }
823
824
825 static void
826 rfn_contest (struct ReferendumEntry *rfn,
827              uint16_t contested_peer)
828 {
829   GNUNET_assert (contested_peer < rfn->num_peers);
830
831   rfn->peer_contested[contested_peer] = GNUNET_YES;
832 }
833
834
835 static uint16_t
836 rfn_noncontested (struct ReferendumEntry *rfn)
837 {
838   uint16_t i;
839   uint16_t ret;
840
841   ret = 0;
842   for (i = 0; i < rfn->num_peers; i++)
843     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
844       ret++;
845
846   return ret;
847 }
848
849
850 static void
851 rfn_vote (struct ReferendumEntry *rfn,
852           uint16_t voting_peer,
853           enum ReferendumVote vote,
854           const struct GNUNET_SET_Element *element)
855 {
856   struct RfnElementInfo *ri;
857   struct GNUNET_HashCode hash;
858
859   GNUNET_assert (voting_peer < rfn->num_peers);
860
861   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
862      since VOTE_KEEP is implicit in not voting. */
863   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
864
865   GNUNET_SET_element_hash (element, &hash);
866   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
867
868   if (NULL == ri)
869   {
870     ri = GNUNET_new (struct RfnElementInfo);
871     ri->element = GNUNET_SET_element_dup (element);
872     ri->votes = GNUNET_new_array (rfn->num_peers, int);
873     GNUNET_assert (GNUNET_OK ==
874                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
875                                                       &hash, ri,
876                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
877   }
878
879   ri->votes[voting_peer] = GNUNET_YES;
880   ri->proposal = vote;
881 }
882
883
884 uint16_t
885 task_other_peer (struct TaskEntry *task)
886 {
887   uint16_t me = task->step->session->local_peer_idx;
888   if (task->key.peer1 == me)
889     return task->key.peer2;
890   return task->key.peer1;
891 }
892
893
894 /**
895  * Callback for set operation results. Called for each element
896  * in the result set.
897  *
898  * @param cls closure
899  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
900  * @param status see enum GNUNET_SET_Status
901  */
902 static void
903 set_result_cb (void *cls,
904                const struct GNUNET_SET_Element *element,
905                enum GNUNET_SET_Status status)
906 {
907   struct TaskEntry *task = cls;
908   struct ConsensusSession *session = task->step->session;
909   struct SetEntry *output_set = NULL;
910   struct DiffEntry *output_diff = NULL;
911   struct ReferendumEntry *output_rfn = NULL;
912   unsigned int other_idx;
913   struct SetOpCls *setop;
914
915   setop = &task->cls.setop;
916
917
918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919               "P%u: got set result for {%s}, status %u\n",
920               session->local_peer_idx,
921               debug_str_task_key (&task->key),
922               status);
923
924   if (GNUNET_NO == task->is_started)
925   {
926     GNUNET_break_op (0);
927     return;
928   }
929
930   if (GNUNET_YES == task->is_finished)
931   {
932     GNUNET_break_op (0);
933     return;
934   }
935
936   other_idx = task_other_peer (task);
937
938   if (SET_KIND_NONE != setop->output_set.set_kind)
939   {
940     output_set = lookup_set (session, &setop->output_set);
941     GNUNET_assert (NULL != output_set);
942   }
943
944   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
945   {
946     output_diff = lookup_diff (session, &setop->output_diff);
947     GNUNET_assert (NULL != output_diff);
948   }
949
950   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
951   {
952     output_rfn = lookup_rfn (session, &setop->output_rfn);
953     GNUNET_assert (NULL != output_rfn);
954   }
955
956   if (GNUNET_YES == session->peers_blacklisted[other_idx])
957   {
958     /* Peer might have been blacklisted
959        by a gradecast running in parallel, ignore elements from now */
960     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
961       return;
962     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
963       return;
964   }
965
966   if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
967   {
968     if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
969     {
970       GNUNET_assert (NULL != output_rfn);
971       rfn_contest (output_rfn, task_other_peer (task));
972       return;
973     }
974   }
975
976   switch (status)
977   {
978     case GNUNET_SET_STATUS_ADD_LOCAL:
979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                   "Adding element in Task {%s}\n",
981                   debug_str_task_key (&task->key));
982       if (NULL != output_set)
983       {
984         // FIXME: record pending adds, use callback
985         GNUNET_SET_add_element (output_set->h,
986                                 element,
987                                 NULL,
988                                 NULL);
989 #ifdef GNUNET_EXTRA_LOGGING
990         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
991                     "P%u: adding element %s into set {%s} of task {%s}\n",
992                     session->local_peer_idx,
993                     debug_str_element (element),
994                     debug_str_set_key (&setop->output_set),
995                     debug_str_task_key (&task->key));
996 #endif
997       }
998       if (NULL != output_diff)
999       {
1000         diff_insert (output_diff, 1, element);
1001 #ifdef GNUNET_EXTRA_LOGGING
1002         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003                     "P%u: adding element %s into diff {%s} of task {%s}\n",
1004                     session->local_peer_idx,
1005                     debug_str_element (element),
1006                     debug_str_diff_key (&setop->output_diff),
1007                     debug_str_task_key (&task->key));
1008 #endif
1009       }
1010       if (NULL != output_rfn)
1011       {
1012         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1013 #ifdef GNUNET_EXTRA_LOGGING
1014         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1015                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
1016                     session->local_peer_idx,
1017                     debug_str_element (element),
1018                     debug_str_rfn_key (&setop->output_rfn),
1019                     debug_str_task_key (&task->key));
1020 #endif
1021       }
1022       // XXX: add result to structures in task
1023       break;
1024     case GNUNET_SET_STATUS_ADD_REMOTE:
1025       if (GNUNET_YES == setop->do_not_remove)
1026         break;
1027       if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
1028         break;
1029       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030                   "Removing element in Task {%s}\n",
1031                   debug_str_task_key (&task->key));
1032       if (NULL != output_set)
1033       {
1034         // FIXME: record pending adds, use callback
1035         GNUNET_SET_remove_element (output_set->h,
1036                                    element,
1037                                    NULL,
1038                                    NULL);
1039 #ifdef GNUNET_EXTRA_LOGGING
1040         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1041                     "P%u: removing element %s from set {%s} of task {%s}\n",
1042                     session->local_peer_idx,
1043                     debug_str_element (element),
1044                     debug_str_set_key (&setop->output_set),
1045                     debug_str_task_key (&task->key));
1046 #endif
1047       }
1048       if (NULL != output_diff)
1049       {
1050         diff_insert (output_diff, -1, element);
1051 #ifdef GNUNET_EXTRA_LOGGING
1052         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1053                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1054                     session->local_peer_idx,
1055                     debug_str_element (element),
1056                     debug_str_diff_key (&setop->output_diff),
1057                     debug_str_task_key (&task->key));
1058 #endif
1059       }
1060       if (NULL != output_rfn)
1061       {
1062         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1063 #ifdef GNUNET_EXTRA_LOGGING
1064         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1065                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1066                     session->local_peer_idx,
1067                     debug_str_element (element),
1068                     debug_str_rfn_key (&setop->output_rfn),
1069                     debug_str_task_key (&task->key));
1070 #endif
1071       }
1072       break;
1073     case GNUNET_SET_STATUS_DONE:
1074       // XXX: check first if any changes to the underlying
1075       // set are still pending
1076       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077                   "Finishing setop in Task {%s}\n",
1078                   debug_str_task_key (&task->key));
1079       if (NULL != output_rfn)
1080       {
1081         rfn_commit (output_rfn, task_other_peer (task));
1082       }
1083       finish_task (task);
1084       break;
1085     case GNUNET_SET_STATUS_FAILURE:
1086       // XXX: cleanup
1087       GNUNET_break_op (0);
1088       finish_task (task);
1089       return;
1090     default:
1091       /* not reached */
1092       GNUNET_assert (0);
1093   }
1094 }
1095
1096 #ifdef EVIL
1097
1098 enum EvilnessType
1099 {
1100   EVILNESS_NONE,
1101   EVILNESS_CRAM_ALL,
1102   EVILNESS_CRAM_LEAD,
1103   EVILNESS_CRAM_ECHO,
1104   EVILNESS_SLACK,
1105 };
1106
1107 enum EvilnessSubType
1108 {
1109   EVILNESS_SUB_NONE,
1110   EVILNESS_SUB_REPLACEMENT,
1111   EVILNESS_SUB_NO_REPLACEMENT,
1112 };
1113
1114 struct Evilness
1115 {
1116   enum EvilnessType type;
1117   enum EvilnessSubType subtype;
1118   unsigned int num;
1119 };
1120
1121
1122 static int
1123 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1124 {
1125   if (0 == strcmp ("replace", evil_subtype_str))
1126   {
1127     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1128   }
1129   else if (0 == strcmp ("noreplace", evil_subtype_str))
1130   {
1131     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1132   }
1133   else
1134   {
1135     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1137                 evil_subtype_str);
1138     return GNUNET_SYSERR;
1139   }
1140   return GNUNET_OK;
1141 }
1142
1143
1144 static void
1145 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1146 {
1147   char *evil_spec;
1148   char *field;
1149   char *evil_type_str = NULL;
1150   char *evil_subtype_str = NULL;
1151
1152   GNUNET_assert (NULL != evil);
1153
1154   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1155   {
1156     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157                 "P%u: no evilness\n",
1158                 session->local_peer_idx);
1159     evil->type = EVILNESS_NONE;
1160     return;
1161   }
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "P%u: got evilness spec\n",
1164               session->local_peer_idx);
1165
1166   for (field = strtok (evil_spec, "/");
1167        NULL != field;
1168        field = strtok (NULL, "/"))
1169   {
1170     unsigned int peer_num;
1171     unsigned int evil_num;
1172     int ret;
1173
1174     evil_type_str = NULL;
1175     evil_subtype_str = NULL;
1176
1177     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1178
1179     if (ret != 4)
1180     {
1181       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1182                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1183                   field,
1184                   ret);
1185       goto not_evil;
1186     }
1187
1188     GNUNET_assert (NULL != evil_type_str);
1189     GNUNET_assert (NULL != evil_subtype_str);
1190
1191     if (peer_num == session->local_peer_idx)
1192     {
1193       if (0 == strcmp ("slack", evil_type_str))
1194       {
1195         evil->type = EVILNESS_SLACK;
1196       }
1197       else if (0 == strcmp ("cram-all", evil_type_str))
1198       {
1199         evil->type = EVILNESS_CRAM_ALL;
1200         evil->num = evil_num;
1201         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1202           goto not_evil;
1203       }
1204       else if (0 == strcmp ("cram-lead", evil_type_str))
1205       {
1206         evil->type = EVILNESS_CRAM_LEAD;
1207         evil->num = evil_num;
1208         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1209           goto not_evil;
1210       }
1211       else if (0 == strcmp ("cram-echo", evil_type_str))
1212       {
1213         evil->type = EVILNESS_CRAM_ECHO;
1214         evil->num = evil_num;
1215         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1216           goto not_evil;
1217       }
1218       else
1219       {
1220         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1221                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1222                     evil_type_str);
1223         goto not_evil;
1224       }
1225       goto cleanup;
1226     }
1227     /* No GNUNET_free since memory was allocated by libc */
1228     free (evil_type_str);
1229     evil_type_str = NULL;
1230     evil_subtype_str = NULL;
1231   }
1232 not_evil:
1233   evil->type = EVILNESS_NONE;
1234 cleanup:
1235   GNUNET_free (evil_spec);
1236   /* no GNUNET_free_non_null since it wasn't
1237    * allocated with GNUNET_malloc */
1238   if (NULL != evil_type_str)
1239     free (evil_type_str);
1240   if (NULL != evil_subtype_str)
1241     free (evil_subtype_str);
1242 }
1243
1244 #endif
1245
1246
1247 /**
1248  * Commit the appropriate set for a
1249  * task.
1250  */
1251 static void
1252 commit_set (struct ConsensusSession *session,
1253             struct TaskEntry *task)
1254 {
1255   struct SetEntry *set;
1256   struct SetOpCls *setop = &task->cls.setop;
1257
1258   GNUNET_assert (NULL != setop->op);
1259   set = lookup_set (session, &setop->input_set);
1260   GNUNET_assert (NULL != set);
1261
1262 #ifdef EVIL
1263   {
1264     unsigned int i;
1265     struct Evilness evil;
1266
1267     get_evilness (session, &evil);
1268     if (EVILNESS_NONE != evil.type)
1269     {
1270       /* Useful for evaluation */
1271       GNUNET_STATISTICS_set (statistics,
1272                              "is evil",
1273                              1,
1274                              GNUNET_NO);
1275     }
1276     switch (evil.type)
1277     {
1278       case EVILNESS_CRAM_ALL:
1279       case EVILNESS_CRAM_LEAD:
1280       case EVILNESS_CRAM_ECHO:
1281         /* We're not cramming elements in the
1282            all-to-all round, since that would just
1283            add more elements to the result set, but
1284            wouldn't test robustness. */
1285         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1286         {
1287           GNUNET_SET_commit (setop->op, set->h);
1288           break;
1289         }
1290         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1291             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1292         {
1293           GNUNET_SET_commit (setop->op, set->h);
1294           break;
1295         }
1296         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1297         {
1298           GNUNET_SET_commit (setop->op, set->h);
1299           break;
1300         }
1301         for (i = 0; i < evil.num; i++)
1302         {
1303           struct GNUNET_HashCode hash;
1304           struct GNUNET_SET_Element element;
1305           element.data = &hash;
1306           element.size = sizeof (struct GNUNET_HashCode);
1307           element.element_type = 0;
1308
1309           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1310           {
1311             /* Always generate a new element. */
1312             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1313           }
1314           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1315           {
1316             /* Always cram the same elements, derived from counter. */
1317             GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1318           }
1319           else
1320           {
1321             GNUNET_assert (0);
1322           }
1323           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1324 #ifdef GNUNET_EXTRA_LOGGING
1325           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1327                       session->local_peer_idx,
1328                       debug_str_element (&element),
1329                       debug_str_set_key (&setop->input_set),
1330                       debug_str_task_key (&task->key));
1331 #endif
1332         }
1333         GNUNET_STATISTICS_update (statistics,
1334                                   "# stuffed elements",
1335                                   evil.num,
1336                                   GNUNET_NO);
1337         GNUNET_SET_commit (setop->op, set->h);
1338         break;
1339       case EVILNESS_SLACK:
1340         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1341                     "P%u: evil peer: slacking\n",
1342                     (unsigned int) session->local_peer_idx);
1343         /* Do nothing. */
1344         break;
1345       case EVILNESS_NONE:
1346         GNUNET_SET_commit (setop->op, set->h);
1347         break;
1348     }
1349   }
1350 #else
1351   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1352   {
1353     struct GNUNET_SET_Element element;
1354     struct ContestedPayload payload;
1355     element.data = &payload;
1356     element.size = sizeof (struct ContestedPayload);
1357     element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1358     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1359   }
1360   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1361   {
1362     GNUNET_SET_commit (setop->op, set->h);
1363   }
1364   else
1365   {
1366     /* For our testcases, we don't want the blacklisted
1367        peers to wait. */
1368     GNUNET_SET_operation_cancel (setop->op);
1369     setop->op = NULL;
1370   }
1371 #endif
1372 }
1373
1374
1375 static void
1376 put_diff (struct ConsensusSession *session,
1377          struct DiffEntry *diff)
1378 {
1379   struct GNUNET_HashCode hash;
1380
1381   GNUNET_assert (NULL != diff);
1382
1383   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1384   GNUNET_assert (GNUNET_OK ==
1385                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1386                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1387 }
1388
1389 static void
1390 put_set (struct ConsensusSession *session,
1391          struct SetEntry *set)
1392 {
1393   struct GNUNET_HashCode hash;
1394
1395   GNUNET_assert (NULL != set->h);
1396
1397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398               "Putting set %s\n",
1399               debug_str_set_key (&set->key));
1400
1401   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1402   GNUNET_assert (GNUNET_SYSERR !=
1403                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1404                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1405 }
1406
1407
1408 static void
1409 put_rfn (struct ConsensusSession *session,
1410          struct ReferendumEntry *rfn)
1411 {
1412   struct GNUNET_HashCode hash;
1413
1414   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1415   GNUNET_assert (GNUNET_OK ==
1416                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1417                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1418 }
1419
1420
1421
1422 static void
1423 task_cancel_reconcile (struct TaskEntry *task)
1424 {
1425   /* not implemented yet */
1426   GNUNET_assert (0);
1427 }
1428
1429
1430 static void
1431 apply_diff_to_rfn (struct DiffEntry *diff,
1432                    struct ReferendumEntry *rfn,
1433                    uint16_t voting_peer,
1434                    uint16_t num_peers)
1435 {
1436   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1437   struct DiffElementInfo *di;
1438
1439   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1440
1441   while (GNUNET_YES ==
1442          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1443                                                       NULL,
1444                                                       (const void **) &di))
1445   {
1446     if (di->weight > 0)
1447     {
1448       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1449     }
1450     if (di->weight < 0)
1451     {
1452       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1453     }
1454   }
1455
1456   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1457 }
1458
1459
1460 struct DiffEntry *
1461 diff_create ()
1462 {
1463   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1464
1465   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1466
1467   return d;
1468 }
1469
1470
1471 struct DiffEntry *
1472 diff_compose (struct DiffEntry *diff_1,
1473               struct DiffEntry *diff_2)
1474 {
1475   struct DiffEntry *diff_new;
1476   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1477   struct DiffElementInfo *di;
1478
1479   diff_new = diff_create ();
1480
1481   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1482   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1483   {
1484     diff_insert (diff_new, di->weight, di->element);
1485   }
1486   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1487
1488   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1489   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1490   {
1491     diff_insert (diff_new, di->weight, di->element);
1492   }
1493   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1494
1495   return diff_new;
1496 }
1497
1498
1499 struct ReferendumEntry *
1500 rfn_create (uint16_t size)
1501 {
1502   struct ReferendumEntry *rfn;
1503
1504   rfn = GNUNET_new (struct ReferendumEntry);
1505   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1506   rfn->peer_commited = GNUNET_new_array (size, int);
1507   rfn->peer_contested = GNUNET_new_array (size, int);
1508   rfn->num_peers = size;
1509
1510   return rfn;
1511 }
1512
1513
1514 void
1515 diff_destroy (struct DiffEntry *diff)
1516 {
1517   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1518   GNUNET_free (diff);
1519 }
1520
1521
1522 /**
1523  * For a given majority, count what the outcome
1524  * is (add/remove/keep), and give the number
1525  * of peers that voted for this outcome.
1526  */
1527 static void
1528 rfn_majority (const struct ReferendumEntry *rfn,
1529               const struct RfnElementInfo *ri,
1530               uint16_t *ret_majority,
1531               enum ReferendumVote *ret_vote)
1532 {
1533   uint16_t votes_yes = 0;
1534   uint16_t num_commited = 0;
1535   uint16_t i;
1536
1537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538               "Computing rfn majority for element %s of rfn {%s}\n",
1539               debug_str_element (ri->element),
1540               debug_str_rfn_key (&rfn->key));
1541
1542   for (i = 0; i < rfn->num_peers; i++)
1543   {
1544     if (GNUNET_NO == rfn->peer_commited[i])
1545       continue;
1546     num_commited++;
1547
1548     if (GNUNET_YES == ri->votes[i])
1549       votes_yes++;
1550   }
1551
1552   if (votes_yes > (num_commited) / 2)
1553   {
1554     *ret_vote = ri->proposal;
1555     *ret_majority = votes_yes;
1556   }
1557   else
1558   {
1559     *ret_vote = VOTE_STAY;
1560     *ret_majority = num_commited - votes_yes;
1561   }
1562 }
1563
1564
1565 struct SetCopyCls
1566 {
1567   struct TaskEntry *task;
1568   struct SetKey dst_set_key;
1569 };
1570
1571
1572 static void
1573 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1574 {
1575   struct SetCopyCls *scc = cls;
1576   struct TaskEntry *task = scc->task;
1577   struct SetKey dst_set_key = scc->dst_set_key;
1578   struct SetEntry *set;
1579
1580   GNUNET_free (scc);
1581   set = GNUNET_new (struct SetEntry);
1582   set->h = copy;
1583   set->key = dst_set_key;
1584   put_set (task->step->session, set);
1585
1586   task->start (task);
1587 }
1588
1589
1590 /**
1591  * Call the start function of the given
1592  * task again after we created a copy of the given set.
1593  */
1594 static void
1595 create_set_copy_for_task (struct TaskEntry *task,
1596                           struct SetKey *src_set_key,
1597                           struct SetKey *dst_set_key)
1598 {
1599   struct SetEntry *src_set;
1600   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1601
1602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603               "Copying set {%s} to {%s} for task {%s}\n",
1604               debug_str_set_key (src_set_key),
1605               debug_str_set_key (dst_set_key),
1606               debug_str_task_key (&task->key));
1607
1608   scc->task = task;
1609   scc->dst_set_key = *dst_set_key;
1610   src_set = lookup_set (task->step->session, src_set_key);
1611   GNUNET_assert (NULL != src_set);
1612   GNUNET_SET_copy_lazy (src_set->h,
1613                         set_copy_cb,
1614                         scc);
1615 }
1616
1617
1618 struct SetMutationProgressCls
1619 {
1620   int num_pending;
1621   /**
1622    * Task to finish once all changes are through.
1623    */
1624   struct TaskEntry *task;
1625 };
1626
1627
1628 static void
1629 set_mutation_done (void *cls)
1630 {
1631   struct SetMutationProgressCls *pc = cls;
1632
1633   GNUNET_assert (pc->num_pending > 0);
1634
1635   pc->num_pending--;
1636
1637   if (0 == pc->num_pending)
1638   {
1639     struct TaskEntry *task = pc->task;
1640     GNUNET_free (pc);
1641     finish_task (task);
1642   }
1643 }
1644
1645
1646 static void
1647 try_finish_step_early (struct Step *step)
1648 {
1649   unsigned int i;
1650
1651   if (GNUNET_YES == step->is_running)
1652     return;
1653   if (GNUNET_YES == step->is_finished)
1654     return;
1655   if (GNUNET_NO == step->early_finishable)
1656     return;
1657
1658   step->is_finished = GNUNET_YES;
1659
1660 #ifdef GNUNET_EXTRA_LOGGING
1661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662               "Finishing step `%s' early.\n",
1663               step->debug_name);
1664 #endif
1665
1666   for (i = 0; i < step->subordinates_len; i++)
1667   {
1668     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1669     step->subordinates[i]->pending_prereq--;
1670 #ifdef GNUNET_EXTRA_LOGGING
1671     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672                 "Decreased pending_prereq to %u for step `%s'.\n",
1673                 (unsigned int) step->subordinates[i]->pending_prereq,
1674                 step->subordinates[i]->debug_name);
1675
1676 #endif
1677     try_finish_step_early (step->subordinates[i]);
1678   }
1679
1680   // XXX: maybe schedule as task to avoid recursion?
1681   run_ready_steps (step->session);
1682 }
1683
1684
1685 static void
1686 finish_step (struct Step *step)
1687 {
1688   unsigned int i;
1689
1690   GNUNET_assert (step->finished_tasks == step->tasks_len);
1691   GNUNET_assert (GNUNET_YES == step->is_running);
1692   GNUNET_assert (GNUNET_NO == step->is_finished);
1693
1694 #ifdef GNUNET_EXTRA_LOGGING
1695   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696               "All tasks of step `%s' with %u subordinates finished.\n",
1697               step->debug_name,
1698               step->subordinates_len);
1699 #endif
1700
1701   for (i = 0; i < step->subordinates_len; i++)
1702   {
1703     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1704     step->subordinates[i]->pending_prereq--;
1705 #ifdef GNUNET_EXTRA_LOGGING
1706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707                 "Decreased pending_prereq to %u for step `%s'.\n",
1708                 (unsigned int) step->subordinates[i]->pending_prereq,
1709                 step->subordinates[i]->debug_name);
1710
1711 #endif
1712   }
1713
1714   step->is_finished = GNUNET_YES;
1715
1716   // XXX: maybe schedule as task to avoid recursion?
1717   run_ready_steps (step->session);
1718 }
1719
1720
1721
1722 /**
1723  * Apply the result from one round of gradecasts (i.e. every peer
1724  * should have gradecasted) to the peer's current set.
1725  *
1726  * @param task the task with context information
1727  */
1728 static void
1729 task_start_apply_round (struct TaskEntry *task)
1730 {
1731   struct ConsensusSession *session = task->step->session;
1732   struct SetKey sk_in;
1733   struct SetKey sk_out;
1734   struct RfnKey rk_in;
1735   struct SetEntry *set_out;
1736   struct ReferendumEntry *rfn_in;
1737   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1738   struct RfnElementInfo *ri;
1739   struct SetMutationProgressCls *progress_cls;
1740   uint16_t worst_majority = UINT16_MAX;
1741
1742   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1743   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1744   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1745
1746   set_out = lookup_set (session, &sk_out);
1747   if (NULL == set_out)
1748   {
1749     create_set_copy_for_task (task, &sk_in, &sk_out);
1750     return;
1751   }
1752
1753   rfn_in = lookup_rfn (session, &rk_in);
1754   GNUNET_assert (NULL != rfn_in);
1755
1756   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1757   progress_cls->task = task;
1758
1759   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1760
1761   while (GNUNET_YES ==
1762          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1763                                                       NULL,
1764                                                       (const void **) &ri))
1765   {
1766     uint16_t majority_num;
1767     enum ReferendumVote majority_vote;
1768
1769     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1770
1771     if (worst_majority > majority_num)
1772       worst_majority = majority_num;
1773
1774     switch (majority_vote)
1775     {
1776       case VOTE_ADD:
1777         progress_cls->num_pending++;
1778         GNUNET_assert (GNUNET_OK ==
1779                        GNUNET_SET_add_element (set_out->h,
1780                                                ri->element,
1781                                                &set_mutation_done,
1782                                                progress_cls));
1783         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1784                     "P%u: apply round: adding element %s with %u-majority.\n",
1785                     session->local_peer_idx,
1786                     debug_str_element (ri->element), majority_num);
1787         break;
1788       case VOTE_REMOVE:
1789         progress_cls->num_pending++;
1790         GNUNET_assert (GNUNET_OK ==
1791                        GNUNET_SET_remove_element (set_out->h,
1792                                                   ri->element,
1793                                                   &set_mutation_done,
1794                                                   progress_cls));
1795         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1796                     "P%u: apply round: deleting element %s with %u-majority.\n",
1797                     session->local_peer_idx,
1798                     debug_str_element (ri->element), majority_num);
1799         break;
1800       case VOTE_STAY:
1801         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1802                     "P%u: apply round: keeping element %s with %u-majority.\n",
1803                     session->local_peer_idx,
1804                     debug_str_element (ri->element), majority_num);
1805         // do nothing
1806         break;
1807       default:
1808         GNUNET_assert (0);
1809         break;
1810     }
1811   }
1812
1813   if (0 == progress_cls->num_pending)
1814   {
1815     // call closure right now, no pending ops
1816     GNUNET_free (progress_cls);
1817     finish_task (task);
1818   }
1819
1820   {
1821     uint16_t thresh = (session->num_peers / 3) * 2;
1822
1823     if (worst_majority >= thresh)
1824     {
1825       switch (session->early_stopping)
1826       {
1827         case EARLY_STOPPING_NONE:
1828           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1829           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1830                       "P%u: Stopping early (after one more superround)\n",
1831                       session->local_peer_idx);
1832           break;
1833         case EARLY_STOPPING_ONE_MORE:
1834           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1835                       session->local_peer_idx);
1836           session->early_stopping = EARLY_STOPPING_DONE;
1837           {
1838             struct Step *step;
1839             for (step = session->steps_head; NULL != step; step = step->next)
1840               try_finish_step_early (step);
1841           }
1842           break;
1843         case EARLY_STOPPING_DONE:
1844           /* We shouldn't be here anymore after early stopping */
1845           GNUNET_break (0);
1846           break;
1847         default:
1848           GNUNET_assert (0);
1849           break;
1850       }
1851     }
1852     else if (EARLY_STOPPING_NONE != session->early_stopping)
1853     {
1854       // Our assumption about the number of bad peers
1855       // has been broken.
1856       GNUNET_break_op (0);
1857     }
1858     else
1859     {
1860       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1861                   session->local_peer_idx);
1862     }
1863   }
1864   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1865 }
1866
1867
1868 static void
1869 task_start_grade (struct TaskEntry *task)
1870 {
1871   struct ConsensusSession *session = task->step->session;
1872   struct ReferendumEntry *output_rfn;
1873   struct ReferendumEntry *input_rfn;
1874   struct DiffEntry *input_diff;
1875   struct RfnKey rfn_key;
1876   struct DiffKey diff_key;
1877   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1878   struct RfnElementInfo *ri;
1879   unsigned int gradecast_confidence = 2;
1880
1881   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1882   output_rfn = lookup_rfn (session, &rfn_key);
1883   if (NULL == output_rfn)
1884   {
1885     output_rfn = rfn_create (session->num_peers);
1886     output_rfn->key = rfn_key;
1887     put_rfn (session, output_rfn);
1888   }
1889
1890   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1891   input_diff = lookup_diff (session, &diff_key);
1892   GNUNET_assert (NULL != input_diff);
1893
1894   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1895   input_rfn = lookup_rfn (session, &rfn_key);
1896   GNUNET_assert (NULL != input_rfn);
1897
1898   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1899
1900   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1901
1902   while (GNUNET_YES ==
1903          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1904                                                       NULL,
1905                                                       (const void **) &ri))
1906   {
1907     uint16_t majority_num;
1908     enum ReferendumVote majority_vote;
1909
1910     // XXX: we need contested votes and non-contested votes here
1911     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1912
1913     if (majority_num <= session->num_peers / 3)
1914       majority_vote = VOTE_REMOVE;
1915
1916     switch (majority_vote)
1917     {
1918       case VOTE_STAY:
1919         break;
1920       case VOTE_ADD:
1921         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1922         break;
1923       case VOTE_REMOVE:
1924         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1925         break;
1926       default:
1927         GNUNET_assert (0);
1928         break;
1929     }
1930   }
1931   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1932
1933   {
1934     uint16_t noncontested;
1935     noncontested = rfn_noncontested (input_rfn);
1936     if (noncontested < (session->num_peers / 3) * 2)
1937     {
1938       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1939     }
1940     if (noncontested < (session->num_peers / 3) + 1)
1941     {
1942       gradecast_confidence = 0;
1943     }
1944   }
1945
1946   if (gradecast_confidence >= 1)
1947     rfn_commit (output_rfn, task->key.leader);
1948
1949   if (gradecast_confidence <= 1)
1950     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1951
1952   finish_task (task);
1953 }
1954
1955
1956 static void
1957 task_start_reconcile (struct TaskEntry *task)
1958 {
1959   struct SetEntry *input;
1960   struct SetOpCls *setop = &task->cls.setop;
1961   struct ConsensusSession *session = task->step->session;
1962
1963   input = lookup_set (session, &setop->input_set);
1964   GNUNET_assert (NULL != input);
1965   GNUNET_assert (NULL != input->h);
1966
1967   /* We create the outputs for the operation here
1968      (rather than in the set operation callback)
1969      because we want something valid in there, even
1970      if the other peer doesn't talk to us */
1971
1972   if (SET_KIND_NONE != setop->output_set.set_kind)
1973   {
1974     /* If we don't have an existing output set,
1975        we clone the input set. */
1976     if (NULL == lookup_set (session, &setop->output_set))
1977     {
1978       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1979       return;
1980     }
1981   }
1982
1983   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1984   {
1985     if (NULL == lookup_rfn (session, &setop->output_rfn))
1986     {
1987       struct ReferendumEntry *rfn;
1988
1989       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1990                   "P%u: output rfn <%s> missing, creating.\n",
1991                   session->local_peer_idx,
1992                   debug_str_rfn_key (&setop->output_rfn));
1993
1994       rfn = rfn_create (session->num_peers);
1995       rfn->key = setop->output_rfn;
1996       put_rfn (session, rfn);
1997     }
1998   }
1999
2000   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2001   {
2002     if (NULL == lookup_diff (session, &setop->output_diff))
2003     {
2004       struct DiffEntry *diff;
2005
2006       diff = diff_create ();
2007       diff->key = setop->output_diff;
2008       put_diff (session, diff);
2009     }
2010   }
2011
2012   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2013   {
2014     /* XXX: mark the corresponding rfn as commited if necessary */
2015     finish_task (task);
2016     return;
2017   }
2018
2019   if (task->key.peer1 == session->local_peer_idx)
2020   {
2021     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2022
2023     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024                 "P%u: Looking up set {%s} to run remote union\n",
2025                 session->local_peer_idx,
2026                 debug_str_set_key (&setop->input_set));
2027
2028     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2029     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2030
2031     rcm.kind = htons (task->key.kind);
2032     rcm.peer1 = htons (task->key.peer1);
2033     rcm.peer2 = htons (task->key.peer2);
2034     rcm.leader = htons (task->key.leader);
2035     rcm.repetition = htons (task->key.repetition);
2036
2037     GNUNET_assert (NULL == setop->op);
2038     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2039                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2040
2041     // XXX: maybe this should be done while
2042     // setting up tasks alreays?
2043     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2044                                     &session->global_id,
2045                                     &rcm.header,
2046                                     GNUNET_SET_RESULT_SYMMETRIC,
2047                                     set_result_cb,
2048                                     task);
2049
2050     commit_set (session, task);
2051   }
2052   else if (task->key.peer2 == session->local_peer_idx)
2053   {
2054     /* Wait for the other peer to contact us */
2055     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2056                 session->local_peer_idx, task->key.peer1);
2057
2058     if (NULL != setop->op)
2059     {
2060       commit_set (session, task);
2061     }
2062   }
2063   else
2064   {
2065     /* We made an error while constructing the task graph. */
2066     GNUNET_assert (0);
2067   }
2068 }
2069
2070
2071 static void
2072 task_start_eval_echo (struct TaskEntry *task)
2073 {
2074   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2075   struct ReferendumEntry *input_rfn;
2076   struct RfnElementInfo *ri;
2077   struct SetEntry *output_set;
2078   struct SetMutationProgressCls *progress_cls;
2079   struct ConsensusSession *session = task->step->session;
2080   struct SetKey sk_in;
2081   struct SetKey sk_out;
2082   struct RfnKey rk_in;
2083
2084   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2085   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2086   output_set = lookup_set (session, &sk_out);
2087   if (NULL == output_set)
2088   {
2089     create_set_copy_for_task (task, &sk_in, &sk_out);
2090     return;
2091   }
2092
2093
2094   {
2095     // FIXME: should be marked as a shallow copy, so
2096     // we can destroy everything correctly
2097     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2098     last_set->h = output_set->h;
2099     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2100     put_set (session, last_set);
2101   }
2102
2103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104               "Evaluating referendum in Task {%s}\n",
2105               debug_str_task_key (&task->key));
2106
2107   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2108   progress_cls->task = task;
2109
2110   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2111   input_rfn = lookup_rfn (session, &rk_in);
2112
2113   GNUNET_assert (NULL != input_rfn);
2114
2115   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2116   GNUNET_assert (NULL != iter);
2117
2118   while (GNUNET_YES ==
2119          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2120                                                       NULL,
2121                                                       (const void **) &ri))
2122   {
2123     enum ReferendumVote majority_vote;
2124     uint16_t majority_num;
2125
2126     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2127
2128     if (majority_num < session->num_peers / 3)
2129     {
2130       /* It is not the case that all nonfaulty peers
2131          echoed the same value.  Since we're doing a set reconciliation, we
2132          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2133          reconciliation as contested.  Other peers might not know that the
2134          leader is faulty, thus we still re-distribute in the confirmation
2135          round. */
2136       output_set->is_contested = GNUNET_YES;
2137     }
2138
2139     switch (majority_vote)
2140     {
2141       case VOTE_ADD:
2142         progress_cls->num_pending++;
2143         GNUNET_assert (GNUNET_OK ==
2144                        GNUNET_SET_add_element (output_set->h,
2145                                                ri->element,
2146                                                set_mutation_done,
2147                                                progress_cls));
2148         break;
2149       case VOTE_REMOVE:
2150         progress_cls->num_pending++;
2151         GNUNET_assert (GNUNET_OK ==
2152                        GNUNET_SET_remove_element (output_set->h,
2153                                                   ri->element,
2154                                                   set_mutation_done,
2155                                                   progress_cls));
2156         break;
2157       case VOTE_STAY:
2158         /* Nothing to do. */
2159         break;
2160       default:
2161         /* not reached */
2162         GNUNET_assert (0);
2163     }
2164   }
2165
2166   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2167
2168   if (0 == progress_cls->num_pending)
2169   {
2170     // call closure right now, no pending ops
2171     GNUNET_free (progress_cls);
2172     finish_task (task);
2173   }
2174 }
2175
2176
2177 static void
2178 task_start_finish (struct TaskEntry *task)
2179 {
2180   struct SetEntry *final_set;
2181   struct ConsensusSession *session = task->step->session;
2182
2183   final_set = lookup_set (session, &task->cls.finish.input_set);
2184
2185   GNUNET_assert (NULL != final_set);
2186
2187
2188   GNUNET_SET_iterate (final_set->h,
2189                       send_to_client_iter,
2190                       task);
2191 }
2192
2193 static void
2194 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2195 {
2196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2197
2198   GNUNET_assert (GNUNET_NO == task->is_started);
2199   GNUNET_assert (GNUNET_NO == task->is_finished);
2200   GNUNET_assert (NULL != task->start);
2201
2202   task->start (task);
2203
2204   task->is_started = GNUNET_YES;
2205 }
2206
2207
2208
2209
2210 /*
2211  * Run all steps of the session that don't any
2212  * more dependencies.
2213  */
2214 static void
2215 run_ready_steps (struct ConsensusSession *session)
2216 {
2217   struct Step *step;
2218
2219   step = session->steps_head;
2220
2221   while (NULL != step)
2222   {
2223     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2224     {
2225       size_t i;
2226
2227       GNUNET_assert (0 == step->finished_tasks);
2228
2229 #ifdef GNUNET_EXTRA_LOGGING
2230       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2231                   session->local_peer_idx,
2232                   step->debug_name,
2233                   step->round, step->tasks_len, step->subordinates_len);
2234 #endif
2235
2236       step->is_running = GNUNET_YES;
2237       for (i = 0; i < step->tasks_len; i++)
2238         start_task (session, step->tasks[i]);
2239
2240       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2241       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2242         finish_step (step);
2243
2244       /* Running the next ready steps will be triggered by task completion */
2245       return;
2246     }
2247     step = step->next;
2248   }
2249
2250   return;
2251 }
2252
2253
2254
2255 static void
2256 finish_task (struct TaskEntry *task)
2257 {
2258   GNUNET_assert (GNUNET_NO == task->is_finished);
2259   task->is_finished = GNUNET_YES;
2260
2261   task->step->finished_tasks++;
2262
2263   if (task->step->finished_tasks == task->step->tasks_len)
2264     finish_step (task->step);
2265 }
2266
2267
2268 /**
2269  * Search peer in the list of peers in session.
2270  *
2271  * @param peer peer to find
2272  * @param session session with peer
2273  * @return index of peer, -1 if peer is not in session
2274  */
2275 static int
2276 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2277 {
2278   int i;
2279   for (i = 0; i < session->num_peers; i++)
2280     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2281       return i;
2282   return -1;
2283 }
2284
2285
2286 /**
2287  * Compute a global, (hopefully) unique consensus session id,
2288  * from the local id of the consensus session, and the identities of all participants.
2289  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2290  * exactly the same peers, the global id will be different.
2291  *
2292  * @param session session to generate the global id for
2293  * @param local_session_id local id of the consensus session
2294  */
2295 static void
2296 compute_global_id (struct ConsensusSession *session,
2297                    const struct GNUNET_HashCode *local_session_id)
2298 {
2299   const char *salt = "gnunet-service-consensus/session_id";
2300
2301   GNUNET_assert (GNUNET_YES ==
2302                  GNUNET_CRYPTO_kdf (&session->global_id,
2303                                     sizeof (struct GNUNET_HashCode),
2304                                     salt,
2305                                     strlen (salt),
2306                                     session->peers,
2307                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2308                                     local_session_id,
2309                                     sizeof (struct GNUNET_HashCode),
2310                                     NULL));
2311 }
2312
2313
2314 /**
2315  * Compare two peer identities.
2316  *
2317  * @param h1 some peer identity
2318  * @param h2 some peer identity
2319  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2320  */
2321 static int
2322 peer_id_cmp (const void *h1, const void *h2)
2323 {
2324   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2325 }
2326
2327
2328 /**
2329  * Create the sorted list of peers for the session,
2330  * add the local peer if not in the join message.
2331  */
2332 static void
2333 initialize_session_peer_list (struct ConsensusSession *session,
2334                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2335 {
2336   unsigned int local_peer_in_list;
2337   uint32_t listed_peers;
2338   const struct GNUNET_PeerIdentity *msg_peers;
2339   unsigned int i;
2340
2341   GNUNET_assert (NULL != join_msg);
2342
2343   /* peers in the join message, may or may not include the local peer */
2344   listed_peers = ntohl (join_msg->num_peers);
2345
2346   session->num_peers = listed_peers;
2347
2348   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2349
2350   local_peer_in_list = GNUNET_NO;
2351   for (i = 0; i < listed_peers; i++)
2352   {
2353     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2354     {
2355       local_peer_in_list = GNUNET_YES;
2356       break;
2357     }
2358   }
2359
2360   if (GNUNET_NO == local_peer_in_list)
2361     session->num_peers++;
2362
2363   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2364
2365   if (GNUNET_NO == local_peer_in_list)
2366     session->peers[session->num_peers - 1] = my_peer;
2367
2368   GNUNET_memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2369   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2370 }
2371
2372
2373 static struct TaskEntry *
2374 lookup_task (struct ConsensusSession *session,
2375              struct TaskKey *key)
2376 {
2377   struct GNUNET_HashCode hash;
2378
2379
2380   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2381   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2382               GNUNET_h2s (&hash));
2383   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2384 }
2385
2386
2387 /**
2388  * Called when another peer wants to do a set operation with the
2389  * local peer.
2390  *
2391  * @param cls closure
2392  * @param other_peer the other peer
2393  * @param context_msg message with application specific information from
2394  *        the other peer
2395  * @param request request from the other peer, use GNUNET_SET_accept
2396  *        to accept it, otherwise the request will be refused
2397  *        Note that we don't use a return value here, as it is also
2398  *        necessary to specify the set we want to do the operation with,
2399  *        whith sometimes can be derived from the context message.
2400  *        Also necessary to specify the timeout.
2401  */
2402 static void
2403 set_listen_cb (void *cls,
2404                const struct GNUNET_PeerIdentity *other_peer,
2405                const struct GNUNET_MessageHeader *context_msg,
2406                struct GNUNET_SET_Request *request)
2407 {
2408   struct ConsensusSession *session = cls;
2409   struct TaskKey tk;
2410   struct TaskEntry *task;
2411   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2412
2413   if (NULL == context_msg)
2414   {
2415     GNUNET_break_op (0);
2416     return;
2417   }
2418
2419   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2420   {
2421     GNUNET_break_op (0);
2422     return;
2423   }
2424
2425   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2426   {
2427     GNUNET_break_op (0);
2428     return;
2429   }
2430
2431   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2432
2433   tk = ((struct TaskKey) {
2434       .kind = ntohs (cm->kind),
2435       .peer1 = ntohs (cm->peer1),
2436       .peer2 = ntohs (cm->peer2),
2437       .repetition = ntohs (cm->repetition),
2438       .leader = ntohs (cm->leader),
2439   });
2440
2441   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2442               session->local_peer_idx, debug_str_task_key (&tk));
2443
2444   task = lookup_task (session, &tk);
2445
2446   if (NULL == task)
2447   {
2448     GNUNET_break_op (0);
2449     return;
2450   }
2451
2452   if (GNUNET_YES == task->is_finished)
2453   {
2454     GNUNET_break_op (0);
2455     return;
2456   }
2457
2458   if (task->key.peer2 != session->local_peer_idx)
2459   {
2460     /* We're being asked, so we must be thne 2nd peer. */
2461     GNUNET_break_op (0);
2462     return;
2463   }
2464
2465   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2466                     (task->key.peer2 == session->local_peer_idx)));
2467
2468   task->cls.setop.op = GNUNET_SET_accept (request,
2469                                           GNUNET_SET_RESULT_SYMMETRIC,
2470                                           set_result_cb,
2471                                           task);
2472
2473   /* If the task hasn't been started yet,
2474      we wait for that until we commit. */
2475
2476   if (GNUNET_YES == task->is_started)
2477   {
2478     commit_set (session, task);
2479   }
2480 }
2481
2482
2483
2484 static void
2485 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2486           struct TaskEntry *t)
2487 {
2488   struct GNUNET_HashCode round_hash;
2489   struct Step *s;
2490
2491   GNUNET_assert (NULL != t->step);
2492
2493   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2494
2495   s = t->step;
2496
2497   if (s->tasks_len == s->tasks_cap)
2498   {
2499     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2500     GNUNET_array_grow (s->tasks,
2501                        s->tasks_cap,
2502                        target_size);
2503   }
2504
2505 #ifdef GNUNET_EXTRA_LOGGING
2506   GNUNET_assert (NULL != s->debug_name);
2507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2508               debug_str_task_key (&t->key),
2509               s->debug_name);
2510 #endif
2511
2512   s->tasks[s->tasks_len] = t;
2513   s->tasks_len++;
2514
2515   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2516   GNUNET_assert (GNUNET_OK ==
2517       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2518                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2519 }
2520
2521
2522 static void
2523 install_step_timeouts (struct ConsensusSession *session)
2524 {
2525   /* Given the fully constructed task graph
2526      with rounds for tasks, we can give the tasks timeouts. */
2527
2528   // unsigned int max_round;
2529
2530   /* XXX: implement! */
2531 }
2532
2533
2534
2535 /*
2536  * Arrange two peers in some canonical order.
2537  */
2538 static void
2539 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2540 {
2541   uint16_t a;
2542   uint16_t b;
2543
2544   GNUNET_assert (*p1 < n);
2545   GNUNET_assert (*p2 < n);
2546
2547   if (*p1 < *p2)
2548   {
2549     a = *p1;
2550     b = *p2;
2551   }
2552   else
2553   {
2554     a = *p2;
2555     b = *p1;
2556   }
2557
2558   /* For uniformly random *p1, *p2,
2559      this condition is true with 50% chance */
2560   if (((b - a) + n) % n <= n / 2)
2561   {
2562     *p1 = a;
2563     *p2 = b;
2564   }
2565   else
2566   {
2567     *p1 = b;
2568     *p2 = a;
2569   }
2570 }
2571
2572
2573 /**
2574  * Record @a dep as a dependency of @a step.
2575  */
2576 static void
2577 step_depend_on (struct Step *step, struct Step *dep)
2578 {
2579   /* We're not checking for cyclic dependencies,
2580      but this is a cheap sanity check. */
2581   GNUNET_assert (step != dep);
2582   GNUNET_assert (NULL != step);
2583   GNUNET_assert (NULL != dep);
2584   GNUNET_assert (dep->round <= step->round);
2585
2586 #ifdef GNUNET_EXTRA_LOGGING
2587   /* Make sure we have complete debugging information.
2588      Also checks that we don't screw up too badly
2589      constructing the task graph. */
2590   GNUNET_assert (NULL != step->debug_name);
2591   GNUNET_assert (NULL != dep->debug_name);
2592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593               "Making step `%s' depend on `%s'\n",
2594               step->debug_name,
2595               dep->debug_name);
2596 #endif
2597
2598   if (dep->subordinates_cap == dep->subordinates_len)
2599   {
2600     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2601     GNUNET_array_grow (dep->subordinates,
2602                        dep->subordinates_cap,
2603                        target_size);
2604   }
2605
2606   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2607
2608   dep->subordinates[dep->subordinates_len] = step;
2609   dep->subordinates_len++;
2610
2611   step->pending_prereq++;
2612 }
2613
2614
2615 static struct Step *
2616 create_step (struct ConsensusSession *session, int round, int early_finishable)
2617 {
2618   struct Step *step;
2619   step = GNUNET_new (struct Step);
2620   step->session = session;
2621   step->round = round;
2622   step->early_finishable = early_finishable;
2623   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2624                                     session->steps_tail,
2625                                     step);
2626   return step;
2627 }
2628
2629
2630 /**
2631  * Construct the task graph for a single
2632  * gradecast.
2633  */
2634 static void
2635 construct_task_graph_gradecast (struct ConsensusSession *session,
2636                                 uint16_t rep,
2637                                 uint16_t lead,
2638                                 struct Step *step_before,
2639                                 struct Step *step_after)
2640 {
2641   uint16_t n = session->num_peers;
2642   uint16_t me = session->local_peer_idx;
2643
2644   uint16_t p1;
2645   uint16_t p2;
2646
2647   /* The task we're currently setting up. */
2648   struct TaskEntry task;
2649
2650   struct Step *step;
2651   struct Step *prev_step;
2652
2653   uint16_t round;
2654
2655   unsigned int k;
2656
2657   round = step_before->round + 1;
2658
2659   /* gcast step 1: leader disseminates */
2660
2661   step = create_step (session, round, GNUNET_YES);
2662
2663 #ifdef GNUNET_EXTRA_LOGGING
2664   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2665 #endif
2666   step_depend_on (step, step_before);
2667
2668   if (lead == me)
2669   {
2670     for (k = 0; k < n; k++)
2671     {
2672       if (k == me)
2673         continue;
2674       p1 = me;
2675       p2 = k;
2676       arrange_peers (&p1, &p2, n);
2677       task = ((struct TaskEntry) {
2678         .step = step,
2679         .start = task_start_reconcile,
2680         .cancel = task_cancel_reconcile,
2681         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2682       });
2683       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2684       put_task (session->taskmap, &task);
2685     }
2686     /* We run this task to make sure that the leader
2687        has the stored the SET_KIND_LEADER set of himself,
2688        so he can participate in the rest of the gradecast
2689        without the code having to handle any special cases. */
2690     task = ((struct TaskEntry) {
2691       .step = step,
2692       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2693       .start = task_start_reconcile,
2694       .cancel = task_cancel_reconcile,
2695     });
2696     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2697     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2698     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2699     put_task (session->taskmap, &task);
2700   }
2701   else
2702   {
2703     p1 = me;
2704     p2 = lead;
2705     arrange_peers (&p1, &p2, n);
2706     task = ((struct TaskEntry) {
2707       .step = step,
2708       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2709       .start = task_start_reconcile,
2710       .cancel = task_cancel_reconcile,
2711     });
2712     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2713     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2714     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2715     put_task (session->taskmap, &task);
2716   }
2717
2718   /* gcast phase 2: echo */
2719   prev_step = step;
2720   round += 1;
2721   step = create_step (session, round, GNUNET_YES);
2722 #ifdef GNUNET_EXTRA_LOGGING
2723   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2724 #endif
2725   step_depend_on (step, prev_step);
2726
2727   for (k = 0; k < n; k++)
2728   {
2729     p1 = k;
2730     p2 = me;
2731     arrange_peers (&p1, &p2, n);
2732     task = ((struct TaskEntry) {
2733       .step = step,
2734       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2735       .start = task_start_reconcile,
2736       .cancel = task_cancel_reconcile,
2737     });
2738     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2739     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2740     put_task (session->taskmap, &task);
2741   }
2742
2743   prev_step = step;
2744   /* Same round, since step only has local tasks */
2745   step = create_step (session, round, GNUNET_YES);
2746 #ifdef GNUNET_EXTRA_LOGGING
2747   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2748 #endif
2749   step_depend_on (step, prev_step);
2750
2751   arrange_peers (&p1, &p2, n);
2752   task = ((struct TaskEntry) {
2753     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2754     .step = step,
2755     .start = task_start_eval_echo
2756   });
2757   put_task (session->taskmap, &task);
2758
2759   prev_step = step;
2760   round += 1;
2761   step = create_step (session, round, GNUNET_YES);
2762 #ifdef GNUNET_EXTRA_LOGGING
2763   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2764 #endif
2765   step_depend_on (step, prev_step);
2766
2767   /* gcast phase 3: confirmation and grading */
2768   for (k = 0; k < n; k++)
2769   {
2770     p1 = k;
2771     p2 = me;
2772     arrange_peers (&p1, &p2, n);
2773     task = ((struct TaskEntry) {
2774       .step = step,
2775       .start = task_start_reconcile,
2776       .cancel = task_cancel_reconcile,
2777       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2778     });
2779     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2780     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2781     /* If there was at least one element in the echo round that was
2782        contested (i.e. it had no n-t majority), then we let the other peers
2783        know, and other peers let us know.  The contested flag for each peer is
2784        stored in the rfn. */
2785     task.cls.setop.transceive_contested = GNUNET_YES;
2786     put_task (session->taskmap, &task);
2787   }
2788
2789   prev_step = step;
2790   /* Same round, since step only has local tasks */
2791   step = create_step (session, round, GNUNET_YES);
2792 #ifdef GNUNET_EXTRA_LOGGING
2793   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2794 #endif
2795   step_depend_on (step, prev_step);
2796
2797   task = ((struct TaskEntry) {
2798     .step = step,
2799     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2800     .start = task_start_grade,
2801   });
2802   put_task (session->taskmap, &task);
2803
2804   step_depend_on (step_after, step);
2805 }
2806
2807
2808 static void
2809 construct_task_graph (struct ConsensusSession *session)
2810 {
2811   uint16_t n = session->num_peers;
2812   uint16_t t = n / 3;
2813
2814   uint16_t me = session->local_peer_idx;
2815
2816   /* The task we're currently setting up. */
2817   struct TaskEntry task;
2818
2819   /* Current leader */
2820   unsigned int lead;
2821
2822   struct Step *step;
2823   struct Step *prev_step;
2824
2825   unsigned int round = 0;
2826
2827   unsigned int i;
2828
2829   // XXX: introduce first step,
2830   // where we wait for all insert acks
2831   // from the set service
2832
2833   /* faster but brittle all-to-all */
2834
2835   // XXX: Not implemented yet
2836
2837   /* all-to-all step */
2838
2839   step = create_step (session, round, GNUNET_NO);
2840
2841 #ifdef GNUNET_EXTRA_LOGGING
2842   step->debug_name = GNUNET_strdup ("all to all");
2843 #endif
2844
2845   for (i = 0; i < n; i++)
2846   {
2847     uint16_t p1;
2848     uint16_t p2;
2849
2850     p1 = me;
2851     p2 = i;
2852     arrange_peers (&p1, &p2, n);
2853     task = ((struct TaskEntry) {
2854       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2855       .step = step,
2856       .start = task_start_reconcile,
2857       .cancel = task_cancel_reconcile,
2858     });
2859     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2860     task.cls.setop.output_set = task.cls.setop.input_set;
2861     task.cls.setop.do_not_remove = GNUNET_YES;
2862     put_task (session->taskmap, &task);
2863   }
2864
2865   prev_step = step;
2866   step = NULL;
2867
2868   round += 1;
2869
2870   /* Byzantine union */
2871
2872   /* sequential repetitions of the gradecasts */
2873   for (i = 0; i < t + 1; i++)
2874   {
2875     struct Step *step_rep_start;
2876     struct Step *step_rep_end;
2877
2878     /* Every repetition is in a separate round. */
2879     step_rep_start = create_step (session, round, GNUNET_YES);
2880 #ifdef GNUNET_EXTRA_LOGGING
2881     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2882 #endif
2883
2884     step_depend_on (step_rep_start, prev_step);
2885
2886     /* gradecast has three rounds */
2887     round += 3;
2888     step_rep_end = create_step (session, round, GNUNET_YES);
2889 #ifdef GNUNET_EXTRA_LOGGING
2890     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2891 #endif
2892
2893     /* parallel gradecasts */
2894     for (lead = 0; lead < n; lead++)
2895       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2896
2897     task = ((struct TaskEntry) {
2898       .step = step_rep_end,
2899       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2900       .start = task_start_apply_round,
2901     });
2902     put_task (session->taskmap, &task);
2903
2904     prev_step = step_rep_end;
2905   }
2906
2907  /* There is no next gradecast round, thus the final
2908     start step is the overall end step of the gradecasts */
2909   round += 1;
2910   step = create_step (session, round, GNUNET_NO);
2911 #ifdef GNUNET_EXTRA_LOGGING
2912   GNUNET_asprintf (&step->debug_name, "finish");
2913 #endif
2914   step_depend_on (step, prev_step);
2915
2916   task = ((struct TaskEntry) {
2917     .step = step,
2918     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2919     .start = task_start_finish,
2920   });
2921   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2922
2923   put_task (session->taskmap, &task);
2924 }
2925
2926
2927 /**
2928  * Initialize the session, continue receiving messages from the owning client
2929  *
2930  * @param session the session to initialize
2931  * @param join_msg the join message from the client
2932  */
2933 static void
2934 initialize_session (struct ConsensusSession *session,
2935                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2936 {
2937   struct ConsensusSession *other_session;
2938
2939   initialize_session_peer_list (session, join_msg);
2940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2941   compute_global_id (session, &join_msg->session_id);
2942
2943   /* Check if some local client already owns the session.
2944      It is only legal to have a session with an existing global id
2945      if all other sessions with this global id are finished.*/
2946   other_session = sessions_head;
2947   while (NULL != other_session)
2948   {
2949     if ((other_session != session) &&
2950         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2951     {
2952       //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2953       //{
2954       //  GNUNET_break (0);
2955       //  destroy_session (session);
2956       //  return;
2957       //}
2958       break;
2959     }
2960     other_session = other_session->next;
2961   }
2962
2963   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2964   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2965
2966   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n",
2967               (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2968
2969   session->local_peer_idx = get_peer_idx (&my_peer, session);
2970   GNUNET_assert (-1 != session->local_peer_idx);
2971   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2972                                              &session->global_id,
2973                                              set_listen_cb, session);
2974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2975
2976   session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2977   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2978   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2979   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2980
2981   {
2982     struct SetEntry *client_set;
2983     client_set = GNUNET_new (struct SetEntry);
2984     client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2985     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2986     put_set (session, client_set);
2987   }
2988
2989   session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2990
2991   /* Just construct the task graph,
2992      but don't run anything until the client calls conclude. */
2993   construct_task_graph (session);
2994
2995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2996 }
2997
2998
2999 static struct ConsensusSession *
3000 get_session_by_client (struct GNUNET_SERVER_Client *client)
3001 {
3002   struct ConsensusSession *session;
3003
3004   session = sessions_head;
3005   while (NULL != session)
3006   {
3007     if (session->client == client)
3008       return session;
3009     session = session->next;
3010   }
3011   return NULL;
3012 }
3013
3014
3015 /**
3016  * Called when a client wants to join a consensus session.
3017  *
3018  * @param cls unused
3019  * @param client client that sent the message
3020  * @param m message sent by the client
3021  */
3022 static void
3023 client_join (void *cls,
3024              struct GNUNET_SERVER_Client *client,
3025              const struct GNUNET_MessageHeader *m)
3026 {
3027   struct ConsensusSession *session;
3028
3029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3030
3031   session = get_session_by_client (client);
3032   if (NULL != session)
3033   {
3034     GNUNET_break (0);
3035     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3036     return;
3037   }
3038   session = GNUNET_new (struct ConsensusSession);
3039   session->client = client;
3040   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3041   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3042   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3043   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3044
3045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
3046 }
3047
3048
3049 static void
3050 client_insert_done (void *cls)
3051 {
3052   // FIXME: implement
3053 }
3054
3055
3056 /**
3057  * Called when a client performs an insert operation.
3058  *
3059  * @param cls (unused)
3060  * @param client client handle
3061  * @param m message sent by the client
3062  */
3063 void
3064 client_insert (void *cls,
3065                struct GNUNET_SERVER_Client *client,
3066                const struct GNUNET_MessageHeader *m)
3067 {
3068   struct ConsensusSession *session;
3069   struct GNUNET_CONSENSUS_ElementMessage *msg;
3070   struct GNUNET_SET_Element *element;
3071   ssize_t element_size;
3072   struct GNUNET_SET_Handle *initial_set;
3073
3074   session = get_session_by_client (client);
3075
3076   if (NULL == session)
3077   {
3078     GNUNET_break (0);
3079     GNUNET_SERVER_client_disconnect (client);
3080     return;
3081   }
3082
3083   if (GNUNET_YES == session->conclude_started)
3084   {
3085     GNUNET_break (0);
3086     GNUNET_SERVER_client_disconnect (client);
3087     return;
3088   }
3089
3090   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3091   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3092   if (element_size < 0)
3093   {
3094     GNUNET_break (0);
3095     return;
3096   }
3097
3098   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3099   element->element_type = msg->element_type;
3100   element->size = element_size;
3101   GNUNET_memcpy (&element[1], &msg[1], element_size);
3102   element->data = &element[1];
3103   {
3104     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3105     struct SetEntry *entry;
3106     entry = lookup_set (session, &key);
3107     GNUNET_assert (NULL != entry);
3108     initial_set = entry->h;
3109   }
3110   session->num_client_insert_pending++;
3111   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
3112
3113 #ifdef GNUNET_EXTRA_LOGGING
3114   {
3115     struct GNUNET_HashCode hash;
3116
3117     GNUNET_SET_element_hash (element, &hash);
3118
3119     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
3120                 session->local_peer_idx,
3121                 GNUNET_h2s (&hash));
3122   }
3123 #endif
3124
3125   GNUNET_free (element);
3126   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3127 }
3128
3129
3130 /**
3131  * Called when a client performs the conclude operation.
3132  *
3133  * @param cls (unused)
3134  * @param client client handle
3135  * @param message message sent by the client
3136  */
3137 static void
3138 client_conclude (void *cls,
3139                  struct GNUNET_SERVER_Client *client,
3140                  const struct GNUNET_MessageHeader *message)
3141 {
3142   struct ConsensusSession *session;
3143
3144   session = get_session_by_client (client);
3145   if (NULL == session)
3146   {
3147     /* client not found */
3148     GNUNET_break (0);
3149     GNUNET_SERVER_client_disconnect (client);
3150     return;
3151   }
3152
3153   if (GNUNET_YES == session->conclude_started)
3154   {
3155     /* conclude started twice */
3156     GNUNET_break (0);
3157     GNUNET_SERVER_client_disconnect (client);
3158     destroy_session (session);
3159     return;
3160   }
3161
3162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
3163
3164   session->conclude_started = GNUNET_YES;
3165
3166   install_step_timeouts (session);
3167   run_ready_steps (session);
3168
3169
3170   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3171 }
3172
3173
3174 /**
3175  * Called to clean up, after a shutdown has been requested.
3176  *
3177  * @param cls closure
3178  */
3179 static void
3180 shutdown_task (void *cls)
3181 {
3182   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
3183   while (NULL != sessions_head)
3184     destroy_session (sessions_head);
3185
3186   GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
3187 }
3188
3189
3190 /**
3191  * Clean up after a client after it is
3192  * disconnected (either by us or by itself)
3193  *
3194  * @param cls closure, unused
3195  * @param client the client to clean up after
3196  */
3197 void
3198 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3199 {
3200   struct ConsensusSession *session;
3201
3202   session = get_session_by_client (client);
3203   if (NULL == session)
3204     return;
3205   // FIXME: destroy if we can
3206 }
3207
3208
3209
3210 /**
3211  * Start processing consensus requests.
3212  *
3213  * @param cls closure
3214  * @param server the initialized server
3215  * @param c configuration to use
3216  */
3217 static void
3218 run (void *cls, struct GNUNET_SERVER_Handle *server,
3219      const struct GNUNET_CONFIGURATION_Handle *c)
3220 {
3221   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3222     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3223         sizeof (struct GNUNET_MessageHeader)},
3224     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3225     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3226     {NULL, NULL, 0, 0}
3227   };
3228
3229   cfg = c;
3230   srv = server;
3231   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3232   {
3233     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3234     GNUNET_break (0);
3235     GNUNET_SCHEDULER_shutdown ();
3236     return;
3237   }
3238   statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3239   GNUNET_SERVER_add_handlers (server, server_handlers);
3240   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
3241   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3242   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3243 }
3244
3245
3246 /**
3247  * The main function for the consensus service.
3248  *
3249  * @param argc number of arguments from the command line
3250  * @param argv command line arguments
3251  * @return 0 ok, 1 on error
3252  */
3253 int
3254 main (int argc, char *const *argv)
3255 {
3256   int ret;
3257   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3258   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3259   return (GNUNET_OK == ret) ? 0 : 1;
3260 }