Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
34 #include <gcrypt.h>
35
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define SE_STRATA_COUNT 32
44
45 /**
46  * Size of the IBFs in the strata estimator.
47  */
48 #define SE_IBF_SIZE 80
49
50 /**
51  * The hash num parameter for the difference digests and strata estimators.
52  */
53 #define SE_IBF_HASH_NUM 4
54
55 /**
56  * Number of buckets that can be transmitted in one message.
57  */
58 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59
60 /**
61  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62  * Choose this value so that computing the IBF is still cheaper
63  * than transmitting all values.
64  */
65 #define MAX_IBF_ORDER (20)
66
67 /**
68  * Number of buckets used in the ibf per estimated
69  * difference.
70  */
71 #define IBF_ALPHA 4
72
73
74 /**
75  * Current phase we are in for a union operation.
76  */
77 enum UnionOperationPhase
78 {
79   /**
80    * We sent the request message, and expect a strata estimator.
81    */
82   PHASE_EXPECT_SE,
83
84   /**
85    * We sent the strata estimator, and expect an IBF. This phase is entered once
86    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87    *
88    * XXX: could use better wording.
89    * XXX: repurposed to also expect a "request full set" message, should be renamed
90    *
91    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
92    */
93   PHASE_EXPECT_IBF,
94
95   /**
96    * Continuation for multi part IBFs.
97    */
98   PHASE_EXPECT_IBF_CONT,
99
100   /**
101    * We are decoding an IBF.
102    */
103   PHASE_INVENTORY_ACTIVE,
104
105   /**
106    * The other peer is decoding the IBF we just sent.
107    */
108   PHASE_INVENTORY_PASSIVE,
109
110   /**
111    * The protocol is almost finished, but we still have to flush our message
112    * queue and/or expect some elements.
113    */
114   PHASE_FINISH_CLOSING,
115
116   /**
117    * In the penultimate phase,
118    * we wait until all our demands
119    * are satisfied.  Then we send a done
120    * message, and wait for another done message.
121    */
122   PHASE_FINISH_WAITING,
123
124   /**
125    * In the ultimate phase, we wait until
126    * our demands are satisfied and then
127    * quit (sending another DONE message).
128    */
129   PHASE_DONE,
130
131   /**
132    * After sending the full set, wait for responses with the elements
133    * that the local peer is missing.
134    */
135   PHASE_FULL_SENDING,
136 };
137
138
139 /**
140  * State of an evaluate operation with another peer.
141  */
142 struct OperationState
143 {
144   /**
145    * Copy of the set's strata estimator at the time of
146    * creation of this operation.
147    */
148   struct StrataEstimator *se;
149
150   /**
151    * The IBF we currently receive.
152    */
153   struct InvertibleBloomFilter *remote_ibf;
154
155   /**
156    * The IBF with the local set's element.
157    */
158   struct InvertibleBloomFilter *local_ibf;
159
160   /**
161    * Maps unsalted IBF-Keys to elements.
162    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163    * Colliding IBF-Keys are linked.
164    */
165   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
166
167   /**
168    * Current state of the operation.
169    */
170   enum UnionOperationPhase phase;
171
172   /**
173    * Did we send the client that we are done?
174    */
175   int client_done_sent;
176
177   /**
178    * Number of ibf buckets already received into the @a remote_ibf.
179    */
180   unsigned int ibf_buckets_received;
181
182   /**
183    * Hashes for elements that we have demanded from the other peer.
184    */
185   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
186
187   /**
188    * Salt that we're using for sending IBFs
189    */
190   uint32_t salt_send;
191
192   /**
193    * Salt for the IBF we've received and that we're currently decoding.
194    */
195   uint32_t salt_receive;
196
197   /**
198    * Number of elements we received from the other peer
199    * that were not in the local set yet.
200    */
201   uint32_t received_fresh;
202
203   /**
204    * Total number of elements received from the other peer.
205    */
206   uint32_t received_total;
207
208   /**
209    * Initial size of our set, just before
210    * the operation started.
211    */
212   uint64_t initial_size;
213 };
214
215
216 /**
217  * The key entry is used to associate an ibf key with an element.
218  */
219 struct KeyEntry
220 {
221   /**
222    * IBF key for the entry, derived from the current salt.
223    */
224   struct IBF_Key ibf_key;
225
226   /**
227    * The actual element associated with the key.
228    *
229    * Only owned by the union operation if element->operation
230    * is #GNUNET_YES.
231    */
232   struct ElementEntry *element;
233
234   /**
235    * Did we receive this element?
236    * Even if element->is_foreign is false, we might
237    * have received the element, so this indicates that
238    * the other peer has it.
239    */
240   int received;
241 };
242
243
244 /**
245  * Used as a closure for sending elements
246  * with a specific IBF key.
247  */
248 struct SendElementClosure
249 {
250   /**
251    * The IBF key whose matching elements should be
252    * sent.
253    */
254   struct IBF_Key ibf_key;
255
256   /**
257    * Operation for which the elements
258    * should be sent.
259    */
260   struct Operation *op;
261 };
262
263
264 /**
265  * Extra state required for efficient set union.
266  */
267 struct SetState
268 {
269   /**
270    * The strata estimator is only generated once for
271    * each set.
272    * The IBF keys are derived from the element hashes with
273    * salt=0.
274    */
275   struct StrataEstimator *se;
276 };
277
278
279 /**
280  * Iterator over hash map entries, called to
281  * destroy the linked list of colliding ibf key entries.
282  *
283  * @param cls closure
284  * @param key current key code
285  * @param value value in the hash map
286  * @return #GNUNET_YES if we should continue to iterate,
287  *         #GNUNET_NO if not.
288  */
289 static int
290 destroy_key_to_element_iter (void *cls,
291                              uint32_t key,
292                              void *value)
293 {
294   struct KeyEntry *k = value;
295
296   GNUNET_assert (NULL != k);
297   if (GNUNET_YES == k->element->remote)
298   {
299     GNUNET_free (k->element);
300     k->element = NULL;
301   }
302   GNUNET_free (k);
303   return GNUNET_YES;
304 }
305
306
307 /**
308  * Destroy the union operation.  Only things specific to the union
309  * operation are destroyed.
310  *
311  * @param op union operation to destroy
312  */
313 static void
314 union_op_cancel (struct Operation *op)
315 {
316   LOG (GNUNET_ERROR_TYPE_DEBUG,
317        "destroying union op\n");
318   /* check if the op was canceled twice */
319   GNUNET_assert (NULL != op->state);
320   if (NULL != op->state->remote_ibf)
321   {
322     ibf_destroy (op->state->remote_ibf);
323     op->state->remote_ibf = NULL;
324   }
325   if (NULL != op->state->demanded_hashes)
326   {
327     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328     op->state->demanded_hashes = NULL;
329   }
330   if (NULL != op->state->local_ibf)
331   {
332     ibf_destroy (op->state->local_ibf);
333     op->state->local_ibf = NULL;
334   }
335   if (NULL != op->state->se)
336   {
337     strata_estimator_destroy (op->state->se);
338     op->state->se = NULL;
339   }
340   if (NULL != op->state->key_to_element)
341   {
342     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343                                              &destroy_key_to_element_iter,
344                                              NULL);
345     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346     op->state->key_to_element = NULL;
347   }
348   GNUNET_free (op->state);
349   op->state = NULL;
350   LOG (GNUNET_ERROR_TYPE_DEBUG,
351        "destroying union op done\n");
352 }
353
354
355 /**
356  * Inform the client that the union operation has failed,
357  * and proceed to destroy the evaluate operation.
358  *
359  * @param op the union operation to fail
360  */
361 static void
362 fail_union_operation (struct Operation *op)
363 {
364   struct GNUNET_MQ_Envelope *ev;
365   struct GNUNET_SET_ResultMessage *msg;
366
367   LOG (GNUNET_ERROR_TYPE_ERROR,
368        "union operation failed\n");
369   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371   msg->request_id = htonl (op->client_request_id);
372   msg->element_type = htons (0);
373   GNUNET_MQ_send (op->set->cs->mq,
374                   ev);
375   _GSS_operation_destroy (op, GNUNET_YES);
376 }
377
378
379 /**
380  * Derive the IBF key from a hash code and
381  * a salt.
382  *
383  * @param src the hash code
384  * @return the derived IBF key
385  */
386 static struct IBF_Key
387 get_ibf_key (const struct GNUNET_HashCode *src)
388 {
389   struct IBF_Key key;
390   uint16_t salt = 0;
391
392   GNUNET_CRYPTO_kdf (&key, sizeof (key),
393                      src, sizeof *src,
394                      &salt, sizeof (salt),
395                      NULL, 0);
396   return key;
397 }
398
399
400 /**
401  * Context for #op_get_element_iterator
402  */
403 struct GetElementContext
404 {
405   /**
406    * FIXME.
407    */
408   struct GNUNET_HashCode hash;
409
410   /**
411    * FIXME.
412    */
413   struct KeyEntry *k;
414 };
415
416
417 /**
418  * Iterator over the mapping from IBF keys to element entries.  Checks if we
419  * have an element with a given GNUNET_HashCode.
420  *
421  * @param cls closure
422  * @param key current key code
423  * @param value value in the hash map
424  * @return #GNUNET_YES if we should search further,
425  *         #GNUNET_NO if we've found the element.
426  */
427 static int
428 op_get_element_iterator (void *cls,
429                          uint32_t key,
430                          void *value)
431 {
432   struct GetElementContext *ctx = cls;
433   struct KeyEntry *k = value;
434
435   GNUNET_assert (NULL != k);
436   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
437                                    &ctx->hash))
438   {
439     ctx->k = k;
440     return GNUNET_NO;
441   }
442   return GNUNET_YES;
443 }
444
445
446 /**
447  * Determine whether the given element is already in the operation's element
448  * set.
449  *
450  * @param op operation that should be tested for 'element_hash'
451  * @param element_hash hash of the element to look for
452  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
453  */
454 static struct KeyEntry *
455 op_get_element (struct Operation *op,
456                 const struct GNUNET_HashCode *element_hash)
457 {
458   int ret;
459   struct IBF_Key ibf_key;
460   struct GetElementContext ctx = {{{ 0 }} , 0};
461
462   ctx.hash = *element_hash;
463
464   ibf_key = get_ibf_key (element_hash);
465   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
466                                                       (uint32_t) ibf_key.key_val,
467                                                       op_get_element_iterator,
468                                                       &ctx);
469
470   /* was the iteration aborted because we found the element? */
471   if (GNUNET_SYSERR == ret)
472   {
473     GNUNET_assert (NULL != ctx.k);
474     return ctx.k;
475   }
476   return NULL;
477 }
478
479
480 /**
481  * Insert an element into the union operation's
482  * key-to-element mapping. Takes ownership of 'ee'.
483  * Note that this does not insert the element in the set,
484  * only in the operation's key-element mapping.
485  * This is done to speed up re-tried operations, if some elements
486  * were transmitted, and then the IBF fails to decode.
487  *
488  * XXX: clarify ownership, doesn't sound right.
489  *
490  * @param op the union operation
491  * @param ee the element entry
492  * @parem received was this element received from the remote peer?
493  */
494 static void
495 op_register_element (struct Operation *op,
496                      struct ElementEntry *ee,
497                      int received)
498 {
499   struct IBF_Key ibf_key;
500   struct KeyEntry *k;
501
502   ibf_key = get_ibf_key (&ee->element_hash);
503   k = GNUNET_new (struct KeyEntry);
504   k->element = ee;
505   k->ibf_key = ibf_key;
506   k->received = received;
507   GNUNET_assert (GNUNET_OK ==
508                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
509                                                       (uint32_t) ibf_key.key_val,
510                                                       k,
511                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
512 }
513
514
515 /**
516  * FIXME.
517  */
518 static void
519 salt_key (const struct IBF_Key *k_in,
520           uint32_t salt,
521           struct IBF_Key *k_out)
522 {
523   int s = salt % 64;
524   uint64_t x = k_in->key_val;
525   /* rotate ibf key */
526   x = (x >> s) | (x << (64 - s));
527   k_out->key_val = x;
528 }
529
530
531 /**
532  * FIXME.
533  */
534 static void
535 unsalt_key (const struct IBF_Key *k_in,
536             uint32_t salt,
537             struct IBF_Key *k_out)
538 {
539   int s = salt % 64;
540   uint64_t x = k_in->key_val;
541   x = (x << s) | (x >> (64 - s));
542   k_out->key_val = x;
543 }
544
545
546 /**
547  * Insert a key into an ibf.
548  *
549  * @param cls the ibf
550  * @param key unused
551  * @param value the key entry to get the key from
552  */
553 static int
554 prepare_ibf_iterator (void *cls,
555                       uint32_t key,
556                       void *value)
557 {
558   struct Operation *op = cls;
559   struct KeyEntry *ke = value;
560   struct IBF_Key salted_key;
561
562   LOG (GNUNET_ERROR_TYPE_DEBUG,
563        "[OP %x] inserting %lx (hash %s) into ibf\n",
564        (void *) op,
565        (unsigned long) ke->ibf_key.key_val,
566        GNUNET_h2s (&ke->element->element_hash));
567   salt_key (&ke->ibf_key,
568             op->state->salt_send,
569             &salted_key);
570   ibf_insert (op->state->local_ibf, salted_key);
571   return GNUNET_YES;
572 }
573
574
575 /**
576  * Iterator for initializing the
577  * key-to-element mapping of a union operation
578  *
579  * @param cls the union operation `struct Operation *`
580  * @param key unused
581  * @param value the `struct ElementEntry *` to insert
582  *        into the key-to-element mapping
583  * @return #GNUNET_YES (to continue iterating)
584  */
585 static int
586 init_key_to_element_iterator (void *cls,
587                               const struct GNUNET_HashCode *key,
588                               void *value)
589 {
590   struct Operation *op = cls;
591   struct ElementEntry *ee = value;
592
593   /* make sure that the element belongs to the set at the time
594    * of creating the operation */
595   if (GNUNET_NO ==
596       _GSS_is_element_of_operation (ee,
597                                     op))
598     return GNUNET_YES;
599   GNUNET_assert (GNUNET_NO == ee->remote);
600   op_register_element (op,
601                        ee,
602                        GNUNET_NO);
603   return GNUNET_YES;
604 }
605
606
607 /**
608  * Initialize the IBF key to element mapping local to this set
609  * operation.
610  *
611  * @param op the set union operation
612  */
613 static void
614 initialize_key_to_element (struct Operation *op)
615 {
616   unsigned int len;
617
618   GNUNET_assert (NULL == op->state->key_to_element);
619   len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
620   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
621   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
622                                          &init_key_to_element_iterator,
623                                          op);
624 }
625
626
627 /**
628  * Create an ibf with the operation's elements
629  * of the specified size
630  *
631  * @param op the union operation
632  * @param size size of the ibf to create
633  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
634  */
635 static int
636 prepare_ibf (struct Operation *op,
637              uint32_t size)
638 {
639   GNUNET_assert (NULL != op->state->key_to_element);
640
641   if (NULL != op->state->local_ibf)
642     ibf_destroy (op->state->local_ibf);
643   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
644   if (NULL == op->state->local_ibf)
645   {
646     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
647                 "Failed to allocate local IBF\n");
648     return GNUNET_SYSERR;
649   }
650   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
651                                            &prepare_ibf_iterator,
652                                            op);
653   return GNUNET_OK;
654 }
655
656
657 /**
658  * Send an ibf of appropriate size.
659  *
660  * Fragments the IBF into multiple messages if necessary.
661  *
662  * @param op the union operation
663  * @param ibf_order order of the ibf to send, size=2^order
664  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
665  */
666 static int
667 send_ibf (struct Operation *op,
668           uint16_t ibf_order)
669 {
670   unsigned int buckets_sent = 0;
671   struct InvertibleBloomFilter *ibf;
672
673   if (GNUNET_OK !=
674       prepare_ibf (op, 1<<ibf_order))
675   {
676     /* allocation failed */
677     return GNUNET_SYSERR;
678   }
679
680   LOG (GNUNET_ERROR_TYPE_DEBUG,
681        "sending ibf of size %u\n",
682        1<<ibf_order);
683
684   {
685     char name[64] = { 0 };
686     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
687     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
688   }
689
690   ibf = op->state->local_ibf;
691
692   while (buckets_sent < (1 << ibf_order))
693   {
694     unsigned int buckets_in_message;
695     struct GNUNET_MQ_Envelope *ev;
696     struct IBFMessage *msg;
697
698     buckets_in_message = (1 << ibf_order) - buckets_sent;
699     /* limit to maximum */
700     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
701       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
702
703     ev = GNUNET_MQ_msg_extra (msg,
704                               buckets_in_message * IBF_BUCKET_SIZE,
705                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
706     msg->reserved1 = 0;
707     msg->reserved2 = 0;
708     msg->order = ibf_order;
709     msg->offset = htonl (buckets_sent);
710     msg->salt = htonl (op->state->salt_send);
711     ibf_write_slice (ibf, buckets_sent,
712                      buckets_in_message, &msg[1]);
713     buckets_sent += buckets_in_message;
714     LOG (GNUNET_ERROR_TYPE_DEBUG,
715          "ibf chunk size %u, %u/%u sent\n",
716          buckets_in_message,
717          buckets_sent,
718          1<<ibf_order);
719     GNUNET_MQ_send (op->mq, ev);
720   }
721
722   /* The other peer must decode the IBF, so
723    * we're passive. */
724   op->state->phase = PHASE_INVENTORY_PASSIVE;
725   return GNUNET_OK;
726 }
727
728
729 /**
730  * Compute the necessary order of an ibf
731  * from the size of the symmetric set difference.
732  *
733  * @param diff the difference
734  * @return the required size of the ibf
735  */
736 static unsigned int
737 get_order_from_difference (unsigned int diff)
738 {
739   unsigned int ibf_order;
740
741   ibf_order = 2;
742   while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
743             ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
744           (ibf_order < MAX_IBF_ORDER) )
745     ibf_order++;
746   // add one for correction
747   return ibf_order + 1;
748 }
749
750
751 /**
752  * Send a set element.
753  *
754  * @param cls the union operation `struct Operation *`
755  * @param key unused
756  * @param value the `struct ElementEntry *` to insert
757  *        into the key-to-element mapping
758  * @return #GNUNET_YES (to continue iterating)
759  */
760 static int
761 send_full_element_iterator (void *cls,
762                        const struct GNUNET_HashCode *key,
763                        void *value)
764 {
765   struct Operation *op = cls;
766   struct GNUNET_SET_ElementMessage *emsg;
767   struct ElementEntry *ee = value;
768   struct GNUNET_SET_Element *el = &ee->element;
769   struct GNUNET_MQ_Envelope *ev;
770
771   LOG (GNUNET_ERROR_TYPE_DEBUG,
772        "Sending element %s\n",
773        GNUNET_h2s (key));
774   ev = GNUNET_MQ_msg_extra (emsg,
775                             el->size,
776                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
777   emsg->element_type = htons (el->element_type);
778   GNUNET_memcpy (&emsg[1],
779                  el->data,
780                  el->size);
781   GNUNET_MQ_send (op->mq,
782                   ev);
783   return GNUNET_YES;
784 }
785
786
787 /**
788  * Switch to full set transmission for @a op.
789  *
790  * @param op operation to switch to full set transmission.
791  */
792 static void
793 send_full_set (struct Operation *op)
794 {
795   struct GNUNET_MQ_Envelope *ev;
796
797   op->state->phase = PHASE_FULL_SENDING;
798   LOG (GNUNET_ERROR_TYPE_DEBUG,
799        "Dedicing to transmit the full set\n");
800   /* FIXME: use a more memory-friendly way of doing this with an
801      iterator, just as we do in the non-full case! */
802   (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
803                                                 &send_full_element_iterator,
804                                                 op);
805   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
806   GNUNET_MQ_send (op->mq,
807                   ev);
808 }
809
810
811 /**
812  * Handle a strata estimator from a remote peer
813  *
814  * @param cls the union operation
815  * @param msg the message
816  */
817 int
818 check_union_p2p_strata_estimator (void *cls,
819                                   const struct StrataEstimatorMessage *msg)
820 {
821   struct Operation *op = cls;
822   int is_compressed;
823   size_t len;
824
825   if (op->state->phase != PHASE_EXPECT_SE)
826   {
827     GNUNET_break (0);
828     return GNUNET_SYSERR;
829   }
830   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
831   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
832   if ( (GNUNET_NO == is_compressed) &&
833        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
834   {
835     GNUNET_break (0);
836     return GNUNET_SYSERR;
837   }
838   return GNUNET_OK;
839 }
840
841
842 /**
843  * Handle a strata estimator from a remote peer
844  *
845  * @param cls the union operation
846  * @param msg the message
847  */
848 void
849 handle_union_p2p_strata_estimator (void *cls,
850                                    const struct StrataEstimatorMessage *msg)
851 {
852   struct Operation *op = cls;
853   struct StrataEstimator *remote_se;
854   unsigned int diff;
855   uint64_t other_size;
856   size_t len;
857   int is_compressed;
858
859   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
860   GNUNET_STATISTICS_update (_GSS_statistics,
861                             "# bytes of SE received",
862                             ntohs (msg->header.size),
863                             GNUNET_NO);
864   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
865   other_size = GNUNET_ntohll (msg->set_size);
866   remote_se = strata_estimator_create (SE_STRATA_COUNT,
867                                        SE_IBF_SIZE,
868                                        SE_IBF_HASH_NUM);
869   if (NULL == remote_se)
870   {
871     /* insufficient resources, fail */
872     fail_union_operation (op);
873     return;
874   }
875   if (GNUNET_OK !=
876       strata_estimator_read (&msg[1],
877                              len,
878                              is_compressed,
879                              remote_se))
880   {
881     /* decompression failed */
882     strata_estimator_destroy (remote_se);
883     fail_union_operation (op);
884     return;
885   }
886   GNUNET_assert (NULL != op->state->se);
887   diff = strata_estimator_difference (remote_se,
888                                       op->state->se);
889
890   if (diff > 200)
891     diff = diff * 3 / 2;
892
893   strata_estimator_destroy (remote_se);
894   strata_estimator_destroy (op->state->se);
895   op->state->se = NULL;
896   LOG (GNUNET_ERROR_TYPE_DEBUG,
897        "got se diff=%d, using ibf size %d\n",
898        diff,
899        1U << get_order_from_difference (diff));
900
901   {
902     char *set_debug;
903
904     set_debug = getenv ("GNUNET_SET_BENCHMARK");
905     if ( (NULL != set_debug) &&
906          (0 == strcmp (set_debug, "1")) )
907     {
908       FILE *f = fopen ("set.log", "a");
909       fprintf (f, "%llu\n", (unsigned long long) diff);
910       fclose (f);
911     }
912   }
913
914   if ( (GNUNET_YES == op->byzantine) &&
915        (other_size < op->byzantine_lower_bound) )
916   {
917     GNUNET_break (0);
918     fail_union_operation (op);
919     return;
920   }
921
922   if ( (GNUNET_YES == op->force_full) ||
923        (diff > op->state->initial_size / 4) ||
924        (0 == other_size) )
925   {
926     LOG (GNUNET_ERROR_TYPE_DEBUG,
927          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
928          diff,
929          op->state->initial_size);
930     GNUNET_STATISTICS_update (_GSS_statistics,
931                               "# of full sends",
932                               1,
933                               GNUNET_NO);
934     if ( (op->state->initial_size <= other_size) ||
935          (0 == other_size) )
936     {
937       send_full_set (op);
938     }
939     else
940     {
941       struct GNUNET_MQ_Envelope *ev;
942
943       LOG (GNUNET_ERROR_TYPE_DEBUG,
944            "Telling other peer that we expect its full set\n");
945       op->state->phase = PHASE_EXPECT_IBF;
946       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
947       GNUNET_MQ_send (op->mq,
948                       ev);
949     }
950   }
951   else
952   {
953     GNUNET_STATISTICS_update (_GSS_statistics,
954                               "# of ibf sends",
955                               1,
956                               GNUNET_NO);
957     if (GNUNET_OK !=
958         send_ibf (op,
959                   get_order_from_difference (diff)))
960     {
961       /* Internal error, best we can do is shut the connection */
962       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
963                   "Failed to send IBF, closing connection\n");
964       fail_union_operation (op);
965       return;
966     }
967   }
968   GNUNET_CADET_receive_done (op->channel);
969 }
970
971
972 /**
973  * Iterator to send elements to a remote peer
974  *
975  * @param cls closure with the element key and the union operation
976  * @param key ignored
977  * @param value the key entry
978  */
979 static int
980 send_offers_iterator (void *cls,
981                       uint32_t key,
982                       void *value)
983 {
984   struct SendElementClosure *sec = cls;
985   struct Operation *op = sec->op;
986   struct KeyEntry *ke = value;
987   struct GNUNET_MQ_Envelope *ev;
988   struct GNUNET_MessageHeader *mh;
989
990   /* Detect 32-bit key collision for the 64-bit IBF keys. */
991   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
992     return GNUNET_YES;
993
994   ev = GNUNET_MQ_msg_header_extra (mh,
995                                    sizeof (struct GNUNET_HashCode),
996                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
997
998   GNUNET_assert (NULL != ev);
999   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1000   LOG (GNUNET_ERROR_TYPE_DEBUG,
1001        "[OP %x] sending element offer (%s) to peer\n",
1002        (void *) op,
1003        GNUNET_h2s (&ke->element->element_hash));
1004   GNUNET_MQ_send (op->mq, ev);
1005   return GNUNET_YES;
1006 }
1007
1008
1009 /**
1010  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1011  *
1012  * @param op union operation
1013  * @param ibf_key IBF key of interest
1014  */
1015 static void
1016 send_offers_for_key (struct Operation *op,
1017                      struct IBF_Key ibf_key)
1018 {
1019   struct SendElementClosure send_cls;
1020
1021   send_cls.ibf_key = ibf_key;
1022   send_cls.op = op;
1023   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1024                                                        (uint32_t) ibf_key.key_val,
1025                                                        &send_offers_iterator,
1026                                                        &send_cls);
1027 }
1028
1029
1030 /**
1031  * Decode which elements are missing on each side, and
1032  * send the appropriate offers and inquiries.
1033  *
1034  * @param op union operation
1035  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1036  */
1037 static int
1038 decode_and_send (struct Operation *op)
1039 {
1040   struct IBF_Key key;
1041   struct IBF_Key last_key;
1042   int side;
1043   unsigned int num_decoded;
1044   struct InvertibleBloomFilter *diff_ibf;
1045
1046   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1047
1048   if (GNUNET_OK !=
1049       prepare_ibf (op,
1050                    op->state->remote_ibf->size))
1051   {
1052     GNUNET_break (0);
1053     /* allocation failed */
1054     return GNUNET_SYSERR;
1055   }
1056   diff_ibf = ibf_dup (op->state->local_ibf);
1057   ibf_subtract (diff_ibf,
1058                 op->state->remote_ibf);
1059
1060   ibf_destroy (op->state->remote_ibf);
1061   op->state->remote_ibf = NULL;
1062
1063   LOG (GNUNET_ERROR_TYPE_DEBUG,
1064        "decoding IBF (size=%u)\n",
1065        diff_ibf->size);
1066
1067   num_decoded = 0;
1068   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1069
1070   while (1)
1071   {
1072     int res;
1073     int cycle_detected = GNUNET_NO;
1074
1075     last_key = key;
1076
1077     res = ibf_decode (diff_ibf, &side, &key);
1078     if (res == GNUNET_OK)
1079     {
1080       LOG (GNUNET_ERROR_TYPE_DEBUG,
1081            "decoded ibf key %lx\n",
1082            (unsigned long) key.key_val);
1083       num_decoded += 1;
1084       if ( (num_decoded > diff_ibf->size) ||
1085            ( (num_decoded > 1) &&
1086              (last_key.key_val == key.key_val) ) )
1087       {
1088         LOG (GNUNET_ERROR_TYPE_DEBUG,
1089              "detected cyclic ibf (decoded %u/%u)\n",
1090              num_decoded,
1091              diff_ibf->size);
1092         cycle_detected = GNUNET_YES;
1093       }
1094     }
1095     if ( (GNUNET_SYSERR == res) ||
1096          (GNUNET_YES == cycle_detected) )
1097     {
1098       int next_order;
1099       next_order = 0;
1100       while (1<<next_order < diff_ibf->size)
1101         next_order++;
1102       next_order++;
1103       if (next_order <= MAX_IBF_ORDER)
1104       {
1105         LOG (GNUNET_ERROR_TYPE_DEBUG,
1106              "decoding failed, sending larger ibf (size %u)\n",
1107              1<<next_order);
1108         GNUNET_STATISTICS_update (_GSS_statistics,
1109                                   "# of IBF retries",
1110                                   1,
1111                                   GNUNET_NO);
1112         op->state->salt_send++;
1113         if (GNUNET_OK !=
1114             send_ibf (op, next_order))
1115         {
1116           /* Internal error, best we can do is shut the connection */
1117           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1118                       "Failed to send IBF, closing connection\n");
1119           fail_union_operation (op);
1120           ibf_destroy (diff_ibf);
1121           return GNUNET_SYSERR;
1122         }
1123       }
1124       else
1125       {
1126         GNUNET_STATISTICS_update (_GSS_statistics,
1127                                   "# of failed union operations (too large)",
1128                                   1,
1129                                   GNUNET_NO);
1130         // XXX: Send the whole set, element-by-element
1131         LOG (GNUNET_ERROR_TYPE_ERROR,
1132              "set union failed: reached ibf limit\n");
1133         fail_union_operation (op);
1134         ibf_destroy (diff_ibf);
1135         return GNUNET_SYSERR;
1136       }
1137       break;
1138     }
1139     if (GNUNET_NO == res)
1140     {
1141       struct GNUNET_MQ_Envelope *ev;
1142
1143       LOG (GNUNET_ERROR_TYPE_DEBUG,
1144            "transmitted all values, sending DONE\n");
1145       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1146       GNUNET_MQ_send (op->mq, ev);
1147       /* We now wait until we get a DONE message back
1148        * and then wait for our MQ to be flushed and all our
1149        * demands be delivered. */
1150       break;
1151     }
1152     if (1 == side)
1153     {
1154       struct IBF_Key unsalted_key;
1155
1156       unsalt_key (&key,
1157                   op->state->salt_receive,
1158                   &unsalted_key);
1159       send_offers_for_key (op,
1160                            unsalted_key);
1161     }
1162     else if (-1 == side)
1163     {
1164       struct GNUNET_MQ_Envelope *ev;
1165       struct InquiryMessage *msg;
1166
1167       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1168        * the effort additional complexity. */
1169       ev = GNUNET_MQ_msg_extra (msg,
1170                                 sizeof (struct IBF_Key),
1171                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1172       msg->salt = htonl (op->state->salt_receive);
1173       GNUNET_memcpy (&msg[1],
1174               &key,
1175               sizeof (struct IBF_Key));
1176       LOG (GNUNET_ERROR_TYPE_DEBUG,
1177            "sending element inquiry for IBF key %lx\n",
1178            (unsigned long) key.key_val);
1179       GNUNET_MQ_send (op->mq, ev);
1180     }
1181     else
1182     {
1183       GNUNET_assert (0);
1184     }
1185   }
1186   ibf_destroy (diff_ibf);
1187   return GNUNET_OK;
1188 }
1189
1190
1191 /**
1192  * Check an IBF message from a remote peer.
1193  *
1194  * Reassemble the IBF from multiple pieces, and
1195  * process the whole IBF once possible.
1196  *
1197  * @param cls the union operation
1198  * @param msg the header of the message
1199  * @return #GNUNET_OK if @a msg is well-formed
1200  */
1201 int
1202 check_union_p2p_ibf (void *cls,
1203                      const struct IBFMessage *msg)
1204 {
1205   struct Operation *op = cls;
1206   unsigned int buckets_in_message;
1207
1208   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1209   {
1210     GNUNET_break_op (0);
1211     return GNUNET_SYSERR;
1212   }
1213   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1214   if (0 == buckets_in_message)
1215   {
1216     GNUNET_break_op (0);
1217     return GNUNET_SYSERR;
1218   }
1219   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1220   {
1221     GNUNET_break_op (0);
1222     return GNUNET_SYSERR;
1223   }
1224   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1225   {
1226     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1227     {
1228       GNUNET_break_op (0);
1229       return GNUNET_SYSERR;
1230     }
1231     if (1<<msg->order != op->state->remote_ibf->size)
1232     {
1233       GNUNET_break_op (0);
1234       return GNUNET_SYSERR;
1235     }
1236     if (ntohl (msg->salt) != op->state->salt_receive)
1237     {
1238       GNUNET_break_op (0);
1239       return GNUNET_SYSERR;
1240     }
1241   }
1242   else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1243             (op->state->phase != PHASE_EXPECT_IBF) )
1244   {
1245     GNUNET_break_op (0);
1246     return GNUNET_SYSERR;
1247   }
1248
1249   return GNUNET_OK;
1250 }
1251
1252
1253 /**
1254  * Handle an IBF message from a remote peer.
1255  *
1256  * Reassemble the IBF from multiple pieces, and
1257  * process the whole IBF once possible.
1258  *
1259  * @param cls the union operation
1260  * @param msg the header of the message
1261  */
1262 void
1263 handle_union_p2p_ibf (void *cls,
1264                       const struct IBFMessage *msg)
1265 {
1266   struct Operation *op = cls;
1267   unsigned int buckets_in_message;
1268
1269   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1270   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1271        (op->state->phase == PHASE_EXPECT_IBF) )
1272   {
1273     op->state->phase = PHASE_EXPECT_IBF_CONT;
1274     GNUNET_assert (NULL == op->state->remote_ibf);
1275     LOG (GNUNET_ERROR_TYPE_DEBUG,
1276          "Creating new ibf of size %u\n",
1277          1 << msg->order);
1278     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1279     op->state->salt_receive = ntohl (msg->salt);
1280     LOG (GNUNET_ERROR_TYPE_DEBUG,
1281          "Receiving new IBF with salt %u\n",
1282          op->state->salt_receive);
1283     if (NULL == op->state->remote_ibf)
1284     {
1285       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1286                   "Failed to parse remote IBF, closing connection\n");
1287       fail_union_operation (op);
1288       return;
1289     }
1290     op->state->ibf_buckets_received = 0;
1291     if (0 != ntohl (msg->offset))
1292     {
1293       GNUNET_break_op (0);
1294       fail_union_operation (op);
1295       return;
1296     }
1297   }
1298   else
1299   {
1300     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1301     LOG (GNUNET_ERROR_TYPE_DEBUG,
1302          "Received more of IBF\n");
1303   }
1304   GNUNET_assert (NULL != op->state->remote_ibf);
1305
1306   ibf_read_slice (&msg[1],
1307                   op->state->ibf_buckets_received,
1308                   buckets_in_message,
1309                   op->state->remote_ibf);
1310   op->state->ibf_buckets_received += buckets_in_message;
1311
1312   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1313   {
1314     LOG (GNUNET_ERROR_TYPE_DEBUG,
1315          "received full ibf\n");
1316     op->state->phase = PHASE_INVENTORY_ACTIVE;
1317     if (GNUNET_OK !=
1318         decode_and_send (op))
1319     {
1320       /* Internal error, best we can do is shut down */
1321       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1322                   "Failed to decode IBF, closing connection\n");
1323       fail_union_operation (op);
1324       return;
1325     }
1326   }
1327   GNUNET_CADET_receive_done (op->channel);
1328 }
1329
1330
1331 /**
1332  * Send a result message to the client indicating
1333  * that there is a new element.
1334  *
1335  * @param op union operation
1336  * @param element element to send
1337  * @param status status to send with the new element
1338  */
1339 static void
1340 send_client_element (struct Operation *op,
1341                      struct GNUNET_SET_Element *element,
1342                      int status)
1343 {
1344   struct GNUNET_MQ_Envelope *ev;
1345   struct GNUNET_SET_ResultMessage *rm;
1346
1347   LOG (GNUNET_ERROR_TYPE_DEBUG,
1348        "sending element (size %u) to client\n",
1349        element->size);
1350   GNUNET_assert (0 != op->client_request_id);
1351   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1352   if (NULL == ev)
1353   {
1354     GNUNET_MQ_discard (ev);
1355     GNUNET_break (0);
1356     return;
1357   }
1358   rm->result_status = htons (status);
1359   rm->request_id = htonl (op->client_request_id);
1360   rm->element_type = htons (element->element_type);
1361   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1362   GNUNET_memcpy (&rm[1],
1363                  element->data,
1364                  element->size);
1365   GNUNET_MQ_send (op->set->cs->mq,
1366                   ev);
1367 }
1368
1369
1370 /**
1371  * Destroy remote channel.
1372  *
1373  * @param op operation
1374  */
1375 void destroy_channel (struct Operation *op)
1376 {
1377   struct GNUNET_CADET_Channel *channel;
1378
1379   if (NULL != (channel = op->channel))
1380   {
1381     /* This will free op; called conditionally as this helper function
1382        is also called from within the channel disconnect handler. */
1383     op->channel = NULL;
1384     GNUNET_CADET_channel_destroy (channel);
1385   }
1386 }
1387
1388
1389 /**
1390  * Signal to the client that the operation has finished and
1391  * destroy the operation.
1392  *
1393  * @param cls operation to destroy
1394  */
1395 static void
1396 send_client_done (void *cls)
1397 {
1398   struct Operation *op = cls;
1399   struct GNUNET_MQ_Envelope *ev;
1400   struct GNUNET_SET_ResultMessage *rm;
1401
1402   if (GNUNET_YES == op->state->client_done_sent) {
1403     return;
1404   }
1405
1406   if (PHASE_DONE != op->state->phase) {
1407     LOG (GNUNET_ERROR_TYPE_ERROR,
1408          "union operation failed\n");
1409     ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1410     rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1411     rm->request_id = htonl (op->client_request_id);
1412     rm->element_type = htons (0);
1413     GNUNET_MQ_send (op->set->cs->mq,
1414                     ev);
1415     return;
1416   }
1417
1418   op->state->client_done_sent = GNUNET_YES;
1419
1420   LOG (GNUNET_ERROR_TYPE_INFO,
1421        "Signalling client that union operation is done\n");
1422   ev = GNUNET_MQ_msg (rm,
1423                       GNUNET_MESSAGE_TYPE_SET_RESULT);
1424   rm->request_id = htonl (op->client_request_id);
1425   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1426   rm->element_type = htons (0);
1427   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1428   GNUNET_MQ_send (op->set->cs->mq,
1429                   ev);
1430 }
1431
1432
1433 /**
1434  * Tests if the operation is finished, and if so notify.
1435  *
1436  * @param op operation to check
1437  */
1438 static void
1439 maybe_finish (struct Operation *op)
1440 {
1441   unsigned int num_demanded;
1442
1443   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1444
1445   if (PHASE_FINISH_WAITING == op->state->phase)
1446   {
1447     LOG (GNUNET_ERROR_TYPE_DEBUG,
1448          "In PHASE_FINISH_WAITING, pending %u demands\n",
1449          num_demanded);
1450     if (0 == num_demanded)
1451     {
1452       struct GNUNET_MQ_Envelope *ev;
1453
1454       op->state->phase = PHASE_DONE;
1455       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1456       GNUNET_MQ_send (op->mq,
1457                       ev);
1458       /* We now wait until the other peer sends P2P_OVER
1459        * after it got all elements from us. */
1460     }
1461   }
1462   if (PHASE_FINISH_CLOSING == op->state->phase)
1463   {
1464     LOG (GNUNET_ERROR_TYPE_DEBUG,
1465          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1466          num_demanded);
1467     if (0 == num_demanded)
1468     {
1469       op->state->phase = PHASE_DONE;
1470       send_client_done (op);
1471       destroy_channel (op);
1472     }
1473   }
1474 }
1475
1476
1477 /**
1478  * Check an element message from a remote peer.
1479  *
1480  * @param cls the union operation
1481  * @param emsg the message
1482  */
1483 int
1484 check_union_p2p_elements (void *cls,
1485                           const struct GNUNET_SET_ElementMessage *emsg)
1486 {
1487   struct Operation *op = cls;
1488
1489   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1490   {
1491     GNUNET_break_op (0);
1492     return GNUNET_SYSERR;
1493   }
1494   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1495   {
1496     GNUNET_break_op (0);
1497     return GNUNET_SYSERR;
1498   }
1499   return GNUNET_OK;
1500 }
1501
1502
1503 /**
1504  * Handle an element message from a remote peer.
1505  * Sent by the other peer either because we decoded an IBF and placed a demand,
1506  * or because the other peer switched to full set transmission.
1507  *
1508  * @param cls the union operation
1509  * @param emsg the message
1510  */
1511 void
1512 handle_union_p2p_elements (void *cls,
1513                            const struct GNUNET_SET_ElementMessage *emsg)
1514 {
1515   struct Operation *op = cls;
1516   struct ElementEntry *ee;
1517   struct KeyEntry *ke;
1518   uint16_t element_size;
1519
1520   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1521   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1522   GNUNET_memcpy (&ee[1],
1523                  &emsg[1],
1524                  element_size);
1525   ee->element.size = element_size;
1526   ee->element.data = &ee[1];
1527   ee->element.element_type = ntohs (emsg->element_type);
1528   ee->remote = GNUNET_YES;
1529   GNUNET_SET_element_hash (&ee->element,
1530                            &ee->element_hash);
1531   if (GNUNET_NO ==
1532       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1533                                             &ee->element_hash,
1534                                             NULL))
1535   {
1536     /* We got something we didn't demand, since it's not in our map. */
1537     GNUNET_break_op (0);
1538     fail_union_operation (op);
1539     return;
1540   }
1541
1542   LOG (GNUNET_ERROR_TYPE_DEBUG,
1543        "Got element (size %u, hash %s) from peer\n",
1544        (unsigned int) element_size,
1545        GNUNET_h2s (&ee->element_hash));
1546
1547   GNUNET_STATISTICS_update (_GSS_statistics,
1548                             "# received elements",
1549                             1,
1550                             GNUNET_NO);
1551   GNUNET_STATISTICS_update (_GSS_statistics,
1552                             "# exchanged elements",
1553                             1,
1554                             GNUNET_NO);
1555
1556   op->state->received_total++;
1557
1558   ke = op_get_element (op, &ee->element_hash);
1559   if (NULL != ke)
1560   {
1561     /* Got repeated element.  Should not happen since
1562      * we track demands. */
1563     GNUNET_STATISTICS_update (_GSS_statistics,
1564                               "# repeated elements",
1565                               1,
1566                               GNUNET_NO);
1567     ke->received = GNUNET_YES;
1568     GNUNET_free (ee);
1569   }
1570   else
1571   {
1572     LOG (GNUNET_ERROR_TYPE_DEBUG,
1573          "Registering new element from remote peer\n");
1574     op->state->received_fresh++;
1575     op_register_element (op, ee, GNUNET_YES);
1576     /* only send results immediately if the client wants it */
1577     switch (op->result_mode)
1578     {
1579       case GNUNET_SET_RESULT_ADDED:
1580         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1581         break;
1582       case GNUNET_SET_RESULT_SYMMETRIC:
1583         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1584         break;
1585       default:
1586         /* Result mode not supported, should have been caught earlier. */
1587         GNUNET_break (0);
1588         break;
1589     }
1590   }
1591
1592   if ( (op->state->received_total > 8) &&
1593        (op->state->received_fresh < op->state->received_total / 3) )
1594   {
1595     /* The other peer gave us lots of old elements, there's something wrong. */
1596     GNUNET_break_op (0);
1597     fail_union_operation (op);
1598     return;
1599   }
1600   GNUNET_CADET_receive_done (op->channel);
1601   maybe_finish (op);
1602 }
1603
1604
1605 /**
1606  * Check a full element message from a remote peer.
1607  *
1608  * @param cls the union operation
1609  * @param emsg the message
1610  */
1611 int
1612 check_union_p2p_full_element (void *cls,
1613                               const struct GNUNET_SET_ElementMessage *emsg)
1614 {
1615   struct Operation *op = cls;
1616
1617   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1618   {
1619     GNUNET_break_op (0);
1620     return GNUNET_SYSERR;
1621   }
1622   // FIXME: check that we expect full elements here?
1623   return GNUNET_OK;
1624 }
1625
1626
1627 /**
1628  * Handle an element message from a remote peer.
1629  *
1630  * @param cls the union operation
1631  * @param emsg the message
1632  */
1633 void
1634 handle_union_p2p_full_element (void *cls,
1635                                const struct GNUNET_SET_ElementMessage *emsg)
1636 {
1637   struct Operation *op = cls;
1638   struct ElementEntry *ee;
1639   struct KeyEntry *ke;
1640   uint16_t element_size;
1641
1642   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1643   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1644   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1645   ee->element.size = element_size;
1646   ee->element.data = &ee[1];
1647   ee->element.element_type = ntohs (emsg->element_type);
1648   ee->remote = GNUNET_YES;
1649   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1650
1651   LOG (GNUNET_ERROR_TYPE_DEBUG,
1652        "Got element (full diff, size %u, hash %s) from peer\n",
1653        (unsigned int) element_size,
1654        GNUNET_h2s (&ee->element_hash));
1655
1656   GNUNET_STATISTICS_update (_GSS_statistics,
1657                             "# received elements",
1658                             1,
1659                             GNUNET_NO);
1660   GNUNET_STATISTICS_update (_GSS_statistics,
1661                             "# exchanged elements",
1662                             1,
1663                             GNUNET_NO);
1664
1665   op->state->received_total++;
1666
1667   ke = op_get_element (op, &ee->element_hash);
1668   if (NULL != ke)
1669   {
1670     /* Got repeated element.  Should not happen since
1671      * we track demands. */
1672     GNUNET_STATISTICS_update (_GSS_statistics,
1673                               "# repeated elements",
1674                               1,
1675                               GNUNET_NO);
1676     ke->received = GNUNET_YES;
1677     GNUNET_free (ee);
1678   }
1679   else
1680   {
1681     LOG (GNUNET_ERROR_TYPE_DEBUG,
1682          "Registering new element from remote peer\n");
1683     op->state->received_fresh++;
1684     op_register_element (op, ee, GNUNET_YES);
1685     /* only send results immediately if the client wants it */
1686     switch (op->result_mode)
1687     {
1688       case GNUNET_SET_RESULT_ADDED:
1689         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1690         break;
1691       case GNUNET_SET_RESULT_SYMMETRIC:
1692         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1693         break;
1694       default:
1695         /* Result mode not supported, should have been caught earlier. */
1696         GNUNET_break (0);
1697         break;
1698     }
1699   }
1700
1701   if ( (GNUNET_YES == op->byzantine) &&
1702        (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1703        (op->state->received_fresh < op->state->received_total / 6) )
1704   {
1705     /* The other peer gave us lots of old elements, there's something wrong. */
1706     LOG (GNUNET_ERROR_TYPE_ERROR,
1707          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1708          (unsigned long long) op->state->received_fresh,
1709          (unsigned long long) op->state->received_total);
1710     GNUNET_break_op (0);
1711     fail_union_operation (op);
1712     return;
1713   }
1714   GNUNET_CADET_receive_done (op->channel);
1715 }
1716
1717
1718 /**
1719  * Send offers (for GNUNET_Hash-es) in response
1720  * to inquiries (for IBF_Key-s).
1721  *
1722  * @param cls the union operation
1723  * @param msg the message
1724  */
1725 int
1726 check_union_p2p_inquiry (void *cls,
1727                          const struct InquiryMessage *msg)
1728 {
1729   struct Operation *op = cls;
1730   unsigned int num_keys;
1731
1732   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1733   {
1734     GNUNET_break_op (0);
1735     return GNUNET_SYSERR;
1736   }
1737   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1738   {
1739     GNUNET_break_op (0);
1740     return GNUNET_SYSERR;
1741   }
1742   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1743     / sizeof (struct IBF_Key);
1744   if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1745       != num_keys * sizeof (struct IBF_Key))
1746   {
1747     GNUNET_break_op (0);
1748     return GNUNET_SYSERR;
1749   }
1750   return GNUNET_OK;
1751 }
1752
1753
1754 /**
1755  * Send offers (for GNUNET_Hash-es) in response
1756  * to inquiries (for IBF_Key-s).
1757  *
1758  * @param cls the union operation
1759  * @param msg the message
1760  */
1761 void
1762 handle_union_p2p_inquiry (void *cls,
1763                           const struct InquiryMessage *msg)
1764 {
1765   struct Operation *op = cls;
1766   const struct IBF_Key *ibf_key;
1767   unsigned int num_keys;
1768
1769   LOG (GNUNET_ERROR_TYPE_DEBUG,
1770        "Received union inquiry\n");
1771   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1772     / sizeof (struct IBF_Key);
1773   ibf_key = (const struct IBF_Key *) &msg[1];
1774   while (0 != num_keys--)
1775   {
1776     struct IBF_Key unsalted_key;
1777
1778     unsalt_key (ibf_key,
1779                 ntohl (msg->salt),
1780                 &unsalted_key);
1781     send_offers_for_key (op,
1782                          unsalted_key);
1783     ibf_key++;
1784   }
1785   GNUNET_CADET_receive_done (op->channel);
1786 }
1787
1788
1789 /**
1790  * Iterator over hash map entries, called to
1791  * destroy the linked list of colliding ibf key entries.
1792  *
1793  * @param cls closure
1794  * @param key current key code
1795  * @param value value in the hash map
1796  * @return #GNUNET_YES if we should continue to iterate,
1797  *         #GNUNET_NO if not.
1798  */
1799 static int
1800 send_missing_full_elements_iter (void *cls,
1801                                  uint32_t key,
1802                                  void *value)
1803 {
1804   struct Operation *op = cls;
1805   struct KeyEntry *ke = value;
1806   struct GNUNET_MQ_Envelope *ev;
1807   struct GNUNET_SET_ElementMessage *emsg;
1808   struct ElementEntry *ee = ke->element;
1809
1810   if (GNUNET_YES == ke->received)
1811     return GNUNET_YES;
1812   ev = GNUNET_MQ_msg_extra (emsg,
1813                             ee->element.size,
1814                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1815   GNUNET_memcpy (&emsg[1],
1816                  ee->element.data,
1817                  ee->element.size);
1818   emsg->element_type = htons (ee->element.element_type);
1819   GNUNET_MQ_send (op->mq,
1820                   ev);
1821   return GNUNET_YES;
1822 }
1823
1824
1825 /**
1826  * Handle a request for full set transmission.
1827  *
1828  * @parem cls closure, a set union operation
1829  * @param mh the demand message
1830  */
1831 void
1832 handle_union_p2p_request_full (void *cls,
1833                                const struct GNUNET_MessageHeader *mh)
1834 {
1835   struct Operation *op = cls;
1836
1837   LOG (GNUNET_ERROR_TYPE_DEBUG,
1838        "Received request for full set transmission\n");
1839   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1840   {
1841     GNUNET_break_op (0);
1842     fail_union_operation (op);
1843     return;
1844   }
1845   if (PHASE_EXPECT_IBF != op->state->phase)
1846   {
1847     GNUNET_break_op (0);
1848     fail_union_operation (op);
1849     return;
1850   }
1851
1852   // FIXME: we need to check that our set is larger than the
1853   // byzantine_lower_bound by some threshold
1854   send_full_set (op);
1855   GNUNET_CADET_receive_done (op->channel);
1856 }
1857
1858
1859 /**
1860  * Handle a "full done" message.
1861  *
1862  * @parem cls closure, a set union operation
1863  * @param mh the demand message
1864  */
1865 void
1866 handle_union_p2p_full_done (void *cls,
1867                             const struct GNUNET_MessageHeader *mh)
1868 {
1869   struct Operation *op = cls;
1870
1871   switch (op->state->phase)
1872   {
1873   case PHASE_EXPECT_IBF:
1874     {
1875       struct GNUNET_MQ_Envelope *ev;
1876
1877       LOG (GNUNET_ERROR_TYPE_DEBUG,
1878            "got FULL DONE, sending elements that other peer is missing\n");
1879
1880       /* send all the elements that did not come from the remote peer */
1881       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1882                                                &send_missing_full_elements_iter,
1883                                                op);
1884
1885       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1886       GNUNET_MQ_send (op->mq,
1887                       ev);
1888       op->state->phase = PHASE_DONE;
1889       /* we now wait until the other peer sends us the OVER message*/
1890     }
1891     break;
1892   case PHASE_FULL_SENDING:
1893     {
1894       LOG (GNUNET_ERROR_TYPE_DEBUG,
1895            "got FULL DONE, finishing\n");
1896       /* We sent the full set, and got the response for that.  We're done. */
1897       op->state->phase = PHASE_DONE;
1898       GNUNET_CADET_receive_done (op->channel);
1899       send_client_done (op);
1900       destroy_channel (op);
1901       return;
1902     }
1903     break;
1904   default:
1905     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906                 "Handle full done phase is %u\n",
1907                 (unsigned) op->state->phase);
1908     GNUNET_break_op (0);
1909     fail_union_operation (op);
1910     return;
1911   }
1912   GNUNET_CADET_receive_done (op->channel);
1913 }
1914
1915
1916 /**
1917  * Check a demand by the other peer for elements based on a list
1918  * of `struct GNUNET_HashCode`s.
1919  *
1920  * @parem cls closure, a set union operation
1921  * @param mh the demand message
1922  * @return #GNUNET_OK if @a mh is well-formed
1923  */
1924 int
1925 check_union_p2p_demand (void *cls,
1926                         const struct GNUNET_MessageHeader *mh)
1927 {
1928   struct Operation *op = cls;
1929   unsigned int num_hashes;
1930
1931   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1932   {
1933     GNUNET_break_op (0);
1934     return GNUNET_SYSERR;
1935   }
1936   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1937     / sizeof (struct GNUNET_HashCode);
1938   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1939       != num_hashes * sizeof (struct GNUNET_HashCode))
1940   {
1941     GNUNET_break_op (0);
1942     return GNUNET_SYSERR;
1943   }
1944   return GNUNET_OK;
1945 }
1946
1947
1948 /**
1949  * Handle a demand by the other peer for elements based on a list
1950  * of `struct GNUNET_HashCode`s.
1951  *
1952  * @parem cls closure, a set union operation
1953  * @param mh the demand message
1954  */
1955 void
1956 handle_union_p2p_demand (void *cls,
1957                          const struct GNUNET_MessageHeader *mh)
1958 {
1959   struct Operation *op = cls;
1960   struct ElementEntry *ee;
1961   struct GNUNET_SET_ElementMessage *emsg;
1962   const struct GNUNET_HashCode *hash;
1963   unsigned int num_hashes;
1964   struct GNUNET_MQ_Envelope *ev;
1965
1966   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1967     / sizeof (struct GNUNET_HashCode);
1968   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1969        num_hashes > 0;
1970        hash++, num_hashes--)
1971   {
1972     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1973                                             hash);
1974     if (NULL == ee)
1975     {
1976       /* Demand for non-existing element. */
1977       GNUNET_break_op (0);
1978       fail_union_operation (op);
1979       return;
1980     }
1981     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1982     {
1983       /* Probably confused lazily copied sets. */
1984       GNUNET_break_op (0);
1985       fail_union_operation (op);
1986       return;
1987     }
1988     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1989     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1990     emsg->reserved = htons (0);
1991     emsg->element_type = htons (ee->element.element_type);
1992     LOG (GNUNET_ERROR_TYPE_DEBUG,
1993          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1994          (void *) op,
1995          (unsigned int) ee->element.size,
1996          GNUNET_h2s (&ee->element_hash));
1997     GNUNET_MQ_send (op->mq, ev);
1998     GNUNET_STATISTICS_update (_GSS_statistics,
1999                               "# exchanged elements",
2000                               1,
2001                               GNUNET_NO);
2002
2003     switch (op->result_mode)
2004     {
2005       case GNUNET_SET_RESULT_ADDED:
2006         /* Nothing to do. */
2007         break;
2008       case GNUNET_SET_RESULT_SYMMETRIC:
2009         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2010         break;
2011       default:
2012         /* Result mode not supported, should have been caught earlier. */
2013         GNUNET_break (0);
2014         break;
2015     }
2016   }
2017   GNUNET_CADET_receive_done (op->channel);
2018 }
2019
2020
2021 /**
2022  * Check offer (of `struct GNUNET_HashCode`s).
2023  *
2024  * @param cls the union operation
2025  * @param mh the message
2026  * @return #GNUNET_OK if @a mh is well-formed
2027  */
2028 int
2029 check_union_p2p_offer (void *cls,
2030                         const struct GNUNET_MessageHeader *mh)
2031 {
2032   struct Operation *op = cls;
2033   unsigned int num_hashes;
2034
2035   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2036   {
2037     GNUNET_break_op (0);
2038     return GNUNET_SYSERR;
2039   }
2040   /* look up elements and send them */
2041   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2042        (op->state->phase != PHASE_INVENTORY_ACTIVE))
2043   {
2044     GNUNET_break_op (0);
2045     return GNUNET_SYSERR;
2046   }
2047   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2048     / sizeof (struct GNUNET_HashCode);
2049   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2050       num_hashes * sizeof (struct GNUNET_HashCode))
2051   {
2052     GNUNET_break_op (0);
2053     return GNUNET_SYSERR;
2054   }
2055   return GNUNET_OK;
2056 }
2057
2058
2059 /**
2060  * Handle offers (of `struct GNUNET_HashCode`s) and
2061  * respond with demands (of `struct GNUNET_HashCode`s).
2062  *
2063  * @param cls the union operation
2064  * @param mh the message
2065  */
2066 void
2067 handle_union_p2p_offer (void *cls,
2068                         const struct GNUNET_MessageHeader *mh)
2069 {
2070   struct Operation *op = cls;
2071   const struct GNUNET_HashCode *hash;
2072   unsigned int num_hashes;
2073
2074   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2075     / sizeof (struct GNUNET_HashCode);
2076   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2077        num_hashes > 0;
2078        hash++, num_hashes--)
2079   {
2080     struct ElementEntry *ee;
2081     struct GNUNET_MessageHeader *demands;
2082     struct GNUNET_MQ_Envelope *ev;
2083
2084     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2085                                             hash);
2086     if (NULL != ee)
2087       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2088         continue;
2089
2090     if (GNUNET_YES ==
2091         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2092                                                 hash))
2093     {
2094       LOG (GNUNET_ERROR_TYPE_DEBUG,
2095            "Skipped sending duplicate demand\n");
2096       continue;
2097     }
2098
2099     GNUNET_assert (GNUNET_OK ==
2100                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2101                                                       hash,
2102                                                       NULL,
2103                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2104
2105     LOG (GNUNET_ERROR_TYPE_DEBUG,
2106          "[OP %x] Requesting element (hash %s)\n",
2107          (void *) op, GNUNET_h2s (hash));
2108     ev = GNUNET_MQ_msg_header_extra (demands,
2109                                      sizeof (struct GNUNET_HashCode),
2110                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2111     GNUNET_memcpy (&demands[1],
2112                    hash,
2113                    sizeof (struct GNUNET_HashCode));
2114     GNUNET_MQ_send (op->mq, ev);
2115   }
2116   GNUNET_CADET_receive_done (op->channel);
2117 }
2118
2119
2120 /**
2121  * Handle a done message from a remote peer
2122  *
2123  * @param cls the union operation
2124  * @param mh the message
2125  */
2126 void
2127 handle_union_p2p_done (void *cls,
2128                        const struct GNUNET_MessageHeader *mh)
2129 {
2130   struct Operation *op = cls;
2131
2132   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2133   {
2134     GNUNET_break_op (0);
2135     fail_union_operation (op);
2136     return;
2137   }
2138   switch (op->state->phase)
2139   {
2140   case PHASE_INVENTORY_PASSIVE:
2141     /* We got all requests, but still have to send our elements in response. */
2142     op->state->phase = PHASE_FINISH_WAITING;
2143
2144     LOG (GNUNET_ERROR_TYPE_DEBUG,
2145          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2146     /* The active peer is done sending offers
2147      * and inquiries.  This means that all
2148      * our responses to that (demands and offers)
2149      * must be in flight (queued or in mesh).
2150      *
2151      * We should notify the active peer once
2152      * all our demands are satisfied, so that the active
2153      * peer can quit if we gave him everything.
2154      */
2155     GNUNET_CADET_receive_done (op->channel);
2156     maybe_finish (op);
2157     return;
2158   case PHASE_INVENTORY_ACTIVE:
2159     LOG (GNUNET_ERROR_TYPE_DEBUG,
2160          "got DONE (as active partner), waiting to finish\n");
2161     /* All demands of the other peer are satisfied,
2162      * and we processed all offers, thus we know
2163      * exactly what our demands must be.
2164      *
2165      * We'll close the channel
2166      * to the other peer once our demands are met.
2167      */
2168     op->state->phase = PHASE_FINISH_CLOSING;
2169     GNUNET_CADET_receive_done (op->channel);
2170     maybe_finish (op);
2171     return;
2172   default:
2173     GNUNET_break_op (0);
2174     fail_union_operation (op);
2175     return;
2176   }
2177 }
2178
2179 /**
2180  * Handle a over message from a remote peer
2181  *
2182  * @param cls the union operation
2183  * @param mh the message
2184  */
2185 void
2186 handle_union_p2p_over (void *cls,
2187                        const struct GNUNET_MessageHeader *mh)
2188 {
2189   send_client_done (cls);
2190 }
2191
2192
2193 /**
2194  * Initiate operation to evaluate a set union with a remote peer.
2195  *
2196  * @param op operation to perform (to be initialized)
2197  * @param opaque_context message to be transmitted to the listener
2198  *        to convince him to accept, may be NULL
2199  */
2200 static struct OperationState *
2201 union_evaluate (struct Operation *op,
2202                 const struct GNUNET_MessageHeader *opaque_context)
2203 {
2204   struct OperationState *state;
2205   struct GNUNET_MQ_Envelope *ev;
2206   struct OperationRequestMessage *msg;
2207
2208   ev = GNUNET_MQ_msg_nested_mh (msg,
2209                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2210                                 opaque_context);
2211   if (NULL == ev)
2212   {
2213     /* the context message is too large */
2214     GNUNET_break (0);
2215     return NULL;
2216   }
2217   state = GNUNET_new (struct OperationState);
2218   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2219                                                                  GNUNET_NO);
2220   /* copy the current generation's strata estimator for this operation */
2221   state->se = strata_estimator_dup (op->set->state->se);
2222   /* we started the operation, thus we have to send the operation request */
2223   state->phase = PHASE_EXPECT_SE;
2224   state->salt_receive = state->salt_send = 42; // FIXME?????
2225   LOG (GNUNET_ERROR_TYPE_DEBUG,
2226        "Initiating union operation evaluation\n");
2227   GNUNET_STATISTICS_update (_GSS_statistics,
2228                             "# of total union operations",
2229                             1,
2230                             GNUNET_NO);
2231   GNUNET_STATISTICS_update (_GSS_statistics,
2232                             "# of initiated union operations",
2233                             1,
2234                             GNUNET_NO);
2235   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2236   GNUNET_MQ_send (op->mq,
2237                   ev);
2238
2239   if (NULL != opaque_context)
2240     LOG (GNUNET_ERROR_TYPE_DEBUG,
2241          "sent op request with context message\n");
2242   else
2243     LOG (GNUNET_ERROR_TYPE_DEBUG,
2244          "sent op request without context message\n");
2245
2246   op->state = state;
2247   initialize_key_to_element (op);
2248   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2249   return state;
2250 }
2251
2252
2253 /**
2254  * Accept an union operation request from a remote peer.
2255  * Only initializes the private operation state.
2256  *
2257  * @param op operation that will be accepted as a union operation
2258  */
2259 static struct OperationState *
2260 union_accept (struct Operation *op)
2261 {
2262   struct OperationState *state;
2263   const struct StrataEstimator *se;
2264   struct GNUNET_MQ_Envelope *ev;
2265   struct StrataEstimatorMessage *strata_msg;
2266   char *buf;
2267   size_t len;
2268   uint16_t type;
2269
2270   LOG (GNUNET_ERROR_TYPE_DEBUG,
2271        "accepting set union operation\n");
2272   GNUNET_STATISTICS_update (_GSS_statistics,
2273                             "# of accepted union operations",
2274                             1,
2275                             GNUNET_NO);
2276   GNUNET_STATISTICS_update (_GSS_statistics,
2277                             "# of total union operations",
2278                             1,
2279                             GNUNET_NO);
2280
2281   state = GNUNET_new (struct OperationState);
2282   state->se = strata_estimator_dup (op->set->state->se);
2283   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2284                                                                  GNUNET_NO);
2285   state->salt_receive = state->salt_send = 42; // FIXME?????
2286   op->state = state;
2287   initialize_key_to_element (op);
2288   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2289
2290   /* kick off the operation */
2291   se = state->se;
2292   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2293   len = strata_estimator_write (se,
2294                                 buf);
2295   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2296     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2297   else
2298     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2299   ev = GNUNET_MQ_msg_extra (strata_msg,
2300                             len,
2301                             type);
2302   GNUNET_memcpy (&strata_msg[1],
2303                  buf,
2304                  len);
2305   GNUNET_free (buf);
2306   strata_msg->set_size
2307     = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2308   GNUNET_MQ_send (op->mq,
2309                   ev);
2310   state->phase = PHASE_EXPECT_IBF;
2311   return state;
2312 }
2313
2314
2315 /**
2316  * Create a new set supporting the union operation
2317  *
2318  * We maintain one strata estimator per set and then manipulate it over the
2319  * lifetime of the set, as recreating a strata estimator would be expensive.
2320  *
2321  * @return the newly created set, NULL on error
2322  */
2323 static struct SetState *
2324 union_set_create (void)
2325 {
2326   struct SetState *set_state;
2327
2328   LOG (GNUNET_ERROR_TYPE_DEBUG,
2329        "union set created\n");
2330   set_state = GNUNET_new (struct SetState);
2331   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2332                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2333   if (NULL == set_state->se)
2334   {
2335     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2336                 "Failed to allocate strata estimator\n");
2337     GNUNET_free (set_state);
2338     return NULL;
2339   }
2340   return set_state;
2341 }
2342
2343
2344 /**
2345  * Add the element from the given element message to the set.
2346  *
2347  * @param set_state state of the set want to add to
2348  * @param ee the element to add to the set
2349  */
2350 static void
2351 union_add (struct SetState *set_state,
2352            struct ElementEntry *ee)
2353 {
2354   strata_estimator_insert (set_state->se,
2355                            get_ibf_key (&ee->element_hash));
2356 }
2357
2358
2359 /**
2360  * Remove the element given in the element message from the set.
2361  * Only marks the element as removed, so that older set operations can still exchange it.
2362  *
2363  * @param set_state state of the set to remove from
2364  * @param ee set element to remove
2365  */
2366 static void
2367 union_remove (struct SetState *set_state,
2368               struct ElementEntry *ee)
2369 {
2370   strata_estimator_remove (set_state->se,
2371                            get_ibf_key (&ee->element_hash));
2372 }
2373
2374
2375 /**
2376  * Destroy a set that supports the union operation.
2377  *
2378  * @param set_state the set to destroy
2379  */
2380 static void
2381 union_set_destroy (struct SetState *set_state)
2382 {
2383   if (NULL != set_state->se)
2384   {
2385     strata_estimator_destroy (set_state->se);
2386     set_state->se = NULL;
2387   }
2388   GNUNET_free (set_state);
2389 }
2390
2391
2392 /**
2393  * Copy union-specific set state.
2394  *
2395  * @param state source state for copying the union state
2396  * @return a copy of the union-specific set state
2397  */
2398 static struct SetState *
2399 union_copy_state (struct SetState *state)
2400 {
2401   struct SetState *new_state;
2402
2403   GNUNET_assert ( (NULL != state) &&
2404                   (NULL != state->se) );
2405   new_state = GNUNET_new (struct SetState);
2406   new_state->se = strata_estimator_dup (state->se);
2407
2408   return new_state;
2409 }
2410
2411
2412 /**
2413  * Handle case where channel went down for an operation.
2414  *
2415  * @param op operation that lost the channel
2416  */
2417 static void
2418 union_channel_death (struct Operation *op)
2419 {
2420   send_client_done (op);
2421   _GSS_operation_destroy (op,
2422                           GNUNET_YES);
2423 }
2424
2425
2426 /**
2427  * Get the table with implementing functions for
2428  * set union.
2429  *
2430  * @return the operation specific VTable
2431  */
2432 const struct SetVT *
2433 _GSS_union_vt ()
2434 {
2435   static const struct SetVT union_vt = {
2436     .create = &union_set_create,
2437     .add = &union_add,
2438     .remove = &union_remove,
2439     .destroy_set = &union_set_destroy,
2440     .evaluate = &union_evaluate,
2441     .accept = &union_accept,
2442     .cancel = &union_op_cancel,
2443     .copy_state = &union_copy_state,
2444     .channel_death = &union_channel_death
2445   };
2446
2447   return &union_vt;
2448 }