a1dc4082616010de81a5c162cc95af0254e36f9e
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36
37 /**
38  * Handle for the service.
39  */
40 struct GNUNET_CONSENSUS_Handle
41 {
42   /**
43    * Configuration to use.
44    */
45   const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47   /**
48    * Client connected to the consensus service, may be NULL if not connected.
49    */
50   struct GNUNET_CLIENT_Connection *client;
51
52   /**
53    * Callback for new elements. Not called for elements added locally.
54    */
55   GNUNET_CONSENSUS_ElementCallback new_element_cb;
56
57   /**
58    * Closure for new_element_cb
59    */
60   void *new_element_cls;
61
62   /**
63    * The (local) session identifier for the consensus session.
64    */
65   struct GNUNET_HashCode session_id;
66
67   /**
68    * Number of peers in the consensus. Optionally includes the local peer.
69    */
70   int num_peers;
71
72   /**
73    * Peer identities of peers participating in the consensus, includes the local peer.
74    */
75   struct GNUNET_PeerIdentity **peers;
76
77   /**
78    * GNUNES_YES iff the join message has been sent to the service.
79    */
80   int joined;
81
82   /**
83    * Called when the conclude operation finishes or fails.
84    */
85   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
86
87   /**
88    * Closure for the conclude callback.
89    */
90   void *conclude_cls;
91
92   /**
93    * Deadline for the conclude operation.
94    */
95   struct GNUNET_TIME_Absolute conclude_deadline;
96
97   /**
98    * Message queue for the client.
99    */
100   struct GNUNET_MQ_Handle *mq;
101 };
102
103 /**
104  * FIXME: this should not bee necessary when the API
105  * issue has been fixed
106  */
107 struct InsertDoneInfo
108 {
109   GNUNET_CONSENSUS_InsertDoneCallback idc;
110   void *cls;
111 };
112
113
114 /**
115  * Called when the server has sent is a new element
116  * 
117  * @param cls consensus handle
118  * @param mh element message
119  */
120 static void
121 handle_new_element (void *cls,
122                     const struct GNUNET_MessageHeader *mh)
123 {
124   struct GNUNET_CONSENSUS_Handle *consensus = cls;
125   const struct GNUNET_CONSENSUS_ElementMessage *msg
126       = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
127   struct GNUNET_SET_Element element;
128
129   LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
130
131   element.type = msg->element_type;
132   element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
133   element.data = &msg[1];
134
135   consensus->new_element_cb (consensus->new_element_cls, &element);
136 }
137
138
139 /**
140  * Called when the server has announced
141  * that the conclusion is over.
142  * 
143  * @param cls consensus handle
144  * @param msg conclude done message
145  */
146 static void
147 handle_conclude_done (void *cls,
148                       const struct GNUNET_MessageHeader *msg)
149 {
150   struct GNUNET_CONSENSUS_Handle *consensus = cls;
151
152   GNUNET_CONSENSUS_ConcludeCallback cc;
153
154   GNUNET_assert (NULL != (cc = consensus->conclude_cb));
155   consensus->conclude_cb = NULL;
156   cc (consensus->conclude_cls);
157 }
158
159
160 /**
161  * Create a consensus session.
162  *
163  * @param cfg configuration to use for connecting to the consensus service
164  * @param num_peers number of peers in the peers array
165  * @param peers array of peers participating in this consensus session
166  *              Inclusion of the local peer is optional.
167  * @param session_id session identifier
168  *                   Allows a group of peers to have more than consensus session.
169  * @param new_element_cb callback, called when a new element is added to the set by
170  *                    another peer
171  * @param new_element_cls closure for new_element
172  * @return handle to use, NULL on error
173  */
174 struct GNUNET_CONSENSUS_Handle *
175 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
176                          unsigned int num_peers,
177                          const struct GNUNET_PeerIdentity *peers,
178                          const struct GNUNET_HashCode *session_id,
179                          GNUNET_CONSENSUS_ElementCallback new_element_cb,
180                          void *new_element_cls)
181 {
182   struct GNUNET_CONSENSUS_Handle *consensus;
183   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
184   struct GNUNET_MQ_Envelope *ev;
185   const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
186     {handle_new_element,
187       GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
188     {handle_conclude_done,
189       GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
190     GNUNET_MQ_HANDLERS_END
191   };
192
193   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
194   consensus->cfg = cfg;
195   consensus->new_element_cb = new_element_cb;
196   consensus->new_element_cls = new_element_cls;
197   consensus->num_peers = num_peers;
198   consensus->session_id = *session_id;
199
200   if (0 == num_peers)
201     consensus->peers = NULL;
202   else if (num_peers > 0)
203     consensus->peers =
204         GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
205
206   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
207   consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
208                                                          mq_handlers, consensus);
209
210   GNUNET_assert (consensus->client != NULL);
211
212   ev = GNUNET_MQ_msg_extra (join_msg,
213                             (num_peers * sizeof (struct GNUNET_PeerIdentity)),
214                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
215
216   join_msg->session_id = consensus->session_id;
217   join_msg->num_peers = htonl (consensus->num_peers);
218   memcpy(&join_msg[1],
219          consensus->peers,
220          consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
221
222   GNUNET_MQ_send (consensus->mq, ev);
223   return consensus;
224 }
225
226
227 static void
228 idc_adapter (void *cls)
229 {
230   struct InsertDoneInfo *i = cls;
231   i->idc (i->cls, GNUNET_OK);
232   GNUNET_free (i);
233 }
234
235 /**
236  * Insert an element in the set being reconsiled.  Must not be called after
237  * "GNUNET_CONSENSUS_conclude".
238  *
239  * @param consensus handle for the consensus session
240  * @param element the element to be inserted
241  * @param idc function called when we are done with this element and it 
242  *            is thus allowed to call GNUNET_CONSENSUS_insert again
243  * @param idc_cls closure for 'idc'
244  */
245 void
246 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
247                          const struct GNUNET_SET_Element *element,
248                          GNUNET_CONSENSUS_InsertDoneCallback idc,
249                          void *idc_cls)
250 {
251   struct GNUNET_CONSENSUS_ElementMessage *element_msg;
252   struct GNUNET_MQ_Envelope *ev;
253   struct InsertDoneInfo *i;
254
255   LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
256
257   ev = GNUNET_MQ_msg_extra (element_msg, element->size,
258                             GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
259
260   memcpy (&element_msg[1], element->data, element->size);
261   
262   if (NULL != idc)
263   {
264     i = GNUNET_new (struct InsertDoneInfo);
265     i->idc = idc;
266     i->cls = idc_cls;
267     GNUNET_MQ_notify_sent (ev, idc_adapter, i);
268   }
269 }
270
271
272 /**
273  * We are done with inserting new elements into the consensus;
274  * try to conclude the consensus within a given time window.
275  * After conclude has been called, no further elements may be
276  * inserted by the client.
277  *
278  * @param consensus consensus session
279  * @param timeout timeout after which the conculde callback
280  *                must be called
281  * @param conclude called when the conclusion was successful
282  * @param conclude_cls closure for the conclude callback
283  */
284 void
285 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
286                            struct GNUNET_TIME_Relative timeout,
287                            GNUNET_CONSENSUS_ConcludeCallback conclude,
288                            void *conclude_cls)
289 {
290   struct GNUNET_MQ_Envelope *ev;
291   struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
292
293   GNUNET_assert (NULL != conclude);
294   GNUNET_assert (NULL == consensus->conclude_cb);
295
296   consensus->conclude_cls = conclude_cls;
297   consensus->conclude_cb = conclude;
298
299   ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
300   conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
301
302   GNUNET_MQ_send (consensus->mq, ev);
303 }
304
305
306 /**
307  * Destroy a consensus handle (free all state associated with
308  * it, no longer call any of the callbacks).
309  *
310  * @param consensus handle to destroy
311  */
312 void
313 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
314 {
315   if (consensus->client != NULL)
316   {
317     GNUNET_CLIENT_disconnect (consensus->client);
318     consensus->client = NULL;
319   }
320   if (NULL != consensus->peers)
321     GNUNET_free (consensus->peers);
322   GNUNET_free (consensus);
323 }
324