- fix use of uninitialized memory
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/gnunet-service-consensus.c
23  * @brief multi-peer set reconciliation
24  * @author Florian Dold
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
35
36
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
38
39
40 enum ReferendumVote
41 {
42   /**
43    * Vote that nothing should change.
44    * This option is never voted explicitly.
45    */
46   VOTE_STAY = 0,
47   /**
48    * Vote that an element should be added.
49    */
50   VOTE_ADD = 1,
51   /**
52    * Vote that an element should be removed.
53    */
54   VOTE_REMOVE = 2,
55 };
56
57
58 GNUNET_NETWORK_STRUCT_BEGIN
59
60
61 struct ContestedPayload
62 {
63 };
64
65 /**
66  * Tuple of integers that together
67  * identify a task uniquely.
68  */
69 struct TaskKey {
70   /**
71    * A value from 'enum PhaseKind'.
72    */
73   uint16_t kind GNUNET_PACKED;
74
75   /**
76    * Number of the first peer
77    * in canonical order.
78    */
79   int16_t peer1 GNUNET_PACKED;
80
81   /**
82    * Number of the second peer in canonical order.
83    */
84   int16_t peer2 GNUNET_PACKED;
85
86   /**
87    * Repetition of the gradecast phase.
88    */
89   int16_t repetition GNUNET_PACKED;
90
91   /**
92    * Leader in the gradecast phase.
93    *
94    * Can be different from both peer1 and peer2.
95    */
96   int16_t leader GNUNET_PACKED;
97 };
98
99
100
101 struct SetKey
102 {
103   int set_kind GNUNET_PACKED;
104   int k1 GNUNET_PACKED;
105   int k2 GNUNET_PACKED;
106 };
107
108
109 struct SetEntry
110 {
111   struct SetKey key;
112   struct GNUNET_SET_Handle *h;
113   /**
114    * GNUNET_YES if the set resulted
115    * from applying a referendum with contested
116    * elements.
117    */
118   int is_contested;
119 };
120
121
122 struct DiffKey
123 {
124   int diff_kind GNUNET_PACKED;
125   int k1 GNUNET_PACKED;
126   int k2 GNUNET_PACKED;
127 };
128
129 struct RfnKey
130 {
131   int rfn_kind GNUNET_PACKED;
132   int k1 GNUNET_PACKED;
133   int k2 GNUNET_PACKED;
134 };
135
136
137 GNUNET_NETWORK_STRUCT_END
138
139 enum PhaseKind
140 {
141   PHASE_KIND_ALL_TO_ALL,
142   PHASE_KIND_GRADECAST_LEADER,
143   PHASE_KIND_GRADECAST_ECHO,
144   PHASE_KIND_GRADECAST_ECHO_GRADE,
145   PHASE_KIND_GRADECAST_CONFIRM,
146   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
147   /**
148    * Apply a repetition of the all-to-all
149    * gradecast to the current set.
150    */
151   PHASE_KIND_APPLY_REP,
152   PHASE_KIND_FINISH,
153 };
154
155
156 enum SetKind
157 {
158   SET_KIND_NONE = 0,
159   SET_KIND_CURRENT,
160   SET_KIND_LEADER_PROPOSAL,
161   SET_KIND_ECHO_RESULT,
162 };
163
164 enum DiffKind
165 {
166   DIFF_KIND_NONE = 0,
167   DIFF_KIND_LEADER_PROPOSAL,
168   DIFF_KIND_LEADER_CONSENSUS,
169   DIFF_KIND_GRADECAST_RESULT,
170 };
171
172 enum RfnKind
173 {
174   RFN_KIND_NONE = 0,
175   RFN_KIND_ECHO,
176   RFN_KIND_CONFIRM,
177   RFN_KIND_GRADECAST_RESULT
178 };
179
180
181 struct SetOpCls
182 {
183   struct SetKey input_set;
184
185   struct SetKey output_set;
186   struct RfnKey output_rfn;
187   struct DiffKey output_diff;
188
189   int do_not_remove;
190
191   int transceive_contested;
192
193   struct GNUNET_SET_OperationHandle *op;
194 };
195
196
197 struct FinishCls
198 {
199   struct SetKey input_set;
200 };
201
202 /**
203  * Closure for both @a start_task
204  * and @a cancel_task.
205  */
206 union TaskFuncCls
207 {
208   struct SetOpCls setop;
209   struct FinishCls finish;
210 };
211
212 struct TaskEntry;
213
214 typedef void (*TaskFunc) (struct TaskEntry *task);
215
216 /*
217  * Node in the consensus task graph.
218  */
219 struct TaskEntry
220 {
221   struct TaskKey key;
222
223   struct Step *step;
224
225   int is_started;
226
227   int is_finished;
228
229   TaskFunc start;
230   TaskFunc cancel;
231
232   union TaskFuncCls cls;
233 };
234
235
236 struct Step
237 {
238   /**
239    * All steps of one session are in a
240    * linked list for easier deallocation.
241    */
242   struct Step *prev;
243
244   /**
245    * All steps of one session are in a
246    * linked list for easier deallocation.
247    */
248   struct Step *next;
249
250   struct ConsensusSession *session;
251
252   struct TaskEntry **tasks;
253   unsigned int tasks_len;
254   unsigned int tasks_cap;
255
256   unsigned int finished_tasks;
257
258   /*
259    * Tasks that have this task as dependency.
260    *
261    * We store pointers to subordinates rather
262    * than to prerequisites since it makes
263    * tracking the readiness of a task easier.
264    */
265   struct Step **subordinates;
266   unsigned int subordinates_len;
267   unsigned int subordinates_cap;
268
269   /**
270    * Counter for the prerequisites of
271    * this step.
272    */
273   size_t pending_prereq;
274
275   /*
276    * Task that will run this step despite
277    * any pending prerequisites.
278    */
279   struct GNUNET_SCHEDULER_Task *timeout_task;
280
281   unsigned int is_running;
282
283   unsigned int is_finished;
284
285   /*
286    * Synchrony round of the task.
287    * Determines the deadline for the task.
288    */
289   unsigned int round;
290
291   /**
292    * Human-readable name for
293    * the task, used for debugging.
294    */
295   char *debug_name;
296 };
297
298
299 struct RfnElementInfo
300 {
301   const struct GNUNET_SET_Element *element;
302
303   /*
304    * GNUNET_YES if the peer votes for the proposal.
305    */
306   int *votes;
307
308   /**
309    * Proposal for this element,
310    * can only be VOTE_ADD or VOTE_REMOVE.
311    */
312   enum ReferendumVote proposal;
313 };
314
315
316 struct ReferendumEntry
317 {
318   struct RfnKey key;
319
320   /*
321    * Elements where there is at least one proposed change.
322    *
323    * Maps the hash of the GNUNET_SET_Element
324    * to 'struct RfnElementInfo'.
325    */
326   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
327
328   unsigned int num_peers;
329
330   /**
331    * Stores, for every peer in the session,
332    * whether the peer finished the whole referendum.
333    *
334    * Votes from peers are only counted if they're
335    * marked as commited (#GNUNET_YES) in the referendum.
336    *
337    * Otherwise (#GNUNET_NO), the requested changes are
338    * not counted for majority votes or thresholds.
339    */
340   int *peer_commited;
341
342
343   /**
344    * Contestation state of the peer.  If a peer is contested, the values it
345    * contributed are still counted for applying changes, but the grading is
346    * affected.
347    */
348   int *peer_contested;
349 };
350
351
352 struct DiffElementInfo
353 {
354   const struct GNUNET_SET_Element *element;
355
356   /**
357    * Positive weight for 'add', negative
358    * weights for 'remove'.
359    */
360   int weight;
361 };
362
363
364 /**
365  * Weighted diff.
366  */
367 struct DiffEntry
368 {
369   struct DiffKey key;
370   struct GNUNET_CONTAINER_MultiHashMap *changes;
371 };
372
373
374
375 /**
376  * A consensus session consists of one local client and the remote authorities.
377  */
378 struct ConsensusSession
379 {
380   /**
381    * Consensus sessions are kept in a DLL.
382    */
383   struct ConsensusSession *next;
384
385   /**
386    * Consensus sessions are kept in a DLL.
387    */
388   struct ConsensusSession *prev;
389
390   unsigned int num_client_insert_pending;
391
392   struct GNUNET_CONTAINER_MultiHashMap *setmap;
393   struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
394   struct GNUNET_CONTAINER_MultiHashMap *diffmap;
395
396   /**
397    * Array of peers with length 'num_peers'.
398    */
399   int *peers_blacklisted;
400
401   /*
402    * Mapping from (hashed) TaskKey to TaskEntry.
403    *
404    * We map the application_id for a round to the task that should be
405    * executed, so we don't have to go through all task whenever we get
406    * an incoming set op request.
407    */
408   struct GNUNET_CONTAINER_MultiHashMap *taskmap;
409
410   struct Step *steps_head;
411   struct Step *steps_tail;
412
413   int conclude_started;
414
415   int conclude_done;
416
417   /**
418   * Global consensus identification, computed
419   * from the session id and participating authorities.
420   */
421   struct GNUNET_HashCode global_id;
422
423   /**
424    * Client that inhabits the session
425    */
426   struct GNUNET_SERVER_Client *client;
427
428   /**
429    * Queued messages to the client.
430    */
431   struct GNUNET_MQ_Handle *client_mq;
432
433   /**
434    * Time when the conclusion of the consensus should begin.
435    */
436   struct GNUNET_TIME_Absolute conclude_start;
437
438   /**
439    * Timeout for all rounds together, single rounds will schedule a timeout task
440    * with a fraction of the conclude timeout.
441    * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
442    */
443   struct GNUNET_TIME_Absolute conclude_deadline;
444
445   struct GNUNET_PeerIdentity *peers;
446
447   /**
448    * Number of other peers in the consensus.
449    */
450   unsigned int num_peers;
451
452   /**
453    * Index of the local peer in the peers array
454    */
455   unsigned int local_peer_idx;
456
457   /**
458    * Listener for requests from other peers.
459    * Uses the session's global id as app id.
460    */
461   struct GNUNET_SET_ListenHandle *set_listener;
462 };
463
464 /**
465  * Linked list of sessions this peer participates in.
466  */
467 static struct ConsensusSession *sessions_head;
468
469 /**
470  * Linked list of sessions this peer participates in.
471  */
472 static struct ConsensusSession *sessions_tail;
473
474 /**
475  * Configuration of the consensus service.
476  */
477 static const struct GNUNET_CONFIGURATION_Handle *cfg;
478
479 /**
480  * Handle to the server for this service.
481  */
482 static struct GNUNET_SERVER_Handle *srv;
483
484 /**
485  * Peer that runs this service.
486  */
487 static struct GNUNET_PeerIdentity my_peer;
488
489
490 static void
491 finish_task (struct TaskEntry *task);
492
493 static void
494 run_ready_steps (struct ConsensusSession *session);
495
496 static const char *
497 phasename (uint16_t phase)
498 {
499   switch (phase)
500   {
501     case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
502     case PHASE_KIND_FINISH: return "FINISH";
503     case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
504     case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
505     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
506     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
507     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
508     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
509     default: return "(unknown)";
510   }
511 }
512
513
514 static const char *
515 setname (uint16_t kind)
516 {
517   switch (kind)
518   {
519     case SET_KIND_CURRENT: return "CURRENT";
520     case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
521     case SET_KIND_NONE: return "NONE";
522     default: return "(unknown)";
523   }
524 }
525
526 static const char *
527 rfnname (uint16_t kind)
528 {
529   switch (kind)
530   {
531     case RFN_KIND_NONE: return "NONE";
532     case RFN_KIND_ECHO: return "ECHO";
533     case RFN_KIND_CONFIRM: return "CONFIRM";
534     default: return "(unknown)";
535   }
536 }
537
538 static const char *
539 diffname (uint16_t kind)
540 {
541   switch (kind)
542   {
543     case DIFF_KIND_NONE: return "NONE";
544     case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
545     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
546     case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
547     default: return "(unknown)";
548   }
549 }
550
551 #ifdef GNUNET_EXTRA_LOGGING
552
553
554 static const char *
555 debug_str_element (const struct GNUNET_SET_Element *el)
556 {
557   struct GNUNET_HashCode hash;
558
559   GNUNET_SET_element_hash (el, &hash);
560
561   return GNUNET_h2s (&hash);
562 }
563
564 static const char *
565 debug_str_task_key (struct TaskKey *tk)
566 {
567   static char buf[256];
568
569   snprintf (buf, sizeof (buf),
570             "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
571             phasename (tk->kind), tk->peer1, tk->peer2,
572             tk->leader, tk->repetition);
573
574   return buf;
575 }
576
577 static const char *
578 debug_str_diff_key (struct DiffKey *dk)
579 {
580   static char buf[256];
581
582   snprintf (buf, sizeof (buf),
583             "DiffKey kind=%s, k1=%d, k2=%d",
584             diffname (dk->diff_kind), dk->k1, dk->k2);
585
586   return buf;
587 }
588
589 static const char *
590 debug_str_set_key (const struct SetKey *sk)
591 {
592   static char buf[256];
593
594   snprintf (buf, sizeof (buf),
595             "SetKey kind=%s, k1=%d, k2=%d",
596             setname (sk->set_kind), sk->k1, sk->k2);
597
598   return buf;
599 }
600
601
602 static const char *
603 debug_str_rfn_key (const struct RfnKey *rk)
604 {
605   static char buf[256];
606
607   snprintf (buf, sizeof (buf),
608             "RfnKey kind=%s, k1=%d, k2=%d",
609             rfnname (rk->rfn_kind), rk->k1, rk->k2);
610
611   return buf;
612 }
613
614 #endif /* GNUNET_EXTRA_LOGGING */
615
616
617 /**
618  * Destroy a session, free all resources associated with it.
619  *
620  * @param session the session to destroy
621  */
622 static void
623 destroy_session (struct ConsensusSession *session)
624 {
625   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
626   if (NULL != session->set_listener)
627   {
628     GNUNET_SET_listen_cancel (session->set_listener);
629     session->set_listener = NULL;
630   }
631   if (NULL != session->client_mq)
632   {
633     GNUNET_MQ_destroy (session->client_mq);
634     session->client_mq = NULL;
635     /* The MQ cleanup will also disconnect the underlying client. */
636     session->client = NULL;
637   }
638   if (NULL != session->client)
639   {
640     GNUNET_SERVER_client_disconnect (session->client);
641     session->client = NULL;
642   }
643   GNUNET_free (session);
644 }
645
646
647 /**
648  * Send the final result set of the consensus to the client, element by
649  * element.
650  *
651  * @param cls closure
652  * @param element the current element, NULL if all elements have been
653  *        iterated over
654  * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
655  */
656 static int
657 send_to_client_iter (void *cls,
658                      const struct GNUNET_SET_Element *element)
659 {
660   struct TaskEntry *task = (struct TaskEntry *) cls;
661   struct ConsensusSession *session = task->step->session;
662   struct GNUNET_MQ_Envelope *ev;
663
664   if (NULL != element)
665   {
666     struct GNUNET_CONSENSUS_ElementMessage *m;
667
668     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669                 "P%d: sending element %s to client\n",
670                 session->local_peer_idx,
671                 debug_str_element (element));
672
673     ev = GNUNET_MQ_msg_extra (m, element->size,
674                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
675     m->element_type = htons (element->element_type);
676     memcpy (&m[1], element->data, element->size);
677     GNUNET_MQ_send (session->client_mq, ev);
678   }
679   else
680   {
681     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682                 "P%d: finished iterating elements for client\n",
683                 session->local_peer_idx);
684     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
685     GNUNET_MQ_send (session->client_mq, ev);
686   }
687   return GNUNET_YES;
688 }
689
690
691 static struct SetEntry *
692 lookup_set (struct ConsensusSession *session, struct SetKey *key)
693 {
694   struct GNUNET_HashCode hash;
695
696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697               "P%u: looking up set {%s}\n",
698               session->local_peer_idx,
699               debug_str_set_key (key));
700
701   GNUNET_assert (SET_KIND_NONE != key->set_kind);
702   GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
703   return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
704 }
705
706
707 static struct DiffEntry *
708 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
709 {
710   struct GNUNET_HashCode hash;
711
712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713               "P%u: looking up diff {%s}\n",
714               session->local_peer_idx,
715               debug_str_diff_key (key));
716
717   GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
718   GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
719   return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
720 }
721
722
723 static struct ReferendumEntry *
724 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
725 {
726   struct GNUNET_HashCode hash;
727
728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729               "P%u: looking up rfn {%s}\n",
730               session->local_peer_idx,
731               debug_str_rfn_key (key));
732
733   GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
734   GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
735   return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
736 }
737
738
739 static void
740 diff_insert (struct DiffEntry *diff,
741              int weight,
742              const struct GNUNET_SET_Element *element)
743 {
744   struct DiffElementInfo *di;
745   struct GNUNET_HashCode hash;
746
747   GNUNET_assert ( (1 == weight) || (-1 == weight));
748
749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750               "diff_insert with element size %u\n",
751               element->size);
752
753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
754               "hashing element\n");
755
756   GNUNET_SET_element_hash (element, &hash);
757
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "hashed element\n");
760
761   di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
762
763   if (NULL == di)
764   {
765     di = GNUNET_new (struct DiffElementInfo);
766     di->element = GNUNET_SET_element_dup (element);
767     GNUNET_assert (GNUNET_OK ==
768                    GNUNET_CONTAINER_multihashmap_put (diff->changes,
769                                                       &hash, di,
770                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
771   }
772
773   di->weight = weight;
774 }
775
776
777 static void
778 rfn_commit (struct ReferendumEntry *rfn,
779             uint16_t commit_peer)
780 {
781   GNUNET_assert (commit_peer < rfn->num_peers);
782
783   rfn->peer_commited[commit_peer] = GNUNET_YES;
784 }
785
786
787 static void
788 rfn_contest (struct ReferendumEntry *rfn,
789              uint16_t contested_peer)
790 {
791   GNUNET_assert (contested_peer < rfn->num_peers);
792
793   rfn->peer_contested[contested_peer] = GNUNET_YES;
794 }
795
796
797 static uint16_t
798 rfn_noncontested (struct ReferendumEntry *rfn)
799 {
800   uint16_t i;
801   uint16_t ret;
802
803   ret = 0;
804   for (i = 0; i < rfn->num_peers; i++)
805     if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
806       ret++;
807
808   return ret;
809 }
810
811 static void
812 rfn_vote (struct ReferendumEntry *rfn,
813           uint16_t voting_peer,
814           enum ReferendumVote vote,
815           const struct GNUNET_SET_Element *element)
816 {
817   struct RfnElementInfo *ri;
818   struct GNUNET_HashCode hash;
819
820   GNUNET_assert (voting_peer < rfn->num_peers);
821
822   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
823      since VOTE_KEEP is implicit in not voting. */
824   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
825
826   GNUNET_SET_element_hash (element, &hash);
827   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
828
829   if (NULL == ri)
830   {
831     ri = GNUNET_new (struct RfnElementInfo);
832     ri->element = GNUNET_SET_element_dup (element);
833     ri->votes = GNUNET_new_array (rfn->num_peers, int);
834     GNUNET_assert (GNUNET_OK ==
835                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
836                                                       &hash, ri,
837                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
838   }
839
840   ri->votes[voting_peer] = GNUNET_YES;
841   ri->proposal = vote;
842 }
843
844
845 uint16_t
846 task_other_peer (struct TaskEntry *task)
847 {
848   uint16_t me = task->step->session->local_peer_idx;
849   if (task->key.peer1 == me)
850     return task->key.peer2;
851   return task->key.peer1;
852 }
853
854
855 /**
856  * Callback for set operation results. Called for each element
857  * in the result set.
858  *
859  * @param cls closure
860  * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
861  * @param status see enum GNUNET_SET_Status
862  */
863 static void
864 set_result_cb (void *cls,
865                const struct GNUNET_SET_Element *element,
866                enum GNUNET_SET_Status status)
867 {
868   struct TaskEntry *task = cls;
869   struct ConsensusSession *session = task->step->session;
870   struct SetEntry *output_set = NULL;
871   struct DiffEntry *output_diff = NULL;
872   struct ReferendumEntry *output_rfn = NULL;
873   unsigned int other_idx;
874   struct SetOpCls *setop;
875
876   setop = &task->cls.setop;
877
878   
879   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880               "P%u: got set result for {%s}, status %u\n",
881               session->local_peer_idx,
882               debug_str_task_key (&task->key),
883               status);
884
885   if (GNUNET_NO == task->is_started)
886   {
887     GNUNET_break_op (0);
888     return;
889   }
890
891   if (GNUNET_YES == task->is_finished)
892   {
893     GNUNET_break_op (0);
894     return;
895   }
896
897   other_idx = task_other_peer (task);
898
899   if (SET_KIND_NONE != setop->output_set.set_kind)
900   {
901     output_set = lookup_set (session, &setop->output_set);
902     GNUNET_assert (NULL != output_set);
903   }
904
905   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
906   {
907     output_diff = lookup_diff (session, &setop->output_diff);
908     GNUNET_assert (NULL != output_diff);
909   }
910
911   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
912   {
913     output_rfn = lookup_rfn (session, &setop->output_rfn);
914     GNUNET_assert (NULL != output_rfn);
915   }
916
917   if (GNUNET_YES == session->peers_blacklisted[other_idx])
918   {
919     /* Peer might have been blacklisted
920        by a gradecast running in parallel, ignore elements from now */
921     if (GNUNET_SET_STATUS_ADD_LOCAL == status)
922       return;
923     if (GNUNET_SET_STATUS_ADD_REMOTE == status)
924       return;
925   }
926
927   if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
928   {
929     if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
930     {
931       GNUNET_assert (NULL != output_rfn);
932       rfn_contest (output_rfn, task_other_peer (task));
933       return;
934     }
935   }
936
937   switch (status)
938   {
939     case GNUNET_SET_STATUS_ADD_LOCAL:
940       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941                   "Adding element in Task {%s}\n",
942                   debug_str_task_key (&task->key));
943       if (NULL != output_set)
944       {
945         // FIXME: record pending adds, use callback
946         GNUNET_SET_add_element (output_set->h,
947                                 element,
948                                 NULL,
949                                 NULL);
950 #ifdef GNUNET_EXTRA_LOGGING
951         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
952                     "P%u: adding element %s into set {%s} of task {%s}\n",
953                     session->local_peer_idx,
954                     debug_str_element (element),
955                     debug_str_set_key (&setop->output_set),
956                     debug_str_task_key (&task->key));
957 #endif
958       }
959       if (NULL != output_diff)
960       {
961         diff_insert (output_diff, 1, element);
962 #ifdef GNUNET_EXTRA_LOGGING
963         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
964                     "P%u: adding element %s into diff {%s} of task {%s}\n",
965                     session->local_peer_idx,
966                     debug_str_element (element),
967                     debug_str_diff_key (&setop->output_diff),
968                     debug_str_task_key (&task->key));
969 #endif
970       }
971       if (NULL != output_rfn)
972       {
973         rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
974 #ifdef GNUNET_EXTRA_LOGGING
975         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
976                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
977                     session->local_peer_idx,
978                     debug_str_element (element),
979                     debug_str_rfn_key (&setop->output_rfn),
980                     debug_str_task_key (&task->key));
981 #endif
982       }
983       // XXX: add result to structures in task
984       break;
985     case GNUNET_SET_STATUS_ADD_REMOTE:
986       if (GNUNET_YES == setop->do_not_remove)
987         break;
988       if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
989         break;
990       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991                   "Removing element in Task {%s}\n",
992                   debug_str_task_key (&task->key));
993       if (NULL != output_set)
994       {
995         // FIXME: record pending adds, use callback
996         GNUNET_SET_remove_element (output_set->h,
997                                    element,
998                                    NULL,
999                                    NULL);
1000 #ifdef GNUNET_EXTRA_LOGGING
1001         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1002                     "P%u: removing element %s from set {%s} of task {%s}\n",
1003                     session->local_peer_idx,
1004                     debug_str_element (element),
1005                     debug_str_set_key (&setop->output_set),
1006                     debug_str_task_key (&task->key));
1007 #endif
1008       }
1009       if (NULL != output_diff)
1010       {
1011         diff_insert (output_diff, -1, element);
1012 #ifdef GNUNET_EXTRA_LOGGING
1013         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1014                     "P%u: removing element %s from diff {%s} of task {%s}\n",
1015                     session->local_peer_idx,
1016                     debug_str_element (element),
1017                     debug_str_diff_key (&setop->output_diff),
1018                     debug_str_task_key (&task->key));
1019 #endif
1020       }
1021       if (NULL != output_rfn)
1022       {
1023         rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1024 #ifdef GNUNET_EXTRA_LOGGING
1025         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1026                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
1027                     session->local_peer_idx,
1028                     debug_str_element (element),
1029                     debug_str_rfn_key (&setop->output_rfn),
1030                     debug_str_task_key (&task->key));
1031 #endif
1032       }
1033       break;
1034     case GNUNET_SET_STATUS_DONE:
1035       // XXX: check first if any changes to the underlying
1036       // set are still pending
1037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                   "Finishing setop in Task {%s}\n",
1039                   debug_str_task_key (&task->key));
1040       if (NULL != output_rfn)
1041       {
1042         rfn_commit (output_rfn, task_other_peer (task));
1043       }
1044       finish_task (task);
1045       break;
1046     case GNUNET_SET_STATUS_FAILURE:
1047       // XXX: cleanup
1048       GNUNET_break_op (0);
1049       finish_task (task);
1050       return;
1051     default:
1052       /* not reached */
1053       GNUNET_assert (0);
1054   }
1055 }
1056
1057 #ifdef EVIL
1058
1059 enum Evilness
1060 {
1061   EVILNESS_NONE,
1062   EVILNESS_CRAM,
1063   EVILNESS_SLACK,
1064 };
1065
1066 static void
1067 get_evilness (struct ConsensusSession *session, enum Evilness *ret_type, unsigned int *ret_num)
1068 {
1069   char *evil_spec;
1070   char *field;
1071   char *evil_type_str = NULL;
1072
1073   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1074   {
1075     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076                 "P%u: no evilness\n",
1077                 session->local_peer_idx);
1078     *ret_type = EVILNESS_NONE;
1079     return;
1080   }
1081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082               "P%u: got evilness spec\n",
1083               session->local_peer_idx);
1084
1085   for (field = strtok (evil_spec, "/");
1086        NULL != field;
1087        field = strtok (NULL, "/"))
1088   {
1089     unsigned int peer_num;
1090     unsigned int evil_num;
1091     int ret;
1092
1093     evil_type_str = NULL;
1094
1095     ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str, &evil_num);
1096
1097     if (ret != 3)
1098     {
1099       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC, behaving like a good peer.\n",
1100                   field); 
1101       goto not_evil;
1102     }
1103
1104     GNUNET_assert (NULL != evil_type_str);
1105
1106     if (peer_num == session->local_peer_idx)
1107     {
1108       if (0 == strcmp ("slack", evil_type_str))
1109         *ret_type = EVILNESS_SLACK;
1110       else if (0 == strcmp ("cram", evil_type_str))
1111       {
1112         *ret_type = EVILNESS_CRAM;
1113         *ret_num = evil_num;
1114       }
1115       else
1116       {
1117         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n"); 
1118         goto not_evil;
1119       }
1120       goto cleanup;
1121     }
1122     /* No GNUNET_free since memory was allocated by libc */
1123     free (evil_type_str);
1124     evil_type_str = NULL;
1125   }
1126 not_evil:
1127   *ret_type = EVILNESS_NONE;
1128 cleanup:
1129   GNUNET_free (evil_spec);
1130   if (NULL != evil_type_str)
1131     free (evil_type_str);
1132 }
1133
1134 #endif
1135
1136
1137 /**
1138  * Commit the appropriate set for a
1139  * task.
1140  */
1141 static void
1142 commit_set (struct ConsensusSession *session,
1143             struct TaskEntry *task)
1144 {
1145   struct SetEntry *set;
1146   struct SetOpCls *setop = &task->cls.setop;
1147
1148   GNUNET_assert (NULL != setop->op);
1149   set = lookup_set (session, &setop->input_set);
1150   GNUNET_assert (NULL != set);
1151
1152 #ifdef EVIL
1153   {
1154     unsigned int i;
1155     unsigned int evil_num;
1156     enum Evilness evilness;
1157
1158     get_evilness (session, &evilness, &evil_num);
1159     switch (evilness)
1160     {
1161       case EVILNESS_CRAM:
1162         /* We're not cramming elements in the
1163            all-to-all round, since that would just
1164            add more elements to the result set, but
1165            wouldn't test robustness. */
1166         if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1167         {
1168           GNUNET_SET_commit (setop->op, set->h);
1169           break;
1170         }
1171         for (i = 0; i < evil_num; i++)
1172         {
1173           struct GNUNET_HashCode hash;
1174           struct GNUNET_SET_Element element;
1175           element.data = &hash;
1176           element.size = sizeof (struct GNUNET_HashCode);
1177           element.element_type = 0;
1178
1179           GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
1180           GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1181 #ifdef GNUNET_EXTRA_LOGGING
1182           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1183                       "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1184                       session->local_peer_idx,
1185                       debug_str_element (&element),
1186                       debug_str_set_key (&setop->input_set),
1187                       debug_str_task_key (&task->key));
1188 #endif
1189         }
1190         GNUNET_SET_commit (setop->op, set->h);
1191         break;
1192       case EVILNESS_SLACK:
1193         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1194                     "P%u: evil peer: slacking\n",
1195                     session->local_peer_idx,
1196                     evil_num);
1197         /* Do nothing. */
1198         break;
1199       case EVILNESS_NONE:
1200         GNUNET_SET_commit (setop->op, set->h);
1201         break;
1202     }
1203   }
1204 #else
1205   if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1206   {
1207     struct GNUNET_SET_Element element;
1208     struct ContestedPayload payload;
1209     element.data = &payload;
1210     element.size = sizeof (struct ContestedPayload);
1211     element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1212     GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1213   }
1214   if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1215   {
1216     GNUNET_SET_commit (setop->op, set->h);
1217   }
1218   else
1219   {
1220     /* For our testcases, we don't want the blacklisted
1221        peers to wait. */
1222     GNUNET_SET_operation_cancel (setop->op);
1223     setop->op = NULL;
1224   }
1225 #endif
1226 }
1227
1228
1229 static void
1230 put_diff (struct ConsensusSession *session,
1231          struct DiffEntry *diff)
1232 {
1233   struct GNUNET_HashCode hash;
1234
1235   GNUNET_assert (NULL != diff);
1236
1237   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1238   GNUNET_assert (GNUNET_OK ==
1239                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1240                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1241 }
1242
1243 static void
1244 put_set (struct ConsensusSession *session,
1245          struct SetEntry *set)
1246 {
1247   struct GNUNET_HashCode hash;
1248
1249   GNUNET_assert (NULL != set->h);
1250
1251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252               "Putting set %s\n",
1253               debug_str_set_key (&set->key));
1254
1255   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1256   GNUNET_assert (GNUNET_OK ==
1257                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1258                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1259 }
1260
1261
1262 static void
1263 put_rfn (struct ConsensusSession *session,
1264          struct ReferendumEntry *rfn)
1265 {
1266   struct GNUNET_HashCode hash;
1267
1268   GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1269   GNUNET_assert (GNUNET_OK ==
1270                  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1271                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1272 }
1273
1274
1275
1276 static void
1277 task_cancel_reconcile (struct TaskEntry *task)
1278 {
1279   /* not implemented yet */
1280   GNUNET_assert (0);
1281 }
1282
1283
1284 static void
1285 apply_diff_to_rfn (struct DiffEntry *diff,
1286                    struct ReferendumEntry *rfn,
1287                    uint16_t voting_peer,
1288                    uint16_t num_peers)
1289 {
1290   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1291   struct DiffElementInfo *di;
1292
1293   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1294
1295   while (GNUNET_YES ==
1296          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1297                                                       NULL,
1298                                                       (const void **) &di))
1299   {
1300     if (di->weight > 0)
1301     {
1302       rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1303     }
1304     if (di->weight < 0)
1305     {
1306       rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1307     }
1308   }
1309
1310   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1311 }
1312
1313
1314 struct DiffEntry *
1315 diff_create ()
1316 {
1317   struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1318
1319   d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1320   
1321   return d;
1322 }
1323
1324
1325 struct DiffEntry *
1326 diff_compose (struct DiffEntry *diff_1,
1327               struct DiffEntry *diff_2)
1328 {
1329   struct DiffEntry *diff_new;
1330   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1331   struct DiffElementInfo *di;
1332  
1333   diff_new = diff_create ();
1334
1335   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1336   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1337   {
1338     diff_insert (diff_new, di->weight, di->element);
1339   }
1340   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1341
1342   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1343   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1344   {
1345     diff_insert (diff_new, di->weight, di->element);
1346   }
1347   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1348
1349   return diff_new;
1350 }
1351
1352
1353 struct ReferendumEntry *
1354 rfn_create (uint16_t size)
1355 {
1356   struct ReferendumEntry *rfn;
1357
1358   rfn = GNUNET_new (struct ReferendumEntry);
1359   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1360   rfn->peer_commited = GNUNET_new_array (size, int);
1361   rfn->peer_contested = GNUNET_new_array (size, int);
1362   rfn->num_peers = size;
1363
1364   return rfn;
1365 }
1366
1367
1368 void
1369 diff_destroy (struct DiffEntry *diff)
1370 {
1371   GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1372   GNUNET_free (diff);
1373 }
1374
1375
1376 static void
1377 rfn_majority (const struct ReferendumEntry *rfn,
1378               const struct RfnElementInfo *ri,
1379               uint16_t *ret_majority,
1380               enum ReferendumVote *ret_vote)
1381 {
1382   uint16_t votes_yes = 0;
1383   uint16_t num_commited = 0;
1384   uint16_t i;
1385
1386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387               "Computing rfn majority for element %s of rfn {%s}\n",
1388               debug_str_element (ri->element),
1389               debug_str_rfn_key (&rfn->key));
1390
1391   for (i = 0; i < rfn->num_peers; i++)
1392   {
1393     if (GNUNET_NO == rfn->peer_commited[i])
1394       continue;
1395     num_commited++;
1396
1397     if (GNUNET_YES == ri->votes[i])
1398       votes_yes++;
1399   }
1400
1401   if (votes_yes > (num_commited) / 2)
1402   {
1403     *ret_vote = ri->proposal;
1404     *ret_majority = votes_yes;
1405   }
1406   else
1407   {
1408     *ret_vote = VOTE_STAY;
1409     *ret_majority = num_commited - votes_yes;
1410   }
1411 }
1412
1413
1414 struct SetCopyCls
1415 {
1416   struct TaskEntry *task;
1417   struct SetKey dst_set_key;
1418 };
1419
1420
1421 static void
1422 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1423 {
1424   struct SetCopyCls *scc = cls;
1425   struct TaskEntry *task = scc->task;
1426   struct SetKey dst_set_key = scc->dst_set_key;
1427   struct SetEntry *set;
1428
1429   GNUNET_free (scc);
1430   set = GNUNET_new (struct SetEntry);
1431   set->h = copy;
1432   set->key = dst_set_key;
1433   put_set (task->step->session, set);
1434
1435   task->start (task);
1436 }
1437
1438
1439 /**
1440  * Call the start function of the given
1441  * task again after we created a copy of the given set.
1442  */
1443 static void
1444 create_set_copy_for_task (struct TaskEntry *task,
1445                           struct SetKey *src_set_key,
1446                           struct SetKey *dst_set_key)
1447 {
1448   struct SetEntry *src_set;
1449   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1450
1451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452               "Copying set {%s} to {%s} for task {%s}\n",
1453               debug_str_set_key (src_set_key),
1454               debug_str_set_key (dst_set_key),
1455               debug_str_task_key (&task->key));
1456
1457   scc->task = task;
1458   scc->dst_set_key = *dst_set_key;
1459   src_set = lookup_set (task->step->session, src_set_key);
1460   GNUNET_assert (NULL != src_set);
1461   GNUNET_SET_copy_lazy (src_set->h,
1462                         set_copy_cb,
1463                         scc);
1464 }
1465
1466
1467 struct SetMutationProgressCls
1468 {
1469   int num_pending;
1470   /**
1471    * Task to finish once all changes are through.
1472    */
1473   struct TaskEntry *task;
1474 };
1475
1476
1477 static void
1478 set_mutation_done (void *cls)
1479 {
1480   struct SetMutationProgressCls *pc = cls;
1481
1482   GNUNET_assert (pc->num_pending > 0);
1483
1484   pc->num_pending--;
1485
1486   if (0 == pc->num_pending)
1487   {
1488     struct TaskEntry *task = pc->task;
1489     GNUNET_free (pc);
1490     finish_task (task);
1491   }
1492 }
1493
1494 static void
1495 task_start_apply_round (struct TaskEntry *task)
1496 {
1497   struct ConsensusSession *session = task->step->session;
1498   struct SetKey sk_in;
1499   struct SetKey sk_out;
1500   struct RfnKey rk_in;
1501   struct SetEntry *set_out;
1502   struct ReferendumEntry *rfn_in;
1503   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1504   struct RfnElementInfo *ri;
1505   struct SetMutationProgressCls *progress_cls;
1506
1507   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1508   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1509   sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1510
1511   set_out = lookup_set (session, &sk_out);
1512   if (NULL == set_out)
1513   {
1514     create_set_copy_for_task (task, &sk_in, &sk_out);
1515     return;
1516   }
1517
1518   rfn_in = lookup_rfn (session, &rk_in);
1519   GNUNET_assert (NULL != rfn_in);
1520
1521   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1522   progress_cls->task = task;
1523
1524   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1525
1526   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1527   {
1528     uint16_t majority_num;
1529     enum ReferendumVote majority_vote;
1530
1531     rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1532
1533     switch (majority_vote)
1534     {
1535       case VOTE_ADD:
1536         progress_cls->num_pending++;
1537         GNUNET_assert (GNUNET_OK ==
1538                        GNUNET_SET_add_element (set_out->h,
1539                                                ri->element,
1540                                                set_mutation_done,
1541                                                progress_cls));
1542         break;
1543       case VOTE_REMOVE:
1544         progress_cls->num_pending++;
1545         GNUNET_assert (GNUNET_OK ==
1546                        GNUNET_SET_remove_element (set_out->h,
1547                                                   ri->element,
1548                                                   set_mutation_done,
1549                                                   progress_cls));
1550         break;
1551       case VOTE_STAY:
1552         // do nothing
1553         break;
1554       default:
1555         GNUNET_assert (0);
1556         break;
1557     }
1558   }
1559
1560   if (progress_cls->num_pending == 0)
1561   {
1562     // call closure right now, no pending ops
1563     GNUNET_free (progress_cls);
1564     finish_task (task);
1565   }
1566 }
1567
1568
1569 #define THRESH(s) (((s)->num_peers / 3))
1570
1571
1572 static void
1573 task_start_grade (struct TaskEntry *task)
1574 {
1575   struct ConsensusSession *session = task->step->session;
1576   struct ReferendumEntry *output_rfn;
1577   struct ReferendumEntry *input_rfn;
1578   struct DiffEntry *input_diff;
1579   struct RfnKey rfn_key;
1580   struct DiffKey diff_key;
1581   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1582   struct RfnElementInfo *ri;
1583   unsigned int gradecast_confidence = 2;
1584
1585   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1586   output_rfn = lookup_rfn (session, &rfn_key);
1587   if (NULL == output_rfn)
1588   {
1589     output_rfn = rfn_create (session->num_peers);
1590     output_rfn->key = rfn_key;
1591     put_rfn (session, output_rfn);
1592   }
1593
1594   diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1595   input_diff = lookup_diff (session, &diff_key);
1596   GNUNET_assert (NULL != input_diff);
1597
1598   rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1599   input_rfn = lookup_rfn (session, &rfn_key);
1600   GNUNET_assert (NULL != input_rfn);
1601
1602   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1603
1604   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1605
1606   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1607   {
1608     uint16_t majority_num;
1609     enum ReferendumVote majority_vote;
1610
1611     // XXX: we need contested votes and non-contested votes here
1612     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1613
1614     if (majority_num < (session->num_peers / 3) * 2)
1615     {
1616       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1617     }
1618     if (majority_num < (session->num_peers / 3) + 1)
1619     {
1620       gradecast_confidence = 0;
1621     }
1622
1623     switch (majority_vote)
1624     {
1625       case VOTE_STAY:
1626         break;
1627       case VOTE_ADD:
1628         rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1629         break;
1630       case VOTE_REMOVE:
1631         rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1632         break;
1633       default:
1634         GNUNET_assert (0);
1635         break;
1636     }
1637   }
1638
1639   {
1640     uint16_t noncontested;
1641     noncontested = rfn_noncontested (input_rfn);
1642     if (noncontested < (session->num_peers / 3) * 2)
1643     {
1644       gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1645     }
1646     if (noncontested < (session->num_peers / 3) + 1)
1647     {
1648       gradecast_confidence = 0;
1649     }
1650   }
1651
1652   if (gradecast_confidence >= 1)
1653     rfn_commit (output_rfn, task->key.leader);
1654
1655   if (gradecast_confidence <= 1)
1656     session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1657
1658   finish_task (task);
1659 }
1660
1661
1662 static void
1663 task_start_reconcile (struct TaskEntry *task)
1664 {
1665   struct SetEntry *input;
1666   struct SetOpCls *setop = &task->cls.setop;
1667   struct ConsensusSession *session = task->step->session;
1668
1669   input = lookup_set (session, &setop->input_set);
1670   GNUNET_assert (NULL != input);
1671   GNUNET_assert (NULL != input->h);
1672
1673   /* We create the outputs for the operation here
1674      (rather than in the set operation callback)
1675      because we want something valid in there, even
1676      if the other peer doesn't talk to us */
1677
1678   if (SET_KIND_NONE != setop->output_set.set_kind)
1679   {
1680     /* If we don't have an existing output set,
1681        we clone the input set. */
1682     if (NULL == lookup_set (session, &setop->output_set))
1683     {
1684       create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1685       return;
1686     }
1687   }
1688
1689   if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1690   {
1691     if (NULL == lookup_rfn (session, &setop->output_rfn))
1692     {
1693       struct ReferendumEntry *rfn;
1694
1695       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696                   "P%u: output rfn <%s> missing, creating.\n",
1697                   session->local_peer_idx,
1698                   debug_str_rfn_key (&setop->output_rfn));
1699
1700       rfn = rfn_create (session->num_peers);
1701       rfn->key = setop->output_rfn;
1702       put_rfn (session, rfn);
1703     }
1704   }
1705
1706   if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1707   {
1708     if (NULL == lookup_diff (session, &setop->output_diff))
1709     {
1710       struct DiffEntry *diff;
1711
1712       diff = diff_create ();
1713       diff->key = setop->output_diff;
1714       put_diff (session, diff);
1715     }
1716   }
1717
1718   if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
1719   {
1720     /* XXX: mark the corresponding rfn as commited if necessary */
1721     finish_task (task);
1722     return;
1723   }
1724
1725   if (task->key.peer1 == session->local_peer_idx)
1726   {
1727     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1728
1729     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1730                 "P%u: Looking up set {%s} to run remote union\n",
1731                 session->local_peer_idx,
1732                 debug_str_set_key (&setop->input_set));
1733
1734     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1735     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
1736
1737     rcm.kind = htons (task->key.kind);
1738     rcm.peer1 = htons (task->key.peer1);
1739     rcm.peer2 = htons (task->key.peer2);
1740     rcm.leader = htons (task->key.leader);
1741     rcm.repetition = htons (task->key.repetition);
1742
1743     GNUNET_assert (NULL == setop->op);
1744     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
1745                 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
1746
1747     // XXX: maybe this should be done while
1748     // setting up tasks alreays?
1749     setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
1750                                     &session->global_id,
1751                                     &rcm.header,
1752                                     GNUNET_SET_RESULT_SYMMETRIC,
1753                                     set_result_cb,
1754                                     task);
1755
1756     if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h))
1757     {
1758       GNUNET_break (0);
1759       /* XXX: cleanup? */
1760       return;
1761     }
1762   }
1763   else if (task->key.peer2 == session->local_peer_idx)
1764   {
1765     /* Wait for the other peer to contact us */
1766     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
1767                 session->local_peer_idx, task->key.peer1);
1768
1769     if (NULL != setop->op)
1770     {
1771       commit_set (session, task);
1772     }
1773   }
1774   else
1775   {
1776     /* We made an error while constructing the task graph. */
1777     GNUNET_assert (0);
1778   }
1779 }
1780
1781
1782 static void
1783 task_start_eval_echo (struct TaskEntry *task)
1784 {
1785   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1786   struct ReferendumEntry *input_rfn;
1787   struct RfnElementInfo *ri;
1788   struct SetEntry *output_set;
1789   struct SetMutationProgressCls *progress_cls;
1790   struct ConsensusSession *session = task->step->session;
1791   struct SetKey sk_in;
1792   struct SetKey sk_out;
1793   struct RfnKey rk_in;
1794
1795   sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1796   sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
1797   output_set = lookup_set (session, &sk_out);
1798   if (NULL == output_set)
1799   {
1800     create_set_copy_for_task (task, &sk_in, &sk_out);
1801     return;
1802   }
1803
1804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1805               "Evaluating referendum in Task {%s}\n",
1806               debug_str_task_key (&task->key));
1807
1808   progress_cls = GNUNET_new (struct SetMutationProgressCls);
1809   progress_cls->task = task;
1810
1811   rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1812   input_rfn = lookup_rfn (session, &rk_in);
1813
1814   GNUNET_assert (NULL != input_rfn);
1815
1816   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1817   GNUNET_assert (NULL != iter);
1818
1819   while (GNUNET_YES ==
1820          GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1821                                                       NULL,
1822                                                       (const void **) &ri))
1823   {
1824     enum ReferendumVote majority_vote;
1825     uint16_t majority_num;
1826
1827     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1828
1829     if (majority_num < session->num_peers / 3)
1830     {
1831       /* It is not the case that all nonfaulty peers
1832          echoed the same value.  Since we're doing a set reconciliation, we
1833          can't simply send "nothing" for the value.  Thus we mark our 'confirm'
1834          reconciliation as contested.  Other peers might not know that the
1835          leader is faulty, thus we still re-distribute in the confirmation
1836          round. */
1837       output_set->is_contested = GNUNET_YES;
1838     }
1839
1840     switch (majority_vote)
1841     {
1842       case VOTE_ADD:
1843         progress_cls->num_pending++;
1844         GNUNET_assert (GNUNET_OK ==
1845                        GNUNET_SET_add_element (output_set->h,
1846                                                ri->element,
1847                                                set_mutation_done,
1848                                                progress_cls));
1849         break;
1850       case VOTE_REMOVE:
1851         progress_cls->num_pending++;
1852         GNUNET_assert (GNUNET_OK ==
1853                        GNUNET_SET_remove_element (output_set->h,
1854                                                   ri->element,
1855                                                   set_mutation_done,
1856                                                   progress_cls));
1857         break;
1858       case VOTE_STAY:
1859         /* Nothing to do. */
1860         break;
1861       default:
1862         /* not reached */
1863         GNUNET_assert (0);
1864     }
1865   }
1866
1867   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1868
1869   if (progress_cls->num_pending == 0)
1870   {
1871     // call closure right now, no pending ops
1872     GNUNET_free (progress_cls);
1873     finish_task (task);
1874   }
1875 }
1876
1877
1878 static void
1879 task_start_finish (struct TaskEntry *task)
1880 {
1881   struct SetEntry *final_set;
1882   struct ConsensusSession *session = task->step->session;
1883
1884   final_set = lookup_set (session, &task->cls.finish.input_set);
1885
1886   GNUNET_assert (NULL != final_set);
1887
1888
1889   GNUNET_SET_iterate (final_set->h,
1890                       send_to_client_iter,
1891                       task);
1892 }
1893
1894 static void
1895 start_task (struct ConsensusSession *session, struct TaskEntry *task)
1896 {
1897   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
1898
1899   GNUNET_assert (GNUNET_NO == task->is_started);
1900   GNUNET_assert (GNUNET_NO == task->is_finished);
1901   GNUNET_assert (NULL != task->start);
1902
1903   task->start (task);
1904
1905   task->is_started = GNUNET_YES;
1906 }
1907
1908
1909 static void finish_step (struct Step *step)
1910 {
1911   unsigned int i;
1912
1913   GNUNET_assert (step->finished_tasks == step->tasks_len);
1914   GNUNET_assert (GNUNET_YES == step->is_running);
1915   GNUNET_assert (GNUNET_NO == step->is_finished);
1916
1917 #ifdef GNUNET_EXTRA_LOGGING
1918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919               "All tasks of step `%s' with %u subordinates finished.\n",
1920               step->debug_name,
1921               step->subordinates_len);
1922 #endif
1923
1924   for (i = 0; i < step->subordinates_len; i++)
1925   {
1926     GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1927     step->subordinates[i]->pending_prereq--;
1928 #ifdef GNUNET_EXTRA_LOGGING
1929     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1930                 "Decreased pending_prereq to %u for step `%s'.\n",
1931                 step->subordinates[i]->pending_prereq,
1932                 step->subordinates[i]->debug_name);
1933
1934 #endif
1935   }
1936
1937   step->is_finished = GNUNET_YES;
1938
1939   // XXX: maybe schedule as task to avoid recursion?
1940   run_ready_steps (step->session);
1941 }
1942
1943
1944 /*
1945  * Run all steps of the session that don't any
1946  * more dependencies.
1947  */
1948 static void
1949 run_ready_steps (struct ConsensusSession *session)
1950 {
1951   struct Step *step;
1952
1953   step = session->steps_head;
1954
1955   while (NULL != step)
1956   {
1957     if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) )
1958     {
1959       size_t i;
1960
1961       GNUNET_assert (0 == step->finished_tasks);
1962
1963 #ifdef GNUNET_EXTRA_LOGGING
1964       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
1965                   session->local_peer_idx,
1966                   step->debug_name,
1967                   step->round, step->tasks_len, step->subordinates_len);
1968 #endif
1969
1970       step->is_running = GNUNET_YES;
1971       for (i = 0; i < step->tasks_len; i++)
1972         start_task (session, step->tasks[i]);
1973
1974       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
1975       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
1976         finish_step (step);
1977
1978       /* Running the next ready steps will be triggered by task completion */
1979       return;
1980     }
1981     step = step->next;
1982   }
1983
1984   return;
1985 }
1986
1987
1988
1989 static void
1990 finish_task (struct TaskEntry *task)
1991 {
1992   GNUNET_assert (GNUNET_NO == task->is_finished);
1993   task->is_finished = GNUNET_YES;
1994
1995   task->step->finished_tasks++;
1996
1997   if (task->step->finished_tasks == task->step->tasks_len)
1998     finish_step (task->step);
1999 }
2000
2001
2002 /**
2003  * Search peer in the list of peers in session.
2004  *
2005  * @param peer peer to find
2006  * @param session session with peer
2007  * @return index of peer, -1 if peer is not in session
2008  */
2009 static int
2010 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2011 {
2012   int i;
2013   for (i = 0; i < session->num_peers; i++)
2014     if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2015       return i;
2016   return -1;
2017 }
2018
2019
2020 /**
2021  * Compute a global, (hopefully) unique consensus session id,
2022  * from the local id of the consensus session, and the identities of all participants.
2023  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2024  * exactly the same peers, the global id will be different.
2025  *
2026  * @param session session to generate the global id for
2027  * @param local_session_id local id of the consensus session
2028  */
2029 static void
2030 compute_global_id (struct ConsensusSession *session,
2031                    const struct GNUNET_HashCode *local_session_id)
2032 {
2033   const char *salt = "gnunet-service-consensus/session_id";
2034
2035   GNUNET_assert (GNUNET_YES ==
2036                  GNUNET_CRYPTO_kdf (&session->global_id,
2037                                     sizeof (struct GNUNET_HashCode),
2038                                     salt,
2039                                     strlen (salt),
2040                                     session->peers,
2041                                     session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2042                                     local_session_id,
2043                                     sizeof (struct GNUNET_HashCode),
2044                                     NULL));
2045 }
2046
2047
2048 /**
2049  * Compare two peer identities.
2050  *
2051  * @param h1 some peer identity
2052  * @param h2 some peer identity
2053  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2054  */
2055 static int
2056 peer_id_cmp (const void *h1, const void *h2)
2057 {
2058   return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2059 }
2060
2061
2062 /**
2063  * Create the sorted list of peers for the session,
2064  * add the local peer if not in the join message.
2065  */
2066 static void
2067 initialize_session_peer_list (struct ConsensusSession *session,
2068                               struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2069 {
2070   unsigned int local_peer_in_list;
2071   uint32_t listed_peers;
2072   const struct GNUNET_PeerIdentity *msg_peers;
2073   unsigned int i;
2074
2075   GNUNET_assert (NULL != join_msg);
2076
2077   /* peers in the join message, may or may not include the local peer */
2078   listed_peers = ntohl (join_msg->num_peers);
2079
2080   session->num_peers = listed_peers;
2081
2082   msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2083
2084   local_peer_in_list = GNUNET_NO;
2085   for (i = 0; i < listed_peers; i++)
2086   {
2087     if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2088     {
2089       local_peer_in_list = GNUNET_YES;
2090       break;
2091     }
2092   }
2093
2094   if (GNUNET_NO == local_peer_in_list)
2095     session->num_peers++;
2096
2097   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2098
2099   if (GNUNET_NO == local_peer_in_list)
2100     session->peers[session->num_peers - 1] = my_peer;
2101
2102   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2103   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2104 }
2105
2106
2107 static struct TaskEntry *
2108 lookup_task (struct ConsensusSession *session,
2109              struct TaskKey *key)
2110 {
2111   struct GNUNET_HashCode hash;
2112
2113
2114   GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2115   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2116               GNUNET_h2s (&hash));
2117   return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2118 }
2119
2120
2121 /**
2122  * Called when another peer wants to do a set operation with the
2123  * local peer.
2124  *
2125  * @param cls closure
2126  * @param other_peer the other peer
2127  * @param context_msg message with application specific information from
2128  *        the other peer
2129  * @param request request from the other peer, use GNUNET_SET_accept
2130  *        to accept it, otherwise the request will be refused
2131  *        Note that we don't use a return value here, as it is also
2132  *        necessary to specify the set we want to do the operation with,
2133  *        whith sometimes can be derived from the context message.
2134  *        Also necessary to specify the timeout.
2135  */
2136 static void
2137 set_listen_cb (void *cls,
2138                const struct GNUNET_PeerIdentity *other_peer,
2139                const struct GNUNET_MessageHeader *context_msg,
2140                struct GNUNET_SET_Request *request)
2141 {
2142   struct ConsensusSession *session = cls;
2143   struct TaskKey tk;
2144   struct TaskEntry *task;
2145   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2146
2147   if (NULL == context_msg)
2148   {
2149     GNUNET_break_op (0);
2150     return;
2151   }
2152
2153   if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2154   {
2155     GNUNET_break_op (0);
2156     return;
2157   }
2158
2159   if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2160   {
2161     GNUNET_break_op (0);
2162     return;
2163   }
2164
2165   cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2166
2167   tk = ((struct TaskKey) {
2168       .kind = ntohs (cm->kind),
2169       .peer1 = ntohs (cm->peer1),
2170       .peer2 = ntohs (cm->peer2),
2171       .repetition = ntohs (cm->repetition),
2172       .leader = ntohs (cm->leader),
2173   });
2174
2175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2176               session->local_peer_idx, debug_str_task_key (&tk));
2177
2178   task = lookup_task (session, &tk);
2179
2180   if (NULL == task)
2181   {
2182     GNUNET_break_op (0);
2183     return;
2184   }
2185
2186   if (GNUNET_YES == task->is_finished)
2187   {
2188     GNUNET_break_op (0);
2189     return;
2190   }
2191
2192   if (task->key.peer2 != session->local_peer_idx)
2193   {
2194     /* We're being asked, so we must be thne 2nd peer. */
2195     GNUNET_break_op (0);
2196     return;
2197   }
2198
2199   GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2200                     (task->key.peer2 == session->local_peer_idx)));
2201
2202   task->cls.setop.op = GNUNET_SET_accept (request,
2203                                           GNUNET_SET_RESULT_SYMMETRIC,
2204                                           set_result_cb,
2205                                           task);
2206   
2207   /* If the task hasn't been started yet, 
2208      we wait for that until we commit. */
2209
2210   if (GNUNET_YES == task->is_started)
2211   {
2212     commit_set (session, task);
2213   }
2214 }
2215
2216
2217
2218 static void
2219 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2220           struct TaskEntry *t)
2221 {
2222   struct GNUNET_HashCode round_hash;
2223   struct Step *s;
2224
2225   GNUNET_assert (NULL != t->step);
2226
2227   t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2228
2229   s = t->step;
2230
2231   if (s->tasks_len == s->tasks_cap)
2232   {
2233     unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2234     GNUNET_array_grow (s->tasks,
2235                        s->tasks_cap,
2236                        target_size);
2237   }
2238
2239 #ifdef GNUNET_EXTRA_LOGGING
2240   GNUNET_assert (NULL != s->debug_name);
2241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2242               debug_str_task_key (&t->key),
2243               s->debug_name);
2244 #endif
2245
2246   s->tasks[s->tasks_len] = t;
2247   s->tasks_len++;
2248
2249   GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2250   GNUNET_assert (GNUNET_OK ==
2251       GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2252                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2253 }
2254
2255
2256 static void
2257 install_step_timeouts (struct ConsensusSession *session)
2258 {
2259   /* Given the fully constructed task graph
2260      with rounds for tasks, we can give the tasks timeouts. */
2261
2262   /* XXX: implement! */
2263 }
2264
2265
2266
2267 /*
2268  * Arrange two peers in some canonical order.
2269  */
2270 static void
2271 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2272 {
2273   uint16_t a;
2274   uint16_t b;
2275
2276   GNUNET_assert (*p1 < n);
2277   GNUNET_assert (*p2 < n);
2278
2279   if (*p1 < *p2)
2280   {
2281     a = *p1;
2282     b = *p2;
2283   }
2284   else
2285   {
2286     a = *p2;
2287     b = *p1;
2288   }
2289
2290   /* For uniformly random *p1, *p2,
2291      this condition is true with 50% chance */
2292   if (((b - a) + n) % n <= n / 2)
2293   {
2294     *p1 = a;
2295     *p2 = b;
2296   }
2297   else
2298   {
2299     *p1 = b;
2300     *p2 = a;
2301   }
2302 }
2303
2304
2305 /**
2306  * Record @a dep as a dependency of @step.
2307  */
2308 static void
2309 step_depend_on (struct Step *step, struct Step *dep)
2310 {
2311   /* We're not checking for cyclic dependencies,
2312      but this is a cheap sanity check. */
2313   GNUNET_assert (step != dep);
2314   GNUNET_assert (NULL != step);
2315   GNUNET_assert (NULL != dep);
2316   GNUNET_assert (dep->round <= step->round);
2317
2318 #ifdef GNUNET_EXTRA_LOGGING
2319   /* Make sure we have complete debugging information.
2320      Also checks that we don't screw up too badly
2321      constructing the task graph. */
2322   GNUNET_assert (NULL != step->debug_name);
2323   GNUNET_assert (NULL != dep->debug_name);
2324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325               "Making step `%s' depend on `%s'\n",
2326               step->debug_name,
2327               dep->debug_name);
2328 #endif
2329
2330   if (dep->subordinates_cap == dep->subordinates_len)
2331   {
2332     unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2333     GNUNET_array_grow (dep->subordinates,
2334                        dep->subordinates_cap,
2335                        target_size);
2336   }
2337
2338   GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2339
2340   dep->subordinates[dep->subordinates_len] = step;
2341   dep->subordinates_len++;
2342
2343   step->pending_prereq++;
2344 }
2345
2346
2347 static struct Step *
2348 create_step (struct ConsensusSession *session, int round)
2349 {
2350   struct Step *step;
2351   step = GNUNET_new (struct Step);
2352   step->session = session;
2353   step->round = round;
2354   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2355                                     session->steps_tail,
2356                                     step);
2357   return step;
2358 }
2359
2360
2361 /**
2362  * Construct the task graph for a single
2363  * gradecast.
2364  */
2365 static void
2366 construct_task_graph_gradecast (struct ConsensusSession *session,
2367                                 uint16_t rep,
2368                                 uint16_t lead,
2369                                 struct Step *step_before,
2370                                 struct Step *step_after)
2371 {
2372   uint16_t n = session->num_peers;
2373   uint16_t me = session->local_peer_idx;
2374
2375   uint16_t p1;
2376   uint16_t p2;
2377
2378   /* The task we're currently setting up. */
2379   struct TaskEntry task;
2380
2381   struct Step *step;
2382   struct Step *prev_step;
2383
2384   uint16_t round;
2385
2386   unsigned int k;
2387
2388   round = step_before->round + 1;
2389
2390   /* gcast step 1: leader disseminates */
2391
2392   step = create_step (session, round);
2393
2394 #ifdef GNUNET_EXTRA_LOGGING
2395   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2396 #endif
2397   step_depend_on (step, step_before);
2398
2399   if (lead == me)
2400   {
2401     for (k = 0; k < n; k++)
2402     {
2403       if (k == me)
2404         continue;
2405       p1 = me;
2406       p2 = k;
2407       arrange_peers (&p1, &p2, n);
2408       task = ((struct TaskEntry) {
2409         .step = step,
2410         .start = task_start_reconcile,
2411         .cancel = task_cancel_reconcile,
2412         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2413       });
2414       task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2415       put_task (session->taskmap, &task);
2416     }
2417     /* We run this task to make sure that the leader
2418        has the stored the SET_KIND_LEADER set of himself,
2419        so he can participate in the rest of the gradecast
2420        without the code having to handle any special cases. */
2421     task = ((struct TaskEntry) {
2422       .step = step,
2423       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2424       .start = task_start_reconcile,
2425       .cancel = task_cancel_reconcile,
2426     });
2427     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2428     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2429     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2430     put_task (session->taskmap, &task);
2431   }
2432   else
2433   {
2434     p1 = me;
2435     p2 = lead;
2436     arrange_peers (&p1, &p2, n);
2437     task = ((struct TaskEntry) {
2438       .step = step,
2439       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2440       .start = task_start_reconcile,
2441       .cancel = task_cancel_reconcile,
2442     });
2443     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2444     task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2445     task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2446     put_task (session->taskmap, &task);
2447   }
2448
2449   /* gcast phase 2: echo */
2450   prev_step = step;
2451   round += 1;
2452   step = create_step (session, round);
2453 #ifdef GNUNET_EXTRA_LOGGING
2454   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2455 #endif
2456   step_depend_on (step, prev_step);
2457
2458   for (k = 0; k < n; k++)
2459   {
2460     p1 = k;
2461     p2 = me;
2462     arrange_peers (&p1, &p2, n);
2463     task = ((struct TaskEntry) {
2464       .step = step,
2465       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2466       .start = task_start_reconcile,
2467       .cancel = task_cancel_reconcile,
2468     });
2469     task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2470     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2471     put_task (session->taskmap, &task);
2472   }
2473
2474   prev_step = step;
2475   /* Same round, since step only has local tasks */
2476   step = create_step (session, round);
2477 #ifdef GNUNET_EXTRA_LOGGING
2478   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2479 #endif
2480   step_depend_on (step, prev_step);
2481
2482   arrange_peers (&p1, &p2, n);
2483   task = ((struct TaskEntry) {
2484     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2485     .step = step,
2486     .start = task_start_eval_echo
2487   });
2488   put_task (session->taskmap, &task);
2489
2490   prev_step = step;
2491   round += 1;
2492   step = create_step (session, round);
2493 #ifdef GNUNET_EXTRA_LOGGING
2494   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2495 #endif
2496   step_depend_on (step, prev_step);
2497
2498   /* gcast phase 3: confirmation and grading */
2499   for (k = 0; k < n; k++)
2500   {
2501     p1 = k;
2502     p2 = me;
2503     arrange_peers (&p1, &p2, n);
2504     task = ((struct TaskEntry) {
2505       .step = step,
2506       .start = task_start_reconcile,
2507       .cancel = task_cancel_reconcile,
2508       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2509     });
2510     task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2511     task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2512     /* If there was at least one element in the echo round that was
2513        contested (i.e. it had no n-t majority), then we let the other peers
2514        know, and other peers let us know.  The contested flag for each peer is
2515        stored in the rfn. */
2516     task.cls.setop.transceive_contested = GNUNET_YES;
2517     put_task (session->taskmap, &task);
2518   }
2519
2520   prev_step = step;
2521   /* Same round, since step only has local tasks */
2522   step = create_step (session, round);
2523 #ifdef GNUNET_EXTRA_LOGGING
2524   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2525 #endif
2526   step_depend_on (step, prev_step);
2527
2528   task = ((struct TaskEntry) {
2529     .step = step,
2530     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2531     .start = task_start_grade,
2532   });
2533   put_task (session->taskmap, &task);
2534
2535   step_depend_on (step_after, step);
2536 }
2537
2538
2539 static void
2540 construct_task_graph (struct ConsensusSession *session)
2541 {
2542   uint16_t n = session->num_peers;
2543   uint16_t t = n / 3;
2544
2545   uint16_t me = session->local_peer_idx;
2546
2547   uint16_t p1;
2548   uint16_t p2;
2549
2550   /* The task we're currently setting up. */
2551   struct TaskEntry task;
2552
2553   /* Current leader */
2554   unsigned int lead;
2555
2556   struct Step *step;
2557   struct Step *prev_step;
2558
2559   unsigned int round = 0;
2560
2561   unsigned int i;
2562
2563   // XXX: introduce first step,
2564   // where we wait for all insert acks
2565   // from the set service
2566   
2567   /* faster but brittle all-to-all */
2568
2569   // XXX: Not implemented yet
2570
2571   /* all-to-all step */
2572
2573   step = create_step (session, round);
2574
2575 #ifdef GNUNET_EXTRA_LOGGING
2576   step->debug_name = GNUNET_strdup ("all to all");
2577 #endif
2578
2579   for (i = 0; i < n; i++)
2580   {
2581     p1 = me;
2582     p2 = i;
2583     arrange_peers (&p1, &p2, n);
2584     task = ((struct TaskEntry) {
2585       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2586       .step = step,
2587       .start = task_start_reconcile,
2588       .cancel = task_cancel_reconcile,
2589     });
2590     task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2591     task.cls.setop.output_set = task.cls.setop.input_set;
2592     task.cls.setop.do_not_remove = GNUNET_YES;
2593     put_task (session->taskmap, &task);
2594   }
2595
2596   prev_step = step;
2597   step = NULL;
2598
2599   round += 1;
2600
2601   /* Byzantine union */
2602
2603   /* sequential repetitions of the gradecasts */
2604   for (i = 0; i < t + 1; i++)
2605   {
2606     struct Step *step_rep_start;
2607     struct Step *step_rep_end;
2608
2609     /* Every repetition is in a separate round. */
2610     step_rep_start = create_step (session, round);
2611 #ifdef GNUNET_EXTRA_LOGGING
2612     GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2613 #endif
2614
2615     step_depend_on (step_rep_start, prev_step);
2616
2617     /* gradecast has three rounds */
2618     round += 3;
2619     step_rep_end = create_step (session, round);
2620 #ifdef GNUNET_EXTRA_LOGGING
2621     GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2622 #endif
2623
2624     /* parallel gradecasts */
2625     for (lead = 0; lead < n; lead++)
2626       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2627
2628     task = ((struct TaskEntry) {
2629       .step = step_rep_end,
2630       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2631       .start = task_start_apply_round,
2632     });
2633     put_task (session->taskmap, &task);
2634
2635     prev_step = step_rep_end;
2636   }
2637
2638  /* There is no next gradecast round, thus the final
2639     start step is the overall end step of the gradecasts */
2640   round += 1;
2641   step = create_step (session, round);
2642 #ifdef GNUNET_EXTRA_LOGGING
2643   GNUNET_asprintf (&step->debug_name, "finish");
2644 #endif
2645   step_depend_on (step, prev_step);
2646
2647   task = ((struct TaskEntry) {
2648     .step = step,
2649     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2650     .start = task_start_finish,
2651   });
2652   task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
2653
2654   put_task (session->taskmap, &task);
2655 }
2656
2657
2658 /**
2659  * Initialize the session, continue receiving messages from the owning client
2660  *
2661  * @param session the session to initialize
2662  * @param join_msg the join message from the client
2663  */
2664 static void
2665 initialize_session (struct ConsensusSession *session,
2666                     struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2667 {
2668   struct ConsensusSession *other_session;
2669
2670   initialize_session_peer_list (session, join_msg);
2671   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2672   compute_global_id (session, &join_msg->session_id);
2673
2674   /* Check if some local client already owns the session.
2675      It is only legal to have a session with an existing global id
2676      if all other sessions with this global id are finished.*/
2677   other_session = sessions_head;
2678   while (NULL != other_session)
2679   {
2680     if ((other_session != session) &&
2681         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2682     {
2683       //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2684       //{
2685       //  GNUNET_break (0);
2686       //  destroy_session (session);
2687       //  return;
2688       //}
2689       break;
2690     }
2691     other_session = other_session->next;
2692   }
2693
2694   session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2695   session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2696
2697   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
2698               (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2699
2700   session->local_peer_idx = get_peer_idx (&my_peer, session);
2701   GNUNET_assert (-1 != session->local_peer_idx);
2702   session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2703                                              &session->global_id,
2704                                              set_listen_cb, session);
2705   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2706
2707   session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2708   session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2709   session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2710   session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2711
2712   {
2713     struct SetEntry *client_set;
2714     client_set = GNUNET_new (struct SetEntry);
2715     client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2716     client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2717     put_set (session, client_set);
2718   }
2719
2720   session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2721
2722   /* Just construct the task graph,
2723      but don't run anything until the client calls conclude. */
2724   construct_task_graph (session);
2725
2726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2727 }
2728
2729
2730 static struct ConsensusSession *
2731 get_session_by_client (struct GNUNET_SERVER_Client *client)
2732 {
2733   struct ConsensusSession *session;
2734
2735   session = sessions_head;
2736   while (NULL != session)
2737   {
2738     if (session->client == client)
2739       return session;
2740     session = session->next;
2741   }
2742   return NULL;
2743 }
2744
2745
2746 /**
2747  * Called when a client wants to join a consensus session.
2748  *
2749  * @param cls unused
2750  * @param client client that sent the message
2751  * @param m message sent by the client
2752  */
2753 static void
2754 client_join (void *cls,
2755              struct GNUNET_SERVER_Client *client,
2756              const struct GNUNET_MessageHeader *m)
2757 {
2758   struct ConsensusSession *session;
2759
2760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
2761
2762   session = get_session_by_client (client);
2763   if (NULL != session)
2764   {
2765     GNUNET_break (0);
2766     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2767     return;
2768   }
2769   session = GNUNET_new (struct ConsensusSession);
2770   session->client = client;
2771   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
2772   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2773   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
2774   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2775
2776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
2777 }
2778
2779
2780 static void
2781 client_insert_done (void *cls)
2782 {
2783   // FIXME: implement
2784 }
2785
2786
2787 /**
2788  * Called when a client performs an insert operation.
2789  *
2790  * @param cls (unused)
2791  * @param client client handle
2792  * @param m message sent by the client
2793  */
2794 void
2795 client_insert (void *cls,
2796                struct GNUNET_SERVER_Client *client,
2797                const struct GNUNET_MessageHeader *m)
2798 {
2799   struct ConsensusSession *session;
2800   struct GNUNET_CONSENSUS_ElementMessage *msg;
2801   struct GNUNET_SET_Element *element;
2802   ssize_t element_size;
2803   struct GNUNET_SET_Handle *initial_set;
2804
2805   session = get_session_by_client (client);
2806
2807   if (NULL == session)
2808   {
2809     GNUNET_break (0);
2810     GNUNET_SERVER_client_disconnect (client);
2811     return;
2812   }
2813
2814   if (GNUNET_YES == session->conclude_started)
2815   {
2816     GNUNET_break (0);
2817     GNUNET_SERVER_client_disconnect (client);
2818     return;
2819   }
2820
2821   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2822   element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2823   if (element_size < 0)
2824   {
2825     GNUNET_break (0);
2826     return;
2827   }
2828
2829   element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
2830   element->element_type = msg->element_type;
2831   element->size = element_size;
2832   memcpy (&element[1], &msg[1], element_size);
2833   element->data = &element[1];
2834   {
2835     struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
2836     struct SetEntry *entry;
2837     entry = lookup_set (session, &key);
2838     GNUNET_assert (NULL != entry);
2839     initial_set = entry->h;
2840   }
2841   session->num_client_insert_pending++;
2842   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
2843
2844 #ifdef GNUNET_EXTRA_LOGGING
2845   {
2846     struct GNUNET_HashCode hash;
2847
2848     GNUNET_SET_element_hash (element, &hash);
2849
2850     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
2851                 session->local_peer_idx,
2852                 GNUNET_h2s (&hash));
2853   }
2854 #endif
2855
2856   GNUNET_free (element);
2857   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2858 }
2859
2860
2861 /**
2862  * Called when a client performs the conclude operation.
2863  *
2864  * @param cls (unused)
2865  * @param client client handle
2866  * @param message message sent by the client
2867  */
2868 static void
2869 client_conclude (void *cls,
2870                  struct GNUNET_SERVER_Client *client,
2871                  const struct GNUNET_MessageHeader *message)
2872 {
2873   struct ConsensusSession *session;
2874
2875   session = get_session_by_client (client);
2876   if (NULL == session)
2877   {
2878     /* client not found */
2879     GNUNET_break (0);
2880     GNUNET_SERVER_client_disconnect (client);
2881     return;
2882   }
2883
2884   if (GNUNET_YES == session->conclude_started)
2885   {
2886     /* conclude started twice */
2887     GNUNET_break (0);
2888     GNUNET_SERVER_client_disconnect (client);
2889     destroy_session (session);
2890     return;
2891   }
2892
2893   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
2894
2895   session->conclude_started = GNUNET_YES;
2896
2897   install_step_timeouts (session);
2898   run_ready_steps (session);
2899
2900
2901   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2902 }
2903
2904
2905 /**
2906  * Called to clean up, after a shutdown has been requested.
2907  *
2908  * @param cls closure
2909  * @param tc context information (why was this task triggered now)
2910  */
2911 static void
2912 shutdown_task (void *cls,
2913                const struct GNUNET_SCHEDULER_TaskContext *tc)
2914 {
2915   while (NULL != sessions_head)
2916     destroy_session (sessions_head);
2917
2918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
2919 }
2920
2921
2922 /**
2923  * Clean up after a client after it is
2924  * disconnected (either by us or by itself)
2925  *
2926  * @param cls closure, unused
2927  * @param client the client to clean up after
2928  */
2929 void
2930 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
2931 {
2932   struct ConsensusSession *session;
2933
2934   session = get_session_by_client (client);
2935   if (NULL == session)
2936     return;
2937   // FIXME: destroy if we can
2938 }
2939
2940
2941
2942 /**
2943  * Start processing consensus requests.
2944  *
2945  * @param cls closure
2946  * @param server the initialized server
2947  * @param c configuration to use
2948  */
2949 static void
2950 run (void *cls, struct GNUNET_SERVER_Handle *server,
2951      const struct GNUNET_CONFIGURATION_Handle *c)
2952 {
2953   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2954     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2955         sizeof (struct GNUNET_MessageHeader)},
2956     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2957     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2958     {NULL, NULL, 0, 0}
2959   };
2960
2961   cfg = c;
2962   srv = server;
2963   if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
2964   {
2965     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
2966     GNUNET_break (0);
2967     GNUNET_SCHEDULER_shutdown ();
2968     return;
2969   }
2970   GNUNET_SERVER_add_handlers (server, server_handlers);
2971   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2972   GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
2973   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2974 }
2975
2976
2977 /**
2978  * The main function for the consensus service.
2979  *
2980  * @param argc number of arguments from the command line
2981  * @param argv command line arguments
2982  * @return 0 ok, 1 on error
2983  */
2984 int
2985 main (int argc, char *const *argv)
2986 {
2987   int ret;
2988   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
2989   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
2990   return (GNUNET_OK == ret) ? 0 : 1;
2991 }
2992