-cleanup and some remarks
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Stream socket connected to the other peer
128    */
129   struct GNUNET_STREAM_Socket *socket;
130
131   /**
132    * Message queue for the peer on the other
133    * end
134    */
135   struct GNUNET_MQ_MessageQueue *mq;
136
137   /**
138    * Request ID to multiplex set operations to
139    * the client inhabiting the set.
140    */
141   uint32_t request_id;
142
143   /**
144    * Number of ibf buckets received
145    */
146   unsigned int ibf_buckets_received;
147
148   /**
149    * Copy of the set's strata estimator at the time of
150    * creation of this operation
151    */
152   struct StrataEstimator *se;
153
154   /**
155    * The ibf we currently receive
156    */
157   struct InvertibleBloomFilter *remote_ibf;
158
159   /**
160    * IBF of the set's element.
161    */
162   struct InvertibleBloomFilter *local_ibf;
163
164   /**
165    * Maps IBF-Keys (specific to the current salt) to elements.
166    */
167   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
168
169   /**
170    * Current state of the operation.
171    */
172   enum UnionOperationPhase phase;
173
174   /**
175    * Salt to use for this operation.
176    */
177   uint16_t salt;
178
179   /**
180    * Generation in which the operation handle
181    * was created.
182    */
183   unsigned int generation_created;
184   
185   /**
186    * Evaluate operations are held in
187    * a linked list.
188    */
189   struct UnionEvaluateOperation *next;
190   
191    /**
192    * Evaluate operations are held in
193    * a linked list.
194    */
195   struct UnionEvaluateOperation *prev;
196 };
197
198
199 /**
200  * Information about the element in a set.
201  * All elements are stored in a hash-table
202  * from their hash-code to their 'struct Element',
203  * so that the remove and add operations are reasonably
204  * fast.
205  */
206 struct ElementEntry
207 {
208   /**
209    * The actual element. The data for the element
210    * should be allocated at the end of this struct.
211    */
212   struct GNUNET_SET_Element element;
213
214   /**
215    * Hash of the element.
216    * Will be used to derive the different IBF keys
217    * for different salts.
218    */
219   struct GNUNET_HashCode element_hash;
220
221   /**
222    * Generation the element was added by the client.
223    * Operations of earlier generations will not consider the element.
224    */
225   unsigned int generation_added;
226
227   /**
228    * GNUNET_YES if the element has been removed in some generation.
229    */
230   int removed;
231
232   /**
233    * Generation the element was removed by the client. 
234    * Operations of later generations will not consider the element.
235    * Only valid if is_removed is GNUNET_YES.
236    */
237   unsigned int generation_removed;
238
239   /**
240    * GNUNET_YES if the element is a remote element, and does not belong
241    * to the operation's set.
242    */
243   int remote;
244 };
245
246
247 /**
248  * Information about the element used for 
249  * a specific union operation.
250  */
251 struct KeyEntry
252 {
253   /**
254    * IBF key for the entry, derived from the current salt.
255    */
256   struct IBF_Key ibf_key;
257
258   /**
259    * The actual element associated with the key
260    */
261   struct ElementEntry *element;
262
263   /**
264    * Element that collides with this element
265    * on the ibf key
266    */
267   struct KeyEntry *next_colliding;
268 };
269
270 /**
271  * Used as a closure for sending elements
272  * with a specific IBF key.
273  */
274 struct SendElementClosure
275 {
276   /**
277    * The IBF key whose matching elements should be
278    * sent.
279    */
280   struct IBF_Key ibf_key;
281
282   /**
283    * Operation for which the elements
284    * should be sent.
285    */
286   struct UnionEvaluateOperation *eo;
287 };
288
289
290 /**
291  * Extra state required for efficient set union.
292  */
293 struct UnionState
294 {
295   /**
296    * The strata estimator is only generated once for
297    * each set.
298    * The IBF keys are derived from the element hashes with
299    * salt=0.
300    */
301   struct StrataEstimator *se;
302
303   /**
304    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
305    */
306   struct GNUNET_CONTAINER_MultiHashMap *elements;
307
308   /**
309    * Evaluate operations are held in
310    * a linked list.
311    */
312   struct UnionEvaluateOperation *ops_head;
313
314   /**
315    * Evaluate operations are held in
316    * a linked list.
317    */
318   struct UnionEvaluateOperation *ops_tail;
319
320   /**
321    * Current generation, that is, number of
322    * previously executed operations on this set
323    */
324   unsigned int current_generation;
325 };
326
327
328
329 /**
330  * Iterator over hash map entries.
331  *
332  * @param cls closure
333  * @param key current key code
334  * @param value value in the hash map
335  * @return GNUNET_YES if we should continue to
336  *         iterate,
337  *         GNUNET_NO if not.
338  */
339 static int
340 destroy_elements_iterator (void *cls,
341                            const struct GNUNET_HashCode * key,
342                            void *value)
343 {
344   struct ElementEntry *ee = value;
345
346   GNUNET_free (ee);
347   return GNUNET_YES;
348 }
349
350
351 /**
352  * Destroy the elements belonging to a union set.
353  *
354  * @param us union state that contains the elements
355  */
356 static void
357 destroy_elements (struct UnionState *us)
358 {
359   if (NULL == us->elements)
360     return;
361   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
362   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
363   us->elements = NULL;
364 }
365
366
367
368 /**
369  * Iterator over hash map entries.
370  *
371  * @param cls closure
372  * @param key current key code
373  * @param value value in the hash map
374  * @return GNUNET_YES if we should continue to
375  *         iterate,
376  *         GNUNET_NO if not.
377  */
378 static int
379 destroy_key_to_element_iter (void *cls,
380                              uint32_t key,
381                              void *value)
382 {
383   struct KeyEntry *k = value;
384   
385   while (NULL != k)
386   {
387     struct KeyEntry *k_tmp = k;
388     k = k->next_colliding;
389     GNUNET_free (k_tmp);
390   }
391   return GNUNET_YES;
392 }
393
394
395 /**
396  * Destroy a union operation, and free all resources
397  * associated with it.
398  *
399  * @param eo the union operation to destroy
400  */
401 static void
402 destroy_union_operation (struct UnionEvaluateOperation *eo)
403 {
404   if (NULL != eo->mq)
405   {
406     GNUNET_MQ_destroy (eo->mq);
407     eo->mq = NULL;
408   }
409   if (NULL != eo->socket)
410   {
411     GNUNET_STREAM_close (eo->socket);
412     eo->socket = NULL;
413   }
414   if (NULL != eo->remote_ibf)
415   {
416     ibf_destroy (eo->remote_ibf);
417     eo->remote_ibf = NULL;
418   }
419   if (NULL != eo->local_ibf)
420   {
421     ibf_destroy (eo->local_ibf);
422     eo->local_ibf = NULL;
423   }
424   if (NULL != eo->se)
425   {
426     strata_estimator_destroy (eo->se);
427     eo->se = NULL;
428   }
429   if (NULL != eo->key_to_element)
430   {
431     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
432     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
433     eo->key_to_element = NULL;
434   }
435
436
437   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
438                                eo->set->state.u->ops_tail,
439                                eo);
440   GNUNET_free (eo);
441   /* FIXME: free and destroy everything else */
442 }
443
444
445 /**
446  * Inform the client that the union operation has failed,
447  * and proceed to destroy the evaluate operation.
448  *
449  * @param eo the union operation to fail
450  */
451 static void
452 fail_union_operation (struct UnionEvaluateOperation *eo)
453 {
454   struct GNUNET_MQ_Message *mqm;
455   struct ResultMessage *msg;
456
457   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
458   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
459   msg->request_id = htonl (eo->request_id);
460   GNUNET_MQ_send (eo->set->client_mq, mqm);
461   destroy_union_operation (eo);
462 }
463
464
465 /**
466  * Derive the IBF key from a hash code and 
467  * a salt.
468  *
469  * @param src the hash code
470  * @param salt salt to use
471  * @return the derived IBF key
472  */
473 static struct IBF_Key
474 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
475 {
476   struct IBF_Key key;
477
478   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
479                       GCRY_MD_SHA512, GCRY_MD_SHA256,
480                       src, sizeof *src,
481                       &salt, sizeof (salt),
482                       NULL, 0);
483   return key;
484 }
485
486
487 /**
488  * Send a request for the evaluate operation to a remote peer
489  *
490  * @param eo operation with the other peer
491  */
492 static void
493 send_operation_request (struct UnionEvaluateOperation *eo)
494 {
495   struct GNUNET_MQ_Message *mqm;
496   struct OperationRequestMessage *msg;
497
498   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
499   if (NULL != eo->context_msg)
500     if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
501     {
502       /* the context message is too large */
503       GNUNET_break (0);
504       GNUNET_SERVER_client_disconnect (eo->set->client);
505       GNUNET_MQ_discard (mqm);
506       return;
507     }
508   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
509   msg->app_id = eo->app_id;
510   GNUNET_MQ_send (eo->mq, mqm);
511
512   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
513 }
514
515
516 /**
517  * Iterator to create the mapping between ibf keys
518  * and element entries.
519  *
520  * @param cls closure
521  * @param key current key code
522  * @param value value in the hash map
523  * @return GNUNET_YES if we should continue to
524  *         iterate,
525  *         GNUNET_NO if not.
526  */
527 static int
528 insert_element_iterator (void *cls,
529                          uint32_t key,
530                          void *value)
531 {
532   struct KeyEntry *const new_k = cls;
533   struct KeyEntry *old_k = value;
534
535   GNUNET_assert (NULL != old_k);
536   do
537   {
538     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
539     {
540       new_k->next_colliding = old_k;
541       old_k->next_colliding = new_k;
542       return GNUNET_NO;
543     }
544     old_k = old_k->next_colliding;
545   } while (NULL != old_k);
546   return GNUNET_YES;
547 }
548
549
550 /**
551  * Insert an element into the union operation's
552  * key-to-element mapping
553  *
554  * @param the union operation
555  * @param ee the element entry
556  */
557 static void
558 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
559 {
560   int ret;
561   struct IBF_Key ibf_key;
562   struct KeyEntry *k;
563
564   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
565   k = GNUNET_new (struct KeyEntry);
566   k->element = ee;
567   k->ibf_key = ibf_key;
568   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
569                                                       (uint32_t) ibf_key.key_val,
570                                                       insert_element_iterator, k);
571   /* was the element inserted into a colliding bucket? */
572   if (GNUNET_SYSERR == ret)
573   {
574     GNUNET_assert (NULL != k->next_colliding);
575     return;
576   }
577   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
578                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
579 }
580
581
582 /**
583  * Insert a key into an ibf.
584  *
585  * @param cls the ibf
586  * @param key unused
587  * @param value the key entry to get the key from
588  */
589 static int
590 prepare_ibf_iterator (void *cls,
591                       uint32_t key,
592                       void *value)
593 {
594   struct InvertibleBloomFilter *ibf = cls;
595   struct KeyEntry *ke = value;
596
597   ibf_insert (ibf, ke->ibf_key);
598   return GNUNET_YES;
599 }
600
601
602 /**
603  * Iterator for initializing the
604  * key-to-element mapping of a union operation
605  *
606  * @param cls the union operation
607  * @param key unised
608  * @param value the element entry to insert
609  *        into the key-to-element mapping
610  */
611 static int
612 init_key_to_element_iterator (void *cls,
613                               const struct GNUNET_HashCode *key,
614                               void *value)
615 {
616   struct UnionEvaluateOperation *eo = cls;
617   struct ElementEntry *e = value;
618
619   /* make sure that the element belongs to the set at the time
620    * of creating the operation */
621   if ( (e->generation_added > eo->generation_created) ||
622        ( (GNUNET_YES == e->removed) &&
623          (e->generation_removed < eo->generation_created)))
624     return GNUNET_YES;
625
626   insert_element (eo, e);
627   return GNUNET_YES;
628 }
629
630
631 /**
632  * Create an ibf with the operation's elements
633  * of the specified size
634  *
635  * @param eo the union operation
636  * @param size size of the ibf to create
637  */
638 static void
639 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
640 {
641   if (NULL == eo->key_to_element)
642   {
643     unsigned int len;
644     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
645     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
646     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
647                                              init_key_to_element_iterator, eo);
648   }
649   if (NULL != eo->local_ibf)
650     ibf_destroy (eo->local_ibf);
651   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
652   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
653                                            prepare_ibf_iterator, eo->local_ibf);
654 }
655
656
657 /**
658  * Send an ibf of appropriate size.
659  *
660  * @param eo the union operation
661  * @param ibf_order order of the ibf to send, size=2^order
662  */
663 static void
664 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
665 {
666   unsigned int buckets_sent = 0;
667   struct InvertibleBloomFilter *ibf;
668
669   prepare_ibf (eo, 1<<ibf_order);
670
671   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
672
673   ibf = eo->local_ibf;
674
675   while (buckets_sent < (1 << ibf_order))
676   {
677     unsigned int buckets_in_message;
678     struct GNUNET_MQ_Message *mqm;
679     struct IBFMessage *msg;
680
681     buckets_in_message = (1 << ibf_order) - buckets_sent;
682     /* limit to maximum */
683     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
684       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
685
686     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
687                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
688     msg->order = ibf_order;
689     msg->offset = htons (buckets_sent);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     GNUNET_MQ_send (eo->mq, mqm);
694   }
695
696   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
697 }
698
699
700 /**
701  * Send a strata estimator to the remote peer.
702  *
703  * @param eo the union operation with the remote peer
704  */
705 static void
706 send_strata_estimator (struct UnionEvaluateOperation *eo)
707 {
708   struct GNUNET_MQ_Message *mqm;
709   struct GNUNET_MessageHeader *strata_msg;
710
711   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
712                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
713                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
714   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
715   GNUNET_MQ_send (eo->mq, mqm);
716   eo->phase = PHASE_EXPECT_IBF;
717 }
718
719
720 /**
721  * Compute the necessary order of an ibf
722  * from the size of the symmetric set difference.
723  *
724  * @param diff the difference
725  * @return the required size of the ibf
726  */
727 static unsigned int
728 get_order_from_difference (unsigned int diff)
729 {
730   unsigned int ibf_order;
731
732   ibf_order = 2;
733   while ((1<<ibf_order) < (2 * diff))
734     ibf_order++;
735   if (ibf_order > MAX_IBF_ORDER)
736     ibf_order = MAX_IBF_ORDER;
737   return ibf_order;
738 }
739
740
741 /**
742  * Handle a strata estimator from a remote peer
743  *
744  * @param the union operation
745  * @param mh the message
746  */
747 static void
748 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
749 {
750   struct UnionEvaluateOperation *eo = cls;
751   struct StrataEstimator *remote_se;
752   int diff;
753
754
755   if (eo->phase != PHASE_EXPECT_SE)
756   {
757     fail_union_operation (eo);
758     GNUNET_break (0);
759     return;
760   }
761   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
762                                        SE_IBF_HASH_NUM);
763   strata_estimator_read (&mh[1], remote_se);
764   GNUNET_assert (NULL != eo->se);
765   diff = strata_estimator_difference (remote_se, eo->se);
766   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
767   strata_estimator_destroy (remote_se);
768   strata_estimator_destroy (eo->se);
769   eo->se = NULL;
770   send_ibf (eo, get_order_from_difference (diff));
771 }
772
773
774
775 /**
776  * Iterator to send elements to a remote peer
777  *
778  * @param cls closure with the element key and the union operation
779  * @param key ignored
780  * @param value the key entry
781  */
782 static int
783 send_element_iterator (void *cls,
784                       uint32_t key,
785                       void *value)
786 {
787   struct SendElementClosure *sec = cls;
788   struct IBF_Key ibf_key = sec->ibf_key;
789   struct UnionEvaluateOperation *eo = sec->eo;
790   struct KeyEntry *ke = value;
791
792   if (ke->ibf_key.key_val != ibf_key.key_val)
793     return GNUNET_YES;
794   while (NULL != ke)
795   {
796     const struct GNUNET_SET_Element *const element = &ke->element->element;
797     struct GNUNET_MQ_Message *mqm;
798
799     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
800     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
801     if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
802     {
803       GNUNET_break (0);
804       GNUNET_MQ_discard (mqm);
805       continue;
806     }
807     GNUNET_MQ_send (eo->mq, mqm);
808     ke = ke->next_colliding;
809   }
810   return GNUNET_NO;
811 }
812
813 /**
814  * Send all elements that have the specified IBF key
815  * to the remote peer of the union operation
816  *
817  * @param eo union operation
818  * @param ibf_key IBF key of interest
819  */
820 static void
821 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
822 {
823   struct SendElementClosure send_cls;
824
825   send_cls.ibf_key = ibf_key;
826   send_cls.eo = eo;
827   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
828                                                 &send_element_iterator, &send_cls);
829 }
830
831
832 /**
833  * Decode which elements are missing on each side, and
834  * send the appropriate elemens and requests
835  *
836  * @param eo union operation
837  */
838 static void
839 decode_and_send (struct UnionEvaluateOperation *eo)
840 {
841   struct IBF_Key key;
842   int side;
843   struct InvertibleBloomFilter *diff_ibf;
844
845   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
846
847   prepare_ibf (eo, eo->remote_ibf->size);
848   diff_ibf = ibf_dup (eo->local_ibf);
849   ibf_subtract (diff_ibf, eo->remote_ibf);
850
851   while (1)
852   {
853     int res;
854
855     res = ibf_decode (diff_ibf, &side, &key);
856     if (GNUNET_SYSERR == res)
857     {
858       int next_order;
859       next_order = 0;
860       while (1<<next_order < diff_ibf->size)
861         next_order++;
862       next_order++;
863       if (next_order <= MAX_IBF_ORDER)
864       {
865         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
866                     "decoding failed, sending larger ibf (size %u)\n",
867                     1<<next_order);
868         send_ibf (eo, next_order);
869       }
870       else
871       {
872         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
873                     "set union failed: reached ibf limit\n");
874       }
875       break;
876     }
877     if (GNUNET_NO == res)
878     {
879       struct GNUNET_MQ_Message *mqm;
880
881       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
882       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
883       GNUNET_MQ_send (eo->mq, mqm);
884       break;
885     }
886     if (1 == side)
887     {
888       send_elements_for_key (eo, key);
889     }
890     else
891     {
892       struct GNUNET_MQ_Message *mqm;
893       struct GNUNET_MessageHeader *msg;
894
895       /* FIXME: before sending the request, check if we may just have the element */
896       /* FIXME: merge multiple requests */
897       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
898                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
899       *(struct IBF_Key *) &msg[1] = key;
900       GNUNET_MQ_send (eo->mq, mqm);
901     }
902   }
903   ibf_destroy (diff_ibf);
904 }
905
906
907 /**
908  * Handle an IBF message from a remote peer.
909  *
910  * @param cls the union operation
911  * @param mh the header of the message
912  */
913 static void
914 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
915 {
916   struct UnionEvaluateOperation *eo = cls;
917   struct IBFMessage *msg = (struct IBFMessage *) mh;
918   unsigned int buckets_in_message;
919
920   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
921        (eo->phase == PHASE_EXPECT_IBF) )
922   {
923     eo->phase = PHASE_EXPECT_IBF_CONT;
924     GNUNET_assert (NULL == eo->remote_ibf);
925     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
926     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
927     if (0 != ntohs (msg->offset))
928     {
929       GNUNET_break (0);
930       fail_union_operation (eo);
931     }
932   }
933   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
934   {
935     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
936          (1<<msg->order != eo->remote_ibf->size) )
937     {
938       GNUNET_break (0);
939       fail_union_operation (eo);
940       return;
941     }
942   }
943
944   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
945
946   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
947   {
948     GNUNET_break (0);
949     fail_union_operation (eo);
950     return;
951   }
952   
953   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
954   eo->ibf_buckets_received += buckets_in_message;
955
956   if (eo->ibf_buckets_received == eo->remote_ibf->size)
957   {
958
959     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
960     eo->phase = PHASE_EXPECT_ELEMENTS;
961     decode_and_send (eo);
962   }
963 }
964
965
966 /**
967  * Send a result message to the client indicating
968  * that there is a new element.
969  *
970  * @param eo union operation
971  * @param element element to send
972  */
973 static void
974 send_client_element (struct UnionEvaluateOperation *eo,
975                      struct GNUNET_SET_Element *element)
976 {
977   struct GNUNET_MQ_Message *mqm;
978   struct ResultMessage *rm;
979
980   GNUNET_assert (0 != eo->request_id);
981   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
982   rm->result_status = htons (GNUNET_SET_STATUS_OK);
983   rm->request_id = htonl (eo->request_id);
984   if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
985   {
986     GNUNET_MQ_discard (mqm);
987     GNUNET_break (0);
988     return;
989   }
990
991   GNUNET_MQ_send (eo->set->client_mq, mqm);
992 }
993
994
995 /**
996  * Callback used for notifications
997  *
998  * @param cls closure
999  */
1000 static void
1001 client_done_sent_cb (void *cls)
1002 {
1003   //struct UnionEvaluateOperation *eo = cls;
1004   /* FIXME: destroy eo */
1005 }
1006
1007
1008 /**
1009  * Send a result message to the client indicating
1010  * that the operation is over.
1011  * After the result done message has been sent to the client,
1012  * destroy the evaluate operation.
1013  *
1014  * @param eo union operation
1015  * @param element element to send
1016  */
1017 static void
1018 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1019 {
1020   struct GNUNET_MQ_Message *mqm;
1021   struct ResultMessage *rm;
1022
1023   GNUNET_assert (0 != eo->request_id);
1024   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1025   rm->request_id = htonl (eo->request_id);
1026   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1027   GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
1028   GNUNET_MQ_send (eo->set->client_mq, mqm);
1029
1030   /* FIXME: destroy the eo */
1031 }
1032
1033
1034 /**
1035  * Handle an element message from a remote peer.
1036  *
1037  * @param cls the union operation
1038  * @param mh the message
1039  */
1040 static void
1041 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1042 {
1043   struct UnionEvaluateOperation *eo = cls;
1044   struct ElementEntry *ee;
1045   uint16_t element_size;
1046
1047   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1048
1049   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1050        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1051   {
1052     fail_union_operation (eo);
1053     GNUNET_break (0);
1054     return;
1055   }
1056   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1057   ee = GNUNET_malloc (sizeof *eo + element_size);
1058   memcpy (&ee[1], &mh[1], element_size);
1059   ee->element.data = &ee[1];
1060   ee->remote = GNUNET_YES;
1061
1062   insert_element (eo, ee);
1063   send_client_element (eo, &ee->element);
1064
1065   GNUNET_free (ee);
1066 }
1067
1068
1069 /**
1070  * Handle an element request from a remote peer.
1071  *
1072  * @param cls the union operation
1073  * @param mh the message
1074  */
1075 static void
1076 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1077 {
1078   struct UnionEvaluateOperation *eo = cls;
1079   struct IBF_Key *ibf_key;
1080   unsigned int num_keys;
1081
1082   /* look up elements and send them */
1083   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1084   {
1085     GNUNET_break (0);
1086     fail_union_operation (eo);
1087     return;
1088   }
1089
1090   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1091
1092   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1093   {
1094     GNUNET_break (0);
1095     fail_union_operation (eo);
1096     return;
1097   }
1098
1099   ibf_key = (struct IBF_Key *) &mh[1];
1100   while (0 != num_keys--)
1101   {
1102     send_elements_for_key (eo, *ibf_key);
1103     ibf_key++;
1104   }
1105 }
1106
1107
1108 /**
1109  * Callback used for notifications
1110  *
1111  * @param cls closure
1112  */
1113 static void
1114 peer_done_sent_cb (void *cls)
1115 {
1116   struct UnionEvaluateOperation *eo = cls;
1117
1118   send_client_done_and_destroy (eo);
1119 }
1120
1121
1122 /**
1123  * Handle a done message from a remote peer
1124  * 
1125  * @param cls the union operation
1126  * @param mh the message
1127  */
1128 static void
1129 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1130 {
1131   struct UnionEvaluateOperation *eo = cls;
1132
1133   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1134   {
1135     /* we got all requests, but still have to send our elements as response */
1136     struct GNUNET_MQ_Message *mqm;
1137
1138     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1139     eo->phase = PHASE_FINISHED;
1140     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1141     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1142     GNUNET_MQ_send (eo->mq, mqm);
1143     return;
1144   }
1145   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1146   {
1147     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1148     eo->phase = PHASE_FINISHED;
1149     send_client_done_and_destroy (eo);
1150     return;
1151   }
1152   GNUNET_break (0);
1153   fail_union_operation (eo);
1154 }
1155
1156
1157 /**
1158  * The handlers array, used for both evaluate and accept
1159  */
1160 static const struct GNUNET_MQ_Handler union_handlers[] = {
1161   {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
1162   {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
1163   {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
1164   {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
1165   {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
1166   GNUNET_MQ_HANDLERS_END
1167 };
1168
1169
1170 /**
1171  * Functions of this type will be called when a stream is established
1172  * 
1173  * @param cls the closure from GNUNET_STREAM_open
1174  * @param socket socket to use to communicate with the
1175  *        other side (read/write)
1176  */
1177 static void
1178 stream_open_cb (void *cls,
1179                 struct GNUNET_STREAM_Socket *socket)
1180 {
1181   struct UnionEvaluateOperation *eo = cls;
1182
1183   GNUNET_assert (NULL == eo->mq);
1184   GNUNET_assert (socket == eo->socket);
1185   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1186               "open cb successful\n");
1187   eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo);
1188   /* we started the operation, thus we have to send the operation request */
1189   send_operation_request (eo);
1190   eo->phase = PHASE_EXPECT_SE;
1191 }
1192
1193
1194 /**
1195  * Evaluate a union operation with
1196  * a remote peer.
1197  *
1198  * @param m the evaluate request message from the client
1199  * @parem set the set to evaluate the operation with
1200  */
1201 void
1202 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1203 {
1204   struct UnionEvaluateOperation *eo;
1205
1206   eo = GNUNET_new (struct UnionEvaluateOperation);
1207   eo->peer = m->peer;
1208   eo->set = set;
1209   eo->request_id = htonl (m->request_id);
1210   GNUNET_assert (0 != eo->request_id);
1211   eo->se = strata_estimator_dup (set->state.u->se);
1212   eo->salt = ntohs (m->salt);
1213   eo->app_id = m->app_id;
1214
1215   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1216               "evaluating union operation, (app %s)\n", 
1217               GNUNET_h2s (&eo->app_id));
1218
1219   eo->socket = 
1220       GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1221                           &stream_open_cb, eo,
1222                           GNUNET_STREAM_OPTION_END);
1223   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1224                                eo->set->state.u->ops_tail,
1225                                eo);
1226   /* the stream open callback will kick off the operation */
1227 }
1228
1229
1230 /**
1231  * Accept an union operation request from a remote peer
1232  *
1233  * @param m the accept message from the client
1234  * @param set the set of the client
1235  * @param incoming information about the requesting remote peer
1236  */
1237 void
1238 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1239                    struct Incoming *incoming)
1240 {
1241   struct UnionEvaluateOperation *eo;
1242
1243   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1244
1245   eo = GNUNET_new (struct UnionEvaluateOperation);
1246   eo->generation_created = set->state.u->current_generation++;
1247   eo->set = set;
1248   eo->peer = incoming->peer;
1249   eo->salt = ntohs (incoming->salt);
1250   GNUNET_assert (0 != ntohl (m->request_id));
1251   eo->request_id = ntohl (m->request_id);
1252   eo->se = strata_estimator_dup (set->state.u->se);
1253   eo->set = set; // FIXME: redundant!?
1254   eo->mq = incoming->mq;
1255   /* transfer ownership of mq and socket from incoming to eo */
1256   incoming->mq = NULL;
1257   eo->socket = incoming->socket;
1258   incoming->socket = NULL;
1259   /* the peer's socket is now ours, we'll receive all messages */
1260   GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1261
1262   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1263                                eo->set->state.u->ops_tail,
1264                                eo);
1265
1266   /* kick off the operation */
1267   send_strata_estimator (eo);
1268 }
1269
1270
1271 /**
1272  * Create a new set supporting the union operation
1273  *
1274  * @return the newly created set
1275  */
1276 struct Set *
1277 _GSS_union_set_create (void)
1278 {
1279   struct Set *set;
1280
1281   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1282   
1283   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1284   set->state.u = (struct UnionState *) &set[1];
1285   set->operation = GNUNET_SET_OPERATION_UNION;
1286   /* keys of the hash map are stored in the element entrys, thus we do not
1287    * want the hash map to copy them */
1288   set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1289   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1290                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1291   return set;
1292 }
1293
1294
1295 /**
1296  * Add the element from the given element message to the set.
1297  *
1298  * @param m message with the element
1299  * @param set set to add the element to
1300  */
1301 void
1302 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1303 {
1304   struct ElementEntry *ee;
1305   struct ElementEntry *ee_dup;
1306   uint16_t element_size;
1307
1308   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1309
1310   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1311   element_size = ntohs (m->header.size) - sizeof *m;
1312   ee = GNUNET_malloc (element_size + sizeof *ee);
1313   ee->element.size = element_size;
1314   memcpy (&ee[1], &m[1], element_size);
1315   ee->element.data = &ee[1];
1316   ee->generation_added = set->state.u->current_generation;
1317   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1318   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1319   if (NULL != ee_dup)
1320   {
1321     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1322     GNUNET_free (ee);
1323     return;
1324   }
1325   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1326                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1327   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1328 }
1329
1330
1331 /**
1332  * Destroy a set that supports the union operation
1333  *
1334  * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1335  */
1336 void
1337 _GSS_union_set_destroy (struct Set *set)
1338 {
1339   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1340   if (NULL != set->client)
1341   {
1342     GNUNET_SERVER_client_drop (set->client);
1343     set->client = NULL;
1344   }
1345   if (NULL != set->client_mq)
1346   {
1347     GNUNET_MQ_destroy (set->client_mq);
1348     set->client_mq = NULL;
1349   }
1350
1351   if (NULL != set->state.u->se)
1352   {
1353     strata_estimator_destroy (set->state.u->se);
1354     set->state.u->se = NULL;
1355   }
1356
1357   destroy_elements (set->state.u);
1358
1359   while (NULL != set->state.u->ops_head)
1360     destroy_union_operation (set->state.u->ops_head);
1361 }
1362
1363 /**
1364  * Remove the element given in the element message from the set.
1365  * Only marks the element as removed, so that older set operations can still exchange it.
1366  *
1367  * @param m message with the element
1368  * @param set set to remove the element from
1369  */
1370 void
1371 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1372 {
1373   struct GNUNET_HashCode hash;
1374   struct ElementEntry *ee;
1375
1376   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1377   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1378   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1379   if (NULL == ee)
1380   {
1381     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1382     return;
1383   }
1384   if (GNUNET_YES == ee->removed)
1385   {
1386     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1387     return;
1388   }
1389   ee->removed = GNUNET_YES;
1390   ee->generation_removed = set->state.u->current_generation;
1391 }
1392