use new connecT API
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36
37 /**
38  * Handle for the service.
39  */
40 struct GNUNET_CONSENSUS_Handle
41 {
42   /**
43    * Configuration to use.
44    */
45   const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47   /**
48    * Callback for new elements. Not called for elements added locally.
49    */
50   GNUNET_CONSENSUS_ElementCallback new_element_cb;
51
52   /**
53    * Closure for @e new_element_cb
54    */
55   void *new_element_cls;
56
57   /**
58    * The (local) session identifier for the consensus session.
59    */
60   struct GNUNET_HashCode session_id;
61
62   /**
63    * #GNUNET_YES iff the join message has been sent to the service.
64    */
65   int joined;
66
67   /**
68    * Called when the conclude operation finishes or fails.
69    */
70   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
71
72   /**
73    * Closure for the @e conclude_cb callback.
74    */
75   void *conclude_cls;
76
77   /**
78    * Deadline for the conclude operation.
79    */
80   struct GNUNET_TIME_Absolute conclude_deadline;
81
82   /**
83    * Message queue for the client.
84    */
85   struct GNUNET_MQ_Handle *mq;
86 };
87
88
89 /**
90  * FIXME: this should not bee necessary when the API
91  * issue has been fixed
92  */
93 struct InsertDoneInfo
94 {
95   GNUNET_CONSENSUS_InsertDoneCallback idc;
96   void *cls;
97 };
98
99
100 /**
101  * Called when the server has sent is a new element
102  *
103  * @param cls consensus handle
104  * @param msg element message
105  */
106 static int
107 check_new_element (void *cls,
108                    const struct GNUNET_CONSENSUS_ElementMessage *msg)
109 {
110   /* any size is fine, elements are variable-size */
111   return GNUNET_OK;
112 }
113
114
115 /**
116  * Called when the server has sent is a new element
117  *
118  * @param cls consensus handle
119  * @param msg element message
120  */
121 static void
122 handle_new_element (void *cls,
123                     const struct GNUNET_CONSENSUS_ElementMessage *msg)
124 {
125   struct GNUNET_CONSENSUS_Handle *consensus = cls;
126   struct GNUNET_SET_Element element;
127
128   LOG (GNUNET_ERROR_TYPE_DEBUG,
129        "received new element\n");
130   element.element_type = msg->element_type;
131   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
132   element.data = &msg[1];
133   consensus->new_element_cb (consensus->new_element_cls,
134                              &element);
135 }
136
137
138 /**
139  * Called when the server has announced
140  * that the conclusion is over.
141  *
142  * @param cls consensus handle
143  * @param msg conclude done message
144  */
145 static void
146 handle_conclude_done (void *cls,
147                       const struct GNUNET_MessageHeader *msg)
148 {
149   struct GNUNET_CONSENSUS_Handle *consensus = cls;
150   GNUNET_CONSENSUS_ConcludeCallback cc;
151
152   GNUNET_MQ_destroy (consensus->mq);
153   consensus->mq = NULL;
154   GNUNET_assert (NULL != (cc = consensus->conclude_cb));
155   consensus->conclude_cb = NULL;
156   cc (consensus->conclude_cls);
157 }
158
159
160 /**
161  * Generic error handler, called with the appropriate
162  * error code and the same closure specified at the creation of
163  * the message queue.
164  * Not every message queue implementation supports an error handler.
165  *
166  * @param cls closure, same closure as for the message handlers
167  * @param error error code
168  */
169 static void
170 mq_error_handler (void *cls,
171                   enum GNUNET_MQ_Error error)
172 {
173   LOG (GNUNET_ERROR_TYPE_WARNING,
174        "consensus service disconnected us\n");
175 }
176
177
178 /**
179  * Create a consensus session.
180  *
181  * @param cfg configuration to use for connecting to the consensus service
182  * @param num_peers number of peers in the peers array
183  * @param peers array of peers participating in this consensus session
184  *              Inclusion of the local peer is optional.
185  * @param session_id session identifier
186  *                   Allows a group of peers to have more than consensus session.
187  * @param start start time of the consensus, conclude should be called before
188  *              the start time.
189  * @param deadline time when the consensus should have concluded
190  * @param new_element_cb callback, called when a new element is added to the set by
191  *                    another peer
192  * @param new_element_cls closure for new_element
193  * @return handle to use, NULL on error
194  */
195 struct GNUNET_CONSENSUS_Handle *
196 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
197                          unsigned int num_peers,
198                          const struct GNUNET_PeerIdentity *peers,
199                          const struct GNUNET_HashCode *session_id,
200                          struct GNUNET_TIME_Absolute start,
201                          struct GNUNET_TIME_Absolute deadline,
202                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
203                          void *new_element_cls)
204 {
205   GNUNET_MQ_hd_var_size (new_element,
206                          GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT,
207                          struct GNUNET_CONSENSUS_ElementMessage);
208   GNUNET_MQ_hd_fixed_size (conclude_done,
209                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE,
210                            struct GNUNET_MessageHeader);
211   struct GNUNET_CONSENSUS_Handle *consensus
212     = GNUNET_new (struct GNUNET_CONSENSUS_Handle);
213   struct GNUNET_MQ_MessageHandler mq_handlers[] = {
214     make_new_element_handler (consensus),
215     make_conclude_done_handler (consensus),
216     GNUNET_MQ_handler_end ()
217   };
218   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
219   struct GNUNET_MQ_Envelope *ev;
220   struct GNUNET_CLIENT_Connection *client;
221
222   consensus->cfg = cfg;
223   consensus->new_element_cb = new_element_cb;
224   consensus->new_element_cls = new_element_cls;
225   consensus->session_id = *session_id;
226   client = GNUNET_CLIENT_connect ("consensus", cfg);
227   if (NULL == client)
228   {
229     GNUNET_free (consensus);
230     return NULL;
231   }
232   consensus->mq = GNUNET_MQ_queue_for_connection_client (client,
233                                                          mq_handlers,
234                                                          &mq_error_handler,
235                                                          consensus);
236   ev = GNUNET_MQ_msg_extra (join_msg,
237                             (num_peers * sizeof (struct GNUNET_PeerIdentity)),
238                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
239
240   join_msg->session_id = consensus->session_id;
241   join_msg->start = GNUNET_TIME_absolute_hton (start);
242   join_msg->deadline = GNUNET_TIME_absolute_hton (deadline);
243   join_msg->num_peers = htonl (num_peers);
244   memcpy(&join_msg[1],
245          peers,
246          num_peers * sizeof (struct GNUNET_PeerIdentity));
247
248   GNUNET_MQ_send (consensus->mq, ev);
249   return consensus;
250 }
251
252
253 static void
254 idc_adapter (void *cls)
255 {
256   struct InsertDoneInfo *i = cls;
257   i->idc (i->cls, GNUNET_OK);
258   GNUNET_free (i);
259 }
260
261 /**
262  * Insert an element in the set being reconsiled.  Must not be called after
263  * "GNUNET_CONSENSUS_conclude".
264  *
265  * @param consensus handle for the consensus session
266  * @param element the element to be inserted
267  * @param idc function called when we are done with this element and it
268  *            is thus allowed to call GNUNET_CONSENSUS_insert again
269  * @param idc_cls closure for 'idc'
270  */
271 void
272 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
273                          const struct GNUNET_SET_Element *element,
274                          GNUNET_CONSENSUS_InsertDoneCallback idc,
275                          void *idc_cls)
276 {
277   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
278   struct GNUNET_MQ_Envelope *ev;
279   struct InsertDoneInfo *i;
280
281   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
282
283   ev = GNUNET_MQ_msg_extra (element_msg, element->size,
284                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
285
286   memcpy (&element_msg[1], element->data, element->size);
287
288   if (NULL != idc)
289   {
290     i = GNUNET_new (struct InsertDoneInfo);
291     i->idc = idc;
292     i->cls = idc_cls;
293     GNUNET_MQ_notify_sent (ev, idc_adapter, i);
294   }
295   GNUNET_MQ_send (consensus->mq, ev);
296 }
297
298
299 /**
300  * We are done with inserting new elements into the consensus;
301  * try to conclude the consensus within a given time window.
302  * After conclude has been called, no further elements may be
303  * inserted by the client.
304  *
305  * @param consensus consensus session
306  * @param deadline deadline after which the conculde callback
307  *                must be called
308  * @param conclude called when the conclusion was successful
309  * @param conclude_cls closure for the conclude callback
310  */
311 void
312 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
313                            GNUNET_CONSENSUS_ConcludeCallback conclude,
314                            void *conclude_cls)
315 {
316   struct GNUNET_MQ_Envelope *ev;
317
318   GNUNET_assert (NULL != conclude);
319   GNUNET_assert (NULL == consensus->conclude_cb);
320
321   consensus->conclude_cls = conclude_cls;
322   consensus->conclude_cb = conclude;
323
324   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
325   GNUNET_MQ_send (consensus->mq, ev);
326 }
327
328
329 /**
330  * Destroy a consensus handle (free all state associated with
331  * it, no longer call any of the callbacks).
332  *
333  * @param consensus handle to destroy
334  */
335 void
336 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
337 {
338   if (NULL != consensus->mq)
339   {
340     GNUNET_MQ_destroy (consensus->mq);
341     consensus->mq = NULL;
342   }
343   GNUNET_free (consensus);
344 }
345
346 /* end of consensus_api.c */