5b1f28cf432b32258274802c902043e7af52fc96
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       (C) 2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file set/gnunet-service-set.c
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26
27
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
31 #include "ibf.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
34 #include <gcrypt.h>
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * hash num parameter for the difference digests and strata estimators
47  */
48 #define SE_IBF_HASH_NUM 3
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62
63 /**
64  * Current phase we are in for a union operation
65  */
66 enum UnionOperationPhase
67 {
68   /**
69    * We sent the request message, and expect a strata estimator
70    */
71   PHASE_EXPECT_SE,
72   /**
73    * We sent the strata estimator, and expect an IBF
74    */
75   PHASE_EXPECT_IBF,
76   /**
77    * We know what type of IBF the other peer wants to send us,
78    * and expect the remaining parts
79    */
80   PHASE_EXPECT_IBF_CONT,
81   /**
82    * We are sending request and elements,
83    * and thus only expect elements from the other peer.
84    */
85   PHASE_EXPECT_ELEMENTS,
86   /**
87    * We are expecting elements and requests, and send
88    * requested elements back to the other peer.
89    */
90   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
91   /**
92    * The protocol is over.
93    * Results may still have to be sent to the client.
94    */
95   PHASE_FINISHED
96 };
97
98
99 /**
100  * State of an evaluate operation
101  * with another peer.
102  */
103 struct UnionEvaluateOperation
104 {
105   /**
106    * Local set the operation is evaluated on.
107    */
108   struct Set *set;
109
110   /**
111    * Peer with the remote set
112    */
113   struct GNUNET_PeerIdentity peer;
114
115   /**
116    * Application-specific identifier
117    */
118   struct GNUNET_HashCode app_id;
119
120   /**
121    * Context message, given to us
122    * by the client, may be NULL.
123    */
124   struct GNUNET_MessageHeader *context_msg;
125
126   /**
127    * Tunnel context for the peer we
128    * evaluate the union operation with.
129    */
130   struct TunnelContext *tc;
131
132   /**
133    * Request ID to multiplex set operations to
134    * the client inhabiting the set.
135    */
136   uint32_t request_id;
137
138   /**
139    * Number of ibf buckets received
140    */
141   unsigned int ibf_buckets_received;
142
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The ibf we currently receive
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * IBF of the set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps IBF-Keys (specific to the current salt) to elements.
161    */
162   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
163
164   /**
165    * Current state of the operation.
166    */
167   enum UnionOperationPhase phase;
168
169   /**
170    * Salt to use for this operation.
171    */
172   uint16_t salt;
173
174   /**
175    * Generation in which the operation handle
176    * was created.
177    */
178   unsigned int generation_created;
179   
180   /**
181    * Evaluate operations are held in
182    * a linked list.
183    */
184   struct UnionEvaluateOperation *next;
185   
186    /**
187    * Evaluate operations are held in
188    * a linked list.
189    */
190   struct UnionEvaluateOperation *prev;
191 };
192
193
194 /**
195  * Information about the element in a set.
196  * All elements are stored in a hash-table
197  * from their hash-code to their 'struct Element',
198  * so that the remove and add operations are reasonably
199  * fast.
200  */
201 struct ElementEntry
202 {
203   /**
204    * The actual element. The data for the element
205    * should be allocated at the end of this struct.
206    */
207   struct GNUNET_SET_Element element;
208
209   /**
210    * Hash of the element.
211    * Will be used to derive the different IBF keys
212    * for different salts.
213    */
214   struct GNUNET_HashCode element_hash;
215
216   /**
217    * Generation the element was added by the client.
218    * Operations of earlier generations will not consider the element.
219    */
220   unsigned int generation_added;
221
222   /**
223    * GNUNET_YES if the element has been removed in some generation.
224    */
225   int removed;
226
227   /**
228    * Generation the element was removed by the client. 
229    * Operations of later generations will not consider the element.
230    * Only valid if is_removed is GNUNET_YES.
231    */
232   unsigned int generation_removed;
233
234   /**
235    * GNUNET_YES if the element is a remote element, and does not belong
236    * to the operation's set.
237    */
238   int remote;
239 };
240
241
242 /**
243  * Entries in the key-to-element map of the union set.
244  */
245 struct KeyEntry
246 {
247   /**
248    * IBF key for the entry, derived from the current salt.
249    */
250   struct IBF_Key ibf_key;
251
252   /**
253    * The actual element associated with the key
254    */
255   struct ElementEntry *element;
256
257   /**
258    * Element that collides with this element
259    * on the ibf key
260    */
261   struct KeyEntry *next_colliding;
262 };
263
264 /**
265  * Used as a closure for sending elements
266  * with a specific IBF key.
267  */
268 struct SendElementClosure
269 {
270   /**
271    * The IBF key whose matching elements should be
272    * sent.
273    */
274   struct IBF_Key ibf_key;
275
276   /**
277    * Operation for which the elements
278    * should be sent.
279    */
280   struct UnionEvaluateOperation *eo;
281 };
282
283
284 /**
285  * Extra state required for efficient set union.
286  */
287 struct UnionState
288 {
289   /**
290    * The strata estimator is only generated once for
291    * each set.
292    * The IBF keys are derived from the element hashes with
293    * salt=0.
294    */
295   struct StrataEstimator *se;
296
297   /**
298    * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
299    */
300   struct GNUNET_CONTAINER_MultiHashMap *elements;
301
302   /**
303    * Evaluate operations are held in
304    * a linked list.
305    */
306   struct UnionEvaluateOperation *ops_head;
307
308   /**
309    * Evaluate operations are held in
310    * a linked list.
311    */
312   struct UnionEvaluateOperation *ops_tail;
313
314   /**
315    * Current generation, that is, number of
316    * previously executed operations on this set
317    */
318   unsigned int current_generation;
319 };
320
321
322
323 /**
324  * Iterator over hash map entries.
325  *
326  * @param cls closure
327  * @param key current key code
328  * @param value value in the hash map
329  * @return GNUNET_YES if we should continue to
330  *         iterate,
331  *         GNUNET_NO if not.
332  */
333 static int
334 destroy_elements_iterator (void *cls,
335                            const struct GNUNET_HashCode * key,
336                            void *value)
337 {
338   struct ElementEntry *ee = value;
339
340   GNUNET_free (ee);
341   return GNUNET_YES;
342 }
343
344
345 /**
346  * Destroy the elements belonging to a union set.
347  *
348  * @param us union state that contains the elements
349  */
350 static void
351 destroy_elements (struct UnionState *us)
352 {
353   if (NULL == us->elements)
354     return;
355   GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
356   GNUNET_CONTAINER_multihashmap_destroy (us->elements);
357   us->elements = NULL;
358 }
359
360
361
362 /**
363  * Iterator over hash map entries.
364  *
365  * @param cls closure
366  * @param key current key code
367  * @param value value in the hash map
368  * @return GNUNET_YES if we should continue to
369  *         iterate,
370  *         GNUNET_NO if not.
371  */
372 static int
373 destroy_key_to_element_iter (void *cls,
374                              uint32_t key,
375                              void *value)
376 {
377   struct KeyEntry *k = value;
378   
379   while (NULL != k)
380   {
381     struct KeyEntry *k_tmp = k;
382     k = k->next_colliding;
383     GNUNET_free (k_tmp);
384   }
385   return GNUNET_YES;
386 }
387
388
389 /**
390  * Destroy a union operation, and free all resources
391  * associated with it.
392  *
393  * @param eo the union operation to destroy
394  */
395 void
396 _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
397 {
398   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
399   
400   if (NULL != eo->tc)
401   {
402     GNUNET_MQ_destroy (eo->tc->mq);
403     GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
404     GNUNET_free (eo->tc);
405     eo->tc = NULL;
406   }
407
408   if (NULL != eo->remote_ibf)
409   {
410     ibf_destroy (eo->remote_ibf);
411     eo->remote_ibf = NULL;
412   }
413   if (NULL != eo->local_ibf)
414   {
415     ibf_destroy (eo->local_ibf);
416     eo->local_ibf = NULL;
417   }
418   if (NULL != eo->se)
419   {
420     strata_estimator_destroy (eo->se);
421     eo->se = NULL;
422   }
423   if (NULL != eo->key_to_element)
424   {
425     GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
426     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
427     eo->key_to_element = NULL;
428   }
429
430   GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
431                                eo->set->state.u->ops_tail,
432                                eo);
433   GNUNET_free (eo);
434
435
436   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
437
438
439   /* FIXME: do a garbage collection of the set generations */
440 }
441
442
443 /**
444  * Inform the client that the union operation has failed,
445  * and proceed to destroy the evaluate operation.
446  *
447  * @param eo the union operation to fail
448  */
449 static void
450 fail_union_operation (struct UnionEvaluateOperation *eo)
451 {
452   struct GNUNET_MQ_Envelope *mqm;
453   struct GNUNET_SET_ResultMessage *msg;
454
455   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
456   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
457   msg->request_id = htonl (eo->request_id);
458   GNUNET_MQ_send (eo->set->client_mq, mqm);
459   _GSS_union_operation_destroy (eo);
460 }
461
462
463 /**
464  * Derive the IBF key from a hash code and 
465  * a salt.
466  *
467  * @param src the hash code
468  * @param salt salt to use
469  * @return the derived IBF key
470  */
471 static struct IBF_Key
472 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
473 {
474   struct IBF_Key key;
475
476   GNUNET_CRYPTO_hkdf (&key, sizeof (key),
477                       GCRY_MD_SHA512, GCRY_MD_SHA256,
478                       src, sizeof *src,
479                       &salt, sizeof (salt),
480                       NULL, 0);
481   return key;
482 }
483
484
485 /**
486  * Send a request for the evaluate operation to a remote peer
487  *
488  * @param eo operation with the other peer
489  */
490 static void
491 send_operation_request (struct UnionEvaluateOperation *eo)
492 {
493   struct GNUNET_MQ_Envelope *mqm;
494   struct OperationRequestMessage *msg;
495
496   mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
497                                  eo->context_msg);
498
499   if (NULL == mqm)
500   {
501     /* the context message is too large */
502     GNUNET_break (0);
503     GNUNET_SERVER_client_disconnect (eo->set->client);
504     return;
505   }
506   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
507   msg->app_id = eo->app_id;
508   GNUNET_MQ_send (eo->tc->mq, mqm);
509
510   if (NULL != eo->context_msg)
511   {
512     GNUNET_free (eo->context_msg);
513     eo->context_msg = NULL;
514   }
515
516   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
517 }
518
519
520 /**
521  * Iterator to create the mapping between ibf keys
522  * and element entries.
523  *
524  * @param cls closure
525  * @param key current key code
526  * @param value value in the hash map
527  * @return GNUNET_YES if we should continue to
528  *         iterate,
529  *         GNUNET_NO if not.
530  */
531 static int
532 insert_element_iterator (void *cls,
533                          uint32_t key,
534                          void *value)
535 {
536   struct KeyEntry *const new_k = cls;
537   struct KeyEntry *old_k = value;
538
539   GNUNET_assert (NULL != old_k);
540   do
541   {
542     if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
543     {
544       new_k->next_colliding = old_k->next_colliding;
545       old_k->next_colliding = new_k;
546       return GNUNET_NO;
547     }
548     old_k = old_k->next_colliding;
549   } while (NULL != old_k);
550   return GNUNET_YES;
551 }
552
553
554 /**
555  * Insert an element into the union operation's
556  * key-to-element mapping
557  *
558  * @param eo the union operation
559  * @param ee the element entry
560  */
561 static void
562 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
563 {
564   int ret;
565   struct IBF_Key ibf_key;
566   struct KeyEntry *k;
567
568   ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
569   k = GNUNET_new (struct KeyEntry);
570   k->element = ee;
571   k->ibf_key = ibf_key;
572   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
573                                                       (uint32_t) ibf_key.key_val,
574                                                       insert_element_iterator, k);
575
576   /* was the element inserted into a colliding bucket? */
577   if (GNUNET_SYSERR == ret)
578     return;
579
580   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
581                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
582 }
583
584
585 /**
586  * Insert a key into an ibf.
587  *
588  * @param cls the ibf
589  * @param key unused
590  * @param value the key entry to get the key from
591  */
592 static int
593 prepare_ibf_iterator (void *cls,
594                       uint32_t key,
595                       void *value)
596 {
597   struct InvertibleBloomFilter *ibf = cls;
598   struct KeyEntry *ke = value;
599
600   ibf_insert (ibf, ke->ibf_key);
601   return GNUNET_YES;
602 }
603
604
605 /**
606  * Iterator for initializing the
607  * key-to-element mapping of a union operation
608  *
609  * @param cls the union operation
610  * @param key unised
611  * @param value the element entry to insert
612  *        into the key-to-element mapping
613  */
614 static int
615 init_key_to_element_iterator (void *cls,
616                               const struct GNUNET_HashCode *key,
617                               void *value)
618 {
619   struct UnionEvaluateOperation *eo = cls;
620   struct ElementEntry *e = value;
621
622   /* make sure that the element belongs to the set at the time
623    * of creating the operation */
624   if ( (e->generation_added > eo->generation_created) ||
625        ( (GNUNET_YES == e->removed) &&
626          (e->generation_removed < eo->generation_created)))
627     return GNUNET_YES;
628
629   insert_element (eo, e);
630   return GNUNET_YES;
631 }
632
633
634 /**
635  * Create an ibf with the operation's elements
636  * of the specified size
637  *
638  * @param eo the union operation
639  * @param size size of the ibf to create
640  */
641 static void
642 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
643 {
644   if (NULL == eo->key_to_element)
645   {
646     unsigned int len;
647     len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
648     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
649     GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
650                                              init_key_to_element_iterator, eo);
651   }
652   if (NULL != eo->local_ibf)
653     ibf_destroy (eo->local_ibf);
654   eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
655   GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
656                                            prepare_ibf_iterator, eo->local_ibf);
657 }
658
659
660 /**
661  * Send an ibf of appropriate size.
662  *
663  * @param eo the union operation
664  * @param ibf_order order of the ibf to send, size=2^order
665  */
666 static void
667 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
668 {
669   unsigned int buckets_sent = 0;
670   struct InvertibleBloomFilter *ibf;
671
672   prepare_ibf (eo, 1<<ibf_order);
673
674   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
675
676   ibf = eo->local_ibf;
677
678   while (buckets_sent < (1 << ibf_order))
679   {
680     unsigned int buckets_in_message;
681     struct GNUNET_MQ_Envelope *mqm;
682     struct IBFMessage *msg;
683
684     buckets_in_message = (1 << ibf_order) - buckets_sent;
685     /* limit to maximum */
686     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
687       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
688
689     mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
690                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
691     msg->order = ibf_order;
692     msg->offset = htons (buckets_sent);
693     ibf_write_slice (ibf, buckets_sent,
694                      buckets_in_message, &msg[1]);
695     buckets_sent += buckets_in_message;
696     GNUNET_MQ_send (eo->tc->mq, mqm);
697   }
698
699   eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
700 }
701
702
703 /**
704  * Send a strata estimator to the remote peer.
705  *
706  * @param eo the union operation with the remote peer
707  */
708 static void
709 send_strata_estimator (struct UnionEvaluateOperation *eo)
710 {
711   struct GNUNET_MQ_Envelope *mqm;
712   struct GNUNET_MessageHeader *strata_msg;
713
714   mqm = GNUNET_MQ_msg_header_extra (strata_msg,
715                                     SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
716                                     GNUNET_MESSAGE_TYPE_SET_P2P_SE);
717   strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
718   GNUNET_MQ_send (eo->tc->mq, mqm);
719   eo->phase = PHASE_EXPECT_IBF;
720 }
721
722
723 /**
724  * Compute the necessary order of an ibf
725  * from the size of the symmetric set difference.
726  *
727  * @param diff the difference
728  * @return the required size of the ibf
729  */
730 static unsigned int
731 get_order_from_difference (unsigned int diff)
732 {
733   unsigned int ibf_order;
734
735   ibf_order = 2;
736   while ((1<<ibf_order) < (2 * diff))
737     ibf_order++;
738   if (ibf_order > MAX_IBF_ORDER)
739     ibf_order = MAX_IBF_ORDER;
740   return ibf_order;
741 }
742
743
744 /**
745  * Handle a strata estimator from a remote peer
746  *
747  * @param cls the union operation
748  * @param mh the message
749  */
750 static void
751 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
752 {
753   struct UnionEvaluateOperation *eo = cls;
754   struct StrataEstimator *remote_se;
755   int diff;
756
757
758   if (eo->phase != PHASE_EXPECT_SE)
759   {
760     fail_union_operation (eo);
761     GNUNET_break (0);
762     return;
763   }
764   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
765                                        SE_IBF_HASH_NUM);
766   strata_estimator_read (&mh[1], remote_se);
767   GNUNET_assert (NULL != eo->se);
768   diff = strata_estimator_difference (remote_se, eo->se);
769   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
770   strata_estimator_destroy (remote_se);
771   strata_estimator_destroy (eo->se);
772   eo->se = NULL;
773   send_ibf (eo, get_order_from_difference (diff));
774 }
775
776
777
778 /**
779  * Iterator to send elements to a remote peer
780  *
781  * @param cls closure with the element key and the union operation
782  * @param key ignored
783  * @param value the key entry
784  */
785 static int
786 send_element_iterator (void *cls,
787                        uint32_t key,
788                        void *value)
789 {
790   struct SendElementClosure *sec = cls;
791   struct IBF_Key ibf_key = sec->ibf_key;
792   struct UnionEvaluateOperation *eo = sec->eo;
793   struct KeyEntry *ke = value;
794
795   if (ke->ibf_key.key_val != ibf_key.key_val)
796     return GNUNET_YES;
797   while (NULL != ke)
798   {
799     const struct GNUNET_SET_Element *const element = &ke->element->element;
800     struct GNUNET_MQ_Envelope *mqm;
801     struct GNUNET_MessageHeader *mh;
802
803     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
804     mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
805     if (NULL == mqm)
806     {
807       /* element too large */
808       GNUNET_break (0);
809       continue;
810     }
811     memcpy (&mh[1], element->data, element->size);
812     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
813     GNUNET_MQ_send (eo->tc->mq, mqm);
814     ke = ke->next_colliding;
815   }
816   return GNUNET_NO;
817 }
818
819 /**
820  * Send all elements that have the specified IBF key
821  * to the remote peer of the union operation
822  *
823  * @param eo union operation
824  * @param ibf_key IBF key of interest
825  */
826 static void
827 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
828 {
829   struct SendElementClosure send_cls;
830
831   send_cls.ibf_key = ibf_key;
832   send_cls.eo = eo;
833   GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
834                                                 &send_element_iterator, &send_cls);
835 }
836
837
838 /**
839  * Decode which elements are missing on each side, and
840  * send the appropriate elemens and requests
841  *
842  * @param eo union operation
843  */
844 static void
845 decode_and_send (struct UnionEvaluateOperation *eo)
846 {
847   struct IBF_Key key;
848   int side;
849   struct InvertibleBloomFilter *diff_ibf;
850
851   GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
852
853   prepare_ibf (eo, eo->remote_ibf->size);
854   diff_ibf = ibf_dup (eo->local_ibf);
855   ibf_subtract (diff_ibf, eo->remote_ibf);
856
857   while (1)
858   {
859     int res;
860
861     res = ibf_decode (diff_ibf, &side, &key);
862     if (GNUNET_SYSERR == res)
863     {
864       int next_order;
865       next_order = 0;
866       while (1<<next_order < diff_ibf->size)
867         next_order++;
868       next_order++;
869       if (next_order <= MAX_IBF_ORDER)
870       {
871         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
872                     "decoding failed, sending larger ibf (size %u)\n",
873                     1<<next_order);
874         send_ibf (eo, next_order);
875       }
876       else
877       {
878         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
879                     "set union failed: reached ibf limit\n");
880       }
881       break;
882     }
883     if (GNUNET_NO == res)
884     {
885       struct GNUNET_MQ_Envelope *mqm;
886
887       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
888       mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
889       GNUNET_MQ_send (eo->tc->mq, mqm);
890       break;
891     }
892     if (1 == side)
893     {
894       send_elements_for_key (eo, key);
895     }
896     else
897     {
898       struct GNUNET_MQ_Envelope *mqm;
899       struct GNUNET_MessageHeader *msg;
900
901       /* FIXME: before sending the request, check if we may just have the element */
902       /* FIXME: merge multiple requests */
903       mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
904                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
905       *(struct IBF_Key *) &msg[1] = key;
906       GNUNET_MQ_send (eo->tc->mq, mqm);
907     }
908   }
909   ibf_destroy (diff_ibf);
910 }
911
912
913 /**
914  * Handle an IBF message from a remote peer.
915  *
916  * @param cls the union operation
917  * @param mh the header of the message
918  */
919 static void
920 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
921 {
922   struct UnionEvaluateOperation *eo = cls;
923   struct IBFMessage *msg = (struct IBFMessage *) mh;
924   unsigned int buckets_in_message;
925
926   if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
927        (eo->phase == PHASE_EXPECT_IBF) )
928   {
929     eo->phase = PHASE_EXPECT_IBF_CONT;
930     GNUNET_assert (NULL == eo->remote_ibf);
931     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
932     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
933     if (0 != ntohs (msg->offset))
934     {
935       GNUNET_break (0);
936       fail_union_operation (eo);
937     }
938   }
939   else if (eo->phase == PHASE_EXPECT_IBF_CONT)
940   {
941     if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
942          (1<<msg->order != eo->remote_ibf->size) )
943     {
944       GNUNET_break (0);
945       fail_union_operation (eo);
946       return;
947     }
948   }
949
950   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
951
952   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
953   {
954     GNUNET_break (0);
955     fail_union_operation (eo);
956     return;
957   }
958   
959   ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
960   eo->ibf_buckets_received += buckets_in_message;
961
962   if (eo->ibf_buckets_received == eo->remote_ibf->size)
963   {
964
965     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
966     eo->phase = PHASE_EXPECT_ELEMENTS;
967     decode_and_send (eo);
968   }
969 }
970
971
972 /**
973  * Send a result message to the client indicating
974  * that there is a new element.
975  *
976  * @param eo union operation
977  * @param element element to send
978  */
979 static void
980 send_client_element (struct UnionEvaluateOperation *eo,
981                      struct GNUNET_SET_Element *element)
982 {
983   struct GNUNET_MQ_Envelope *mqm;
984   struct GNUNET_SET_ResultMessage *rm;
985
986   GNUNET_assert (0 != eo->request_id);
987   mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
988   if (NULL == mqm)
989   {
990     GNUNET_MQ_discard (mqm);
991     GNUNET_break (0);
992     return;
993   }
994   rm->result_status = htons (GNUNET_SET_STATUS_OK);
995   rm->request_id = htonl (eo->request_id);
996   memcpy (&rm[1], element->data, element->size);
997   GNUNET_MQ_send (eo->set->client_mq, mqm);
998 }
999
1000
1001 /**
1002  * Send a result message to the client indicating
1003  * that the operation is over.
1004  * After the result done message has been sent to the client,
1005  * destroy the evaluate operation.
1006  *
1007  * @param eo union operation
1008  */
1009 static void
1010 send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1011 {
1012   struct GNUNET_MQ_Envelope *mqm;
1013   struct GNUNET_SET_ResultMessage *rm;
1014
1015   GNUNET_assert (0 != eo->request_id);
1016   mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1017   rm->request_id = htonl (eo->request_id);
1018   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1019   GNUNET_MQ_send (eo->set->client_mq, mqm);
1020
1021 }
1022
1023
1024 /**
1025  * Handle an element message from a remote peer.
1026  *
1027  * @param cls the union operation
1028  * @param mh the message
1029  */
1030 static void
1031 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1032 {
1033   struct UnionEvaluateOperation *eo = cls;
1034   struct ElementEntry *ee;
1035   uint16_t element_size;
1036
1037   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1038
1039   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1040        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1041   {
1042     fail_union_operation (eo);
1043     GNUNET_break (0);
1044     return;
1045   }
1046   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1047   ee = GNUNET_malloc (sizeof *eo + element_size);
1048   memcpy (&ee[1], &mh[1], element_size);
1049   ee->element.data = &ee[1];
1050   ee->remote = GNUNET_YES;
1051
1052   insert_element (eo, ee);
1053   send_client_element (eo, &ee->element);
1054
1055   GNUNET_free (ee);
1056 }
1057
1058
1059 /**
1060  * Handle an element request from a remote peer.
1061  *
1062  * @param cls the union operation
1063  * @param mh the message
1064  */
1065 static void
1066 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1067 {
1068   struct UnionEvaluateOperation *eo = cls;
1069   struct IBF_Key *ibf_key;
1070   unsigned int num_keys;
1071
1072   /* look up elements and send them */
1073   if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1074   {
1075     GNUNET_break (0);
1076     fail_union_operation (eo);
1077     return;
1078   }
1079
1080   num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1081
1082   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1083   {
1084     GNUNET_break (0);
1085     fail_union_operation (eo);
1086     return;
1087   }
1088
1089   ibf_key = (struct IBF_Key *) &mh[1];
1090   while (0 != num_keys--)
1091   {
1092     send_elements_for_key (eo, *ibf_key);
1093     ibf_key++;
1094   }
1095 }
1096
1097
1098 /**
1099  * Callback used for notifications
1100  *
1101  * @param cls closure
1102  */
1103 static void
1104 peer_done_sent_cb (void *cls)
1105 {
1106   struct UnionEvaluateOperation *eo = cls;
1107
1108   send_client_done_and_destroy (eo);
1109 }
1110
1111
1112 /**
1113  * Handle a done message from a remote peer
1114  * 
1115  * @param cls the union operation
1116  * @param mh the message
1117  */
1118 static void
1119 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1120 {
1121   struct UnionEvaluateOperation *eo = cls;
1122
1123   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1124   {
1125     /* we got all requests, but still have to send our elements as response */
1126     struct GNUNET_MQ_Envelope *mqm;
1127
1128     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1129     eo->phase = PHASE_FINISHED;
1130     mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1131     GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1132     GNUNET_MQ_send (eo->tc->mq, mqm);
1133     return;
1134   }
1135   if (eo->phase == PHASE_EXPECT_ELEMENTS)
1136   {
1137     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
1138     eo->phase = PHASE_FINISHED;
1139     send_client_done_and_destroy (eo);
1140     return;
1141   }
1142   GNUNET_break (0);
1143   fail_union_operation (eo);
1144 }
1145
1146
1147 /**
1148  * Evaluate a union operation with
1149  * a remote peer.
1150  *
1151  * @param m the evaluate request message from the client
1152  * @param set the set to evaluate the operation with
1153  */
1154 void
1155 _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1156 {
1157   struct UnionEvaluateOperation *eo;
1158   struct GNUNET_MessageHeader *context_msg;
1159
1160   eo = GNUNET_new (struct UnionEvaluateOperation);
1161   eo->peer = m->target_peer;
1162   eo->set = set;
1163   eo->request_id = htonl (m->request_id);
1164   GNUNET_assert (0 != eo->request_id);
1165   eo->se = strata_estimator_dup (set->state.u->se);
1166   eo->salt = ntohs (m->salt);
1167   eo->app_id = m->app_id;
1168   
1169   context_msg = GNUNET_MQ_extract_nested_mh (m);
1170   if (NULL != context_msg)
1171   {
1172     eo->context_msg = GNUNET_copy_message (context_msg);
1173   }
1174
1175   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
1176               "evaluating union operation, (app %s)\n", 
1177               GNUNET_h2s (&eo->app_id));
1178
1179   eo->tc = GNUNET_new (struct TunnelContext);
1180   eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
1181                                               GNUNET_APPLICATION_TYPE_SET);
1182   GNUNET_assert (NULL != eo->tc->tunnel);
1183   eo->tc->peer = eo->peer;
1184   eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
1185   /* we started the operation, thus we have to send the operation request */
1186   eo->phase = PHASE_EXPECT_SE;
1187
1188   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1189                                eo->set->state.u->ops_tail,
1190                                eo);
1191
1192   send_operation_request (eo);
1193 }
1194
1195
1196 /**
1197  * Accept an union operation request from a remote peer
1198  *
1199  * @param m the accept message from the client
1200  * @param set the set of the client
1201  * @param incoming information about the requesting remote peer
1202  */
1203 void
1204 _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1205                    struct Incoming *incoming)
1206 {
1207   struct UnionEvaluateOperation *eo;
1208
1209   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1210
1211   eo = GNUNET_new (struct UnionEvaluateOperation);
1212   eo->tc = incoming->tc;
1213   eo->generation_created = set->state.u->current_generation++;
1214   eo->set = set;
1215   eo->salt = ntohs (incoming->salt);
1216   GNUNET_assert (0 != ntohl (m->request_id));
1217   eo->request_id = ntohl (m->request_id);
1218   eo->se = strata_estimator_dup (set->state.u->se);
1219   /* transfer ownership of mq and socket from incoming to eo */
1220   GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1221                                eo->set->state.u->ops_tail,
1222                                eo);
1223   /* kick off the operation */
1224   send_strata_estimator (eo);
1225 }
1226
1227
1228 /**
1229  * Create a new set supporting the union operation
1230  *
1231  * @return the newly created set
1232  */
1233 struct Set *
1234 _GSS_union_set_create (void)
1235 {
1236   struct Set *set;
1237
1238   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1239   
1240   set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1241   set->state.u = (struct UnionState *) &set[1];
1242   set->operation = GNUNET_SET_OPERATION_UNION;
1243   /* keys of the hash map are stored in the element entrys, thus we do not
1244    * want the hash map to copy them */
1245   set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1246   set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1247                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
1248   return set;
1249 }
1250
1251
1252 /**
1253  * Add the element from the given element message to the set.
1254  *
1255  * @param m message with the element
1256  * @param set set to add the element to
1257  */
1258 void
1259 _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1260 {
1261   struct ElementEntry *ee;
1262   struct ElementEntry *ee_dup;
1263   uint16_t element_size;
1264
1265   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1266
1267   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1268   element_size = ntohs (m->header.size) - sizeof *m;
1269   ee = GNUNET_malloc (element_size + sizeof *ee);
1270   ee->element.size = element_size;
1271   memcpy (&ee[1], &m[1], element_size);
1272   ee->element.data = &ee[1];
1273   ee->generation_added = set->state.u->current_generation;
1274   GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1275   ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1276   if (NULL != ee_dup)
1277   {
1278     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1279     GNUNET_free (ee);
1280     return;
1281   }
1282   GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1283                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1284   strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1285 }
1286
1287
1288 /**
1289  * Destroy a set that supports the union operation
1290  *
1291  * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1292  */
1293 void
1294 _GSS_union_set_destroy (struct Set *set)
1295 {
1296   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1297   if (NULL != set->client)
1298   {
1299     GNUNET_SERVER_client_drop (set->client);
1300     set->client = NULL;
1301   }
1302   if (NULL != set->client_mq)
1303   {
1304     GNUNET_MQ_destroy (set->client_mq);
1305     set->client_mq = NULL;
1306   }
1307
1308   if (NULL != set->state.u->se)
1309   {
1310     strata_estimator_destroy (set->state.u->se);
1311     set->state.u->se = NULL;
1312   }
1313
1314   destroy_elements (set->state.u);
1315
1316   while (NULL != set->state.u->ops_head)
1317   {
1318     _GSS_union_operation_destroy (set->state.u->ops_head);
1319   }
1320 }
1321
1322 /**
1323  * Remove the element given in the element message from the set.
1324  * Only marks the element as removed, so that older set operations can still exchange it.
1325  *
1326  * @param m message with the element
1327  * @param set set to remove the element from
1328  */
1329 void
1330 _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1331 {
1332   struct GNUNET_HashCode hash;
1333   struct ElementEntry *ee;
1334
1335   GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1336   GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1337   ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1338   if (NULL == ee)
1339   {
1340     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1341     return;
1342   }
1343   if (GNUNET_YES == ee->removed)
1344   {
1345     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1346     return;
1347   }
1348   ee->removed = GNUNET_YES;
1349   ee->generation_removed = set->state.u->current_generation;
1350 }
1351
1352
1353 /**
1354  * Dispatch messages for a union operation.
1355  *
1356  * @param cls closure
1357  * @param tunnel mesh tunnel
1358  * @param tunnel_ctx tunnel context
1359  * @param mh message to process
1360  * @return ???
1361  */
1362 int
1363 _GSS_union_handle_p2p_message (void *cls,
1364                                struct GNUNET_MESH_Tunnel *tunnel,
1365                                void **tunnel_ctx,
1366                                const struct GNUNET_MessageHeader *mh)
1367 {
1368   struct TunnelContext *tc = *tunnel_ctx;
1369   struct UnionEvaluateOperation *eo;
1370
1371   if (CONTEXT_OPERATION_UNION != tc->type)
1372   {
1373     GNUNET_break_op (0);
1374     return GNUNET_SYSERR;
1375   }
1376
1377   eo = tc->data;
1378
1379   switch (ntohs (mh->type))
1380   {
1381     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1382       handle_p2p_ibf (eo, mh);
1383       break;
1384     case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1385       handle_p2p_strata_estimator (eo, mh);
1386       break;
1387     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1388       handle_p2p_elements (eo, mh);
1389       break;
1390     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1391       handle_p2p_element_requests (eo, mh);
1392       break;
1393     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1394       handle_p2p_done (eo, mh);
1395       break;
1396     default:
1397       /* something wrong with mesh's message handlers? */
1398       GNUNET_assert (0);
1399   }
1400   return GNUNET_OK;
1401 }