90b0fdf16ddbea86acd9c0ecd95cce309fcc272f
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_protocols.h"
28 #include "gnunet_client_lib.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
34
35
36 /**
37  * Handle for the service.
38  */
39 struct GNUNET_CONSENSUS_Handle
40 {
41   /**
42    * Configuration to use.
43    */
44   const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46   /**
47    * Socket (if available).
48    */
49   struct GNUNET_CLIENT_Connection *client;
50
51   /**
52    * Callback for new elements. Not called for elements added locally.
53    */
54   GNUNET_CONSENSUS_NewElementCallback new_element_cb;
55
56   /**
57    * Closure for new_element_cb
58    */
59   void *new_element_cls;
60
61   /**
62    * Session identifier for the consensus session.
63    */
64   struct GNUNET_HashCode session_id;
65
66   /**
67    * Number of peers in the consensus. Optionally includes the local peer.
68    */
69   int num_peers;
70
71   /**
72    * Peer identities of peers in the consensus. Optionally includes the local peer.
73    */
74   struct GNUNET_PeerIdentity *peers;
75
76   /**
77    * Currently active transmit request.
78    */
79   struct GNUNET_CLIENT_TransmitHandle *th;
80
81   /**
82    * GNUNES_YES iff the join message has been sent to the service.
83    */
84   int joined;
85
86   /**
87    * Called when the current insertion operation finishes.
88    * NULL if there is no insert operation active.
89    */
90   GNUNET_CONSENSUS_InsertDoneCallback idc;
91
92   /**
93    * Closure for the insert done callback.
94    */
95   void *idc_cls;
96
97   /**
98    * An element that was requested to be inserted.
99    */
100   struct GNUNET_CONSENSUS_Element *insert_element;
101
102   /**
103    * Called when the conclude operation finishes or fails.
104    */
105   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
106
107   /**
108    * Closure for the conclude callback.
109    */
110   void *conclude_cls;
111
112   /**
113    * Deadline for the conclude operation.
114    */
115   struct GNUNET_TIME_Absolute conclude_deadline;
116 };
117
118
119 static void
120 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121                    struct GNUNET_CONSENSUS_ElementMessage *msg)
122 {
123   struct GNUNET_CONSENSUS_Element element;
124   element.type = msg->element_type;
125   element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126   element.data = &msg[1];
127   consensus->new_element_cb (consensus->new_element_cls, &element);
128 }
129
130 static void
131 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
133 {
134   GNUNET_assert (NULL != consensus->conclude_cb);
135   consensus->conclude_cb(consensus->conclude_cls,
136                          msg->num_peers,
137                          (struct GNUNET_PeerIdentity *) &msg[1]);
138   consensus->conclude_cb = NULL;
139 }
140
141
142
143 /**
144  * Type of a function to call when we receive a message
145  * from the service.
146  *
147  * @param cls closure
148  * @param msg message received, NULL on timeout or fatal error
149  */
150 static void
151 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
152 {
153   struct GNUNET_CONSENSUS_Handle *consensus = cls;
154
155   LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n");
156
157   if (msg == NULL)
158   {
159     /* Error, timeout, death */
160     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
161     GNUNET_CLIENT_disconnect (consensus->client);
162     consensus->client = NULL;
163     consensus->new_element_cb (NULL, NULL);
164     if (NULL != consensus->idc)
165     {
166       consensus->idc(consensus->idc_cls, GNUNET_NO);
167       consensus->idc = NULL;
168       consensus->idc_cls = NULL;
169     }
170     return;
171   }
172
173   switch (ntohs(msg->type))
174   {
175     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
176       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
177       break;
178     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
179       handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
180       break;
181     default:
182       LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
183   }
184   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
185                          GNUNET_TIME_UNIT_FOREVER_REL);
186 }
187
188
189
190
191 /**
192  * Function called to notify a client about the connection
193  * begin ready to queue more data.  "buf" will be
194  * NULL and "size" zero if the connection was closed for
195  * writing in the meantime.
196  *
197  * @param cls closure
198  * @param size number of bytes available in buf
199  * @param buf where the callee should write the message
200  * @return number of bytes written to buf
201  */
202 static size_t
203 transmit_insert (void *cls, size_t size, void *buf)
204 {
205   struct GNUNET_CONSENSUS_ElementMessage *msg;
206   struct GNUNET_CONSENSUS_Handle *consensus;
207   GNUNET_CONSENSUS_InsertDoneCallback idc;
208   int msize;
209   void *idc_cls;
210
211   GNUNET_assert (NULL != buf);
212
213   consensus = cls;
214
215   GNUNET_assert (NULL != consensus->insert_element);
216
217   consensus->th = NULL;
218
219   msg = buf;
220
221   msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
222       consensus->insert_element->size;
223
224   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
225   msg->header.size = htons (msize);
226   memcpy (&msg[1],
227           consensus->insert_element->data,
228           consensus->insert_element->size);
229
230
231   idc = consensus->idc;
232   consensus->idc = NULL;
233   idc_cls = consensus->idc_cls;
234   consensus->idc_cls = NULL;
235   idc (idc_cls, GNUNET_YES);
236
237   return msize;
238 }
239
240
241 /**
242  * Function called to notify a client about the connection
243  * begin ready to queue more data.  "buf" will be
244  * NULL and "size" zero if the connection was closed for
245  * writing in the meantime.
246  *
247  * @param cls closure
248  * @param size number of bytes available in buf
249  * @param buf where the callee should write the message
250  * @return number of bytes written to buf
251  */
252 static size_t
253 transmit_join (void *cls, size_t size, void *buf)
254 {
255   struct GNUNET_CONSENSUS_JoinMessage *msg;
256   struct GNUNET_CONSENSUS_Handle *consensus;
257   int msize;
258
259   GNUNET_assert (NULL != buf);
260
261   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
262
263   consensus = cls;
264   consensus->th = NULL;
265   consensus->joined = 1;
266
267   msg = buf;
268
269   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
270       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
271
272   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
273   msg->header.size = htons (msize);
274   msg->session_id = consensus->session_id;
275   msg->num_peers = htons (consensus->num_peers);
276   memcpy(&msg[1],
277          consensus->peers,
278          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
279
280   if (consensus->insert_element != NULL)
281   {
282     consensus->th =
283         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
284                                              msize,
285                                              GNUNET_TIME_UNIT_FOREVER_REL,
286                                              GNUNET_NO, &transmit_insert, consensus);
287   }
288
289   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
290                          GNUNET_TIME_UNIT_FOREVER_REL);
291   
292   return msize;
293 }
294
295
296 /**
297  * Function called to notify a client about the connection
298  * begin ready to queue more data.  "buf" will be
299  * NULL and "size" zero if the connection was closed for
300  * writing in the meantime.
301  *
302  * @param cls closure
303  * @param size number of bytes available in buf
304  * @param buf where the callee should write the message
305  * @return number of bytes written to buf
306  */
307 static size_t
308 transmit_conclude (void *cls, size_t size, void *buf)
309 {
310   struct GNUNET_CONSENSUS_ConcludeMessage *msg;
311   struct GNUNET_CONSENSUS_Handle *consensus;
312   int msize;
313
314   GNUNET_assert (NULL != buf);
315
316   consensus = cls;
317   consensus->th = NULL;
318
319   msg = buf;
320
321   msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
322
323   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
324   msg->header.size = htons (msize);
325   msg->timeout =
326       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
327
328   return msize;
329 }
330
331
332 /**
333  * Function called to notify a client about the connection
334  * begin ready to queue more data.  "buf" will be
335  * NULL and "size" zero if the connection was closed for
336  * writing in the meantime.
337  *
338  * @param cls the consensus handle
339  * @param size number of bytes available in buf
340  * @param buf where the callee should write the message
341  * @return number of bytes written to buf
342  */
343 static size_t
344 transmit_begin (void *cls, size_t size, void *buf)
345 {
346   struct GNUNET_MessageHeader *msg;
347   struct GNUNET_CONSENSUS_Handle *consensus;
348   int msize;
349
350   GNUNET_assert (NULL != buf);
351
352   consensus = cls;
353   consensus->th = NULL;
354
355   msg = buf;
356
357   msize = sizeof (struct GNUNET_MessageHeader);
358
359   msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
360   msg->size = htons (msize);
361
362   return msize;
363 }
364
365
366 /**
367  * Create a consensus session.
368  *
369  * @param cfg
370  * @param num_peers
371  * @param peers array of peers participating in this consensus session
372  *              Inclusion of the local peer is optional.
373  * @param session_id session identifier
374  *                   Allows a group of peers to have more than consensus session.
375  * @param new_element_cb callback, called when a new element is added to the set by
376  *                    another peer
377  * @param new_element_cls closure for new_element
378  * @return handle to use, NULL on error
379  */
380 struct GNUNET_CONSENSUS_Handle *
381 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
382                          unsigned int num_peers,
383                          const struct GNUNET_PeerIdentity *peers,
384                          const struct GNUNET_HashCode *session_id,
385                          GNUNET_CONSENSUS_NewElementCallback new_element_cb,
386                          void *new_element_cls)
387 {
388   struct GNUNET_CONSENSUS_Handle *consensus;
389   size_t join_message_size;
390
391   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
392   consensus->cfg = cfg;
393   consensus->new_element_cb = new_element_cb;
394   consensus->new_element_cls = new_element_cls;
395   consensus->num_peers = num_peers;
396   consensus->session_id = *session_id;
397
398   if (0 == num_peers)
399   {
400     consensus->peers = NULL;
401   }
402   else if (num_peers > 0)
403   {
404     consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
405   }
406   else
407   {
408     GNUNET_break (0);
409   }
410
411   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
412
413   GNUNET_assert (consensus->client != NULL);
414
415   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
416       (num_peers * sizeof (struct GNUNET_PeerIdentity));
417
418   consensus->th =
419       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
420                                            join_message_size,
421                                            GNUNET_TIME_UNIT_FOREVER_REL,
422                                            GNUNET_NO, &transmit_join, consensus);
423
424   GNUNET_assert (consensus->th != NULL);
425
426   return consensus;
427 }
428
429
430
431 /**
432  * Insert an element in the set being reconsiled.  Must not be called after
433  * "GNUNET_CONSENSUS_conclude".
434  *
435  * @param consensus handle for the consensus session
436  * @param element the element to be inserted
437  * @param idc function called when we are done with this element and it 
438  *            is thus allowed to call GNUNET_CONSENSUS_insert again
439  * @param idc_cls closure for 'idc'
440  */
441 void
442 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
443                          const struct GNUNET_CONSENSUS_Element *element,
444                          GNUNET_CONSENSUS_InsertDoneCallback idc,
445                          void *idc_cls)
446 {
447
448   GNUNET_assert (NULL == consensus->idc);
449   GNUNET_assert (NULL == consensus->insert_element);
450
451   consensus->idc = idc;
452   consensus->idc_cls = idc_cls;
453   consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
454
455   if (consensus->joined == 0)
456   {
457     GNUNET_assert (NULL != consensus->th);
458     return;
459   }
460
461   GNUNET_assert (NULL == consensus->th);
462
463   consensus->th =
464       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
465                                            element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
466                                            GNUNET_TIME_UNIT_FOREVER_REL,
467                                            GNUNET_NO, &transmit_insert, consensus);
468 }
469
470
471 /**
472  * Begin reconciling elements with other peers.
473  *
474  * @param consensus handle for the consensus session
475  */
476 void
477 GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
478 {
479   GNUNET_assert (NULL == consensus->idc);
480   GNUNET_assert (NULL == consensus->insert_element);
481
482   consensus->th =
483       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
484                                            sizeof (struct GNUNET_MessageHeader),
485                                            GNUNET_TIME_UNIT_FOREVER_REL,
486                                            GNUNET_NO, &transmit_begin, consensus);
487 }
488
489
490 /**
491  * We are finished inserting new elements into the consensus;
492  * try to conclude the consensus within a given time window.
493  *
494  * @param consensus consensus session
495  * @param timeout timeout after which the conculde callback
496  *                must be called
497  * @param conclude called when the conclusion was successful
498  * @param conclude_cls closure for the conclude callback
499  */
500 void
501 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
502                            struct GNUNET_TIME_Relative timeout,
503                            GNUNET_CONSENSUS_ConcludeCallback conclude,
504                            void *conclude_cls)
505 {
506   GNUNET_assert (NULL == consensus->th);
507   GNUNET_assert (NULL == consensus->conclude_cb);
508
509   consensus->conclude_cls = conclude_cls;
510   consensus->conclude_cb = conclude;
511   consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
512
513   consensus->th =
514       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
515                                            sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
516                                            timeout,
517                                            GNUNET_NO, &transmit_conclude, consensus);
518   if (NULL == consensus->th)
519   {
520     conclude(conclude_cls, 0, NULL);
521   }
522 }
523
524
525 /**
526  * Destroy a consensus handle (free all state associated with
527  * it, no longer call any of the callbacks).
528  *
529  * @param consensus handle to destroy
530  */
531 void
532 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
533 {
534   if (consensus->client != NULL)
535   {
536     GNUNET_CLIENT_disconnect (consensus->client);
537     consensus->client = NULL;
538   }
539   GNUNET_free (consensus->peers);
540   GNUNET_free (consensus);
541 }
542