d654f33e8f10d9245824a5107bfc36430d6eb2ec
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian M. Fuchs
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum IntersectionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct IntersectionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Tunnel context for the peer we
128    * evaluate the union operation with.
129    */
130   struct TunnelContext *tc;
131
132   /**
133    * Request ID to multiplex set operations to
134    * the client inhabiting the set.
135    */
136   uint32_t request_id;
137
138   /**
139    * Number of ibf buckets received
140    */
141   unsigned int ibf_buckets_received;
142
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The ibf we currently receive
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * IBF of the set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps IBF-Keys (specific to the current salt) to elements.
161    */
162   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
163
164   /**
165    * Current state of the operation.
166    */
167   enum IntersectionOperationPhase phase;
168
169   /**
170    * Salt to use for this operation.
171    */
172   uint16_t salt;
173
174   /**
175    * Generation in which the operation handle
176    * was created.
177    */
178   unsigned int generation_created;
179   
180   /**
181    * Evaluate operations are held in
182    * a linked list.
183    */
184   struct IntersectionEvaluateOperation *next;
185   
186    /**
187    * Evaluate operations are held in
188    * a linked list.
189    */
190   struct IntersectionEvaluateOperation *prev;
191 };
192
193
194 /**
195  * Information about the element in a set.
196  * All elements are stored in a hash-table
197  * from their hash-code to their 'struct Element',
198  * so that the remove and add operations are reasonably
199  * fast.
200  */
201 struct ElementEntry
202 {
203   /**
204    * The actual element. The data for the element
205    * should be allocated at the end of this struct.
206    */
207   struct GNUNET_SET_Element element;
208
209   /**
210    * Hash of the element.
211    * Will be used to derive the different IBF keys
212    * for different salts.
213    */
214   struct GNUNET_HashCode element_hash;
215
216   /**
217    * Generation the element was added by the client.
218    * Operations of earlier generations will not consider the element.
219    */
220   unsigned int generation_added;
221
222   /**
223    * GNUNET_YES if the element has been removed in some generation.
224    */
225   int removed;
226
227   /**
228    * Generation the element was removed by the client. 
229    * Operations of later generations will not consider the element.
230    * Only valid if is_removed is GNUNET_YES.
231    */
232   unsigned int generation_removed;
233
234   /**
235    * GNUNET_YES if the element is a remote element, and does not belong
236    * to the operation's set.
237    */
238   int remote;
239 };
240
241
242 /**
243  * Entries in the key-to-element map of the union set.
244  */
245 struct KeyEntry
246 {
247   /**
248    * IBF key for the entry, derived from the current salt.
249    */
250   struct IBF_Key ibf_key;
251
252   /**
253    * The actual element associated with the key
254    */
255   struct ElementEntry *element;
256
257   /**
258    * Element that collides with this element
259    * on the ibf key
260    */
261   struct KeyEntry *next_colliding;
262 };
263
264 /**
265  * Used as a closure for sending elements
266  * with a specific IBF key.
267  */
268 struct SendElementClosure
269 {
270   /**
271    * The IBF key whose matching elements should be
272    * sent.
273    */
274   struct IBF_Key ibf_key;
275
276   /**
277    * Operation for which the elements
278    * should be sent.
279    */
280   struct UnionEvaluateOperation *eo;
281 };
282
283
284 /**
285  * Extra state required for efficient set union.
286  */
287 struct IntersectionState
288 {
289   /**
290    * The strata estimator is only generated once for
291    * each set.
292    * The IBF keys are derived from the element hashes with
293    * salt=0.
294    */
295   struct StrataEstimator *se;
296
297   /**
298    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
299    */
300   struct GNUNET_CONTAINER_MultiHashMap *elements;
301
302   /**
303    * Evaluate operations are held in
304    * a linked list.
305    */
306   struct UnionEvaluateOperation *ops_head;
307
308   /**
309    * Evaluate operations are held in
310    * a linked list.
311    */
312   struct UnionEvaluateOperation *ops_tail;
313
314   /**
315    * Current generation, that is, number of
316    * previously executed operations on this set
317    */
318   unsigned int current_generation;
319 };
320
321
322
323 /**
324  * Iterator over hash map entries.
325  *
326  * @param cls closure
327  * @param key current key code
328  * @param value value in the hash map
329  * @return GNUNET_YES if we should continue to
330  *         iterate,
331  *         GNUNET_NO if not.
332  */
333 static int
334 destroy_elements_iterator (void *cls,
335                            const struct GNUNET_HashCode * key,
336                            void *value)
337 {
338   struct ElementEntry *ee = value;
339
340   GNUNET_free (ee);
341   return GNUNET_YES;
342 }
343
344
345 /**
346  * Destroy the elements belonging to a union set.
347  *
348  * @param us union state that contains the elements
349  */
350 static void
351 destroy_elements (struct IntersectionState *us)
352 {
353   if (NULL == us->elements)
354     return;
355   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
356   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
357   us->elements = NULL;
358 }
359
360
361
362 /**
363  * Iterator over hash map entries.
364  *
365  * @param cls closure
366  * @param key current key code
367  * @param value value in the hash map
368  * @return GNUNET_YES if we should continue to
369  *         iterate,
370  *         GNUNET_NO if not.
371  */
372 static int
373 destroy_key_to_element_iter (void *cls,
374                              uint32_t key,
375                              void *value)
376 {
377   struct KeyEntry *k = value;
378   
379   while (NULL != k)
380   {
381     struct KeyEntry *k_tmp = k;
382     k = k->next_colliding;
383     GNUNET_free (k_tmp);
384   }
385   return GNUNET_YES;
386 }
387
388
389 /**
390  * Destroy a union operation, and free all resources
391  * associated with it.
392  *
393  * @param eo the union operation to destroy
394  */
395 void
396 _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
397 {
398   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
399   
400   if (NULL != eo->tc)
401   {
402     GNUNET_MQ_destroy (eo->tc->mq);
403     GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
404     GNUNET_free (eo->tc);
405     eo->tc = NULL;
406   }
407
408   if (NULL != eo->remote_ibf)
409   {
410     ibf_destroy (eo->remote_ibf);
411     eo->remote_ibf = NULL;
412   }
413   if (NULL != eo->local_ibf)
414   {
415     ibf_destroy (eo->local_ibf);
416     eo->local_ibf = NULL;
417   }
418   if (NULL != eo->se)
419   {
420     strata_estimator_destroy (eo->se);
421     eo->se = NULL;
422   }
423   if (NULL != eo->key_to_element)
424   {
425     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
426     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
427     eo->key_to_element = NULL;
428   }
429
430   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
431                                eo->set->state.u->ops_tail,
432                                eo);
433   GNUNET_free (eo);
434
435
436   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
437
438
439   /* FIXME: do a garbage collection of the set generations */
440 }
441
442
443 /**
444  * Inform the client that the union operation has failed,
445  * and proceed to destroy the evaluate operation.
446  *
447  * @param eo the union operation to fail
448  */
449 static void
450 fail_union_operation (struct UnionEvaluateOperation *eo)
451 {
452   struct GNUNET_MQ_Envelope *mqm;
453   struct GNUNET_SET_ResultMessage *msg;
454
455   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
456   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
457   msg->request_id = htonl (eo->request_id);
458   GNUNET_MQ_send (eo->set->client_mq, mqm);
459   _GSS_union_operation_destroy (eo);
460 }
461
462
463 /**
464  * Derive the IBF key from a hash code and 
465  * a salt.
466  *
467  * @param src the hash code
468  * @param salt salt to use
469  * @return the derived IBF key
470  */
471 static struct IBF_Key
472 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
473 {
474   struct IBF_Key key;
475
476   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
477                       GCRY_MD_SHA512, GCRY_MD_SHA256,
478                       src, sizeof *src,
479                       &salt, sizeof (salt),
480                       NULL, 0);
481   return key;
482 }
483
484
485 /**
486  * Send a request for the evaluate operation to a remote peer
487  *
488  * @param eo operation with the other peer
489  */
490 static void
491 send_operation_request (struct UnionEvaluateOperation *eo)
492 {
493   struct GNUNET_MQ_Envelope *mqm;
494   struct OperationRequestMessage *msg;
495
496   mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
497
498   if (NULL == mqm)
499   {
500     /* the context message is too large */
501     GNUNET_break (0);
502     GNUNET_SERVER_client_disconnect (eo->set->client);
503     return;
504   }
505   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
506   msg->app_id = eo->app_id;
507   GNUNET_MQ_send (eo->tc->mq, mqm);
508
509   if (NULL != eo->context_msg)
510   {
511     GNUNET_free (eo->context_msg);
512     eo->context_msg = NULL;
513   }
514
515   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
516 }
517
518
519 /**
520  * Iterator to create the mapping between ibf keys
521  * and element entries.
522  *
523  * @param cls closure
524  * @param key current key code
525  * @param value value in the hash map
526  * @return GNUNET_YES if we should continue to
527  *         iterate,
528  *         GNUNET_NO if not.
529  */
530 static int
531 insert_element_iterator (void *cls,
532                          uint32_t key,
533                          void *value)
534 {
535   struct KeyEntry *const new_k = cls;
536   struct KeyEntry *old_k = value;
537
538   GNUNET_assert (NULL != old_k);
539   do
540   {
541     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
542     {
543       new_k->next_colliding = old_k->next_colliding;
544       old_k->next_colliding = new_k;
545       return GNUNET_NO;
546     }
547     old_k = old_k->next_colliding;
548   } while (NULL != old_k);
549   return GNUNET_YES;
550 }
551
552
553 /**
554  * Insert an element into the union operation's
555  * key-to-element mapping
556  *
557  * @param eo the union operation
558  * @param ee the element entry
559  */
560 static void
561 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
562 {
563   int ret;
564   struct IBF_Key ibf_key;
565   struct KeyEntry *k;
566
567   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
568   k = GNUNET_new (struct KeyEntry);
569   k->element = ee;
570   k->ibf_key = ibf_key;
571   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
572                                                       (uint32_t) ibf_key.key_val,
573                                                       insert_element_iterator, k);
574
575   /* was the element inserted into a colliding bucket? */
576   if (GNUNET_SYSERR == ret)
577     return;
578
579   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
580                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
581 }
582
583
584 /**
585  * Insert a key into an ibf.
586  *
587  * @param cls the ibf
588  * @param key unused
589  * @param value the key entry to get the key from
590  */
591 static int
592 prepare_ibf_iterator (void *cls,
593                       uint32_t key,
594                       void *value)
595 {
596   struct InvertibleBloomFilter *ibf = cls;
597   struct KeyEntry *ke = value;
598
599   ibf_insert (ibf, ke->ibf_key);
600   return GNUNET_YES;
601 }
602
603
604 /**
605  * Iterator for initializing the
606  * key-to-element mapping of a union operation
607  *
608  * @param cls the union operation
609  * @param key unised
610  * @param value the element entry to insert
611  *        into the key-to-element mapping
612  */
613 static int
614 init_key_to_element_iterator (void *cls,
615                               const struct GNUNET_HashCode *key,
616                               void *value)
617 {
618   struct UnionEvaluateOperation *eo = cls;
619   struct ElementEntry *e = value;
620
621   /* make sure that the element belongs to the set at the time
622    * of creating the operation */
623   if ( (e->generation_added > eo->generation_created) ||
624        ( (GNUNET_YES == e->removed) &&
625          (e->generation_removed < eo->generation_created)))
626     return GNUNET_YES;
627
628   insert_element (eo, e);
629   return GNUNET_YES;
630 }
631
632
633 /**
634  * Create an ibf with the operation's elements
635  * of the specified size
636  *
637  * @param eo the union operation
638  * @param size size of the ibf to create
639  */
640 static void
641 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
642 {
643   if (NULL == eo->key_to_element)
644   {
645     unsigned int len;
646     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
647     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
648     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
649                                              init_key_to_element_iterator, eo);
650   }
651   if (NULL != eo->local_ibf)
652     ibf_destroy (eo->local_ibf);
653   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
654   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
655                                            prepare_ibf_iterator, eo->local_ibf);
656 }
657
658
659 /**
660  * Send an ibf of appropriate size.
661  *
662  * @param eo the union operation
663  * @param ibf_order order of the ibf to send, size=2^order
664  */
665 static void
666 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
667 {
668   unsigned int buckets_sent = 0;
669   struct InvertibleBloomFilter *ibf;
670
671   prepare_ibf (eo, 1<<ibf_order);
672
673   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
674
675   ibf = eo->local_ibf;
676
677   while (buckets_sent < (1 << ibf_order))
678   {
679     unsigned int buckets_in_message;
680     struct GNUNET_MQ_Envelope *mqm;
681     struct IBFMessage *msg;
682
683     buckets_in_message = (1 << ibf_order) - buckets_sent;
684     /* limit to maximum */
685     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
686       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
687
688     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
689                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
690     msg->order = ibf_order;
691     msg->offset = htons (buckets_sent);
692     ibf_write_slice (ibf, buckets_sent,
693                      buckets_in_message, &msg[1]);
694     buckets_sent += buckets_in_message;
695     GNUNET_MQ_send (eo->tc->mq, mqm);
696   }
697
698   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
699 }
700
701
702 /**
703  * Send a strata estimator to the remote peer.
704  *
705  * @param eo the union operation with the remote peer
706  */
707 static void
708 send_strata_estimator (struct UnionEvaluateOperation *eo)
709 {
710   struct GNUNET_MQ_Envelope *mqm;
711   struct GNUNET_MessageHeader *strata_msg;
712
713   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
714                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
715                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
716   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
717   GNUNET_MQ_send (eo->tc->mq, mqm);
718   eo->phase = PHASE_EXPECT_IBF;
719 }
720
721
722 /**
723  * Compute the necessary order of an ibf
724  * from the size of the symmetric set difference.
725  *
726  * @param diff the difference
727  * @return the required size of the ibf
728  */
729 static unsigned int
730 get_order_from_difference (unsigned int diff)
731 {
732   unsigned int ibf_order;
733
734   ibf_order = 2;
735   while ((1<<ibf_order) < (2 * diff))
736     ibf_order++;
737   if (ibf_order > MAX_IBF_ORDER)
738     ibf_order = MAX_IBF_ORDER;
739   return ibf_order;
740 }
741
742
743 /**
744  * Handle a strata estimator from a remote peer
745  *
746  * @param cls the union operation
747  * @param mh the message
748  */
749 static void
750 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
751 {
752   struct UnionEvaluateOperation *eo = cls;
753   struct StrataEstimator *remote_se;
754   int diff;
755
756
757   if (eo->phase != PHASE_EXPECT_SE)
758   {
759     fail_union_operation (eo);
760     GNUNET_break (0);
761     return;
762   }
763   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
764                                        SE_IBF_HASH_NUM);
765   strata_estimator_read (&mh[1], remote_se);
766   GNUNET_assert (NULL != eo->se);
767   diff = strata_estimator_difference (remote_se, eo->se);
768   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
769   strata_estimator_destroy (remote_se);
770   strata_estimator_destroy (eo->se);
771   eo->se = NULL;
772   send_ibf (eo, get_order_from_difference (diff));
773 }
774
775
776
777 /**
778  * Iterator to send elements to a remote peer
779  *
780  * @param cls closure with the element key and the union operation
781  * @param key ignored
782  * @param value the key entry
783  */
784 static int
785 send_element_iterator (void *cls,
786                        uint32_t key,
787                        void *value)
788 {
789   struct SendElementClosure *sec = cls;
790   struct IBF_Key ibf_key = sec->ibf_key;
791   struct UnionEvaluateOperation *eo = sec->eo;
792   struct KeyEntry *ke = value;
793
794   if (ke->ibf_key.key_val != ibf_key.key_val)
795     return GNUNET_YES;
796   while (NULL != ke)
797   {
798     const struct GNUNET_SET_Element *const element = &ke->element->element;
799     struct GNUNET_MQ_Envelope *mqm;
800     struct GNUNET_MessageHeader *mh;
801
802     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
803     mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
804     if (NULL == mqm)
805     {
806       /* element too large */
807       GNUNET_break (0);
808       continue;
809     }
810     memcpy (&mh[1], element->data, element->size);
811     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
812     GNUNET_MQ_send (eo->tc->mq, mqm);
813     ke = ke->next_colliding;
814   }
815   return GNUNET_NO;
816 }
817
818 /**
819  * Send all elements that have the specified IBF key
820  * to the remote peer of the union operation
821  *
822  * @param eo union operation
823  * @param ibf_key IBF key of interest
824  */
825 static void
826 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
827 {
828   struct SendElementClosure send_cls;
829
830   send_cls.ibf_key = ibf_key;
831   send_cls.eo = eo;
832   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
833                                                 &send_element_iterator, &send_cls);
834 }
835
836
837 /**
838  * Decode which elements are missing on each side, and
839  * send the appropriate elemens and requests
840  *
841  * @param eo union operation
842  */
843 static void
844 decode_and_send (struct UnionEvaluateOperation *eo)
845 {
846   struct IBF_Key key;
847   int side;
848   struct InvertibleBloomFilter *diff_ibf;
849
850   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
851
852   prepare_ibf (eo, eo->remote_ibf->size);
853   diff_ibf = ibf_dup (eo->local_ibf);
854   ibf_subtract (diff_ibf, eo->remote_ibf);
855
856   while (1)
857   {
858     int res;
859
860     res = ibf_decode (diff_ibf, &side, &key);
861     if (GNUNET_SYSERR == res)
862     {
863       int next_order;
864       next_order = 0;
865       while (1<<next_order < diff_ibf->size)
866         next_order++;
867       next_order++;
868       if (next_order <= MAX_IBF_ORDER)
869       {
870         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
871                     "decoding failed, sending larger ibf (size %u)\n",
872                     1<<next_order);
873         send_ibf (eo, next_order);
874       }
875       else
876       {
877         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
878                     "set union failed: reached ibf limit\n");
879       }
880       break;
881     }
882     if (GNUNET_NO == res)
883     {
884       struct GNUNET_MQ_Envelope *mqm;
885
886       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
887       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
888       GNUNET_MQ_send (eo->tc->mq, mqm);
889       break;
890     }
891     if (1 == side)
892     {
893       send_elements_for_key (eo, key);
894     }
895     else
896     {
897       struct GNUNET_MQ_Envelope *mqm;
898       struct GNUNET_MessageHeader *msg;
899
900       /* FIXME: before sending the request, check if we may just have the element */
901       /* FIXME: merge multiple requests */
902       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
903                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
904       *(struct IBF_Key *) &msg[1] = key;
905       GNUNET_MQ_send (eo->tc->mq, mqm);
906     }
907   }
908   ibf_destroy (diff_ibf);
909 }
910
911
912 /**
913  * Handle an IBF message from a remote peer.
914  *
915  * @param cls the union operation
916  * @param mh the header of the message
917  */
918 static void
919 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
920 {
921   struct UnionEvaluateOperation *eo = cls;
922   struct IBFMessage *msg = (struct IBFMessage *) mh;
923   unsigned int buckets_in_message;
924
925   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
926        (eo->phase == PHASE_EXPECT_IBF) )
927   {
928     eo->phase = PHASE_EXPECT_IBF_CONT;
929     GNUNET_assert (NULL == eo->remote_ibf);
930     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
931     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
932     if (0 != ntohs (msg->offset))
933     {
934       GNUNET_break (0);
935       fail_union_operation (eo);
936     }
937   }
938   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
939   {
940     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
941          (1<<msg->order != eo->remote_ibf->size) )
942     {
943       GNUNET_break (0);
944       fail_union_operation (eo);
945       return;
946     }
947   }
948
949   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
950
951   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
952   {
953     GNUNET_break (0);
954     fail_union_operation (eo);
955     return;
956   }
957   
958   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
959   eo->ibf_buckets_received += buckets_in_message;
960
961   if (eo->ibf_buckets_received == eo->remote_ibf->size)
962   {
963
964     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
965     eo->phase = PHASE_EXPECT_ELEMENTS;
966     decode_and_send (eo);
967   }
968 }
969
970
971 /**
972  * Send a result message to the client indicating
973  * that there is a new element.
974  *
975  * @param eo union operation
976  * @param element element to send
977  */
978 static void
979 send_client_element (struct UnionEvaluateOperation *eo,
980                      struct GNUNET_SET_Element *element)
981 {
982   struct GNUNET_MQ_Envelope *mqm;
983   struct GNUNET_SET_ResultMessage *rm;
984
985   GNUNET_assert (0 != eo->request_id);
986   mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
987   if (NULL == mqm)
988   {
989     GNUNET_MQ_discard (mqm);
990     GNUNET_break (0);
991     return;
992   }
993   rm->result_status = htons (GNUNET_SET_STATUS_OK);
994   rm->request_id = htonl (eo->request_id);
995   memcpy (&rm[1], element->data, element->size);
996   GNUNET_MQ_send (eo->set->client_mq, mqm);
997 }
998
999
1000 /**
1001  * Send a result message to the client indicating
1002  * that the operation is over.
1003  * After the result done message has been sent to the client,
1004  * destroy the evaluate operation.
1005  *
1006  * @param eo union operation
1007  */
1008 static void
1009 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1010 {
1011   struct GNUNET_MQ_Envelope *mqm;
1012   struct GNUNET_SET_ResultMessage *rm;
1013
1014   GNUNET_assert (0 != eo->request_id);
1015   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1016   rm->request_id = htonl (eo->request_id);
1017   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1018   GNUNET_MQ_send (eo->set->client_mq, mqm);
1019
1020 }
1021
1022
1023 /**
1024  * Handle an element message from a remote peer.
1025  *
1026  * @param cls the union operation
1027  * @param mh the message
1028  */
1029 static void
1030 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1031 {
1032   struct UnionEvaluateOperation *eo = cls;
1033   struct ElementEntry *ee;
1034   uint16_t element_size;
1035
1036   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1037
1038   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1039        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1040   {
1041     fail_union_operation (eo);
1042     GNUNET_break (0);
1043     return;
1044   }
1045   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1046   ee = GNUNET_malloc (sizeof *eo + element_size);
1047   memcpy (&ee[1], &mh[1], element_size);
1048   ee->element.data = &ee[1];
1049   ee->remote = GNUNET_YES;
1050
1051   insert_element (eo, ee);
1052   send_client_element (eo, &ee->element);
1053
1054   GNUNET_free (ee);
1055 }
1056
1057
1058 /**
1059  * Handle an element request from a remote peer.
1060  *
1061  * @param cls the union operation
1062  * @param mh the message
1063  */
1064 static void
1065 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1066 {
1067   struct UnionEvaluateOperation *eo = cls;
1068   struct IBF_Key *ibf_key;
1069   unsigned int num_keys;
1070
1071   /* look up elements and send them */
1072   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1073   {
1074     GNUNET_break (0);
1075     fail_union_operation (eo);
1076     return;
1077   }
1078
1079   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1080
1081   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1082   {
1083     GNUNET_break (0);
1084     fail_union_operation (eo);
1085     return;
1086   }
1087
1088   ibf_key = (struct IBF_Key *) &mh[1];
1089   while (0 != num_keys--)
1090   {
1091     send_elements_for_key (eo, *ibf_key);
1092     ibf_key++;
1093   }
1094 }
1095
1096
1097 /**
1098  * Callback used for notifications
1099  *
1100  * @param cls closure
1101  */
1102 static void
1103 peer_done_sent_cb (void *cls)
1104 {
1105   struct UnionEvaluateOperation *eo = cls;
1106
1107   send_client_done_and_destroy (eo);
1108 }
1109
1110
1111 /**
1112  * Handle a done message from a remote peer
1113  * 
1114  * @param cls the union operation
1115  * @param mh the message
1116  */
1117 static void
1118 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1119 {
1120   struct UnionEvaluateOperation *eo = cls;
1121
1122   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1123   {
1124     /* we got all requests, but still have to send our elements as response */
1125     struct GNUNET_MQ_Envelope *mqm;
1126
1127     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1128     eo->phase = PHASE_FINISHED;
1129     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1130     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1131     GNUNET_MQ_send (eo->tc->mq, mqm);
1132     return;
1133   }
1134   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1135   {
1136     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1137     eo->phase = PHASE_FINISHED;
1138     send_client_done_and_destroy (eo);
1139     return;
1140   }
1141   GNUNET_break (0);
1142   fail_union_operation (eo);
1143 }
1144
1145
1146 /**
1147  * Evaluate a union operation with
1148  * a remote peer.
1149  *
1150  * @param m the evaluate request message from the client
1151  * @param set the set to evaluate the operation with
1152  */
1153 void
1154 _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1155 {
1156   struct IntersectionEvaluateOperation *eo;
1157   struct GNUNET_MessageHeader *context_msg;
1158
1159   eo = GNUNET_new (struct IntersectionEvaluateOperation);
1160   eo->peer = m->target_peer;
1161   eo->set = set;
1162   eo->request_id = htonl (m->request_id);
1163   GNUNET_assert (0 != eo->request_id);
1164   eo->se = strata_estimator_dup (set->state.i->se);
1165   eo->salt = ntohs (m->salt);
1166   eo->app_id = m->app_id;
1167   
1168   context_msg = GNUNET_MQ_extract_nested_mh (m);
1169   if (NULL != context_msg)
1170   {
1171     eo->context_msg = GNUNET_copy_message (context_msg);
1172   }
1173
1174   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1175               "evaluating intersection operation, (app %s)\n", 
1176               GNUNET_h2s (&eo->app_id));
1177
1178   eo->tc = GNUNET_new (struct TunnelContext);
1179   eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
1180                                               GNUNET_APPLICATION_TYPE_SET);
1181   GNUNET_assert (NULL != eo->tc->tunnel);
1182   eo->tc->peer = eo->peer;
1183   eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
1184   /* we started the operation, thus we have to send the operation request */
1185   eo->phase = PHASE_EXPECT_SE;
1186
1187   GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head,
1188                                eo->set->state.i->ops_tail,
1189                                eo);
1190
1191   send_operation_request (eo);
1192 }
1193
1194
1195 /**
1196  * Accept an union operation request from a remote peer
1197  *
1198  * @param m the accept message from the client
1199  * @param set the set of the client
1200  * @param incoming information about the requesting remote peer
1201  */
1202 void
1203 _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1204                    struct Incoming *incoming)
1205 {
1206   struct UnionEvaluateOperation *eo;
1207
1208   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1209
1210   eo = GNUNET_new (struct UnionEvaluateOperation);
1211   eo->tc = incoming->tc;
1212   eo->generation_created = set->state.u->current_generation++;
1213   eo->set = set;
1214   eo->salt = ntohs (incoming->salt);
1215   GNUNET_assert (0 != ntohl (m->request_id));
1216   eo->request_id = ntohl (m->request_id);
1217   eo->se = strata_estimator_dup (set->state.u->se);
1218   /* transfer ownership of mq and socket from incoming to eo */
1219   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1220                                eo->set->state.u->ops_tail,
1221                                eo);
1222   /* kick off the operation */
1223   send_strata_estimator (eo);
1224 }
1225
1226
1227 /**
1228  * Create a new set supporting the intersection operation
1229  *
1230  * @return the newly created set
1231  */
1232 struct Set *
1233 _GSS_intersection_set_create (void)
1234 {
1235   struct Set *set;
1236
1237   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "intersection set created\n");
1238   
1239   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct IntersectionState));
1240   set->state.i = (struct IntersectionState *) &set[1];
1241   set->operation = GNUNET_SET_OPERATION_INTERSECTION;
1242   /* keys of the hash map are stored in the element entrys, thus we do not
1243    * want the hash map to copy them */
1244   set->state.i->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1245   set->state.i->se = strata_estimator_create (SE_STRATA_COUNT,
1246                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1247   return set;
1248 }
1249
1250
1251 /**
1252  * Add the element from the given element message to the set.
1253  *
1254  * @param m message with the element
1255  * @param set set to add the element to
1256  */
1257 void
1258 _GSS_intersection_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1259 {
1260   struct ElementEntry *ee;
1261   struct ElementEntry *ee_dup;
1262   uint16_t element_size;
1263
1264   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1265
1266   GNUNET_assert (GNUNET_SET_OPERATION_INTERSECTION == set->operation);
1267   element_size = ntohs (m->header.size) - sizeof *m;
1268   ee = GNUNET_malloc (element_size + sizeof *ee);
1269   ee->element.size = element_size;
1270   memcpy (&ee[1], &m[1], element_size);
1271   ee->element.data = &ee[1];
1272   ee->generation_added = set->state.i->current_generation;
1273   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1274   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &ee->element_hash);
1275   if (NULL != ee_dup)
1276   {
1277     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1278     GNUNET_free (ee);
1279     return;
1280   }
1281   GNUNET_CONTAINER_multihashmap_put (set->state.i->elements, &ee->element_hash, ee,
1282                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1283   strata_estimator_insert (set->state.i->se, get_ibf_key (&ee->element_hash, 0));
1284 }
1285
1286
1287 /**
1288  * Destroy a set that supports the union operation
1289  *
1290  * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1291  */
1292 void
1293 _GSS_union_set_destroy (struct Set *set)
1294 {
1295   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1296   if (NULL != set->client)
1297   {
1298     GNUNET_SERVER_client_drop (set->client);
1299     set->client = NULL;
1300   }
1301   if (NULL != set->client_mq)
1302   {
1303     GNUNET_MQ_destroy (set->client_mq);
1304     set->client_mq = NULL;
1305   }
1306
1307   if (NULL != set->state.u->se)
1308   {
1309     strata_estimator_destroy (set->state.u->se);
1310     set->state.u->se = NULL;
1311   }
1312
1313   destroy_elements (set->state.u);
1314
1315   while (NULL != set->state.u->ops_head)
1316   {
1317     _GSS_union_operation_destroy (set->state.u->ops_head);
1318   }
1319 }
1320
1321 /**
1322  * Remove the element given in the element message from the set.
1323  * Only marks the element as removed, so that older set operations can still exchange it.
1324  *
1325  * @param m message with the element
1326  * @param set set to remove the element from
1327  */
1328 void
1329 _GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1330 {
1331   struct GNUNET_HashCode hash;
1332   struct ElementEntry *ee;
1333
1334   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1335   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1336   ee = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &hash);
1337   if (NULL == ee)
1338   {
1339     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1340     return;
1341   }
1342   if (GNUNET_YES == ee->removed)
1343   {
1344     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1345     return;
1346   }
1347   ee->removed = GNUNET_YES;
1348   ee->generation_removed = set->state.i->current_generation;
1349 }
1350
1351
1352 /**
1353  * Dispatch messages for a union operation.
1354  *
1355  * @param cls closure
1356  * @param tunnel mesh tunnel
1357  * @param tunnel_ctx tunnel context
1358  * @param sender ???
1359  * @param mh message to process
1360  * @return ???
1361  */
1362 int
1363 _GSS_union_handle_p2p_message (void *cls,
1364                                struct GNUNET_MESH_Tunnel *tunnel,
1365                                void **tunnel_ctx,
1366                                const struct GNUNET_PeerIdentity *sender,
1367                                const struct GNUNET_MessageHeader *mh)
1368 {
1369   struct TunnelContext *tc = *tunnel_ctx;
1370   struct UnionEvaluateOperation *eo;
1371
1372   if (CONTEXT_OPERATION_UNION != tc->type)
1373   {
1374     /* FIXME: kill the tunnel */
1375     /* never kill mesh */
1376     return GNUNET_OK;
1377   }
1378
1379   eo = tc->data;
1380
1381   switch (ntohs (mh->type))
1382   {
1383     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1384       handle_p2p_ibf (eo, mh);
1385       break;
1386     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1387       handle_p2p_strata_estimator (eo, mh);
1388       break;
1389     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1390       handle_p2p_elements (eo, mh);
1391       break;
1392     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1393       handle_p2p_element_requests (eo, mh);
1394       break;
1395     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1396       handle_p2p_done (eo, mh);
1397       break;
1398     default:
1399       /* something wrong with mesh's message handlers? */
1400       GNUNET_assert (0);
1401   }
1402   /* never kill mesh! */
1403   return GNUNET_OK;
1404 }