implemented the modified consensus api, started implementing p2p protocol for consensus
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21
22 /**
23  * @file consensus/gnunet-service-consensus.c
24  * @brief 
25  * @author Florian Dold
26  */
27
28 #include "platform.h"
29 #include "gnunet_common.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_consensus_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_mesh_service.h"
36 #include "consensus.h"
37
38
39 struct ConsensusSession;
40
41 static void
42 send_next (struct ConsensusSession *session);
43
44
45 /**
46  * An element that is waiting to be transmitted to a client.
47  */
48 struct PendingElement
49 {
50   /**
51    * Pending elements are kept in a DLL.
52    */
53   struct PendingElement *next;
54
55   /**
56    * Pending elements are kept in a DLL.
57    */
58   struct PendingElement *prev;
59
60   /**
61    * The actual element
62    */
63   struct GNUNET_CONSENSUS_Element *element;
64 };
65
66
67 /*
68  * A peer that is also in a consensus session.
69  * Note that 'this' peer is not in the list.
70  */
71 struct ConsensusPeer
72 {
73   struct GNUNET_PeerIdentity *peer_id;
74
75   /**
76    * Incoming tunnel from the peer.
77    */
78   struct GNUNET_MESH_Tunnel *incoming_tunnel;
79
80   struct InvertibleBloomFilter *last_ibf;
81
82 };
83
84
85 /**
86  * A consensus session consists of one local client and the remote authorities.
87  */
88 struct ConsensusSession
89 {
90   /**
91    * Consensus sessions are kept in a DLL.
92    */
93   struct ConsensusSession *next;
94
95   /**
96    * Consensus sessions are kept in a DLL.
97    */
98   struct ConsensusSession *prev;
99
100   /**
101    * Local consensus identification, chosen by clients.
102    */
103   struct GNUNET_HashCode *local_id;
104  
105   /**
106   * Global consensus identification, computed
107   * from the local id and participating authorities.
108   */
109   struct GNUNET_HashCode *global_id;
110
111   /**
112    * Local client in this consensus session.
113    * There is only one client per consensus session.
114    */
115   struct GNUNET_SERVER_Client *client;
116
117   /**
118    * Values in the consensus set of this session,
119    * all of them either have been sent by or approved by the client.
120    */
121   struct GNUNET_CONTAINER_MultiHashMap *values;
122
123   /**
124    * Elements that have not been sent to the client yet.
125    */
126   struct PendingElement *transmit_pending_head;
127
128   /**
129    * Elements that have not been sent to the client yet.
130    */
131   struct PendingElement *transmit_pending_tail;
132
133   /**
134    * Elements that have not been approved (or rejected) by the client yet.
135    */
136   struct PendingElement *approval_pending_head;
137
138   /**
139    * Elements that have not been approved (or rejected) by the client yet.
140    */
141   struct PendingElement *approval_pending_tail;
142
143   /**
144    * Currently active transmit handle for sending to the client
145    */
146   struct GNUNET_SERVER_TransmitHandle *th;
147
148   /**
149    * Once conclude_requested is GNUNET_YES, the client may not
150    * insert any more values.
151    */
152   int conclude_requested;
153
154   /**
155    * Client has been informed about the conclusion.
156    */
157   int conclude_sent;
158
159   /**
160    * Minimum number of peers to form a consensus group
161    */
162   int conclude_group_min;
163
164   /**
165    * Current round of the conclusion
166    */
167   int current_round;
168
169   /**
170    * Soft deadline for conclude.
171    * Speed up the speed of the consensus at the cost of consensus quality, as
172    * the time approached or crosses the deadline.
173    */
174   struct GNUNET_TIME_Absolute conclude_deadline;
175
176   /**
177    * Number of other peers in the consensus
178    */
179   unsigned int num_peers;
180
181   /**
182    * Other peers in the consensus, array of ConsensusPeer
183    */
184   struct ConsensusPeer *peers;
185
186   /**
187    * Tunnel for broadcasting to all other authorities
188    */
189   struct GNUNET_MESH_Tunnel *broadcast_tunnel;
190
191   /**
192    * Time limit for one round of pairwise exchange.
193    * FIXME: should not actually be a constant
194    */
195   struct GNUNET_TIME_Relative round_time;
196
197   /**
198    * Task identifier for the round timeout task
199    */
200   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
201 };
202
203
204 /**
205  * Linked list of sesstions this peer participates in.
206  */
207 static struct ConsensusSession *sessions_head;
208
209 /**
210  * Linked list of sesstions this peer participates in.
211  */
212 static struct ConsensusSession *sessions_tail;
213
214 /**
215  * Configuration of the consensus service.
216  */
217 static const struct GNUNET_CONFIGURATION_Handle *cfg;
218
219 /**
220  * Handle to the server for this service.
221  */
222 static struct GNUNET_SERVER_Handle *srv;
223
224 /**
225  * Peer that runs this service
226  */
227 static struct GNUNET_PeerIdentity *my_peer;
228
229 /**
230  * Handle to the mesh service.
231  */
232 static struct GNUNET_MESH_Handle *mesh;
233
234 /**
235  * Handle to the core service. Only used during service startup, will be NULL after that.
236  */
237 static struct GNUNET_CORE_Handle *core;
238
239 static void
240 disconnect_client (struct GNUNET_SERVER_Client *client)
241 {
242   GNUNET_SERVER_client_disconnect (client);
243   /* FIXME: free data structures that this client owns */
244 }
245
246 static void
247 compute_global_id (struct GNUNET_HashCode *dst,
248                    const struct GNUNET_HashCode *local_id,
249                    const struct GNUNET_PeerIdentity *peers,
250                    int num_peers)
251 {
252   int i;
253   struct GNUNET_HashCode tmp;
254
255   *dst = *local_id;
256   for (i = 0; i < num_peers; ++i)
257   {
258     GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
259     *dst = tmp;
260     GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
261     *dst = tmp;
262   }
263 }
264
265
266 static size_t
267 transmit_pending (void *cls, size_t size, void *buf)
268 {
269   struct GNUNET_CONSENSUS_Element *element;
270   struct GNUNET_CONSENSUS_ElementMessage *msg;
271   struct ConsensusSession *session;
272
273   session = (struct ConsensusSession *) cls;
274   msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
275   element = session->transmit_pending_head->element;
276
277   GNUNET_assert (NULL != element);
278
279   session->th = NULL;
280
281   msg->element_type = element->type;
282   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
283   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
284   memcpy (&msg[1], element->data, element->size);
285
286   session->transmit_pending_head = session->transmit_pending_head->next;
287
288   send_next (session);
289
290   return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
291 }
292
293
294 static size_t
295 transmit_conclude_done (void *cls, size_t size, void *buf)
296 {
297   struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
298
299   msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
300   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
301   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
302   msg->num_peers = htons (0);
303
304   return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
305 }
306
307
308 /**
309  * Schedule sending the next message (if there is any) to a client.
310  *
311  * @param cli the client to send the next message to
312  */
313 static void
314 send_next (struct ConsensusSession *session)
315 {
316   int msize;
317
318   GNUNET_assert (NULL != session);
319
320   if (NULL != session->th)
321   {
322     return;
323   }
324
325   if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO))
326   {
327     /* FIXME */
328     msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
329     session->th =
330         GNUNET_SERVER_notify_transmit_ready (session->client, msize,
331                                              GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session);
332     session->conclude_sent = GNUNET_YES;
333   }
334   else if (NULL != session->transmit_pending_head)
335   {
336     msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage);
337     session->th =
338         GNUNET_SERVER_notify_transmit_ready (session->client, msize,
339                                              GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session);
340     /* TODO: insert into ack pending */
341   }
342 }
343
344
345 /**
346  * Method called whenever a peer has disconnected from the tunnel.
347  * Implementations of this callback must NOT call
348  * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
349  * to run in some other task later.  However, calling 
350  * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
351  *
352  * @param cls closure
353  * @param peer peer identity the tunnel stopped working with
354  */
355 static void
356 disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
357 {
358   /* FIXME: how do we handle this */
359 }
360
361
362 /**
363  * Method called whenever a peer has connected to the tunnel.
364  *
365  * @param cls closure
366  * @param peer peer identity the tunnel was created to, NULL on timeout
367  * @param atsi performance data for the connection
368  */
369 static void
370 connect_handler (void *cls,
371                  const struct GNUNET_PeerIdentity *peer,
372                  const struct GNUNET_ATS_Information *atsi)
373 {
374   /* not much we can do here, now we know the other peer has been added to our broadcast tunnel */
375 }
376
377
378 /**
379  * Called when a client wants to join a consensus session.
380  *
381  * @param cls unused
382  * @param client client that sent the message
383  * @param m message sent by the client
384  */
385 static void
386 client_join (void *cls,
387              struct GNUNET_SERVER_Client *client,
388              const struct GNUNET_MessageHeader *m)
389 {
390   struct GNUNET_HashCode global_id;
391   const struct GNUNET_CONSENSUS_JoinMessage *msg;
392   struct ConsensusSession *session;
393   unsigned int i;
394
395   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
396
397   msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
398
399   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s (&msg->session_id));
400
401   compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
402
403   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s (&global_id));
404
405   session = sessions_head;
406   while (NULL != session)
407   {
408     if (client == session->client)
409     {
410
411       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
412       disconnect_client (client);
413       return;
414     }
415     if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode)))
416     {
417       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n");
418       disconnect_client (client);
419       return;
420     }
421     session = session->next;
422   }
423
424   GNUNET_SERVER_client_keep (client);
425
426   /* session does not exist yet, create it */
427   session = GNUNET_malloc (sizeof (struct ConsensusSession));
428   session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode));
429   session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
430   session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
431   session->client = client;
432   /* FIXME: should not be a constant, but chosen adaptively */
433   session->round_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
434
435   session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, connect_handler, disconnect_handler, session);
436
437   session->num_peers = 0;
438
439   /* count the peers that are not the local peer */
440   for (i = 0; i < msg->num_peers; i++)
441   {
442     struct GNUNET_PeerIdentity *peers;
443     peers = (struct GNUNET_PeerIdentity *) &msg[1];
444     if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
445       session->num_peers++;
446   }
447
448   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeer));
449
450   /* copy the peer identities and add peers to broadcast tunnel */
451   for (i = 0; i < msg->num_peers; i++)
452   {
453     struct GNUNET_PeerIdentity *peers;
454     peers = (struct GNUNET_PeerIdentity *) &msg[1];
455     if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
456     {
457       *session->peers->peer_id = peers[i];
458       GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, &peers[i]);
459     }
460   }
461
462   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
463
464   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
465
466   GNUNET_SERVER_receive_done (client, GNUNET_OK);
467 }
468
469
470 /**
471  * Called when a client performs an insert operation.
472  */
473 void
474 client_insert (void *cls,
475              struct GNUNET_SERVER_Client *client,
476              const struct GNUNET_MessageHeader *m)
477 {
478   struct ConsensusSession *session;
479   struct GNUNET_CONSENSUS_ElementMessage *msg;
480   struct GNUNET_CONSENSUS_Element *element;
481   struct GNUNET_HashCode key;
482   int element_size;
483
484   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
485
486   session = sessions_head;
487   while (NULL != session)
488   {
489     if (session->client == client)
490       break;
491   }
492
493   if (NULL == session)
494   {
495     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
496     GNUNET_SERVER_client_disconnect (client);
497     return;
498   }
499
500   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
501   element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
502
503   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
504
505   element->type = msg->element_type;
506   element->size = element_size;
507   memcpy (&element[1], &msg[1], element_size);
508   element->data = &element[1];
509
510   GNUNET_CRYPTO_hash (element, element_size, &key);
511
512   GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
513                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
514
515   GNUNET_SERVER_receive_done (client, GNUNET_OK);
516
517   send_next (session);
518 }
519
520
521 /**
522  * Do one round of the conclusion.
523  * Start by broadcasting the set difference estimator (IBF strata).
524  *
525  */
526 void
527 conclude_do_round (struct ConsensusSession *session)
528 {
529   /* FIXME */
530 }
531
532
533 /**
534  * Cancel the current round if necessary, decide to run another round or
535  * terminate.
536  */
537 void
538 conclude_round_done (struct ConsensusSession *session)
539 {
540   /* FIXME */
541 }
542
543
544 /**
545  * Called when a client performs the conclude operation.
546  */
547 void
548 client_conclude (void *cls,
549              struct GNUNET_SERVER_Client *client,
550              const struct GNUNET_MessageHeader *message)
551 {
552   struct ConsensusSession *session;
553
554   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
555
556   session = sessions_head;
557   while ((session != NULL) && (session->client != client))
558   {
559     session = session->next;
560   }
561   if (NULL == session)
562   {
563     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
564     GNUNET_SERVER_client_disconnect (client);
565     return;
566   }
567
568   if (GNUNET_YES == session->conclude_requested)
569   {
570     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude twice\n");
571     GNUNET_SERVER_client_disconnect (client);
572     return;
573   }
574
575   session->conclude_requested = GNUNET_YES;
576
577   conclude_do_round (session);
578
579   GNUNET_SERVER_receive_done (client, GNUNET_OK);
580
581   send_next (session);
582 }
583
584
585 /**
586  * Called when a client sends an ack
587  */
588 void
589 client_ack (void *cls,
590              struct GNUNET_SERVER_Client *client,
591              const struct GNUNET_MessageHeader *message)
592 {
593   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
594 }
595
596 /**
597  * Task that disconnects from core.
598  *
599  * @param cls core handle
600  * @param tc context information (why was this task triggered now)
601  */
602 static void
603 disconnect_core (void *cls,
604                  const struct GNUNET_SCHEDULER_TaskContext *tc)
605 {
606   GNUNET_CORE_disconnect (core);
607   core = NULL;
608   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
609 }
610
611
612 static void
613 core_startup (void *cls,
614               struct GNUNET_CORE_Handle *core,
615               const struct GNUNET_PeerIdentity *peer)
616 {
617   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
618     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
619     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
620     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
621         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
622     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
623         sizeof (struct GNUNET_CONSENSUS_AckMessage)},
624     {NULL, NULL, 0, 0}
625   };
626
627   GNUNET_SERVER_add_handlers (srv, handlers);
628   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
629   /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
630   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
631   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
632 }
633
634
635
636 /**
637  * Method called whenever another peer has added us to a tunnel
638  * the other peer initiated.
639  * Only called (once) upon reception of data with a message type which was
640  * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
641  * causes te tunnel to be ignored and no further notifications are sent about
642  * the same tunnel.
643  *
644  * @param cls closure
645  * @param tunnel new handle to the tunnel
646  * @param initiator peer that started the tunnel
647  * @param atsi performance information for the tunnel
648  * @return initial tunnel context for the tunnel
649  *         (can be NULL -- that's not an error)
650  */
651 static void *
652 new_tunnel (void *cls,
653             struct GNUNET_MESH_Tunnel *tunnel,
654             const struct GNUNET_PeerIdentity *initiator,
655             const struct GNUNET_ATS_Information *atsi)
656 {
657   /* there's nothing we can do here, as we don't have the global consensus id yet */
658   return NULL;
659 }
660
661
662 /**
663  * Function called whenever an inbound tunnel is destroyed.  Should clean up
664  * any associated state.  This function is NOT called if the client has
665  * explicitly asked for the tunnel to be destroyed using
666  * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
667  * the tunnel.
668  *
669  * @param cls closure (set from GNUNET_MESH_connect)
670  * @param tunnel connection to the other end (henceforth invalid)
671  * @param tunnel_ctx place where local state associated
672  *                   with the tunnel is stored
673  */
674 static void
675 cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
676 {
677   /* FIXME: what to do here? */
678 }
679
680
681
682 /**
683  * Called to clean up, after a shutdown has been requested.
684  *
685  * @param cls closure
686  * @param tc context information (why was this task triggered now)
687  */
688 static void
689 shutdown_task (void *cls,
690                const struct GNUNET_SCHEDULER_TaskContext *tc)
691 {
692   /* mesh requires all the tunnels to be destroyed manually */
693   while (NULL != sessions_head)
694   {
695     struct ConsensusSession *session;
696     session = sessions_head;
697     GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
698     sessions_head = sessions_head->next;
699     GNUNET_free (session);
700   }
701
702   if (NULL != mesh)
703   {
704     GNUNET_MESH_disconnect (mesh);
705     mesh = NULL;
706   }
707   if (NULL != core)
708   {
709     GNUNET_CORE_disconnect (core);
710     core = NULL;
711   }
712 }
713
714
715
716 /**
717  * Functions with this signature are called whenever a message is
718  * received.
719  *
720  * @param cls closure (set from GNUNET_MESH_connect)
721  * @param tunnel connection to the other end
722  * @param tunnel_ctx place to store local state associated with the tunnel
723  * @param sender who sent the message
724  * @param message the actual message
725  * @param atsi performance data for the connection
726  * @return GNUNET_OK to keep the connection open,
727  *         GNUNET_SYSERR to close it (signal serious error)
728  */
729 static int
730 p2p_delta_estimate (void *cls,
731                     struct GNUNET_MESH_Tunnel * tunnel,
732                     void **tunnel_ctx,
733                     const struct GNUNET_PeerIdentity *sender,
734                     const struct GNUNET_MessageHeader *message,
735                     const struct GNUNET_ATS_Information *atsi)
736 {
737   /* FIXME */
738   return GNUNET_OK;
739 }
740
741
742 /**
743  * Functions with this signature are called whenever a message is
744  * received.
745  *
746  * @param cls closure (set from GNUNET_MESH_connect)
747  * @param tunnel connection to the other end
748  * @param tunnel_ctx place to store local state associated with the tunnel
749  * @param sender who sent the message
750  * @param message the actual message
751  * @param atsi performance data for the connection
752  * @return GNUNET_OK to keep the connection open,
753  *         GNUNET_SYSERR to close it (signal serious error)
754  */
755 static int
756 p2p_difference_digest (void *cls,
757                        struct GNUNET_MESH_Tunnel * tunnel,
758                        void **tunnel_ctx,
759                        const struct GNUNET_PeerIdentity *sender,
760                        const struct GNUNET_MessageHeader *message,
761                        const struct GNUNET_ATS_Information *atsi)
762 {
763   /* FIXME */
764   return GNUNET_OK;
765 }
766
767
768 /**
769  * Functions with this signature are called whenever a message is
770  * received.
771  *
772  * @param cls closure (set from GNUNET_MESH_connect)
773  * @param tunnel connection to the other end
774  * @param tunnel_ctx place to store local state associated with the tunnel
775  * @param sender who sent the message
776  * @param message the actual message
777  * @param atsi performance data for the connection
778  * @return GNUNET_OK to keep the connection open,
779  *         GNUNET_SYSERR to close it (signal serious error)
780  */
781 static int
782 p2p_elements_and_requests (void *cls,
783                            struct GNUNET_MESH_Tunnel * tunnel,
784                            void **tunnel_ctx,
785                            const struct GNUNET_PeerIdentity *sender,
786                            const struct GNUNET_MessageHeader *message,
787                            const struct GNUNET_ATS_Information *atsi)
788 {
789   /* FIXME */
790   return GNUNET_OK;
791 }
792
793
794 /**
795  * Start processing consensus requests.
796  *
797  * @param cls closure
798  * @param server the initialized server
799  * @param c configuration to use
800  */
801 static void
802 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
803 {
804   static const struct GNUNET_CORE_MessageHandler handlers[] = {
805     {NULL, 0, 0}
806   };
807   static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
808     {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
809     {p2p_difference_digest, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
810     {p2p_elements_and_requests, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
811     {NULL, 0, 0}
812   };
813   static const GNUNET_MESH_ApplicationType app_types[] = { 
814     GNUNET_APPLICATION_TYPE_CONSENSUS,
815     GNUNET_APPLICATION_TYPE_END
816   };
817
818   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
819
820   cfg = c;
821   srv = server;
822
823   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
824
825   mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, app_types);
826   GNUNET_assert (NULL != mesh);
827
828   /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
829   core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers);
830   GNUNET_assert (NULL != core);
831 }
832
833
834 /**
835  * The main function for the consensus service.
836  *
837  * @param argc number of arguments from the command line
838  * @param argv command line arguments
839  * @return 0 ok, 1 on error
840  */
841 int
842 main (int argc, char *const *argv)
843 {
844   int ret;
845   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
846   return (GNUNET_OK == ret) ? 0 : 1;
847 }
848