fixed doxygen
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_protocols.h"
28 #include "gnunet_client_lib.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus.h"
31
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
34
35
36 /**
37  * Handle for the service.
38  */
39 struct GNUNET_CONSENSUS_Handle
40 {
41   /**
42    * Configuration to use.
43    */
44   const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46   /**
47    * Socket (if available).
48    */
49   struct GNUNET_CLIENT_Connection *client;
50
51   /**
52    * Callback for new elements. Not called for elements added locally.
53    */
54   GNUNET_CONSENSUS_NewElementCallback new_element_cb;
55
56   /**
57    * Closure for new_element_cb
58    */
59   void *new_element_cls;
60
61   /**
62    * Session identifier for the consensus session.
63    */
64   struct GNUNET_HashCode session_id;
65
66   /**
67    * Number of peers in the consensus. Optionally includes the local peer.
68    */
69   int num_peers;
70
71   /**
72    * Peer identities of peers in the consensus. Optionally includes the local peer.
73    */
74   struct GNUNET_PeerIdentity *peers;
75
76   /**
77    * Currently active transmit request.
78    */
79   struct GNUNET_CLIENT_TransmitHandle *th;
80
81   /**
82    * GNUNES_YES iff the join message has been sent to the service.
83    */
84   int joined;
85
86   /**
87    * Called when the current insertion operation finishes.
88    * NULL if there is no insert operation active.
89    */
90   GNUNET_CONSENSUS_InsertDoneCallback idc;
91
92   /**
93    * Closure for the insert done callback.
94    */
95   void *idc_cls;
96
97   /**
98    * An element that was requested to be inserted.
99    */
100   struct GNUNET_CONSENSUS_Element *insert_element;
101
102   /**
103    * Called when the conclude operation finishes or fails.
104    */
105   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
106
107   /**
108    * Closure for the conclude callback.
109    */
110   void *conclude_cls;
111
112   /**
113    * Deadline for the conclude operation.
114    */
115   struct GNUNET_TIME_Absolute conclude_deadline;
116 };
117
118
119 static void
120 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121                    struct GNUNET_CONSENSUS_ElementMessage *msg)
122 {
123   struct GNUNET_CONSENSUS_Element element;
124   element.type = msg->element_type;
125   element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126   element.data = &msg[1];
127   consensus->new_element_cb(consensus->new_element_cls, &element);
128 }
129
130 static void
131 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
133 {
134   GNUNET_assert (NULL != consensus->conclude_cb);
135   consensus->conclude_cb(consensus->conclude_cls,
136                          msg->num_peers,
137                          (struct GNUNET_PeerIdentity *) &msg[1]);
138   consensus->conclude_cb = NULL;
139 }
140
141
142
143 /**
144  * Type of a function to call when we receive a message
145  * from the service.
146  *
147  * @param cls closure
148  * @param msg message received, NULL on timeout or fatal error
149  */
150 static void
151 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
152 {
153   struct GNUNET_CONSENSUS_Handle *consensus = cls;
154
155   if (msg == NULL)
156   {
157     /* Error, timeout, death */
158     GNUNET_CLIENT_disconnect (consensus->client);
159     consensus->client = NULL;
160     consensus->new_element_cb(NULL, NULL);
161     if (NULL != consensus->idc)
162     {
163       consensus->idc(consensus->idc_cls, GNUNET_NO);
164       consensus->idc = NULL;
165       consensus->idc_cls = NULL;
166     }
167     return;
168   }
169
170   switch (ntohs(msg->type))
171   {
172     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
173       handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
174       break;
175     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
176       handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
177       break;
178     default:
179       LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
180   }
181   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
182                          GNUNET_TIME_UNIT_FOREVER_REL);
183 }
184
185
186
187
188 /**
189  * Function called to notify a client about the connection
190  * begin ready to queue more data.  "buf" will be
191  * NULL and "size" zero if the connection was closed for
192  * writing in the meantime.
193  *
194  * @param cls closure
195  * @param size number of bytes available in buf
196  * @param buf where the callee should write the message
197  * @return number of bytes written to buf
198  */
199 static size_t
200 transmit_insert (void *cls, size_t size, void *buf)
201 {
202   struct GNUNET_CONSENSUS_ElementMessage *msg;
203   struct GNUNET_CONSENSUS_Handle *consensus;
204   GNUNET_CONSENSUS_InsertDoneCallback idc;
205   int msize;
206   void *idc_cls;
207
208   GNUNET_assert (NULL != buf);
209
210   consensus = cls;
211
212   GNUNET_assert (NULL != consensus->insert_element);
213
214   consensus->th = NULL;
215
216   msg = buf;
217
218   msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
219       consensus->insert_element->size;
220
221   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
222   msg->header.size = htons (msize);
223   memcpy(&msg[1],
224          consensus->insert_element->data,
225          consensus->insert_element->size);
226
227
228   idc = consensus->idc;
229   consensus->idc = NULL;
230   idc_cls = consensus->idc_cls;
231   consensus->idc_cls = NULL;
232   idc(idc_cls, GNUNET_YES);
233
234   return msize;
235 }
236
237
238 /**
239  * Function called to notify a client about the connection
240  * begin ready to queue more data.  "buf" will be
241  * NULL and "size" zero if the connection was closed for
242  * writing in the meantime.
243  *
244  * @param cls closure
245  * @param size number of bytes available in buf
246  * @param buf where the callee should write the message
247  * @return number of bytes written to buf
248  */
249 static size_t
250 transmit_join (void *cls, size_t size, void *buf)
251 {
252   struct GNUNET_CONSENSUS_JoinMessage *msg;
253   struct GNUNET_CONSENSUS_Handle *consensus;
254   int msize;
255
256   LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
257
258   GNUNET_assert (NULL != buf);
259
260   consensus = cls;
261   consensus->th = NULL;
262   consensus->joined = 1;
263
264   msg = buf;
265
266   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
267       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
268
269   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
270   msg->header.size = htons (msize);
271   msg->session_id = consensus->session_id;
272   msg->num_peers = htons (consensus->num_peers);
273   memcpy(&msg[1],
274          consensus->peers,
275          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
276
277   if (consensus->insert_element != NULL)
278   {
279     consensus->th =
280         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
281                                              msize,
282                                              GNUNET_TIME_UNIT_FOREVER_REL,
283                                              GNUNET_NO, &transmit_insert, consensus);
284   }
285
286
287   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
288                          GNUNET_TIME_UNIT_FOREVER_REL);
289   
290   return msize;
291 }
292
293
294 /**
295  * Function called to notify a client about the connection
296  * begin ready to queue more data.  "buf" will be
297  * NULL and "size" zero if the connection was closed for
298  * writing in the meantime.
299  *
300  * @param cls closure
301  * @param size number of bytes available in buf
302  * @param buf where the callee should write the message
303  * @return number of bytes written to buf
304  */
305 static size_t
306 transmit_conclude (void *cls, size_t size, void *buf)
307 {
308   struct GNUNET_CONSENSUS_ConcludeMessage *msg;
309   struct GNUNET_CONSENSUS_Handle *consensus;
310   int msize;
311
312   GNUNET_assert (NULL != buf);
313
314   consensus = cls;
315   consensus->th = NULL;
316
317   msg = buf;
318
319   msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
320
321   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
322   msg->header.size = htons (msize);
323   msg->timeout =
324       GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
325
326   return msize;
327 }
328
329
330 /**
331  * Function called to notify a client about the connection
332  * begin ready to queue more data.  "buf" will be
333  * NULL and "size" zero if the connection was closed for
334  * writing in the meantime.
335  *
336  * @param cls the consensus handle
337  * @param size number of bytes available in buf
338  * @param buf where the callee should write the message
339  * @return number of bytes written to buf
340  */
341 static size_t
342 transmit_begin (void *cls, size_t size, void *buf)
343 {
344   struct GNUNET_MessageHeader *msg;
345   struct GNUNET_CONSENSUS_Handle *consensus;
346   int msize;
347
348   GNUNET_assert (NULL != buf);
349
350   consensus = cls;
351   consensus->th = NULL;
352
353   msg = buf;
354
355   msize = sizeof (struct GNUNET_MessageHeader);
356
357   msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
358   msg->size = htons (msize);
359
360   return msize;
361 }
362
363
364 /**
365  * Create a consensus session.
366  *
367  * @param cfg
368  * @param num_peers
369  * @param peers array of peers participating in this consensus session
370  *              Inclusion of the local peer is optional.
371  * @param session_id session identifier
372  *                   Allows a group of peers to have more than consensus session.
373  * @param new_element_cb callback, called when a new element is added to the set by
374  *                    another peer
375  * @param new_element_cls closure for new_element
376  * @return handle to use, NULL on error
377  */
378 struct GNUNET_CONSENSUS_Handle *
379 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
380                          unsigned int num_peers,
381                          const struct GNUNET_PeerIdentity *peers,
382                          const struct GNUNET_HashCode *session_id,
383                          GNUNET_CONSENSUS_NewElementCallback new_element_cb,
384                          void *new_element_cls)
385 {
386   struct GNUNET_CONSENSUS_Handle *consensus;
387   size_t join_message_size;
388
389
390   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
391   consensus->cfg = cfg;
392   consensus->new_element_cb = new_element_cb;
393   consensus->new_element_cls = new_element_cls;
394   consensus->num_peers = num_peers;
395   consensus->session_id = *session_id;
396
397
398
399   if (0 == num_peers)
400   {
401     consensus->peers = NULL;
402   }
403   else if (num_peers > 0)
404   {
405
406     consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
407   }
408   else
409   {
410     GNUNET_break (0);
411   }
412
413
414   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
415
416   GNUNET_assert (consensus->client != NULL);
417
418   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
419       (num_peers * sizeof (struct GNUNET_PeerIdentity));
420
421   consensus->th =
422       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
423                                            join_message_size,
424                                            GNUNET_TIME_UNIT_FOREVER_REL,
425                                            GNUNET_NO, &transmit_join, consensus);
426
427
428   GNUNET_assert (consensus->th != NULL);
429
430   return consensus;
431 }
432
433
434
435 /**
436  * Insert an element in the set being reconsiled.  Must not be called after
437  * "GNUNET_CONSENSUS_conclude".
438  *
439  * @param consensus handle for the consensus session
440  * @param element the element to be inserted
441  * @param idc function called when we are done with this element and it 
442  *            is thus allowed to call GNUNET_CONSENSUS_insert again
443  * @param idc_cls closure for 'idc'
444  */
445 void
446 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
447                          const struct GNUNET_CONSENSUS_Element *element,
448                          GNUNET_CONSENSUS_InsertDoneCallback idc,
449                          void *idc_cls)
450 {
451
452   GNUNET_assert (NULL == consensus->idc);
453   GNUNET_assert (NULL == consensus->insert_element);
454
455   consensus->idc = idc;
456   consensus->idc_cls = idc_cls;
457   consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
458
459   if (consensus->joined == 0)
460   {
461     GNUNET_assert (NULL != consensus->th);
462     return;
463   }
464
465   GNUNET_assert (NULL == consensus->th);
466
467   consensus->th =
468       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
469                                            element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
470                                            GNUNET_TIME_UNIT_FOREVER_REL,
471                                            GNUNET_NO, &transmit_insert, consensus);
472 }
473
474
475 /**
476  * Begin reconciling elements with other peers.
477  *
478  * @param consensus handle for the consensus session
479  */
480 void
481 GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
482 {
483   GNUNET_assert (NULL == consensus->idc);
484   GNUNET_assert (NULL == consensus->insert_element);
485
486   consensus->th =
487       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
488                                            sizeof (struct GNUNET_MessageHeader),
489                                            GNUNET_TIME_UNIT_FOREVER_REL,
490                                            GNUNET_NO, &transmit_begin, consensus);
491 }
492
493
494 /**
495  * We are finished inserting new elements into the consensus;
496  * try to conclude the consensus within a given time window.
497  *
498  * @param consensus consensus session
499  * @param timeout timeout after which the conculde callback
500  *                must be called
501  * @param conclude called when the conclusion was successful
502  * @param conclude_cls closure for the conclude callback
503  */
504 void
505 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
506                            struct GNUNET_TIME_Relative timeout,
507                            GNUNET_CONSENSUS_ConcludeCallback conclude,
508                            void *conclude_cls)
509 {
510   GNUNET_assert (NULL == consensus->th);
511   GNUNET_assert (NULL == consensus->conclude_cb);
512
513   consensus->conclude_cls = conclude_cls;
514   consensus->conclude_cb = conclude;
515   consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
516
517   consensus->th =
518       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
519                                            sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
520                                            timeout,
521                                            GNUNET_NO, &transmit_conclude, consensus);
522   if (NULL == consensus->th)
523   {
524     conclude(conclude_cls, 0, NULL);
525   }
526 }
527
528
529 /**
530  * Destroy a consensus handle (free all state associated with
531  * it, no longer call any of the callbacks).
532  *
533  * @param consensus handle to destroy
534  */
535 void
536 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
537 {
538   if (consensus->client != NULL)
539   {
540     GNUNET_CLIENT_disconnect (consensus->client);
541     consensus->client = NULL;
542   }
543   GNUNET_free (consensus->peers);
544   GNUNET_free (consensus);
545 }
546