added consensus log-round simulation, work on consensus service, still problems with...
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 /**
37  * Actions that can be queued.
38  */
39 struct QueuedMessage
40 {
41   /**
42    * Queued messages are stored in a doubly linked list.
43    */
44   struct QueuedMessage *next;
45
46   /**
47    * Queued messages are stored in a doubly linked list.
48    */
49   struct QueuedMessage *prev;
50
51   /**
52    * The actual queued message.
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Will be called after transmit, if not NULL
58    */
59   GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61   /**
62    * Closure for idc
63    */
64   void *idc_cls;
65 };
66
67
68 /**
69  * Handle for the service.
70  */
71 struct GNUNET_CONSENSUS_Handle
72 {
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Client connected to the consensus service, may be NULL if not connected.
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Callback for new elements. Not called for elements added locally.
85    */
86   GNUNET_CONSENSUS_ElementCallback new_element_cb;
87
88   /**
89    * Closure for new_element_cb
90    */
91   void *new_element_cls;
92
93   /**
94    * The (local) session identifier for the consensus session.
95    */
96   struct GNUNET_HashCode session_id;
97
98   /**
99    * Number of peers in the consensus. Optionally includes the local peer.
100    */
101   int num_peers;
102
103   /**
104    * Peer identities of peers participating in the consensus, includes the local peer.
105    */
106   struct GNUNET_PeerIdentity **peers;
107
108   /**
109    * Currently active transmit request.
110    */
111   struct GNUNET_CLIENT_TransmitHandle *th;
112
113   /**
114    * GNUNES_YES iff the join message has been sent to the service.
115    */
116   int joined;
117
118   /**
119    * Closure for the insert done callback.
120    */
121   void *idc_cls;
122
123   /**
124    * Called when the conclude operation finishes or fails.
125    */
126   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
127
128   /**
129    * Closure for the conclude callback.
130    */
131   void *conclude_cls;
132
133   /**
134    * Deadline for the conclude operation.
135    */
136   struct GNUNET_TIME_Absolute conclude_deadline;
137
138   unsigned int conclude_min_size;
139
140   struct QueuedMessage *messages_head;
141   struct QueuedMessage *messages_tail;
142
143   /**
144    * GNUNET_YES when currently in a section where destroy may not be
145    * called.
146    */
147   int may_not_destroy;
148 };
149
150
151
152 /**
153  * Schedule transmitting the next message.
154  *
155  * @param consensus consensus handle
156  */
157 static void
158 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
159
160
161 /**
162  * Function called to notify a client about the connection
163  * begin ready to queue more data.  "buf" will be
164  * NULL and "size" zero if the connection was closed for
165  * writing in the meantime.
166  *
167  * @param cls closure
168  * @param size number of bytes available in buf
169  * @param buf where the callee should write the message
170  * @return number of bytes written to buf
171  */
172 static size_t
173 transmit_queued (void *cls, size_t size,
174                  void *buf)
175 {
176   struct GNUNET_CONSENSUS_Handle *consensus;
177   struct QueuedMessage *qmsg;
178   size_t msg_size;
179
180   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
181   consensus->th = NULL;
182
183   qmsg = consensus->messages_head;
184   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
185
186   if (NULL == buf)
187   {
188     if (NULL != qmsg->idc)
189     {
190       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
191     }
192     return 0;
193   }
194
195   msg_size = ntohs (qmsg->msg->size);
196
197   GNUNET_assert (size >= msg_size);
198
199   memcpy (buf, qmsg->msg, msg_size);
200   if (NULL != qmsg->idc)
201   {
202     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
203   }
204   GNUNET_free (qmsg->msg);
205   GNUNET_free (qmsg);
206   /* FIXME: free the messages */
207
208   send_next (consensus);
209
210   return msg_size;
211 }
212
213
214 /**
215  * Schedule transmitting the next message.
216  *
217  * @param consensus consensus handle
218  */
219 static void
220 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
221 {
222   if (NULL != consensus->th)
223     return;
224
225   if (NULL != consensus->messages_head)
226   {
227     consensus->th = 
228         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
229                                              GNUNET_TIME_UNIT_FOREVER_REL,
230                                              GNUNET_NO, &transmit_queued, consensus);
231   }
232 }
233
234
235 /**
236  * Called when the server has sent is a new element
237  * 
238  * @param consensus consensus handle
239  * @param msg element message
240  */
241 static void
242 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
243                    struct GNUNET_CONSENSUS_ElementMessage *msg)
244 {
245   struct GNUNET_CONSENSUS_Element element;
246
247   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
248
249   element.type = msg->element_type;
250   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
251   element.data = &msg[1];
252
253   consensus->new_element_cb (consensus->new_element_cls, &element);
254
255   send_next (consensus);
256 }
257
258
259 /**
260  * Called when the server has announced
261  * that the conclusion is over.
262  * 
263  * @param consensus consensus handle
264  * @param msg conclude done message
265  */
266 static void
267 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
268                      const struct GNUNET_MessageHeader *msg)
269 {
270   GNUNET_assert (NULL != consensus->conclude_cb);
271   consensus->may_not_destroy = GNUNET_YES;
272   consensus->conclude_cb (consensus->conclude_cls);
273   consensus->may_not_destroy = GNUNET_NO;
274   consensus->conclude_cb = NULL;
275 }
276
277
278
279 /**
280  * Type of a function to call when we receive a message
281  * from the service.
282  *
283  * @param cls closure
284  * @param msg message received, NULL on timeout or fatal error
285  */
286 static void
287 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
288 {
289   struct GNUNET_CONSENSUS_Handle *consensus = cls;
290
291   LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
292
293   if (msg == NULL)
294   {
295     /* Error, timeout, death */
296     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
297     GNUNET_CLIENT_disconnect (consensus->client);
298     consensus->client = NULL;
299     consensus->new_element_cb (consensus->new_element_cls, NULL);
300     return;
301   }
302
303   switch (ntohs (msg->type))
304   {
305     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
306       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
307       break;
308     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
309       handle_conclude_done (consensus, msg);
310       break;
311     default:
312       GNUNET_break (0);
313   }
314   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
315                          GNUNET_TIME_UNIT_FOREVER_REL);
316 }
317
318 /**
319  * Function called to notify a client about the connection
320  * begin ready to queue more data.  "buf" will be
321  * NULL and "size" zero if the connection was closed for
322  * writing in the meantime.
323  *
324  * @param cls closure
325  * @param size number of bytes available in buf
326  * @param buf where the callee should write the message
327  * @return number of bytes written to buf
328  */
329 static size_t
330 transmit_join (void *cls, size_t size, void *buf)
331 {
332   struct GNUNET_CONSENSUS_JoinMessage *msg;
333   struct GNUNET_CONSENSUS_Handle *consensus;
334   int msize;
335
336   GNUNET_assert (NULL != buf);
337
338   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
339
340   consensus = cls;
341   consensus->th = NULL;
342   consensus->joined = 1;
343
344   msg = buf;
345
346   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
347       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
348
349   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
350   msg->header.size = htons (msize);
351   msg->session_id = consensus->session_id;
352   msg->num_peers = htonl (consensus->num_peers);
353   memcpy(&msg[1],
354          consensus->peers,
355          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
356   send_next (consensus);
357   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
358                          GNUNET_TIME_UNIT_FOREVER_REL);
359   
360   return msize;
361 }
362
363 /**
364  * Create a consensus session.
365  *
366  * @param cfg configuration to use for connecting to the consensus service
367  * @param num_peers number of peers in the peers array
368  * @param peers array of peers participating in this consensus session
369  *              Inclusion of the local peer is optional.
370  * @param session_id session identifier
371  *                   Allows a group of peers to have more than consensus session.
372  * @param new_element_cb callback, called when a new element is added to the set by
373  *                    another peer
374  * @param new_element_cls closure for new_element
375  * @return handle to use, NULL on error
376  */
377 struct GNUNET_CONSENSUS_Handle *
378 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
379                          unsigned int num_peers,
380                          const struct GNUNET_PeerIdentity *peers,
381                          const struct GNUNET_HashCode *session_id,
382                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
383                          void *new_element_cls)
384 {
385   struct GNUNET_CONSENSUS_Handle *consensus;
386   size_t join_message_size;
387
388   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
389   consensus->cfg = cfg;
390   consensus->new_element_cb = new_element_cb;
391   consensus->new_element_cls = new_element_cls;
392   consensus->num_peers = num_peers;
393   consensus->session_id = *session_id;
394
395   if (0 == num_peers)
396     consensus->peers = NULL;
397   else if (num_peers > 0)
398     consensus->peers =
399         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
400
401   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
402
403   GNUNET_assert (consensus->client != NULL);
404
405   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
406       (num_peers * sizeof (struct GNUNET_PeerIdentity));
407
408   consensus->th =
409       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
410                                            join_message_size,
411                                            GNUNET_TIME_UNIT_FOREVER_REL,
412                                            GNUNET_NO, &transmit_join, consensus);
413
414
415   GNUNET_assert (consensus->th != NULL);
416   return consensus;
417 }
418
419
420
421 /**
422  * Insert an element in the set being reconsiled.  Must not be called after
423  * "GNUNET_CONSENSUS_conclude".
424  *
425  * @param consensus handle for the consensus session
426  * @param element the element to be inserted
427  * @param idc function called when we are done with this element and it 
428  *            is thus allowed to call GNUNET_CONSENSUS_insert again
429  * @param idc_cls closure for 'idc'
430  */
431 void
432 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
433                          const struct GNUNET_CONSENSUS_Element *element,
434                          GNUNET_CONSENSUS_InsertDoneCallback idc,
435                          void *idc_cls)
436 {
437   struct QueuedMessage *qmsg;
438   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
439   size_t element_msg_size;
440
441   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
442
443   element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
444                                element->size);
445
446   element_msg = GNUNET_malloc (element_msg_size);
447   element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
448   element_msg->header.size = htons (element_msg_size);
449   memcpy (&element_msg[1], element->data, element->size);
450
451   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
452   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
453   qmsg->idc = idc;
454   qmsg->idc_cls = idc_cls;
455
456   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
457
458   send_next (consensus);
459 }
460
461
462 /**
463  * We are done with inserting new elements into the consensus;
464  * try to conclude the consensus within a given time window.
465  * After conclude has been called, no further elements may be
466  * inserted by the client.
467  *
468  * @param consensus consensus session
469  * @param timeout timeout after which the conculde callback
470  *                must be called
471  * @param conclude called when the conclusion was successful
472  * @param conclude_cls closure for the conclude callback
473  */
474 void
475 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
476                            struct GNUNET_TIME_Relative timeout,
477                            GNUNET_CONSENSUS_ConcludeCallback conclude,
478                            void *conclude_cls)
479 {
480   struct QueuedMessage *qmsg;
481   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
482
483   GNUNET_assert (NULL != conclude);
484   GNUNET_assert (NULL == consensus->conclude_cb);
485
486   consensus->conclude_cls = conclude_cls;
487   consensus->conclude_cb = conclude;
488
489   conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
490   conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
491   conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
492   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
493
494   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
495   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
496
497   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
498
499   send_next (consensus);
500 }
501
502
503 /**
504  * Destroy a consensus handle (free all state associated with
505  * it, no longer call any of the callbacks).
506  *
507  * @param consensus handle to destroy
508  */
509 void
510 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
511 {
512   if (GNUNET_YES == consensus->may_not_destroy)
513   {
514     LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n");
515     GNUNET_assert (0);
516   }
517   if (consensus->client != NULL)
518   {
519     GNUNET_CLIENT_disconnect (consensus->client);
520     consensus->client = NULL;
521   }
522   if (NULL != consensus->peers)
523     GNUNET_free (consensus->peers);
524   GNUNET_free (consensus);
525 }
526