fixed copying of wrong memory address
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21
22 #include "platform.h"
23 #include "gnunet_protocols.h"
24 #include "gnunet_common.h"
25 #include "gnunet_service_lib.h"
26 #include "gnunet_consensus_service.h"
27 #include "gnunet_core_service.h"
28 #include "gnunet_container_lib.h"
29 #include "consensus.h"
30
31
32 struct ConsensusClient;
33
34 static void
35 send_next (struct ConsensusClient *cli);
36
37
38 /**
39  * An element that is waiting to be transmitted to a client.
40  */
41 struct PendingElement
42 {
43   /**
44    * Pending elements are kept in a DLL.
45    */
46   struct PendingElement *next;
47
48   /**
49    * Pending elements are kept in a DLL.
50    */
51   struct PendingElement *prev;
52
53   /**
54    * The actual element
55    */
56   struct GNUNET_CONSENSUS_Element *element;
57 };
58
59
60 /**
61  * A consensus session consists of one or more local clients,
62  * as well as zero or more remote authorities.
63  */
64 struct ConsensusSession
65 {
66   /**
67    * Consensus sessions are kept in a DLL.
68    */
69   struct ConsensusSession *next;
70
71   /**
72    * Consensus sessions are kept in a DLL.
73    */
74   struct ConsensusSession *prev;
75
76   /**
77    * Consensus clients are kept in a DLL.
78    */
79   struct ConsensusClient *clients_head;
80
81   /**
82    * Consensus clients are kept in a DLL.
83    */
84   struct ConsensusClient *clients_tail;
85
86   /**
87   * Local consensus identification, chosen by clients.
88   */
89   struct GNUNET_HashCode *local_id;
90  
91   /**
92   * Global consensus identification, computed
93   * from the local id and participating authorities.
94   */
95   struct GNUNET_HashCode *global_id;
96
97   /**
98    * Values in the consensus set of this session.
99    */
100   struct GNUNET_CONTAINER_MultiHashMap *values;
101 };
102
103
104 struct ConsensusClient
105 {
106   /**
107    * Consensus clients are kept in a DLL.
108    */
109   struct ConsensusClient *next;
110   /**
111    * Consensus clients are kept in a DLL.
112    */
113   struct ConsensusClient *prev;
114
115   /**
116    * Corresponding server handle.
117    */
118   struct GNUNET_SERVER_Client *client;
119
120   /**
121    * Client wants to receive and send updates.
122    */
123   int begin;
124
125   /**
126    * Session this client belongs to
127    */
128   struct ConsensusSession *session;
129
130   /**
131    * Values in the consensus set of this client.
132    * Includes pending elements.
133    */
134   struct GNUNET_CONTAINER_MultiHashMap *values;
135
136   /**
137    * Elements that have not been set to the client yet.
138    */
139   struct PendingElement *pending_head;
140   /**
141    * Elements that have not been set to the client yet.
142    */
143   struct PendingElement *pending_tail;
144
145   /**
146    * Currently active transmit handle for sending to the client
147    */
148   struct GNUNET_SERVER_TransmitHandle *th;
149
150   /**
151    * Once conclude_requested is GNUNET_YES, the client may not
152    * insert any more values.
153    */
154   int conclude_requested;
155
156   /**
157    * Client has been informed about the conclusion.
158    */
159   int conclude_sent;
160 };
161
162
163 /**
164  * Linked list of sesstions this peer participates in.
165  */
166 static struct ConsensusSession *sessions_head;
167
168 /**
169  * Linked list of sesstions this peer participates in.
170  */
171 static struct ConsensusSession *sessions_tail;
172
173 /**
174  * Configuration of the consensus service.
175  */
176 static const struct GNUNET_CONFIGURATION_Handle *cfg;
177
178 /**
179  * Handle to the server for this service.
180  */
181 static struct GNUNET_SERVER_Handle *srv;
182
183 /**
184  * Peer that runs this service
185  */
186 static struct GNUNET_PeerIdentity *my_peer;
187
188
189 struct ConsensusClient *
190 find_client (const struct GNUNET_SERVER_Client *srv_client)
191 {
192   struct ConsensusSession *session;
193   struct ConsensusClient *client;
194
195   session = sessions_head;
196   while (NULL != session)
197   {
198     client = session->clients_head;
199     while (NULL != client)
200     {
201       if (client->client == srv_client)
202       {
203         return client;
204       }
205       client = client->next;
206     }
207     session = session->next;
208   }
209   return NULL;
210 }
211
212 static void
213 disconnect_client (struct GNUNET_SERVER_Client *client)
214 {
215   /* FIXME */
216 }
217
218 static void
219 compute_global_id (struct GNUNET_HashCode *dst,
220                    const struct GNUNET_HashCode *local_id,
221                    const struct GNUNET_PeerIdentity *peers,
222                    int num_peers)
223 {
224   *dst = *local_id;
225
226   /* FIXME: hash other peers into global id */
227 }
228
229
230
231 /**
232  * Iterator over hash map entries.
233  *
234  * @param cls closure, the client
235  * @param key current key code
236  * @param value value in the hash map
237  * @return GNUNET_YES if we should continue to
238  *         iterate,
239  *         GNUNET_NO if not.
240  */
241 int
242 update_pending (void *cls,
243                  const struct GNUNET_HashCode *key,
244                  void *value)
245 {
246   struct ConsensusClient *cli;
247   struct GNUNET_CONSENSUS_Element *element;
248   struct PendingElement *pending_element;
249
250   cli = (struct ConsensusClient *) cls;
251   element = (struct GNUNET_CONSENSUS_Element *) value;
252   pending_element = GNUNET_malloc (sizeof (struct PendingElement));
253   pending_element->element = element;
254
255   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (cli->values, key))
256   {
257     GNUNET_CONTAINER_DLL_insert_tail (cli->pending_head, cli->pending_tail, pending_element);
258     GNUNET_CONTAINER_multihashmap_put (cli->values, key, element, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
259   }
260   
261   return GNUNET_YES;
262 }
263
264
265
266
267 static size_t
268 transmit_pending (void *cls, size_t size, void *buf)
269 {
270   struct GNUNET_CONSENSUS_Element *element;
271   struct GNUNET_CONSENSUS_ElementMessage *msg;
272   struct ConsensusClient *cli;
273
274   cli = (struct ConsensusClient *) cls;
275   msg = (struct GNUNET_CONSENSUS_ElementMessage *) buf;
276   element = cli->pending_head->element;
277
278   GNUNET_assert (NULL != element);
279
280   cli->th = NULL;
281
282   msg->element_type = element->type;
283   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
284   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size);
285   memcpy (&msg[1], element->data, element->size);
286
287
288   cli->pending_head = cli->pending_head->next;
289
290   send_next (cli);
291
292   return sizeof (struct GNUNET_CONSENSUS_ElementMessage) + element->size;
293 }
294
295
296 static size_t
297 transmit_conclude_done (void *cls, size_t size, void *buf)
298 {
299   struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg;
300
301   msg = (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) buf;
302   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
303   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage));
304   msg->num_peers = htons (0);
305
306   return sizeof (struct GNUNET_CONSENSUS_ConcludeDoneMessage);
307 }
308
309
310 /**
311  * Schedule sending the next message (if there is any) to a client.
312  *
313  * @param cli the client to send the next message to
314  */
315 static void
316 send_next (struct ConsensusClient *cli)
317 {
318   int msize;
319
320   GNUNET_assert (NULL != cli);
321
322   if (NULL != cli->th)
323   {
324     return;
325   }
326
327   if ((cli->conclude_requested == GNUNET_YES) && (cli->conclude_sent == GNUNET_NO))
328   {
329     /* just the conclude message with no other authorities in the dummy */
330     msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
331     cli->th =
332         GNUNET_SERVER_notify_transmit_ready (cli->client, msize,
333                                              GNUNET_TIME_UNIT_FOREVER_REL, &transmit_conclude_done, cli);
334     cli->conclude_sent = GNUNET_YES;
335   }
336   else if (NULL != cli->pending_head)
337   {
338     msize = cli->pending_head->element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage);
339     cli->th =
340         GNUNET_SERVER_notify_transmit_ready (cli->client, msize,
341                                              GNUNET_TIME_UNIT_FOREVER_REL, &transmit_pending, cli);
342   }
343 }
344
345
346 /**
347  * Called when a client wants to join a consensus session.
348  */
349 static void
350 client_join (void *cls,
351              struct GNUNET_SERVER_Client *client,
352              const struct GNUNET_MessageHeader *m)
353 {
354   struct GNUNET_HashCode global_id;
355   const struct GNUNET_CONSENSUS_JoinMessage *msg;
356   struct ConsensusSession *session;
357   struct ConsensusClient *consensus_client;
358
359   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join\n");
360
361   fprintf(stderr, "foobar\n");
362
363   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joined\n");
364
365   msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
366   
367   /* kill the client if it already is in a session */
368   if (NULL != find_client (client))
369   {
370     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to join twice\n");
371     disconnect_client (client);
372     return;
373   }
374
375   consensus_client = GNUNET_malloc (sizeof (struct ConsensusClient));
376   consensus_client->client = client;
377   consensus_client->begin = GNUNET_NO;
378   consensus_client->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
379
380   GNUNET_SERVER_client_keep (client);
381
382   GNUNET_assert (NULL != consensus_client->values);
383
384   compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
385
386   /* look if we already have a session for this local id */
387   session = sessions_head;
388   while (NULL != session)
389   {
390     if (0 == memcmp(&global_id, session->global_id, sizeof (struct GNUNET_HashCode)))
391     {
392       GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client);
393       GNUNET_SERVER_receive_done (client, GNUNET_OK);
394       return;
395     }
396     session = session->next;
397   }
398
399   /* session does not exist yet, create it */
400   session = GNUNET_malloc (sizeof (struct ConsensusSession));
401   session->local_id = GNUNET_memdup (&msg->session_id, sizeof (struct GNUNET_HashCode));
402   session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
403   session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
404
405   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
406   GNUNET_CONTAINER_DLL_insert (session->clients_head, session->clients_tail, consensus_client);
407
408   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session");
409
410   GNUNET_SERVER_receive_done (client, GNUNET_OK);
411 }
412
413
414 /**
415  * Called when a client performs an insert operation.
416  */
417 void
418 client_insert (void *cls,
419              struct GNUNET_SERVER_Client *client,
420              const struct GNUNET_MessageHeader *m)
421 {
422   struct ConsensusClient *consensus_client;
423   struct GNUNET_CONSENSUS_ElementMessage *msg;
424   struct GNUNET_CONSENSUS_Element *element;
425   struct PendingElement *pending_element;
426   struct GNUNET_HashCode key;
427   int element_size;
428
429   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
430
431   consensus_client = find_client (client);
432
433   if (NULL == consensus_client)
434   {
435     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
436     GNUNET_SERVER_client_disconnect (client);
437     return;
438   }
439
440   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
441   element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
442
443   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
444
445   element->type = msg->element_type;
446   element->size = element_size;
447   memcpy (&element[1], &msg[1], element_size);
448   element->data = &element[1];
449
450   GNUNET_CRYPTO_hash (element, element_size, &key);
451
452   GNUNET_CONTAINER_multihashmap_put (consensus_client->session->values, &key, element,
453                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
454   GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element,
455                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
456
457   /* send the new value to all clients that don't have it */
458
459   consensus_client = consensus_client->session->clients_head;
460   while (NULL != consensus_client)
461   {
462     if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (consensus_client->values, &key))
463     {
464       pending_element = GNUNET_malloc (sizeof (struct PendingElement));
465       pending_element->element = element;
466       GNUNET_CONTAINER_DLL_insert_tail (consensus_client->pending_head, consensus_client->pending_tail, pending_element);
467       GNUNET_CONTAINER_multihashmap_put (consensus_client->values, &key, element,
468                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
469       send_next (consensus_client);
470     }
471   }
472
473   GNUNET_SERVER_receive_done (client, GNUNET_OK);
474 }
475
476
477 /**
478  * Called when a client wants to begin
479  */
480 void
481 client_begin (void *cls,
482              struct GNUNET_SERVER_Client *client,
483              const struct GNUNET_MessageHeader *message)
484 {
485   struct ConsensusClient *consensus_client;
486
487   consensus_client = find_client (client);
488
489   if (NULL == consensus_client)
490   {
491     GNUNET_SERVER_client_disconnect (client);
492     return;
493   }
494
495   consensus_client->begin = GNUNET_YES;
496
497   GNUNET_CONTAINER_multihashmap_iterate (consensus_client->session->values, &update_pending, NULL);
498   send_next (consensus_client);
499
500   GNUNET_SERVER_receive_done (client, GNUNET_OK);
501 }
502
503
504
505 /**
506  * Called when a client performs the conclude operation.
507  */
508 void
509 client_conclude (void *cls,
510              struct GNUNET_SERVER_Client *client,
511              const struct GNUNET_MessageHeader *message)
512 {
513   struct ConsensusClient *consensus_client;
514
515   consensus_client = find_client (client);
516   if (NULL == consensus_client)
517   {
518     GNUNET_SERVER_client_disconnect (client);
519     return;
520   }
521   consensus_client->conclude_requested = GNUNET_YES;
522   send_next (consensus_client);
523
524   GNUNET_SERVER_receive_done (client, GNUNET_OK);
525 }
526
527 /**
528  * Task that disconnects from core.
529  *
530  * @param cls core handle
531  * @param tc context information (why was this task triggered now)
532  */
533 static void
534 disconnect_core (void *cls,
535                  const struct GNUNET_SCHEDULER_TaskContext *tc)
536 {
537   struct GNUNET_CORE_Handle *core;
538   core = (struct GNUNET_CORE_Handle *) cls;
539   GNUNET_CORE_disconnect (core);
540
541   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
542 }
543
544
545 static void
546 core_startup (void *cls,
547               struct GNUNET_CORE_Handle *core,
548               const struct GNUNET_PeerIdentity *peer)
549 {
550   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
551     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
552     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
553     {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN,
554         sizeof (struct GNUNET_MessageHeader)},
555     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
556         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
557     {NULL, NULL, 0, 0}
558   };
559
560
561   GNUNET_SERVER_add_handlers (srv, handlers);
562
563   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
564
565   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
566
567   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
568 }
569
570
571 /**
572  * Process consensus requests.
573  *
574  * @param cls closure
575  * @param server the initialized server
576  * @param c configuration to use
577  */
578 static void
579 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
580 {
581   struct GNUNET_CORE_Handle *my_core;
582   static const struct GNUNET_CORE_MessageHandler handlers[] = {
583     {NULL, 0, 0}
584   };
585
586   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "run\n");
587
588   cfg = c;
589   srv = server;
590   my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers);
591   GNUNET_assert (NULL != my_core);
592 }
593
594
595 /**
596  * The main function for the statistics service.
597  *
598  * @param argc number of arguments from the command line
599  * @param argv command line arguments
600  * @return 0 ok, 1 on error
601  */
602 int
603 main (int argc, char *const *argv)
604 {
605   return (GNUNET_OK ==
606           GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
607 }
608