-log messages
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 /**
37  * Actions that can be queued.
38  */
39 struct QueuedMessage
40 {
41   /**
42    * Queued messages are stored in a doubly linked list.
43    */
44   struct QueuedMessage *next;
45
46   /**
47    * Queued messages are stored in a doubly linked list.
48    */
49   struct QueuedMessage *prev;
50
51   /**
52    * The actual queued message.
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Will be called after transmit, if not NULL
58    */
59   GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61   /**
62    * Closure for idc
63    */
64   void *idc_cls;
65 };
66
67
68 /**
69  * Handle for the service.
70  */
71 struct GNUNET_CONSENSUS_Handle
72 {
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Client connected to the consensus service, may be NULL if not connected.
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Callback for new elements. Not called for elements added locally.
85    */
86   GNUNET_CONSENSUS_ElementCallback new_element_cb;
87
88   /**
89    * Closure for new_element_cb
90    */
91   void *new_element_cls;
92
93   /**
94    * The (local) session identifier for the consensus session.
95    */
96   struct GNUNET_HashCode session_id;
97
98   /**
99    * Number of peers in the consensus. Optionally includes the local peer.
100    */
101   int num_peers;
102
103   /**
104    * Peer identities of peers participating in the consensus, includes the local peer.
105    */
106   struct GNUNET_PeerIdentity **peers;
107
108   /**
109    * Currently active transmit request.
110    */
111   struct GNUNET_CLIENT_TransmitHandle *th;
112
113   /**
114    * GNUNES_YES iff the join message has been sent to the service.
115    */
116   int joined;
117
118   /**
119    * Closure for the insert done callback.
120    */
121   void *idc_cls;
122
123   /**
124    * Called when the conclude operation finishes or fails.
125    */
126   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
127
128   /**
129    * Closure for the conclude callback.
130    */
131   void *conclude_cls;
132
133   /**
134    * Deadline for the conclude operation.
135    */
136   struct GNUNET_TIME_Absolute conclude_deadline;
137
138   unsigned int conclude_min_size;
139
140   struct QueuedMessage *messages_head;
141   struct QueuedMessage *messages_tail;
142 };
143
144
145
146 /**
147  * Schedule transmitting the next message.
148  *
149  * @param consensus consensus handle
150  */
151 static void
152 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
153
154
155 /**
156  * Function called to notify a client about the connection
157  * begin ready to queue more data.  "buf" will be
158  * NULL and "size" zero if the connection was closed for
159  * writing in the meantime.
160  *
161  * @param cls closure
162  * @param size number of bytes available in buf
163  * @param buf where the callee should write the message
164  * @return number of bytes written to buf
165  */
166 static size_t
167 transmit_queued (void *cls, size_t size,
168                  void *buf)
169 {
170   struct GNUNET_CONSENSUS_Handle *consensus;
171   struct QueuedMessage *qmsg;
172   size_t msg_size;
173
174   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
175   consensus->th = NULL;
176
177   qmsg = consensus->messages_head;
178   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
179
180   if (NULL == buf)
181   {
182     if (NULL != qmsg->idc)
183     {
184       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
185     }
186     return 0;
187   }
188
189   msg_size = ntohs (qmsg->msg->size);
190
191   GNUNET_assert (size >= msg_size);
192
193   memcpy (buf, qmsg->msg, msg_size);
194   if (NULL != qmsg->idc)
195   {
196     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
197   }
198
199   /* FIXME: free the messages */
200
201   send_next (consensus);
202
203   return msg_size;
204 }
205
206
207 /**
208  * Schedule transmitting the next message.
209  *
210  * @param consensus consensus handle
211  */
212 static void
213 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
214 {
215   if (NULL != consensus->th)
216     return;
217
218   if (NULL != consensus->messages_head)
219   {
220     consensus->th = 
221         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
222                                              GNUNET_TIME_UNIT_FOREVER_REL,
223                                              GNUNET_NO, &transmit_queued, consensus);
224   }
225 }
226
227 static void
228 queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
229 {
230   struct QueuedMessage *qm;
231   qm = GNUNET_malloc (sizeof *qm);
232   qm->msg = msg;
233   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
234 }
235
236
237 /**
238  * Called when the server has sent is a new element
239  * 
240  * @param consensus consensus handle
241  * @param msg element message
242  */
243 static void
244 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
245                    struct GNUNET_CONSENSUS_ElementMessage *msg)
246 {
247   struct GNUNET_CONSENSUS_Element element;
248   struct GNUNET_CONSENSUS_AckMessage *ack_msg;
249   int ret;
250
251   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
252
253   element.type = msg->element_type;
254   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
255   element.data = &msg[1];
256
257   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
258
259   ack_msg = GNUNET_malloc (sizeof *ack_msg);
260   ack_msg->header.size = htons (sizeof *ack_msg);
261   ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
262   ack_msg->keep = ret;
263
264   queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
265
266   send_next (consensus);
267 }
268
269
270 /**
271  * Called when the server has announced
272  * that the conclusion is over.
273  * 
274  * @param consensus consensus handle
275  * @param msg conclude done message
276  */
277 static void
278 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
279                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
280 {
281   GNUNET_assert (NULL != consensus->conclude_cb);
282   consensus->conclude_cb (consensus->conclude_cls, NULL);
283   consensus->conclude_cb = NULL;
284 }
285
286
287
288 /**
289  * Type of a function to call when we receive a message
290  * from the service.
291  *
292  * @param cls closure
293  * @param msg message received, NULL on timeout or fatal error
294  */
295 static void
296 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
297 {
298   struct GNUNET_CONSENSUS_Handle *consensus = cls;
299
300   LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
301
302   if (msg == NULL)
303   {
304     /* Error, timeout, death */
305     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
306     GNUNET_CLIENT_disconnect (consensus->client);
307     consensus->client = NULL;
308     consensus->new_element_cb (NULL, NULL);
309     return;
310   }
311
312   switch (ntohs (msg->type))
313   {
314     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
315       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
316       break;
317     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
318       handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
319       break;
320     default:
321       GNUNET_break (0);
322   }
323   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
324                          GNUNET_TIME_UNIT_FOREVER_REL);
325 }
326
327 /**
328  * Function called to notify a client about the connection
329  * begin ready to queue more data.  "buf" will be
330  * NULL and "size" zero if the connection was closed for
331  * writing in the meantime.
332  *
333  * @param cls closure
334  * @param size number of bytes available in buf
335  * @param buf where the callee should write the message
336  * @return number of bytes written to buf
337  */
338 static size_t
339 transmit_join (void *cls, size_t size, void *buf)
340 {
341   struct GNUNET_CONSENSUS_JoinMessage *msg;
342   struct GNUNET_CONSENSUS_Handle *consensus;
343   int msize;
344
345   GNUNET_assert (NULL != buf);
346
347   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
348
349   consensus = cls;
350   consensus->th = NULL;
351   consensus->joined = 1;
352
353   msg = buf;
354
355   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
356       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
357
358   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
359   msg->header.size = htons (msize);
360   msg->session_id = consensus->session_id;
361   msg->num_peers = htons (consensus->num_peers);
362   if (0 != msg->num_peers)
363     memcpy(&msg[1],
364            consensus->peers,
365            consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
366
367   send_next (consensus);
368
369   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
370                          GNUNET_TIME_UNIT_FOREVER_REL);
371   
372   return msize;
373 }
374
375 /**
376  * Create a consensus session.
377  *
378  * @param cfg configuration to use for connecting to the consensus service
379  * @param num_peers number of peers in the peers array
380  * @param peers array of peers participating in this consensus session
381  *              Inclusion of the local peer is optional.
382  * @param session_id session identifier
383  *                   Allows a group of peers to have more than consensus session.
384  * @param new_element_cb callback, called when a new element is added to the set by
385  *                    another peer
386  * @param new_element_cls closure for new_element
387  * @return handle to use, NULL on error
388  */
389 struct GNUNET_CONSENSUS_Handle *
390 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
391                          unsigned int num_peers,
392                          const struct GNUNET_PeerIdentity *peers,
393                          const struct GNUNET_HashCode *session_id,
394                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
395                          void *new_element_cls)
396 {
397   struct GNUNET_CONSENSUS_Handle *consensus;
398   size_t join_message_size;
399
400   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
401   consensus->cfg = cfg;
402   consensus->new_element_cb = new_element_cb;
403   consensus->new_element_cls = new_element_cls;
404   consensus->num_peers = num_peers;
405   consensus->session_id = *session_id;
406
407   if (0 == num_peers)
408     consensus->peers = NULL;
409   else if (num_peers > 0)
410     consensus->peers =
411         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
412
413   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
414
415   GNUNET_assert (consensus->client != NULL);
416
417   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
418       (num_peers * sizeof (struct GNUNET_PeerIdentity));
419
420   consensus->th =
421       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
422                                            join_message_size,
423                                            GNUNET_TIME_UNIT_FOREVER_REL,
424                                            GNUNET_NO, &transmit_join, consensus);
425
426
427   GNUNET_assert (consensus->th != NULL);
428   return consensus;
429 }
430
431
432
433 /**
434  * Insert an element in the set being reconsiled.  Must not be called after
435  * "GNUNET_CONSENSUS_conclude".
436  *
437  * @param consensus handle for the consensus session
438  * @param element the element to be inserted
439  * @param idc function called when we are done with this element and it 
440  *            is thus allowed to call GNUNET_CONSENSUS_insert again
441  * @param idc_cls closure for 'idc'
442  */
443 void
444 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
445                          const struct GNUNET_CONSENSUS_Element *element,
446                          GNUNET_CONSENSUS_InsertDoneCallback idc,
447                          void *idc_cls)
448 {
449   struct QueuedMessage *qmsg;
450   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
451   size_t element_msg_size;
452
453   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
454
455   element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
456                                element->size);
457
458   element_msg = GNUNET_malloc (element_msg_size);
459   element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
460   element_msg->header.size = htons (element_msg_size);
461   memcpy (&element_msg[1], element->data, element->size);
462
463   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
464   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
465   qmsg->idc = idc;
466   qmsg->idc_cls = idc_cls;
467
468   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
469
470   send_next (consensus);
471 }
472
473
474 /**
475  * We are done with inserting new elements into the consensus;
476  * try to conclude the consensus within a given time window.
477  * After conclude has been called, no further elements may be
478  * inserted by the client.
479  *
480  * @param consensus consensus session
481  * @param timeout timeout after which the conculde callback
482  *                must be called
483  * @param conclude called when the conclusion was successful
484  * @param conclude_cls closure for the conclude callback
485  */
486 void
487 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
488                            struct GNUNET_TIME_Relative timeout,
489                            unsigned int min_group_size_in_consensus,
490                            GNUNET_CONSENSUS_ConcludeCallback conclude,
491                            void *conclude_cls)
492 {
493   struct QueuedMessage *qmsg;
494   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
495
496   GNUNET_assert (NULL != conclude);
497   GNUNET_assert (NULL == consensus->conclude_cb);
498
499   consensus->conclude_cls = conclude_cls;
500   consensus->conclude_cb = conclude;
501
502   conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
503   conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
504   conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
505   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
506   conclude_msg->min_group_size = min_group_size_in_consensus;
507
508   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
509   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
510
511   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
512
513   send_next (consensus);
514 }
515
516
517 /**
518  * Destroy a consensus handle (free all state associated with
519  * it, no longer call any of the callbacks).
520  *
521  * @param consensus handle to destroy
522  */
523 void
524 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
525 {
526   if (consensus->client != NULL)
527   {
528     GNUNET_CLIENT_disconnect (consensus->client);
529     consensus->client = NULL;
530   }
531   if (NULL != consensus->peers)
532     GNUNET_free (consensus->peers);
533   GNUNET_free (consensus);
534 }
535