More API function tests...
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40 enum ReferendumVote
41 {
42   /**
43    * Vote that nothing should change.
44    * This option is never voted explicitly.
45    */
46   VOTE_STAY = 0,
47   /**
48    * Vote that an element should be added.
49    */
50   VOTE_ADD = 1,
51   /**
52    * Vote that an element should be removed.
53    */
54   VOTE_REMOVE = 2,
55 };
56
57
58 enum EarlyStoppingPhase
59 {
60   EARLY_STOPPING_NONE = 0,
61   EARLY_STOPPING_ONE_MORE = 1,
62   EARLY_STOPPING_DONE = 2,
63 };
64
65
66 GNUNET_NETWORK_STRUCT_BEGIN
67
68
69 struct ContestedPayload
70 {
71 };
72
73 /**
74  * Tuple of integers that together
75  * identify a task uniquely.
76  */
77 struct TaskKey {
78   /**
79    * A value from 'enum PhaseKind'.
80    */
81   uint16_t kind GNUNET_PACKED;
82
83   /**
84    * Number of the first peer
85    * in canonical order.
86    */
87   int16_t peer1 GNUNET_PACKED;
88
89   /**
90    * Number of the second peer in canonical order.
91    */
92   int16_t peer2 GNUNET_PACKED;
93
94   /**
95    * Repetition of the gradecast phase.
96    */
97   int16_t repetition GNUNET_PACKED;
98
99   /**
100    * Leader in the gradecast phase.
101    *
102    * Can be different from both peer1 and peer2.
103    */
104   int16_t leader GNUNET_PACKED;
105 };
106
107
108
109 struct SetKey
110 {
111   int set_kind GNUNET_PACKED;
112   int k1 GNUNET_PACKED;
113   int k2 GNUNET_PACKED;
114 };
115
116
117 struct SetEntry
118 {
119   struct SetKey key;
120   struct GNUNET_SET_Handle *h;
121   /**
122    * GNUNET_YES if the set resulted
123    * from applying a referendum with contested
124    * elements.
125    */
126   int is_contested;
127 };
128
129
130 struct DiffKey
131 {
132   int diff_kind GNUNET_PACKED;
133   int k1 GNUNET_PACKED;
134   int k2 GNUNET_PACKED;
135 };
136
137 struct RfnKey
138 {
139   int rfn_kind GNUNET_PACKED;
140   int k1 GNUNET_PACKED;
141   int k2 GNUNET_PACKED;
142 };
143
144
145 GNUNET_NETWORK_STRUCT_END
146
147 enum PhaseKind
148 {
149   PHASE_KIND_ALL_TO_ALL,
150   PHASE_KIND_GRADECAST_LEADER,
151   PHASE_KIND_GRADECAST_ECHO,
152   PHASE_KIND_GRADECAST_ECHO_GRADE,
153   PHASE_KIND_GRADECAST_CONFIRM,
154   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
155   /**
156    * Apply a repetition of the all-to-all
157    * gradecast to the current set.
158    */
159   PHASE_KIND_APPLY_REP,
160   PHASE_KIND_FINISH,
161 };
162
163
164 enum SetKind
165 {
166   SET_KIND_NONE = 0,
167   SET_KIND_CURRENT,
168   /**
169    * Last result set from a gradecast
170    */
171   SET_KIND_LAST_GRADECAST,
172   SET_KIND_LEADER_PROPOSAL,
173   SET_KIND_ECHO_RESULT,
174 };
175
176 enum DiffKind
177 {
178   DIFF_KIND_NONE = 0,
179   DIFF_KIND_LEADER_PROPOSAL,
180   DIFF_KIND_LEADER_CONSENSUS,
181   DIFF_KIND_GRADECAST_RESULT,
182 };
183
184 enum RfnKind
185 {
186   RFN_KIND_NONE = 0,
187   RFN_KIND_ECHO,
188   RFN_KIND_CONFIRM,
189   RFN_KIND_GRADECAST_RESULT
190 };
191
192
193 struct SetOpCls
194 {
195   struct SetKey input_set;
196
197   struct SetKey output_set;
198   struct RfnKey output_rfn;
199   struct DiffKey output_diff;
200
201   int do_not_remove;
202
203   int transceive_contested;
204
205   struct GNUNET_SET_OperationHandle *op;
206 };
207
208
209 struct FinishCls
210 {
211   struct SetKey input_set;
212 };
213
214 /**
215  * Closure for both @a start_task
216  * and @a cancel_task.
217  */
218 union TaskFuncCls
219 {
220   struct SetOpCls setop;
221   struct FinishCls finish;
222 };
223
224 struct TaskEntry;
225
226 typedef void (*TaskFunc) (struct TaskEntry *task);
227
228 /*
229  * Node in the consensus task graph.
230  */
231 struct TaskEntry
232 {
233   struct TaskKey key;
234
235   struct Step *step;
236
237   int is_started;
238
239   int is_finished;
240
241   TaskFunc start;
242   TaskFunc cancel;
243
244   union TaskFuncCls cls;
245 };
246
247
248 struct Step
249 {
250   /**
251    * All steps of one session are in a
252    * linked list for easier deallocation.
253    */
254   struct Step *prev;
255
256   /**
257    * All steps of one session are in a
258    * linked list for easier deallocation.
259    */
260   struct Step *next;
261
262   struct ConsensusSession *session;
263
264   /**
265    * Tasks that this step is composed of.
266    */
267   struct TaskEntry **tasks;
268   unsigned int tasks_len;
269   unsigned int tasks_cap;
270
271   unsigned int finished_tasks;
272
273   /*
274    * Tasks that have this task as dependency.
275    *
276    * We store pointers to subordinates rather
277    * than to prerequisites since it makes
278    * tracking the readiness of a task easier.
279    */
280   struct Step **subordinates;
281   unsigned int subordinates_len;
282   unsigned int subordinates_cap;
283
284   /**
285    * Counter for the prerequisites of
286    * this step.
287    */
288   size_t pending_prereq;
289
290   /*
291    * Task that will run this step despite
292    * any pending prerequisites.
293    */
294   struct GNUNET_SCHEDULER_Task *timeout_task;
295
296   unsigned int is_running;
297
298   unsigned int is_finished;
299
300   /*
301    * Synchrony round of the task.
302    * Determines the deadline for the task.
303    */
304   unsigned int round;
305
306   /**
307    * Human-readable name for
308    * the task, used for debugging.
309    */
310   char *debug_name;
311
312   /**
313    * When we're doing an early finish, how should this step be
314    * treated?
315    * If GNUNET_YES, the step will be marked as finished
316    * without actually running its tasks.
317    * Otherwise, the step will still be run even after
318    * an early finish.
319    *
320    * Note that a task may never be finished early if
321    * it is already running.
322    */
323   int early_finishable;
324 };
325
326
327 struct RfnElementInfo
328 {
329   const struct GNUNET_SET_Element *element;
330
331   /*
332    * GNUNET_YES if the peer votes for the proposal.
333    */
334   int *votes;
335
336   /**
337    * Proposal for this element,
338    * can only be VOTE_ADD or VOTE_REMOVE.
339    */
340   enum ReferendumVote proposal;
341 };
342
343
344 struct ReferendumEntry
345 {
346   struct RfnKey key;
347
348   /*
349    * Elements where there is at least one proposed change.
350    *
351    * Maps the hash of the GNUNET_SET_Element
352    * to 'struct RfnElementInfo'.
353    */
354   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
355
356   unsigned int num_peers;
357
358   /**
359    * Stores, for every peer in the session,
360    * whether the peer finished the whole referendum.
361    *
362    * Votes from peers are only counted if they're
363    * marked as commited (#GNUNET_YES) in the referendum.
364    *
365    * Otherwise (#GNUNET_NO), the requested changes are
366    * not counted for majority votes or thresholds.
367    */
368   int *peer_commited;
369
370
371   /**
372    * Contestation state of the peer.  If a peer is contested, the values it
373    * contributed are still counted for applying changes, but the grading is
374    * affected.
375    */
376   int *peer_contested;
377 };
378
379
380 struct DiffElementInfo
381 {
382   const struct GNUNET_SET_Element *element;
383
384   /**
385    * Positive weight for 'add', negative
386    * weights for 'remove'.
387    */
388   int weight;
389 };
390
391
392 /**
393  * Weighted diff.
394  */
395 struct DiffEntry
396 {
397   struct DiffKey key;
398   struct GNUNET_CONTAINER_MultiHashMap *changes;
399 };
400
401
402
403 /**
404  * A consensus session consists of one local client and the remote authorities.
405  */
406 struct ConsensusSession
407 {
408   /**
409    * Consensus sessions are kept in a DLL.
410    */
411   struct ConsensusSession *next;
412
413   /**
414    * Consensus sessions are kept in a DLL.
415    */
416   struct ConsensusSession *prev;
417
418   unsigned int num_client_insert_pending;
419
420   struct GNUNET_CONTAINER_MultiHashMap *setmap;
421   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
423
424   /**
425    * Array of peers with length 'num_peers'.
426    */
427   int *peers_blacklisted;
428
429   /*
430    * Mapping from (hashed) TaskKey to TaskEntry.
431    *
432    * We map the application_id for a round to the task that should be
433    * executed, so we don't have to go through all task whenever we get
434    * an incoming set op request.
435    */
436   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
437
438   struct Step *steps_head;
439   struct Step *steps_tail;
440
441   int conclude_started;
442
443   int conclude_done;
444
445   /**
446   * Global consensus identification, computed
447   * from the session id and participating authorities.
448   */
449   struct GNUNET_HashCode global_id;
450
451   /**
452    * Client that inhabits the session
453    */
454   struct GNUNET_SERVICE_Client *client;
455
456   /**
457    * Queued messages to the client.
458    */
459   struct GNUNET_MQ_Handle *client_mq;
460
461   /**
462    * Time when the conclusion of the consensus should begin.
463    */
464   struct GNUNET_TIME_Absolute conclude_start;
465
466   /**
467    * Timeout for all rounds together, single rounds will schedule a timeout task
468    * with a fraction of the conclude timeout.
469    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
470    */
471   struct GNUNET_TIME_Absolute conclude_deadline;
472
473   struct GNUNET_PeerIdentity *peers;
474
475   /**
476    * Number of other peers in the consensus.
477    */
478   unsigned int num_peers;
479
480   /**
481    * Index of the local peer in the peers array
482    */
483   unsigned int local_peer_idx;
484
485   /**
486    * Listener for requests from other peers.
487    * Uses the session's global id as app id.
488    */
489   struct GNUNET_SET_ListenHandle *set_listener;
490
491   /**
492    * State of our early stopping scheme.
493    */
494   int early_stopping;
495 };
496
497 /**
498  * Linked list of sessions this peer participates in.
499  */
500 static struct ConsensusSession *sessions_head;
501
502 /**
503  * Linked list of sessions this peer participates in.
504  */
505 static struct ConsensusSession *sessions_tail;
506
507 /**
508  * Configuration of the consensus service.
509  */
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
511
512 /**
513  * Peer that runs this service.
514  */
515 static struct GNUNET_PeerIdentity my_peer;
516
517 /**
518  * Statistics handle.
519  */
520 struct GNUNET_STATISTICS_Handle *statistics;
521
522
523 static void
524 finish_task (struct TaskEntry *task);
525
526
527 static void
528 run_ready_steps (struct ConsensusSession *session);
529
530
531 static const char *
532 phasename (uint16_t phase)
533 {
534   switch (phase)
535   {
536     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
537     case PHASE_KIND_FINISH: return "FINISH";
538     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
539     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
540     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
541     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
542     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
543     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
544     default: return "(unknown)";
545   }
546 }
547
548
549 static const char *
550 setname (uint16_t kind)
551 {
552   switch (kind)
553   {
554     case SET_KIND_CURRENT: return "CURRENT";
555     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
556     case SET_KIND_NONE: return "NONE";
557     default: return "(unknown)";
558   }
559 }
560
561 static const char *
562 rfnname (uint16_t kind)
563 {
564   switch (kind)
565   {
566     case RFN_KIND_NONE: return "NONE";
567     case RFN_KIND_ECHO: return "ECHO";
568     case RFN_KIND_CONFIRM: return "CONFIRM";
569     default: return "(unknown)";
570   }
571 }
572
573 static const char *
574 diffname (uint16_t kind)
575 {
576   switch (kind)
577   {
578     case DIFF_KIND_NONE: return "NONE";
579     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
580     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
581     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
582     default: return "(unknown)";
583   }
584 }
585
586 #ifdef GNUNET_EXTRA_LOGGING
587
588
589 static const char *
590 debug_str_element (const struct GNUNET_SET_Element *el)
591 {
592   struct GNUNET_HashCode hash;
593
594   GNUNET_SET_element_hash (el, &hash);
595
596   return GNUNET_h2s (&hash);
597 }
598
599 static const char *
600 debug_str_task_key (struct TaskKey *tk)
601 {
602   static char buf[256];
603
604   snprintf (buf, sizeof (buf),
605             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
606             phasename (tk->kind), tk->peer1, tk->peer2,
607             tk->leader, tk->repetition);
608
609   return buf;
610 }
611
612 static const char *
613 debug_str_diff_key (struct DiffKey *dk)
614 {
615   static char buf[256];
616
617   snprintf (buf, sizeof (buf),
618             "DiffKey kind=%s, k1=%d, k2=%d",
619             diffname (dk->diff_kind), dk->k1, dk->k2);
620
621   return buf;
622 }
623
624 static const char *
625 debug_str_set_key (const struct SetKey *sk)
626 {
627   static char buf[256];
628
629   snprintf (buf, sizeof (buf),
630             "SetKey kind=%s, k1=%d, k2=%d",
631             setname (sk->set_kind), sk->k1, sk->k2);
632
633   return buf;
634 }
635
636
637 static const char *
638 debug_str_rfn_key (const struct RfnKey *rk)
639 {
640   static char buf[256];
641
642   snprintf (buf, sizeof (buf),
643             "RfnKey kind=%s, k1=%d, k2=%d",
644             rfnname (rk->rfn_kind), rk->k1, rk->k2);
645
646   return buf;
647 }
648
649 #endif /* GNUNET_EXTRA_LOGGING */
650
651
652 /**
653  * Send the final result set of the consensus to the client, element by
654  * element.
655  *
656  * @param cls closure
657  * @param element the current element, NULL if all elements have been
658  *        iterated over
659  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
660  */
661 static int
662 send_to_client_iter (void *cls,
663                      const struct GNUNET_SET_Element *element)
664 {
665   struct TaskEntry *task = (struct TaskEntry *) cls;
666   struct ConsensusSession *session = task->step->session;
667   struct GNUNET_MQ_Envelope *ev;
668
669   if (NULL != element)
670   {
671     struct GNUNET_CONSENSUS_ElementMessage *m;
672
673     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674                 "P%d: sending element %s to client\n",
675                 session->local_peer_idx,
676                 debug_str_element (element));
677
678     ev = GNUNET_MQ_msg_extra (m, element->size,
679                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
680     m->element_type = htons (element->element_type);
681     GNUNET_memcpy (&m[1], element->data, element->size);
682     GNUNET_MQ_send (session->client_mq, ev);
683   }
684   else
685   {
686     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687                 "P%d: finished iterating elements for client\n",
688                 session->local_peer_idx);
689     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
690     GNUNET_MQ_send (session->client_mq, ev);
691   }
692   return GNUNET_YES;
693 }
694
695
696 static struct SetEntry *
697 lookup_set (struct ConsensusSession *session, struct SetKey *key)
698 {
699   struct GNUNET_HashCode hash;
700
701   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702               "P%u: looking up set {%s}\n",
703               session->local_peer_idx,
704               debug_str_set_key (key));
705
706   GNUNET_assert (SET_KIND_NONE != key->set_kind);
707   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
708   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
709 }
710
711
712 static struct DiffEntry *
713 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
714 {
715   struct GNUNET_HashCode hash;
716
717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718               "P%u: looking up diff {%s}\n",
719               session->local_peer_idx,
720               debug_str_diff_key (key));
721
722   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
723   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
724   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
725 }
726
727
728 static struct ReferendumEntry *
729 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
730 {
731   struct GNUNET_HashCode hash;
732
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "P%u: looking up rfn {%s}\n",
735               session->local_peer_idx,
736               debug_str_rfn_key (key));
737
738   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
739   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
740   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
741 }
742
743
744 static void
745 diff_insert (struct DiffEntry *diff,
746              int weight,
747              const struct GNUNET_SET_Element *element)
748 {
749   struct DiffElementInfo *di;
750   struct GNUNET_HashCode hash;
751
752   GNUNET_assert ( (1 == weight) || (-1 == weight));
753
754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
755               "diff_insert with element size %u\n",
756               element->size);
757
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "hashing element\n");
760
761   GNUNET_SET_element_hash (element, &hash);
762
763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764               "hashed element\n");
765
766   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
767
768   if (NULL == di)
769   {
770     di = GNUNET_new (struct DiffElementInfo);
771     di->element = GNUNET_SET_element_dup (element);
772     GNUNET_assert (GNUNET_OK ==
773                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
774                                                       &hash, di,
775                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
776   }
777
778   di->weight = weight;
779 }
780
781
782 static void
783 rfn_commit (struct ReferendumEntry *rfn,
784             uint16_t commit_peer)
785 {
786   GNUNET_assert (commit_peer < rfn->num_peers);
787
788   rfn->peer_commited[commit_peer] = GNUNET_YES;
789 }
790
791
792 static void
793 rfn_contest (struct ReferendumEntry *rfn,
794              uint16_t contested_peer)
795 {
796   GNUNET_assert (contested_peer < rfn->num_peers);
797
798   rfn->peer_contested[contested_peer] = GNUNET_YES;
799 }
800
801
802 static uint16_t
803 rfn_noncontested (struct ReferendumEntry *rfn)
804 {
805   uint16_t i;
806   uint16_t ret;
807
808   ret = 0;
809   for (i = 0; i < rfn->num_peers; i++)
810     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
811       ret++;
812
813   return ret;
814 }
815
816
817 static void
818 rfn_vote (struct ReferendumEntry *rfn,
819           uint16_t voting_peer,
820           enum ReferendumVote vote,
821           const struct GNUNET_SET_Element *element)
822 {
823   struct RfnElementInfo *ri;
824   struct GNUNET_HashCode hash;
825
826   GNUNET_assert (voting_peer < rfn->num_peers);
827
828   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
829      since VOTE_KEEP is implicit in not voting. */
830   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
831
832   GNUNET_SET_element_hash (element, &hash);
833   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
834
835   if (NULL == ri)
836   {
837     ri = GNUNET_new (struct RfnElementInfo);
838     ri->element = GNUNET_SET_element_dup (element);
839     ri->votes = GNUNET_new_array (rfn->num_peers, int);
840     GNUNET_assert (GNUNET_OK ==
841                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
842                                                       &hash, ri,
843                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
844   }
845
846   ri->votes[voting_peer] = GNUNET_YES;
847   ri->proposal = vote;
848 }
849
850
851 uint16_t
852 task_other_peer (struct TaskEntry *task)
853 {
854   uint16_t me = task->step->session->local_peer_idx;
855   if (task->key.peer1 == me)
856     return task->key.peer2;
857   return task->key.peer1;
858 }
859
860
861 /**
862  * Callback for set operation results. Called for each element
863  * in the result set.
864  *
865  * @param cls closure
866  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
867  * @param status see enum GNUNET_SET_Status
868  */
869 static void
870 set_result_cb (void *cls,
871                const struct GNUNET_SET_Element *element,
872                enum GNUNET_SET_Status status)
873 {
874   struct TaskEntry *task = cls;
875   struct ConsensusSession *session = task->step->session;
876   struct SetEntry *output_set = NULL;
877   struct DiffEntry *output_diff = NULL;
878   struct ReferendumEntry *output_rfn = NULL;
879   unsigned int other_idx;
880   struct SetOpCls *setop;
881
882   setop = &task->cls.setop;
883
884
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "P%u: got set result for {%s}, status %u\n",
887               session->local_peer_idx,
888               debug_str_task_key (&task->key),
889               status);
890
891   if (GNUNET_NO == task->is_started)
892   {
893     GNUNET_break_op (0);
894     return;
895   }
896
897   if (GNUNET_YES == task->is_finished)
898   {
899     GNUNET_break_op (0);
900     return;
901   }
902
903   other_idx = task_other_peer (task);
904
905   if (SET_KIND_NONE != setop->output_set.set_kind)
906   {
907     output_set = lookup_set (session, &setop->output_set);
908     GNUNET_assert (NULL != output_set);
909   }
910
911   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
912   {
913     output_diff = lookup_diff (session, &setop->output_diff);
914     GNUNET_assert (NULL != output_diff);
915   }
916
917   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
918   {
919     output_rfn = lookup_rfn (session, &setop->output_rfn);
920     GNUNET_assert (NULL != output_rfn);
921   }
922
923   if (GNUNET_YES == session->peers_blacklisted[other_idx])
924   {
925     /* Peer might have been blacklisted
926        by a gradecast running in parallel, ignore elements from now */
927     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
928       return;
929     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
930       return;
931   }
932
933   if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
934   {
935     if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
936     {
937       GNUNET_assert (NULL != output_rfn);
938       rfn_contest (output_rfn, task_other_peer (task));
939       return;
940     }
941   }
942
943   switch (status)
944   {
945     case GNUNET_SET_STATUS_ADD_LOCAL:
946       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947                   "Adding element in Task {%s}\n",
948                   debug_str_task_key (&task->key));
949       if (NULL != output_set)
950       {
951         // FIXME: record pending adds, use callback
952         GNUNET_SET_add_element (output_set->h,
953                                 element,
954                                 NULL,
955                                 NULL);
956 #ifdef GNUNET_EXTRA_LOGGING
957         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
958                     "P%u: adding element %s into set {%s} of task {%s}\n",
959                     session->local_peer_idx,
960                     debug_str_element (element),
961                     debug_str_set_key (&setop->output_set),
962                     debug_str_task_key (&task->key));
963 #endif
964       }
965       if (NULL != output_diff)
966       {
967         diff_insert (output_diff, 1, element);
968 #ifdef GNUNET_EXTRA_LOGGING
969         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
970                     "P%u: adding element %s into diff {%s} of task {%s}\n",
971                     session->local_peer_idx,
972                     debug_str_element (element),
973                     debug_str_diff_key (&setop->output_diff),
974                     debug_str_task_key (&task->key));
975 #endif
976       }
977       if (NULL != output_rfn)
978       {
979         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
980 #ifdef GNUNET_EXTRA_LOGGING
981         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
982                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
983                     session->local_peer_idx,
984                     debug_str_element (element),
985                     debug_str_rfn_key (&setop->output_rfn),
986                     debug_str_task_key (&task->key));
987 #endif
988       }
989       // XXX: add result to structures in task
990       break;
991     case GNUNET_SET_STATUS_ADD_REMOTE:
992       if (GNUNET_YES == setop->do_not_remove)
993         break;
994       if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
995         break;
996       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                   "Removing element in Task {%s}\n",
998                   debug_str_task_key (&task->key));
999       if (NULL != output_set)
1000       {
1001         // FIXME: record pending adds, use callback
1002         GNUNET_SET_remove_element (output_set->h,
1003                                    element,
1004                                    NULL,
1005                                    NULL);
1006 #ifdef GNUNET_EXTRA_LOGGING
1007         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1008                     "P%u: removing element %s from set {%s} of task {%s}\n",
1009                     session->local_peer_idx,
1010                     debug_str_element (element),
1011                     debug_str_set_key (&setop->output_set),
1012                     debug_str_task_key (&task->key));
1013 #endif
1014       }
1015       if (NULL != output_diff)
1016       {
1017         diff_insert (output_diff, -1, element);
1018 #ifdef GNUNET_EXTRA_LOGGING
1019         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1020                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1021                     session->local_peer_idx,
1022                     debug_str_element (element),
1023                     debug_str_diff_key (&setop->output_diff),
1024                     debug_str_task_key (&task->key));
1025 #endif
1026       }
1027       if (NULL != output_rfn)
1028       {
1029         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1030 #ifdef GNUNET_EXTRA_LOGGING
1031         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1032                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1033                     session->local_peer_idx,
1034                     debug_str_element (element),
1035                     debug_str_rfn_key (&setop->output_rfn),
1036                     debug_str_task_key (&task->key));
1037 #endif
1038       }
1039       break;
1040     case GNUNET_SET_STATUS_DONE:
1041       // XXX: check first if any changes to the underlying
1042       // set are still pending
1043       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                   "Finishing setop in Task {%s}\n",
1045                   debug_str_task_key (&task->key));
1046       if (NULL != output_rfn)
1047       {
1048         rfn_commit (output_rfn, task_other_peer (task));
1049       }
1050       finish_task (task);
1051       break;
1052     case GNUNET_SET_STATUS_FAILURE:
1053       // XXX: cleanup
1054       GNUNET_break_op (0);
1055       finish_task (task);
1056       return;
1057     default:
1058       /* not reached */
1059       GNUNET_assert (0);
1060   }
1061 }
1062
1063 #ifdef EVIL
1064
1065 enum EvilnessType
1066 {
1067   EVILNESS_NONE,
1068   EVILNESS_CRAM_ALL,
1069   EVILNESS_CRAM_LEAD,
1070   EVILNESS_CRAM_ECHO,
1071   EVILNESS_SLACK,
1072 };
1073
1074 enum EvilnessSubType
1075 {
1076   EVILNESS_SUB_NONE,
1077   EVILNESS_SUB_REPLACEMENT,
1078   EVILNESS_SUB_NO_REPLACEMENT,
1079 };
1080
1081 struct Evilness
1082 {
1083   enum EvilnessType type;
1084   enum EvilnessSubType subtype;
1085   unsigned int num;
1086 };
1087
1088
1089 static int
1090 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1091 {
1092   if (0 == strcmp ("replace", evil_subtype_str))
1093   {
1094     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1095   }
1096   else if (0 == strcmp ("noreplace", evil_subtype_str))
1097   {
1098     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1099   }
1100   else
1101   {
1102     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1103                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1104                 evil_subtype_str);
1105     return GNUNET_SYSERR;
1106   }
1107   return GNUNET_OK;
1108 }
1109
1110
1111 static void
1112 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1113 {
1114   char *evil_spec;
1115   char *field;
1116   char *evil_type_str = NULL;
1117   char *evil_subtype_str = NULL;
1118
1119   GNUNET_assert (NULL != evil);
1120
1121   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1122   {
1123     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                 "P%u: no evilness\n",
1125                 session->local_peer_idx);
1126     evil->type = EVILNESS_NONE;
1127     return;
1128   }
1129   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130               "P%u: got evilness spec\n",
1131               session->local_peer_idx);
1132
1133   for (field = strtok (evil_spec, "/");
1134        NULL != field;
1135        field = strtok (NULL, "/"))
1136   {
1137     unsigned int peer_num;
1138     unsigned int evil_num;
1139     int ret;
1140
1141     evil_type_str = NULL;
1142     evil_subtype_str = NULL;
1143
1144     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1145
1146     if (ret != 4)
1147     {
1148       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1149                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1150                   field,
1151                   ret);
1152       goto not_evil;
1153     }
1154
1155     GNUNET_assert (NULL != evil_type_str);
1156     GNUNET_assert (NULL != evil_subtype_str);
1157
1158     if (peer_num == session->local_peer_idx)
1159     {
1160       if (0 == strcmp ("slack", evil_type_str))
1161       {
1162         evil->type = EVILNESS_SLACK;
1163       }
1164       else if (0 == strcmp ("cram-all", evil_type_str))
1165       {
1166         evil->type = EVILNESS_CRAM_ALL;
1167         evil->num = evil_num;
1168         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1169           goto not_evil;
1170       }
1171       else if (0 == strcmp ("cram-lead", evil_type_str))
1172       {
1173         evil->type = EVILNESS_CRAM_LEAD;
1174         evil->num = evil_num;
1175         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1176           goto not_evil;
1177       }
1178       else if (0 == strcmp ("cram-echo", evil_type_str))
1179       {
1180         evil->type = EVILNESS_CRAM_ECHO;
1181         evil->num = evil_num;
1182         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1183           goto not_evil;
1184       }
1185       else
1186       {
1187         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1188                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1189                     evil_type_str);
1190         goto not_evil;
1191       }
1192       goto cleanup;
1193     }
1194     /* No GNUNET_free since memory was allocated by libc */
1195     free (evil_type_str);
1196     evil_type_str = NULL;
1197     evil_subtype_str = NULL;
1198   }
1199 not_evil:
1200   evil->type = EVILNESS_NONE;
1201 cleanup:
1202   GNUNET_free (evil_spec);
1203   /* no GNUNET_free_non_null since it wasn't
1204    * allocated with GNUNET_malloc */
1205   if (NULL != evil_type_str)
1206     free (evil_type_str);
1207   if (NULL != evil_subtype_str)
1208     free (evil_subtype_str);
1209 }
1210
1211 #endif
1212
1213
1214 /**
1215  * Commit the appropriate set for a
1216  * task.
1217  */
1218 static void
1219 commit_set (struct ConsensusSession *session,
1220             struct TaskEntry *task)
1221 {
1222   struct SetEntry *set;
1223   struct SetOpCls *setop = &task->cls.setop;
1224
1225   GNUNET_assert (NULL != setop->op);
1226   set = lookup_set (session, &setop->input_set);
1227   GNUNET_assert (NULL != set);
1228
1229 #ifdef EVIL
1230   {
1231     unsigned int i;
1232     struct Evilness evil;
1233
1234     get_evilness (session, &evil);
1235     if (EVILNESS_NONE != evil.type)
1236     {
1237       /* Useful for evaluation */
1238       GNUNET_STATISTICS_set (statistics,
1239                              "is evil",
1240                              1,
1241                              GNUNET_NO);
1242     }
1243     switch (evil.type)
1244     {
1245       case EVILNESS_CRAM_ALL:
1246       case EVILNESS_CRAM_LEAD:
1247       case EVILNESS_CRAM_ECHO:
1248         /* We're not cramming elements in the
1249            all-to-all round, since that would just
1250            add more elements to the result set, but
1251            wouldn't test robustness. */
1252         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1253         {
1254           GNUNET_SET_commit (setop->op, set->h);
1255           break;
1256         }
1257         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1258             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1259         {
1260           GNUNET_SET_commit (setop->op, set->h);
1261           break;
1262         }
1263         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1264         {
1265           GNUNET_SET_commit (setop->op, set->h);
1266           break;
1267         }
1268         for (i = 0; i < evil.num; i++)
1269         {
1270           struct GNUNET_HashCode hash;
1271           struct GNUNET_SET_Element element;
1272           element.data = &hash;
1273           element.size = sizeof (struct GNUNET_HashCode);
1274           element.element_type = 0;
1275
1276           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1277           {
1278             /* Always generate a new element. */
1279             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1280           }
1281           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1282           {
1283             /* Always cram the same elements, derived from counter. */
1284             GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1285           }
1286           else
1287           {
1288             GNUNET_assert (0);
1289           }
1290           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1291 #ifdef GNUNET_EXTRA_LOGGING
1292           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1293                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1294                       session->local_peer_idx,
1295                       debug_str_element (&element),
1296                       debug_str_set_key (&setop->input_set),
1297                       debug_str_task_key (&task->key));
1298 #endif
1299         }
1300         GNUNET_STATISTICS_update (statistics,
1301                                   "# stuffed elements",
1302                                   evil.num,
1303                                   GNUNET_NO);
1304         GNUNET_SET_commit (setop->op, set->h);
1305         break;
1306       case EVILNESS_SLACK:
1307         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1308                     "P%u: evil peer: slacking\n",
1309                     (unsigned int) session->local_peer_idx);
1310         /* Do nothing. */
1311         break;
1312       case EVILNESS_NONE:
1313         GNUNET_SET_commit (setop->op, set->h);
1314         break;
1315     }
1316   }
1317 #else
1318   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1319   {
1320     struct GNUNET_SET_Element element;
1321     struct ContestedPayload payload;
1322     element.data = &payload;
1323     element.size = sizeof (struct ContestedPayload);
1324     element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1325     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1326   }
1327   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1328   {
1329     GNUNET_SET_commit (setop->op, set->h);
1330   }
1331   else
1332   {
1333     /* For our testcases, we don't want the blacklisted
1334        peers to wait. */
1335     GNUNET_SET_operation_cancel (setop->op);
1336     setop->op = NULL;
1337   }
1338 #endif
1339 }
1340
1341
1342 static void
1343 put_diff (struct ConsensusSession *session,
1344          struct DiffEntry *diff)
1345 {
1346   struct GNUNET_HashCode hash;
1347
1348   GNUNET_assert (NULL != diff);
1349
1350   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1351   GNUNET_assert (GNUNET_OK ==
1352                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1353                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1354 }
1355
1356 static void
1357 put_set (struct ConsensusSession *session,
1358          struct SetEntry *set)
1359 {
1360   struct GNUNET_HashCode hash;
1361
1362   GNUNET_assert (NULL != set->h);
1363
1364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1365               "Putting set %s\n",
1366               debug_str_set_key (&set->key));
1367
1368   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1369   GNUNET_assert (GNUNET_SYSERR !=
1370                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1371                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1372 }
1373
1374
1375 static void
1376 put_rfn (struct ConsensusSession *session,
1377          struct ReferendumEntry *rfn)
1378 {
1379   struct GNUNET_HashCode hash;
1380
1381   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1382   GNUNET_assert (GNUNET_OK ==
1383                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1384                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1385 }
1386
1387
1388
1389 static void
1390 task_cancel_reconcile (struct TaskEntry *task)
1391 {
1392   /* not implemented yet */
1393   GNUNET_assert (0);
1394 }
1395
1396
1397 static void
1398 apply_diff_to_rfn (struct DiffEntry *diff,
1399                    struct ReferendumEntry *rfn,
1400                    uint16_t voting_peer,
1401                    uint16_t num_peers)
1402 {
1403   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1404   struct DiffElementInfo *di;
1405
1406   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1407
1408   while (GNUNET_YES ==
1409          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1410                                                       NULL,
1411                                                       (const void **) &di))
1412   {
1413     if (di->weight > 0)
1414     {
1415       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1416     }
1417     if (di->weight < 0)
1418     {
1419       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1420     }
1421   }
1422
1423   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1424 }
1425
1426
1427 struct DiffEntry *
1428 diff_create ()
1429 {
1430   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1431
1432   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1433
1434   return d;
1435 }
1436
1437
1438 struct DiffEntry *
1439 diff_compose (struct DiffEntry *diff_1,
1440               struct DiffEntry *diff_2)
1441 {
1442   struct DiffEntry *diff_new;
1443   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1444   struct DiffElementInfo *di;
1445
1446   diff_new = diff_create ();
1447
1448   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1449   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1450   {
1451     diff_insert (diff_new, di->weight, di->element);
1452   }
1453   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1454
1455   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1456   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1457   {
1458     diff_insert (diff_new, di->weight, di->element);
1459   }
1460   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1461
1462   return diff_new;
1463 }
1464
1465
1466 struct ReferendumEntry *
1467 rfn_create (uint16_t size)
1468 {
1469   struct ReferendumEntry *rfn;
1470
1471   rfn = GNUNET_new (struct ReferendumEntry);
1472   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1473   rfn->peer_commited = GNUNET_new_array (size, int);
1474   rfn->peer_contested = GNUNET_new_array (size, int);
1475   rfn->num_peers = size;
1476
1477   return rfn;
1478 }
1479
1480
1481 #if UNUSED
1482 static void
1483 diff_destroy (struct DiffEntry *diff)
1484 {
1485   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1486   GNUNET_free (diff);
1487 }
1488 #endif
1489
1490
1491 /**
1492  * For a given majority, count what the outcome
1493  * is (add/remove/keep), and give the number
1494  * of peers that voted for this outcome.
1495  */
1496 static void
1497 rfn_majority (const struct ReferendumEntry *rfn,
1498               const struct RfnElementInfo *ri,
1499               uint16_t *ret_majority,
1500               enum ReferendumVote *ret_vote)
1501 {
1502   uint16_t votes_yes = 0;
1503   uint16_t num_commited = 0;
1504   uint16_t i;
1505
1506   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1507               "Computing rfn majority for element %s of rfn {%s}\n",
1508               debug_str_element (ri->element),
1509               debug_str_rfn_key (&rfn->key));
1510
1511   for (i = 0; i < rfn->num_peers; i++)
1512   {
1513     if (GNUNET_NO == rfn->peer_commited[i])
1514       continue;
1515     num_commited++;
1516
1517     if (GNUNET_YES == ri->votes[i])
1518       votes_yes++;
1519   }
1520
1521   if (votes_yes > (num_commited) / 2)
1522   {
1523     *ret_vote = ri->proposal;
1524     *ret_majority = votes_yes;
1525   }
1526   else
1527   {
1528     *ret_vote = VOTE_STAY;
1529     *ret_majority = num_commited - votes_yes;
1530   }
1531 }
1532
1533
1534 struct SetCopyCls
1535 {
1536   struct TaskEntry *task;
1537   struct SetKey dst_set_key;
1538 };
1539
1540
1541 static void
1542 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1543 {
1544   struct SetCopyCls *scc = cls;
1545   struct TaskEntry *task = scc->task;
1546   struct SetKey dst_set_key = scc->dst_set_key;
1547   struct SetEntry *set;
1548
1549   GNUNET_free (scc);
1550   set = GNUNET_new (struct SetEntry);
1551   set->h = copy;
1552   set->key = dst_set_key;
1553   put_set (task->step->session, set);
1554
1555   task->start (task);
1556 }
1557
1558
1559 /**
1560  * Call the start function of the given
1561  * task again after we created a copy of the given set.
1562  */
1563 static void
1564 create_set_copy_for_task (struct TaskEntry *task,
1565                           struct SetKey *src_set_key,
1566                           struct SetKey *dst_set_key)
1567 {
1568   struct SetEntry *src_set;
1569   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1570
1571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572               "Copying set {%s} to {%s} for task {%s}\n",
1573               debug_str_set_key (src_set_key),
1574               debug_str_set_key (dst_set_key),
1575               debug_str_task_key (&task->key));
1576
1577   scc->task = task;
1578   scc->dst_set_key = *dst_set_key;
1579   src_set = lookup_set (task->step->session, src_set_key);
1580   GNUNET_assert (NULL != src_set);
1581   GNUNET_SET_copy_lazy (src_set->h,
1582                         set_copy_cb,
1583                         scc);
1584 }
1585
1586
1587 struct SetMutationProgressCls
1588 {
1589   int num_pending;
1590   /**
1591    * Task to finish once all changes are through.
1592    */
1593   struct TaskEntry *task;
1594 };
1595
1596
1597 static void
1598 set_mutation_done (void *cls)
1599 {
1600   struct SetMutationProgressCls *pc = cls;
1601
1602   GNUNET_assert (pc->num_pending > 0);
1603
1604   pc->num_pending--;
1605
1606   if (0 == pc->num_pending)
1607   {
1608     struct TaskEntry *task = pc->task;
1609     GNUNET_free (pc);
1610     finish_task (task);
1611   }
1612 }
1613
1614
1615 static void
1616 try_finish_step_early (struct Step *step)
1617 {
1618   unsigned int i;
1619
1620   if (GNUNET_YES == step->is_running)
1621     return;
1622   if (GNUNET_YES == step->is_finished)
1623     return;
1624   if (GNUNET_NO == step->early_finishable)
1625     return;
1626
1627   step->is_finished = GNUNET_YES;
1628
1629 #ifdef GNUNET_EXTRA_LOGGING
1630   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1631               "Finishing step `%s' early.\n",
1632               step->debug_name);
1633 #endif
1634
1635   for (i = 0; i < step->subordinates_len; i++)
1636   {
1637     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1638     step->subordinates[i]->pending_prereq--;
1639 #ifdef GNUNET_EXTRA_LOGGING
1640     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641                 "Decreased pending_prereq to %u for step `%s'.\n",
1642                 (unsigned int) step->subordinates[i]->pending_prereq,
1643                 step->subordinates[i]->debug_name);
1644
1645 #endif
1646     try_finish_step_early (step->subordinates[i]);
1647   }
1648
1649   // XXX: maybe schedule as task to avoid recursion?
1650   run_ready_steps (step->session);
1651 }
1652
1653
1654 static void
1655 finish_step (struct Step *step)
1656 {
1657   unsigned int i;
1658
1659   GNUNET_assert (step->finished_tasks == step->tasks_len);
1660   GNUNET_assert (GNUNET_YES == step->is_running);
1661   GNUNET_assert (GNUNET_NO == step->is_finished);
1662
1663 #ifdef GNUNET_EXTRA_LOGGING
1664   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665               "All tasks of step `%s' with %u subordinates finished.\n",
1666               step->debug_name,
1667               step->subordinates_len);
1668 #endif
1669
1670   for (i = 0; i < step->subordinates_len; i++)
1671   {
1672     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1673     step->subordinates[i]->pending_prereq--;
1674 #ifdef GNUNET_EXTRA_LOGGING
1675     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676                 "Decreased pending_prereq to %u for step `%s'.\n",
1677                 (unsigned int) step->subordinates[i]->pending_prereq,
1678                 step->subordinates[i]->debug_name);
1679
1680 #endif
1681   }
1682
1683   step->is_finished = GNUNET_YES;
1684
1685   // XXX: maybe schedule as task to avoid recursion?
1686   run_ready_steps (step->session);
1687 }
1688
1689
1690
1691 /**
1692  * Apply the result from one round of gradecasts (i.e. every peer
1693  * should have gradecasted) to the peer's current set.
1694  *
1695  * @param task the task with context information
1696  */
1697 static void
1698 task_start_apply_round (struct TaskEntry *task)
1699 {
1700   struct ConsensusSession *session = task->step->session;
1701   struct SetKey sk_in;
1702   struct SetKey sk_out;
1703   struct RfnKey rk_in;
1704   struct SetEntry *set_out;
1705   struct ReferendumEntry *rfn_in;
1706   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1707   struct RfnElementInfo *ri;
1708   struct SetMutationProgressCls *progress_cls;
1709   uint16_t worst_majority = UINT16_MAX;
1710
1711   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1712   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1713   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1714
1715   set_out = lookup_set (session, &sk_out);
1716   if (NULL == set_out)
1717   {
1718     create_set_copy_for_task (task, &sk_in, &sk_out);
1719     return;
1720   }
1721
1722   rfn_in = lookup_rfn (session, &rk_in);
1723   GNUNET_assert (NULL != rfn_in);
1724
1725   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1726   progress_cls->task = task;
1727
1728   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1729
1730   while (GNUNET_YES ==
1731          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1732                                                       NULL,
1733                                                       (const void **) &ri))
1734   {
1735     uint16_t majority_num;
1736     enum ReferendumVote majority_vote;
1737
1738     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1739
1740     if (worst_majority > majority_num)
1741       worst_majority = majority_num;
1742
1743     switch (majority_vote)
1744     {
1745       case VOTE_ADD:
1746         progress_cls->num_pending++;
1747         GNUNET_assert (GNUNET_OK ==
1748                        GNUNET_SET_add_element (set_out->h,
1749                                                ri->element,
1750                                                &set_mutation_done,
1751                                                progress_cls));
1752         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1753                     "P%u: apply round: adding element %s with %u-majority.\n",
1754                     session->local_peer_idx,
1755                     debug_str_element (ri->element), majority_num);
1756         break;
1757       case VOTE_REMOVE:
1758         progress_cls->num_pending++;
1759         GNUNET_assert (GNUNET_OK ==
1760                        GNUNET_SET_remove_element (set_out->h,
1761                                                   ri->element,
1762                                                   &set_mutation_done,
1763                                                   progress_cls));
1764         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1765                     "P%u: apply round: deleting element %s with %u-majority.\n",
1766                     session->local_peer_idx,
1767                     debug_str_element (ri->element), majority_num);
1768         break;
1769       case VOTE_STAY:
1770         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1771                     "P%u: apply round: keeping element %s with %u-majority.\n",
1772                     session->local_peer_idx,
1773                     debug_str_element (ri->element), majority_num);
1774         // do nothing
1775         break;
1776       default:
1777         GNUNET_assert (0);
1778         break;
1779     }
1780   }
1781
1782   if (0 == progress_cls->num_pending)
1783   {
1784     // call closure right now, no pending ops
1785     GNUNET_free (progress_cls);
1786     finish_task (task);
1787   }
1788
1789   {
1790     uint16_t thresh = (session->num_peers / 3) * 2;
1791
1792     if (worst_majority >= thresh)
1793     {
1794       switch (session->early_stopping)
1795       {
1796         case EARLY_STOPPING_NONE:
1797           session->early_stopping = EARLY_STOPPING_ONE_MORE;
1798           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1799                       "P%u: Stopping early (after one more superround)\n",
1800                       session->local_peer_idx);
1801           break;
1802         case EARLY_STOPPING_ONE_MORE:
1803           GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1804                       session->local_peer_idx);
1805           session->early_stopping = EARLY_STOPPING_DONE;
1806           {
1807             struct Step *step;
1808             for (step = session->steps_head; NULL != step; step = step->next)
1809               try_finish_step_early (step);
1810           }
1811           break;
1812         case EARLY_STOPPING_DONE:
1813           /* We shouldn't be here anymore after early stopping */
1814           GNUNET_break (0);
1815           break;
1816         default:
1817           GNUNET_assert (0);
1818           break;
1819       }
1820     }
1821     else if (EARLY_STOPPING_NONE != session->early_stopping)
1822     {
1823       // Our assumption about the number of bad peers
1824       // has been broken.
1825       GNUNET_break_op (0);
1826     }
1827     else
1828     {
1829       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1830                   session->local_peer_idx);
1831     }
1832   }
1833   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1834 }
1835
1836
1837 static void
1838 task_start_grade (struct TaskEntry *task)
1839 {
1840   struct ConsensusSession *session = task->step->session;
1841   struct ReferendumEntry *output_rfn;
1842   struct ReferendumEntry *input_rfn;
1843   struct DiffEntry *input_diff;
1844   struct RfnKey rfn_key;
1845   struct DiffKey diff_key;
1846   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1847   struct RfnElementInfo *ri;
1848   unsigned int gradecast_confidence = 2;
1849
1850   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1851   output_rfn = lookup_rfn (session, &rfn_key);
1852   if (NULL == output_rfn)
1853   {
1854     output_rfn = rfn_create (session->num_peers);
1855     output_rfn->key = rfn_key;
1856     put_rfn (session, output_rfn);
1857   }
1858
1859   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1860   input_diff = lookup_diff (session, &diff_key);
1861   GNUNET_assert (NULL != input_diff);
1862
1863   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1864   input_rfn = lookup_rfn (session, &rfn_key);
1865   GNUNET_assert (NULL != input_rfn);
1866
1867   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1868
1869   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1870
1871   while (GNUNET_YES ==
1872          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1873                                                       NULL,
1874                                                       (const void **) &ri))
1875   {
1876     uint16_t majority_num;
1877     enum ReferendumVote majority_vote;
1878
1879     // XXX: we need contested votes and non-contested votes here
1880     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1881
1882     if (majority_num <= session->num_peers / 3)
1883       majority_vote = VOTE_REMOVE;
1884
1885     switch (majority_vote)
1886     {
1887       case VOTE_STAY:
1888         break;
1889       case VOTE_ADD:
1890         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1891         break;
1892       case VOTE_REMOVE:
1893         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1894         break;
1895       default:
1896         GNUNET_assert (0);
1897         break;
1898     }
1899   }
1900   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1901
1902   {
1903     uint16_t noncontested;
1904     noncontested = rfn_noncontested (input_rfn);
1905     if (noncontested < (session->num_peers / 3) * 2)
1906     {
1907       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1908     }
1909     if (noncontested < (session->num_peers / 3) + 1)
1910     {
1911       gradecast_confidence = 0;
1912     }
1913   }
1914
1915   if (gradecast_confidence >= 1)
1916     rfn_commit (output_rfn, task->key.leader);
1917
1918   if (gradecast_confidence <= 1)
1919     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1920
1921   finish_task (task);
1922 }
1923
1924
1925 static void
1926 task_start_reconcile (struct TaskEntry *task)
1927 {
1928   struct SetEntry *input;
1929   struct SetOpCls *setop = &task->cls.setop;
1930   struct ConsensusSession *session = task->step->session;
1931
1932   input = lookup_set (session, &setop->input_set);
1933   GNUNET_assert (NULL != input);
1934   GNUNET_assert (NULL != input->h);
1935
1936   /* We create the outputs for the operation here
1937      (rather than in the set operation callback)
1938      because we want something valid in there, even
1939      if the other peer doesn't talk to us */
1940
1941   if (SET_KIND_NONE != setop->output_set.set_kind)
1942   {
1943     /* If we don't have an existing output set,
1944        we clone the input set. */
1945     if (NULL == lookup_set (session, &setop->output_set))
1946     {
1947       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1948       return;
1949     }
1950   }
1951
1952   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1953   {
1954     if (NULL == lookup_rfn (session, &setop->output_rfn))
1955     {
1956       struct ReferendumEntry *rfn;
1957
1958       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959                   "P%u: output rfn <%s> missing, creating.\n",
1960                   session->local_peer_idx,
1961                   debug_str_rfn_key (&setop->output_rfn));
1962
1963       rfn = rfn_create (session->num_peers);
1964       rfn->key = setop->output_rfn;
1965       put_rfn (session, rfn);
1966     }
1967   }
1968
1969   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1970   {
1971     if (NULL == lookup_diff (session, &setop->output_diff))
1972     {
1973       struct DiffEntry *diff;
1974
1975       diff = diff_create ();
1976       diff->key = setop->output_diff;
1977       put_diff (session, diff);
1978     }
1979   }
1980
1981   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
1982   {
1983     /* XXX: mark the corresponding rfn as commited if necessary */
1984     finish_task (task);
1985     return;
1986   }
1987
1988   if (task->key.peer1 == session->local_peer_idx)
1989   {
1990     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1991
1992     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993                 "P%u: Looking up set {%s} to run remote union\n",
1994                 session->local_peer_idx,
1995                 debug_str_set_key (&setop->input_set));
1996
1997     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1998     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
1999
2000     rcm.kind = htons (task->key.kind);
2001     rcm.peer1 = htons (task->key.peer1);
2002     rcm.peer2 = htons (task->key.peer2);
2003     rcm.leader = htons (task->key.leader);
2004     rcm.repetition = htons (task->key.repetition);
2005
2006     GNUNET_assert (NULL == setop->op);
2007     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2008                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2009
2010     // XXX: maybe this should be done while
2011     // setting up tasks alreays?
2012     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2013                                     &session->global_id,
2014                                     &rcm.header,
2015                                     GNUNET_SET_RESULT_SYMMETRIC,
2016                                     set_result_cb,
2017                                     task);
2018
2019     commit_set (session, task);
2020   }
2021   else if (task->key.peer2 == session->local_peer_idx)
2022   {
2023     /* Wait for the other peer to contact us */
2024     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2025                 session->local_peer_idx, task->key.peer1);
2026
2027     if (NULL != setop->op)
2028     {
2029       commit_set (session, task);
2030     }
2031   }
2032   else
2033   {
2034     /* We made an error while constructing the task graph. */
2035     GNUNET_assert (0);
2036   }
2037 }
2038
2039
2040 static void
2041 task_start_eval_echo (struct TaskEntry *task)
2042 {
2043   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2044   struct ReferendumEntry *input_rfn;
2045   struct RfnElementInfo *ri;
2046   struct SetEntry *output_set;
2047   struct SetMutationProgressCls *progress_cls;
2048   struct ConsensusSession *session = task->step->session;
2049   struct SetKey sk_in;
2050   struct SetKey sk_out;
2051   struct RfnKey rk_in;
2052
2053   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2054   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2055   output_set = lookup_set (session, &sk_out);
2056   if (NULL == output_set)
2057   {
2058     create_set_copy_for_task (task, &sk_in, &sk_out);
2059     return;
2060   }
2061
2062
2063   {
2064     // FIXME: should be marked as a shallow copy, so
2065     // we can destroy everything correctly
2066     struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2067     last_set->h = output_set->h;
2068     last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2069     put_set (session, last_set);
2070   }
2071
2072   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2073               "Evaluating referendum in Task {%s}\n",
2074               debug_str_task_key (&task->key));
2075
2076   progress_cls = GNUNET_new (struct SetMutationProgressCls);
2077   progress_cls->task = task;
2078
2079   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2080   input_rfn = lookup_rfn (session, &rk_in);
2081
2082   GNUNET_assert (NULL != input_rfn);
2083
2084   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2085   GNUNET_assert (NULL != iter);
2086
2087   while (GNUNET_YES ==
2088          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2089                                                       NULL,
2090                                                       (const void **) &ri))
2091   {
2092     enum ReferendumVote majority_vote;
2093     uint16_t majority_num;
2094
2095     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2096
2097     if (majority_num < session->num_peers / 3)
2098     {
2099       /* It is not the case that all nonfaulty peers
2100          echoed the same value.  Since we're doing a set reconciliation, we
2101          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
2102          reconciliation as contested.  Other peers might not know that the
2103          leader is faulty, thus we still re-distribute in the confirmation
2104          round. */
2105       output_set->is_contested = GNUNET_YES;
2106     }
2107
2108     switch (majority_vote)
2109     {
2110       case VOTE_ADD:
2111         progress_cls->num_pending++;
2112         GNUNET_assert (GNUNET_OK ==
2113                        GNUNET_SET_add_element (output_set->h,
2114                                                ri->element,
2115                                                set_mutation_done,
2116                                                progress_cls));
2117         break;
2118       case VOTE_REMOVE:
2119         progress_cls->num_pending++;
2120         GNUNET_assert (GNUNET_OK ==
2121                        GNUNET_SET_remove_element (output_set->h,
2122                                                   ri->element,
2123                                                   set_mutation_done,
2124                                                   progress_cls));
2125         break;
2126       case VOTE_STAY:
2127         /* Nothing to do. */
2128         break;
2129       default:
2130         /* not reached */
2131         GNUNET_assert (0);
2132     }
2133   }
2134
2135   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2136
2137   if (0 == progress_cls->num_pending)
2138   {
2139     // call closure right now, no pending ops
2140     GNUNET_free (progress_cls);
2141     finish_task (task);
2142   }
2143 }
2144
2145
2146 static void
2147 task_start_finish (struct TaskEntry *task)
2148 {
2149   struct SetEntry *final_set;
2150   struct ConsensusSession *session = task->step->session;
2151
2152   final_set = lookup_set (session, &task->cls.finish.input_set);
2153
2154   GNUNET_assert (NULL != final_set);
2155
2156
2157   GNUNET_SET_iterate (final_set->h,
2158                       send_to_client_iter,
2159                       task);
2160 }
2161
2162 static void
2163 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2164 {
2165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2166
2167   GNUNET_assert (GNUNET_NO == task->is_started);
2168   GNUNET_assert (GNUNET_NO == task->is_finished);
2169   GNUNET_assert (NULL != task->start);
2170
2171   task->start (task);
2172
2173   task->is_started = GNUNET_YES;
2174 }
2175
2176
2177
2178
2179 /*
2180  * Run all steps of the session that don't any
2181  * more dependencies.
2182  */
2183 static void
2184 run_ready_steps (struct ConsensusSession *session)
2185 {
2186   struct Step *step;
2187
2188   step = session->steps_head;
2189
2190   while (NULL != step)
2191   {
2192     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2193     {
2194       size_t i;
2195
2196       GNUNET_assert (0 == step->finished_tasks);
2197
2198 #ifdef GNUNET_EXTRA_LOGGING
2199       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2200                   session->local_peer_idx,
2201                   step->debug_name,
2202                   step->round, step->tasks_len, step->subordinates_len);
2203 #endif
2204
2205       step->is_running = GNUNET_YES;
2206       for (i = 0; i < step->tasks_len; i++)
2207         start_task (session, step->tasks[i]);
2208
2209       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2210       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2211         finish_step (step);
2212
2213       /* Running the next ready steps will be triggered by task completion */
2214       return;
2215     }
2216     step = step->next;
2217   }
2218
2219   return;
2220 }
2221
2222
2223
2224 static void
2225 finish_task (struct TaskEntry *task)
2226 {
2227   GNUNET_assert (GNUNET_NO == task->is_finished);
2228   task->is_finished = GNUNET_YES;
2229
2230   task->step->finished_tasks++;
2231
2232   if (task->step->finished_tasks == task->step->tasks_len)
2233     finish_step (task->step);
2234 }
2235
2236
2237 /**
2238  * Search peer in the list of peers in session.
2239  *
2240  * @param peer peer to find
2241  * @param session session with peer
2242  * @return index of peer, -1 if peer is not in session
2243  */
2244 static int
2245 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2246 {
2247   int i;
2248   for (i = 0; i < session->num_peers; i++)
2249     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2250       return i;
2251   return -1;
2252 }
2253
2254
2255 /**
2256  * Compute a global, (hopefully) unique consensus session id,
2257  * from the local id of the consensus session, and the identities of all participants.
2258  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2259  * exactly the same peers, the global id will be different.
2260  *
2261  * @param session session to generate the global id for
2262  * @param local_session_id local id of the consensus session
2263  */
2264 static void
2265 compute_global_id (struct ConsensusSession *session,
2266                    const struct GNUNET_HashCode *local_session_id)
2267 {
2268   const char *salt = "gnunet-service-consensus/session_id";
2269
2270   GNUNET_assert (GNUNET_YES ==
2271                  GNUNET_CRYPTO_kdf (&session->global_id,
2272                                     sizeof (struct GNUNET_HashCode),
2273                                     salt,
2274                                     strlen (salt),
2275                                     session->peers,
2276                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2277                                     local_session_id,
2278                                     sizeof (struct GNUNET_HashCode),
2279                                     NULL));
2280 }
2281
2282
2283 /**
2284  * Compare two peer identities.
2285  *
2286  * @param h1 some peer identity
2287  * @param h2 some peer identity
2288  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2289  */
2290 static int
2291 peer_id_cmp (const void *h1, const void *h2)
2292 {
2293   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2294 }
2295
2296
2297 /**
2298  * Create the sorted list of peers for the session,
2299  * add the local peer if not in the join message.
2300  *
2301  * @param session session to initialize
2302  * @param join_msg join message with the list of peers participating at the end
2303  */
2304 static void
2305 initialize_session_peer_list (struct ConsensusSession *session,
2306                               const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2307 {
2308   const struct GNUNET_PeerIdentity *msg_peers
2309     = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2310   int local_peer_in_list;
2311
2312   session->num_peers = ntohl (join_msg->num_peers);
2313
2314   /* Peers in the join message, may or may not include the local peer,
2315      Add it if it is missing. */
2316   local_peer_in_list = GNUNET_NO;
2317   for (unsigned int i = 0; i < session->num_peers; i++)
2318   {
2319     if (0 == memcmp (&msg_peers[i],
2320                      &my_peer,
2321                      sizeof (struct GNUNET_PeerIdentity)))
2322     {
2323       local_peer_in_list = GNUNET_YES;
2324       break;
2325     }
2326   }
2327   if (GNUNET_NO == local_peer_in_list)
2328     session->num_peers++;
2329
2330   session->peers = GNUNET_new_array (session->num_peers,
2331                                      struct GNUNET_PeerIdentity);
2332   if (GNUNET_NO == local_peer_in_list)
2333     session->peers[session->num_peers - 1] = my_peer;
2334
2335   GNUNET_memcpy (session->peers,
2336                  msg_peers,
2337                  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2338   qsort (session->peers,
2339          session->num_peers,
2340          sizeof (struct GNUNET_PeerIdentity),
2341          &peer_id_cmp);
2342 }
2343
2344
2345 static struct TaskEntry *
2346 lookup_task (struct ConsensusSession *session,
2347              struct TaskKey *key)
2348 {
2349   struct GNUNET_HashCode hash;
2350
2351
2352   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2353   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2354               GNUNET_h2s (&hash));
2355   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2356 }
2357
2358
2359 /**
2360  * Called when another peer wants to do a set operation with the
2361  * local peer.
2362  *
2363  * @param cls closure
2364  * @param other_peer the other peer
2365  * @param context_msg message with application specific information from
2366  *        the other peer
2367  * @param request request from the other peer, use GNUNET_SET_accept
2368  *        to accept it, otherwise the request will be refused
2369  *        Note that we don't use a return value here, as it is also
2370  *        necessary to specify the set we want to do the operation with,
2371  *        whith sometimes can be derived from the context message.
2372  *        Also necessary to specify the timeout.
2373  */
2374 static void
2375 set_listen_cb (void *cls,
2376                const struct GNUNET_PeerIdentity *other_peer,
2377                const struct GNUNET_MessageHeader *context_msg,
2378                struct GNUNET_SET_Request *request)
2379 {
2380   struct ConsensusSession *session = cls;
2381   struct TaskKey tk;
2382   struct TaskEntry *task;
2383   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2384
2385   if (NULL == context_msg)
2386   {
2387     GNUNET_break_op (0);
2388     return;
2389   }
2390
2391   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2392   {
2393     GNUNET_break_op (0);
2394     return;
2395   }
2396
2397   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2398   {
2399     GNUNET_break_op (0);
2400     return;
2401   }
2402
2403   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2404
2405   tk = ((struct TaskKey) {
2406       .kind = ntohs (cm->kind),
2407       .peer1 = ntohs (cm->peer1),
2408       .peer2 = ntohs (cm->peer2),
2409       .repetition = ntohs (cm->repetition),
2410       .leader = ntohs (cm->leader),
2411   });
2412
2413   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2414               session->local_peer_idx, debug_str_task_key (&tk));
2415
2416   task = lookup_task (session, &tk);
2417
2418   if (NULL == task)
2419   {
2420     GNUNET_break_op (0);
2421     return;
2422   }
2423
2424   if (GNUNET_YES == task->is_finished)
2425   {
2426     GNUNET_break_op (0);
2427     return;
2428   }
2429
2430   if (task->key.peer2 != session->local_peer_idx)
2431   {
2432     /* We're being asked, so we must be thne 2nd peer. */
2433     GNUNET_break_op (0);
2434     return;
2435   }
2436
2437   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2438                     (task->key.peer2 == session->local_peer_idx)));
2439
2440   task->cls.setop.op = GNUNET_SET_accept (request,
2441                                           GNUNET_SET_RESULT_SYMMETRIC,
2442                                           set_result_cb,
2443                                           task);
2444
2445   /* If the task hasn't been started yet,
2446      we wait for that until we commit. */
2447
2448   if (GNUNET_YES == task->is_started)
2449   {
2450     commit_set (session, task);
2451   }
2452 }
2453
2454
2455
2456 static void
2457 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2458           struct TaskEntry *t)
2459 {
2460   struct GNUNET_HashCode round_hash;
2461   struct Step *s;
2462
2463   GNUNET_assert (NULL != t->step);
2464
2465   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2466
2467   s = t->step;
2468
2469   if (s->tasks_len == s->tasks_cap)
2470   {
2471     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2472     GNUNET_array_grow (s->tasks,
2473                        s->tasks_cap,
2474                        target_size);
2475   }
2476
2477 #ifdef GNUNET_EXTRA_LOGGING
2478   GNUNET_assert (NULL != s->debug_name);
2479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2480               debug_str_task_key (&t->key),
2481               s->debug_name);
2482 #endif
2483
2484   s->tasks[s->tasks_len] = t;
2485   s->tasks_len++;
2486
2487   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2488   GNUNET_assert (GNUNET_OK ==
2489       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2490                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2491 }
2492
2493
2494 static void
2495 install_step_timeouts (struct ConsensusSession *session)
2496 {
2497   /* Given the fully constructed task graph
2498      with rounds for tasks, we can give the tasks timeouts. */
2499
2500   // unsigned int max_round;
2501
2502   /* XXX: implement! */
2503 }
2504
2505
2506
2507 /*
2508  * Arrange two peers in some canonical order.
2509  */
2510 static void
2511 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2512 {
2513   uint16_t a;
2514   uint16_t b;
2515
2516   GNUNET_assert (*p1 < n);
2517   GNUNET_assert (*p2 < n);
2518
2519   if (*p1 < *p2)
2520   {
2521     a = *p1;
2522     b = *p2;
2523   }
2524   else
2525   {
2526     a = *p2;
2527     b = *p1;
2528   }
2529
2530   /* For uniformly random *p1, *p2,
2531      this condition is true with 50% chance */
2532   if (((b - a) + n) % n <= n / 2)
2533   {
2534     *p1 = a;
2535     *p2 = b;
2536   }
2537   else
2538   {
2539     *p1 = b;
2540     *p2 = a;
2541   }
2542 }
2543
2544
2545 /**
2546  * Record @a dep as a dependency of @a step.
2547  */
2548 static void
2549 step_depend_on (struct Step *step, struct Step *dep)
2550 {
2551   /* We're not checking for cyclic dependencies,
2552      but this is a cheap sanity check. */
2553   GNUNET_assert (step != dep);
2554   GNUNET_assert (NULL != step);
2555   GNUNET_assert (NULL != dep);
2556   GNUNET_assert (dep->round <= step->round);
2557
2558 #ifdef GNUNET_EXTRA_LOGGING
2559   /* Make sure we have complete debugging information.
2560      Also checks that we don't screw up too badly
2561      constructing the task graph. */
2562   GNUNET_assert (NULL != step->debug_name);
2563   GNUNET_assert (NULL != dep->debug_name);
2564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2565               "Making step `%s' depend on `%s'\n",
2566               step->debug_name,
2567               dep->debug_name);
2568 #endif
2569
2570   if (dep->subordinates_cap == dep->subordinates_len)
2571   {
2572     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2573     GNUNET_array_grow (dep->subordinates,
2574                        dep->subordinates_cap,
2575                        target_size);
2576   }
2577
2578   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2579
2580   dep->subordinates[dep->subordinates_len] = step;
2581   dep->subordinates_len++;
2582
2583   step->pending_prereq++;
2584 }
2585
2586
2587 static struct Step *
2588 create_step (struct ConsensusSession *session, int round, int early_finishable)
2589 {
2590   struct Step *step;
2591   step = GNUNET_new (struct Step);
2592   step->session = session;
2593   step->round = round;
2594   step->early_finishable = early_finishable;
2595   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2596                                     session->steps_tail,
2597                                     step);
2598   return step;
2599 }
2600
2601
2602 /**
2603  * Construct the task graph for a single
2604  * gradecast.
2605  */
2606 static void
2607 construct_task_graph_gradecast (struct ConsensusSession *session,
2608                                 uint16_t rep,
2609                                 uint16_t lead,
2610                                 struct Step *step_before,
2611                                 struct Step *step_after)
2612 {
2613   uint16_t n = session->num_peers;
2614   uint16_t me = session->local_peer_idx;
2615
2616   uint16_t p1;
2617   uint16_t p2;
2618
2619   /* The task we're currently setting up. */
2620   struct TaskEntry task;
2621
2622   struct Step *step;
2623   struct Step *prev_step;
2624
2625   uint16_t round;
2626
2627   unsigned int k;
2628
2629   round = step_before->round + 1;
2630
2631   /* gcast step 1: leader disseminates */
2632
2633   step = create_step (session, round, GNUNET_YES);
2634
2635 #ifdef GNUNET_EXTRA_LOGGING
2636   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2637 #endif
2638   step_depend_on (step, step_before);
2639
2640   if (lead == me)
2641   {
2642     for (k = 0; k < n; k++)
2643     {
2644       if (k == me)
2645         continue;
2646       p1 = me;
2647       p2 = k;
2648       arrange_peers (&p1, &p2, n);
2649       task = ((struct TaskEntry) {
2650         .step = step,
2651         .start = task_start_reconcile,
2652         .cancel = task_cancel_reconcile,
2653         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2654       });
2655       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2656       put_task (session->taskmap, &task);
2657     }
2658     /* We run this task to make sure that the leader
2659        has the stored the SET_KIND_LEADER set of himself,
2660        so he can participate in the rest of the gradecast
2661        without the code having to handle any special cases. */
2662     task = ((struct TaskEntry) {
2663       .step = step,
2664       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2665       .start = task_start_reconcile,
2666       .cancel = task_cancel_reconcile,
2667     });
2668     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2669     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2670     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2671     put_task (session->taskmap, &task);
2672   }
2673   else
2674   {
2675     p1 = me;
2676     p2 = lead;
2677     arrange_peers (&p1, &p2, n);
2678     task = ((struct TaskEntry) {
2679       .step = step,
2680       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2681       .start = task_start_reconcile,
2682       .cancel = task_cancel_reconcile,
2683     });
2684     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2685     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2686     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2687     put_task (session->taskmap, &task);
2688   }
2689
2690   /* gcast phase 2: echo */
2691   prev_step = step;
2692   round += 1;
2693   step = create_step (session, round, GNUNET_YES);
2694 #ifdef GNUNET_EXTRA_LOGGING
2695   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2696 #endif
2697   step_depend_on (step, prev_step);
2698
2699   for (k = 0; k < n; k++)
2700   {
2701     p1 = k;
2702     p2 = me;
2703     arrange_peers (&p1, &p2, n);
2704     task = ((struct TaskEntry) {
2705       .step = step,
2706       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2707       .start = task_start_reconcile,
2708       .cancel = task_cancel_reconcile,
2709     });
2710     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2711     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2712     put_task (session->taskmap, &task);
2713   }
2714
2715   prev_step = step;
2716   /* Same round, since step only has local tasks */
2717   step = create_step (session, round, GNUNET_YES);
2718 #ifdef GNUNET_EXTRA_LOGGING
2719   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2720 #endif
2721   step_depend_on (step, prev_step);
2722
2723   arrange_peers (&p1, &p2, n);
2724   task = ((struct TaskEntry) {
2725     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2726     .step = step,
2727     .start = task_start_eval_echo
2728   });
2729   put_task (session->taskmap, &task);
2730
2731   prev_step = step;
2732   round += 1;
2733   step = create_step (session, round, GNUNET_YES);
2734 #ifdef GNUNET_EXTRA_LOGGING
2735   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2736 #endif
2737   step_depend_on (step, prev_step);
2738
2739   /* gcast phase 3: confirmation and grading */
2740   for (k = 0; k < n; k++)
2741   {
2742     p1 = k;
2743     p2 = me;
2744     arrange_peers (&p1, &p2, n);
2745     task = ((struct TaskEntry) {
2746       .step = step,
2747       .start = task_start_reconcile,
2748       .cancel = task_cancel_reconcile,
2749       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2750     });
2751     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2752     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2753     /* If there was at least one element in the echo round that was
2754        contested (i.e. it had no n-t majority), then we let the other peers
2755        know, and other peers let us know.  The contested flag for each peer is
2756        stored in the rfn. */
2757     task.cls.setop.transceive_contested = GNUNET_YES;
2758     put_task (session->taskmap, &task);
2759   }
2760
2761   prev_step = step;
2762   /* Same round, since step only has local tasks */
2763   step = create_step (session, round, GNUNET_YES);
2764 #ifdef GNUNET_EXTRA_LOGGING
2765   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2766 #endif
2767   step_depend_on (step, prev_step);
2768
2769   task = ((struct TaskEntry) {
2770     .step = step,
2771     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2772     .start = task_start_grade,
2773   });
2774   put_task (session->taskmap, &task);
2775
2776   step_depend_on (step_after, step);
2777 }
2778
2779
2780 static void
2781 construct_task_graph (struct ConsensusSession *session)
2782 {
2783   uint16_t n = session->num_peers;
2784   uint16_t t = n / 3;
2785
2786   uint16_t me = session->local_peer_idx;
2787
2788   /* The task we're currently setting up. */
2789   struct TaskEntry task;
2790
2791   /* Current leader */
2792   unsigned int lead;
2793
2794   struct Step *step;
2795   struct Step *prev_step;
2796
2797   unsigned int round = 0;
2798
2799   unsigned int i;
2800
2801   // XXX: introduce first step,
2802   // where we wait for all insert acks
2803   // from the set service
2804
2805   /* faster but brittle all-to-all */
2806
2807   // XXX: Not implemented yet
2808
2809   /* all-to-all step */
2810
2811   step = create_step (session, round, GNUNET_NO);
2812
2813 #ifdef GNUNET_EXTRA_LOGGING
2814   step->debug_name = GNUNET_strdup ("all to all");
2815 #endif
2816
2817   for (i = 0; i < n; i++)
2818   {
2819     uint16_t p1;
2820     uint16_t p2;
2821
2822     p1 = me;
2823     p2 = i;
2824     arrange_peers (&p1, &p2, n);
2825     task = ((struct TaskEntry) {
2826       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2827       .step = step,
2828       .start = task_start_reconcile,
2829       .cancel = task_cancel_reconcile,
2830     });
2831     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2832     task.cls.setop.output_set = task.cls.setop.input_set;
2833     task.cls.setop.do_not_remove = GNUNET_YES;
2834     put_task (session->taskmap, &task);
2835   }
2836
2837   prev_step = step;
2838   step = NULL;
2839
2840   round += 1;
2841
2842   /* Byzantine union */
2843
2844   /* sequential repetitions of the gradecasts */
2845   for (i = 0; i < t + 1; i++)
2846   {
2847     struct Step *step_rep_start;
2848     struct Step *step_rep_end;
2849
2850     /* Every repetition is in a separate round. */
2851     step_rep_start = create_step (session, round, GNUNET_YES);
2852 #ifdef GNUNET_EXTRA_LOGGING
2853     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2854 #endif
2855
2856     step_depend_on (step_rep_start, prev_step);
2857
2858     /* gradecast has three rounds */
2859     round += 3;
2860     step_rep_end = create_step (session, round, GNUNET_YES);
2861 #ifdef GNUNET_EXTRA_LOGGING
2862     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2863 #endif
2864
2865     /* parallel gradecasts */
2866     for (lead = 0; lead < n; lead++)
2867       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2868
2869     task = ((struct TaskEntry) {
2870       .step = step_rep_end,
2871       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2872       .start = task_start_apply_round,
2873     });
2874     put_task (session->taskmap, &task);
2875
2876     prev_step = step_rep_end;
2877   }
2878
2879  /* There is no next gradecast round, thus the final
2880     start step is the overall end step of the gradecasts */
2881   round += 1;
2882   step = create_step (session, round, GNUNET_NO);
2883 #ifdef GNUNET_EXTRA_LOGGING
2884   GNUNET_asprintf (&step->debug_name, "finish");
2885 #endif
2886   step_depend_on (step, prev_step);
2887
2888   task = ((struct TaskEntry) {
2889     .step = step,
2890     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2891     .start = task_start_finish,
2892   });
2893   task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2894
2895   put_task (session->taskmap, &task);
2896 }
2897
2898
2899
2900 /**
2901  * Check join message.
2902  *
2903  * @param cls session of client that sent the message
2904  * @param m message sent by the client
2905  * @return #GNUNET_OK if @a m is well-formed
2906  */
2907 static int
2908 check_client_join (void *cls,
2909                    const struct GNUNET_CONSENSUS_JoinMessage *m)
2910 {
2911   uint32_t listed_peers = ntohl (m->num_peers);
2912
2913   if ( (ntohs (m->header.size) - sizeof (*m)) !=
2914        listed_peers * sizeof (struct GNUNET_PeerIdentity))
2915   {
2916     GNUNET_break (0);
2917     return GNUNET_SYSERR;
2918   }
2919   return GNUNET_OK;
2920 }
2921
2922
2923 /**
2924  * Called when a client wants to join a consensus session.
2925  *
2926  * @param cls session of client that sent the message
2927  * @param m message sent by the client
2928  */
2929 static void
2930 handle_client_join (void *cls,
2931                     const struct GNUNET_CONSENSUS_JoinMessage *m)
2932 {
2933   struct ConsensusSession *session = cls;
2934   struct ConsensusSession *other_session;
2935
2936   initialize_session_peer_list (session,
2937                                 m);
2938   compute_global_id (session,
2939                      &m->session_id);
2940
2941   /* Check if some local client already owns the session.
2942      It is only legal to have a session with an existing global id
2943      if all other sessions with this global id are finished.*/
2944   for (other_session = sessions_head;
2945        NULL != other_session;
2946        other_session = other_session->next)
2947   {
2948     if ( (other_session != session) &&
2949          (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
2950                                        &other_session->global_id)) )
2951       break;
2952   }
2953
2954   session->conclude_deadline
2955     = GNUNET_TIME_absolute_ntoh (m->deadline);
2956   session->conclude_start
2957     = GNUNET_TIME_absolute_ntoh (m->start);
2958   session->local_peer_idx = get_peer_idx (&my_peer,
2959                                           session);
2960   GNUNET_assert (-1 != session->local_peer_idx);
2961
2962   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2963               "Joining consensus session %s containing %u peers as %u with timeout %s\n",
2964               GNUNET_h2s (&m->session_id),
2965               session->num_peers,
2966               session->local_peer_idx,
2967               GNUNET_STRINGS_relative_time_to_string
2968               (GNUNET_TIME_absolute_get_difference (session->conclude_start,
2969                                                     session->conclude_deadline),
2970                GNUNET_YES));
2971
2972   session->set_listener
2973     = GNUNET_SET_listen (cfg,
2974                          GNUNET_SET_OPERATION_UNION,
2975                          &session->global_id,
2976                          &set_listen_cb,
2977                          session);
2978
2979   session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
2980                                                           GNUNET_NO);
2981   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
2982                                                            GNUNET_NO);
2983   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
2984                                                            GNUNET_NO);
2985   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
2986                                                           GNUNET_NO);
2987
2988   {
2989     struct SetEntry *client_set;
2990
2991     client_set = GNUNET_new (struct SetEntry);
2992     client_set->h = GNUNET_SET_create (cfg,
2993                                        GNUNET_SET_OPERATION_UNION);
2994     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2995     put_set (session,
2996              client_set);
2997   }
2998
2999   session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3000                                                  int);
3001
3002   /* Just construct the task graph,
3003      but don't run anything until the client calls conclude. */
3004   construct_task_graph (session);
3005   GNUNET_SERVICE_client_continue (session->client);
3006 }
3007
3008
3009 static void
3010 client_insert_done (void *cls)
3011 {
3012   // FIXME: implement
3013 }
3014
3015
3016 /**
3017  * Called when a client performs an insert operation.
3018  *
3019  * @param cls client handle
3020  * @param msg message sent by the client
3021  * @return #GNUNET_OK (always well-formed)
3022  */
3023 static int
3024 check_client_insert (void *cls,
3025                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3026 {
3027   return GNUNET_OK;
3028 }
3029
3030
3031 /**
3032  * Called when a client performs an insert operation.
3033  *
3034  * @param cls client handle
3035  * @param msg message sent by the client
3036  */
3037 static void
3038 handle_client_insert (void *cls,
3039                       const struct GNUNET_CONSENSUS_ElementMessage *msg)
3040 {
3041   struct ConsensusSession *session = cls;
3042   struct GNUNET_SET_Element *element;
3043   ssize_t element_size;
3044   struct GNUNET_SET_Handle *initial_set;
3045
3046   if (GNUNET_YES == session->conclude_started)
3047   {
3048     GNUNET_break (0);
3049     GNUNET_SERVICE_client_drop (session->client);
3050     return;
3051   }
3052   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3053   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3054   element->element_type = msg->element_type;
3055   element->size = element_size;
3056   GNUNET_memcpy (&element[1], &msg[1], element_size);
3057   element->data = &element[1];
3058   {
3059     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3060     struct SetEntry *entry;
3061
3062     entry = lookup_set (session,
3063                         &key);
3064     GNUNET_assert (NULL != entry);
3065     initial_set = entry->h;
3066   }
3067   session->num_client_insert_pending++;
3068   GNUNET_SET_add_element (initial_set,
3069                           element,
3070                           &client_insert_done,
3071                           session);
3072
3073 #ifdef GNUNET_EXTRA_LOGGING
3074   {
3075     struct GNUNET_HashCode hash;
3076
3077     GNUNET_SET_element_hash (element,
3078                              &hash);
3079
3080     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3081                 "P%u: element %s added\n",
3082                 session->local_peer_idx,
3083                 GNUNET_h2s (&hash));
3084   }
3085 #endif
3086   GNUNET_free (element);
3087   GNUNET_SERVICE_client_continue (session->client);
3088 }
3089
3090
3091 /**
3092  * Called when a client performs the conclude operation.
3093  *
3094  * @param cls client handle
3095  * @param message message sent by the client
3096  */
3097 static void
3098 handle_client_conclude (void *cls,
3099                         const struct GNUNET_MessageHeader *message)
3100 {
3101   struct ConsensusSession *session = cls;
3102
3103   if (GNUNET_YES == session->conclude_started)
3104   {
3105     /* conclude started twice */
3106     GNUNET_break (0);
3107     GNUNET_SERVICE_client_drop (session->client);
3108     return;
3109   }
3110   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3111               "conclude requested\n");
3112   session->conclude_started = GNUNET_YES;
3113   install_step_timeouts (session);
3114   run_ready_steps (session);
3115   GNUNET_SERVICE_client_continue (session->client);
3116 }
3117
3118
3119 /**
3120  * Called to clean up, after a shutdown has been requested.
3121  *
3122  * @param cls closure
3123  */
3124 static void
3125 shutdown_task (void *cls)
3126 {
3127   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3128               "shutting down\n");
3129   GNUNET_STATISTICS_destroy (statistics,
3130                              GNUNET_NO);
3131   statistics = NULL;
3132 }
3133
3134
3135 /**
3136  * Start processing consensus requests.
3137  *
3138  * @param cls closure
3139  * @param c configuration to use
3140  * @param service the initialized service
3141  */
3142 static void
3143 run (void *cls,
3144      const struct GNUNET_CONFIGURATION_Handle *c,
3145      struct GNUNET_SERVICE_Handle *service)
3146 {
3147   cfg = c;
3148   if (GNUNET_OK !=
3149       GNUNET_CRYPTO_get_peer_identity (cfg,
3150                                        &my_peer))
3151   {
3152     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3153                 "Could not retrieve host identity\n");
3154     GNUNET_SCHEDULER_shutdown ();
3155     return;
3156   }
3157   statistics = GNUNET_STATISTICS_create ("consensus",
3158                                          cfg);
3159   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3160                                  NULL);
3161 }
3162
3163
3164 /**
3165  * Callback called when a client connects to the service.
3166  *
3167  * @param cls closure for the service
3168  * @param c the new client that connected to the service
3169  * @param mq the message queue used to send messages to the client
3170  * @return @a c
3171  */
3172 static void *
3173 client_connect_cb (void *cls,
3174                    struct GNUNET_SERVICE_Client *c,
3175                    struct GNUNET_MQ_Handle *mq)
3176 {
3177   struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3178
3179   session->client = c;
3180   session->client_mq = mq;
3181   GNUNET_CONTAINER_DLL_insert (sessions_head,
3182                                sessions_tail,
3183                                session);
3184   return session;
3185 }
3186
3187
3188 /**
3189  * Callback called when a client disconnected from the service
3190  *
3191  * @param cls closure for the service
3192  * @param c the client that disconnected
3193  * @param internal_cls should be equal to @a c
3194  */
3195 static void
3196 client_disconnect_cb (void *cls,
3197                       struct GNUNET_SERVICE_Client *c,
3198                       void *internal_cls)
3199 {
3200   struct ConsensusSession *session = internal_cls;
3201
3202   if (NULL != session->set_listener)
3203   {
3204     GNUNET_SET_listen_cancel (session->set_listener);
3205     session->set_listener = NULL;
3206   }
3207   GNUNET_CONTAINER_DLL_remove (sessions_head,
3208                                sessions_tail,
3209                                session);
3210   GNUNET_free (session);
3211 }
3212
3213
3214 /**
3215  * Define "main" method using service macro.
3216  */
3217 GNUNET_SERVICE_MAIN
3218 ("consensus",
3219  GNUNET_SERVICE_OPTION_NONE,
3220  &run,
3221  &client_connect_cb,
3222  &client_disconnect_cb,
3223  NULL,
3224  GNUNET_MQ_hd_fixed_size (client_conclude,
3225                           GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3226                           struct GNUNET_MessageHeader,
3227                           NULL),
3228  GNUNET_MQ_hd_var_size (client_insert,
3229                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3230                         struct GNUNET_CONSENSUS_ElementMessage,
3231                         NULL),
3232  GNUNET_MQ_hd_var_size (client_join,
3233                         GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3234                         struct GNUNET_CONSENSUS_JoinMessage,
3235                         NULL),
3236  GNUNET_MQ_handler_end ());
3237
3238 /* end of gnunet-service-consensus.c */