add completion callback for overlay topology configure functions
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
1 /*
2       This file is part of GNUnet
3       (C) 2012 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21
22 /**
23  * @file consensus/gnunet-service-consensus.c
24  * @brief 
25  * @author Florian Dold
26  */
27
28 #include "platform.h"
29 #include "gnunet_common.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_util_lib.h"
33 #include "gnunet_consensus_service.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_stream_lib.h"
36 #include "consensus_protocol.h"
37 #include "ibf.h"
38 #include "consensus.h"
39
40
41 /**
42  * Number of IBFs in a strata estimator.
43  */
44 #define STRATA_COUNT 32
45 /**
46  * Number of buckets per IBF.
47  */
48 #define STRATA_IBF_BUCKETS 80
49 /**
50  * hash num parameter of the IBF
51  */
52 #define STRATA_HASH_NUM 3
53 /**
54  * Number of strata that can be transmitted in one message.
55  */
56 #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
57
58 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59
60 #define MAX_IBF_ORDER (64)
61
62
63 /* forward declarations */
64
65 struct ConsensusSession;
66 struct IncomingSocket;
67 struct ConsensusPeerInformation;
68
69 static void
70 send_next (struct ConsensusSession *session);
71
72 static void 
73 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
74
75 static void 
76 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
77
78 static void 
79 write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size);
80
81 static int
82 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
83
84
85 /**
86  * An element that is waiting to be transmitted to a client.
87  */
88 struct PendingElement
89 {
90   /**
91    * Pending elements are kept in a DLL.
92    */
93   struct PendingElement *next;
94
95   /**
96    * Pending elements are kept in a DLL.
97    */
98   struct PendingElement *prev;
99
100   /**
101    * The actual element
102    */
103   struct GNUNET_CONSENSUS_Element *element;
104
105   /* peer this element is coming from */
106   struct ConsensusPeerInformation *cpi;
107 };
108
109 struct ConsensusPeerInformation
110 {
111   struct GNUNET_STREAM_Socket *socket;
112
113   /**
114    * Is socket's connection established, i.e. can we write to it?
115    * Only relevent on outgoing cpi.
116    */
117   int is_connected;
118
119   /**
120    * Type of the peer in the all-to-all rounds,
121    * GNUNET_YES if we initiate reconciliation.
122    */
123   int is_outgoing;
124
125   /**
126    * Did we receive/send a consensus hello?
127    */
128   int hello;
129
130   /**
131    * Handle for currently active read
132    */
133   struct GNUNET_STREAM_ReadHandle *rh;
134
135   /**
136    * Handle for currently active read
137    */
138   struct GNUNET_STREAM_WriteHandle *wh;
139
140   /**
141    * How many of the strate in the ibf were
142    * sent or received in this round?
143    */
144   int strata_counter;
145
146   int ibf_order;
147
148   struct InvertibleBloomFilter *outgoing_ibf;
149
150   int outgoing_bucket_counter;
151
152   struct InvertibleBloomFilter *incoming_ibf;
153
154   int incoming_bucket_counter;
155
156   /**
157    * NULL or incoming_ibf - outgoing_ibf.
158    * Decoded values of side '1' are to be requested from the the peer.
159    */
160   struct InvertibleBloomFilter *diff_ibf;
161
162   /**
163    * Strata estimator of the peer, NULL if our peer
164    * initiated the reconciliation.
165    */
166   struct InvertibleBloomFilter **strata;
167
168   unsigned int diff;
169
170   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
171
172   struct ConsensusSession *session;
173 };
174
175 struct QueuedMessage
176 {
177   struct GNUNET_MessageHeader *msg;
178
179   /**
180    * Queued messages are stored in a doubly linked list.
181    */
182   struct QueuedMessage *next;
183
184   /**
185    * Queued messages are stored in a doubly linked list.
186    */
187   struct QueuedMessage *prev;
188 };
189
190
191 /**
192  * A consensus session consists of one local client and the remote authorities.
193  */
194 struct ConsensusSession
195 {
196   /**
197    * Consensus sessions are kept in a DLL.
198    */
199   struct ConsensusSession *next;
200
201   /**
202    * Consensus sessions are kept in a DLL.
203    */
204   struct ConsensusSession *prev;
205
206   /**
207    * Join message. Used to initialize the session later,
208    * if the identity of the local peer is not yet known.
209    * NULL if the session has been fully initialized.
210    */
211   struct GNUNET_CONSENSUS_JoinMessage *join_msg;
212
213   /**
214   * Global consensus identification, computed
215   * from the local id and participating authorities.
216   */
217   struct GNUNET_HashCode global_id;
218
219   /**
220    * Local client in this consensus session.
221    * There is only one client per consensus session.
222    */
223   struct GNUNET_SERVER_Client *client;
224
225   /**
226    * Values in the consensus set of this session,
227    * all of them either have been sent by or approved by the client.
228    */
229   struct GNUNET_CONTAINER_MultiHashMap *values;
230
231   /**
232    * Elements that have not been approved (or rejected) by the client yet.
233    */
234   struct PendingElement *approval_pending_head;
235
236   /**
237    * Elements that have not been approved (or rejected) by the client yet.
238    */
239   struct PendingElement *approval_pending_tail;
240
241   struct QueuedMessage *client_messages_head;
242
243   struct QueuedMessage *client_messages_tail;
244
245   /**
246    * Currently active transmit handle for sending to the client
247    */
248   struct GNUNET_SERVER_TransmitHandle *th;
249
250   /**
251    * Once conclude_requested is GNUNET_YES, the client may not
252    * insert any more values.
253    */
254   int conclude_requested;
255
256   /**
257    * Minimum number of peers to form a consensus group
258    */
259   int conclude_group_min;
260
261   /**
262    * Current round of the conclusion
263    */
264   int current_round;
265
266   /**
267    * Soft deadline for conclude.
268    * Speed up the speed of the consensus at the cost of consensus quality, as
269    * the time approached or crosses the deadline.
270    */
271   struct GNUNET_TIME_Absolute conclude_deadline;
272
273   /**
274    * Number of other peers in the consensus
275    */
276   unsigned int num_peers;
277
278   struct ConsensusPeerInformation *info;
279
280   /**
281    * Sorted array of peer identities in this consensus session,
282    * includes the local peer.
283    */
284   struct GNUNET_PeerIdentity *peers;
285
286   /**
287    * Index of the local peer in the peers array
288    */
289   int local_peer_idx;
290
291   /**
292    * Task identifier for the round timeout task
293    */
294   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
295
296   struct InvertibleBloomFilter **strata;
297
298   struct InvertibleBloomFilter **ibfs;
299 };
300
301
302 /**
303  * Sockets from other peers who want to communicate with us.
304  * It may not be known yet which consensus session they belong to.
305  */
306 struct IncomingSocket
307 {
308   /**
309    * Incoming sockets are kept in a double linked list.
310    */
311   struct IncomingSocket *next;
312
313   /**
314    * Incoming sockets are kept in a double linked list.
315    */
316   struct IncomingSocket *prev;
317
318   /**
319    * The actual socket.
320    */
321   struct GNUNET_STREAM_Socket *socket;
322
323   /**
324    * Handle for currently active read
325    */
326   struct GNUNET_STREAM_ReadHandle *rh;
327
328   /**
329    * Peer that connected to us with the socket.
330    */
331   struct GNUNET_PeerIdentity *peer;
332
333   /**
334    * Message stream tokenizer for this socket.
335    */
336   struct GNUNET_SERVER_MessageStreamTokenizer *mst;
337
338   /**
339    * Peer-in-session this socket belongs to, once known, otherwise NULL.
340    */
341   struct ConsensusPeerInformation *cpi;
342 };
343
344 static struct IncomingSocket *incoming_sockets_head;
345 static struct IncomingSocket *incoming_sockets_tail;
346
347 /**
348  * Linked list of sesstions this peer participates in.
349  */
350 static struct ConsensusSession *sessions_head;
351
352 /**
353  * Linked list of sesstions this peer participates in.
354  */
355 static struct ConsensusSession *sessions_tail;
356
357 /**
358  * Configuration of the consensus service.
359  */
360 static const struct GNUNET_CONFIGURATION_Handle *cfg;
361
362 /**
363  * Handle to the server for this service.
364  */
365 static struct GNUNET_SERVER_Handle *srv;
366
367 /**
368  * Peer that runs this service.
369  */
370 static struct GNUNET_PeerIdentity *my_peer;
371
372 /**
373  * Handle to the core service. Only used during service startup, will be NULL after that.
374  */
375 static struct GNUNET_CORE_Handle *core;
376
377 /**
378  * Listener for sockets from peers that want to reconcile with us.
379  */
380 static struct GNUNET_STREAM_ListenSocket *listener;
381
382
383 static void
384 queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
385 {
386   struct QueuedMessage *qm;
387   qm = GNUNET_malloc (sizeof *qm);
388   qm->msg = msg;
389   GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
390 }
391
392
393 static int
394 estimate_difference (struct InvertibleBloomFilter** strata1,
395                      struct InvertibleBloomFilter** strata2)
396 {
397   int i;
398   int count;
399   count = 0;
400   for (i = STRATA_COUNT - 1; i >= 0; i--)
401   {
402     struct InvertibleBloomFilter *diff;
403     int ibf_count;
404     int more;
405     ibf_count = 0;
406     diff = ibf_dup (strata1[i]);
407     ibf_subtract (diff, strata2[i]);
408     for (;;)
409     {
410       more = ibf_decode (diff, NULL, NULL);
411       if (GNUNET_NO == more)
412       {
413         count += ibf_count;
414         break;
415       }
416       if (GNUNET_SYSERR == more)
417       {
418         return count * (1 << (i + 1));
419       }
420       ibf_count++;
421     }
422     ibf_destroy (diff);
423   }
424   return count;
425 }
426
427
428
429 /**
430  * Functions of this signature are called whenever data is available from the
431  * stream.
432  *
433  * @param cls the closure from GNUNET_STREAM_read
434  * @param status the status of the stream at the time this function is called
435  * @param data traffic from the other side
436  * @param size the number of bytes available in data read; will be 0 on timeout 
437  * @return number of bytes of processed from 'data' (any data remaining should be
438  *         given to the next time the read processor is called).
439  */
440 static size_t
441 session_stream_data_processor (void *cls,
442                        enum GNUNET_STREAM_Status status,
443                        const void *data,
444                        size_t size)
445 {
446   struct ConsensusPeerInformation *cpi;
447   int ret;
448
449   GNUNET_assert (GNUNET_STREAM_OK == status);
450
451   cpi = cls;
452
453   GNUNET_assert (NULL != cpi->mst);
454
455   ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
456   if (GNUNET_SYSERR == ret)
457   {
458     /* FIXME: handle this correctly */
459     GNUNET_assert (0);
460   }
461
462   /* read again */
463   cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
464                                 &session_stream_data_processor, cpi);
465
466   /* we always read all data */
467   return size;
468 }
469
470 /**
471  * Functions of this signature are called whenever data is available from the
472  * stream.
473  *
474  * @param cls the closure from GNUNET_STREAM_read
475  * @param status the status of the stream at the time this function is called
476  * @param data traffic from the other side
477  * @param size the number of bytes available in data read; will be 0 on timeout 
478  * @return number of bytes of processed from 'data' (any data remaining should be
479  *         given to the next time the read processor is called).
480  */
481 static size_t
482 incoming_stream_data_processor (void *cls,
483                        enum GNUNET_STREAM_Status status,
484                        const void *data,
485                        size_t size)
486 {
487   struct IncomingSocket *incoming;
488   int ret;
489
490   GNUNET_assert (GNUNET_STREAM_OK == status);
491
492   incoming = cls;
493
494   ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
495   if (GNUNET_SYSERR == ret)
496   {
497     /* FIXME: handle this correctly */
498     GNUNET_assert (0);
499   }
500
501   /* read again */
502   incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
503                                      &incoming_stream_data_processor, incoming);
504
505   /* we always read all data */
506   return size;
507 }
508
509
510 /**
511  * Iterator over hash map entries.
512  *
513  * @param cls closure
514  * @param key current key code
515  * @param value value in the hash map
516  * @return GNUNET_YES if we should continue to
517  *         iterate,
518  *         GNUNET_NO if not.
519  */
520 static int
521 ibf_values_iterator (void *cls,
522                      const struct GNUNET_HashCode *key,
523                      void *value)
524 {
525   struct ConsensusPeerInformation *cpi;
526   cpi = cls;
527   ibf_insert (cpi->session->ibfs[cpi->ibf_order], key);
528   return GNUNET_YES;
529 }
530
531
532 static void
533 create_outgoing_ibf (struct ConsensusPeerInformation *cpi)
534 {
535   if (NULL == cpi->session->ibfs[cpi->ibf_order])
536   {
537     cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
538     GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
539   }
540   cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
541 }
542
543 static int
544 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
545 {
546   int i;
547   int num_strata;
548   struct GNUNET_HashCode *hash_src;
549   uint8_t *count_src;
550
551   GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
552
553   if (NULL == cpi->strata)
554   {
555     cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
556     for (i = 0; i < STRATA_COUNT; i++)
557       cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
558   }
559
560   num_strata = ntohs (strata_msg->num_strata);
561
562   /* for correct message alignment, copy bucket types seperately */
563   hash_src = (struct GNUNET_HashCode *) &strata_msg[1];
564
565   for (i = 0; i < num_strata; i++)
566   {
567     memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
568     hash_src += STRATA_IBF_BUCKETS;
569   }
570
571   for (i = 0; i < num_strata; i++)
572   {
573     memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
574     hash_src += STRATA_IBF_BUCKETS;
575   }
576
577   count_src = (uint8_t *) hash_src;
578
579   for (i = 0; i < num_strata; i++)
580   {
581     memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS);
582     count_src += STRATA_IBF_BUCKETS;
583   }
584
585   GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE));
586
587   cpi->strata_counter += num_strata;
588
589   if (STRATA_COUNT == cpi->strata_counter)
590   {
591
592     cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
593     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
594     cpi->ibf_order = 0;
595     while ((1 << cpi->ibf_order) < cpi->diff)
596       cpi->ibf_order++;
597     if (cpi->ibf_order > MAX_IBF_ORDER)
598       cpi->ibf_order = MAX_IBF_ORDER;
599     cpi->ibf_order += 2;
600     create_outgoing_ibf (cpi);
601     write_ibf (cpi, GNUNET_STREAM_OK, 0);
602   }
603
604   return GNUNET_YES;
605 }
606
607
608 static int
609 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
610 {
611   struct GNUNET_HashCode *hash_src;
612   int num_buckets;
613   uint8_t *count_src;
614
615   num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
616
617   if (cpi->is_outgoing == GNUNET_YES)
618   {
619     /* we receive the ibf as an initiator, thus we're interested in the order */
620     cpi->ibf_order = digest->order;
621     if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh))
622     {
623       create_outgoing_ibf (cpi);
624       write_ibf (cpi, GNUNET_STREAM_OK, 0);
625     }
626     /* FIXME: ensure that orders do not differ each time */
627   }
628   else
629   {
630     /* FIXME: handle correctly */
631     GNUNET_assert (cpi->ibf_order == digest->order);
632   }
633
634   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order));
635
636   if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order))
637   {
638     /* TODO: handle this */
639     GNUNET_assert (0);
640   }
641
642   if (NULL == cpi->incoming_ibf)
643     cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
644
645   hash_src = (struct GNUNET_HashCode *) &digest[1];
646
647   memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src);
648   hash_src += num_buckets;
649
650   memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
651   hash_src += num_buckets;
652
653   count_src = (uint8_t *) hash_src;
654
655   memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src);
656
657   cpi->incoming_bucket_counter += num_buckets;
658
659   if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order))
660   {
661     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
662     if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)))
663       write_values (cpi, GNUNET_STREAM_OK, 0);
664   }
665   return GNUNET_YES;
666 }
667
668
669 static int
670 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
671 {
672   struct PendingElement *pending_element;
673   struct GNUNET_CONSENSUS_Element *element;
674   struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
675   size_t size;
676
677   size = ntohs (element_msg->size) - sizeof *element_msg;
678
679   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
680
681   element = GNUNET_malloc (size + sizeof *element);
682   element->size = size;
683   memcpy (&element[1], &element_msg[1], size);
684   element->data = &element[1];
685
686   pending_element = GNUNET_malloc (sizeof *pending_element);
687   pending_element->element = element;
688   GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element);
689
690   client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
691   client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
692   client_element_msg->header.size = htons (size + sizeof *client_element_msg);
693   memcpy (&client_element_msg[1], &element[1], size);
694
695   queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
696
697   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
698
699   send_next (cpi->session);
700   
701   return GNUNET_YES;
702 }
703
704
705 static int
706 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
707 {
708   /* FIXME: session might not exist yet */
709   struct ConsensusSession *session;
710   session = sessions_head;
711   while (NULL != session)
712   {
713     if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
714     {
715       int idx;
716       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
717       idx = get_peer_idx (inc->peer, session);
718       GNUNET_assert (-1 != idx);
719       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
720       inc->cpi = &session->info[idx];
721       GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
722       inc->cpi->mst = inc->mst;
723       inc->cpi->hello = GNUNET_YES;
724       inc->cpi->socket = inc->socket;
725       return GNUNET_YES;
726     }
727     session = session->next;
728   }
729   GNUNET_assert (0);
730   return GNUNET_NO;
731 }
732
733
734 /**
735  * Functions with this signature are called whenever a
736  * complete message is received by the tokenizer.
737  *
738  * Do not call GNUNET_SERVER_mst_destroy in callback
739  *
740  * @param cls closure
741  * @param client identification of the client
742  * @param message the actual message
743  *
744  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
745  */
746 static int
747 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
748 {
749   struct ConsensusPeerInformation *cpi;
750   cpi =  cls;
751   switch (ntohs (message->type))
752   {
753     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
754       return handle_p2p_strata (cpi, (struct StrataMessage *) message);
755     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
756       return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
757     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
758       return handle_p2p_element (cpi, message);
759     default:
760       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type));
761       /* FIXME: handle correctly */
762       GNUNET_assert (0);
763   }
764   return GNUNET_OK;
765 }
766
767
768 /**
769  * Handle tokenized messages from stream sockets.
770  * Delegate them if the socket belongs to a session,
771  * handle hello messages otherwise.
772  *
773  * Do not call GNUNET_SERVER_mst_destroy in callback
774  *
775  * @param cls closure, unused
776  * @param client incoming socket this message comes from
777  * @param message the actual message
778  *
779  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
780  */
781 static int
782 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
783 {
784   struct IncomingSocket *inc;
785   inc = (struct IncomingSocket *) client;
786   switch (ntohs( message->type))
787   {
788     case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
789       return handle_p2p_hello (inc, (struct ConsensusHello *) message);
790     default:
791       if (NULL != inc->cpi)
792         return mst_session_callback (inc->cpi, client, message);
793       /* FIXME: disconnect peer properly */
794       GNUNET_assert (0);
795   }
796   return GNUNET_OK;
797 }
798
799
800 /**
801  * Functions of this type are called upon new stream connection from other peers
802  * or upon binding error which happen when the app_port given in
803  * GNUNET_STREAM_listen() is already taken.
804  *
805  * @param cls the closure from GNUNET_STREAM_listen
806  * @param socket the socket representing the stream; NULL on binding error
807  * @param initiator the identity of the peer who wants to establish a stream
808  *            with us; NULL on binding error
809  * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
810  *             stream (the socket will be invalid after the call)
811  */
812 static int
813 listen_cb (void *cls,
814            struct GNUNET_STREAM_Socket *socket,
815            const struct GNUNET_PeerIdentity *initiator)
816 {
817   struct IncomingSocket *incoming;
818
819   GNUNET_assert (NULL != socket);
820
821   incoming = GNUNET_malloc (sizeof *incoming);
822
823   incoming->socket = socket;
824   incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
825
826   incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
827                                      &incoming_stream_data_processor, incoming);
828
829
830   incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
831
832   GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
833
834   return GNUNET_OK;
835 }
836
837
838 static void
839 destroy_session (struct ConsensusSession *session)
840 {
841   /* FIXME: more stuff to free! */
842   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
843   GNUNET_SERVER_client_drop (session->client);
844   GNUNET_free (session);
845 }
846
847
848 /**
849  * Disconnect a client, and destroy all sessions associated with it.
850  *
851  * @param client the client to disconnect
852  */
853 static void
854 disconnect_client (struct GNUNET_SERVER_Client *client)
855 {
856   struct ConsensusSession *session;
857   GNUNET_SERVER_client_disconnect (client);
858   
859   /* if the client owns a session, remove it */
860   session = sessions_head;
861   while (NULL != session)
862   {
863     if (client == session->client)
864     {
865       destroy_session (session);
866       break;
867     }
868     session = session->next;
869   }
870 }
871
872
873 /**
874  * Compute a global, (hopefully) unique consensus session id,
875  * from the local id of the consensus session, and the identities of all participants.
876  * Thus, if the local id of two consensus sessions coincide, but are not comprised of
877  * exactly the same peers, the global id will be different.
878  *
879  * @param local_id local id of the consensus session
880  * @param peers array of all peers participating in the consensus session
881  * @param num_peers number of elements in the peers array
882  * @param dst where the result is stored, may not be NULL
883  */
884 static void
885 compute_global_id (const struct GNUNET_HashCode *local_id,
886                    const struct GNUNET_PeerIdentity *peers, int num_peers, 
887                    struct GNUNET_HashCode *dst)
888 {
889   int i;
890   struct GNUNET_HashCode tmp;
891
892   *dst = *local_id;
893   for (i = 0; i < num_peers; ++i)
894   {
895     GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
896     *dst = tmp;
897     GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
898     *dst = tmp;
899   }
900 }
901
902
903 /**
904  * Function called to notify a client about the connection
905  * begin ready to queue more data.  "buf" will be
906  * NULL and "size" zero if the connection was closed for
907  * writing in the meantime.
908  *
909  * @param cls consensus session
910  * @param size number of bytes available in buf
911  * @param buf where the callee should write the message
912  * @return number of bytes written to buf
913  */
914 static size_t
915 transmit_queued (void *cls, size_t size,
916                  void *buf)
917 {
918   struct ConsensusSession *session;
919   struct QueuedMessage *qmsg;
920   size_t msg_size;
921
922   session = cls;
923   session->th = NULL;
924
925
926   qmsg = session->client_messages_head;
927   GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
928   GNUNET_assert (qmsg);
929
930   if (NULL == buf)
931   {
932     destroy_session (session);
933     return 0;
934   }
935
936   msg_size = ntohs (qmsg->msg->size);
937
938   GNUNET_assert (size >= msg_size);
939
940   memcpy (buf, qmsg->msg, msg_size);
941   GNUNET_free (qmsg->msg);
942   GNUNET_free (qmsg);
943
944   send_next (session);
945
946   return msg_size;
947 }
948
949
950 /**
951  * Schedule sending the next message (if there is any) to a client.
952  *
953  * @param cli the client to send the next message to
954  */
955 static void
956 send_next (struct ConsensusSession *session)
957 {
958
959   GNUNET_assert (NULL != session);
960
961   if (NULL != session->th)
962     return;
963
964   if (NULL != session->client_messages_head)
965   {
966     int msize;
967     msize = ntohs (session->client_messages_head->msg->size);
968     session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
969                                                        GNUNET_TIME_UNIT_FOREVER_REL,
970                                                        &transmit_queued, session);
971   }
972 }
973
974
975 /**
976  * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
977  * the correct signature to be used with e.g. qsort.
978  * We use this function instead.
979  *
980  * @param h1 some hash code
981  * @param h2 some hash code
982  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
983  */
984 static int
985 hash_cmp (const void *a, const void *b)
986 {
987   return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
988 }
989
990
991 /**
992  * Search peer in the list of peers in session.
993  *
994  * @param peer peer to find
995  * @param session session with peer
996  * @return index of peer, -1 if peer is not in session
997  */
998 static int
999 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1000 {
1001   const struct GNUNET_PeerIdentity *needle;
1002   needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1003   if (NULL == needle)
1004     return -1;
1005   return needle - session->peers;
1006 }
1007
1008
1009
1010 static void
1011 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1012 {
1013   struct ConsensusPeerInformation *cpi;
1014
1015   cpi = cls;
1016   cpi->hello = GNUNET_YES;
1017   
1018   GNUNET_assert (GNUNET_STREAM_OK == status);
1019
1020   if (cpi->session->conclude_requested)
1021   {
1022     write_strata (cpi, GNUNET_STREAM_OK, 0);  
1023   }
1024 }
1025
1026
1027 /**
1028  * Functions of this type will be called when a stream is established
1029  *
1030  * @param cls the closure from GNUNET_STREAM_open
1031  * @param socket socket to use to communicate with the other side (read/write)
1032  */
1033 static void
1034 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1035 {
1036   struct ConsensusPeerInformation *cpi;
1037   struct ConsensusHello *hello;
1038
1039
1040   cpi = cls;
1041   cpi->is_connected = GNUNET_YES;
1042
1043   hello = GNUNET_malloc (sizeof *hello);
1044   hello->header.size = htons (sizeof *hello);
1045   hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1046   memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1047
1048   cpi->wh =
1049       GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1050
1051   cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1052                                 &session_stream_data_processor, cpi);
1053
1054 }
1055
1056
1057 static void
1058 initialize_session_info (struct ConsensusSession *session)
1059 {
1060   int i;
1061   int last;
1062
1063   for (i = 0; i < session->num_peers; ++i)
1064   {
1065     /* initialize back-references, so consensus peer information can
1066      * be used as closure */
1067     session->info[i].session = session;
1068   }
1069
1070   last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1071   i = (session->local_peer_idx + 1) % session->num_peers;
1072   while (i != last)
1073   {
1074     session->info[i].is_outgoing = GNUNET_YES;
1075     session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
1076                                                   open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
1077     session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
1078     i = (i + 1) % session->num_peers;
1079
1080     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
1081   }
1082   // tie-breaker for even number of peers
1083   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1084   {
1085     session->info[last].is_outgoing = GNUNET_YES;
1086     session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
1087                                                      open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
1088     session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
1089
1090     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
1091   }
1092 }
1093
1094
1095 /**
1096  * Create the sorted list of peers for the session,
1097  * add the local peer if not in the join message.
1098  */
1099 static void
1100 initialize_session_peer_list (struct ConsensusSession *session)
1101 {
1102   int local_peer_in_list;
1103   int listed_peers;
1104   const struct GNUNET_PeerIdentity *msg_peers;
1105   unsigned int i;
1106
1107   GNUNET_assert (NULL != session->join_msg);
1108
1109   /* peers in the join message, may or may not include the local peer */
1110   listed_peers = ntohs (session->join_msg->num_peers);
1111   
1112   session->num_peers = listed_peers;
1113
1114   msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
1115
1116   local_peer_in_list = GNUNET_NO;
1117   for (i = 0; i < listed_peers; i++)
1118   {
1119     if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
1120     {
1121       local_peer_in_list = GNUNET_YES;
1122       break;
1123     }
1124   }
1125
1126   if (GNUNET_NO == local_peer_in_list)
1127     session->num_peers++;
1128
1129   session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
1130
1131   if (GNUNET_NO == local_peer_in_list)
1132     session->peers[session->num_peers - 1] = *my_peer;
1133
1134   memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
1135   qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1136 }
1137
1138
1139 static void
1140 strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
1141 {
1142   uint32_t v;
1143   int i;
1144   v = key->bits[0];
1145   /* count trailing '1'-bits of v */
1146   for (i = 0; v & 1; v>>=1, i++);
1147   ibf_insert (strata[i], key);
1148 }
1149
1150
1151 /**
1152  * Initialize the session, continue receiving messages from the owning client
1153  *
1154  * @param session the session to initialize
1155  */
1156 static void
1157 initialize_session (struct ConsensusSession *session)
1158 {
1159   const struct ConsensusSession *other_session;
1160   int i;
1161
1162   GNUNET_assert (NULL != session->join_msg);
1163
1164   initialize_session_peer_list (session);
1165
1166   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1167
1168   compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
1169
1170   /* Check if some local client already owns the session. */
1171   other_session = sessions_head;
1172   while (NULL != other_session)
1173   {
1174     if ((other_session != session) && 
1175         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1176     {
1177       /* session already owned by another client */
1178       GNUNET_break (0);
1179       disconnect_client (session->client);
1180       return;
1181     }
1182     other_session = other_session->next;
1183   }
1184
1185   session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
1186
1187   session->local_peer_idx = get_peer_idx (my_peer, session);
1188   GNUNET_assert (-1 != session->local_peer_idx);
1189
1190   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
1191
1192   session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1193   for (i = 0; i < STRATA_COUNT; i++)
1194     session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1195
1196   session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *));
1197
1198   session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1199   initialize_session_info (session);
1200
1201   GNUNET_free (session->join_msg);
1202   session->join_msg = NULL;
1203
1204   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1205   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1206 }
1207
1208
1209 /**
1210  * Called when a client wants to join a consensus session.
1211  *
1212  * @param cls unused
1213  * @param client client that sent the message
1214  * @param m message sent by the client
1215  */
1216 static void
1217 client_join (void *cls,
1218              struct GNUNET_SERVER_Client *client,
1219              const struct GNUNET_MessageHeader *m)
1220 {
1221   struct ConsensusSession *session;
1222
1223   // make sure the client has not already joined a session
1224   session = sessions_head;
1225   while (NULL != session)
1226   {
1227     if (session->client == client)
1228     {
1229       GNUNET_break (0);
1230       disconnect_client (client);
1231       return;
1232     }
1233     session = session->next;
1234   }
1235
1236   session = GNUNET_malloc (sizeof (struct ConsensusSession));
1237   session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
1238   session->client = client;
1239   GNUNET_SERVER_client_keep (client);
1240
1241   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1242
1243   // Initialize session later if local peer identity is not known yet.
1244   if (NULL == my_peer)
1245   {
1246     GNUNET_SERVER_disable_receive_done_warning (client);
1247     return;
1248   }
1249
1250   initialize_session (session);
1251 }
1252
1253
1254 /**
1255  * Called when a client performs an insert operation.
1256  *
1257  * @param cls (unused)
1258  * @param client client handle
1259  * @param message message sent by the client
1260  */
1261 void
1262 client_insert (void *cls,
1263              struct GNUNET_SERVER_Client *client,
1264              const struct GNUNET_MessageHeader *m)
1265 {
1266   struct ConsensusSession *session;
1267   struct GNUNET_CONSENSUS_ElementMessage *msg;
1268   struct GNUNET_CONSENSUS_Element *element;
1269   struct GNUNET_HashCode key;
1270   int element_size;
1271
1272   session = sessions_head;
1273   while (NULL != session)
1274   {
1275     if (session->client == client)
1276       break;
1277   }
1278
1279   if (NULL == session)
1280   {
1281     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
1282     GNUNET_SERVER_client_disconnect (client);
1283     return;
1284   }
1285
1286   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1287   element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1288
1289   element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
1290
1291   element->type = msg->element_type;
1292   element->size = element_size;
1293   memcpy (&element[1], &msg[1], element_size);
1294   element->data = &element[1];
1295
1296   GNUNET_assert (NULL != element->data);
1297
1298   GNUNET_CRYPTO_hash (element, element_size, &key);
1299
1300   GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1301                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1302
1303   strata_insert (session->strata, &key);
1304
1305   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1306
1307   send_next (session);
1308 }
1309
1310
1311
1312 /**
1313  * Functions of this signature are called whenever writing operations
1314  * on a stream are executed
1315  *
1316  * @param cls the closure from GNUNET_STREAM_write
1317  * @param status the status of the stream at the time this function is called;
1318  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1319  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1320  *          (this doesn't mean that the data is never sent, the receiver may
1321  *          have read the data but its ACKs may have been lost);
1322  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1323  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1324  *          be processed.
1325  * @param size the number of bytes written
1326  */
1327 static void 
1328 write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1329 {
1330   struct ConsensusPeerInformation *cpi;
1331   struct StrataMessage *strata_msg;
1332   size_t msize;
1333   int i;
1334   struct GNUNET_HashCode *hash_dst;
1335   uint8_t *count_dst;
1336   int num_strata;
1337
1338   cpi = cls;
1339   cpi->wh = NULL;
1340
1341   GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1342
1343   /* FIXME: handle this */
1344   GNUNET_assert (GNUNET_STREAM_OK == status);
1345
1346   if (STRATA_COUNT == cpi->strata_counter)
1347   {
1348     /* strata have been written, wait for other side's IBF */
1349     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
1350     return;
1351   }
1352
1353   if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
1354     num_strata = (STRATA_COUNT - cpi->strata_counter);
1355   else
1356     num_strata = STRATA_PER_MESSAGE;
1357
1358
1359   msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1360
1361   strata_msg = GNUNET_malloc (msize);
1362   strata_msg->header.size = htons (msize);
1363   strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1364   strata_msg->num_strata = htons (num_strata);
1365
1366   /* for correct message alignment, copy bucket types seperately */
1367   hash_dst = (struct GNUNET_HashCode *) &strata_msg[1];
1368
1369   for (i = 0; i < num_strata; i++)
1370   {
1371     memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1372     hash_dst += STRATA_IBF_BUCKETS;
1373   }
1374
1375   for (i = 0; i < num_strata; i++)
1376   {
1377     memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1378     hash_dst += STRATA_IBF_BUCKETS;
1379   }
1380
1381   count_dst = (uint8_t *) hash_dst;
1382
1383   for (i = 0; i < num_strata; i++)
1384   {
1385     memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS);
1386     count_dst += STRATA_IBF_BUCKETS;
1387   }
1388
1389   cpi->strata_counter += num_strata;
1390
1391   cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1392                                  write_strata, cpi);
1393
1394   GNUNET_assert (NULL != cpi->wh);
1395 }
1396
1397
1398 /**
1399  * Functions of this signature are called whenever writing operations
1400  * on a stream are executed
1401  *
1402  * @param cls the closure from GNUNET_STREAM_write
1403  * @param status the status of the stream at the time this function is called;
1404  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1405  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1406  *          (this doesn't mean that the data is never sent, the receiver may
1407  *          have read the data but its ACKs may have been lost);
1408  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1409  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1410  *          be processed.
1411  * @param size the number of bytes written
1412  */
1413 static void 
1414 write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1415 {
1416   struct ConsensusPeerInformation *cpi;
1417   struct DifferenceDigest *digest;
1418   int msize;
1419   struct GNUNET_HashCode *hash_dst;
1420   uint8_t *count_dst;
1421   int num_buckets;
1422
1423   cpi = cls;
1424   cpi->wh = NULL;
1425
1426   if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))
1427   {
1428     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1429     if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order))
1430       write_values (cpi, GNUNET_STREAM_OK, 0);
1431     return;
1432   }
1433
1434   /* remaining buckets */
1435   num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter;
1436
1437   /* limit to maximum */
1438   if (num_buckets > BUCKETS_PER_MESSAGE)
1439     num_buckets = BUCKETS_PER_MESSAGE;
1440
1441   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order));
1442
1443   msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1444
1445   digest = GNUNET_malloc (msize);
1446   digest->header.size = htons (msize);
1447   digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1448   digest->order = cpi->ibf_order;
1449
1450   hash_dst = (struct GNUNET_HashCode *) &digest[1];
1451
1452   memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst);
1453   hash_dst += num_buckets;
1454
1455   memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst);
1456   hash_dst += num_buckets;
1457
1458   count_dst = (uint8_t *) hash_dst;
1459
1460   memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst);
1461
1462   cpi->outgoing_bucket_counter += num_buckets;
1463
1464   cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1465                                  write_ibf, cpi);
1466
1467   GNUNET_assert (NULL != cpi->wh);
1468 }
1469
1470
1471 /**
1472  * Functions of this signature are called whenever writing operations
1473  * on a stream are executed
1474  *
1475  * @param cls the closure from GNUNET_STREAM_write
1476  * @param status the status of the stream at the time this function is called;
1477  *          GNUNET_STREAM_OK if writing to stream was completed successfully;
1478  *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1479  *          (this doesn't mean that the data is never sent, the receiver may
1480  *          have read the data but its ACKs may have been lost);
1481  *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1482  *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1483  *          be processed.
1484  * @param size the number of bytes written
1485  */
1486 static void 
1487 write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1488 {
1489   struct ConsensusPeerInformation *cpi;
1490   struct GNUNET_HashCode key;
1491   struct GNUNET_CONSENSUS_Element *element;
1492   struct GNUNET_MessageHeader *element_msg;
1493   int side;
1494   int msize;
1495
1496   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n");
1497
1498   cpi = cls;
1499   cpi->wh = NULL;
1500
1501   if (NULL == cpi->diff_ibf)
1502   {
1503     GNUNET_assert (NULL != cpi->incoming_ibf);
1504     GNUNET_assert (NULL != cpi->outgoing_ibf);
1505     GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size);
1506     cpi->diff_ibf = ibf_dup (cpi->incoming_ibf);
1507     ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf);
1508   }
1509
1510   for (;;)
1511   {
1512     int res;
1513     res = ibf_decode (cpi->diff_ibf, &side, &key);
1514     if (GNUNET_SYSERR == res)
1515     {
1516       /* TODO: handle this correctly, request new ibf */
1517       GNUNET_break (0);
1518       return;
1519     }
1520     if (GNUNET_NO == res)
1521     {
1522       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
1523       return;
1524     }
1525     if (-1 == side)
1526       break;
1527   }
1528
1529   element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key);
1530
1531   if (NULL == element)
1532   {
1533     /* FIXME: handle correctly */
1534     GNUNET_break (0);
1535     return;
1536   }
1537
1538   msize = sizeof (struct GNUNET_MessageHeader) + element->size;
1539
1540   element_msg = GNUNET_malloc (msize);
1541   element_msg->size = htons (msize);
1542   element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
1543
1544
1545   GNUNET_assert (NULL != element->data);
1546
1547
1548   memcpy (&element_msg[1], element->data, element->size);
1549
1550   cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1551                                  write_values, cpi);
1552
1553   GNUNET_free (element_msg);
1554
1555
1556   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
1557
1558   GNUNET_assert (NULL != cpi->wh);
1559 }
1560
1561
1562 /**
1563  * Called when a client performs the conclude operation.
1564  *
1565  * @param cls (unused)
1566  * @param client client handle
1567  * @param message message sent by the client
1568  */
1569 void
1570 client_conclude (void *cls,
1571              struct GNUNET_SERVER_Client *client,
1572              const struct GNUNET_MessageHeader *message)
1573 {
1574   struct ConsensusSession *session;
1575   int i;
1576
1577   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
1578
1579   session = sessions_head;
1580   while ((session != NULL) && (session->client != client))
1581     session = session->next;
1582   if (NULL == session)
1583   {
1584     /* client not found */
1585     GNUNET_break (0);
1586     GNUNET_SERVER_client_disconnect (client);
1587     return;
1588   }
1589
1590   if (GNUNET_YES == session->conclude_requested)
1591   {
1592     /* client requested conclude twice */
1593     GNUNET_break (0);
1594     disconnect_client (client);
1595     return;
1596   }
1597
1598   session->conclude_requested = GNUNET_YES;
1599
1600   /* FIXME: write to already connected sockets */
1601
1602   for (i = 0; i < session->num_peers; i++)
1603   {
1604     if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1605          (GNUNET_YES == session->info[i].hello) )
1606     {
1607       /* kick off transmitting strata by calling the write continuation */
1608       write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1609     }
1610   }
1611
1612   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1613   send_next (session);
1614 }
1615
1616
1617 /**
1618  * Called when a client sends an ack
1619  *
1620  * @param cls (unused)
1621  * @param client client handle
1622  * @param message message sent by the client
1623  */
1624 void
1625 client_ack (void *cls,
1626              struct GNUNET_SERVER_Client *client,
1627              const struct GNUNET_MessageHeader *message)
1628 {
1629   struct ConsensusSession *session;
1630   struct GNUNET_CONSENSUS_AckMessage *msg;
1631   struct PendingElement *pending;
1632   struct GNUNET_CONSENSUS_Element *element;
1633   struct GNUNET_HashCode key;
1634
1635   session = sessions_head;
1636   while (NULL != session)
1637   {
1638     if (session->client == client)
1639       break;
1640   }
1641
1642   if (NULL == session)
1643   {
1644     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
1645     GNUNET_SERVER_client_disconnect (client);
1646     return;
1647   }
1648
1649   pending = session->approval_pending_head;
1650
1651   GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending);
1652
1653   msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
1654
1655   if (msg->keep)
1656   {
1657
1658     element = pending->element;
1659
1660     GNUNET_CRYPTO_hash (element, element->size, &key);
1661
1662     GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1663                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1664
1665     strata_insert (session->strata, &key);
1666   }
1667
1668   /* FIXME: also remove element from strata */
1669
1670   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1671 }
1672
1673 /**
1674  * Task that disconnects from core.
1675  *
1676  * @param cls core handle
1677  * @param tc context information (why was this task triggered now)
1678  */
1679 static void
1680 disconnect_core (void *cls,
1681                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1682 {
1683   GNUNET_CORE_disconnect (core);
1684   core = NULL;
1685   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
1686 }
1687
1688
1689 static void
1690 core_startup (void *cls,
1691               struct GNUNET_CORE_Handle *core,
1692               const struct GNUNET_PeerIdentity *peer)
1693 {
1694   struct ConsensusSession *session;
1695
1696   my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
1697   /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
1698   GNUNET_SCHEDULER_add_now (&disconnect_core, core);
1699   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
1700
1701   session = sessions_head;
1702   while (NULL != session)
1703   {
1704     if (NULL != session->join_msg)
1705       initialize_session (session);
1706     session = session->next;
1707   }
1708 }
1709
1710
1711 /**
1712  * Called to clean up, after a shutdown has been requested.
1713  *
1714  * @param cls closure
1715  * @param tc context information (why was this task triggered now)
1716  */
1717 static void
1718 shutdown_task (void *cls,
1719                const struct GNUNET_SCHEDULER_TaskContext *tc)
1720 {
1721
1722   /* FIXME: complete; write separate destructors for different data types */
1723
1724   while (NULL != incoming_sockets_head)
1725   {
1726     struct IncomingSocket *socket;
1727     socket = incoming_sockets_head;
1728     if (NULL == socket->cpi)
1729     {
1730       GNUNET_STREAM_close (socket->socket);
1731     }
1732     incoming_sockets_head = incoming_sockets_head->next;
1733     GNUNET_free (socket);
1734   }
1735
1736   while (NULL != sessions_head)
1737   {
1738     struct ConsensusSession *session;
1739     int i;
1740
1741     session = sessions_head;
1742
1743     for (i = 0; session->num_peers; i++)
1744     {
1745       struct ConsensusPeerInformation *cpi;
1746       cpi = &session->info[i];
1747       if ((NULL != cpi) && (NULL != cpi->socket))
1748       {
1749         GNUNET_STREAM_close (cpi->socket);
1750       }
1751     }
1752
1753     if (NULL != session->client)
1754       GNUNET_SERVER_client_disconnect (session->client);
1755
1756     sessions_head = sessions_head->next;
1757     GNUNET_free (session);
1758   }
1759
1760   if (NULL != core)
1761   {
1762     GNUNET_CORE_disconnect (core);
1763     core = NULL;
1764   }
1765
1766   if (NULL != listener)
1767   {
1768     GNUNET_STREAM_listen_close (listener);
1769     listener = NULL;
1770   } 
1771
1772   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
1773 }
1774
1775
1776 /**
1777  * Start processing consensus requests.
1778  *
1779  * @param cls closure
1780  * @param server the initialized server
1781  * @param c configuration to use
1782  */
1783 static void
1784 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
1785 {
1786   static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
1787     {NULL, 0, 0}
1788   };
1789   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1790     {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1791     {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1792     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1793         sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1794     {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
1795         sizeof (struct GNUNET_CONSENSUS_AckMessage)},
1796     {NULL, NULL, 0, 0}
1797   };
1798
1799   cfg = c;
1800   srv = server;
1801
1802   GNUNET_SERVER_add_handlers (server, server_handlers);
1803
1804   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1805
1806
1807   listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1808                                    listen_cb, NULL,
1809                                    GNUNET_STREAM_OPTION_END);
1810
1811
1812   /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1813   core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1814   GNUNET_assert (NULL != core);
1815
1816   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1817 }
1818
1819
1820 /**
1821  * The main function for the consensus service.
1822  *
1823  * @param argc number of arguments from the command line
1824  * @param argv command line arguments
1825  * @return 0 ok, 1 on error
1826  */
1827 int
1828 main (int argc, char *const *argv)
1829 {
1830   int ret;
1831   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1832   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
1833   return (GNUNET_OK == ret) ? 0 : 1;
1834 }
1835