- gnunet-consensus now profiling tool
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21
22 /**
23  * @file consensus/gnunet-service-consensus.c
24  * @brief 
25  * @author Florian Dold
26  */
27
28 #include "platform.h"
29 #include "gnunet_common.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_consensus_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_stream_lib.h"
36 #include "consensus_protocol.h"
37 #include "ibf.h"
38 #include "consensus.h"
39
40
41 /**
42  * Number of IBFs in a strata estimator.
43  */
44 #define STRATA_COUNT 32
45 /**
46  * Number of buckets per IBF.
47  */
48 #define STRATA_IBF_BUCKETS 80
49 /**
50  * hash num parameter of the IBF
51  */
52 #define STRATA_HASH_NUM 3
53 /**
54  * Number of strata that can be transmitted in one message.
55  */
56 #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
57
58
59
60 /* forward declarations */
61
62 struct ConsensusSession;
63 struct IncomingSocket;
64
65 static void
66 send_next (struct ConsensusSession *session);
67
68 static void 
69 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
70
71 static int
72 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
73
74
75 /**
76  * An element that is waiting to be transmitted to a client.
77  */
78 struct PendingElement
79 {
80   /**
81    * Pending elements are kept in a DLL.
82    */
83   struct PendingElement *next;
84
85   /**
86    * Pending elements are kept in a DLL.
87    */
88   struct PendingElement *prev;
89
90   /**
91    * The actual element
92    */
93   struct GNUNET_CONSENSUS_Element *element;
94 };
95
96 struct ConsensusPeerInformation
97 {
98   struct GNUNET_STREAM_Socket *socket;
99
100   /**
101    * Is socket's connection established, i.e. can we write to it?
102    * Only relevent on outgoing cpi.
103    */
104   int is_connected;
105
106   /**
107    * Type of the peer in the all-to-all rounds,
108    * GNUNET_YES if we initiate reconciliation.
109    */
110   int is_outgoing;
111
112   /**
113    * Did we receive/send a consensus hello?
114    */
115   int hello;
116
117   /**
118    * Handle for currently active read
119    */
120   struct GNUNET_STREAM_ReadHandle *rh;
121
122   /**
123    * Handle for currently active read
124    */
125   struct GNUNET_STREAM_WriteHandle *wh;
126
127   /**
128    * How many of the strate in the ibf were
129    * sent or received in this round?
130    */
131   int strata_counter;
132
133   struct InvertibleBloomFilter *my_ibf;
134
135   int my_ibf_bucket_counter;
136
137   struct InvertibleBloomFilter *peer_ibf;
138
139   int peer_ibf_bucket_counter;
140
141   /**
142    * Strata estimator of the peer, NULL if our peer
143    * initiated the reconciliation.
144    */
145   struct InvertibleBloomFilter **strata;
146
147   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
148
149   struct ConsensusSession *session;
150 };
151
152 struct QueuedMessage
153 {
154   struct GNUNET_MessageHeader *msg;
155
156   /**
157    * Queued messages are stored in a doubly linked list.
158    */
159   struct QueuedMessage *next;
160
161   /**
162    * Queued messages are stored in a doubly linked list.
163    */
164   struct QueuedMessage *prev;
165 };
166
167
168 /**
169  * A consensus session consists of one local client and the remote authorities.
170  */
171 struct ConsensusSession
172 {
173   /**
174    * Consensus sessions are kept in a DLL.
175    */
176   struct ConsensusSession *next;
177
178   /**
179    * Consensus sessions are kept in a DLL.
180    */
181   struct ConsensusSession *prev;
182
183   /**
184    * Join message. Used to initialize the session later,
185    * if the identity of the local peer is not yet known.
186    * NULL if the session has been fully initialized.
187    */
188   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
189
190   /**
191   * Global consensus identification, computed
192   * from the local id and participating authorities.
193   */
194   struct GNUNET_HashCode global_id;
195
196   /**
197    * Local client in this consensus session.
198    * There is only one client per consensus session.
199    */
200   struct GNUNET_SERVER_Client *client;
201
202   /**
203    * Values in the consensus set of this session,
204    * all of them either have been sent by or approved by the client.
205    */
206   struct GNUNET_CONTAINER_MultiHashMap *values;
207
208   /**
209    * Elements that have not been sent to the client yet.
210    */
211   struct PendingElement *transmit_pending_head;
212
213   /**
214    * Elements that have not been sent to the client yet.
215    */
216   struct PendingElement *transmit_pending_tail;
217
218   /**
219    * Elements that have not been approved (or rejected) by the client yet.
220    */
221   struct PendingElement *approval_pending_head;
222
223   /**
224    * Elements that have not been approved (or rejected) by the client yet.
225    */
226   struct PendingElement *approval_pending_tail;
227
228   struct QueuedMessage *client_messages_head;
229
230   struct QueuedMessage *client_messages_tail;
231
232   /**
233    * Currently active transmit handle for sending to the client
234    */
235   struct GNUNET_SERVER_TransmitHandle *th;
236
237   /**
238    * Once conclude_requested is GNUNET_YES, the client may not
239    * insert any more values.
240    */
241   int conclude_requested;
242
243   /**
244    * Minimum number of peers to form a consensus group
245    */
246   int conclude_group_min;
247
248   /**
249    * Current round of the conclusion
250    */
251   int current_round;
252
253   /**
254    * Soft deadline for conclude.
255    * Speed up the speed of the consensus at the cost of consensus quality, as
256    * the time approached or crosses the deadline.
257    */
258   struct GNUNET_TIME_Absolute conclude_deadline;
259
260   /**
261    * Number of other peers in the consensus
262    */
263   unsigned int num_peers;
264
265   struct ConsensusPeerInformation *info;
266
267   /**
268    * Sorted array of peer identities in this consensus session,
269    * includes the local peer.
270    */
271   struct GNUNET_PeerIdentity *peers;
272
273   /**
274    * Index of the local peer in the peers array
275    */
276   int local_peer_idx;
277
278   /**
279    * Task identifier for the round timeout task
280    */
281   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
282
283   struct InvertibleBloomFilter **strata;
284 };
285
286
287 /**
288  * Sockets from other peers who want to communicate with us.
289  * It may not be known yet which consensus session they belong to.
290  */
291 struct IncomingSocket
292 {
293   /**
294    * Incoming sockets are kept in a double linked list.
295    */
296   struct IncomingSocket *next;
297
298   /**
299    * Incoming sockets are kept in a double linked list.
300    */
301   struct IncomingSocket *prev;
302
303   /**
304    * The actual socket.
305    */
306   struct GNUNET_STREAM_Socket *socket;
307
308   /**
309    * Handle for currently active read
310    */
311   struct GNUNET_STREAM_ReadHandle *rh;
312
313   /**
314    * Peer that connected to us with the socket.
315    */
316   struct GNUNET_PeerIdentity *peer;
317
318   /**
319    * Message stream tokenizer for this socket.
320    */
321   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
322
323   /**
324    * Peer-in-session this socket belongs to, once known, otherwise NULL.
325    */
326   struct ConsensusPeerInformation *cpi;
327 };
328
329 static struct IncomingSocket *incoming_sockets_head;
330 static struct IncomingSocket *incoming_sockets_tail;
331
332 /**
333  * Linked list of sesstions this peer participates in.
334  */
335 static struct ConsensusSession *sessions_head;
336
337 /**
338  * Linked list of sesstions this peer participates in.
339  */
340 static struct ConsensusSession *sessions_tail;
341
342 /**
343  * Configuration of the consensus service.
344  */
345 static const struct GNUNET_CONFIGURATION_Handle *cfg;
346
347 /**
348  * Handle to the server for this service.
349  */
350 static struct GNUNET_SERVER_Handle *srv;
351
352 /**
353  * Peer that runs this service.
354  */
355 static struct GNUNET_PeerIdentity *my_peer;
356
357 /**
358  * Handle to the core service. Only used during service startup, will be NULL after that.
359  */
360 static struct GNUNET_CORE_Handle *core;
361
362 /**
363  * Listener for sockets from peers that want to reconcile with us.
364  */
365 static struct GNUNET_STREAM_ListenSocket *listener;
366
367
368 static int
369 estimate_difference (struct InvertibleBloomFilter** strata1,
370                      struct InvertibleBloomFilter** strata2)
371 {
372   int i;
373   int count;
374   count = 0;
375   for (i = STRATA_COUNT - 1; i >= 0; i--)
376   {
377     struct InvertibleBloomFilter *diff;
378     int ibf_count;
379     int more;
380     ibf_count = 0;
381     diff = ibf_dup (strata1[i]);
382     ibf_subtract (diff, strata2[i]);
383     for (;;)
384     {
385       more = ibf_decode (diff, NULL, NULL);
386       if (GNUNET_NO == more)
387       {
388         count += ibf_count;
389         break;
390       }
391       if (GNUNET_SYSERR == more)
392       {
393         return count * (1 << (i + 1));
394       }
395       ibf_count++;
396     }
397     ibf_destroy (diff);
398   }
399   return count;
400 }
401
402
403 /**
404  * Functions of this signature are called whenever data is available from the
405  * stream.
406  *
407  * @param cls the closure from GNUNET_STREAM_read
408  * @param status the status of the stream at the time this function is called
409  * @param data traffic from the other side
410  * @param size the number of bytes available in data read; will be 0 on timeout 
411  * @return number of bytes of processed from 'data' (any data remaining should be
412  *         given to the next time the read processor is called).
413  */
414 static size_t
415 stream_data_processor (void *cls,
416                        enum GNUNET_STREAM_Status status,
417                        const void *data,
418                        size_t size)
419 {
420   struct IncomingSocket *incoming;
421   int ret;
422
423   GNUNET_assert (GNUNET_STREAM_OK == status);
424
425   incoming = (struct IncomingSocket *) cls;
426
427   ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO);
428   if (GNUNET_SYSERR == ret)
429   {
430     /* FIXME: handle this correctly */
431     GNUNET_assert (0);
432   }
433
434   /* read again */
435   incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
436                                      &stream_data_processor, incoming);
437
438   /* we always read all data */
439   return size;
440 }
441
442 static int
443 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
444 {
445   int i;
446   int num_strata;
447   struct GNUNET_HashCode *hash_src;
448   uint8_t *count_src;
449
450   GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
451
452   if (NULL == cpi->strata)
453   {
454     cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
455     for (i = 0; i < STRATA_COUNT; i++)
456       cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
457   }
458
459   num_strata = ntohs (strata_msg->num_strata);
460
461   /* for correct message alignment, copy bucket types seperately */
462   hash_src = (struct GNUNET_HashCode *) &strata_msg[1];
463
464   for (i = 0; i < num_strata; i++)
465   {
466     memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
467     hash_src += STRATA_IBF_BUCKETS;
468   }
469
470   for (i = 0; i < num_strata; i++)
471   {
472     memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
473     hash_src += STRATA_IBF_BUCKETS;
474   }
475
476   count_src = (uint8_t *) hash_src;
477
478   for (i = 0; i < num_strata; i++)
479   {
480     uint8_t zero[STRATA_IBF_BUCKETS];
481     memset (zero, 0, STRATA_IBF_BUCKETS);
482     memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS);
483     count_src += STRATA_IBF_BUCKETS;
484   }
485
486   GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE));
487
488   cpi->strata_counter += num_strata;
489
490   if (STRATA_COUNT == cpi->strata_counter)
491   {
492     int diff;
493     diff = estimate_difference (cpi->session->strata, cpi->strata);
494     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff);
495   }
496
497   return GNUNET_YES;
498 }
499
500
501 static int
502 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata)
503 {
504   return GNUNET_YES;
505 }
506
507
508 static int
509 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata)
510 {
511   return GNUNET_YES;
512 }
513
514
515 static int
516 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
517 {
518   struct ConsensusSession *session;
519   session = sessions_head;
520   while (NULL != session)
521   {
522     if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
523     {
524       int idx;
525       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
526       idx = get_peer_idx (inc->peer, session);
527       GNUNET_assert (-1 != idx);
528       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
529       inc->cpi = &session->info[idx];
530       GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
531       inc->cpi->mst = inc->mst;
532       inc->cpi->hello = GNUNET_YES;
533       inc->cpi->socket = inc->socket;
534       return GNUNET_YES;
535     }
536     session = session->next;
537   }
538   GNUNET_assert (0);
539   return GNUNET_NO;
540 }
541
542
543 /**
544  * Functions with this signature are called whenever a
545  * complete message is received by the tokenizer.
546  *
547  * Do not call GNUNET_SERVER_mst_destroy in callback
548  *
549  * @param cls closure
550  * @param client identification of the client
551  * @param message the actual message
552  *
553  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
554  */
555 static int
556 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
557 {
558   struct ConsensusPeerInformation *cpi;
559   cpi = (struct ConsensusPeerInformation *) cls;
560   switch (ntohs( message->type))
561   {
562     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
563       return handle_p2p_strata (cpi, (struct StrataMessage *) message);
564     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
565       return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
566     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
567       return handle_p2p_element (cpi, (struct Element *) message);
568     default:
569       /* FIXME: handle correctly */
570       GNUNET_assert (0);
571   }
572   return GNUNET_OK;
573 }
574
575
576 /**
577  * Handle tokenized messages from stream sockets.
578  * Delegate them if the socket belongs to a session,
579  * handle hello messages otherwise.
580  *
581  * Do not call GNUNET_SERVER_mst_destroy in callback
582  *
583  * @param cls closure, unused
584  * @param client incoming socket this message comes from
585  * @param message the actual message
586  *
587  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
588  */
589 static int
590 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
591 {
592   struct IncomingSocket *inc;
593   inc = (struct IncomingSocket *) client;
594   switch (ntohs( message->type))
595   {
596     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
597       return handle_p2p_hello (inc, (struct ConsensusHello *) message);
598     default:
599       if (NULL != inc->cpi)
600         return mst_session_callback (inc->cpi, client, message);
601       /* FIXME: disconnect peer properly */
602       GNUNET_assert (0);
603   }
604   return GNUNET_OK;
605 }
606
607
608 /**
609  * Functions of this type are called upon new stream connection from other peers
610  * or upon binding error which happen when the app_port given in
611  * GNUNET_STREAM_listen() is already taken.
612  *
613  * @param cls the closure from GNUNET_STREAM_listen
614  * @param socket the socket representing the stream; NULL on binding error
615  * @param initiator the identity of the peer who wants to establish a stream
616  *            with us; NULL on binding error
617  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
618  *             stream (the socket will be invalid after the call)
619  */
620 static int
621 listen_cb (void *cls,
622            struct GNUNET_STREAM_Socket *socket,
623            const struct GNUNET_PeerIdentity *initiator)
624 {
625   struct IncomingSocket *incoming;
626
627   GNUNET_assert (NULL != socket);
628
629   incoming = GNUNET_malloc (sizeof *incoming);
630
631   incoming->socket = socket;
632   incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
633
634   incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
635                                      &stream_data_processor, incoming);
636
637
638   incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
639
640   GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
641
642   return GNUNET_OK;
643 }
644
645
646 static void
647 destroy_session (struct ConsensusSession *session)
648 {
649   /* FIXME: more stuff to free! */
650   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
651   GNUNET_SERVER_client_drop (session->client);
652   GNUNET_free (session);
653 }
654
655
656 /**
657  * Disconnect a client, and destroy all sessions associated with it.
658  *
659  * @param client the client to disconnect
660  */
661 static void
662 disconnect_client (struct GNUNET_SERVER_Client *client)
663 {
664   struct ConsensusSession *session;
665   GNUNET_SERVER_client_disconnect (client);
666   
667   /* if the client owns a session, remove it */
668   session = sessions_head;
669   while (NULL != session)
670   {
671     if (client == session->client)
672     {
673       destroy_session (session);
674       break;
675     }
676     session = session->next;
677   }
678 }
679
680
681 /**
682  * Compute a global, (hopefully) unique consensus session id,
683  * from the local id of the consensus session, and the identities of all participants.
684  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
685  * exactly the same peers, the global id will be different.
686  *
687  * @param local_id local id of the consensus session
688  * @param peers array of all peers participating in the consensus session
689  * @param num_peers number of elements in the peers array
690  * @param dst where the result is stored, may not be NULL
691  */
692 static void
693 compute_global_id (const struct GNUNET_HashCode *local_id,
694                    const struct GNUNET_PeerIdentity *peers, int num_peers, 
695                    struct GNUNET_HashCode *dst)
696 {
697   int i;
698   struct GNUNET_HashCode tmp;
699
700   *dst = *local_id;
701   for (i = 0; i < num_peers; ++i)
702   {
703     GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
704     *dst = tmp;
705     GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
706     *dst = tmp;
707   }
708 }
709
710
711 /**
712  * Function called to notify a client about the connection
713  * begin ready to queue more data.  "buf" will be
714  * NULL and "size" zero if the connection was closed for
715  * writing in the meantime.
716  *
717  * @param cls consensus session
718  * @param size number of bytes available in buf
719  * @param buf where the callee should write the message
720  * @return number of bytes written to buf
721  */
722 static size_t
723 transmit_queued (void *cls, size_t size,
724                  void *buf)
725 {
726   struct ConsensusSession *session;
727   struct QueuedMessage *qmsg;
728   size_t msg_size;
729
730   session = (struct ConsensusSession *) cls;
731   session->th = NULL;
732
733
734   qmsg = session->client_messages_head;
735   GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
736   GNUNET_assert (qmsg);
737
738   if (NULL == buf)
739   {
740     destroy_session (session);
741     return 0;
742   }
743
744   msg_size = ntohs (qmsg->msg->size);
745
746   GNUNET_assert (size >= msg_size);
747
748   memcpy (buf, qmsg->msg, msg_size);
749   GNUNET_free (qmsg->msg);
750   GNUNET_free (qmsg);
751
752   send_next (session);
753
754   return msg_size;
755 }
756
757
758 /**
759  * Schedule sending the next message (if there is any) to a client.
760  *
761  * @param cli the client to send the next message to
762  */
763 static void
764 send_next (struct ConsensusSession *session)
765 {
766
767   GNUNET_assert (NULL != session);
768
769   if (NULL != session->th)
770     return;
771
772   if (NULL != session->client_messages_head)
773   {
774     int msize;
775     msize = ntohs (session->client_messages_head->msg->size);
776     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
777     session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
778                                                        GNUNET_TIME_UNIT_FOREVER_REL,
779                                                        &transmit_queued, session);
780   }
781 }
782
783
784 /**
785  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
786  * the correct signature to be used with e.g. qsort.
787  * We use this function instead.
788  *
789  * @param h1 some hash code
790  * @param h2 some hash code
791  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
792  */
793 static int
794 hash_cmp (const void *a, const void *b)
795 {
796   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
797 }
798
799
800 /**
801  * Search peer in the list of peers in session.
802  *
803  * @param peer peer to find
804  * @param session session with peer
805  * @return index of peer, -1 if peer is not in session
806  */
807 static int
808 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
809 {
810   const struct GNUNET_PeerIdentity *needle;
811   needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
812   if (NULL == needle)
813     return -1;
814   return needle - session->peers;
815 }
816
817
818
819 static void
820 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
821 {
822   struct ConsensusPeerInformation *cpi;
823
824   cpi = (struct ConsensusPeerInformation *) cls;
825   cpi->hello = GNUNET_YES;
826   
827   GNUNET_assert (GNUNET_STREAM_OK == status);
828
829   cpi = (struct ConsensusPeerInformation *) cls;
830
831   if (cpi->session->conclude_requested)
832   {
833     write_strata (cpi, GNUNET_STREAM_OK, 0);  
834   }
835 }
836
837
838 /**
839  * Functions of this type will be called when a stream is established
840  *
841  * @param cls the closure from GNUNET_STREAM_open
842  * @param socket socket to use to communicate with the other side (read/write)
843  */
844 static void
845 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
846 {
847   struct ConsensusPeerInformation *cpi;
848   struct ConsensusHello *hello;
849
850
851   cpi = (struct ConsensusPeerInformation *) cls;
852   cpi->is_connected = GNUNET_YES;
853
854   hello = GNUNET_malloc (sizeof *hello);
855   hello->header.size = htons (sizeof *hello);
856   hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
857   memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
858
859
860   cpi->wh =
861       GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
862
863 }
864
865
866 static void
867 initialize_session_info (struct ConsensusSession *session)
868 {
869   int i;
870   int last;
871
872   for (i = 0; i < session->num_peers; ++i)
873   {
874     /* initialize back-references, so consensus peer information can
875      * be used as closure */
876     session->info[i].session = session;
877
878   }
879
880   last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers;
881   i = (session->local_peer_idx + 1) % session->num_peers;
882   while (i != last)
883   {
884     session->info[i].is_outgoing = GNUNET_YES;
885     session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
886                                                   open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
887     session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session);
888     i = (i + 1) % session->num_peers;
889   }
890   // tie-breaker for even number of peers
891   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
892   {
893     session->info[last].is_outgoing = GNUNET_YES;
894     session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
895                                                      open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
896   }
897 }
898
899
900 /**
901  * Create the sorted list of peers for the session,
902  * add the local peer if not in the join message.
903  */
904 static void
905 initialize_session_peer_list (struct ConsensusSession *session)
906 {
907   int local_peer_in_list;
908   int listed_peers;
909   const struct GNUNET_PeerIdentity *msg_peers;
910   unsigned int i;
911
912   GNUNET_assert (NULL != session->join_msg);
913
914   /* peers in the join message, may or may not include the local peer */
915   listed_peers = ntohs (session->join_msg->num_peers);
916   
917   session->num_peers = listed_peers;
918
919   msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
920
921   local_peer_in_list = GNUNET_NO;
922   for (i = 0; i < listed_peers; i++)
923   {
924     if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
925     {
926       local_peer_in_list = GNUNET_YES;
927       break;
928     }
929   }
930
931   if (GNUNET_NO == local_peer_in_list)
932     session->num_peers++;
933
934   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
935
936   if (GNUNET_NO == local_peer_in_list)
937     session->peers[session->num_peers - 1] = *my_peer;
938
939   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
940   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
941 }
942
943
944 static void
945 strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
946 {
947   uint32_t v;
948   int i;
949   v = key->bits[0];
950   /* count trailing '1'-bits of v */
951   for (i = 0; v & 1; v>>=1, i++);
952
953   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
954
955   ibf_insert (strata[i], key);
956 }
957
958
959 /**
960  * Initialize the session, continue receiving messages from the owning client
961  *
962  * @param session the session to initialize
963  */
964 static void
965 initialize_session (struct ConsensusSession *session)
966 {
967   const struct ConsensusSession *other_session;
968   int i;
969
970   GNUNET_assert (NULL != session->join_msg);
971
972   initialize_session_peer_list (session);
973
974   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
975
976   compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
977
978   /* Check if some local client already owns the session. */
979   other_session = sessions_head;
980   while (NULL != other_session)
981   {
982     if ((other_session != session) && 
983         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
984     {
985       /* session already owned by another client */
986       GNUNET_break (0);
987       disconnect_client (session->client);
988       return;
989     }
990     other_session = other_session->next;
991   }
992
993   session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
994
995   session->local_peer_idx = get_peer_idx (my_peer, session);
996   GNUNET_assert (-1 != session->local_peer_idx);
997
998   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
999
1000   session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1001   for (i = 0; i < STRATA_COUNT; i++)
1002     session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1003
1004   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1005
1006   initialize_session_info (session);
1007
1008   GNUNET_free (session->join_msg);
1009   session->join_msg = NULL;
1010
1011   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1012   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1013 }
1014
1015
1016 /**
1017  * Called when a client wants to join a consensus session.
1018  *
1019  * @param cls unused
1020  * @param client client that sent the message
1021  * @param m message sent by the client
1022  */
1023 static void
1024 client_join (void *cls,
1025              struct GNUNET_SERVER_Client *client,
1026              const struct GNUNET_MessageHeader *m)
1027 {
1028   struct ConsensusSession *session;
1029
1030   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join received\n");
1031
1032   // make sure the client has not already joined a session
1033   session = sessions_head;
1034   while (NULL != session)
1035   {
1036     if (session->client == client)
1037     {
1038       GNUNET_break (0);
1039       disconnect_client (client);
1040       return;
1041     }
1042     session = session->next;
1043   }
1044
1045   session = GNUNET_malloc (sizeof (struct ConsensusSession));
1046   session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
1047   session->client = client;
1048   GNUNET_SERVER_client_keep (client);
1049
1050   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1051
1052   // Initialize session later if local peer identity is not known yet.
1053   if (NULL == my_peer)
1054   {
1055     GNUNET_SERVER_disable_receive_done_warning (client);
1056     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
1057     return;
1058   }
1059
1060   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
1061   initialize_session (session);
1062 }
1063
1064
1065 /**
1066  * Called when a client performs an insert operation.
1067  *
1068  * @param cls (unused)
1069  * @param client client handle
1070  * @param message message sent by the client
1071  */
1072 void
1073 client_insert (void *cls,
1074              struct GNUNET_SERVER_Client *client,
1075              const struct GNUNET_MessageHeader *m)
1076 {
1077   struct ConsensusSession *session;
1078   struct GNUNET_CONSENSUS_ElementMessage *msg;
1079   struct GNUNET_CONSENSUS_Element *element;
1080   struct GNUNET_HashCode key;
1081   int element_size;
1082
1083   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "insert\n");
1084
1085   session = sessions_head;
1086   while (NULL != session)
1087   {
1088     if (session->client == client)
1089       break;
1090   }
1091
1092   if (NULL == session)
1093   {
1094     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
1095     GNUNET_SERVER_client_disconnect (client);
1096     return;
1097   }
1098
1099   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1100   element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1101
1102   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
1103
1104   element->type = msg->element_type;
1105   element->size = element_size;
1106   memcpy (&element[1], &msg[1], element_size);
1107   element->data = &element[1];
1108
1109   GNUNET_CRYPTO_hash (element, element_size, &key);
1110
1111   GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1112                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1113
1114   strata_insert (session->strata, &key);
1115
1116   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1117
1118   send_next (session);
1119 }
1120
1121
1122
1123 /**
1124  * Functions of this signature are called whenever writing operations
1125  * on a stream are executed
1126  *
1127  * @param cls the closure from GNUNET_STREAM_write
1128  * @param status the status of the stream at the time this function is called;
1129  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1130  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1131  *          (this doesn't mean that the data is never sent, the receiver may
1132  *          have read the data but its ACKs may have been lost);
1133  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1134  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1135  *          be processed.
1136  * @param size the number of bytes written
1137  */
1138 static void 
1139 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1140 {
1141   struct ConsensusPeerInformation *cpi;
1142   struct StrataMessage *strata_msg;
1143   size_t msize;
1144   int i;
1145   struct GNUNET_HashCode *hash_dst;
1146   uint8_t *count_dst;
1147   int num_strata;
1148
1149   cpi = (struct ConsensusPeerInformation *) cls;
1150
1151   GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1152
1153   /* FIXME: handle this */
1154   GNUNET_assert (GNUNET_STREAM_OK == status);
1155
1156   if (STRATA_COUNT == cpi->strata_counter)
1157   {
1158     /* strata have been written, wait for other side's IBF */
1159     return;
1160   }
1161
1162   if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
1163     num_strata = (STRATA_COUNT - cpi->strata_counter);
1164   else
1165     num_strata = STRATA_PER_MESSAGE;
1166
1167
1168   msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1169
1170   strata_msg = GNUNET_malloc (msize);
1171   strata_msg->header.size = htons (msize);
1172   strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1173   strata_msg->num_strata = htons (num_strata);
1174
1175   /* for correct message alignment, copy bucket types seperately */
1176   hash_dst = (struct GNUNET_HashCode *) &strata_msg[1];
1177
1178   for (i = 0; i < num_strata; i++)
1179   {
1180     memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1181     hash_dst += STRATA_IBF_BUCKETS;
1182   }
1183
1184   for (i = 0; i < num_strata; i++)
1185   {
1186     memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1187     hash_dst += STRATA_IBF_BUCKETS;
1188   }
1189
1190   count_dst = (uint8_t *) hash_dst;
1191
1192   for (i = 0; i < num_strata; i++)
1193   {
1194     memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS);
1195     count_dst += STRATA_IBF_BUCKETS;
1196   }
1197
1198   cpi->strata_counter += num_strata;
1199
1200   cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1201                                  write_strata, cpi);
1202
1203   GNUNET_assert (NULL != cpi->wh);
1204 }
1205
1206
1207 /**
1208  * Functions of this signature are called whenever writing operations
1209  * on a stream are executed
1210  *
1211  * @param cls the closure from GNUNET_STREAM_write
1212  * @param status the status of the stream at the time this function is called;
1213  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1214  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1215  *          (this doesn't mean that the data is never sent, the receiver may
1216  *          have read the data but its ACKs may have been lost);
1217  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1218  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1219  *          be processed.
1220  * @param size the number of bytes written
1221  */
1222 static void 
1223 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1224 {
1225   struct ConsensusPeerInformation *cpi;
1226
1227   cpi = (struct ConsensusPeerInformation *) cls;
1228 }
1229
1230
1231 /**
1232  * Functions of this signature are called whenever writing operations
1233  * on a stream are executed
1234  *
1235  * @param cls the closure from GNUNET_STREAM_write
1236  * @param status the status of the stream at the time this function is called;
1237  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1238  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1239  *          (this doesn't mean that the data is never sent, the receiver may
1240  *          have read the data but its ACKs may have been lost);
1241  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1242  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1243  *          be processed.
1244  * @param size the number of bytes written
1245  */
1246 static void 
1247 write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1248 {
1249   struct ConsensusPeerInformation *cpi;
1250
1251   cpi = (struct ConsensusPeerInformation *) cls;
1252 }
1253
1254
1255 /**
1256  * Called when a client performs the conclude operation.
1257  *
1258  * @param cls (unused)
1259  * @param client client handle
1260  * @param message message sent by the client
1261  */
1262 void
1263 client_conclude (void *cls,
1264              struct GNUNET_SERVER_Client *client,
1265              const struct GNUNET_MessageHeader *message)
1266 {
1267   struct ConsensusSession *session;
1268   int i;
1269
1270   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
1271
1272   session = sessions_head;
1273   while ((session != NULL) && (session->client != client))
1274     session = session->next;
1275   if (NULL == session)
1276   {
1277     /* client not found */
1278     GNUNET_break (0);
1279     GNUNET_SERVER_client_disconnect (client);
1280     return;
1281   }
1282
1283   if (GNUNET_YES == session->conclude_requested)
1284   {
1285     /* client requested conclude twice */
1286     GNUNET_break (0);
1287     disconnect_client (client);
1288     return;
1289   }
1290
1291   session->conclude_requested = GNUNET_YES;
1292
1293   /* FIXME: write to already connected sockets */
1294
1295   for (i = 0; i < session->num_peers; i++)
1296   {
1297     if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1298          (GNUNET_YES == session->info[i].hello) )
1299     {
1300       /* kick off transmitting strata by calling the write continuation */
1301       write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1302     }
1303   }
1304   
1305
1306   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1307   send_next (session);
1308 }
1309
1310
1311 /**
1312  * Called when a client sends an ack
1313  *
1314  * @param cls (unused)
1315  * @param client client handle
1316  * @param message message sent by the client
1317  */
1318 void
1319 client_ack (void *cls,
1320              struct GNUNET_SERVER_Client *client,
1321              const struct GNUNET_MessageHeader *message)
1322 {
1323   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n");
1324 }
1325
1326 /**
1327  * Task that disconnects from core.
1328  *
1329  * @param cls core handle
1330  * @param tc context information (why was this task triggered now)
1331  */
1332 static void
1333 disconnect_core (void *cls,
1334                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1335 {
1336   GNUNET_CORE_disconnect (core);
1337   core = NULL;
1338   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
1339 }
1340
1341
1342 static void
1343 core_startup (void *cls,
1344               struct GNUNET_CORE_Handle *core,
1345               const struct GNUNET_PeerIdentity *peer)
1346 {
1347   struct ConsensusSession *session;
1348
1349   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
1350   /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
1351   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
1352   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
1353
1354   session = sessions_head;
1355   while (NULL != session)
1356   {
1357     if (NULL != session->join_msg)
1358       initialize_session (session);
1359     session = session->next;
1360   }
1361 }
1362
1363
1364 /**
1365  * Called to clean up, after a shutdown has been requested.
1366  *
1367  * @param cls closure
1368  * @param tc context information (why was this task triggered now)
1369  */
1370 static void
1371 shutdown_task (void *cls,
1372                const struct GNUNET_SCHEDULER_TaskContext *tc)
1373 {
1374   while (NULL != sessions_head)
1375   {
1376     struct ConsensusSession *session;
1377     session = sessions_head;
1378     sessions_head = sessions_head->next;
1379     GNUNET_free (session);
1380   }
1381
1382   if (NULL != core)
1383   {
1384     GNUNET_CORE_disconnect (core);
1385     core = NULL;
1386   }
1387
1388   if (NULL != listener)
1389   {
1390     GNUNET_STREAM_listen_close (listener);
1391     listener = NULL;
1392   } 
1393
1394   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
1395 }
1396
1397
1398 /**
1399  * Start processing consensus requests.
1400  *
1401  * @param cls closure
1402  * @param server the initialized server
1403  * @param c configuration to use
1404  */
1405 static void
1406 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
1407 {
1408   static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
1409     {NULL, 0, 0}
1410   };
1411   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1412     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1413     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1414     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1415         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1416     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
1417         sizeof (struct GNUNET_CONSENSUS_AckMessage)},
1418     {NULL, NULL, 0, 0}
1419   };
1420
1421   cfg = c;
1422   srv = server;
1423
1424   GNUNET_SERVER_add_handlers (server, server_handlers);
1425
1426   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1427
1428
1429   listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1430                                    listen_cb, NULL,
1431                                    GNUNET_STREAM_OPTION_END);
1432
1433
1434   /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1435   core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1436   GNUNET_assert (NULL != core);
1437
1438   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1439   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE);
1440 }
1441
1442
1443 /**
1444  * The main function for the consensus service.
1445  *
1446  * @param argc number of arguments from the command line
1447  * @param argv command line arguments
1448  * @return 0 ok, 1 on error
1449  */
1450 int
1451 main (int argc, char *const *argv)
1452 {
1453   int ret;
1454   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1455   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
1456   return (GNUNET_OK == ret) ? 0 : 1;
1457 }
1458