started implementing consensus api and service
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_protocols.h"
28 #include "gnunet_client_lib.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
34
35
36 /**
37  * Handle for the service.
38  */
39 struct GNUNET_CONSENSUS_Handle
40 {
41   /**
42    * Configuration to use.
43    */
44   const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46   /**
47    * Socket (if available).
48    */
49   struct GNUNET_CLIENT_Connection *client;
50
51   /**
52    * Callback for new elements. Not called for elements added locally.
53    */
54   GNUNET_CONSENSUS_NewElementCallback new_element_cb;
55
56   /**
57    * Closure for new_element_cb
58    */
59   void *new_element_cls;
60
61   /**
62    * Session identifier for the consensus session.
63    */
64   struct GNUNET_HashCode session_id;
65
66   /**
67    * Number of peers in the consensus. Optionally includes the local peer.
68    */
69   int num_peers;
70
71   /**
72    * Peer identities of peers in the consensus. Optionally includes the local peer.
73    */
74   struct GNUNET_PeerIdentity *peers;
75
76   /**
77    * Currently active transmit request.
78    */
79   struct GNUNET_CLIENT_TransmitHandle *th;
80
81   /**
82    * GNUNES_YES iff the join message has been sent to the service.
83    */
84   int joined;
85
86   /**
87    * Called when the current insertion operation finishes.
88    * NULL if there is no insert operation active.
89    */
90   GNUNET_CONSENSUS_InsertDoneCallback idc;
91
92   /**
93    * Closure for the insert done callback.
94    */
95   void *idc_cls;
96
97   /**
98    * An element that was requested to be inserted.
99    */
100   struct GNUNET_CONSENSUS_Element *insert_element;
101
102   /**
103    * Called when the conclude operation finishes or fails.
104    */
105   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
106
107   /**
108    * Closure for the conclude callback.
109    */
110   void *conclude_cls;
111
112   /**
113    * Deadline for the conclude operation.
114    */
115   struct GNUNET_TIME_Absolute conclude_deadline;
116 };
117
118
119 static void
120 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121                    struct GNUNET_CONSENSUS_ElementMessage *msg)
122 {
123   struct GNUNET_CONSENSUS_Element element;
124   element.type = msg->element_type;
125   element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126   element.data = &msg[1];
127   consensus->new_element_cb(consensus->new_element_cls, &element);
128 }
129
130 static void
131 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
133 {
134   GNUNET_assert (NULL != consensus->conclude_cb);
135   consensus->conclude_cb(consensus->conclude_cls,
136                          msg->num_peers,
137                          (struct GNUNET_PeerIdentity *) &msg[1]);
138   consensus->conclude_cb = NULL;
139 }
140
141
142
143 /**
144  * Type of a function to call when we receive a message
145  * from the service.
146  *
147  * @param cls closure
148  * @param msg message received, NULL on timeout or fatal error
149  */
150 static void
151 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
152 {
153   struct GNUNET_CONSENSUS_Handle *consensus = cls;
154   GNUNET_CONSENSUS_InsertDoneCallback idc;
155   void *idc_cls;
156
157   if (msg == NULL)
158   {
159     /* Error, timeout, death */
160     GNUNET_CLIENT_disconnect (consensus->client);
161     consensus->client = NULL;
162     consensus->new_element_cb(NULL, NULL);
163     if (NULL != consensus->idc)
164     {
165       consensus->idc(consensus->idc_cls, GNUNET_NO);
166       consensus->idc = NULL;
167       consensus->idc_cls = NULL;
168     }
169     return;
170   }
171
172   switch (ntohs(msg->type))
173   {
174     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ACK:
175       idc = consensus->idc;
176       consensus->idc = NULL;
177       idc_cls = consensus->idc_cls;
178       consensus->idc_cls = NULL;
179       idc(idc_cls, GNUNET_YES);
180       break;
181     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
182       handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
183       break;
184     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
185       handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
186       break;
187     default:
188       LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
189   }
190   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
191                          GNUNET_TIME_UNIT_FOREVER_REL);
192 }
193
194
195
196
197 /**
198  * Function called to notify a client about the connection
199  * begin ready to queue more data.  "buf" will be
200  * NULL and "size" zero if the connection was closed for
201  * writing in the meantime.
202  *
203  * @param cls closure
204  * @param size number of bytes available in buf
205  * @param buf where the callee should write the message
206  * @return number of bytes written to buf
207  */
208 static size_t
209 transmit_insert (void *cls, size_t size, void *buf)
210 {
211   struct GNUNET_CONSENSUS_ElementMessage *msg;
212   struct GNUNET_CONSENSUS_Handle *consensus;
213   int msize;
214
215   GNUNET_assert (NULL != buf);
216
217   consensus = cls;
218
219   GNUNET_assert (NULL != consensus->insert_element);
220
221   consensus->th = NULL;
222
223
224   msg = buf;
225
226   msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
227       consensus->insert_element->size;
228
229   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
230   msg->header.size = htons (msize);
231   memcpy(&msg[1],
232          consensus->insert_element->data,
233          consensus->insert_element->size);
234
235   return msize;
236 }
237
238
239 /**
240  * Function called to notify a client about the connection
241  * begin ready to queue more data.  "buf" will be
242  * NULL and "size" zero if the connection was closed for
243  * writing in the meantime.
244  *
245  * @param cls closure
246  * @param size number of bytes available in buf
247  * @param buf where the callee should write the message
248  * @return number of bytes written to buf
249  */
250 static size_t
251 transmit_join (void *cls, size_t size, void *buf)
252 {
253   struct GNUNET_CONSENSUS_JoinMessage *msg;
254   struct GNUNET_CONSENSUS_Handle *consensus;
255   int msize;
256
257   LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
258
259   GNUNET_assert (NULL != buf);
260
261   consensus = cls;
262   consensus->th = NULL;
263   consensus->joined = 1;
264
265   msg = buf;
266
267   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
268       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
269
270   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
271   msg->header.size = htons (msize);
272   msg->session_id = consensus->session_id;
273   msg->num_peers = htons (consensus->num_peers);
274   memcpy(&msg[1],
275          consensus->peers,
276          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
277
278   if (consensus->insert_element != NULL)
279   {
280     consensus->th =
281         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
282                                              msize,
283                                              GNUNET_TIME_UNIT_FOREVER_REL,
284                                              GNUNET_NO, &transmit_insert, consensus);
285   }
286
287
288   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
289                          GNUNET_TIME_UNIT_FOREVER_REL);
290   
291   return msize;
292 }
293
294
295 /**
296  * Function called to notify a client about the connection
297  * begin ready to queue more data.  "buf" will be
298  * NULL and "size" zero if the connection was closed for
299  * writing in the meantime.
300  *
301  * @param cls closure
302  * @param size number of bytes available in buf
303  * @param buf where the callee should write the message
304  * @return number of bytes written to buf
305  */
306 static size_t
307 transmit_conclude (void *cls, size_t size, void *buf)
308 {
309   struct GNUNET_CONSENSUS_ConcludeMessage *msg;
310   struct GNUNET_CONSENSUS_Handle *consensus;
311   int msize;
312
313   GNUNET_assert (NULL != buf);
314
315   consensus = cls;
316   consensus->th = NULL;
317
318   msg = buf;
319
320   msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
321
322   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
323   msg->header.size = htons (msize);
324   msg->timeout =
325       GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
326
327   return msize;
328 }
329
330
331
332
333 /**
334  * Create a consensus session.
335  *
336  * @param cfg
337  * @param num_peers
338  * @param peers array of peers participating in this consensus session
339  *              Inclusion of the local peer is optional.
340  * @param session_id session identifier
341  *                   Allows a group of peers to have more than consensus session.
342  * @param num_initial_elements number of entries in the 'initial_elements' array
343  * @param initial_elements our elements for the consensus (each of 'element_size'
344  * @param new_element callback, called when a new element is added to the set by
345  *                    another peer
346  * @param new_element_cls closure for new_element
347  * @return handle to use, NULL on error
348  */
349 struct GNUNET_CONSENSUS_Handle *
350 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
351                          unsigned int num_peers,
352                          const struct GNUNET_PeerIdentity *peers,
353                          const struct GNUNET_HashCode *session_id,
354                          /*
355                          unsigned int num_initial_elements,
356                          const struct GNUNET_CONSENSUS_Element **initial_elements,
357                          */
358                          GNUNET_CONSENSUS_NewElementCallback new_element,
359                          void *new_element_cls)
360 {
361   struct GNUNET_CONSENSUS_Handle *consensus;
362   size_t join_message_size;
363
364
365   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
366   consensus->cfg = cfg;
367   consensus->new_element_cb = new_element;
368   consensus->new_element_cls = new_element_cls;
369   consensus->num_peers = num_peers;
370   consensus->session_id = *session_id;
371
372
373
374   if (0 == num_peers)
375   {
376     consensus->peers = NULL;
377   }
378   else if (num_peers > 0)
379   {
380
381     consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
382   }
383   else
384   {
385     GNUNET_break (0);
386   }
387
388
389   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
390
391   GNUNET_assert (consensus->client != NULL);
392
393   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
394       (num_peers * sizeof (struct GNUNET_PeerIdentity));
395
396   consensus->th =
397       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
398                                            join_message_size,
399                                            GNUNET_TIME_UNIT_FOREVER_REL,
400                                            GNUNET_NO, &transmit_join, consensus);
401
402
403   GNUNET_assert (consensus->th != NULL);
404
405   return consensus;
406 }
407
408
409
410 /**
411  * Insert an element in the set being reconsiled.  Must not be called after
412  * "GNUNET_CONSENSUS_conclude".
413  *
414  * @param consensus handle for the consensus session
415  * @param element the element to be inserted
416  * @param idc function called when we are done with this element and it 
417  *            is thus allowed to call GNUNET_CONSENSUS_insert again
418  * @param idc_cls closure for 'idc'
419  */
420 void
421 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
422                          const struct GNUNET_CONSENSUS_Element *element,
423                          GNUNET_CONSENSUS_InsertDoneCallback idc,
424                          void *idc_cls)
425 {
426
427   GNUNET_assert (NULL == consensus->idc);
428   GNUNET_assert (NULL == consensus->insert_element);
429
430   consensus->idc = idc;
431   consensus->idc_cls = idc_cls;
432   consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
433
434   if (consensus->joined == 0)
435   {
436     GNUNET_assert (NULL != consensus->th);
437     return;
438   }
439
440   GNUNET_assert (NULL == consensus->th);
441
442   consensus->th =
443       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
444                                            element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
445                                            GNUNET_TIME_UNIT_FOREVER_REL,
446                                            GNUNET_NO, &transmit_insert, consensus);
447 }
448
449
450 /**
451  * We are finished inserting new elements into the consensus;
452  * try to conclude the consensus within a given time window.
453  *
454  * @param consensus consensus session
455  * @param timeout timeout after which the conculde callback
456  *                must be called
457  * @param conclude called when the conclusion was successful
458  * @param conclude_cls closure for the conclude callback
459  */
460 void
461 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
462                            struct GNUNET_TIME_Relative timeout,
463                            GNUNET_CONSENSUS_ConcludeCallback conclude,
464                            void *conclude_cls)
465 {
466   GNUNET_assert (NULL == consensus->th);
467   GNUNET_assert (NULL == consensus->conclude_cb);
468
469   consensus->conclude_cls = conclude_cls;
470   consensus->conclude_cb = conclude;
471   consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
472
473   consensus->th =
474       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
475                                            sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
476                                            timeout,
477                                            GNUNET_NO, &transmit_conclude, consensus);
478   if (NULL == consensus->th)
479   {
480     conclude(conclude_cls, 0, NULL);
481   }
482 }
483
484
485 /**
486  * Destroy a consensus handle (free all state associated with
487  * it, no longer call any of the callbacks).
488  *
489  * @param consensus handle to destroy
490  */
491 void
492 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
493 {
494   if (consensus->client != NULL)
495   {
496     GNUNET_CLIENT_disconnect (consensus->client);
497     consensus->client = NULL;
498   }
499   GNUNET_free (consensus->peers);
500   GNUNET_free (consensus);
501 }
502