-fix uninitialized sendto
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 /**
37  * Actions that can be queued.
38  */
39 struct QueuedMessage
40 {
41   /**
42    * Queued messages are stored in a doubly linked list.
43    */
44   struct QueuedMessage *next;
45
46   /**
47    * Queued messages are stored in a doubly linked list.
48    */
49   struct QueuedMessage *prev;
50
51   /**
52    * The actual queued message.
53    */
54   struct GNUNET_MessageHeader *msg;
55
56   /**
57    * Will be called after transmit, if not NULL
58    */
59   GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61   /**
62    * Closure for idc
63    */
64   void *idc_cls;
65 };
66
67
68 /**
69  * Handle for the service.
70  */
71 struct GNUNET_CONSENSUS_Handle
72 {
73   /**
74    * Configuration to use.
75    */
76   const struct GNUNET_CONFIGURATION_Handle *cfg;
77
78   /**
79    * Client connected to the consensus service, may be NULL if not connected.
80    */
81   struct GNUNET_CLIENT_Connection *client;
82
83   /**
84    * Callback for new elements. Not called for elements added locally.
85    */
86   GNUNET_CONSENSUS_ElementCallback new_element_cb;
87
88   /**
89    * Closure for new_element_cb
90    */
91   void *new_element_cls;
92
93   /**
94    * The (local) session identifier for the consensus session.
95    */
96   struct GNUNET_HashCode session_id;
97
98   /**
99    * Number of peers in the consensus. Optionally includes the local peer.
100    */
101   int num_peers;
102
103   /**
104    * Peer identities of peers participating in the consensus, includes the local peer.
105    */
106   struct GNUNET_PeerIdentity **peers;
107
108   /**
109    * Currently active transmit request.
110    */
111   struct GNUNET_CLIENT_TransmitHandle *th;
112
113   /**
114    * GNUNES_YES iff the join message has been sent to the service.
115    */
116   int joined;
117
118   /**
119    * Closure for the insert done callback.
120    */
121   void *idc_cls;
122
123   /**
124    * Called when the conclude operation finishes or fails.
125    */
126   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
127
128   /**
129    * Closure for the conclude callback.
130    */
131   void *conclude_cls;
132
133   /**
134    * Deadline for the conclude operation.
135    */
136   struct GNUNET_TIME_Absolute conclude_deadline;
137
138   unsigned int conclude_min_size;
139
140   struct QueuedMessage *messages_head;
141   struct QueuedMessage *messages_tail;
142
143   /**
144    * GNUNET_YES when currently in a section where destroy may not be
145    * called.
146    */
147   int may_not_destroy;
148 };
149
150
151
152 /**
153  * Schedule transmitting the next message.
154  *
155  * @param consensus consensus handle
156  */
157 static void
158 send_next (struct GNUNET_CONSENSUS_Handle *consensus);
159
160
161 /**
162  * Function called to notify a client about the connection
163  * begin ready to queue more data.  "buf" will be
164  * NULL and "size" zero if the connection was closed for
165  * writing in the meantime.
166  *
167  * @param cls closure
168  * @param size number of bytes available in buf
169  * @param buf where the callee should write the message
170  * @return number of bytes written to buf
171  */
172 static size_t
173 transmit_queued (void *cls, size_t size,
174                  void *buf)
175 {
176   struct GNUNET_CONSENSUS_Handle *consensus;
177   struct QueuedMessage *qmsg;
178   size_t msg_size;
179
180   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
181   consensus->th = NULL;
182
183   qmsg = consensus->messages_head;
184   GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
185
186   if (NULL == buf)
187   {
188     if (NULL != qmsg->idc)
189     {
190       qmsg->idc (qmsg->idc_cls, GNUNET_YES);
191     }
192     return 0;
193   }
194
195   msg_size = ntohs (qmsg->msg->size);
196
197   GNUNET_assert (size >= msg_size);
198
199   memcpy (buf, qmsg->msg, msg_size);
200   if (NULL != qmsg->idc)
201   {
202     qmsg->idc (qmsg->idc_cls, GNUNET_YES);
203   }
204
205   /* FIXME: free the messages */
206
207   send_next (consensus);
208
209   return msg_size;
210 }
211
212
213 /**
214  * Schedule transmitting the next message.
215  *
216  * @param consensus consensus handle
217  */
218 static void
219 send_next (struct GNUNET_CONSENSUS_Handle *consensus)
220 {
221   if (NULL != consensus->th)
222     return;
223
224   if (NULL != consensus->messages_head)
225   {
226     consensus->th = 
227         GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
228                                              GNUNET_TIME_UNIT_FOREVER_REL,
229                                              GNUNET_NO, &transmit_queued, consensus);
230   }
231 }
232
233 static void
234 queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
235 {
236   struct QueuedMessage *qm;
237   qm = GNUNET_malloc (sizeof *qm);
238   qm->msg = msg;
239   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
240 }
241
242
243 /**
244  * Called when the server has sent is a new element
245  * 
246  * @param consensus consensus handle
247  * @param msg element message
248  */
249 static void
250 handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
251                    struct GNUNET_CONSENSUS_ElementMessage *msg)
252 {
253   struct GNUNET_CONSENSUS_Element element;
254   struct GNUNET_CONSENSUS_AckMessage *ack_msg;
255   int ret;
256
257   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
258
259   element.type = msg->element_type;
260   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
261   element.data = &msg[1];
262
263   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
264
265   ack_msg = GNUNET_malloc (sizeof *ack_msg);
266   ack_msg->header.size = htons (sizeof *ack_msg);
267   ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
268   ack_msg->keep = ret;
269
270   queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
271
272   send_next (consensus);
273 }
274
275
276 /**
277  * Called when the server has announced
278  * that the conclusion is over.
279  * 
280  * @param consensus consensus handle
281  * @param msg conclude done message
282  */
283 static void
284 handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
285                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
286 {
287   GNUNET_assert (NULL != consensus->conclude_cb);
288   consensus->may_not_destroy = GNUNET_YES;
289   consensus->conclude_cb (consensus->conclude_cls, NULL);
290   consensus->may_not_destroy = GNUNET_NO;
291   consensus->conclude_cb = NULL;
292 }
293
294
295
296 /**
297  * Type of a function to call when we receive a message
298  * from the service.
299  *
300  * @param cls closure
301  * @param msg message received, NULL on timeout or fatal error
302  */
303 static void
304 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
305 {
306   struct GNUNET_CONSENSUS_Handle *consensus = cls;
307
308   LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
309
310   if (msg == NULL)
311   {
312     /* Error, timeout, death */
313     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
314     GNUNET_CLIENT_disconnect (consensus->client);
315     consensus->client = NULL;
316     consensus->new_element_cb (consensus->new_element_cls, NULL);
317     return;
318   }
319
320   switch (ntohs (msg->type))
321   {
322     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
323       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
324       break;
325     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
326       handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
327       break;
328     default:
329       GNUNET_break (0);
330   }
331   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
332                          GNUNET_TIME_UNIT_FOREVER_REL);
333 }
334
335 /**
336  * Function called to notify a client about the connection
337  * begin ready to queue more data.  "buf" will be
338  * NULL and "size" zero if the connection was closed for
339  * writing in the meantime.
340  *
341  * @param cls closure
342  * @param size number of bytes available in buf
343  * @param buf where the callee should write the message
344  * @return number of bytes written to buf
345  */
346 static size_t
347 transmit_join (void *cls, size_t size, void *buf)
348 {
349   struct GNUNET_CONSENSUS_JoinMessage *msg;
350   struct GNUNET_CONSENSUS_Handle *consensus;
351   int msize;
352
353   GNUNET_assert (NULL != buf);
354
355   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
356
357   consensus = cls;
358   consensus->th = NULL;
359   consensus->joined = 1;
360
361   msg = buf;
362
363   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
364       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
365
366   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
367   msg->header.size = htons (msize);
368   msg->session_id = consensus->session_id;
369   msg->num_peers = htonl (consensus->num_peers);
370   memcpy(&msg[1],
371          consensus->peers,
372          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
373   send_next (consensus);
374   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
375                          GNUNET_TIME_UNIT_FOREVER_REL);
376   
377   return msize;
378 }
379
380 /**
381  * Create a consensus session.
382  *
383  * @param cfg configuration to use for connecting to the consensus service
384  * @param num_peers number of peers in the peers array
385  * @param peers array of peers participating in this consensus session
386  *              Inclusion of the local peer is optional.
387  * @param session_id session identifier
388  *                   Allows a group of peers to have more than consensus session.
389  * @param new_element_cb callback, called when a new element is added to the set by
390  *                    another peer
391  * @param new_element_cls closure for new_element
392  * @return handle to use, NULL on error
393  */
394 struct GNUNET_CONSENSUS_Handle *
395 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
396                          unsigned int num_peers,
397                          const struct GNUNET_PeerIdentity *peers,
398                          const struct GNUNET_HashCode *session_id,
399                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
400                          void *new_element_cls)
401 {
402   struct GNUNET_CONSENSUS_Handle *consensus;
403   size_t join_message_size;
404
405   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
406   consensus->cfg = cfg;
407   consensus->new_element_cb = new_element_cb;
408   consensus->new_element_cls = new_element_cls;
409   consensus->num_peers = num_peers;
410   consensus->session_id = *session_id;
411
412   if (0 == num_peers)
413     consensus->peers = NULL;
414   else if (num_peers > 0)
415     consensus->peers =
416         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
417
418   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
419
420   GNUNET_assert (consensus->client != NULL);
421
422   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
423       (num_peers * sizeof (struct GNUNET_PeerIdentity));
424
425   consensus->th =
426       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
427                                            join_message_size,
428                                            GNUNET_TIME_UNIT_FOREVER_REL,
429                                            GNUNET_NO, &transmit_join, consensus);
430
431
432   GNUNET_assert (consensus->th != NULL);
433   return consensus;
434 }
435
436
437
438 /**
439  * Insert an element in the set being reconsiled.  Must not be called after
440  * "GNUNET_CONSENSUS_conclude".
441  *
442  * @param consensus handle for the consensus session
443  * @param element the element to be inserted
444  * @param idc function called when we are done with this element and it 
445  *            is thus allowed to call GNUNET_CONSENSUS_insert again
446  * @param idc_cls closure for 'idc'
447  */
448 void
449 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
450                          const struct GNUNET_CONSENSUS_Element *element,
451                          GNUNET_CONSENSUS_InsertDoneCallback idc,
452                          void *idc_cls)
453 {
454   struct QueuedMessage *qmsg;
455   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
456   size_t element_msg_size;
457
458   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
459
460   element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
461                                element->size);
462
463   element_msg = GNUNET_malloc (element_msg_size);
464   element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
465   element_msg->header.size = htons (element_msg_size);
466   memcpy (&element_msg[1], element->data, element->size);
467
468   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
469   qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
470   qmsg->idc = idc;
471   qmsg->idc_cls = idc_cls;
472
473   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
474
475   send_next (consensus);
476 }
477
478
479 /**
480  * We are done with inserting new elements into the consensus;
481  * try to conclude the consensus within a given time window.
482  * After conclude has been called, no further elements may be
483  * inserted by the client.
484  *
485  * @param consensus consensus session
486  * @param timeout timeout after which the conculde callback
487  *                must be called
488  * @param conclude called when the conclusion was successful
489  * @param conclude_cls closure for the conclude callback
490  */
491 void
492 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
493                            struct GNUNET_TIME_Relative timeout,
494                            unsigned int min_group_size_in_consensus,
495                            GNUNET_CONSENSUS_ConcludeCallback conclude,
496                            void *conclude_cls)
497 {
498   struct QueuedMessage *qmsg;
499   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
500
501   GNUNET_assert (NULL != conclude);
502   GNUNET_assert (NULL == consensus->conclude_cb);
503
504   consensus->conclude_cls = conclude_cls;
505   consensus->conclude_cb = conclude;
506
507   conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
508   conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
509   conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
510   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
511   conclude_msg->min_group_size = min_group_size_in_consensus;
512
513   qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
514   qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
515
516   GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
517
518   send_next (consensus);
519 }
520
521
522 /**
523  * Destroy a consensus handle (free all state associated with
524  * it, no longer call any of the callbacks).
525  *
526  * @param consensus handle to destroy
527  */
528 void
529 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
530 {
531   if (GNUNET_YES == consensus->may_not_destroy)
532   {
533     LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n");
534     GNUNET_assert (0);
535   }
536   if (consensus->client != NULL)
537   {
538     GNUNET_CLIENT_disconnect (consensus->client);
539     consensus->client = NULL;
540   }
541   if (NULL != consensus->peers)
542     GNUNET_free (consensus->peers);
543   GNUNET_free (consensus);
544 }
545