some fixes to the pt/vpn testcase.
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 /**
37  * Actions that can be queued.
38  */
39 struct QueuedMessage
40 {
41   /**
42    * Queued messages are stored in a doubly linked list.
43    */
44   struct QueuedMessage *next;
45
46   /**
47    * Queued messages are stored in a doubly linked list.
48    */
49   struct QueuedMessage *prev;
50
51   /**
52    * The actual queued message.
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Will be called after transmit, if not NULL
58    */
59   GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61   /**
62    * Closure for idc
63    */
64   void *idc_cls;
65 };
66
67
68 /**
69  * Handle for the service.
70  */
71 struct GNUNET_CONSENSUS_Handle
72 {
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Client connected to the consensus service, may be NULL if not connected.
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Callback for new elements. Not called for elements added locally.
85    */
86   GNUNET_CONSENSUS_ElementCallback new_element_cb;
87
88   /**
89    * Closure for new_element_cb
90    */
91   void *new_element_cls;
92
93   /**
94    * The (local) session identifier for the consensus session.
95    */
96   struct GNUNET_HashCode session_id;
97
98   /**
99    * Number of peers in the consensus. Optionally includes the local peer.
100    */
101   int num_peers;
102
103   /**
104    * Peer identities of peers participating in the consensus, includes the local peer.
105    */
106   struct GNUNET_PeerIdentity **peers;
107
108   /**
109    * Currently active transmit request.
110    */
111   struct GNUNET_CLIENT_TransmitHandle *th;
112
113   /**
114    * GNUNES_YES iff the join message has been sent to the service.
115    */
116   int joined;
117
118   /**
119    * Closure for the insert done callback.
120    */
121   void *idc_cls;
122
123   /**
124    * Called when the conclude operation finishes or fails.
125    */
126   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
127
128   /**
129    * Closure for the conclude callback.
130    */
131   void *conclude_cls;
132
133   /**
134    * Deadline for the conclude operation.
135    */
136   struct GNUNET_TIME_Absolute conclude_deadline;
137
138   unsigned int conclude_min_size;
139
140   struct QueuedMessage *messages_head;
141   struct QueuedMessage *messages_tail;
142
143   /**
144    * GNUNET_YES when currently in a section where destroy may not be
145    * called.
146    */
147   int may_not_destroy;
148 };
149
150
151
152 /**
153  * Schedule transmitting the next message.
154  *
155  * @param consensus consensus handle
156  */
157 static void
158 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
159
160
161 /**
162  * Function called to notify a client about the connection
163  * begin ready to queue more data.  "buf" will be
164  * NULL and "size" zero if the connection was closed for
165  * writing in the meantime.
166  *
167  * @param cls closure
168  * @param size number of bytes available in buf
169  * @param buf where the callee should write the message
170  * @return number of bytes written to buf
171  */
172 static size_t
173 transmit_queued (void *cls, size_t size,
174                  void *buf)
175 {
176   struct GNUNET_CONSENSUS_Handle *consensus;
177   struct QueuedMessage *qmsg;
178   size_t msg_size;
179
180   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
181   consensus->th = NULL;
182
183   qmsg = consensus->messages_head;
184   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
185
186   if (NULL == buf)
187   {
188     if (NULL != qmsg->idc)
189     {
190       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
191     }
192     return 0;
193   }
194
195   msg_size = ntohs (qmsg->msg->size);
196
197   GNUNET_assert (size >= msg_size);
198
199   memcpy (buf, qmsg->msg, msg_size);
200   if (NULL != qmsg->idc)
201   {
202     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
203   }
204   GNUNET_free (qmsg->msg);
205   GNUNET_free (qmsg);
206   /* FIXME: free the messages */
207
208   send_next (consensus);
209
210   return msg_size;
211 }
212
213
214 /**
215  * Schedule transmitting the next message.
216  *
217  * @param consensus consensus handle
218  */
219 static void
220 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
221 {
222   if (NULL != consensus->th)
223     return;
224
225   if (NULL != consensus->messages_head)
226   {
227     consensus->th = 
228         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
229                                              GNUNET_TIME_UNIT_FOREVER_REL,
230                                              GNUNET_NO, &transmit_queued, consensus);
231   }
232 }
233
234 static void
235 queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
236 {
237   struct QueuedMessage *qm;
238   qm = GNUNET_malloc (sizeof *qm);
239   qm->msg = msg;
240   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
241 }
242
243
244 /**
245  * Called when the server has sent is a new element
246  * 
247  * @param consensus consensus handle
248  * @param msg element message
249  */
250 static void
251 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
252                    struct GNUNET_CONSENSUS_ElementMessage *msg)
253 {
254   struct GNUNET_CONSENSUS_Element element;
255   struct GNUNET_CONSENSUS_AckMessage *ack_msg;
256   int ret;
257
258   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
259
260   element.type = msg->element_type;
261   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
262   element.data = &msg[1];
263
264   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
265
266   ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage);
267   ack_msg->header.size = htons (sizeof *ack_msg);
268   ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
269   ack_msg->keep = ret;
270
271   queue_message (consensus, &ack_msg->header);
272
273   send_next (consensus);
274 }
275
276
277 /**
278  * Called when the server has announced
279  * that the conclusion is over.
280  * 
281  * @param consensus consensus handle
282  * @param msg conclude done message
283  */
284 static void
285 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
286                      const struct GNUNET_MessageHeader *msg)
287 {
288   GNUNET_assert (NULL != consensus->conclude_cb);
289   consensus->may_not_destroy = GNUNET_YES;
290   consensus->conclude_cb (consensus->conclude_cls);
291   consensus->may_not_destroy = GNUNET_NO;
292   consensus->conclude_cb = NULL;
293 }
294
295
296
297 /**
298  * Type of a function to call when we receive a message
299  * from the service.
300  *
301  * @param cls closure
302  * @param msg message received, NULL on timeout or fatal error
303  */
304 static void
305 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
306 {
307   struct GNUNET_CONSENSUS_Handle *consensus = cls;
308
309   LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
310
311   if (msg == NULL)
312   {
313     /* Error, timeout, death */
314     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
315     GNUNET_CLIENT_disconnect (consensus->client);
316     consensus->client = NULL;
317     consensus->new_element_cb (consensus->new_element_cls, NULL);
318     return;
319   }
320
321   switch (ntohs (msg->type))
322   {
323     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
324       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
325       break;
326     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
327       handle_conclude_done (consensus, msg);
328       break;
329     default:
330       GNUNET_break (0);
331   }
332   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
333                          GNUNET_TIME_UNIT_FOREVER_REL);
334 }
335
336 /**
337  * Function called to notify a client about the connection
338  * begin ready to queue more data.  "buf" will be
339  * NULL and "size" zero if the connection was closed for
340  * writing in the meantime.
341  *
342  * @param cls closure
343  * @param size number of bytes available in buf
344  * @param buf where the callee should write the message
345  * @return number of bytes written to buf
346  */
347 static size_t
348 transmit_join (void *cls, size_t size, void *buf)
349 {
350   struct GNUNET_CONSENSUS_JoinMessage *msg;
351   struct GNUNET_CONSENSUS_Handle *consensus;
352   int msize;
353
354   GNUNET_assert (NULL != buf);
355
356   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
357
358   consensus = cls;
359   consensus->th = NULL;
360   consensus->joined = 1;
361
362   msg = buf;
363
364   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
365       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
366
367   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
368   msg->header.size = htons (msize);
369   msg->session_id = consensus->session_id;
370   msg->num_peers = htonl (consensus->num_peers);
371   memcpy(&msg[1],
372          consensus->peers,
373          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
374   send_next (consensus);
375   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
376                          GNUNET_TIME_UNIT_FOREVER_REL);
377   
378   return msize;
379 }
380
381 /**
382  * Create a consensus session.
383  *
384  * @param cfg configuration to use for connecting to the consensus service
385  * @param num_peers number of peers in the peers array
386  * @param peers array of peers participating in this consensus session
387  *              Inclusion of the local peer is optional.
388  * @param session_id session identifier
389  *                   Allows a group of peers to have more than consensus session.
390  * @param new_element_cb callback, called when a new element is added to the set by
391  *                    another peer
392  * @param new_element_cls closure for new_element
393  * @return handle to use, NULL on error
394  */
395 struct GNUNET_CONSENSUS_Handle *
396 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
397                          unsigned int num_peers,
398                          const struct GNUNET_PeerIdentity *peers,
399                          const struct GNUNET_HashCode *session_id,
400                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
401                          void *new_element_cls)
402 {
403   struct GNUNET_CONSENSUS_Handle *consensus;
404   size_t join_message_size;
405
406   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
407   consensus->cfg = cfg;
408   consensus->new_element_cb = new_element_cb;
409   consensus->new_element_cls = new_element_cls;
410   consensus->num_peers = num_peers;
411   consensus->session_id = *session_id;
412
413   if (0 == num_peers)
414     consensus->peers = NULL;
415   else if (num_peers > 0)
416     consensus->peers =
417         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
418
419   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
420
421   GNUNET_assert (consensus->client != NULL);
422
423   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
424       (num_peers * sizeof (struct GNUNET_PeerIdentity));
425
426   consensus->th =
427       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
428                                            join_message_size,
429                                            GNUNET_TIME_UNIT_FOREVER_REL,
430                                            GNUNET_NO, &transmit_join, consensus);
431
432
433   GNUNET_assert (consensus->th != NULL);
434   return consensus;
435 }
436
437
438
439 /**
440  * Insert an element in the set being reconsiled.  Must not be called after
441  * "GNUNET_CONSENSUS_conclude".
442  *
443  * @param consensus handle for the consensus session
444  * @param element the element to be inserted
445  * @param idc function called when we are done with this element and it 
446  *            is thus allowed to call GNUNET_CONSENSUS_insert again
447  * @param idc_cls closure for 'idc'
448  */
449 void
450 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
451                          const struct GNUNET_CONSENSUS_Element *element,
452                          GNUNET_CONSENSUS_InsertDoneCallback idc,
453                          void *idc_cls)
454 {
455   struct QueuedMessage *qmsg;
456   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
457   size_t element_msg_size;
458
459   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
460
461   element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
462                                element->size);
463
464   element_msg = GNUNET_malloc (element_msg_size);
465   element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
466   element_msg->header.size = htons (element_msg_size);
467   memcpy (&element_msg[1], element->data, element->size);
468
469   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
470   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
471   qmsg->idc = idc;
472   qmsg->idc_cls = idc_cls;
473
474   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
475
476   send_next (consensus);
477 }
478
479
480 /**
481  * We are done with inserting new elements into the consensus;
482  * try to conclude the consensus within a given time window.
483  * After conclude has been called, no further elements may be
484  * inserted by the client.
485  *
486  * @param consensus consensus session
487  * @param timeout timeout after which the conculde callback
488  *                must be called
489  * @param conclude called when the conclusion was successful
490  * @param conclude_cls closure for the conclude callback
491  */
492 void
493 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
494                            struct GNUNET_TIME_Relative timeout,
495                            GNUNET_CONSENSUS_ConcludeCallback conclude,
496                            void *conclude_cls)
497 {
498   struct QueuedMessage *qmsg;
499   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
500
501   GNUNET_assert (NULL != conclude);
502   GNUNET_assert (NULL == consensus->conclude_cb);
503
504   consensus->conclude_cls = conclude_cls;
505   consensus->conclude_cb = conclude;
506
507   conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
508   conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
509   conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
510   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
511
512   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
513   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
514
515   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
516
517   send_next (consensus);
518 }
519
520
521 /**
522  * Destroy a consensus handle (free all state associated with
523  * it, no longer call any of the callbacks).
524  *
525  * @param consensus handle to destroy
526  */
527 void
528 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
529 {
530   if (GNUNET_YES == consensus->may_not_destroy)
531   {
532     LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n");
533     GNUNET_assert (0);
534   }
535   if (consensus->client != NULL)
536   {
537     GNUNET_CLIENT_disconnect (consensus->client);
538     consensus->client = NULL;
539   }
540   if (NULL != consensus->peers)
541     GNUNET_free (consensus->peers);
542   GNUNET_free (consensus);
543 }
544