-fix (C) notices
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
36
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40 enum ReferendumVote
41 {
42   /**
43    * Vote that nothing should change.
44    * This option is never voted explicitly.
45    */
46   VOTE_STAY = 0,
47   /**
48    * Vote that an element should be added.
49    */
50   VOTE_ADD = 1,
51   /**
52    * Vote that an element should be removed.
53    */
54   VOTE_REMOVE = 2,
55 };
56
57
58 GNUNET_NETWORK_STRUCT_BEGIN
59
60
61 struct ContestedPayload
62 {
63 };
64
65 /**
66  * Tuple of integers that together
67  * identify a task uniquely.
68  */
69 struct TaskKey {
70   /**
71    * A value from 'enum PhaseKind'.
72    */
73   uint16_t kind GNUNET_PACKED;
74
75   /**
76    * Number of the first peer
77    * in canonical order.
78    */
79   int16_t peer1 GNUNET_PACKED;
80
81   /**
82    * Number of the second peer in canonical order.
83    */
84   int16_t peer2 GNUNET_PACKED;
85
86   /**
87    * Repetition of the gradecast phase.
88    */
89   int16_t repetition GNUNET_PACKED;
90
91   /**
92    * Leader in the gradecast phase.
93    *
94    * Can be different from both peer1 and peer2.
95    */
96   int16_t leader GNUNET_PACKED;
97 };
98
99
100
101 struct SetKey
102 {
103   int set_kind GNUNET_PACKED;
104   int k1 GNUNET_PACKED;
105   int k2 GNUNET_PACKED;
106 };
107
108
109 struct SetEntry
110 {
111   struct SetKey key;
112   struct GNUNET_SET_Handle *h;
113   /**
114    * GNUNET_YES if the set resulted
115    * from applying a referendum with contested
116    * elements.
117    */
118   int is_contested;
119 };
120
121
122 struct DiffKey
123 {
124   int diff_kind GNUNET_PACKED;
125   int k1 GNUNET_PACKED;
126   int k2 GNUNET_PACKED;
127 };
128
129 struct RfnKey
130 {
131   int rfn_kind GNUNET_PACKED;
132   int k1 GNUNET_PACKED;
133   int k2 GNUNET_PACKED;
134 };
135
136
137 GNUNET_NETWORK_STRUCT_END
138
139 enum PhaseKind
140 {
141   PHASE_KIND_ALL_TO_ALL,
142   PHASE_KIND_GRADECAST_LEADER,
143   PHASE_KIND_GRADECAST_ECHO,
144   PHASE_KIND_GRADECAST_ECHO_GRADE,
145   PHASE_KIND_GRADECAST_CONFIRM,
146   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
147   /**
148    * Apply a repetition of the all-to-all
149    * gradecast to the current set.
150    */
151   PHASE_KIND_APPLY_REP,
152   PHASE_KIND_FINISH,
153 };
154
155
156 enum SetKind
157 {
158   SET_KIND_NONE = 0,
159   SET_KIND_CURRENT,
160   SET_KIND_LEADER_PROPOSAL,
161   SET_KIND_ECHO_RESULT,
162 };
163
164 enum DiffKind
165 {
166   DIFF_KIND_NONE = 0,
167   DIFF_KIND_LEADER_PROPOSAL,
168   DIFF_KIND_LEADER_CONSENSUS,
169   DIFF_KIND_GRADECAST_RESULT,
170 };
171
172 enum RfnKind
173 {
174   RFN_KIND_NONE = 0,
175   RFN_KIND_ECHO,
176   RFN_KIND_CONFIRM,
177   RFN_KIND_GRADECAST_RESULT
178 };
179
180
181 struct SetOpCls
182 {
183   struct SetKey input_set;
184
185   struct SetKey output_set;
186   struct RfnKey output_rfn;
187   struct DiffKey output_diff;
188
189   int do_not_remove;
190
191   int transceive_contested;
192
193   struct GNUNET_SET_OperationHandle *op;
194 };
195
196
197 struct FinishCls
198 {
199   struct SetKey input_set;
200 };
201
202 /**
203  * Closure for both @a start_task
204  * and @a cancel_task.
205  */
206 union TaskFuncCls
207 {
208   struct SetOpCls setop;
209   struct FinishCls finish;
210 };
211
212 struct TaskEntry;
213
214 typedef void (*TaskFunc) (struct TaskEntry *task);
215
216 /*
217  * Node in the consensus task graph.
218  */
219 struct TaskEntry
220 {
221   struct TaskKey key;
222
223   struct Step *step;
224
225   int is_started;
226
227   int is_finished;
228
229   TaskFunc start;
230   TaskFunc cancel;
231
232   union TaskFuncCls cls;
233 };
234
235
236 struct Step
237 {
238   /**
239    * All steps of one session are in a
240    * linked list for easier deallocation.
241    */
242   struct Step *prev;
243
244   /**
245    * All steps of one session are in a
246    * linked list for easier deallocation.
247    */
248   struct Step *next;
249
250   struct ConsensusSession *session;
251
252   struct TaskEntry **tasks;
253   unsigned int tasks_len;
254   unsigned int tasks_cap;
255
256   unsigned int finished_tasks;
257
258   /*
259    * Tasks that have this task as dependency.
260    *
261    * We store pointers to subordinates rather
262    * than to prerequisites since it makes
263    * tracking the readiness of a task easier.
264    */
265   struct Step **subordinates;
266   unsigned int subordinates_len;
267   unsigned int subordinates_cap;
268
269   /**
270    * Counter for the prerequisites of
271    * this step.
272    */
273   size_t pending_prereq;
274
275   /*
276    * Task that will run this step despite
277    * any pending prerequisites.
278    */
279   struct GNUNET_SCHEDULER_Task *timeout_task;
280
281   unsigned int is_running;
282
283   unsigned int is_finished;
284
285   /*
286    * Synchrony round of the task.
287    * Determines the deadline for the task.
288    */
289   unsigned int round;
290
291   /**
292    * Human-readable name for
293    * the task, used for debugging.
294    */
295   char *debug_name;
296 };
297
298
299 struct RfnElementInfo
300 {
301   const struct GNUNET_SET_Element *element;
302
303   /*
304    * GNUNET_YES if the peer votes for the proposal.
305    */
306   int *votes;
307
308   /**
309    * Proposal for this element,
310    * can only be VOTE_ADD or VOTE_REMOVE.
311    */
312   enum ReferendumVote proposal;
313 };
314
315
316 struct ReferendumEntry
317 {
318   struct RfnKey key;
319
320   /*
321    * Elements where there is at least one proposed change.
322    *
323    * Maps the hash of the GNUNET_SET_Element
324    * to 'struct RfnElementInfo'.
325    */
326   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
327
328   unsigned int num_peers;
329
330   /**
331    * Stores, for every peer in the session,
332    * whether the peer finished the whole referendum.
333    *
334    * Votes from peers are only counted if they're
335    * marked as commited (#GNUNET_YES) in the referendum.
336    *
337    * Otherwise (#GNUNET_NO), the requested changes are
338    * not counted for majority votes or thresholds.
339    */
340   int *peer_commited;
341
342
343   /**
344    * Contestation state of the peer.  If a peer is contested, the values it
345    * contributed are still counted for applying changes, but the grading is
346    * affected.
347    */
348   int *peer_contested;
349 };
350
351
352 struct DiffElementInfo
353 {
354   const struct GNUNET_SET_Element *element;
355
356   /**
357    * Positive weight for 'add', negative
358    * weights for 'remove'.
359    */
360   int weight;
361 };
362
363
364 /**
365  * Weighted diff.
366  */
367 struct DiffEntry
368 {
369   struct DiffKey key;
370   struct GNUNET_CONTAINER_MultiHashMap *changes;
371 };
372
373
374
375 /**
376  * A consensus session consists of one local client and the remote authorities.
377  */
378 struct ConsensusSession
379 {
380   /**
381    * Consensus sessions are kept in a DLL.
382    */
383   struct ConsensusSession *next;
384
385   /**
386    * Consensus sessions are kept in a DLL.
387    */
388   struct ConsensusSession *prev;
389
390   unsigned int num_client_insert_pending;
391
392   struct GNUNET_CONTAINER_MultiHashMap *setmap;
393   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
394   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
395
396   /**
397    * Array of peers with length 'num_peers'.
398    */
399   int *peers_blacklisted;
400
401   /*
402    * Mapping from (hashed) TaskKey to TaskEntry.
403    *
404    * We map the application_id for a round to the task that should be
405    * executed, so we don't have to go through all task whenever we get
406    * an incoming set op request.
407    */
408   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
409
410   struct Step *steps_head;
411   struct Step *steps_tail;
412
413   int conclude_started;
414
415   int conclude_done;
416
417   /**
418   * Global consensus identification, computed
419   * from the session id and participating authorities.
420   */
421   struct GNUNET_HashCode global_id;
422
423   /**
424    * Client that inhabits the session
425    */
426   struct GNUNET_SERVER_Client *client;
427
428   /**
429    * Queued messages to the client.
430    */
431   struct GNUNET_MQ_Handle *client_mq;
432
433   /**
434    * Time when the conclusion of the consensus should begin.
435    */
436   struct GNUNET_TIME_Absolute conclude_start;
437
438   /**
439    * Timeout for all rounds together, single rounds will schedule a timeout task
440    * with a fraction of the conclude timeout.
441    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
442    */
443   struct GNUNET_TIME_Absolute conclude_deadline;
444
445   struct GNUNET_PeerIdentity *peers;
446
447   /**
448    * Number of other peers in the consensus.
449    */
450   unsigned int num_peers;
451
452   /**
453    * Index of the local peer in the peers array
454    */
455   unsigned int local_peer_idx;
456
457   /**
458    * Listener for requests from other peers.
459    * Uses the session's global id as app id.
460    */
461   struct GNUNET_SET_ListenHandle *set_listener;
462 };
463
464 /**
465  * Linked list of sessions this peer participates in.
466  */
467 static struct ConsensusSession *sessions_head;
468
469 /**
470  * Linked list of sessions this peer participates in.
471  */
472 static struct ConsensusSession *sessions_tail;
473
474 /**
475  * Configuration of the consensus service.
476  */
477 static const struct GNUNET_CONFIGURATION_Handle *cfg;
478
479 /**
480  * Handle to the server for this service.
481  */
482 static struct GNUNET_SERVER_Handle *srv;
483
484 /**
485  * Peer that runs this service.
486  */
487 static struct GNUNET_PeerIdentity my_peer;
488
489 /**
490  * Statistics handle.
491  */
492 struct GNUNET_STATISTICS_Handle *statistics;
493
494
495 static void
496 finish_task (struct TaskEntry *task);
497
498 static void
499 run_ready_steps (struct ConsensusSession *session);
500
501 static const char *
502 phasename (uint16_t phase)
503 {
504   switch (phase)
505   {
506     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
507     case PHASE_KIND_FINISH: return "FINISH";
508     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
509     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
510     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
511     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
512     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
513     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
514     default: return "(unknown)";
515   }
516 }
517
518
519 static const char *
520 setname (uint16_t kind)
521 {
522   switch (kind)
523   {
524     case SET_KIND_CURRENT: return "CURRENT";
525     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
526     case SET_KIND_NONE: return "NONE";
527     default: return "(unknown)";
528   }
529 }
530
531 static const char *
532 rfnname (uint16_t kind)
533 {
534   switch (kind)
535   {
536     case RFN_KIND_NONE: return "NONE";
537     case RFN_KIND_ECHO: return "ECHO";
538     case RFN_KIND_CONFIRM: return "CONFIRM";
539     default: return "(unknown)";
540   }
541 }
542
543 static const char *
544 diffname (uint16_t kind)
545 {
546   switch (kind)
547   {
548     case DIFF_KIND_NONE: return "NONE";
549     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
550     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
551     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
552     default: return "(unknown)";
553   }
554 }
555
556 #ifdef GNUNET_EXTRA_LOGGING
557
558
559 static const char *
560 debug_str_element (const struct GNUNET_SET_Element *el)
561 {
562   struct GNUNET_HashCode hash;
563
564   GNUNET_SET_element_hash (el, &hash);
565
566   return GNUNET_h2s (&hash);
567 }
568
569 static const char *
570 debug_str_task_key (struct TaskKey *tk)
571 {
572   static char buf[256];
573
574   snprintf (buf, sizeof (buf),
575             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
576             phasename (tk->kind), tk->peer1, tk->peer2,
577             tk->leader, tk->repetition);
578
579   return buf;
580 }
581
582 static const char *
583 debug_str_diff_key (struct DiffKey *dk)
584 {
585   static char buf[256];
586
587   snprintf (buf, sizeof (buf),
588             "DiffKey kind=%s, k1=%d, k2=%d",
589             diffname (dk->diff_kind), dk->k1, dk->k2);
590
591   return buf;
592 }
593
594 static const char *
595 debug_str_set_key (const struct SetKey *sk)
596 {
597   static char buf[256];
598
599   snprintf (buf, sizeof (buf),
600             "SetKey kind=%s, k1=%d, k2=%d",
601             setname (sk->set_kind), sk->k1, sk->k2);
602
603   return buf;
604 }
605
606
607 static const char *
608 debug_str_rfn_key (const struct RfnKey *rk)
609 {
610   static char buf[256];
611
612   snprintf (buf, sizeof (buf),
613             "RfnKey kind=%s, k1=%d, k2=%d",
614             rfnname (rk->rfn_kind), rk->k1, rk->k2);
615
616   return buf;
617 }
618
619 #endif /* GNUNET_EXTRA_LOGGING */
620
621
622 /**
623  * Destroy a session, free all resources associated with it.
624  *
625  * @param session the session to destroy
626  */
627 static void
628 destroy_session (struct ConsensusSession *session)
629 {
630   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
631   if (NULL != session->set_listener)
632   {
633     GNUNET_SET_listen_cancel (session->set_listener);
634     session->set_listener = NULL;
635   }
636   if (NULL != session->client_mq)
637   {
638     GNUNET_MQ_destroy (session->client_mq);
639     session->client_mq = NULL;
640     /* The MQ cleanup will also disconnect the underlying client. */
641     session->client = NULL;
642   }
643   if (NULL != session->client)
644   {
645     GNUNET_SERVER_client_disconnect (session->client);
646     session->client = NULL;
647   }
648   GNUNET_free (session);
649 }
650
651
652 /**
653  * Send the final result set of the consensus to the client, element by
654  * element.
655  *
656  * @param cls closure
657  * @param element the current element, NULL if all elements have been
658  *        iterated over
659  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
660  */
661 static int
662 send_to_client_iter (void *cls,
663                      const struct GNUNET_SET_Element *element)
664 {
665   struct TaskEntry *task = (struct TaskEntry *) cls;
666   struct ConsensusSession *session = task->step->session;
667   struct GNUNET_MQ_Envelope *ev;
668
669   if (NULL != element)
670   {
671     struct GNUNET_CONSENSUS_ElementMessage *m;
672
673     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674                 "P%d: sending element %s to client\n",
675                 session->local_peer_idx,
676                 debug_str_element (element));
677
678     ev = GNUNET_MQ_msg_extra (m, element->size,
679                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
680     m->element_type = htons (element->element_type);
681     memcpy (&m[1], element->data, element->size);
682     GNUNET_MQ_send (session->client_mq, ev);
683   }
684   else
685   {
686     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687                 "P%d: finished iterating elements for client\n",
688                 session->local_peer_idx);
689     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
690     GNUNET_MQ_send (session->client_mq, ev);
691   }
692   return GNUNET_YES;
693 }
694
695
696 static struct SetEntry *
697 lookup_set (struct ConsensusSession *session, struct SetKey *key)
698 {
699   struct GNUNET_HashCode hash;
700
701   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702               "P%u: looking up set {%s}\n",
703               session->local_peer_idx,
704               debug_str_set_key (key));
705
706   GNUNET_assert (SET_KIND_NONE != key->set_kind);
707   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
708   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
709 }
710
711
712 static struct DiffEntry *
713 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
714 {
715   struct GNUNET_HashCode hash;
716
717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718               "P%u: looking up diff {%s}\n",
719               session->local_peer_idx,
720               debug_str_diff_key (key));
721
722   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
723   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
724   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
725 }
726
727
728 static struct ReferendumEntry *
729 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
730 {
731   struct GNUNET_HashCode hash;
732
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "P%u: looking up rfn {%s}\n",
735               session->local_peer_idx,
736               debug_str_rfn_key (key));
737
738   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
739   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
740   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
741 }
742
743
744 static void
745 diff_insert (struct DiffEntry *diff,
746              int weight,
747              const struct GNUNET_SET_Element *element)
748 {
749   struct DiffElementInfo *di;
750   struct GNUNET_HashCode hash;
751
752   GNUNET_assert ( (1 == weight) || (-1 == weight));
753
754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
755               "diff_insert with element size %u\n",
756               element->size);
757
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "hashing element\n");
760
761   GNUNET_SET_element_hash (element, &hash);
762
763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764               "hashed element\n");
765
766   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
767
768   if (NULL == di)
769   {
770     di = GNUNET_new (struct DiffElementInfo);
771     di->element = GNUNET_SET_element_dup (element);
772     GNUNET_assert (GNUNET_OK ==
773                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
774                                                       &hash, di,
775                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
776   }
777
778   di->weight = weight;
779 }
780
781
782 static void
783 rfn_commit (struct ReferendumEntry *rfn,
784             uint16_t commit_peer)
785 {
786   GNUNET_assert (commit_peer < rfn->num_peers);
787
788   rfn->peer_commited[commit_peer] = GNUNET_YES;
789 }
790
791
792 static void
793 rfn_contest (struct ReferendumEntry *rfn,
794              uint16_t contested_peer)
795 {
796   GNUNET_assert (contested_peer < rfn->num_peers);
797
798   rfn->peer_contested[contested_peer] = GNUNET_YES;
799 }
800
801
802 static uint16_t
803 rfn_noncontested (struct ReferendumEntry *rfn)
804 {
805   uint16_t i;
806   uint16_t ret;
807
808   ret = 0;
809   for (i = 0; i < rfn->num_peers; i++)
810     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
811       ret++;
812
813   return ret;
814 }
815
816
817 static void
818 rfn_vote (struct ReferendumEntry *rfn,
819           uint16_t voting_peer,
820           enum ReferendumVote vote,
821           const struct GNUNET_SET_Element *element)
822 {
823   struct RfnElementInfo *ri;
824   struct GNUNET_HashCode hash;
825
826   GNUNET_assert (voting_peer < rfn->num_peers);
827
828   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
829      since VOTE_KEEP is implicit in not voting. */
830   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
831
832   GNUNET_SET_element_hash (element, &hash);
833   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
834
835   if (NULL == ri)
836   {
837     ri = GNUNET_new (struct RfnElementInfo);
838     ri->element = GNUNET_SET_element_dup (element);
839     ri->votes = GNUNET_new_array (rfn->num_peers, int);
840     GNUNET_assert (GNUNET_OK ==
841                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
842                                                       &hash, ri,
843                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
844   }
845
846   ri->votes[voting_peer] = GNUNET_YES;
847   ri->proposal = vote;
848 }
849
850
851 uint16_t
852 task_other_peer (struct TaskEntry *task)
853 {
854   uint16_t me = task->step->session->local_peer_idx;
855   if (task->key.peer1 == me)
856     return task->key.peer2;
857   return task->key.peer1;
858 }
859
860
861 /**
862  * Callback for set operation results. Called for each element
863  * in the result set.
864  *
865  * @param cls closure
866  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
867  * @param status see enum GNUNET_SET_Status
868  */
869 static void
870 set_result_cb (void *cls,
871                const struct GNUNET_SET_Element *element,
872                enum GNUNET_SET_Status status)
873 {
874   struct TaskEntry *task = cls;
875   struct ConsensusSession *session = task->step->session;
876   struct SetEntry *output_set = NULL;
877   struct DiffEntry *output_diff = NULL;
878   struct ReferendumEntry *output_rfn = NULL;
879   unsigned int other_idx;
880   struct SetOpCls *setop;
881
882   setop = &task->cls.setop;
883
884
885   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886               "P%u: got set result for {%s}, status %u\n",
887               session->local_peer_idx,
888               debug_str_task_key (&task->key),
889               status);
890
891   if (GNUNET_NO == task->is_started)
892   {
893     GNUNET_break_op (0);
894     return;
895   }
896
897   if (GNUNET_YES == task->is_finished)
898   {
899     GNUNET_break_op (0);
900     return;
901   }
902
903   other_idx = task_other_peer (task);
904
905   if (SET_KIND_NONE != setop->output_set.set_kind)
906   {
907     output_set = lookup_set (session, &setop->output_set);
908     GNUNET_assert (NULL != output_set);
909   }
910
911   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
912   {
913     output_diff = lookup_diff (session, &setop->output_diff);
914     GNUNET_assert (NULL != output_diff);
915   }
916
917   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
918   {
919     output_rfn = lookup_rfn (session, &setop->output_rfn);
920     GNUNET_assert (NULL != output_rfn);
921   }
922
923   if (GNUNET_YES == session->peers_blacklisted[other_idx])
924   {
925     /* Peer might have been blacklisted
926        by a gradecast running in parallel, ignore elements from now */
927     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
928       return;
929     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
930       return;
931   }
932
933   if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
934   {
935     if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
936     {
937       GNUNET_assert (NULL != output_rfn);
938       rfn_contest (output_rfn, task_other_peer (task));
939       return;
940     }
941   }
942
943   switch (status)
944   {
945     case GNUNET_SET_STATUS_ADD_LOCAL:
946       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947                   "Adding element in Task {%s}\n",
948                   debug_str_task_key (&task->key));
949       if (NULL != output_set)
950       {
951         // FIXME: record pending adds, use callback
952         GNUNET_SET_add_element (output_set->h,
953                                 element,
954                                 NULL,
955                                 NULL);
956 #ifdef GNUNET_EXTRA_LOGGING
957         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
958                     "P%u: adding element %s into set {%s} of task {%s}\n",
959                     session->local_peer_idx,
960                     debug_str_element (element),
961                     debug_str_set_key (&setop->output_set),
962                     debug_str_task_key (&task->key));
963 #endif
964       }
965       if (NULL != output_diff)
966       {
967         diff_insert (output_diff, 1, element);
968 #ifdef GNUNET_EXTRA_LOGGING
969         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
970                     "P%u: adding element %s into diff {%s} of task {%s}\n",
971                     session->local_peer_idx,
972                     debug_str_element (element),
973                     debug_str_diff_key (&setop->output_diff),
974                     debug_str_task_key (&task->key));
975 #endif
976       }
977       if (NULL != output_rfn)
978       {
979         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
980 #ifdef GNUNET_EXTRA_LOGGING
981         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
982                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
983                     session->local_peer_idx,
984                     debug_str_element (element),
985                     debug_str_rfn_key (&setop->output_rfn),
986                     debug_str_task_key (&task->key));
987 #endif
988       }
989       // XXX: add result to structures in task
990       break;
991     case GNUNET_SET_STATUS_ADD_REMOTE:
992       if (GNUNET_YES == setop->do_not_remove)
993         break;
994       if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
995         break;
996       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                   "Removing element in Task {%s}\n",
998                   debug_str_task_key (&task->key));
999       if (NULL != output_set)
1000       {
1001         // FIXME: record pending adds, use callback
1002         GNUNET_SET_remove_element (output_set->h,
1003                                    element,
1004                                    NULL,
1005                                    NULL);
1006 #ifdef GNUNET_EXTRA_LOGGING
1007         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1008                     "P%u: removing element %s from set {%s} of task {%s}\n",
1009                     session->local_peer_idx,
1010                     debug_str_element (element),
1011                     debug_str_set_key (&setop->output_set),
1012                     debug_str_task_key (&task->key));
1013 #endif
1014       }
1015       if (NULL != output_diff)
1016       {
1017         diff_insert (output_diff, -1, element);
1018 #ifdef GNUNET_EXTRA_LOGGING
1019         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1020                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1021                     session->local_peer_idx,
1022                     debug_str_element (element),
1023                     debug_str_diff_key (&setop->output_diff),
1024                     debug_str_task_key (&task->key));
1025 #endif
1026       }
1027       if (NULL != output_rfn)
1028       {
1029         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1030 #ifdef GNUNET_EXTRA_LOGGING
1031         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1032                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1033                     session->local_peer_idx,
1034                     debug_str_element (element),
1035                     debug_str_rfn_key (&setop->output_rfn),
1036                     debug_str_task_key (&task->key));
1037 #endif
1038       }
1039       break;
1040     case GNUNET_SET_STATUS_DONE:
1041       // XXX: check first if any changes to the underlying
1042       // set are still pending
1043       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044                   "Finishing setop in Task {%s}\n",
1045                   debug_str_task_key (&task->key));
1046       if (NULL != output_rfn)
1047       {
1048         rfn_commit (output_rfn, task_other_peer (task));
1049       }
1050       finish_task (task);
1051       break;
1052     case GNUNET_SET_STATUS_FAILURE:
1053       // XXX: cleanup
1054       GNUNET_break_op (0);
1055       finish_task (task);
1056       return;
1057     default:
1058       /* not reached */
1059       GNUNET_assert (0);
1060   }
1061 }
1062
1063 #ifdef EVIL
1064
1065 enum EvilnessType
1066 {
1067   EVILNESS_NONE,
1068   EVILNESS_CRAM_ALL,
1069   EVILNESS_CRAM_LEAD,
1070   EVILNESS_CRAM_ECHO,
1071   EVILNESS_SLACK,
1072 };
1073
1074 enum EvilnessSubType
1075 {
1076   EVILNESS_SUB_NONE,
1077   EVILNESS_SUB_REPLACEMENT,
1078   EVILNESS_SUB_NO_REPLACEMENT,
1079 };
1080
1081 struct Evilness
1082 {
1083   enum EvilnessType type;
1084   enum EvilnessSubType subtype;
1085   unsigned int num;
1086 };
1087
1088
1089 static int
1090 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1091 {
1092   if (0 == strcmp ("replace", evil_subtype_str))
1093   {
1094     evil->subtype = EVILNESS_SUB_REPLACEMENT;
1095   }
1096   else if (0 == strcmp ("noreplace", evil_subtype_str))
1097   {
1098     evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1099   }
1100   else
1101   {
1102     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1103                 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1104                 evil_subtype_str);
1105     return GNUNET_SYSERR;
1106   }
1107   return GNUNET_OK;
1108 }
1109
1110
1111 static void
1112 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1113 {
1114   char *evil_spec;
1115   char *field;
1116   char *evil_type_str = NULL;
1117   char *evil_subtype_str = NULL;
1118
1119   GNUNET_assert (NULL != evil);
1120
1121   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1122   {
1123     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124                 "P%u: no evilness\n",
1125                 session->local_peer_idx);
1126     evil->type = EVILNESS_NONE;
1127     return;
1128   }
1129   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130               "P%u: got evilness spec\n",
1131               session->local_peer_idx);
1132
1133   for (field = strtok (evil_spec, "/");
1134        NULL != field;
1135        field = strtok (NULL, "/"))
1136   {
1137     unsigned int peer_num;
1138     unsigned int evil_num;
1139     int ret;
1140
1141     evil_type_str = NULL;
1142     evil_subtype_str = NULL;
1143
1144     ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1145
1146     if (ret != 4)
1147     {
1148       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1149                   "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1150                   field,
1151                   ret);
1152       goto not_evil;
1153     }
1154
1155     GNUNET_assert (NULL != evil_type_str);
1156     GNUNET_assert (NULL != evil_subtype_str);
1157
1158     if (peer_num == session->local_peer_idx)
1159     {
1160       if (0 == strcmp ("slack", evil_type_str))
1161       {
1162         evil->type = EVILNESS_SLACK;
1163       }
1164       else if (0 == strcmp ("cram-all", evil_type_str))
1165       {
1166         evil->type = EVILNESS_CRAM_ALL;
1167         evil->num = evil_num;
1168         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1169           goto not_evil;
1170       }
1171       else if (0 == strcmp ("cram-lead", evil_type_str))
1172       {
1173         evil->type = EVILNESS_CRAM_LEAD;
1174         evil->num = evil_num;
1175         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1176           goto not_evil;
1177       }
1178       else if (0 == strcmp ("cram-echo", evil_type_str))
1179       {
1180         evil->type = EVILNESS_CRAM_ECHO;
1181         evil->num = evil_num;
1182         if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1183           goto not_evil;
1184       }
1185       else
1186       {
1187         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1188                     "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1189                     evil_type_str);
1190         goto not_evil;
1191       }
1192       goto cleanup;
1193     }
1194     /* No GNUNET_free since memory was allocated by libc */
1195     free (evil_type_str);
1196     evil_type_str = NULL;
1197     evil_subtype_str = NULL;
1198   }
1199 not_evil:
1200   evil->type = EVILNESS_NONE;
1201 cleanup:
1202   GNUNET_free (evil_spec);
1203   /* no GNUNET_free_non_null since it wasn't
1204    * allocated with GNUNET_malloc */
1205   if (NULL != evil_type_str)
1206     free (evil_type_str);
1207   if (NULL != evil_subtype_str)
1208     free (evil_subtype_str);
1209 }
1210
1211 #endif
1212
1213
1214 /**
1215  * Commit the appropriate set for a
1216  * task.
1217  */
1218 static void
1219 commit_set (struct ConsensusSession *session,
1220             struct TaskEntry *task)
1221 {
1222   struct SetEntry *set;
1223   struct SetOpCls *setop = &task->cls.setop;
1224
1225   GNUNET_assert (NULL != setop->op);
1226   set = lookup_set (session, &setop->input_set);
1227   GNUNET_assert (NULL != set);
1228
1229 #ifdef EVIL
1230   {
1231     unsigned int i;
1232     struct Evilness evil;
1233
1234     get_evilness (session, &evil);
1235     if (EVILNESS_NONE != evil.type)
1236     {
1237       /* Useful for evaluation */
1238       GNUNET_STATISTICS_set (statistics,
1239                              "is evil",
1240                              1,
1241                              GNUNET_NO);
1242     }
1243     switch (evil.type)
1244     {
1245       case EVILNESS_CRAM_ALL:
1246       case EVILNESS_CRAM_LEAD:
1247       case EVILNESS_CRAM_ECHO:
1248         /* We're not cramming elements in the
1249            all-to-all round, since that would just
1250            add more elements to the result set, but
1251            wouldn't test robustness. */
1252         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1253         {
1254           GNUNET_SET_commit (setop->op, set->h);
1255           break;
1256         }
1257         if ((EVILNESS_CRAM_LEAD == evil.type) &&
1258             ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1259         {
1260           GNUNET_SET_commit (setop->op, set->h);
1261           break;
1262         }
1263         if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1264         {
1265           GNUNET_SET_commit (setop->op, set->h);
1266           break;
1267         }
1268         for (i = 0; i < evil.num; i++)
1269         {
1270           struct GNUNET_HashCode hash;
1271           struct GNUNET_SET_Element element;
1272           element.data = &hash;
1273           element.size = sizeof (struct GNUNET_HashCode);
1274           element.element_type = 0;
1275
1276           if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1277           {
1278             /* Always generate a new element. */
1279             GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1280           }
1281           else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1282           {
1283             /* Always cram the same elements, derived from counter. */
1284             GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1285           }
1286           else
1287           {
1288             GNUNET_assert (0);
1289           }
1290           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1291 #ifdef GNUNET_EXTRA_LOGGING
1292           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1293                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1294                       session->local_peer_idx,
1295                       debug_str_element (&element),
1296                       debug_str_set_key (&setop->input_set),
1297                       debug_str_task_key (&task->key));
1298 #endif
1299         }
1300         GNUNET_STATISTICS_update (statistics,
1301                                   "# stuffed elements",
1302                                   evil.num,
1303                                   GNUNET_NO);
1304         GNUNET_SET_commit (setop->op, set->h);
1305         break;
1306       case EVILNESS_SLACK:
1307         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1308                     "P%u: evil peer: slacking\n",
1309                     session->local_peer_idx,
1310                     evil.num);
1311         /* Do nothing. */
1312         break;
1313       case EVILNESS_NONE:
1314         GNUNET_SET_commit (setop->op, set->h);
1315         break;
1316     }
1317   }
1318 #else
1319   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1320   {
1321     struct GNUNET_SET_Element element;
1322     struct ContestedPayload payload;
1323     element.data = &payload;
1324     element.size = sizeof (struct ContestedPayload);
1325     element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1326     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1327   }
1328   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1329   {
1330     GNUNET_SET_commit (setop->op, set->h);
1331   }
1332   else
1333   {
1334     /* For our testcases, we don't want the blacklisted
1335        peers to wait. */
1336     GNUNET_SET_operation_cancel (setop->op);
1337     setop->op = NULL;
1338   }
1339 #endif
1340 }
1341
1342
1343 static void
1344 put_diff (struct ConsensusSession *session,
1345          struct DiffEntry *diff)
1346 {
1347   struct GNUNET_HashCode hash;
1348
1349   GNUNET_assert (NULL != diff);
1350
1351   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1352   GNUNET_assert (GNUNET_OK ==
1353                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1354                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1355 }
1356
1357 static void
1358 put_set (struct ConsensusSession *session,
1359          struct SetEntry *set)
1360 {
1361   struct GNUNET_HashCode hash;
1362
1363   GNUNET_assert (NULL != set->h);
1364
1365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366               "Putting set %s\n",
1367               debug_str_set_key (&set->key));
1368
1369   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1370   GNUNET_assert (GNUNET_OK ==
1371                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1372                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1373 }
1374
1375
1376 static void
1377 put_rfn (struct ConsensusSession *session,
1378          struct ReferendumEntry *rfn)
1379 {
1380   struct GNUNET_HashCode hash;
1381
1382   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1383   GNUNET_assert (GNUNET_OK ==
1384                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1385                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1386 }
1387
1388
1389
1390 static void
1391 task_cancel_reconcile (struct TaskEntry *task)
1392 {
1393   /* not implemented yet */
1394   GNUNET_assert (0);
1395 }
1396
1397
1398 static void
1399 apply_diff_to_rfn (struct DiffEntry *diff,
1400                    struct ReferendumEntry *rfn,
1401                    uint16_t voting_peer,
1402                    uint16_t num_peers)
1403 {
1404   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1405   struct DiffElementInfo *di;
1406
1407   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1408
1409   while (GNUNET_YES ==
1410          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1411                                                       NULL,
1412                                                       (const void **) &di))
1413   {
1414     if (di->weight > 0)
1415     {
1416       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1417     }
1418     if (di->weight < 0)
1419     {
1420       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1421     }
1422   }
1423
1424   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1425 }
1426
1427
1428 struct DiffEntry *
1429 diff_create ()
1430 {
1431   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1432
1433   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1434
1435   return d;
1436 }
1437
1438
1439 struct DiffEntry *
1440 diff_compose (struct DiffEntry *diff_1,
1441               struct DiffEntry *diff_2)
1442 {
1443   struct DiffEntry *diff_new;
1444   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1445   struct DiffElementInfo *di;
1446
1447   diff_new = diff_create ();
1448
1449   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1450   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1451   {
1452     diff_insert (diff_new, di->weight, di->element);
1453   }
1454   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1455
1456   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1457   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1458   {
1459     diff_insert (diff_new, di->weight, di->element);
1460   }
1461   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1462
1463   return diff_new;
1464 }
1465
1466
1467 struct ReferendumEntry *
1468 rfn_create (uint16_t size)
1469 {
1470   struct ReferendumEntry *rfn;
1471
1472   rfn = GNUNET_new (struct ReferendumEntry);
1473   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1474   rfn->peer_commited = GNUNET_new_array (size, int);
1475   rfn->peer_contested = GNUNET_new_array (size, int);
1476   rfn->num_peers = size;
1477
1478   return rfn;
1479 }
1480
1481
1482 void
1483 diff_destroy (struct DiffEntry *diff)
1484 {
1485   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1486   GNUNET_free (diff);
1487 }
1488
1489
1490 static void
1491 rfn_majority (const struct ReferendumEntry *rfn,
1492               const struct RfnElementInfo *ri,
1493               uint16_t *ret_majority,
1494               enum ReferendumVote *ret_vote)
1495 {
1496   uint16_t votes_yes = 0;
1497   uint16_t num_commited = 0;
1498   uint16_t i;
1499
1500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501               "Computing rfn majority for element %s of rfn {%s}\n",
1502               debug_str_element (ri->element),
1503               debug_str_rfn_key (&rfn->key));
1504
1505   for (i = 0; i < rfn->num_peers; i++)
1506   {
1507     if (GNUNET_NO == rfn->peer_commited[i])
1508       continue;
1509     num_commited++;
1510
1511     if (GNUNET_YES == ri->votes[i])
1512       votes_yes++;
1513   }
1514
1515   if (votes_yes > (num_commited) / 2)
1516   {
1517     *ret_vote = ri->proposal;
1518     *ret_majority = votes_yes;
1519   }
1520   else
1521   {
1522     *ret_vote = VOTE_STAY;
1523     *ret_majority = num_commited - votes_yes;
1524   }
1525 }
1526
1527
1528 struct SetCopyCls
1529 {
1530   struct TaskEntry *task;
1531   struct SetKey dst_set_key;
1532 };
1533
1534
1535 static void
1536 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1537 {
1538   struct SetCopyCls *scc = cls;
1539   struct TaskEntry *task = scc->task;
1540   struct SetKey dst_set_key = scc->dst_set_key;
1541   struct SetEntry *set;
1542
1543   GNUNET_free (scc);
1544   set = GNUNET_new (struct SetEntry);
1545   set->h = copy;
1546   set->key = dst_set_key;
1547   put_set (task->step->session, set);
1548
1549   task->start (task);
1550 }
1551
1552
1553 /**
1554  * Call the start function of the given
1555  * task again after we created a copy of the given set.
1556  */
1557 static void
1558 create_set_copy_for_task (struct TaskEntry *task,
1559                           struct SetKey *src_set_key,
1560                           struct SetKey *dst_set_key)
1561 {
1562   struct SetEntry *src_set;
1563   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1564
1565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566               "Copying set {%s} to {%s} for task {%s}\n",
1567               debug_str_set_key (src_set_key),
1568               debug_str_set_key (dst_set_key),
1569               debug_str_task_key (&task->key));
1570
1571   scc->task = task;
1572   scc->dst_set_key = *dst_set_key;
1573   src_set = lookup_set (task->step->session, src_set_key);
1574   GNUNET_assert (NULL != src_set);
1575   GNUNET_SET_copy_lazy (src_set->h,
1576                         set_copy_cb,
1577                         scc);
1578 }
1579
1580
1581 struct SetMutationProgressCls
1582 {
1583   int num_pending;
1584   /**
1585    * Task to finish once all changes are through.
1586    */
1587   struct TaskEntry *task;
1588 };
1589
1590
1591 static void
1592 set_mutation_done (void *cls)
1593 {
1594   struct SetMutationProgressCls *pc = cls;
1595
1596   GNUNET_assert (pc->num_pending > 0);
1597
1598   pc->num_pending--;
1599
1600   if (0 == pc->num_pending)
1601   {
1602     struct TaskEntry *task = pc->task;
1603     GNUNET_free (pc);
1604     finish_task (task);
1605   }
1606 }
1607
1608 static void
1609 task_start_apply_round (struct TaskEntry *task)
1610 {
1611   struct ConsensusSession *session = task->step->session;
1612   struct SetKey sk_in;
1613   struct SetKey sk_out;
1614   struct RfnKey rk_in;
1615   struct SetEntry *set_out;
1616   struct ReferendumEntry *rfn_in;
1617   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1618   struct RfnElementInfo *ri;
1619   struct SetMutationProgressCls *progress_cls;
1620
1621   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1622   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1623   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1624
1625   set_out = lookup_set (session, &sk_out);
1626   if (NULL == set_out)
1627   {
1628     create_set_copy_for_task (task, &sk_in, &sk_out);
1629     return;
1630   }
1631
1632   rfn_in = lookup_rfn (session, &rk_in);
1633   GNUNET_assert (NULL != rfn_in);
1634
1635   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1636   progress_cls->task = task;
1637
1638   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1639
1640   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1641   {
1642     uint16_t majority_num;
1643     enum ReferendumVote majority_vote;
1644
1645     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1646
1647     switch (majority_vote)
1648     {
1649       case VOTE_ADD:
1650         progress_cls->num_pending++;
1651         GNUNET_assert (GNUNET_OK ==
1652                        GNUNET_SET_add_element (set_out->h,
1653                                                ri->element,
1654                                                set_mutation_done,
1655                                                progress_cls));
1656         break;
1657       case VOTE_REMOVE:
1658         progress_cls->num_pending++;
1659         GNUNET_assert (GNUNET_OK ==
1660                        GNUNET_SET_remove_element (set_out->h,
1661                                                   ri->element,
1662                                                   set_mutation_done,
1663                                                   progress_cls));
1664         break;
1665       case VOTE_STAY:
1666         // do nothing
1667         break;
1668       default:
1669         GNUNET_assert (0);
1670         break;
1671     }
1672   }
1673
1674   if (progress_cls->num_pending == 0)
1675   {
1676     // call closure right now, no pending ops
1677     GNUNET_free (progress_cls);
1678     finish_task (task);
1679   }
1680 }
1681
1682
1683 #define THRESH(s) (((s)->num_peers / 3))
1684
1685
1686 static void
1687 task_start_grade (struct TaskEntry *task)
1688 {
1689   struct ConsensusSession *session = task->step->session;
1690   struct ReferendumEntry *output_rfn;
1691   struct ReferendumEntry *input_rfn;
1692   struct DiffEntry *input_diff;
1693   struct RfnKey rfn_key;
1694   struct DiffKey diff_key;
1695   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1696   struct RfnElementInfo *ri;
1697   unsigned int gradecast_confidence = 2;
1698
1699   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1700   output_rfn = lookup_rfn (session, &rfn_key);
1701   if (NULL == output_rfn)
1702   {
1703     output_rfn = rfn_create (session->num_peers);
1704     output_rfn->key = rfn_key;
1705     put_rfn (session, output_rfn);
1706   }
1707
1708   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1709   input_diff = lookup_diff (session, &diff_key);
1710   GNUNET_assert (NULL != input_diff);
1711
1712   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1713   input_rfn = lookup_rfn (session, &rfn_key);
1714   GNUNET_assert (NULL != input_rfn);
1715
1716   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1717
1718   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1719
1720   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1721   {
1722     uint16_t majority_num;
1723     enum ReferendumVote majority_vote;
1724
1725     // XXX: we need contested votes and non-contested votes here
1726     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1727
1728     if (majority_num <= session->num_peers / 3)
1729       majority_vote = VOTE_REMOVE;
1730
1731     switch (majority_vote)
1732     {
1733       case VOTE_STAY:
1734         break;
1735       case VOTE_ADD:
1736         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1737         break;
1738       case VOTE_REMOVE:
1739         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1740         break;
1741       default:
1742         GNUNET_assert (0);
1743         break;
1744     }
1745   }
1746
1747   {
1748     uint16_t noncontested;
1749     noncontested = rfn_noncontested (input_rfn);
1750     if (noncontested < (session->num_peers / 3) * 2)
1751     {
1752       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1753     }
1754     if (noncontested < (session->num_peers / 3) + 1)
1755     {
1756       gradecast_confidence = 0;
1757     }
1758   }
1759
1760   if (gradecast_confidence >= 1)
1761     rfn_commit (output_rfn, task->key.leader);
1762
1763   if (gradecast_confidence <= 1)
1764     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1765
1766   finish_task (task);
1767 }
1768
1769
1770 static void
1771 task_start_reconcile (struct TaskEntry *task)
1772 {
1773   struct SetEntry *input;
1774   struct SetOpCls *setop = &task->cls.setop;
1775   struct ConsensusSession *session = task->step->session;
1776
1777   input = lookup_set (session, &setop->input_set);
1778   GNUNET_assert (NULL != input);
1779   GNUNET_assert (NULL != input->h);
1780
1781   /* We create the outputs for the operation here
1782      (rather than in the set operation callback)
1783      because we want something valid in there, even
1784      if the other peer doesn't talk to us */
1785
1786   if (SET_KIND_NONE != setop->output_set.set_kind)
1787   {
1788     /* If we don't have an existing output set,
1789        we clone the input set. */
1790     if (NULL == lookup_set (session, &setop->output_set))
1791     {
1792       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1793       return;
1794     }
1795   }
1796
1797   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1798   {
1799     if (NULL == lookup_rfn (session, &setop->output_rfn))
1800     {
1801       struct ReferendumEntry *rfn;
1802
1803       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804                   "P%u: output rfn <%s> missing, creating.\n",
1805                   session->local_peer_idx,
1806                   debug_str_rfn_key (&setop->output_rfn));
1807
1808       rfn = rfn_create (session->num_peers);
1809       rfn->key = setop->output_rfn;
1810       put_rfn (session, rfn);
1811     }
1812   }
1813
1814   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1815   {
1816     if (NULL == lookup_diff (session, &setop->output_diff))
1817     {
1818       struct DiffEntry *diff;
1819
1820       diff = diff_create ();
1821       diff->key = setop->output_diff;
1822       put_diff (session, diff);
1823     }
1824   }
1825
1826   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
1827   {
1828     /* XXX: mark the corresponding rfn as commited if necessary */
1829     finish_task (task);
1830     return;
1831   }
1832
1833   if (task->key.peer1 == session->local_peer_idx)
1834   {
1835     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1836
1837     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1838                 "P%u: Looking up set {%s} to run remote union\n",
1839                 session->local_peer_idx,
1840                 debug_str_set_key (&setop->input_set));
1841
1842     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1843     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
1844
1845     rcm.kind = htons (task->key.kind);
1846     rcm.peer1 = htons (task->key.peer1);
1847     rcm.peer2 = htons (task->key.peer2);
1848     rcm.leader = htons (task->key.leader);
1849     rcm.repetition = htons (task->key.repetition);
1850
1851     GNUNET_assert (NULL == setop->op);
1852     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
1853                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
1854
1855     // XXX: maybe this should be done while
1856     // setting up tasks alreays?
1857     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
1858                                     &session->global_id,
1859                                     &rcm.header,
1860                                     GNUNET_SET_RESULT_SYMMETRIC,
1861                                     set_result_cb,
1862                                     task);
1863
1864     commit_set (session, task);
1865   }
1866   else if (task->key.peer2 == session->local_peer_idx)
1867   {
1868     /* Wait for the other peer to contact us */
1869     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
1870                 session->local_peer_idx, task->key.peer1);
1871
1872     if (NULL != setop->op)
1873     {
1874       commit_set (session, task);
1875     }
1876   }
1877   else
1878   {
1879     /* We made an error while constructing the task graph. */
1880     GNUNET_assert (0);
1881   }
1882 }
1883
1884
1885 static void
1886 task_start_eval_echo (struct TaskEntry *task)
1887 {
1888   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1889   struct ReferendumEntry *input_rfn;
1890   struct RfnElementInfo *ri;
1891   struct SetEntry *output_set;
1892   struct SetMutationProgressCls *progress_cls;
1893   struct ConsensusSession *session = task->step->session;
1894   struct SetKey sk_in;
1895   struct SetKey sk_out;
1896   struct RfnKey rk_in;
1897
1898   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1899   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
1900   output_set = lookup_set (session, &sk_out);
1901   if (NULL == output_set)
1902   {
1903     create_set_copy_for_task (task, &sk_in, &sk_out);
1904     return;
1905   }
1906
1907   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1908               "Evaluating referendum in Task {%s}\n",
1909               debug_str_task_key (&task->key));
1910
1911   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1912   progress_cls->task = task;
1913
1914   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1915   input_rfn = lookup_rfn (session, &rk_in);
1916
1917   GNUNET_assert (NULL != input_rfn);
1918
1919   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1920   GNUNET_assert (NULL != iter);
1921
1922   while (GNUNET_YES ==
1923          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1924                                                       NULL,
1925                                                       (const void **) &ri))
1926   {
1927     enum ReferendumVote majority_vote;
1928     uint16_t majority_num;
1929
1930     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1931
1932     if (majority_num < session->num_peers / 3)
1933     {
1934       /* It is not the case that all nonfaulty peers
1935          echoed the same value.  Since we're doing a set reconciliation, we
1936          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
1937          reconciliation as contested.  Other peers might not know that the
1938          leader is faulty, thus we still re-distribute in the confirmation
1939          round. */
1940       output_set->is_contested = GNUNET_YES;
1941     }
1942
1943     switch (majority_vote)
1944     {
1945       case VOTE_ADD:
1946         progress_cls->num_pending++;
1947         GNUNET_assert (GNUNET_OK ==
1948                        GNUNET_SET_add_element (output_set->h,
1949                                                ri->element,
1950                                                set_mutation_done,
1951                                                progress_cls));
1952         break;
1953       case VOTE_REMOVE:
1954         progress_cls->num_pending++;
1955         GNUNET_assert (GNUNET_OK ==
1956                        GNUNET_SET_remove_element (output_set->h,
1957                                                   ri->element,
1958                                                   set_mutation_done,
1959                                                   progress_cls));
1960         break;
1961       case VOTE_STAY:
1962         /* Nothing to do. */
1963         break;
1964       default:
1965         /* not reached */
1966         GNUNET_assert (0);
1967     }
1968   }
1969
1970   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1971
1972   if (progress_cls->num_pending == 0)
1973   {
1974     // call closure right now, no pending ops
1975     GNUNET_free (progress_cls);
1976     finish_task (task);
1977   }
1978 }
1979
1980
1981 static void
1982 task_start_finish (struct TaskEntry *task)
1983 {
1984   struct SetEntry *final_set;
1985   struct ConsensusSession *session = task->step->session;
1986
1987   final_set = lookup_set (session, &task->cls.finish.input_set);
1988
1989   GNUNET_assert (NULL != final_set);
1990
1991
1992   GNUNET_SET_iterate (final_set->h,
1993                       send_to_client_iter,
1994                       task);
1995 }
1996
1997 static void
1998 start_task (struct ConsensusSession *session, struct TaskEntry *task)
1999 {
2000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2001
2002   GNUNET_assert (GNUNET_NO == task->is_started);
2003   GNUNET_assert (GNUNET_NO == task->is_finished);
2004   GNUNET_assert (NULL != task->start);
2005
2006   task->start (task);
2007
2008   task->is_started = GNUNET_YES;
2009 }
2010
2011
2012 static void finish_step (struct Step *step)
2013 {
2014   unsigned int i;
2015
2016   GNUNET_assert (step->finished_tasks == step->tasks_len);
2017   GNUNET_assert (GNUNET_YES == step->is_running);
2018   GNUNET_assert (GNUNET_NO == step->is_finished);
2019
2020 #ifdef GNUNET_EXTRA_LOGGING
2021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2022               "All tasks of step `%s' with %u subordinates finished.\n",
2023               step->debug_name,
2024               step->subordinates_len);
2025 #endif
2026
2027   for (i = 0; i < step->subordinates_len; i++)
2028   {
2029     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
2030     step->subordinates[i]->pending_prereq--;
2031 #ifdef GNUNET_EXTRA_LOGGING
2032     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2033                 "Decreased pending_prereq to %u for step `%s'.\n",
2034                 step->subordinates[i]->pending_prereq,
2035                 step->subordinates[i]->debug_name);
2036
2037 #endif
2038   }
2039
2040   step->is_finished = GNUNET_YES;
2041
2042   // XXX: maybe schedule as task to avoid recursion?
2043   run_ready_steps (step->session);
2044 }
2045
2046
2047 /*
2048  * Run all steps of the session that don't any
2049  * more dependencies.
2050  */
2051 static void
2052 run_ready_steps (struct ConsensusSession *session)
2053 {
2054   struct Step *step;
2055
2056   step = session->steps_head;
2057
2058   while (NULL != step)
2059   {
2060     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) )
2061     {
2062       size_t i;
2063
2064       GNUNET_assert (0 == step->finished_tasks);
2065
2066 #ifdef GNUNET_EXTRA_LOGGING
2067       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2068                   session->local_peer_idx,
2069                   step->debug_name,
2070                   step->round, step->tasks_len, step->subordinates_len);
2071 #endif
2072
2073       step->is_running = GNUNET_YES;
2074       for (i = 0; i < step->tasks_len; i++)
2075         start_task (session, step->tasks[i]);
2076
2077       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2078       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2079         finish_step (step);
2080
2081       /* Running the next ready steps will be triggered by task completion */
2082       return;
2083     }
2084     step = step->next;
2085   }
2086
2087   return;
2088 }
2089
2090
2091
2092 static void
2093 finish_task (struct TaskEntry *task)
2094 {
2095   GNUNET_assert (GNUNET_NO == task->is_finished);
2096   task->is_finished = GNUNET_YES;
2097
2098   task->step->finished_tasks++;
2099
2100   if (task->step->finished_tasks == task->step->tasks_len)
2101     finish_step (task->step);
2102 }
2103
2104
2105 /**
2106  * Search peer in the list of peers in session.
2107  *
2108  * @param peer peer to find
2109  * @param session session with peer
2110  * @return index of peer, -1 if peer is not in session
2111  */
2112 static int
2113 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2114 {
2115   int i;
2116   for (i = 0; i < session->num_peers; i++)
2117     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2118       return i;
2119   return -1;
2120 }
2121
2122
2123 /**
2124  * Compute a global, (hopefully) unique consensus session id,
2125  * from the local id of the consensus session, and the identities of all participants.
2126  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2127  * exactly the same peers, the global id will be different.
2128  *
2129  * @param session session to generate the global id for
2130  * @param local_session_id local id of the consensus session
2131  */
2132 static void
2133 compute_global_id (struct ConsensusSession *session,
2134                    const struct GNUNET_HashCode *local_session_id)
2135 {
2136   const char *salt = "gnunet-service-consensus/session_id";
2137
2138   GNUNET_assert (GNUNET_YES ==
2139                  GNUNET_CRYPTO_kdf (&session->global_id,
2140                                     sizeof (struct GNUNET_HashCode),
2141                                     salt,
2142                                     strlen (salt),
2143                                     session->peers,
2144                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2145                                     local_session_id,
2146                                     sizeof (struct GNUNET_HashCode),
2147                                     NULL));
2148 }
2149
2150
2151 /**
2152  * Compare two peer identities.
2153  *
2154  * @param h1 some peer identity
2155  * @param h2 some peer identity
2156  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2157  */
2158 static int
2159 peer_id_cmp (const void *h1, const void *h2)
2160 {
2161   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2162 }
2163
2164
2165 /**
2166  * Create the sorted list of peers for the session,
2167  * add the local peer if not in the join message.
2168  */
2169 static void
2170 initialize_session_peer_list (struct ConsensusSession *session,
2171                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2172 {
2173   unsigned int local_peer_in_list;
2174   uint32_t listed_peers;
2175   const struct GNUNET_PeerIdentity *msg_peers;
2176   unsigned int i;
2177
2178   GNUNET_assert (NULL != join_msg);
2179
2180   /* peers in the join message, may or may not include the local peer */
2181   listed_peers = ntohl (join_msg->num_peers);
2182
2183   session->num_peers = listed_peers;
2184
2185   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2186
2187   local_peer_in_list = GNUNET_NO;
2188   for (i = 0; i < listed_peers; i++)
2189   {
2190     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2191     {
2192       local_peer_in_list = GNUNET_YES;
2193       break;
2194     }
2195   }
2196
2197   if (GNUNET_NO == local_peer_in_list)
2198     session->num_peers++;
2199
2200   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2201
2202   if (GNUNET_NO == local_peer_in_list)
2203     session->peers[session->num_peers - 1] = my_peer;
2204
2205   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2206   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2207 }
2208
2209
2210 static struct TaskEntry *
2211 lookup_task (struct ConsensusSession *session,
2212              struct TaskKey *key)
2213 {
2214   struct GNUNET_HashCode hash;
2215
2216
2217   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2218   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2219               GNUNET_h2s (&hash));
2220   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2221 }
2222
2223
2224 /**
2225  * Called when another peer wants to do a set operation with the
2226  * local peer.
2227  *
2228  * @param cls closure
2229  * @param other_peer the other peer
2230  * @param context_msg message with application specific information from
2231  *        the other peer
2232  * @param request request from the other peer, use GNUNET_SET_accept
2233  *        to accept it, otherwise the request will be refused
2234  *        Note that we don't use a return value here, as it is also
2235  *        necessary to specify the set we want to do the operation with,
2236  *        whith sometimes can be derived from the context message.
2237  *        Also necessary to specify the timeout.
2238  */
2239 static void
2240 set_listen_cb (void *cls,
2241                const struct GNUNET_PeerIdentity *other_peer,
2242                const struct GNUNET_MessageHeader *context_msg,
2243                struct GNUNET_SET_Request *request)
2244 {
2245   struct ConsensusSession *session = cls;
2246   struct TaskKey tk;
2247   struct TaskEntry *task;
2248   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2249
2250   if (NULL == context_msg)
2251   {
2252     GNUNET_break_op (0);
2253     return;
2254   }
2255
2256   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2257   {
2258     GNUNET_break_op (0);
2259     return;
2260   }
2261
2262   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2263   {
2264     GNUNET_break_op (0);
2265     return;
2266   }
2267
2268   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2269
2270   tk = ((struct TaskKey) {
2271       .kind = ntohs (cm->kind),
2272       .peer1 = ntohs (cm->peer1),
2273       .peer2 = ntohs (cm->peer2),
2274       .repetition = ntohs (cm->repetition),
2275       .leader = ntohs (cm->leader),
2276   });
2277
2278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2279               session->local_peer_idx, debug_str_task_key (&tk));
2280
2281   task = lookup_task (session, &tk);
2282
2283   if (NULL == task)
2284   {
2285     GNUNET_break_op (0);
2286     return;
2287   }
2288
2289   if (GNUNET_YES == task->is_finished)
2290   {
2291     GNUNET_break_op (0);
2292     return;
2293   }
2294
2295   if (task->key.peer2 != session->local_peer_idx)
2296   {
2297     /* We're being asked, so we must be thne 2nd peer. */
2298     GNUNET_break_op (0);
2299     return;
2300   }
2301
2302   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2303                     (task->key.peer2 == session->local_peer_idx)));
2304
2305   task->cls.setop.op = GNUNET_SET_accept (request,
2306                                           GNUNET_SET_RESULT_SYMMETRIC,
2307                                           set_result_cb,
2308                                           task);
2309
2310   /* If the task hasn't been started yet,
2311      we wait for that until we commit. */
2312
2313   if (GNUNET_YES == task->is_started)
2314   {
2315     commit_set (session, task);
2316   }
2317 }
2318
2319
2320
2321 static void
2322 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2323           struct TaskEntry *t)
2324 {
2325   struct GNUNET_HashCode round_hash;
2326   struct Step *s;
2327
2328   GNUNET_assert (NULL != t->step);
2329
2330   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2331
2332   s = t->step;
2333
2334   if (s->tasks_len == s->tasks_cap)
2335   {
2336     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2337     GNUNET_array_grow (s->tasks,
2338                        s->tasks_cap,
2339                        target_size);
2340   }
2341
2342 #ifdef GNUNET_EXTRA_LOGGING
2343   GNUNET_assert (NULL != s->debug_name);
2344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2345               debug_str_task_key (&t->key),
2346               s->debug_name);
2347 #endif
2348
2349   s->tasks[s->tasks_len] = t;
2350   s->tasks_len++;
2351
2352   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2353   GNUNET_assert (GNUNET_OK ==
2354       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2355                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2356 }
2357
2358
2359 static void
2360 install_step_timeouts (struct ConsensusSession *session)
2361 {
2362   /* Given the fully constructed task graph
2363      with rounds for tasks, we can give the tasks timeouts. */
2364
2365   // unsigned int max_round;
2366
2367   /* XXX: implement! */
2368 }
2369
2370
2371
2372 /*
2373  * Arrange two peers in some canonical order.
2374  */
2375 static void
2376 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2377 {
2378   uint16_t a;
2379   uint16_t b;
2380
2381   GNUNET_assert (*p1 < n);
2382   GNUNET_assert (*p2 < n);
2383
2384   if (*p1 < *p2)
2385   {
2386     a = *p1;
2387     b = *p2;
2388   }
2389   else
2390   {
2391     a = *p2;
2392     b = *p1;
2393   }
2394
2395   /* For uniformly random *p1, *p2,
2396      this condition is true with 50% chance */
2397   if (((b - a) + n) % n <= n / 2)
2398   {
2399     *p1 = a;
2400     *p2 = b;
2401   }
2402   else
2403   {
2404     *p1 = b;
2405     *p2 = a;
2406   }
2407 }
2408
2409
2410 /**
2411  * Record @a dep as a dependency of @a step.
2412  */
2413 static void
2414 step_depend_on (struct Step *step, struct Step *dep)
2415 {
2416   /* We're not checking for cyclic dependencies,
2417      but this is a cheap sanity check. */
2418   GNUNET_assert (step != dep);
2419   GNUNET_assert (NULL != step);
2420   GNUNET_assert (NULL != dep);
2421   GNUNET_assert (dep->round <= step->round);
2422
2423 #ifdef GNUNET_EXTRA_LOGGING
2424   /* Make sure we have complete debugging information.
2425      Also checks that we don't screw up too badly
2426      constructing the task graph. */
2427   GNUNET_assert (NULL != step->debug_name);
2428   GNUNET_assert (NULL != dep->debug_name);
2429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430               "Making step `%s' depend on `%s'\n",
2431               step->debug_name,
2432               dep->debug_name);
2433 #endif
2434
2435   if (dep->subordinates_cap == dep->subordinates_len)
2436   {
2437     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2438     GNUNET_array_grow (dep->subordinates,
2439                        dep->subordinates_cap,
2440                        target_size);
2441   }
2442
2443   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2444
2445   dep->subordinates[dep->subordinates_len] = step;
2446   dep->subordinates_len++;
2447
2448   step->pending_prereq++;
2449 }
2450
2451
2452 static struct Step *
2453 create_step (struct ConsensusSession *session, int round)
2454 {
2455   struct Step *step;
2456   step = GNUNET_new (struct Step);
2457   step->session = session;
2458   step->round = round;
2459   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2460                                     session->steps_tail,
2461                                     step);
2462   return step;
2463 }
2464
2465
2466 /**
2467  * Construct the task graph for a single
2468  * gradecast.
2469  */
2470 static void
2471 construct_task_graph_gradecast (struct ConsensusSession *session,
2472                                 uint16_t rep,
2473                                 uint16_t lead,
2474                                 struct Step *step_before,
2475                                 struct Step *step_after)
2476 {
2477   uint16_t n = session->num_peers;
2478   uint16_t me = session->local_peer_idx;
2479
2480   uint16_t p1;
2481   uint16_t p2;
2482
2483   /* The task we're currently setting up. */
2484   struct TaskEntry task;
2485
2486   struct Step *step;
2487   struct Step *prev_step;
2488
2489   uint16_t round;
2490
2491   unsigned int k;
2492
2493   round = step_before->round + 1;
2494
2495   /* gcast step 1: leader disseminates */
2496
2497   step = create_step (session, round);
2498
2499 #ifdef GNUNET_EXTRA_LOGGING
2500   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2501 #endif
2502   step_depend_on (step, step_before);
2503
2504   if (lead == me)
2505   {
2506     for (k = 0; k < n; k++)
2507     {
2508       if (k == me)
2509         continue;
2510       p1 = me;
2511       p2 = k;
2512       arrange_peers (&p1, &p2, n);
2513       task = ((struct TaskEntry) {
2514         .step = step,
2515         .start = task_start_reconcile,
2516         .cancel = task_cancel_reconcile,
2517         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2518       });
2519       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2520       put_task (session->taskmap, &task);
2521     }
2522     /* We run this task to make sure that the leader
2523        has the stored the SET_KIND_LEADER set of himself,
2524        so he can participate in the rest of the gradecast
2525        without the code having to handle any special cases. */
2526     task = ((struct TaskEntry) {
2527       .step = step,
2528       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2529       .start = task_start_reconcile,
2530       .cancel = task_cancel_reconcile,
2531     });
2532     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2533     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2534     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2535     put_task (session->taskmap, &task);
2536   }
2537   else
2538   {
2539     p1 = me;
2540     p2 = lead;
2541     arrange_peers (&p1, &p2, n);
2542     task = ((struct TaskEntry) {
2543       .step = step,
2544       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2545       .start = task_start_reconcile,
2546       .cancel = task_cancel_reconcile,
2547     });
2548     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2549     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2550     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2551     put_task (session->taskmap, &task);
2552   }
2553
2554   /* gcast phase 2: echo */
2555   prev_step = step;
2556   round += 1;
2557   step = create_step (session, round);
2558 #ifdef GNUNET_EXTRA_LOGGING
2559   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2560 #endif
2561   step_depend_on (step, prev_step);
2562
2563   for (k = 0; k < n; k++)
2564   {
2565     p1 = k;
2566     p2 = me;
2567     arrange_peers (&p1, &p2, n);
2568     task = ((struct TaskEntry) {
2569       .step = step,
2570       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2571       .start = task_start_reconcile,
2572       .cancel = task_cancel_reconcile,
2573     });
2574     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2575     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2576     put_task (session->taskmap, &task);
2577   }
2578
2579   prev_step = step;
2580   /* Same round, since step only has local tasks */
2581   step = create_step (session, round);
2582 #ifdef GNUNET_EXTRA_LOGGING
2583   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2584 #endif
2585   step_depend_on (step, prev_step);
2586
2587   arrange_peers (&p1, &p2, n);
2588   task = ((struct TaskEntry) {
2589     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2590     .step = step,
2591     .start = task_start_eval_echo
2592   });
2593   put_task (session->taskmap, &task);
2594
2595   prev_step = step;
2596   round += 1;
2597   step = create_step (session, round);
2598 #ifdef GNUNET_EXTRA_LOGGING
2599   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2600 #endif
2601   step_depend_on (step, prev_step);
2602
2603   /* gcast phase 3: confirmation and grading */
2604   for (k = 0; k < n; k++)
2605   {
2606     p1 = k;
2607     p2 = me;
2608     arrange_peers (&p1, &p2, n);
2609     task = ((struct TaskEntry) {
2610       .step = step,
2611       .start = task_start_reconcile,
2612       .cancel = task_cancel_reconcile,
2613       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2614     });
2615     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2616     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2617     /* If there was at least one element in the echo round that was
2618        contested (i.e. it had no n-t majority), then we let the other peers
2619        know, and other peers let us know.  The contested flag for each peer is
2620        stored in the rfn. */
2621     task.cls.setop.transceive_contested = GNUNET_YES;
2622     put_task (session->taskmap, &task);
2623   }
2624
2625   prev_step = step;
2626   /* Same round, since step only has local tasks */
2627   step = create_step (session, round);
2628 #ifdef GNUNET_EXTRA_LOGGING
2629   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2630 #endif
2631   step_depend_on (step, prev_step);
2632
2633   task = ((struct TaskEntry) {
2634     .step = step,
2635     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2636     .start = task_start_grade,
2637   });
2638   put_task (session->taskmap, &task);
2639
2640   step_depend_on (step_after, step);
2641 }
2642
2643
2644 static void
2645 construct_task_graph (struct ConsensusSession *session)
2646 {
2647   uint16_t n = session->num_peers;
2648   uint16_t t = n / 3;
2649
2650   uint16_t me = session->local_peer_idx;
2651
2652   uint16_t p1;
2653   uint16_t p2;
2654
2655   /* The task we're currently setting up. */
2656   struct TaskEntry task;
2657
2658   /* Current leader */
2659   unsigned int lead;
2660
2661   struct Step *step;
2662   struct Step *prev_step;
2663
2664   unsigned int round = 0;
2665
2666   unsigned int i;
2667
2668   // XXX: introduce first step,
2669   // where we wait for all insert acks
2670   // from the set service
2671
2672   /* faster but brittle all-to-all */
2673
2674   // XXX: Not implemented yet
2675
2676   /* all-to-all step */
2677
2678   step = create_step (session, round);
2679
2680 #ifdef GNUNET_EXTRA_LOGGING
2681   step->debug_name = GNUNET_strdup ("all to all");
2682 #endif
2683
2684   for (i = 0; i < n; i++)
2685   {
2686     p1 = me;
2687     p2 = i;
2688     arrange_peers (&p1, &p2, n);
2689     task = ((struct TaskEntry) {
2690       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2691       .step = step,
2692       .start = task_start_reconcile,
2693       .cancel = task_cancel_reconcile,
2694     });
2695     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2696     task.cls.setop.output_set = task.cls.setop.input_set;
2697     task.cls.setop.do_not_remove = GNUNET_YES;
2698     put_task (session->taskmap, &task);
2699   }
2700
2701   prev_step = step;
2702   step = NULL;
2703
2704   round += 1;
2705
2706   /* Byzantine union */
2707
2708   /* sequential repetitions of the gradecasts */
2709   for (i = 0; i < t + 1; i++)
2710   {
2711     struct Step *step_rep_start;
2712     struct Step *step_rep_end;
2713
2714     /* Every repetition is in a separate round. */
2715     step_rep_start = create_step (session, round);
2716 #ifdef GNUNET_EXTRA_LOGGING
2717     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2718 #endif
2719
2720     step_depend_on (step_rep_start, prev_step);
2721
2722     /* gradecast has three rounds */
2723     round += 3;
2724     step_rep_end = create_step (session, round);
2725 #ifdef GNUNET_EXTRA_LOGGING
2726     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2727 #endif
2728
2729     /* parallel gradecasts */
2730     for (lead = 0; lead < n; lead++)
2731       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2732
2733     task = ((struct TaskEntry) {
2734       .step = step_rep_end,
2735       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2736       .start = task_start_apply_round,
2737     });
2738     put_task (session->taskmap, &task);
2739
2740     prev_step = step_rep_end;
2741   }
2742
2743  /* There is no next gradecast round, thus the final
2744     start step is the overall end step of the gradecasts */
2745   round += 1;
2746   step = create_step (session, round);
2747 #ifdef GNUNET_EXTRA_LOGGING
2748   GNUNET_asprintf (&step->debug_name, "finish");
2749 #endif
2750   step_depend_on (step, prev_step);
2751
2752   task = ((struct TaskEntry) {
2753     .step = step,
2754     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2755     .start = task_start_finish,
2756   });
2757   task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
2758
2759   put_task (session->taskmap, &task);
2760 }
2761
2762
2763 /**
2764  * Initialize the session, continue receiving messages from the owning client
2765  *
2766  * @param session the session to initialize
2767  * @param join_msg the join message from the client
2768  */
2769 static void
2770 initialize_session (struct ConsensusSession *session,
2771                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2772 {
2773   struct ConsensusSession *other_session;
2774
2775   initialize_session_peer_list (session, join_msg);
2776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2777   compute_global_id (session, &join_msg->session_id);
2778
2779   /* Check if some local client already owns the session.
2780      It is only legal to have a session with an existing global id
2781      if all other sessions with this global id are finished.*/
2782   other_session = sessions_head;
2783   while (NULL != other_session)
2784   {
2785     if ((other_session != session) &&
2786         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2787     {
2788       //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2789       //{
2790       //  GNUNET_break (0);
2791       //  destroy_session (session);
2792       //  return;
2793       //}
2794       break;
2795     }
2796     other_session = other_session->next;
2797   }
2798
2799   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2800   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2801
2802   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
2803               (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2804
2805   session->local_peer_idx = get_peer_idx (&my_peer, session);
2806   GNUNET_assert (-1 != session->local_peer_idx);
2807   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2808                                              &session->global_id,
2809                                              set_listen_cb, session);
2810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2811
2812   session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2813   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2814   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2815   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2816
2817   {
2818     struct SetEntry *client_set;
2819     client_set = GNUNET_new (struct SetEntry);
2820     client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2821     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2822     put_set (session, client_set);
2823   }
2824
2825   session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2826
2827   /* Just construct the task graph,
2828      but don't run anything until the client calls conclude. */
2829   construct_task_graph (session);
2830
2831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2832 }
2833
2834
2835 static struct ConsensusSession *
2836 get_session_by_client (struct GNUNET_SERVER_Client *client)
2837 {
2838   struct ConsensusSession *session;
2839
2840   session = sessions_head;
2841   while (NULL != session)
2842   {
2843     if (session->client == client)
2844       return session;
2845     session = session->next;
2846   }
2847   return NULL;
2848 }
2849
2850
2851 /**
2852  * Called when a client wants to join a consensus session.
2853  *
2854  * @param cls unused
2855  * @param client client that sent the message
2856  * @param m message sent by the client
2857  */
2858 static void
2859 client_join (void *cls,
2860              struct GNUNET_SERVER_Client *client,
2861              const struct GNUNET_MessageHeader *m)
2862 {
2863   struct ConsensusSession *session;
2864
2865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
2866
2867   session = get_session_by_client (client);
2868   if (NULL != session)
2869   {
2870     GNUNET_break (0);
2871     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2872     return;
2873   }
2874   session = GNUNET_new (struct ConsensusSession);
2875   session->client = client;
2876   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
2877   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2878   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
2879   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2880
2881   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
2882 }
2883
2884
2885 static void
2886 client_insert_done (void *cls)
2887 {
2888   // FIXME: implement
2889 }
2890
2891
2892 /**
2893  * Called when a client performs an insert operation.
2894  *
2895  * @param cls (unused)
2896  * @param client client handle
2897  * @param m message sent by the client
2898  */
2899 void
2900 client_insert (void *cls,
2901                struct GNUNET_SERVER_Client *client,
2902                const struct GNUNET_MessageHeader *m)
2903 {
2904   struct ConsensusSession *session;
2905   struct GNUNET_CONSENSUS_ElementMessage *msg;
2906   struct GNUNET_SET_Element *element;
2907   ssize_t element_size;
2908   struct GNUNET_SET_Handle *initial_set;
2909
2910   session = get_session_by_client (client);
2911
2912   if (NULL == session)
2913   {
2914     GNUNET_break (0);
2915     GNUNET_SERVER_client_disconnect (client);
2916     return;
2917   }
2918
2919   if (GNUNET_YES == session->conclude_started)
2920   {
2921     GNUNET_break (0);
2922     GNUNET_SERVER_client_disconnect (client);
2923     return;
2924   }
2925
2926   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2927   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2928   if (element_size < 0)
2929   {
2930     GNUNET_break (0);
2931     return;
2932   }
2933
2934   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
2935   element->element_type = msg->element_type;
2936   element->size = element_size;
2937   memcpy (&element[1], &msg[1], element_size);
2938   element->data = &element[1];
2939   {
2940     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
2941     struct SetEntry *entry;
2942     entry = lookup_set (session, &key);
2943     GNUNET_assert (NULL != entry);
2944     initial_set = entry->h;
2945   }
2946   session->num_client_insert_pending++;
2947   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
2948
2949 #ifdef GNUNET_EXTRA_LOGGING
2950   {
2951     struct GNUNET_HashCode hash;
2952
2953     GNUNET_SET_element_hash (element, &hash);
2954
2955     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
2956                 session->local_peer_idx,
2957                 GNUNET_h2s (&hash));
2958   }
2959 #endif
2960
2961   GNUNET_free (element);
2962   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2963 }
2964
2965
2966 /**
2967  * Called when a client performs the conclude operation.
2968  *
2969  * @param cls (unused)
2970  * @param client client handle
2971  * @param message message sent by the client
2972  */
2973 static void
2974 client_conclude (void *cls,
2975                  struct GNUNET_SERVER_Client *client,
2976                  const struct GNUNET_MessageHeader *message)
2977 {
2978   struct ConsensusSession *session;
2979
2980   session = get_session_by_client (client);
2981   if (NULL == session)
2982   {
2983     /* client not found */
2984     GNUNET_break (0);
2985     GNUNET_SERVER_client_disconnect (client);
2986     return;
2987   }
2988
2989   if (GNUNET_YES == session->conclude_started)
2990   {
2991     /* conclude started twice */
2992     GNUNET_break (0);
2993     GNUNET_SERVER_client_disconnect (client);
2994     destroy_session (session);
2995     return;
2996   }
2997
2998   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
2999
3000   session->conclude_started = GNUNET_YES;
3001
3002   install_step_timeouts (session);
3003   run_ready_steps (session);
3004
3005
3006   GNUNET_SERVER_receive_done (client, GNUNET_OK);
3007 }
3008
3009
3010 /**
3011  * Called to clean up, after a shutdown has been requested.
3012  *
3013  * @param cls closure
3014  * @param tc context information (why was this task triggered now)
3015  */
3016 static void
3017 shutdown_task (void *cls,
3018                const struct GNUNET_SCHEDULER_TaskContext *tc)
3019 {
3020   while (NULL != sessions_head)
3021     destroy_session (sessions_head);
3022
3023   GNUNET_STATISTICS_destroy (statistics, GNUNET_YES);
3024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
3025 }
3026
3027
3028 /**
3029  * Clean up after a client after it is
3030  * disconnected (either by us or by itself)
3031  *
3032  * @param cls closure, unused
3033  * @param client the client to clean up after
3034  */
3035 void
3036 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3037 {
3038   struct ConsensusSession *session;
3039
3040   session = get_session_by_client (client);
3041   if (NULL == session)
3042     return;
3043   // FIXME: destroy if we can
3044 }
3045
3046
3047
3048 /**
3049  * Start processing consensus requests.
3050  *
3051  * @param cls closure
3052  * @param server the initialized server
3053  * @param c configuration to use
3054  */
3055 static void
3056 run (void *cls, struct GNUNET_SERVER_Handle *server,
3057      const struct GNUNET_CONFIGURATION_Handle *c)
3058 {
3059   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3060     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3061         sizeof (struct GNUNET_MessageHeader)},
3062     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3063     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3064     {NULL, NULL, 0, 0}
3065   };
3066
3067   cfg = c;
3068   srv = server;
3069   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3070   {
3071     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3072     GNUNET_break (0);
3073     GNUNET_SCHEDULER_shutdown ();
3074     return;
3075   }
3076   statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3077   GNUNET_SERVER_add_handlers (server, server_handlers);
3078   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
3079   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3080   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3081 }
3082
3083
3084 /**
3085  * The main function for the consensus service.
3086  *
3087  * @param argc number of arguments from the command line
3088  * @param argv command line arguments
3089  * @return 0 ok, 1 on error
3090  */
3091 int
3092 main (int argc, char *const *argv)
3093 {
3094   int ret;
3095   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3096   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3097   return (GNUNET_OK == ret) ? 0 : 1;
3098 }