-ftbfs
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set_intersection.c
23  * @brief two-peer set intersection
24  * @author Christian M. Fuchs
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum IntersectionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct IntersectionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Tunnel to the other peer.
128    */
129   struct GNUNET_MESH_Tunnel *tunnel;
130
131   /**
132    * Request ID to multiplex set operations to
133    * the client inhabiting the set.
134    */
135   uint32_t request_id;
136
137   /**
138    * Number of ibf buckets received
139    */
140   unsigned int ibf_buckets_received;
141
142   /**
143    * Copy of the set's strata estimator at the time of
144    * creation of this operation
145    */
146   struct StrataEstimator *se;
147
148   /**
149    * The ibf we currently receive
150    */
151   struct InvertibleBloomFilter *remote_ibf;
152
153   /**
154    * IBF of the set's element.
155    */
156   struct InvertibleBloomFilter *local_ibf;
157
158   /**
159    * Maps IBF-Keys (specific to the current salt) to elements.
160    */
161   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
162
163   /**
164    * Current state of the operation.
165    */
166   enum IntersectionOperationPhase phase;
167
168   /**
169    * Salt to use for this operation.
170    */
171   uint16_t salt;
172
173   /**
174    * Generation in which the operation handle
175    * was created.
176    */
177   unsigned int generation_created;
178
179   /**
180    * Evaluate operations are held in
181    * a linked list.
182    */
183   struct IntersectionEvaluateOperation *next;
184
185    /**
186    * Evaluate operations are held in
187    * a linked list.
188    */
189   struct IntersectionEvaluateOperation *prev;
190 };
191
192
193 /**
194  * Information about the element in a set.
195  * All elements are stored in a hash-table
196  * from their hash-code to their 'struct Element',
197  * so that the remove and add operations are reasonably
198  * fast.
199  */
200 struct ElementEntry
201 {
202   /**
203    * The actual element. The data for the element
204    * should be allocated at the end of this struct.
205    */
206   struct GNUNET_SET_Element element;
207
208   /**
209    * Hash of the element.
210    * Will be used to derive the different IBF keys
211    * for different salts.
212    */
213   struct GNUNET_HashCode element_hash;
214
215   /**
216    * Generation the element was added by the client.
217    * Operations of earlier generations will not consider the element.
218    */
219   unsigned int generation_added;
220
221   /**
222    * GNUNET_YES if the element has been removed in some generation.
223    */
224   int removed;
225
226   /**
227    * Generation the element was removed by the client.
228    * Operations of later generations will not consider the element.
229    * Only valid if is_removed is GNUNET_YES.
230    */
231   unsigned int generation_removed;
232
233   /**
234    * GNUNET_YES if the element is a remote element, and does not belong
235    * to the operation's set.
236    */
237   int remote;
238 };
239
240
241 /**
242  * Entries in the key-to-element map of the union set.
243  */
244 struct KeyEntry
245 {
246   /**
247    * IBF key for the entry, derived from the current salt.
248    */
249   struct IBF_Key ibf_key;
250
251   /**
252    * The actual element associated with the key
253    */
254   struct ElementEntry *element;
255
256   /**
257    * Element that collides with this element
258    * on the ibf key
259    */
260   struct KeyEntry *next_colliding;
261 };
262
263 /**
264  * Used as a closure for sending elements
265  * with a specific IBF key.
266  */
267 struct SendElementClosure
268 {
269   /**
270    * The IBF key whose matching elements should be
271    * sent.
272    */
273   struct IBF_Key ibf_key;
274
275   /**
276    * Operation for which the elements
277    * should be sent.
278    */
279   struct IntersectionEvaluateOperation *eo;
280 };
281
282
283 /**
284  * Extra state required for efficient set union.
285  */
286 struct IntersectionState
287 {
288   /**
289    * The strata estimator is only generated once for
290    * each set.
291    * The IBF keys are derived from the element hashes with
292    * salt=0.
293    */
294   struct StrataEstimator *se;
295
296   /**
297    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
298    */
299   struct GNUNET_CONTAINER_MultiHashMap *elements;
300
301   /**
302    * Evaluate operations are held in
303    * a linked list.
304    */
305   struct IntersectionEvaluateOperation *ops_head;
306
307   /**
308    * Evaluate operations are held in
309    * a linked list.
310    */
311   struct IntersectionEvaluateOperation *ops_tail;
312
313   /**
314    * Current generation, that is, number of
315    * previously executed operations on this set
316    */
317   unsigned int current_generation;
318 };
319
320
321
322 /**
323  * Iterator over hash map entries.
324  *
325  * @param cls closure
326  * @param key current key code
327  * @param value value in the hash map
328  * @return GNUNET_YES if we should continue to
329  *         iterate,
330  *         GNUNET_NO if not.
331  */
332 static int
333 destroy_elements_iterator (void *cls,
334                            const struct GNUNET_HashCode * key,
335                            void *value)
336 {
337   struct ElementEntry *ee = value;
338
339   GNUNET_free (ee);
340   return GNUNET_YES;
341 }
342
343
344 /**
345  * Destroy the elements belonging to a union set.
346  *
347  * @param us union state that contains the elements
348  */
349 static void
350 destroy_elements (struct IntersectionState *us)
351 {
352   if (NULL == us->elements)
353     return;
354   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
355   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
356   us->elements = NULL;
357 }
358
359
360
361 /**
362  * Iterator over hash map entries.
363  *
364  * @param cls closure
365  * @param key current key code
366  * @param value value in the hash map
367  * @return GNUNET_YES if we should continue to
368  *         iterate,
369  *         GNUNET_NO if not.
370  */
371 static int
372 destroy_key_to_element_iter (void *cls,
373                              uint32_t key,
374                              void *value)
375 {
376   struct KeyEntry *k = value;
377
378   while (NULL != k)
379   {
380     struct KeyEntry *k_tmp = k;
381     k = k->next_colliding;
382     GNUNET_free (k_tmp);
383   }
384   return GNUNET_YES;
385 }
386
387
388 /**
389  * Destroy a union operation, and free all resources
390  * associated with it.
391  *
392  * @param eo the union operation to destroy
393  */
394 void
395 _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
396 {
397   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
398
399   if (NULL != eo->tunnel)
400   {
401     GNUNET_MESH_tunnel_destroy (eo->tunnel);
402     /* wait for the final destruction by the tunnel cleaner */
403     return;
404   }
405
406   if (NULL != eo->remote_ibf)
407   {
408     ibf_destroy (eo->remote_ibf);
409     eo->remote_ibf = NULL;
410   }
411   if (NULL != eo->local_ibf)
412   {
413     ibf_destroy (eo->local_ibf);
414     eo->local_ibf = NULL;
415   }
416   if (NULL != eo->se)
417   {
418     strata_estimator_destroy (eo->se);
419     eo->se = NULL;
420   }
421   if (NULL != eo->key_to_element)
422   {
423     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
424     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
425     eo->key_to_element = NULL;
426   }
427
428   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
429                                eo->set->state.u->ops_tail,
430                                eo);
431   GNUNET_free (eo);
432
433   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
434
435   /* FIXME: do a garbage collection of the set generations */
436 }
437
438
439 /**
440  * Inform the client that the union operation has failed,
441  * and proceed to destroy the evaluate operation.
442  *
443  * @param eo the union operation to fail
444  */
445 static void
446 fail_union_operation (struct UnionEvaluateOperation *eo)
447 {
448   struct GNUNET_MQ_Envelope *mqm;
449   struct GNUNET_SET_ResultMessage *msg;
450
451   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
452   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
453   msg->request_id = htonl (eo->request_id);
454   GNUNET_MQ_send (eo->set->client_mq, mqm);
455   _GSS_union_operation_destroy (eo);
456 }
457
458
459 /**
460  * Derive the IBF key from a hash code and
461  * a salt.
462  *
463  * @param src the hash code
464  * @param salt salt to use
465  * @return the derived IBF key
466  */
467 static struct IBF_Key
468 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
469 {
470   struct IBF_Key key;
471
472   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
473                       GCRY_MD_SHA512, GCRY_MD_SHA256,
474                       src, sizeof *src,
475                       &salt, sizeof (salt),
476                       NULL, 0);
477   return key;
478 }
479
480
481 /**
482  * Send a request for the evaluate operation to a remote peer
483  *
484  * @param eo operation with the other peer
485  */
486 static void
487 send_operation_request (struct IntersectionEvaluateOperation *eo)
488 {
489   struct GNUNET_MQ_Envelope *mqm;
490   struct OperationRequestMessage *msg;
491
492   mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
493
494   if (NULL == mqm)
495   {
496     /* the context message is too large */
497     GNUNET_break (0);
498     GNUNET_SERVER_client_disconnect (eo->set->client);
499     return;
500   }
501   msg->operation = htons (GNUNET_SET_OPERATION_INTERSECTION);
502   msg->app_id = eo->app_id;
503   GNUNET_MQ_send (eo->tc->mq, mqm);
504
505   if (NULL != eo->context_msg)
506   {
507     GNUNET_free (eo->context_msg);
508     eo->context_msg = NULL;
509   }
510
511   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
512 }
513
514
515 /**
516  * Iterator to create the mapping between ibf keys
517  * and element entries.
518  *
519  * @param cls closure
520  * @param key current key code
521  * @param value value in the hash map
522  * @return GNUNET_YES if we should continue to
523  *         iterate,
524  *         GNUNET_NO if not.
525  */
526 static int
527 insert_element_iterator (void *cls,
528                          uint32_t key,
529                          void *value)
530 {
531   struct KeyEntry *const new_k = cls;
532   struct KeyEntry *old_k = value;
533
534   GNUNET_assert (NULL != old_k);
535   do
536   {
537     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
538     {
539       new_k->next_colliding = old_k->next_colliding;
540       old_k->next_colliding = new_k;
541       return GNUNET_NO;
542     }
543     old_k = old_k->next_colliding;
544   } while (NULL != old_k);
545   return GNUNET_YES;
546 }
547
548
549 /**
550  * Insert an element into the union operation's
551  * key-to-element mapping
552  *
553  * @param eo the union operation
554  * @param ee the element entry
555  */
556 static void
557 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
558 {
559   int ret;
560   struct IBF_Key ibf_key;
561   struct KeyEntry *k;
562
563   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
564   k = GNUNET_new (struct KeyEntry);
565   k->element = ee;
566   k->ibf_key = ibf_key;
567   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
568                                                       (uint32_t) ibf_key.key_val,
569                                                       insert_element_iterator, k);
570
571   /* was the element inserted into a colliding bucket? */
572   if (GNUNET_SYSERR == ret)
573     return;
574
575   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
576                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
577 }
578
579
580 /**
581  * Insert a key into an ibf.
582  *
583  * @param cls the ibf
584  * @param key unused
585  * @param value the key entry to get the key from
586  */
587 static int
588 prepare_ibf_iterator (void *cls,
589                       uint32_t key,
590                       void *value)
591 {
592   struct InvertibleBloomFilter *ibf = cls;
593   struct KeyEntry *ke = value;
594
595   ibf_insert (ibf, ke->ibf_key);
596   return GNUNET_YES;
597 }
598
599
600 /**
601  * Iterator for initializing the
602  * key-to-element mapping of a union operation
603  *
604  * @param cls the union operation
605  * @param key unised
606  * @param value the element entry to insert
607  *        into the key-to-element mapping
608  */
609 static int
610 init_key_to_element_iterator (void *cls,
611                               const struct GNUNET_HashCode *key,
612                               void *value)
613 {
614   struct UnionEvaluateOperation *eo = cls;
615   struct ElementEntry *e = value;
616
617   /* make sure that the element belongs to the set at the time
618    * of creating the operation */
619   if ( (e->generation_added > eo->generation_created) ||
620        ( (GNUNET_YES == e->removed) &&
621          (e->generation_removed < eo->generation_created)))
622     return GNUNET_YES;
623
624   insert_element (eo, e);
625   return GNUNET_YES;
626 }
627
628
629 /**
630  * Create an ibf with the operation's elements
631  * of the specified size
632  *
633  * @param eo the union operation
634  * @param size size of the ibf to create
635  */
636 static void
637 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
638 {
639   if (NULL == eo->key_to_element)
640   {
641     unsigned int len;
642     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
643     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
644     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
645                                              init_key_to_element_iterator, eo);
646   }
647   if (NULL != eo->local_ibf)
648     ibf_destroy (eo->local_ibf);
649   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
650   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
651                                            prepare_ibf_iterator, eo->local_ibf);
652 }
653
654
655 /**
656  * Send an ibf of appropriate size.
657  *
658  * @param eo the union operation
659  * @param ibf_order order of the ibf to send, size=2^order
660  */
661 static void
662 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
663 {
664   unsigned int buckets_sent = 0;
665   struct InvertibleBloomFilter *ibf;
666
667   prepare_ibf (eo, 1<<ibf_order);
668
669   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
670
671   ibf = eo->local_ibf;
672
673   while (buckets_sent < (1 << ibf_order))
674   {
675     unsigned int buckets_in_message;
676     struct GNUNET_MQ_Envelope *mqm;
677     struct IBFMessage *msg;
678
679     buckets_in_message = (1 << ibf_order) - buckets_sent;
680     /* limit to maximum */
681     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
682       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
683
684     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
685                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
686     msg->order = ibf_order;
687     msg->offset = htons (buckets_sent);
688     ibf_write_slice (ibf, buckets_sent,
689                      buckets_in_message, &msg[1]);
690     buckets_sent += buckets_in_message;
691     GNUNET_MQ_send (eo->tc->mq, mqm);
692   }
693
694   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
695 }
696
697
698 /**
699  * Send a strata estimator to the remote peer.
700  *
701  * @param eo the union operation with the remote peer
702  */
703 static void
704 send_strata_estimator (struct IntersectionEvaluateOperation *eo)
705 {
706   struct GNUNET_MQ_Envelope *mqm;
707   struct GNUNET_MessageHeader *strata_msg;
708
709   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
710                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
711                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
712   strata_estimator_write (eo->set->state.i->se, &strata_msg[1]);
713   GNUNET_MQ_send (eo->tc->mq, mqm);
714   eo->phase = PHASE_EXPECT_IBF;
715 }
716
717
718 /**
719  * Compute the necessary order of an ibf
720  * from the size of the symmetric set difference.
721  *
722  * @param diff the difference
723  * @return the required size of the ibf
724  */
725 static unsigned int
726 get_order_from_difference (unsigned int diff)
727 {
728   unsigned int ibf_order;
729
730   ibf_order = 2;
731   while ((1<<ibf_order) < (2 * diff))
732     ibf_order++;
733   if (ibf_order > MAX_IBF_ORDER)
734     ibf_order = MAX_IBF_ORDER;
735   return ibf_order;
736 }
737
738
739 /**
740  * Handle a strata estimator from a remote peer
741  *
742  * @param cls the union operation
743  * @param mh the message
744  */
745 static void
746 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
747 {
748   struct UnionEvaluateOperation *eo = cls;
749   struct StrataEstimator *remote_se;
750   int diff;
751
752
753   if (eo->phase != PHASE_EXPECT_SE)
754   {
755     fail_union_operation (eo);
756     GNUNET_break (0);
757     return;
758   }
759   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
760                                        SE_IBF_HASH_NUM);
761   strata_estimator_read (&mh[1], remote_se);
762   GNUNET_assert (NULL != eo->se);
763   diff = strata_estimator_difference (remote_se, eo->se);
764   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
765   strata_estimator_destroy (remote_se);
766   strata_estimator_destroy (eo->se);
767   eo->se = NULL;
768   send_ibf (eo, get_order_from_difference (diff));
769 }
770
771
772
773 /**
774  * Iterator to send elements to a remote peer
775  *
776  * @param cls closure with the element key and the union operation
777  * @param key ignored
778  * @param value the key entry
779  */
780 static int
781 send_element_iterator (void *cls,
782                        uint32_t key,
783                        void *value)
784 {
785   struct SendElementClosure *sec = cls;
786   struct IBF_Key ibf_key = sec->ibf_key;
787   struct UnionEvaluateOperation *eo = sec->eo;
788   struct KeyEntry *ke = value;
789
790   if (ke->ibf_key.key_val != ibf_key.key_val)
791     return GNUNET_YES;
792   while (NULL != ke)
793   {
794     const struct GNUNET_SET_Element *const element = &ke->element->element;
795     struct GNUNET_MQ_Envelope *mqm;
796     struct GNUNET_MessageHeader *mh;
797
798     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
799     mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
800     if (NULL == mqm)
801     {
802       /* element too large */
803       GNUNET_break (0);
804       continue;
805     }
806     memcpy (&mh[1], element->data, element->size);
807     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
808     GNUNET_MQ_send (eo->tc->mq, mqm);
809     ke = ke->next_colliding;
810   }
811   return GNUNET_NO;
812 }
813
814 /**
815  * Send all elements that have the specified IBF key
816  * to the remote peer of the union operation
817  *
818  * @param eo union operation
819  * @param ibf_key IBF key of interest
820  */
821 static void
822 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
823 {
824   struct SendElementClosure send_cls;
825
826   send_cls.ibf_key = ibf_key;
827   send_cls.eo = eo;
828   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
829                                                 &send_element_iterator, &send_cls);
830 }
831
832
833 /**
834  * Decode which elements are missing on each side, and
835  * send the appropriate elemens and requests
836  *
837  * @param eo union operation
838  */
839 static void
840 decode_and_send (struct UnionEvaluateOperation *eo)
841 {
842   struct IBF_Key key;
843   int side;
844   struct InvertibleBloomFilter *diff_ibf;
845
846   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
847
848   prepare_ibf (eo, eo->remote_ibf->size);
849   diff_ibf = ibf_dup (eo->local_ibf);
850   ibf_subtract (diff_ibf, eo->remote_ibf);
851
852   while (1)
853   {
854     int res;
855
856     res = ibf_decode (diff_ibf, &side, &key);
857     if (GNUNET_SYSERR == res)
858     {
859       int next_order;
860       next_order = 0;
861       while (1<<next_order < diff_ibf->size)
862         next_order++;
863       next_order++;
864       if (next_order <= MAX_IBF_ORDER)
865       {
866         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
867                     "decoding failed, sending larger ibf (size %u)\n",
868                     1<<next_order);
869         send_ibf (eo, next_order);
870       }
871       else
872       {
873         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
874                     "set union failed: reached ibf limit\n");
875       }
876       break;
877     }
878     if (GNUNET_NO == res)
879     {
880       struct GNUNET_MQ_Envelope *mqm;
881
882       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
883       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
884       GNUNET_MQ_send (eo->tc->mq, mqm);
885       break;
886     }
887     if (1 == side)
888     {
889       send_elements_for_key (eo, key);
890     }
891     else
892     {
893       struct GNUNET_MQ_Envelope *mqm;
894       struct GNUNET_MessageHeader *msg;
895
896       /* FIXME: before sending the request, check if we may just have the element */
897       /* FIXME: merge multiple requests */
898       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
899                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
900       *(struct IBF_Key *) &msg[1] = key;
901       GNUNET_MQ_send (eo->tc->mq, mqm);
902     }
903   }
904   ibf_destroy (diff_ibf);
905 }
906
907
908 /**
909  * Handle an IBF message from a remote peer.
910  *
911  * @param cls the union operation
912  * @param mh the header of the message
913  */
914 static void
915 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
916 {
917   struct UnionEvaluateOperation *eo = cls;
918   struct IBFMessage *msg = (struct IBFMessage *) mh;
919   unsigned int buckets_in_message;
920
921   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
922        (eo->phase == PHASE_EXPECT_IBF) )
923   {
924     eo->phase = PHASE_EXPECT_IBF_CONT;
925     GNUNET_assert (NULL == eo->remote_ibf);
926     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
927     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
928     if (0 != ntohs (msg->offset))
929     {
930       GNUNET_break (0);
931       fail_union_operation (eo);
932     }
933   }
934   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
935   {
936     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
937          (1<<msg->order != eo->remote_ibf->size) )
938     {
939       GNUNET_break (0);
940       fail_union_operation (eo);
941       return;
942     }
943   }
944
945   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
946
947   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
948   {
949     GNUNET_break (0);
950     fail_union_operation (eo);
951     return;
952   }
953
954   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
955   eo->ibf_buckets_received += buckets_in_message;
956
957   if (eo->ibf_buckets_received == eo->remote_ibf->size)
958   {
959
960     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
961     eo->phase = PHASE_EXPECT_ELEMENTS;
962     decode_and_send (eo);
963   }
964 }
965
966
967 /**
968  * Send a result message to the client indicating
969  * that there is a new element.
970  *
971  * @param eo union operation
972  * @param element element to send
973  */
974 static void
975 send_client_element (struct UnionEvaluateOperation *eo,
976                      struct GNUNET_SET_Element *element)
977 {
978   struct GNUNET_MQ_Envelope *mqm;
979   struct GNUNET_SET_ResultMessage *rm;
980
981   GNUNET_assert (0 != eo->request_id);
982   mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
983   if (NULL == mqm)
984   {
985     GNUNET_MQ_discard (mqm);
986     GNUNET_break (0);
987     return;
988   }
989   rm->result_status = htons (GNUNET_SET_STATUS_OK);
990   rm->request_id = htonl (eo->request_id);
991   memcpy (&rm[1], element->data, element->size);
992   GNUNET_MQ_send (eo->set->client_mq, mqm);
993 }
994
995
996 /**
997  * Send a result message to the client indicating
998  * that the operation is over.
999  * After the result done message has been sent to the client,
1000  * destroy the evaluate operation.
1001  *
1002  * @param eo union operation
1003  */
1004 static void
1005 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1006 {
1007   struct GNUNET_MQ_Envelope *mqm;
1008   struct GNUNET_SET_ResultMessage *rm;
1009
1010   GNUNET_assert (0 != eo->request_id);
1011   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1012   rm->request_id = htonl (eo->request_id);
1013   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1014   GNUNET_MQ_send (eo->set->client_mq, mqm);
1015
1016 }
1017
1018
1019 /**
1020  * Handle an element message from a remote peer.
1021  *
1022  * @param cls the union operation
1023  * @param mh the message
1024  */
1025 static void
1026 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1027 {
1028   struct UnionEvaluateOperation *eo = cls;
1029   struct ElementEntry *ee;
1030   uint16_t element_size;
1031
1032   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1033
1034   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1035        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1036   {
1037     fail_union_operation (eo);
1038     GNUNET_break (0);
1039     return;
1040   }
1041   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1042   ee = GNUNET_malloc (sizeof *ee + element_size);
1043   memcpy (&ee[1], &mh[1], element_size);
1044   ee->element.data = &ee[1];
1045   ee->remote = GNUNET_YES;
1046
1047   insert_element (eo, ee);
1048   send_client_element (eo, &ee->element);
1049
1050   GNUNET_free (ee);
1051 }
1052
1053
1054 /**
1055  * Handle an element request from a remote peer.
1056  *
1057  * @param cls the union operation
1058  * @param mh the message
1059  */
1060 static void
1061 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1062 {
1063   struct UnionEvaluateOperation *eo = cls;
1064   struct IBF_Key *ibf_key;
1065   unsigned int num_keys;
1066
1067   /* look up elements and send them */
1068   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1069   {
1070     GNUNET_break (0);
1071     fail_union_operation (eo);
1072     return;
1073   }
1074
1075   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1076
1077   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1078   {
1079     GNUNET_break (0);
1080     fail_union_operation (eo);
1081     return;
1082   }
1083
1084   ibf_key = (struct IBF_Key *) &mh[1];
1085   while (0 != num_keys--)
1086   {
1087     send_elements_for_key (eo, *ibf_key);
1088     ibf_key++;
1089   }
1090 }
1091
1092
1093 /**
1094  * Callback used for notifications
1095  *
1096  * @param cls closure
1097  */
1098 static void
1099 peer_done_sent_cb (void *cls)
1100 {
1101   struct UnionEvaluateOperation *eo = cls;
1102
1103   send_client_done_and_destroy (eo);
1104 }
1105
1106
1107 /**
1108  * Handle a done message from a remote peer
1109  *
1110  * @param cls the union operation
1111  * @param mh the message
1112  */
1113 static void
1114 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1115 {
1116   struct UnionEvaluateOperation *eo = cls;
1117
1118   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1119   {
1120     /* we got all requests, but still have to send our elements as response */
1121     struct GNUNET_MQ_Envelope *mqm;
1122
1123     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1124     eo->phase = PHASE_FINISHED;
1125     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1126     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1127     GNUNET_MQ_send (eo->tc->mq, mqm);
1128     return;
1129   }
1130   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1131   {
1132     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1133     eo->phase = PHASE_FINISHED;
1134     send_client_done_and_destroy (eo);
1135     return;
1136   }
1137   GNUNET_break (0);
1138   fail_union_operation (eo);
1139 }
1140
1141
1142 /**
1143  * Evaluate a union operation with
1144  * a remote peer.
1145  *
1146  * @param m the evaluate request message from the client
1147  * @param set the set to evaluate the operation with
1148  */
1149 void
1150 _GSS_intersection_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1151 {
1152   struct IntersectionEvaluateOperation *eo;
1153   struct GNUNET_MessageHeader *context_msg;
1154
1155   eo = GNUNET_new (struct IntersectionEvaluateOperation);
1156   eo->peer = m->target_peer;
1157   eo->set = set;
1158   eo->request_id = htonl (m->request_id);
1159   GNUNET_assert (0 != eo->request_id);
1160   eo->se = strata_estimator_dup (set->state.i->se);
1161   eo->salt = ntohs (m->salt);
1162   eo->app_id = m->app_id;
1163
1164   context_msg = GNUNET_MQ_extract_nested_mh (m);
1165   if (NULL != context_msg)
1166   {
1167     eo->context_msg = GNUNET_copy_message (context_msg);
1168   }
1169
1170   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1171               "evaluating intersection operation, (app %s)\n",
1172               GNUNET_h2s (&eo->app_id));
1173
1174   eo->tc = GNUNET_new (struct TunnelContext);
1175   eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
1176                                               GNUNET_APPLICATION_TYPE_SET);
1177   GNUNET_assert (NULL != eo->tc->tunnel);
1178   eo->tc->peer = eo->peer;
1179   eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
1180   /* we started the operation, thus we have to send the operation request */
1181   eo->phase = PHASE_EXPECT_SE;
1182
1183   GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head,
1184                                eo->set->state.i->ops_tail,
1185                                eo);
1186
1187   send_operation_request (eo);
1188 }
1189
1190
1191 /**
1192  * Accept an union operation request from a remote peer
1193  *
1194  * @param m the accept message from the client
1195  * @param set the set of the client
1196  * @param incoming information about the requesting remote peer
1197  */
1198 void
1199 _GSS_intersection_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1200                    struct Incoming *incoming)
1201 {
1202   struct IntersectionEvaluateOperation *eo;
1203
1204   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1205
1206   eo = GNUNET_new (struct IntersectionEvaluateOperation);
1207   eo->tc = incoming->tc;
1208   eo->generation_created = set->state.i->current_generation++;
1209   eo->set = set;
1210   eo->salt = ntohs (incoming->salt);
1211   GNUNET_assert (0 != ntohl (m->request_id));
1212   eo->request_id = ntohl (m->request_id);
1213   eo->se = strata_estimator_dup (set->state.i->se);
1214   /* transfer ownership of mq and socket from incoming to eo */
1215   GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head,
1216                                eo->set->state.i->ops_tail,
1217                                eo);
1218   /* kick off the operation */
1219   send_strata_estimator (eo);
1220 }
1221
1222
1223 /**
1224  * Create a new set supporting the intersection operation
1225  *
1226  * @return the newly created set
1227  */
1228 struct Set *
1229 _GSS_intersection_set_create (void)
1230 {
1231   struct Set *set;
1232
1233   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "intersection set created\n");
1234
1235   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct IntersectionState));
1236   set->state.i = (struct IntersectionState *) &set[1];
1237   set->operation = GNUNET_SET_OPERATION_INTERSECTION;
1238   /* keys of the hash map are stored in the element entrys, thus we do not
1239    * want the hash map to copy them */
1240   set->state.i->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1241   set->state.i->se = strata_estimator_create (SE_STRATA_COUNT,
1242                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);
1243   return set;
1244 }
1245
1246
1247 /**
1248  * Add the element from the given element message to the set.
1249  *
1250  * @param m message with the element
1251  * @param set set to add the element to
1252  */
1253 void
1254 _GSS_intersection_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1255 {
1256   struct ElementEntry *ee;
1257   struct ElementEntry *ee_dup;
1258   uint16_t element_size;
1259
1260   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1261
1262   GNUNET_assert (GNUNET_SET_OPERATION_INTERSECTION == set->operation);
1263   element_size = ntohs (m->header.size) - sizeof *m;
1264   ee = GNUNET_malloc (element_size + sizeof *ee);
1265   ee->element.size = element_size;
1266   memcpy (&ee[1], &m[1], element_size);
1267   ee->element.data = &ee[1];
1268   ee->generation_added = set->state.i->current_generation;
1269   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1270   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &ee->element_hash);
1271   if (NULL != ee_dup)
1272   {
1273     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1274     GNUNET_free (ee);
1275     return;
1276   }
1277   GNUNET_CONTAINER_multihashmap_put (set->state.i->elements, &ee->element_hash, ee,
1278                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1279   strata_estimator_insert (set->state.i->se, get_ibf_key (&ee->element_hash, 0));
1280 }
1281
1282
1283 /**
1284  * Destroy a set that supports the union operation
1285  *
1286  * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1287  */
1288 void
1289 _GSS_union_set_destroy (struct Set *set)
1290 {
1291   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1292   if (NULL != set->client)
1293   {
1294     GNUNET_SERVER_client_drop (set->client);
1295     set->client = NULL;
1296   }
1297   if (NULL != set->client_mq)
1298   {
1299     GNUNET_MQ_destroy (set->client_mq);
1300     set->client_mq = NULL;
1301   }
1302
1303   if (NULL != set->state.u->se)
1304   {
1305     strata_estimator_destroy (set->state.u->se);
1306     set->state.u->se = NULL;
1307   }
1308
1309   destroy_elements (set->state.u);
1310
1311   while (NULL != set->state.u->ops_head)
1312   {
1313     _GSS_union_operation_destroy (set->state.u->ops_head);
1314   }
1315 }
1316
1317 /**
1318  * Remove the element given in the element message from the set.
1319  * Only marks the element as removed, so that older set operations can still exchange it.
1320  *
1321  * @param m message with the element
1322  * @param set set to remove the element from
1323  */
1324 void
1325 _GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1326 {
1327   struct GNUNET_HashCode hash;
1328   struct ElementEntry *ee;
1329
1330   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1331   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1332   ee = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &hash);
1333   if (NULL == ee)
1334   {
1335     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1336     return;
1337   }
1338   if (GNUNET_YES == ee->removed)
1339   {
1340     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1341     return;
1342   }
1343   ee->removed = GNUNET_YES;
1344   ee->generation_removed = set->state.i->current_generation;
1345 }
1346
1347
1348 /**
1349  * Dispatch messages for a union operation.
1350  *
1351  * @param cls closure
1352  * @param tunnel mesh tunnel
1353  * @param tunnel_ctx tunnel context
1354  * @param mh message to process
1355  * @return ???
1356  */
1357 int
1358 _GSS_union_handle_p2p_message (void *cls,
1359                                struct GNUNET_MESH_Tunnel *tunnel,
1360                                void **tunnel_ctx,
1361                                const struct GNUNET_MessageHeader *mh)
1362 {
1363   struct TunnelContext *tc = *tunnel_ctx;
1364   struct UnionEvaluateOperation *eo;
1365
1366   if (CONTEXT_OPERATION_UNION != tc->type)
1367   {
1368     /* never kill mesh */
1369     return GNUNET_OK;
1370   }
1371
1372   eo = tc->data;
1373
1374   switch (ntohs (mh->type))
1375   {
1376     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1377       handle_p2p_ibf (eo, mh);
1378       break;
1379     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1380       handle_p2p_strata_estimator (eo, mh);
1381       break;
1382     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1383       handle_p2p_elements (eo, mh);
1384       break;
1385     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1386       handle_p2p_element_requests (eo, mh);
1387       break;
1388     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1389       handle_p2p_done (eo, mh);
1390       break;
1391     default:
1392       /* something wrong with mesh's message handlers? */
1393       GNUNET_assert (0);
1394   }
1395   /* never kill mesh! */
1396   return GNUNET_OK;
1397 }