fix consensus to match new MQ API -- and correct CONCLUDE_DONE to be fixed-size
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36
37 /**
38  * Handle for the service.
39  */
40 struct GNUNET_CONSENSUS_Handle
41 {
42   /**
43    * Configuration to use.
44    */
45   const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47   /**
48    * Client connected to the consensus service, may be NULL if not connected.
49    */
50   struct GNUNET_CLIENT_Connection *client;
51
52   /**
53    * Callback for new elements. Not called for elements added locally.
54    */
55   GNUNET_CONSENSUS_ElementCallback new_element_cb;
56
57   /**
58    * Closure for new_element_cb
59    */
60   void *new_element_cls;
61
62   /**
63    * The (local) session identifier for the consensus session.
64    */
65   struct GNUNET_HashCode session_id;
66
67   /**
68    * GNUNES_YES iff the join message has been sent to the service.
69    */
70   int joined;
71
72   /**
73    * Called when the conclude operation finishes or fails.
74    */
75   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
76
77   /**
78    * Closure for the conclude callback.
79    */
80   void *conclude_cls;
81
82   /**
83    * Deadline for the conclude operation.
84    */
85   struct GNUNET_TIME_Absolute conclude_deadline;
86
87   /**
88    * Message queue for the client.
89    */
90   struct GNUNET_MQ_Handle *mq;
91 };
92
93
94 /**
95  * FIXME: this should not bee necessary when the API
96  * issue has been fixed
97  */
98 struct InsertDoneInfo
99 {
100   GNUNET_CONSENSUS_InsertDoneCallback idc;
101   void *cls;
102 };
103
104
105 /**
106  * Called when the server has sent is a new element
107  *
108  * @param cls consensus handle
109  * @param msg element message
110  */
111 static int
112 check_new_element (void *cls,
113                    const struct GNUNET_CONSENSUS_ElementMessage *msg)
114 {
115   /* any size is fine, elements are variable-size */
116   return GNUNET_OK;
117 }
118
119
120 /**
121  * Called when the server has sent is a new element
122  *
123  * @param cls consensus handle
124  * @param msg element message
125  */
126 static void
127 handle_new_element (void *cls,
128                     const struct GNUNET_CONSENSUS_ElementMessage *msg)
129 {
130   struct GNUNET_CONSENSUS_Handle *consensus = cls;
131   struct GNUNET_SET_Element element;
132
133   LOG (GNUNET_ERROR_TYPE_DEBUG,
134        "received new element\n");
135   element.element_type = msg->element_type;
136   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
137   element.data = &msg[1];
138   consensus->new_element_cb (consensus->new_element_cls,
139                              &element);
140 }
141
142
143 /**
144  * Called when the server has announced
145  * that the conclusion is over.
146  *
147  * @param cls consensus handle
148  * @param msg conclude done message
149  */
150 static void
151 handle_conclude_done (void *cls,
152                       const struct GNUNET_MessageHeader *msg)
153 {
154   struct GNUNET_CONSENSUS_Handle *consensus = cls;
155   GNUNET_CONSENSUS_ConcludeCallback cc;
156
157   GNUNET_MQ_destroy (consensus->mq);
158   consensus->mq = NULL;
159   GNUNET_CLIENT_disconnect (consensus->client);
160   consensus->client = NULL;
161   GNUNET_assert (NULL != (cc = consensus->conclude_cb));
162   consensus->conclude_cb = NULL;
163   cc (consensus->conclude_cls);
164 }
165
166
167 /**
168  * Generic error handler, called with the appropriate
169  * error code and the same closure specified at the creation of
170  * the message queue.
171  * Not every message queue implementation supports an error handler.
172  *
173  * @param cls closure, same closure as for the message handlers
174  * @param error error code
175  */
176 static void
177 mq_error_handler (void *cls,
178                   enum GNUNET_MQ_Error error)
179 {
180   LOG (GNUNET_ERROR_TYPE_WARNING,
181        "consensus service disconnected us\n");
182 }
183
184
185 /**
186  * Create a consensus session.
187  *
188  * @param cfg configuration to use for connecting to the consensus service
189  * @param num_peers number of peers in the peers array
190  * @param peers array of peers participating in this consensus session
191  *              Inclusion of the local peer is optional.
192  * @param session_id session identifier
193  *                   Allows a group of peers to have more than consensus session.
194  * @param start start time of the consensus, conclude should be called before
195  *              the start time.
196  * @param deadline time when the consensus should have concluded
197  * @param new_element_cb callback, called when a new element is added to the set by
198  *                    another peer
199  * @param new_element_cls closure for new_element
200  * @return handle to use, NULL on error
201  */
202 struct GNUNET_CONSENSUS_Handle *
203 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
204                          unsigned int num_peers,
205                          const struct GNUNET_PeerIdentity *peers,
206                          const struct GNUNET_HashCode *session_id,
207                          struct GNUNET_TIME_Absolute start,
208                          struct GNUNET_TIME_Absolute deadline,
209                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
210                          void *new_element_cls)
211 {
212   GNUNET_MQ_hd_var_size (new_element,
213                          GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT,
214                          struct GNUNET_CONSENSUS_ElementMessage);
215   GNUNET_MQ_hd_fixed_size (conclude_done,
216                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE,
217                            struct GNUNET_MessageHeader);
218   struct GNUNET_CONSENSUS_Handle *consensus
219     = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
220   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
221     make_new_element_handler (consensus),
222     make_conclude_done_handler (consensus),
223     GNUNET_MQ_handler_end ()
224   };
225   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
226   struct GNUNET_MQ_Envelope *ev;
227
228   consensus->cfg = cfg;
229   consensus->new_element_cb = new_element_cb;
230   consensus->new_element_cls = new_element_cls;
231   consensus->session_id = *session_id;
232   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
233   consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
234                                                          mq_handlers, mq_error_handler, consensus);
235
236   GNUNET_assert (consensus->client != NULL);
237
238   ev = GNUNET_MQ_msg_extra (join_msg,
239                             (num_peers * sizeof (struct GNUNET_PeerIdentity)),
240                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
241
242   join_msg->session_id = consensus->session_id;
243   join_msg->start = GNUNET_TIME_absolute_hton (start);
244   join_msg->deadline = GNUNET_TIME_absolute_hton (deadline);
245   join_msg->num_peers = htonl (num_peers);
246   memcpy(&join_msg[1],
247          peers,
248          num_peers * sizeof (struct GNUNET_PeerIdentity));
249
250   GNUNET_MQ_send (consensus->mq, ev);
251   return consensus;
252 }
253
254
255 static void
256 idc_adapter (void *cls)
257 {
258   struct InsertDoneInfo *i = cls;
259   i->idc (i->cls, GNUNET_OK);
260   GNUNET_free (i);
261 }
262
263 /**
264  * Insert an element in the set being reconsiled.  Must not be called after
265  * "GNUNET_CONSENSUS_conclude".
266  *
267  * @param consensus handle for the consensus session
268  * @param element the element to be inserted
269  * @param idc function called when we are done with this element and it
270  *            is thus allowed to call GNUNET_CONSENSUS_insert again
271  * @param idc_cls closure for 'idc'
272  */
273 void
274 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
275                          const struct GNUNET_SET_Element *element,
276                          GNUNET_CONSENSUS_InsertDoneCallback idc,
277                          void *idc_cls)
278 {
279   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
280   struct GNUNET_MQ_Envelope *ev;
281   struct InsertDoneInfo *i;
282
283   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
284
285   ev = GNUNET_MQ_msg_extra (element_msg, element->size,
286                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
287
288   memcpy (&element_msg[1], element->data, element->size);
289
290   if (NULL != idc)
291   {
292     i = GNUNET_new (struct InsertDoneInfo);
293     i->idc = idc;
294     i->cls = idc_cls;
295     GNUNET_MQ_notify_sent (ev, idc_adapter, i);
296   }
297   GNUNET_MQ_send (consensus->mq, ev);
298 }
299
300
301 /**
302  * We are done with inserting new elements into the consensus;
303  * try to conclude the consensus within a given time window.
304  * After conclude has been called, no further elements may be
305  * inserted by the client.
306  *
307  * @param consensus consensus session
308  * @param deadline deadline after which the conculde callback
309  *                must be called
310  * @param conclude called when the conclusion was successful
311  * @param conclude_cls closure for the conclude callback
312  */
313 void
314 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
315                            GNUNET_CONSENSUS_ConcludeCallback conclude,
316                            void *conclude_cls)
317 {
318   struct GNUNET_MQ_Envelope *ev;
319
320   GNUNET_assert (NULL != conclude);
321   GNUNET_assert (NULL == consensus->conclude_cb);
322
323   consensus->conclude_cls = conclude_cls;
324   consensus->conclude_cb = conclude;
325
326   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
327   GNUNET_MQ_send (consensus->mq, ev);
328 }
329
330
331 /**
332  * Destroy a consensus handle (free all state associated with
333  * it, no longer call any of the callbacks).
334  *
335  * @param consensus handle to destroy
336  */
337 void
338 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
339 {
340   if (NULL != consensus->mq)
341   {
342     GNUNET_MQ_destroy (consensus->mq);
343     consensus->mq = NULL;
344   }
345   if (NULL != consensus->client)
346   {
347     GNUNET_CLIENT_disconnect (consensus->client);
348     consensus->client = NULL;
349   }
350   GNUNET_free (consensus);
351 }