Replace mesh with new version
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Tunnel to the remote peer.
107    */
108   struct GNUNET_MESH_Tunnel *tunnel;
109
110   /**
111    * Detail information about the set operation,
112    * including the set to use.
113    */
114   struct OperationSpecification *spec;
115
116   /**
117    * Message queue for the peer.
118    */
119   struct GNUNET_MQ_Handle *mq;
120
121   /**
122    * Number of ibf buckets received
123    */
124   unsigned int ibf_buckets_received;
125
126   /**
127    * Copy of the set's strata estimator at the time of
128    * creation of this operation
129    */
130   struct StrataEstimator *se;
131
132   /**
133    * The ibf we currently receive
134    */
135   struct InvertibleBloomFilter *remote_ibf;
136
137   /**
138    * IBF of the set's element.
139    */
140   struct InvertibleBloomFilter *local_ibf;
141
142   /**
143    * Maps IBF-Keys (specific to the current salt) to elements.
144    */
145   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
146
147   /**
148    * Current state of the operation.
149    */
150   enum UnionOperationPhase phase;
151
152   /**
153    * Generation in which the operation handle
154    * was created.
155    */
156   unsigned int generation_created;
157   
158   /**
159    * Evaluate operations are held in
160    * a linked list.
161    */
162   struct UnionEvaluateOperation *next;
163   
164    /**
165    * Evaluate operations are held in
166    * a linked list.
167    */
168   struct UnionEvaluateOperation *prev;
169 };
170
171
172 /**
173  * Information about the element in a set.
174  * All elements are stored in a hash-table
175  * from their hash-code to their 'struct Element',
176  * so that the remove and add operations are reasonably
177  * fast.
178  */
179 struct ElementEntry
180 {
181   /**
182    * The actual element. The data for the element
183    * should be allocated at the end of this struct.
184    */
185   struct GNUNET_SET_Element element;
186
187   /**
188    * Hash of the element.
189    * Will be used to derive the different IBF keys
190    * for different salts.
191    */
192   struct GNUNET_HashCode element_hash;
193
194   /**
195    * Generation the element was added by the client.
196    * Operations of earlier generations will not consider the element.
197    */
198   unsigned int generation_added;
199
200   /**
201    * GNUNET_YES if the element has been removed in some generation.
202    */
203   int removed;
204
205   /**
206    * Generation the element was removed by the client. 
207    * Operations of later generations will not consider the element.
208    * Only valid if is_removed is GNUNET_YES.
209    */
210   unsigned int generation_removed;
211
212   /**
213    * GNUNET_YES if the element is a remote element, and does not belong
214    * to the operation's set.
215    */
216   int remote;
217 };
218
219
220 /**
221  * Entries in the key-to-element map of the union set.
222  */
223 struct KeyEntry
224 {
225   /**
226    * IBF key for the entry, derived from the current salt.
227    */
228   struct IBF_Key ibf_key;
229
230   /**
231    * The actual element associated with the key
232    */
233   struct ElementEntry *element;
234
235   /**
236    * Element that collides with this element
237    * on the ibf key
238    */
239   struct KeyEntry *next_colliding;
240 };
241
242 /**
243  * Used as a closure for sending elements
244  * with a specific IBF key.
245  */
246 struct SendElementClosure
247 {
248   /**
249    * The IBF key whose matching elements should be
250    * sent.
251    */
252   struct IBF_Key ibf_key;
253
254   /**
255    * Operation for which the elements
256    * should be sent.
257    */
258   struct UnionEvaluateOperation *eo;
259 };
260
261
262 /**
263  * Extra state required for efficient set union.
264  */
265 struct UnionState
266 {
267   /**
268    * The strata estimator is only generated once for
269    * each set.
270    * The IBF keys are derived from the element hashes with
271    * salt=0.
272    */
273   struct StrataEstimator *se;
274
275   /**
276    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
277    */
278   struct GNUNET_CONTAINER_MultiHashMap *elements;
279
280   /**
281    * Evaluate operations are held in
282    * a linked list.
283    */
284   struct UnionEvaluateOperation *ops_head;
285
286   /**
287    * Evaluate operations are held in
288    * a linked list.
289    */
290   struct UnionEvaluateOperation *ops_tail;
291
292   /**
293    * Current generation, that is, number of
294    * previously executed operations on this set
295    */
296   unsigned int current_generation;
297 };
298
299
300
301 /**
302  * Iterator over hash map entries.
303  *
304  * @param cls closure
305  * @param key current key code
306  * @param value value in the hash map
307  * @return GNUNET_YES if we should continue to
308  *         iterate,
309  *         GNUNET_NO if not.
310  */
311 static int
312 destroy_elements_iterator (void *cls,
313                            const struct GNUNET_HashCode * key,
314                            void *value)
315 {
316   struct ElementEntry *ee = value;
317
318   GNUNET_free (ee);
319   return GNUNET_YES;
320 }
321
322
323 /**
324  * Destroy the elements belonging to a union set.
325  *
326  * @param us union state that contains the elements
327  */
328 static void
329 destroy_elements (struct UnionState *us)
330 {
331   if (NULL == us->elements)
332     return;
333   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
334   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
335   us->elements = NULL;
336 }
337
338
339
340 /**
341  * Iterator over hash map entries.
342  *
343  * @param cls closure
344  * @param key current key code
345  * @param value value in the hash map
346  * @return GNUNET_YES if we should continue to
347  *         iterate,
348  *         GNUNET_NO if not.
349  */
350 static int
351 destroy_key_to_element_iter (void *cls,
352                              uint32_t key,
353                              void *value)
354 {
355   struct KeyEntry *k = value;
356   
357   while (NULL != k)
358   {
359     struct KeyEntry *k_tmp = k;
360     k = k->next_colliding;
361     GNUNET_free (k_tmp);
362   }
363   return GNUNET_YES;
364 }
365
366
367 /**
368  * Destroy a union operation, and free all resources
369  * associated with it.
370  *
371  * @param eo the union operation to destroy
372  */
373 void
374 _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
375 {
376   struct UnionState *st = eo->spec->set->state.u;
377
378   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
379
380   if (NULL != eo->tunnel)
381   {
382     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
383     eo->tunnel = NULL;
384     GNUNET_MESH_tunnel_destroy (t);
385   }
386   
387   if (NULL != eo->remote_ibf)
388   {
389     ibf_destroy (eo->remote_ibf);
390     eo->remote_ibf = NULL;
391   }
392   if (NULL != eo->local_ibf)
393   {
394     ibf_destroy (eo->local_ibf);
395     eo->local_ibf = NULL;
396   }
397   if (NULL != eo->se)
398   {
399     strata_estimator_destroy (eo->se);
400     eo->se = NULL;
401   }
402   if (NULL != eo->key_to_element)
403   {
404     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
405     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
406     eo->key_to_element = NULL;
407   }
408
409   GNUNET_CONTAINER_DLL_remove (st->ops_head,
410                                st->ops_tail,
411                                eo);
412   GNUNET_free (eo);
413
414
415   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
416
417
418   /* FIXME: do a garbage collection of the set generations */
419 }
420
421
422 /**
423  * Inform the client that the union operation has failed,
424  * and proceed to destroy the evaluate operation.
425  *
426  * @param eo the union operation to fail
427  */
428 static void
429 fail_union_operation (struct UnionEvaluateOperation *eo)
430 {
431   struct GNUNET_MQ_Envelope *ev;
432   struct GNUNET_SET_ResultMessage *msg;
433
434   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
435   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
436   msg->request_id = htonl (eo->spec->client_request_id);
437   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
438   _GSS_union_operation_destroy (eo);
439 }
440
441
442 /**
443  * Derive the IBF key from a hash code and 
444  * a salt.
445  *
446  * @param src the hash code
447  * @param salt salt to use
448  * @return the derived IBF key
449  */
450 static struct IBF_Key
451 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
452 {
453   struct IBF_Key key;
454
455   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
456                       GCRY_MD_SHA512, GCRY_MD_SHA256,
457                       src, sizeof *src,
458                       &salt, sizeof (salt),
459                       NULL, 0);
460   return key;
461 }
462
463
464 /**
465  * Send a request for the evaluate operation to a remote peer
466  *
467  * @param eo operation with the other peer
468  */
469 static void
470 send_operation_request (struct UnionEvaluateOperation *eo)
471 {
472   struct GNUNET_MQ_Envelope *ev;
473   struct OperationRequestMessage *msg;
474
475   ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
476                                  eo->spec->context_msg);
477
478   if (NULL == ev)
479   {
480     /* the context message is too large */
481     GNUNET_break (0);
482     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
483     return;
484   }
485   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
486   msg->app_id = eo->spec->app_id;
487   GNUNET_MQ_send (eo->mq, ev);
488
489   if (NULL != eo->spec->context_msg)
490   {
491     GNUNET_free (eo->spec->context_msg);
492     eo->spec->context_msg = NULL;
493   }
494
495   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
496 }
497
498
499 /**
500  * Iterator to create the mapping between ibf keys
501  * and element entries.
502  *
503  * @param cls closure
504  * @param key current key code
505  * @param value value in the hash map
506  * @return GNUNET_YES if we should continue to
507  *         iterate,
508  *         GNUNET_NO if not.
509  */
510 static int
511 insert_element_iterator (void *cls,
512                          uint32_t key,
513                          void *value)
514 {
515   struct KeyEntry *const new_k = cls;
516   struct KeyEntry *old_k = value;
517
518   GNUNET_assert (NULL != old_k);
519   do
520   {
521     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
522     {
523       new_k->next_colliding = old_k->next_colliding;
524       old_k->next_colliding = new_k;
525       return GNUNET_NO;
526     }
527     old_k = old_k->next_colliding;
528   } while (NULL != old_k);
529   return GNUNET_YES;
530 }
531
532
533 /**
534  * Insert an element into the union operation's
535  * key-to-element mapping
536  *
537  * @param eo the union operation
538  * @param ee the element entry
539  */
540 static void
541 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
542 {
543   int ret;
544   struct IBF_Key ibf_key;
545   struct KeyEntry *k;
546
547   ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
548   k = GNUNET_new (struct KeyEntry);
549   k->element = ee;
550   k->ibf_key = ibf_key;
551   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
552                                                       (uint32_t) ibf_key.key_val,
553                                                       insert_element_iterator, k);
554
555   /* was the element inserted into a colliding bucket? */
556   if (GNUNET_SYSERR == ret)
557     return;
558
559   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
560                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
561 }
562
563
564 /**
565  * Insert a key into an ibf.
566  *
567  * @param cls the ibf
568  * @param key unused
569  * @param value the key entry to get the key from
570  */
571 static int
572 prepare_ibf_iterator (void *cls,
573                       uint32_t key,
574                       void *value)
575 {
576   struct InvertibleBloomFilter *ibf = cls;
577   struct KeyEntry *ke = value;
578
579   ibf_insert (ibf, ke->ibf_key);
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Iterator for initializing the
586  * key-to-element mapping of a union operation
587  *
588  * @param cls the union operation
589  * @param key unised
590  * @param value the element entry to insert
591  *        into the key-to-element mapping
592  */
593 static int
594 init_key_to_element_iterator (void *cls,
595                               const struct GNUNET_HashCode *key,
596                               void *value)
597 {
598   struct UnionEvaluateOperation *eo = cls;
599   struct ElementEntry *e = value;
600
601   /* make sure that the element belongs to the set at the time
602    * of creating the operation */
603   if ( (e->generation_added > eo->generation_created) ||
604        ( (GNUNET_YES == e->removed) &&
605          (e->generation_removed < eo->generation_created)))
606     return GNUNET_YES;
607
608   insert_element (eo, e);
609   return GNUNET_YES;
610 }
611
612
613 /**
614  * Create an ibf with the operation's elements
615  * of the specified size
616  *
617  * @param eo the union operation
618  * @param size size of the ibf to create
619  */
620 static void
621 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
622 {
623   if (NULL == eo->key_to_element)
624   {
625     unsigned int len;
626     len = GNUNET_CONTAINER_multihashmap_size (eo->spec->set->state.u->elements);
627     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
628     GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements,
629                                              init_key_to_element_iterator, eo);
630   }
631   if (NULL != eo->local_ibf)
632     ibf_destroy (eo->local_ibf);
633   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
634   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
635                                            prepare_ibf_iterator, eo->local_ibf);
636 }
637
638
639 /**
640  * Send an ibf of appropriate size.
641  *
642  * @param eo the union operation
643  * @param ibf_order order of the ibf to send, size=2^order
644  */
645 static void
646 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
647 {
648   unsigned int buckets_sent = 0;
649   struct InvertibleBloomFilter *ibf;
650
651   prepare_ibf (eo, 1<<ibf_order);
652
653   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
654
655   ibf = eo->local_ibf;
656
657   while (buckets_sent < (1 << ibf_order))
658   {
659     unsigned int buckets_in_message;
660     struct GNUNET_MQ_Envelope *ev;
661     struct IBFMessage *msg;
662
663     buckets_in_message = (1 << ibf_order) - buckets_sent;
664     /* limit to maximum */
665     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
666       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
667
668     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
669                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
670     msg->order = ibf_order;
671     msg->offset = htons (buckets_sent);
672     ibf_write_slice (ibf, buckets_sent,
673                      buckets_in_message, &msg[1]);
674     buckets_sent += buckets_in_message;
675     GNUNET_MQ_send (eo->mq, ev);
676   }
677
678   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
679 }
680
681
682 /**
683  * Send a strata estimator to the remote peer.
684  *
685  * @param eo the union operation with the remote peer
686  */
687 static void
688 send_strata_estimator (struct UnionEvaluateOperation *eo)
689 {
690   struct GNUNET_MQ_Envelope *ev;
691   struct GNUNET_MessageHeader *strata_msg;
692   struct UnionState *st = eo->spec->set->state.u;
693
694   ev = GNUNET_MQ_msg_header_extra (strata_msg,
695                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
696                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
697   strata_estimator_write (st->se, &strata_msg[1]);
698   GNUNET_MQ_send (eo->mq, ev);
699   eo->phase = PHASE_EXPECT_IBF;
700 }
701
702
703 /**
704  * Compute the necessary order of an ibf
705  * from the size of the symmetric set difference.
706  *
707  * @param diff the difference
708  * @return the required size of the ibf
709  */
710 static unsigned int
711 get_order_from_difference (unsigned int diff)
712 {
713   unsigned int ibf_order;
714
715   ibf_order = 2;
716   while ((1<<ibf_order) < (2 * diff))
717     ibf_order++;
718   if (ibf_order > MAX_IBF_ORDER)
719     ibf_order = MAX_IBF_ORDER;
720   return ibf_order;
721 }
722
723
724 /**
725  * Handle a strata estimator from a remote peer
726  *
727  * @param cls the union operation
728  * @param mh the message
729  */
730 static void
731 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
732 {
733   struct UnionEvaluateOperation *eo = cls;
734   struct StrataEstimator *remote_se;
735   int diff;
736
737
738   if (eo->phase != PHASE_EXPECT_SE)
739   {
740     fail_union_operation (eo);
741     GNUNET_break (0);
742     return;
743   }
744   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
745                                        SE_IBF_HASH_NUM);
746   strata_estimator_read (&mh[1], remote_se);
747   GNUNET_assert (NULL != eo->se);
748   diff = strata_estimator_difference (remote_se, eo->se);
749   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
750   strata_estimator_destroy (remote_se);
751   strata_estimator_destroy (eo->se);
752   eo->se = NULL;
753   send_ibf (eo, get_order_from_difference (diff));
754 }
755
756
757
758 /**
759  * Iterator to send elements to a remote peer
760  *
761  * @param cls closure with the element key and the union operation
762  * @param key ignored
763  * @param value the key entry
764  */
765 static int
766 send_element_iterator (void *cls,
767                        uint32_t key,
768                        void *value)
769 {
770   struct SendElementClosure *sec = cls;
771   struct IBF_Key ibf_key = sec->ibf_key;
772   struct UnionEvaluateOperation *eo = sec->eo;
773   struct KeyEntry *ke = value;
774
775   if (ke->ibf_key.key_val != ibf_key.key_val)
776     return GNUNET_YES;
777   while (NULL != ke)
778   {
779     const struct GNUNET_SET_Element *const element = &ke->element->element;
780     struct GNUNET_MQ_Envelope *ev;
781     struct GNUNET_MessageHeader *mh;
782
783     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
784     ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
785     if (NULL == ev)
786     {
787       /* element too large */
788       GNUNET_break (0);
789       continue;
790     }
791     memcpy (&mh[1], element->data, element->size);
792     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
793     GNUNET_MQ_send (eo->mq, ev);
794     ke = ke->next_colliding;
795   }
796   return GNUNET_NO;
797 }
798
799 /**
800  * Send all elements that have the specified IBF key
801  * to the remote peer of the union operation
802  *
803  * @param eo union operation
804  * @param ibf_key IBF key of interest
805  */
806 static void
807 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
808 {
809   struct SendElementClosure send_cls;
810
811   send_cls.ibf_key = ibf_key;
812   send_cls.eo = eo;
813   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
814                                                 &send_element_iterator, &send_cls);
815 }
816
817
818 /**
819  * Decode which elements are missing on each side, and
820  * send the appropriate elemens and requests
821  *
822  * @param eo union operation
823  */
824 static void
825 decode_and_send (struct UnionEvaluateOperation *eo)
826 {
827   struct IBF_Key key;
828   int side;
829   struct InvertibleBloomFilter *diff_ibf;
830
831   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
832
833   prepare_ibf (eo, eo->remote_ibf->size);
834   diff_ibf = ibf_dup (eo->local_ibf);
835   ibf_subtract (diff_ibf, eo->remote_ibf);
836
837   while (1)
838   {
839     int res;
840
841     res = ibf_decode (diff_ibf, &side, &key);
842     if (GNUNET_SYSERR == res)
843     {
844       int next_order;
845       next_order = 0;
846       while (1<<next_order < diff_ibf->size)
847         next_order++;
848       next_order++;
849       if (next_order <= MAX_IBF_ORDER)
850       {
851         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
852                     "decoding failed, sending larger ibf (size %u)\n",
853                     1<<next_order);
854         send_ibf (eo, next_order);
855       }
856       else
857       {
858         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
859                     "set union failed: reached ibf limit\n");
860       }
861       break;
862     }
863     if (GNUNET_NO == res)
864     {
865       struct GNUNET_MQ_Envelope *ev;
866
867       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
868       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
869       GNUNET_MQ_send (eo->mq, ev);
870       break;
871     }
872     if (1 == side)
873     {
874       send_elements_for_key (eo, key);
875     }
876     else
877     {
878       struct GNUNET_MQ_Envelope *ev;
879       struct GNUNET_MessageHeader *msg;
880
881       /* FIXME: before sending the request, check if we may just have the element */
882       /* FIXME: merge multiple requests */
883       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
884                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
885       *(struct IBF_Key *) &msg[1] = key;
886       GNUNET_MQ_send (eo->mq, ev);
887     }
888   }
889   ibf_destroy (diff_ibf);
890 }
891
892
893 /**
894  * Handle an IBF message from a remote peer.
895  *
896  * @param cls the union operation
897  * @param mh the header of the message
898  */
899 static void
900 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
901 {
902   struct UnionEvaluateOperation *eo = cls;
903   struct IBFMessage *msg = (struct IBFMessage *) mh;
904   unsigned int buckets_in_message;
905
906   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
907        (eo->phase == PHASE_EXPECT_IBF) )
908   {
909     eo->phase = PHASE_EXPECT_IBF_CONT;
910     GNUNET_assert (NULL == eo->remote_ibf);
911     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
912     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
913     if (0 != ntohs (msg->offset))
914     {
915       GNUNET_break (0);
916       fail_union_operation (eo);
917     }
918   }
919   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
920   {
921     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
922          (1<<msg->order != eo->remote_ibf->size) )
923     {
924       GNUNET_break (0);
925       fail_union_operation (eo);
926       return;
927     }
928   }
929
930   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
931
932   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
933   {
934     GNUNET_break (0);
935     fail_union_operation (eo);
936     return;
937   }
938   
939   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
940   eo->ibf_buckets_received += buckets_in_message;
941
942   if (eo->ibf_buckets_received == eo->remote_ibf->size)
943   {
944
945     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
946     eo->phase = PHASE_EXPECT_ELEMENTS;
947     decode_and_send (eo);
948   }
949 }
950
951
952 /**
953  * Send a result message to the client indicating
954  * that there is a new element.
955  *
956  * @param eo union operation
957  * @param element element to send
958  */
959 static void
960 send_client_element (struct UnionEvaluateOperation *eo,
961                      struct GNUNET_SET_Element *element)
962 {
963   struct GNUNET_MQ_Envelope *ev;
964   struct GNUNET_SET_ResultMessage *rm;
965
966   GNUNET_assert (0 != eo->spec->client_request_id);
967   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
968   if (NULL == ev)
969   {
970     GNUNET_MQ_discard (ev);
971     GNUNET_break (0);
972     return;
973   }
974   rm->result_status = htons (GNUNET_SET_STATUS_OK);
975   rm->request_id = htonl (eo->spec->client_request_id);
976   memcpy (&rm[1], element->data, element->size);
977   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
978 }
979
980
981 /**
982  * Send a result message to the client indicating
983  * that the operation is over.
984  * After the result done message has been sent to the client,
985  * destroy the evaluate operation.
986  *
987  * @param eo union operation
988  */
989 static void
990 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
991 {
992   struct GNUNET_MQ_Envelope *ev;
993   struct GNUNET_SET_ResultMessage *rm;
994
995   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996   rm->request_id = htonl (eo->spec->client_request_id);
997   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
998   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
999
1000 }
1001
1002
1003 /**
1004  * Handle an element message from a remote peer.
1005  *
1006  * @param cls the union operation
1007  * @param mh the message
1008  */
1009 static void
1010 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1011 {
1012   struct UnionEvaluateOperation *eo = cls;
1013   struct ElementEntry *ee;
1014   uint16_t element_size;
1015
1016   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1017
1018   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1019        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1020   {
1021     fail_union_operation (eo);
1022     GNUNET_break (0);
1023     return;
1024   }
1025   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1026   ee = GNUNET_malloc (sizeof *eo + element_size);
1027   memcpy (&ee[1], &mh[1], element_size);
1028   ee->element.data = &ee[1];
1029   ee->remote = GNUNET_YES;
1030
1031   insert_element (eo, ee);
1032   send_client_element (eo, &ee->element);
1033
1034   GNUNET_free (ee);
1035 }
1036
1037
1038 /**
1039  * Handle an element request from a remote peer.
1040  *
1041  * @param cls the union operation
1042  * @param mh the message
1043  */
1044 static void
1045 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1046 {
1047   struct UnionEvaluateOperation *eo = cls;
1048   struct IBF_Key *ibf_key;
1049   unsigned int num_keys;
1050
1051   /* look up elements and send them */
1052   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1053   {
1054     GNUNET_break (0);
1055     fail_union_operation (eo);
1056     return;
1057   }
1058
1059   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1060
1061   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1062   {
1063     GNUNET_break (0);
1064     fail_union_operation (eo);
1065     return;
1066   }
1067
1068   ibf_key = (struct IBF_Key *) &mh[1];
1069   while (0 != num_keys--)
1070   {
1071     send_elements_for_key (eo, *ibf_key);
1072     ibf_key++;
1073   }
1074 }
1075
1076
1077 /**
1078  * Callback used for notifications
1079  *
1080  * @param cls closure
1081  */
1082 static void
1083 peer_done_sent_cb (void *cls)
1084 {
1085   struct UnionEvaluateOperation *eo = cls;
1086
1087   send_client_done_and_destroy (eo);
1088 }
1089
1090
1091 /**
1092  * Handle a done message from a remote peer
1093  * 
1094  * @param cls the union operation
1095  * @param mh the message
1096  */
1097 static void
1098 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1099 {
1100   struct UnionEvaluateOperation *eo = cls;
1101
1102   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1103   {
1104     /* we got all requests, but still have to send our elements as response */
1105     struct GNUNET_MQ_Envelope *ev;
1106
1107     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1108     eo->phase = PHASE_FINISHED;
1109     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1110     GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
1111     GNUNET_MQ_send (eo->mq, ev);
1112     return;
1113   }
1114   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1115   {
1116     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1117     eo->phase = PHASE_FINISHED;
1118     send_client_done_and_destroy (eo);
1119     return;
1120   }
1121   GNUNET_break (0);
1122   fail_union_operation (eo);
1123 }
1124
1125
1126 /**
1127  * Evaluate a union operation with
1128  * a remote peer.
1129  *
1130  * @param spec specification of the operation the evaluate
1131  * @param tunnel tunnel already connected to the partner peer
1132  * @return a handle to the operation
1133  */
1134 struct UnionEvaluateOperation *
1135 _GSS_union_evaluate (struct OperationSpecification *spec,
1136                      struct GNUNET_MESH_Tunnel *tunnel)
1137 {
1138   struct UnionEvaluateOperation *eo;
1139   struct UnionState *st = spec->set->state.u;
1140
1141   eo = GNUNET_new (struct UnionEvaluateOperation);
1142   eo->se = strata_estimator_dup (spec->set->state.u->se);
1143   eo->spec = spec;
1144   eo->tunnel = tunnel;
1145
1146   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1147               "evaluating union operation, (app %s)\n", 
1148               GNUNET_h2s (&eo->spec->app_id));
1149
1150   /* we started the operation, thus we have to send the operation request */
1151   eo->phase = PHASE_EXPECT_SE;
1152
1153   GNUNET_CONTAINER_DLL_insert (st->ops_head,
1154                                st->ops_tail,
1155                                eo);
1156
1157   send_operation_request (eo);
1158
1159   return eo;
1160 }
1161
1162
1163 /**
1164  * Accept an union operation request from a remote peer
1165  *
1166  * @param spec all necessary information about the operation
1167  * @param tunnel open tunnel to the partner's peer
1168  * @return operation
1169  */
1170 struct UnionEvaluateOperation *
1171 _GSS_union_accept (struct OperationSpecification *spec,
1172                    struct GNUNET_MESH_Tunnel *tunnel)
1173 {
1174   struct UnionEvaluateOperation *eo;
1175   struct UnionState *st = spec->set->state.u;
1176
1177   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1178
1179   eo = GNUNET_new (struct UnionEvaluateOperation);
1180   eo->generation_created = st->current_generation++;
1181   eo->spec = spec;
1182   eo->tunnel = tunnel;
1183   eo->se = strata_estimator_dup (st->se);
1184   /* transfer ownership of mq and socket from incoming to eo */
1185   GNUNET_CONTAINER_DLL_insert (st->ops_head,
1186                                st->ops_tail,
1187                                eo);
1188   /* kick off the operation */
1189   send_strata_estimator (eo);
1190
1191   return eo;
1192 }
1193
1194
1195 /**
1196  * Create a new set supporting the union operation
1197  *
1198  * @return the newly created set
1199  */
1200 struct Set *
1201 _GSS_union_set_create (void)
1202 {
1203   struct Set *set;
1204
1205   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1206   
1207   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1208   set->state.u = (struct UnionState *) &set[1];
1209   set->operation = GNUNET_SET_OPERATION_UNION;
1210   /* keys of the hash map are stored in the element entrys, thus we do not
1211    * want the hash map to copy them */
1212   set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1213   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1214                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1215   return set;
1216 }
1217
1218
1219 /**
1220  * Add the element from the given element message to the set.
1221  *
1222  * @param m message with the element
1223  * @param set set to add the element to
1224  */
1225 void
1226 _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1227 {
1228   struct ElementEntry *ee;
1229   struct ElementEntry *ee_dup;
1230   uint16_t element_size;
1231
1232   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1233
1234   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1235   element_size = ntohs (m->header.size) - sizeof *m;
1236   ee = GNUNET_malloc (element_size + sizeof *ee);
1237   ee->element.size = element_size;
1238   memcpy (&ee[1], &m[1], element_size);
1239   ee->element.data = &ee[1];
1240   ee->generation_added = set->state.u->current_generation;
1241   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1242   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1243   if (NULL != ee_dup)
1244   {
1245     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1246     GNUNET_free (ee);
1247     return;
1248   }
1249   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1250                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1251   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1252 }
1253
1254
1255 /**
1256  * Destroy a set that supports the union operation
1257  *
1258  * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1259  */
1260 void
1261 _GSS_union_set_destroy (struct Set *set)
1262 {
1263   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1264   if (NULL != set->client)
1265   {
1266     GNUNET_SERVER_client_drop (set->client);
1267     set->client = NULL;
1268   }
1269   if (NULL != set->client_mq)
1270   {
1271     GNUNET_MQ_destroy (set->client_mq);
1272     set->client_mq = NULL;
1273   }
1274
1275   if (NULL != set->state.u->se)
1276   {
1277     strata_estimator_destroy (set->state.u->se);
1278     set->state.u->se = NULL;
1279   }
1280
1281   destroy_elements (set->state.u);
1282
1283   while (NULL != set->state.u->ops_head)
1284   {
1285     _GSS_union_operation_destroy (set->state.u->ops_head);
1286   }
1287 }
1288
1289 /**
1290  * Remove the element given in the element message from the set.
1291  * Only marks the element as removed, so that older set operations can still exchange it.
1292  *
1293  * @param m message with the element
1294  * @param set set to remove the element from
1295  */
1296 void
1297 _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1298 {
1299   struct GNUNET_HashCode hash;
1300   struct ElementEntry *ee;
1301
1302   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1303   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1304   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1305   if (NULL == ee)
1306   {
1307     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1308     return;
1309   }
1310   if (GNUNET_YES == ee->removed)
1311   {
1312     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1313     return;
1314   }
1315   ee->removed = GNUNET_YES;
1316   ee->generation_removed = set->state.u->current_generation;
1317 }
1318
1319
1320 /**
1321  * Dispatch messages for a union operation.
1322  *
1323  * @param cls closure
1324  * @param tunnel mesh tunnel
1325  * @param tunnel_ctx tunnel context
1326  * @param mh message to process
1327  * @return GNUNET_SYSERR if the tunnel should be disconnected,
1328  *         GNUNET_OK otherwise
1329  */
1330 int
1331 _GSS_union_handle_p2p_message (void *cls,
1332                                struct GNUNET_MESH_Tunnel *tunnel,
1333                                void **tunnel_ctx,
1334                                const struct GNUNET_MessageHeader *mh)
1335 {
1336   struct TunnelContext *tc = *tunnel_ctx;
1337   struct UnionEvaluateOperation *eo;
1338
1339   if (CONTEXT_OPERATION_UNION != tc->type)
1340   {
1341     return GNUNET_SYSERR;
1342   }
1343
1344   eo = tc->data.union_op;
1345
1346   switch (ntohs (mh->type))
1347   {
1348     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1349       handle_p2p_ibf (eo, mh);
1350       break;
1351     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1352       handle_p2p_strata_estimator (eo, mh);
1353       break;
1354     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1355       handle_p2p_elements (eo, mh);
1356       break;
1357     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1358       handle_p2p_element_requests (eo, mh);
1359       break;
1360     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1361       handle_p2p_done (eo, mh);
1362       break;
1363     default:
1364       /* something wrong with mesh's message handlers? */
1365       GNUNET_assert (0);
1366   }
1367   return GNUNET_OK;
1368 }