5c0494254c62cd27ff7d6c1363b623307111bf0f
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 /**
37  * Actions that can be queued.
38  */
39 struct QueuedMessage
40 {
41   /**
42    * Queued messages are stored in a doubly linked list.
43    */
44   struct QueuedMessage *next;
45
46   /**
47    * Queued messages are stored in a doubly linked list.
48    */
49   struct QueuedMessage *prev;
50
51   /**
52    * The actual queued message.
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Will be called after transmit, if not NULL
58    */
59   GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61   /**
62    * Closure for idc
63    */
64   void *idc_cls;
65 };
66
67
68 /**
69  * Handle for the service.
70  */
71 struct GNUNET_CONSENSUS_Handle
72 {
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Client connected to the consensus service, may be NULL if not connected.
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Callback for new elements. Not called for elements added locally.
85    */
86   GNUNET_CONSENSUS_ElementCallback new_element_cb;
87
88   /**
89    * Closure for new_element_cb
90    */
91   void *new_element_cls;
92
93   /**
94    * The (local) session identifier for the consensus session.
95    */
96   struct GNUNET_HashCode session_id;
97
98   /**
99    * Number of peers in the consensus. Optionally includes the local peer.
100    */
101   int num_peers;
102
103   /**
104    * Peer identities of peers participating in the consensus, includes the local peer.
105    */
106   struct GNUNET_PeerIdentity **peers;
107
108   /**
109    * Currently active transmit request.
110    */
111   struct GNUNET_CLIENT_TransmitHandle *th;
112
113   /**
114    * GNUNES_YES iff the join message has been sent to the service.
115    */
116   int joined;
117
118   /**
119    * Closure for the insert done callback.
120    */
121   void *idc_cls;
122
123   /**
124    * Called when the conclude operation finishes or fails.
125    */
126   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
127
128   /**
129    * Closure for the conclude callback.
130    */
131   void *conclude_cls;
132
133   /**
134    * Deadline for the conclude operation.
135    */
136   struct GNUNET_TIME_Absolute conclude_deadline;
137
138   unsigned int conclude_min_size;
139
140   struct QueuedMessage *messages_head;
141   struct QueuedMessage *messages_tail;
142 };
143
144
145
146 /**
147  * Schedule transmitting the next message.
148  *
149  * @param consensus consensus handle
150  */
151 static void
152 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
153
154
155 /**
156  * Function called to notify a client about the connection
157  * begin ready to queue more data.  "buf" will be
158  * NULL and "size" zero if the connection was closed for
159  * writing in the meantime.
160  *
161  * @param cls closure
162  * @param size number of bytes available in buf
163  * @param buf where the callee should write the message
164  * @return number of bytes written to buf
165  */
166 static size_t
167 transmit_queued (void *cls, size_t size,
168                  void *buf)
169 {
170   struct GNUNET_CONSENSUS_Handle *consensus;
171   struct QueuedMessage *qmsg;
172   size_t msg_size;
173
174   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
175   consensus->th = NULL;
176
177   qmsg = consensus->messages_head;
178   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
179   GNUNET_assert (qmsg);
180
181   if (NULL == buf)
182   {
183     if (NULL != qmsg->idc)
184     {
185       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
186     }
187     return 0;
188   }
189
190   msg_size = ntohs (qmsg->msg->size);
191
192   GNUNET_assert (size >= msg_size);
193
194   memcpy (buf, qmsg->msg, msg_size);
195   if (NULL != qmsg->idc)
196   {
197     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
198   }
199   GNUNET_free (qmsg->msg);
200   GNUNET_free (qmsg);
201
202   send_next (consensus);
203
204   return msg_size;
205 }
206
207
208 /**
209  * Schedule transmitting the next message.
210  *
211  * @param consensus consensus handle
212  */
213 static void
214 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
215 {
216   if (NULL != consensus->th)
217     return;
218
219   if (NULL != consensus->messages_head)
220   {
221     LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
222     consensus->th = 
223         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
224                                              GNUNET_TIME_UNIT_FOREVER_REL,
225                                              GNUNET_NO, &transmit_queued, consensus);
226   }
227 }
228
229
230 /**
231  * Called when the server has sent is a new element
232  * 
233  * @param consensus consensus handle
234  * @param msg element message
235  */
236 static void
237 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
238                    struct GNUNET_CONSENSUS_ElementMessage *msg)
239 {
240   struct GNUNET_CONSENSUS_Element element;
241   struct GNUNET_CONSENSUS_AckMessage *ack_msg;
242   struct QueuedMessage *queued_msg;
243   int ret;
244
245   element.type = msg->element_type;
246   element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
247   element.data = &msg[1];
248
249   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
250
251   queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage));
252   queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1];
253
254   ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
255   ack_msg->keep = ret;
256
257   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail,
258                                     queued_msg);
259 }
260
261
262 /**
263  * Called when the server has announced
264  * that the conclusion is over.
265  * 
266  * @param consensus consensus handle
267  * @param msg conclude done message
268  */
269 static void
270 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
271                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
272 {
273   GNUNET_assert (NULL != consensus->conclude_cb);
274   consensus->conclude_cb (consensus->conclude_cls, NULL);
275   consensus->conclude_cb = NULL;
276 }
277
278
279
280 /**
281  * Type of a function to call when we receive a message
282  * from the service.
283  *
284  * @param cls closure
285  * @param msg message received, NULL on timeout or fatal error
286  */
287 static void
288 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
289 {
290   struct GNUNET_CONSENSUS_Handle *consensus = cls;
291
292   LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n");
293
294   if (msg == NULL)
295   {
296     /* Error, timeout, death */
297     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
298     GNUNET_CLIENT_disconnect (consensus->client);
299     consensus->client = NULL;
300     consensus->new_element_cb (NULL, NULL);
301     return;
302   }
303
304   switch (ntohs (msg->type))
305   {
306     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
307       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
308       break;
309     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
310       handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
311       break;
312     default:
313       GNUNET_break (0);
314   }
315   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
316                          GNUNET_TIME_UNIT_FOREVER_REL);
317 }
318
319 /**
320  * Function called to notify a client about the connection
321  * begin ready to queue more data.  "buf" will be
322  * NULL and "size" zero if the connection was closed for
323  * writing in the meantime.
324  *
325  * @param cls closure
326  * @param size number of bytes available in buf
327  * @param buf where the callee should write the message
328  * @return number of bytes written to buf
329  */
330 static size_t
331 transmit_join (void *cls, size_t size, void *buf)
332 {
333   struct GNUNET_CONSENSUS_JoinMessage *msg;
334   struct GNUNET_CONSENSUS_Handle *consensus;
335   int msize;
336
337   GNUNET_assert (NULL != buf);
338
339   LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n");
340
341   consensus = cls;
342   consensus->th = NULL;
343   consensus->joined = 1;
344
345   msg = buf;
346
347   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
348       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
349
350   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
351   msg->header.size = htons (msize);
352   msg->session_id = consensus->session_id;
353   msg->num_peers = htons (consensus->num_peers);
354   if (0 != msg->num_peers)
355     memcpy(&msg[1],
356            consensus->peers,
357            consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
358
359   send_next (consensus);
360
361   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
362                          GNUNET_TIME_UNIT_FOREVER_REL);
363   
364   return msize;
365 }
366
367 /**
368  * Create a consensus session.
369  *
370  * @param cfg configuration to use for connecting to the consensus service
371  * @param num_peers number of peers in the peers array
372  * @param peers array of peers participating in this consensus session
373  *              Inclusion of the local peer is optional.
374  * @param session_id session identifier
375  *                   Allows a group of peers to have more than consensus session.
376  * @param new_element_cb callback, called when a new element is added to the set by
377  *                    another peer
378  * @param new_element_cls closure for new_element
379  * @return handle to use, NULL on error
380  */
381 struct GNUNET_CONSENSUS_Handle *
382 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
383                          unsigned int num_peers,
384                          const struct GNUNET_PeerIdentity *peers,
385                          const struct GNUNET_HashCode *session_id,
386                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
387                          void *new_element_cls)
388 {
389   struct GNUNET_CONSENSUS_Handle *consensus;
390   size_t join_message_size;
391
392   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
393   consensus->cfg = cfg;
394   consensus->new_element_cb = new_element_cb;
395   consensus->new_element_cls = new_element_cls;
396   consensus->num_peers = num_peers;
397   consensus->session_id = *session_id;
398
399   if (0 == num_peers)
400     consensus->peers = NULL;
401   else if (num_peers > 0)
402     consensus->peers =
403         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
404
405   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
406
407   GNUNET_assert (consensus->client != NULL);
408
409   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
410       (num_peers * sizeof (struct GNUNET_PeerIdentity));
411
412   consensus->th =
413       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
414                                            join_message_size,
415                                            GNUNET_TIME_UNIT_FOREVER_REL,
416                                            GNUNET_NO, &transmit_join, consensus);
417
418
419   GNUNET_assert (consensus->th != NULL);
420   return consensus;
421 }
422
423
424
425 /**
426  * Insert an element in the set being reconsiled.  Must not be called after
427  * "GNUNET_CONSENSUS_conclude".
428  *
429  * @param consensus handle for the consensus session
430  * @param element the element to be inserted
431  * @param idc function called when we are done with this element and it 
432  *            is thus allowed to call GNUNET_CONSENSUS_insert again
433  * @param idc_cls closure for 'idc'
434  */
435 void
436 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
437                          const struct GNUNET_CONSENSUS_Element *element,
438                          GNUNET_CONSENSUS_InsertDoneCallback idc,
439                          void *idc_cls)
440 {
441   struct QueuedMessage *qmsg;
442   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
443   size_t element_msg_size;
444
445   LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size);
446
447   element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
448                                element->size);
449
450   element_msg = GNUNET_malloc (element_msg_size);
451   element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
452   element_msg->header.size = htons (element_msg_size);
453   memcpy (&element_msg[1], element->data, element->size);
454
455   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
456   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
457   qmsg->idc = idc;
458   qmsg->idc_cls = idc_cls;
459
460   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
461
462   send_next (consensus);
463 }
464
465
466 /**
467  * We are done with inserting new elements into the consensus;
468  * try to conclude the consensus within a given time window.
469  * After conclude has been called, no further elements may be
470  * inserted by the client.
471  *
472  * @param consensus consensus session
473  * @param timeout timeout after which the conculde callback
474  *                must be called
475  * @param conclude called when the conclusion was successful
476  * @param conclude_cls closure for the conclude callback
477  */
478 void
479 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
480                            struct GNUNET_TIME_Relative timeout,
481                            unsigned int min_group_size_in_consensus,
482                            GNUNET_CONSENSUS_ConcludeCallback conclude,
483                            void *conclude_cls)
484 {
485   struct QueuedMessage *qmsg;
486   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
487
488   GNUNET_assert (NULL != conclude);
489   GNUNET_assert (NULL == consensus->conclude_cb);
490
491   consensus->conclude_cls = conclude_cls;
492   consensus->conclude_cb = conclude;
493
494   conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
495   conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
496   conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
497   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
498   conclude_msg->min_group_size = min_group_size_in_consensus;
499
500   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
501   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
502
503   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
504
505   send_next (consensus);
506 }
507
508
509 /**
510  * Destroy a consensus handle (free all state associated with
511  * it, no longer call any of the callbacks).
512  *
513  * @param consensus handle to destroy
514  */
515 void
516 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
517 {
518   if (consensus->client != NULL)
519   {
520     GNUNET_CLIENT_disconnect (consensus->client);
521     consensus->client = NULL;
522   }
523   if (NULL != consensus->peers)
524     GNUNET_free (consensus->peers);
525   GNUNET_free (consensus);
526 }
527