consensus api, consensus service (local), peer driver and ibf sketch
[oweals/gnunet.git] / src / consensus / consensus_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file consensus/consensus_api.c
23  * @brief 
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_client_lib.h"
30 #include "gnunet_consensus_service.h"
31 #include "consensus.h"
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35
36 struct ElementAck
37 {
38   struct ElementAck *next;
39   struct ElementAck *prev;
40   int keep;
41   struct GNUNET_CONSENSUS_Element *element;
42 };
43
44 /**
45  * Handle for the service.
46  */
47 struct GNUNET_CONSENSUS_Handle
48 {
49   /**
50    * Configuration to use.
51    */
52   const struct GNUNET_CONFIGURATION_Handle *cfg;
53
54   /**
55    * Socket (if available).
56    */
57   struct GNUNET_CLIENT_Connection *client;
58
59   /**
60    * Callback for new elements. Not called for elements added locally.
61    */
62   GNUNET_CONSENSUS_NewElementCallback new_element_cb;
63
64   /**
65    * Closure for new_element_cb
66    */
67   void *new_element_cls;
68
69   /**
70    * Session identifier for the consensus session.
71    */
72   struct GNUNET_HashCode session_id;
73
74   /**
75    * Number of peers in the consensus. Optionally includes the local peer.
76    */
77   int num_peers;
78
79   /**
80    * Peer identities of peers in the consensus. Optionally includes the local peer.
81    */
82   struct GNUNET_PeerIdentity *peers;
83
84   /**
85    * Currently active transmit request.
86    */
87   struct GNUNET_CLIENT_TransmitHandle *th;
88
89   /**
90    * GNUNES_YES iff the join message has been sent to the service.
91    */
92   int joined;
93
94   /**
95    * Called when the current insertion operation finishes.
96    * NULL if there is no insert operation active.
97    */
98   GNUNET_CONSENSUS_InsertDoneCallback idc;
99
100   /**
101    * Closure for the insert done callback.
102    */
103   void *idc_cls;
104
105   /**
106    * An element that was requested to be inserted.
107    */
108   struct GNUNET_CONSENSUS_Element *insert_element;
109
110   /**
111    * Called when the conclude operation finishes or fails.
112    */
113   GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
114
115   /**
116    * Closure for the conclude callback.
117    */
118   void *conclude_cls;
119
120   /**
121    * Deadline for the conclude operation.
122    */
123   struct GNUNET_TIME_Absolute conclude_deadline;
124
125   struct ElementAck *ack_head;
126   struct ElementAck *ack_tail;
127
128   /**
129    * Set to GNUNET_YES if the begin message has been transmitted to the service
130    */
131   int begin_sent;
132
133   /**
134    * Set to GNUNET_YES it the begin message should be transmitted to the service
135    */
136   int begin_requested;
137 };
138
139
140 static size_t
141 transmit_ack (void *cls, size_t size, void *buf);
142
143 static size_t
144 transmit_insert (void *cls, size_t size, void *buf);
145
146 static size_t
147 transmit_conclude (void *cls, size_t size, void *buf);
148
149 static size_t
150 transmit_begin (void *cls, size_t size, void *buf);
151
152
153 /**
154  * Call notify_transmit_ready for ack if necessary and possible.
155  */
156 static void
157 ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
158 {
159   if ((NULL == consensus->th) && (NULL != consensus->ack_head))
160   {
161     consensus->th =
162         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
163                                              sizeof (struct GNUNET_CONSENSUS_AckMessage),
164                                              GNUNET_TIME_UNIT_FOREVER_REL,
165                                              GNUNET_NO, &transmit_ack, consensus);
166   }
167 }
168
169
170 /**
171  * Call notify_transmit_ready for ack if necessary and possible.
172  */
173 static void
174 ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
175 {
176   if ((NULL == consensus->th) && (NULL != consensus->insert_element))
177   {
178     consensus->th =
179         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
180                                              sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 
181                                                 consensus->insert_element->size,
182                                              GNUNET_TIME_UNIT_FOREVER_REL,
183                                              GNUNET_NO, &transmit_insert, consensus);
184   }
185 }
186
187
188 /**
189  * Call notify_transmit_ready for ack if necessary and possible.
190  */
191 static void
192 ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
193 {
194   if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
195   {
196     consensus->th =
197         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
198                                              sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
199                                              GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
200                                              GNUNET_NO, &transmit_conclude, consensus);
201   }
202 }
203
204
205 /**
206  * Call notify_transmit_ready for ack if necessary and possible.
207  */
208 static void
209 ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
210 {
211   if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
212       (GNUNET_NO == consensus->begin_sent))
213   {
214     consensus->th =
215         GNUNET_CLIENT_notify_transmit_ready (consensus->client,
216                                              sizeof (struct GNUNET_MessageHeader),
217                                              GNUNET_TIME_UNIT_FOREVER_REL,
218                                              GNUNET_NO, &transmit_begin, consensus);
219   }
220 }
221
222 /**
223  * Called when the server has sent is a new element
224  * 
225  * @param consensus consensus handle
226  * @param msg element message
227  */
228 static void
229 handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
230                    struct GNUNET_CONSENSUS_ElementMessage *msg)
231 {
232   struct GNUNET_CONSENSUS_Element element;
233   struct ElementAck *ack;
234   int ret;
235
236   element.type = msg->element_type;
237   element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
238   element.data = &msg[1];
239
240   ret = consensus->new_element_cb (consensus->new_element_cls, &element);
241   ack = GNUNET_malloc (sizeof (struct ElementAck));
242   ack->keep = ret;
243   GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
244
245   ntr_ack (consensus);
246 }
247
248
249 /**
250  * Called when the server has announced
251  * that the conclusion is over.
252  * 
253  * @param consensus consensus handle
254  * @param msg conclude done message
255  */
256 static void
257 handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
258                      struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
259 {
260   GNUNET_assert (NULL != consensus->conclude_cb);
261   consensus->conclude_cb(consensus->conclude_cls,
262                          msg->num_peers,
263                          (struct GNUNET_PeerIdentity *) &msg[1]);
264   consensus->conclude_cb = NULL;
265 }
266
267
268
269 /**
270  * Type of a function to call when we receive a message
271  * from the service.
272  *
273  * @param cls closure
274  * @param msg message received, NULL on timeout or fatal error
275  */
276 static void
277 message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
278 {
279   struct GNUNET_CONSENSUS_Handle *consensus = cls;
280
281   LOG (GNUNET_ERROR_TYPE_INFO, "received message from consensus service\n");
282
283   if (msg == NULL)
284   {
285     /* Error, timeout, death */
286     LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
287     GNUNET_CLIENT_disconnect (consensus->client);
288     consensus->client = NULL;
289     consensus->new_element_cb (NULL, NULL);
290     if (NULL != consensus->idc)
291     {
292       consensus->idc(consensus->idc_cls, GNUNET_NO);
293       consensus->idc = NULL;
294       consensus->idc_cls = NULL;
295     }
296     return;
297   }
298
299   switch (ntohs (msg->type))
300   {
301     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
302       handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
303       break;
304     case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
305       handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
306       break;
307     default:
308       LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
309   }
310   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
311                          GNUNET_TIME_UNIT_FOREVER_REL);
312 }
313
314
315
316
317 /**
318  * Function called to notify a client about the connection
319  * begin ready to queue more data.  "buf" will be
320  * NULL and "size" zero if the connection was closed for
321  * writing in the meantime.
322  *
323  * @param cls closure
324  * @param size number of bytes available in buf
325  * @param buf where the callee should write the message
326  * @return number of bytes written to buf
327  */
328 static size_t
329 transmit_ack (void *cls, size_t size, void *buf)
330 {
331   struct GNUNET_CONSENSUS_AckMessage *msg;
332   struct GNUNET_CONSENSUS_Handle *consensus;
333
334   consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
335
336   GNUNET_assert (NULL != consensus->ack_head);
337
338   msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
339   msg->keep = consensus->ack_head->keep;
340   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
341   msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
342
343   consensus->ack_head = consensus->ack_head->next;
344
345   consensus->th = NULL;
346
347   ntr_insert (consensus);
348   ntr_ack (consensus);
349   ntr_conclude (consensus);
350
351   return sizeof (struct GNUNET_CONSENSUS_AckMessage);
352 }
353
354 /**
355  * Function called to notify a client about the connection
356  * begin ready to queue more data.  "buf" will be
357  * NULL and "size" zero if the connection was closed for
358  * writing in the meantime.
359  *
360  * @param cls closure
361  * @param size number of bytes available in buf
362  * @param buf where the callee should write the message
363  * @return number of bytes written to buf
364  */
365 static size_t
366 transmit_insert (void *cls, size_t size, void *buf)
367 {
368   struct GNUNET_CONSENSUS_ElementMessage *msg;
369   struct GNUNET_CONSENSUS_Handle *consensus;
370   GNUNET_CONSENSUS_InsertDoneCallback idc;
371   int msize;
372   void *idc_cls;
373
374   GNUNET_assert (NULL != buf);
375
376   consensus = cls;
377
378   GNUNET_assert (NULL != consensus->insert_element);
379
380   consensus->th = NULL;
381
382   msg = buf;
383
384   msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
385       consensus->insert_element->size;
386
387   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
388   msg->header.size = htons (msize);
389   memcpy (&msg[1],
390           consensus->insert_element->data,
391           consensus->insert_element->size);
392
393   consensus->insert_element = NULL;
394
395   idc = consensus->idc;
396   consensus->idc = NULL;
397   idc_cls = consensus->idc_cls;
398   consensus->idc_cls = NULL;
399   idc (idc_cls, GNUNET_YES);
400
401
402   ntr_ack (consensus);
403   ntr_insert (consensus);
404   ntr_conclude (consensus);
405
406   return msize;
407 }
408
409
410 /**
411  * Function called to notify a client about the connection
412  * begin ready to queue more data.  "buf" will be
413  * NULL and "size" zero if the connection was closed for
414  * writing in the meantime.
415  *
416  * @param cls closure
417  * @param size number of bytes available in buf
418  * @param buf where the callee should write the message
419  * @return number of bytes written to buf
420  */
421 static size_t
422 transmit_join (void *cls, size_t size, void *buf)
423 {
424   struct GNUNET_CONSENSUS_JoinMessage *msg;
425   struct GNUNET_CONSENSUS_Handle *consensus;
426   int msize;
427
428   GNUNET_assert (NULL != buf);
429
430   LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
431
432   consensus = cls;
433   consensus->th = NULL;
434   consensus->joined = 1;
435
436   msg = buf;
437
438   msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
439       consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
440
441   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
442   msg->header.size = htons (msize);
443   msg->session_id = consensus->session_id;
444   msg->num_peers = htons (consensus->num_peers);
445   if (0 != msg->num_peers)
446     memcpy(&msg[1],
447            consensus->peers,
448            consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
449
450   ntr_insert (consensus);
451   ntr_begin (consensus);
452   ntr_conclude (consensus);
453
454   GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
455                          GNUNET_TIME_UNIT_FOREVER_REL);
456   
457   return msize;
458 }
459
460
461 /**
462  * Function called to notify a client about the connection
463  * begin ready to queue more data.  "buf" will be
464  * NULL and "size" zero if the connection was closed for
465  * writing in the meantime.
466  *
467  * @param cls closure
468  * @param size number of bytes available in buf
469  * @param buf where the callee should write the message
470  * @return number of bytes written to buf
471  */
472 static size_t
473 transmit_conclude (void *cls, size_t size, void *buf)
474 {
475   struct GNUNET_CONSENSUS_ConcludeMessage *msg;
476   struct GNUNET_CONSENSUS_Handle *consensus;
477   int msize;
478
479   GNUNET_assert (NULL != buf);
480
481   consensus = cls;
482   consensus->th = NULL;
483
484   msg = buf;
485
486   msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
487
488   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
489   msg->header.size = htons (msize);
490   msg->timeout =
491       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
492
493   ntr_ack (consensus);
494
495   return msize;
496 }
497
498
499 /**
500  * Function called to notify a client about the connection
501  * begin ready to queue more data.  "buf" will be
502  * NULL and "size" zero if the connection was closed for
503  * writing in the meantime.
504  *
505  * @param cls the consensus handle
506  * @param size number of bytes available in buf
507  * @param buf where the callee should write the message
508  * @return number of bytes written to buf
509  */
510 static size_t
511 transmit_begin (void *cls, size_t size, void *buf)
512 {
513   struct GNUNET_MessageHeader *msg;
514   struct GNUNET_CONSENSUS_Handle *consensus;
515   int msize;
516
517   GNUNET_assert (NULL != buf);
518
519   consensus = cls;
520   consensus->th = NULL;
521
522   msg = buf;
523
524   msize = sizeof (struct GNUNET_MessageHeader);
525
526   msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
527   msg->size = htons (msize);
528
529   ntr_ack (consensus);
530   ntr_insert (consensus);
531   ntr_conclude (consensus);
532
533   return msize;
534 }
535
536
537 /**
538  * Create a consensus session.
539  *
540  * @param cfg
541  * @param num_peers
542  * @param peers array of peers participating in this consensus session
543  *              Inclusion of the local peer is optional.
544  * @param session_id session identifier
545  *                   Allows a group of peers to have more than consensus session.
546  * @param new_element_cb callback, called when a new element is added to the set by
547  *                    another peer
548  * @param new_element_cls closure for new_element
549  * @return handle to use, NULL on error
550  */
551 struct GNUNET_CONSENSUS_Handle *
552 GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
553                          unsigned int num_peers,
554                          const struct GNUNET_PeerIdentity *peers,
555                          const struct GNUNET_HashCode *session_id,
556                          GNUNET_CONSENSUS_NewElementCallback new_element_cb,
557                          void *new_element_cls)
558 {
559   struct GNUNET_CONSENSUS_Handle *consensus;
560   size_t join_message_size;
561
562   consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
563   consensus->cfg = cfg;
564   consensus->new_element_cb = new_element_cb;
565   consensus->new_element_cls = new_element_cls;
566   consensus->num_peers = num_peers;
567   consensus->session_id = *session_id;
568
569   if (0 == num_peers)
570   {
571     consensus->peers = NULL;
572   }
573   else if (num_peers > 0)
574   {
575     consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
576   }
577   else
578   {
579     GNUNET_break (0);
580   }
581
582   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
583
584   GNUNET_assert (consensus->client != NULL);
585
586   join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
587       (num_peers * sizeof (struct GNUNET_PeerIdentity));
588
589   consensus->th =
590       GNUNET_CLIENT_notify_transmit_ready (consensus->client,
591                                            join_message_size,
592                                            GNUNET_TIME_UNIT_FOREVER_REL,
593                                            GNUNET_NO, &transmit_join, consensus);
594
595
596   GNUNET_assert (consensus->th != NULL);
597   return consensus;
598 }
599
600
601
602 /**
603  * Insert an element in the set being reconsiled.  Must not be called after
604  * "GNUNET_CONSENSUS_conclude".
605  *
606  * @param consensus handle for the consensus session
607  * @param element the element to be inserted
608  * @param idc function called when we are done with this element and it 
609  *            is thus allowed to call GNUNET_CONSENSUS_insert again
610  * @param idc_cls closure for 'idc'
611  */
612 void
613 GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
614                          const struct GNUNET_CONSENSUS_Element *element,
615                          GNUNET_CONSENSUS_InsertDoneCallback idc,
616                          void *idc_cls)
617 {
618   GNUNET_assert (NULL == consensus->idc);
619   GNUNET_assert (NULL == consensus->insert_element);
620   GNUNET_assert (NULL == consensus->conclude_cb);
621
622   consensus->idc = idc;
623   consensus->idc_cls = idc_cls;
624   consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
625
626   if (consensus->joined == 0)
627   {
628     return;
629   }
630
631   ntr_insert (consensus);
632 }
633
634
635 /**
636  * Begin reconciling elements with other peers.
637  *
638  * @param consensus handle for the consensus session
639  */
640 void
641 GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
642 {
643   GNUNET_assert (NULL == consensus->idc);
644   GNUNET_assert (NULL == consensus->insert_element);
645   GNUNET_assert (GNUNET_NO == consensus->begin_requested);
646   GNUNET_assert (GNUNET_NO == consensus->begin_sent);
647
648   consensus->begin_requested = GNUNET_YES;
649
650   ntr_begin (consensus);
651 }
652
653
654 /**
655  * We are finished inserting new elements into the consensus;
656  * try to conclude the consensus within a given time window.
657  *
658  * @param consensus consensus session
659  * @param timeout timeout after which the conculde callback
660  *                must be called
661  * @param conclude called when the conclusion was successful
662  * @param conclude_cls closure for the conclude callback
663  */
664 void
665 GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
666                            struct GNUNET_TIME_Relative timeout,
667                            GNUNET_CONSENSUS_ConcludeCallback conclude,
668                            void *conclude_cls)
669 {
670   GNUNET_assert (NULL != conclude);
671   GNUNET_assert (NULL == consensus->conclude_cb);
672
673   consensus->conclude_cls = conclude_cls;
674   consensus->conclude_cb = conclude;
675   consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
676
677
678   /* if transmitting the conclude message is not possible right now, transmit_join
679    * or transmit_ack will handle it */
680   ntr_conclude (consensus);
681 }
682
683
684 /**
685  * Destroy a consensus handle (free all state associated with
686  * it, no longer call any of the callbacks).
687  *
688  * @param consensus handle to destroy
689  */
690 void
691 GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
692 {
693   if (consensus->client != NULL)
694   {
695     GNUNET_CLIENT_disconnect (consensus->client);
696     consensus->client = NULL;
697   }
698   if (NULL != consensus->peers)
699     GNUNET_free (consensus->peers);
700   GNUNET_free (consensus);
701 }
702