-options to play with
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21
22 #include "platform.h"
23 #include "gnunet_common.h"
24 #include "gnunet_protocols.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_consensus_service.h"
27 #include "gnunet_core_service.h"
28 #include "gnunet_mesh_service.h"
29 #include "consensus.h"
30
31
32 struct ConsensusSession;
33
34 static void
35 send_next (struct ConsensusSession *session);
36
37
38 /**
39  * An element that is waiting to be transmitted to a client.
40  */
41 struct PendingElement
42 {
43   /**
44    * Pending elements are kept in a DLL.
45    */
46   struct PendingElement *next;
47
48   /**
49    * Pending elements are kept in a DLL.
50    */
51   struct PendingElement *prev;
52
53   /**
54    * The actual element
55    */
56   struct GNUNET_CONSENSUS_Element *element;
57 };
58
59
60 /**
61  * A consensus session consists of one local client and the remote authorities.
62  */
63 struct ConsensusSession
64 {
65   /**
66    * Consensus sessions are kept in a DLL.
67    */
68   struct ConsensusSession *next;
69
70   /**
71    * Consensus sessions are kept in a DLL.
72    */
73   struct ConsensusSession *prev;
74
75   /**
76    * Local consensus identification, chosen by clients.
77    */
78   struct GNUNET_HashCode *local_id;
79  
80   /**
81   * Global consensus identification, computed
82   * from the local id and participating authorities.
83   */
84   struct GNUNET_HashCode *global_id;
85
86   /**
87    * Corresponding server handle.
88    */
89   struct GNUNET_SERVER_Client *client;
90
91   /**
92    * Client wants to receive and send updates.
93    */
94   int begin;
95
96   /**
97    * Values in the consensus set of this session,
98    * all of them either have been sent or approved by the client.
99    */
100   struct GNUNET_CONTAINER_MultiHashMap *values;
101
102   /**
103    * Elements that have not been sent to the client yet.
104    */
105   struct PendingElement *transmit_pending_head;
106
107   /**
108    * Elements that have not been sent to the client yet.
109    */
110   struct PendingElement *transmit_pending_tail;
111
112   /**
113    * Elements that have not been sent to the client yet.
114    */
115   struct PendingElement *approval_pending_head;
116
117   /**
118    * Elements that have not been sent to the client yet.
119    */
120   struct PendingElement *approval_pending_tail;
121
122   /**
123    * Currently active transmit handle for sending to the client
124    */
125   struct GNUNET_SERVER_TransmitHandle *th;
126
127   /**
128    * Once conclude_requested is GNUNET_YES, the client may not
129    * insert any more values.
130    */
131   int conclude_requested;
132
133   /**
134    * Client has been informed about the conclusion.
135    */
136   int conclude_sent;
137
138   /**
139    * Number of other peers in the consensus
140    */
141   int num_peers;
142 };
143
144
145 /**
146  * Linked list of sesstions this peer participates in.
147  */
148 static struct ConsensusSession *sessions_head;
149
150 /**
151  * Linked list of sesstions this peer participates in.
152  */
153 static struct ConsensusSession *sessions_tail;
154
155 /**
156  * Configuration of the consensus service.
157  */
158 static const struct GNUNET_CONFIGURATION_Handle *cfg;
159
160 /**
161  * Handle to the server for this service.
162  */
163 static struct GNUNET_SERVER_Handle *srv;
164
165 /**
166  * Peer that runs this service
167  */
168 static struct GNUNET_PeerIdentity *my_peer;
169
170 static void
171 disconnect_client (struct GNUNET_SERVER_Client *client)
172 {
173   /* FIXME */
174 }
175
176 static void
177 compute_global_id (struct GNUNET_HashCode *dst,
178                    const struct GNUNET_HashCode *local_id,
179                    const struct GNUNET_PeerIdentity *peers,
180                    int num_peers)
181 {
182   int i;
183   struct GNUNET_HashCode tmp;
184
185   *dst = *local_id;
186   for (i = 0; i < num_peers; ++i)
187   {
188     /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get by without tmp */
189     GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
190     *dst = tmp;
191     GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
192     *dst = tmp;
193   }
194 }
195
196
197 static size_t
198 transmit_pending (void *cls, size_t size, void *buf)
199 {
200   struct GNUNET_CONSENSUS_Element *element;
201   struct GNUNET_CONSENSUS_ElementMessage *msg;
202   struct ConsensusSession *session;
203
204   session = (struct ConsensusSession *) cls;
205   msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
206   element = session->transmit_pending_head->element;
207
208   GNUNET_assert (NULL != element);
209
210   session->th = NULL;
211
212   msg->element_type = element->type;
213   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
214   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
215   memcpy (&msg[1], element->data, element->size);
216
217   session->transmit_pending_head = session->transmit_pending_head->next;
218
219   send_next (session);
220
221   return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
222 }
223
224
225 static size_t
226 transmit_conclude_done (void *cls, size_t size, void *buf)
227 {
228   struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
229
230   msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
231   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
232   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
233   msg->num_peers = htons (0);
234
235   return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
236 }
237
238
239 /**
240  * Schedule sending the next message (if there is any) to a client.
241  *
242  * @param cli the client to send the next message to
243  */
244 static void
245 send_next (struct ConsensusSession *session)
246 {
247   int msize;
248
249   GNUNET_assert (NULL != session);
250
251   if (NULL != session->th)
252   {
253     return;
254   }
255
256   if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO))
257   {
258     /* just the conclude message with no other authorities in the dummy */
259     msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
260     session->th =
261         GNUNET_SERVER_notify_transmit_ready (session->client, msize,
262                                              GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, session);
263     session->conclude_sent = GNUNET_YES;
264   }
265   else if (NULL != session->transmit_pending_head)
266   {
267     msize = session->transmit_pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage);
268     session->th =
269         GNUNET_SERVER_notify_transmit_ready (session->client, msize,
270                                              GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, session);
271     /* TODO: insert into ack pending */
272   }
273 }
274
275
276 /**
277  * Called when a client wants to join a consensus session.
278  *
279  * @param cls unused
280  * @param client client that sent the message
281  * @param m message sent by the client
282  */
283 static void
284 client_join (void *cls,
285              struct GNUNET_SERVER_Client *client,
286              const struct GNUNET_MessageHeader *m)
287 {
288   struct GNUNET_HashCode global_id;
289   const struct GNUNET_CONSENSUS_JoinMessage *msg;
290   struct ConsensusSession *session;
291
292   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
293
294   msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
295
296   compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
297
298   session = sessions_head;
299   while (NULL != session)
300   {
301     if (client == session->client)
302     {
303       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
304       disconnect_client (client);
305       return;
306     }
307     if (0 == memcmp (session->global_id, &global_id, sizeof (struct GNUNET_HashCode)))
308     {
309       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "session already owned by another client\n");
310       disconnect_client (client);
311       return;
312     }
313   }
314
315   GNUNET_SERVER_client_keep (client);
316
317   /* session does not exist yet, create it */
318   session = GNUNET_malloc (sizeof (struct ConsensusSession));
319   session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode));
320   session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
321   session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
322   session->client = client;
323
324   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
325
326   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
327
328   GNUNET_SERVER_receive_done (client, GNUNET_OK);
329 }
330
331
332 /**
333  * Called when a client performs an insert operation.
334  */
335 void
336 client_insert (void *cls,
337              struct GNUNET_SERVER_Client *client,
338              const struct GNUNET_MessageHeader *m)
339 {
340   struct ConsensusSession *session;
341   struct GNUNET_CONSENSUS_ElementMessage *msg;
342   struct GNUNET_CONSENSUS_Element *element;
343   struct GNUNET_HashCode key;
344   int element_size;
345
346   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
347
348   session = sessions_head;
349   while (NULL != session)
350   {
351     if (session->client == client)
352       break;
353   }
354
355   if (NULL == session)
356   {
357     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
358     GNUNET_SERVER_client_disconnect (client);
359     return;
360   }
361
362   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
363   element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
364
365   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
366
367   element->type = msg->element_type;
368   element->size = element_size;
369   memcpy (&element[1], &msg[1], element_size);
370   element->data = &element[1];
371
372   GNUNET_CRYPTO_hash (element, element_size, &key);
373
374   GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
375                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
376
377   GNUNET_SERVER_receive_done (client, GNUNET_OK);
378
379   send_next (session);
380 }
381
382
383 /**
384  * Called when a client wants to begin
385  */
386 void
387 client_begin (void *cls,
388              struct GNUNET_SERVER_Client *client,
389              const struct GNUNET_MessageHeader *message)
390 {
391   struct ConsensusSession *session;
392
393   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n");
394
395   session = sessions_head;
396   while (NULL != session)
397   {
398     if (session->client == client)
399       break;
400   }
401
402   if (NULL == session)
403   {
404     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but client is not in any session\n");
405     GNUNET_SERVER_client_disconnect (client);
406     return;
407   }
408
409   session->begin = GNUNET_YES;
410
411   send_next (session);
412
413   GNUNET_SERVER_receive_done (client, GNUNET_OK);
414 }
415
416
417
418 /**
419  * Called when a client performs the conclude operation.
420  */
421 void
422 client_conclude (void *cls,
423              struct GNUNET_SERVER_Client *client,
424              const struct GNUNET_MessageHeader *message)
425 {
426   struct ConsensusSession *session;
427
428   session = sessions_head;
429   while ((session != NULL) && (session->client != client))
430   {
431     session = session->next;
432   }
433   if (NULL == session)
434   {
435     GNUNET_SERVER_client_disconnect (client);
436     return;
437   }
438   session->conclude_requested = GNUNET_YES;
439   send_next (session);
440   GNUNET_SERVER_receive_done (client, GNUNET_OK);
441 }
442
443
444 /**
445  * Called when a client sends an ack
446  */
447 void
448 client_ack (void *cls,
449              struct GNUNET_SERVER_Client *client,
450              const struct GNUNET_MessageHeader *message)
451 {
452   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
453 }
454
455 /**
456  * Task that disconnects from core.
457  *
458  * @param cls core handle
459  * @param tc context information (why was this task triggered now)
460  */
461 static void
462 disconnect_core (void *cls,
463                  const struct GNUNET_SCHEDULER_TaskContext *tc)
464 {
465   struct GNUNET_CORE_Handle *core;
466   core = (struct GNUNET_CORE_Handle *) cls;
467   GNUNET_CORE_disconnect (core);
468
469   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
470 }
471
472
473 static void
474 core_startup (void *cls,
475               struct GNUNET_CORE_Handle *core,
476               const struct GNUNET_PeerIdentity *peer)
477 {
478   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
479     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
480     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
481     {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN,
482         sizeof (struct GNUNET_MessageHeader)},
483     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
484         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
485     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
486         sizeof (struct GNUNET_CONSENSUS_AckMessage)},
487     {NULL, NULL, 0, 0}
488   };
489
490   GNUNET_SERVER_add_handlers (srv, handlers);
491   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
492   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
493   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
494 }
495
496
497 /**
498  * Process consensus requests.
499  *
500  * @param cls closure
501  * @param server the initialized server
502  * @param c configuration to use
503  */
504 static void
505 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
506 {
507   struct GNUNET_CORE_Handle *my_core;
508   static const struct GNUNET_CORE_MessageHandler handlers[] = {
509     {NULL, 0, 0}
510   };
511
512   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus  running\n");
513
514   cfg = c;
515   srv = server;
516   my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers);
517   GNUNET_assert (NULL != my_core);
518 }
519
520
521 /**
522  * The main function for the statistics service.
523  *
524  * @param argc number of arguments from the command line
525  * @param argv command line arguments
526  * @return 0 ok, 1 on error
527  */
528 int
529 main (int argc, char *const *argv)
530 {
531   return (GNUNET_OK ==
532           GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
533 }
534