-simplify
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 /**
37  * Actions that can be queued.
38  */
39 struct QueuedMessage
40 {
41   /**
42    * Queued messages are stored in a doubly linked list.
43    */
44   struct QueuedMessage *next;
45
46   /**
47    * Queued messages are stored in a doubly linked list.
48    */
49   struct QueuedMessage *prev;
50
51   /**
52    * The actual queued message.
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Will be called after transmit, if not NULL
58    */
59   GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61   /**
62    * Closure for idc
63    */
64   void *idc_cls;
65 };
66
67
68 /**
69  * Handle for the service.
70  */
71 struct GNUNET_CONSENSUS_Handle
72 {
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Client connected to the consensus service, may be NULL if not connected.
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Callback for new elements. Not called for elements added locally.
85    */
86   GNUNET_CONSENSUS_ElementCallback new_element_cb;
87
88   /**
89    * Closure for new_element_cb
90    */
91   void *new_element_cls;
92
93   /**
94    * The (local) session identifier for the consensus session.
95    */
96   struct GNUNET_HashCode session_id;
97
98   /**
99    * Number of peers in the consensus. Optionally includes the local peer.
100    */
101   int num_peers;
102
103   /**
104    * Peer identities of peers participating in the consensus, includes the local peer.
105    */
106   struct GNUNET_PeerIdentity **peers;
107
108   /**
109    * Currently active transmit request.
110    */
111   struct GNUNET_CLIENT_TransmitHandle *th;
112
113   /**
114    * GNUNES_YES iff the join message has been sent to the service.
115    */
116   int joined;
117
118   /**
119    * Closure for the insert done callback.
120    */
121   void *idc_cls;
122
123   /**
124    * Called when the conclude operation finishes or fails.
125    */
126   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
127
128   /**
129    * Closure for the conclude callback.
130    */
131   void *conclude_cls;
132
133   /**
134    * Deadline for the conclude operation.
135    */
136   struct GNUNET_TIME_Absolute conclude_deadline;
137
138   unsigned int conclude_min_size;
139
140   struct QueuedMessage *messages_head;
141
142   struct QueuedMessage *messages_tail;
143
144 };
145
146
147
148 /**
149  * Schedule transmitting the next message.
150  *
151  * @param consensus consensus handle
152  */
153 static void
154 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
155
156
157 /**
158  * Function called to notify a client about the connection
159  * begin ready to queue more data.  "buf" will be
160  * NULL and "size" zero if the connection was closed for
161  * writing in the meantime.
162  *
163  * @param cls closure
164  * @param size number of bytes available in buf
165  * @param buf where the callee should write the message
166  * @return number of bytes written to buf
167  */
168 static size_t
169 transmit_queued (void *cls, size_t size,
170                  void *buf)
171 {
172   struct GNUNET_CONSENSUS_Handle *consensus;
173   struct QueuedMessage *qmsg;
174   size_t msg_size;
175
176   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
177   consensus->th = NULL;
178
179   qmsg = consensus->messages_head;
180   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
181
182   if (NULL == buf)
183   {
184     if (NULL != qmsg->idc)
185     {
186       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
187     }
188     return 0;
189   }
190
191   msg_size = ntohs (qmsg->msg->size);
192
193   GNUNET_assert (size >= msg_size);
194
195   memcpy (buf, qmsg->msg, msg_size);
196   if (NULL != qmsg->idc)
197   {
198     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
199   }
200   GNUNET_free (qmsg->msg);
201   GNUNET_free (qmsg);
202   /* FIXME: free the messages */
203
204   send_next (consensus);
205
206   return msg_size;
207 }
208
209
210 /**
211  * Schedule transmitting the next message.
212  *
213  * @param consensus consensus handle
214  */
215 static void
216 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
217 {
218   if (NULL != consensus->th)
219     return;
220
221   if (NULL != consensus->messages_head)
222   {
223     consensus->th = 
224         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
225                                              GNUNET_TIME_UNIT_FOREVER_REL,
226                                              GNUNET_NO, &transmit_queued, consensus);
227   }
228 }
229
230
231 /**
232  * Called when the server has sent is a new element
233  * 
234  * @param consensus consensus handle
235  * @param msg element message
236  */
237 static void
238 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
239                    struct GNUNET_CONSENSUS_ElementMessage *msg)
240 {
241   struct GNUNET_CONSENSUS_Element element;
242
243   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
244
245   element.type = msg->element_type;
246   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
247   element.data = &msg[1];
248
249   consensus->new_element_cb (consensus->new_element_cls, &element);
250
251   send_next (consensus);
252 }
253
254
255 /**
256  * Called when the server has announced
257  * that the conclusion is over.
258  * 
259  * @param consensus consensus handle
260  * @param msg conclude done message
261  */
262 static void
263 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
264                       const struct GNUNET_MessageHeader *msg)
265 {
266   GNUNET_CONSENSUS_ConcludeCallback cc;
267
268   GNUNET_assert (NULL != (cc = consensus->conclude_cb));
269   consensus->conclude_cb = NULL;
270   cc (consensus->conclude_cls);
271 }
272
273
274 /**
275  * Type of a function to call when we receive a message
276  * from the service.
277  *
278  * @param cls closure
279  * @param msg message received, NULL on timeout or fatal error
280  */
281 static void
282 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
283 {
284   struct GNUNET_CONSENSUS_Handle *consensus = cls;
285
286   LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
287
288   if (NULL == msg)
289   {
290     /* Error, timeout, death */
291     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
292     GNUNET_CLIENT_disconnect (consensus->client);
293     consensus->client = NULL;
294     consensus->new_element_cb (consensus->new_element_cls, NULL);
295     return;
296   }
297   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
298                          GNUNET_TIME_UNIT_FOREVER_REL);
299   switch (ntohs (msg->type))
300   {
301     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
302       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
303       break;
304     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
305       handle_conclude_done (consensus, msg);
306       break;
307     default:
308       GNUNET_break (0);
309   }
310 }
311
312 /**
313  * Function called to notify a client about the connection
314  * begin ready to queue more data.  "buf" will be
315  * NULL and "size" zero if the connection was closed for
316  * writing in the meantime.
317  *
318  * @param cls closure
319  * @param size number of bytes available in buf
320  * @param buf where the callee should write the message
321  * @return number of bytes written to buf
322  */
323 static size_t
324 transmit_join (void *cls, size_t size, void *buf)
325 {
326   struct GNUNET_CONSENSUS_JoinMessage *msg;
327   struct GNUNET_CONSENSUS_Handle *consensus;
328   int msize;
329
330   GNUNET_assert (NULL != buf);
331
332   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
333
334   consensus = cls;
335   consensus->th = NULL;
336   consensus->joined = 1;
337
338   msg = buf;
339
340   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
341       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
342
343   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
344   msg->header.size = htons (msize);
345   msg->session_id = consensus->session_id;
346   msg->num_peers = htonl (consensus->num_peers);
347   memcpy(&msg[1],
348          consensus->peers,
349          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
350   send_next (consensus);
351   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
352                          GNUNET_TIME_UNIT_FOREVER_REL);
353   
354   return msize;
355 }
356
357 /**
358  * Create a consensus session.
359  *
360  * @param cfg configuration to use for connecting to the consensus service
361  * @param num_peers number of peers in the peers array
362  * @param peers array of peers participating in this consensus session
363  *              Inclusion of the local peer is optional.
364  * @param session_id session identifier
365  *                   Allows a group of peers to have more than consensus session.
366  * @param new_element_cb callback, called when a new element is added to the set by
367  *                    another peer
368  * @param new_element_cls closure for new_element
369  * @return handle to use, NULL on error
370  */
371 struct GNUNET_CONSENSUS_Handle *
372 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
373                          unsigned int num_peers,
374                          const struct GNUNET_PeerIdentity *peers,
375                          const struct GNUNET_HashCode *session_id,
376                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
377                          void *new_element_cls)
378 {
379   struct GNUNET_CONSENSUS_Handle *consensus;
380   size_t join_message_size;
381
382   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
383   consensus->cfg = cfg;
384   consensus->new_element_cb = new_element_cb;
385   consensus->new_element_cls = new_element_cls;
386   consensus->num_peers = num_peers;
387   consensus->session_id = *session_id;
388
389   if (0 == num_peers)
390     consensus->peers = NULL;
391   else if (num_peers > 0)
392     consensus->peers =
393         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
394
395   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
396
397   GNUNET_assert (consensus->client != NULL);
398
399   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
400       (num_peers * sizeof (struct GNUNET_PeerIdentity));
401
402   consensus->th =
403       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
404                                            join_message_size,
405                                            GNUNET_TIME_UNIT_FOREVER_REL,
406                                            GNUNET_NO, &transmit_join, consensus);
407
408
409   GNUNET_assert (consensus->th != NULL);
410   return consensus;
411 }
412
413
414
415 /**
416  * Insert an element in the set being reconsiled.  Must not be called after
417  * "GNUNET_CONSENSUS_conclude".
418  *
419  * @param consensus handle for the consensus session
420  * @param element the element to be inserted
421  * @param idc function called when we are done with this element and it 
422  *            is thus allowed to call GNUNET_CONSENSUS_insert again
423  * @param idc_cls closure for 'idc'
424  */
425 void
426 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
427                          const struct GNUNET_CONSENSUS_Element *element,
428                          GNUNET_CONSENSUS_InsertDoneCallback idc,
429                          void *idc_cls)
430 {
431   struct QueuedMessage *qmsg;
432   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
433   size_t element_msg_size;
434
435   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
436
437   element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
438                                element->size);
439
440   element_msg = GNUNET_malloc (element_msg_size);
441   element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
442   element_msg->header.size = htons (element_msg_size);
443   memcpy (&element_msg[1], element->data, element->size);
444
445   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
446   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
447   qmsg->idc = idc;
448   qmsg->idc_cls = idc_cls;
449
450   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
451
452   send_next (consensus);
453 }
454
455
456 /**
457  * We are done with inserting new elements into the consensus;
458  * try to conclude the consensus within a given time window.
459  * After conclude has been called, no further elements may be
460  * inserted by the client.
461  *
462  * @param consensus consensus session
463  * @param timeout timeout after which the conculde callback
464  *                must be called
465  * @param conclude called when the conclusion was successful
466  * @param conclude_cls closure for the conclude callback
467  */
468 void
469 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
470                            struct GNUNET_TIME_Relative timeout,
471                            GNUNET_CONSENSUS_ConcludeCallback conclude,
472                            void *conclude_cls)
473 {
474   struct QueuedMessage *qmsg;
475   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
476
477   GNUNET_assert (NULL != conclude);
478   GNUNET_assert (NULL == consensus->conclude_cb);
479
480   consensus->conclude_cls = conclude_cls;
481   consensus->conclude_cb = conclude;
482
483   conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
484   conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
485   conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
486   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
487
488   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
489   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
490
491   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
492
493   send_next (consensus);
494 }
495
496
497 /**
498  * Destroy a consensus handle (free all state associated with
499  * it, no longer call any of the callbacks).
500  *
501  * @param consensus handle to destroy
502  */
503 void
504 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
505 {
506   if (consensus->client != NULL)
507   {
508     GNUNET_CLIENT_disconnect (consensus->client);
509     consensus->client = NULL;
510   }
511   if (NULL != consensus->peers)
512     GNUNET_free (consensus->peers);
513   GNUNET_free (consensus);
514 }
515