add attestation API
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
34 #include <gcrypt.h>
35
36
37 #define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define SE_STRATA_COUNT 32
44
45 /**
46  * Size of the IBFs in the strata estimator.
47  */
48 #define SE_IBF_SIZE 80
49
50 /**
51  * The hash num parameter for the difference digests and strata estimators.
52  */
53 #define SE_IBF_HASH_NUM 4
54
55 /**
56  * Number of buckets that can be transmitted in one message.
57  */
58 #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
59
60 /**
61  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62  * Choose this value so that computing the IBF is still cheaper
63  * than transmitting all values.
64  */
65 #define MAX_IBF_ORDER (20)
66
67 /**
68  * Number of buckets used in the ibf per estimated
69  * difference.
70  */
71 #define IBF_ALPHA 4
72
73
74 /**
75  * Current phase we are in for a union operation.
76  */
77 enum UnionOperationPhase
78 {
79   /**
80    * We sent the request message, and expect a strata estimator.
81    */
82   PHASE_EXPECT_SE,
83
84   /**
85    * We sent the strata estimator, and expect an IBF. This phase is entered once
86    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87    *
88    * XXX: could use better wording.
89    * XXX: repurposed to also expect a "request full set" message, should be renamed
90    *
91    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
92    */
93   PHASE_EXPECT_IBF,
94
95   /**
96    * Continuation for multi part IBFs.
97    */
98   PHASE_EXPECT_IBF_CONT,
99
100   /**
101    * We are decoding an IBF.
102    */
103   PHASE_INVENTORY_ACTIVE,
104
105   /**
106    * The other peer is decoding the IBF we just sent.
107    */
108   PHASE_INVENTORY_PASSIVE,
109
110   /**
111    * The protocol is almost finished, but we still have to flush our message
112    * queue and/or expect some elements.
113    */
114   PHASE_FINISH_CLOSING,
115
116   /**
117    * In the penultimate phase,
118    * we wait until all our demands
119    * are satisfied.  Then we send a done
120    * message, and wait for another done message.
121    */
122   PHASE_FINISH_WAITING,
123
124   /**
125    * In the ultimate phase, we wait until
126    * our demands are satisfied and then
127    * quit (sending another DONE message).
128    */
129   PHASE_DONE,
130
131   /**
132    * After sending the full set, wait for responses with the elements
133    * that the local peer is missing.
134    */
135   PHASE_FULL_SENDING,
136 };
137
138
139 /**
140  * State of an evaluate operation with another peer.
141  */
142 struct OperationState
143 {
144   /**
145    * Copy of the set's strata estimator at the time of
146    * creation of this operation.
147    */
148   struct StrataEstimator *se;
149
150   /**
151    * The IBF we currently receive.
152    */
153   struct InvertibleBloomFilter *remote_ibf;
154
155   /**
156    * The IBF with the local set's element.
157    */
158   struct InvertibleBloomFilter *local_ibf;
159
160   /**
161    * Maps unsalted IBF-Keys to elements.
162    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163    * Colliding IBF-Keys are linked.
164    */
165   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
166
167   /**
168    * Current state of the operation.
169    */
170   enum UnionOperationPhase phase;
171
172   /**
173    * Did we send the client that we are done?
174    */
175   int client_done_sent;
176
177   /**
178    * Number of ibf buckets already received into the @a remote_ibf.
179    */
180   unsigned int ibf_buckets_received;
181
182   /**
183    * Hashes for elements that we have demanded from the other peer.
184    */
185   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
186
187   /**
188    * Salt that we're using for sending IBFs
189    */
190   uint32_t salt_send;
191
192   /**
193    * Salt for the IBF we've received and that we're currently decoding.
194    */
195   uint32_t salt_receive;
196
197   /**
198    * Number of elements we received from the other peer
199    * that were not in the local set yet.
200    */
201   uint32_t received_fresh;
202
203   /**
204    * Total number of elements received from the other peer.
205    */
206   uint32_t received_total;
207
208   /**
209    * Initial size of our set, just before
210    * the operation started.
211    */
212   uint64_t initial_size;
213 };
214
215
216 /**
217  * The key entry is used to associate an ibf key with an element.
218  */
219 struct KeyEntry
220 {
221   /**
222    * IBF key for the entry, derived from the current salt.
223    */
224   struct IBF_Key ibf_key;
225
226   /**
227    * The actual element associated with the key.
228    *
229    * Only owned by the union operation if element->operation
230    * is #GNUNET_YES.
231    */
232   struct ElementEntry *element;
233
234   /**
235    * Did we receive this element?
236    * Even if element->is_foreign is false, we might
237    * have received the element, so this indicates that
238    * the other peer has it.
239    */
240   int received;
241 };
242
243
244 /**
245  * Used as a closure for sending elements
246  * with a specific IBF key.
247  */
248 struct SendElementClosure
249 {
250   /**
251    * The IBF key whose matching elements should be
252    * sent.
253    */
254   struct IBF_Key ibf_key;
255
256   /**
257    * Operation for which the elements
258    * should be sent.
259    */
260   struct Operation *op;
261 };
262
263
264 /**
265  * Extra state required for efficient set union.
266  */
267 struct SetState
268 {
269   /**
270    * The strata estimator is only generated once for
271    * each set.
272    * The IBF keys are derived from the element hashes with
273    * salt=0.
274    */
275   struct StrataEstimator *se;
276 };
277
278
279 /**
280  * Iterator over hash map entries, called to
281  * destroy the linked list of colliding ibf key entries.
282  *
283  * @param cls closure
284  * @param key current key code
285  * @param value value in the hash map
286  * @return #GNUNET_YES if we should continue to iterate,
287  *         #GNUNET_NO if not.
288  */
289 static int
290 destroy_key_to_element_iter (void *cls,
291                              uint32_t key,
292                              void *value)
293 {
294   struct KeyEntry *k = value;
295
296   GNUNET_assert (NULL != k);
297   if (GNUNET_YES == k->element->remote)
298   {
299     GNUNET_free (k->element);
300     k->element = NULL;
301   }
302   GNUNET_free (k);
303   return GNUNET_YES;
304 }
305
306
307 /**
308  * Destroy the union operation.  Only things specific to the union
309  * operation are destroyed.
310  *
311  * @param op union operation to destroy
312  */
313 static void
314 union_op_cancel (struct Operation *op)
315 {
316   LOG (GNUNET_ERROR_TYPE_DEBUG,
317        "destroying union op\n");
318   /* check if the op was canceled twice */
319   GNUNET_assert (NULL != op->state);
320   if (NULL != op->state->remote_ibf)
321   {
322     ibf_destroy (op->state->remote_ibf);
323     op->state->remote_ibf = NULL;
324   }
325   if (NULL != op->state->demanded_hashes)
326   {
327     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328     op->state->demanded_hashes = NULL;
329   }
330   if (NULL != op->state->local_ibf)
331   {
332     ibf_destroy (op->state->local_ibf);
333     op->state->local_ibf = NULL;
334   }
335   if (NULL != op->state->se)
336   {
337     strata_estimator_destroy (op->state->se);
338     op->state->se = NULL;
339   }
340   if (NULL != op->state->key_to_element)
341   {
342     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343                                              &destroy_key_to_element_iter,
344                                              NULL);
345     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346     op->state->key_to_element = NULL;
347   }
348   GNUNET_free (op->state);
349   op->state = NULL;
350   LOG (GNUNET_ERROR_TYPE_DEBUG,
351        "destroying union op done\n");
352 }
353
354
355 /**
356  * Inform the client that the union operation has failed,
357  * and proceed to destroy the evaluate operation.
358  *
359  * @param op the union operation to fail
360  */
361 static void
362 fail_union_operation (struct Operation *op)
363 {
364   struct GNUNET_MQ_Envelope *ev;
365   struct GNUNET_SET_ResultMessage *msg;
366
367   LOG (GNUNET_ERROR_TYPE_WARNING,
368        "union operation failed\n");
369   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371   msg->request_id = htonl (op->client_request_id);
372   msg->element_type = htons (0);
373   GNUNET_MQ_send (op->set->cs->mq,
374                   ev);
375   _GSS_operation_destroy (op, GNUNET_YES);
376 }
377
378
379 /**
380  * Derive the IBF key from a hash code and
381  * a salt.
382  *
383  * @param src the hash code
384  * @return the derived IBF key
385  */
386 static struct IBF_Key
387 get_ibf_key (const struct GNUNET_HashCode *src)
388 {
389   struct IBF_Key key;
390   uint16_t salt = 0;
391
392   GNUNET_assert (GNUNET_OK ==
393                  GNUNET_CRYPTO_kdf (&key, sizeof(key),
394                                     src, sizeof *src,
395                                     &salt, sizeof(salt),
396                                     NULL, 0));
397   return key;
398 }
399
400
401 /**
402  * Context for #op_get_element_iterator
403  */
404 struct GetElementContext
405 {
406   /**
407    * FIXME.
408    */
409   struct GNUNET_HashCode hash;
410
411   /**
412    * FIXME.
413    */
414   struct KeyEntry *k;
415 };
416
417
418 /**
419  * Iterator over the mapping from IBF keys to element entries.  Checks if we
420  * have an element with a given GNUNET_HashCode.
421  *
422  * @param cls closure
423  * @param key current key code
424  * @param value value in the hash map
425  * @return #GNUNET_YES if we should search further,
426  *         #GNUNET_NO if we've found the element.
427  */
428 static int
429 op_get_element_iterator (void *cls,
430                          uint32_t key,
431                          void *value)
432 {
433   struct GetElementContext *ctx = cls;
434   struct KeyEntry *k = value;
435
436   GNUNET_assert (NULL != k);
437   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438                                    &ctx->hash))
439   {
440     ctx->k = k;
441     return GNUNET_NO;
442   }
443   return GNUNET_YES;
444 }
445
446
447 /**
448  * Determine whether the given element is already in the operation's element
449  * set.
450  *
451  * @param op operation that should be tested for 'element_hash'
452  * @param element_hash hash of the element to look for
453  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
454  */
455 static struct KeyEntry *
456 op_get_element (struct Operation *op,
457                 const struct GNUNET_HashCode *element_hash)
458 {
459   int ret;
460   struct IBF_Key ibf_key;
461   struct GetElementContext ctx = { { { 0 } }, 0 };
462
463   ctx.hash = *element_hash;
464
465   ibf_key = get_ibf_key (element_hash);
466   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
467                                                       (uint32_t) ibf_key.key_val,
468                                                       op_get_element_iterator,
469                                                       &ctx);
470
471   /* was the iteration aborted because we found the element? */
472   if (GNUNET_SYSERR == ret)
473   {
474     GNUNET_assert (NULL != ctx.k);
475     return ctx.k;
476   }
477   return NULL;
478 }
479
480
481 /**
482  * Insert an element into the union operation's
483  * key-to-element mapping. Takes ownership of 'ee'.
484  * Note that this does not insert the element in the set,
485  * only in the operation's key-element mapping.
486  * This is done to speed up re-tried operations, if some elements
487  * were transmitted, and then the IBF fails to decode.
488  *
489  * XXX: clarify ownership, doesn't sound right.
490  *
491  * @param op the union operation
492  * @param ee the element entry
493  * @parem received was this element received from the remote peer?
494  */
495 static void
496 op_register_element (struct Operation *op,
497                      struct ElementEntry *ee,
498                      int received)
499 {
500   struct IBF_Key ibf_key;
501   struct KeyEntry *k;
502
503   ibf_key = get_ibf_key (&ee->element_hash);
504   k = GNUNET_new (struct KeyEntry);
505   k->element = ee;
506   k->ibf_key = ibf_key;
507   k->received = received;
508   GNUNET_assert (GNUNET_OK ==
509                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
510                                                       (uint32_t) ibf_key.key_val,
511                                                       k,
512                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
513 }
514
515
516 /**
517  * FIXME.
518  */
519 static void
520 salt_key (const struct IBF_Key *k_in,
521           uint32_t salt,
522           struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526
527   /* rotate ibf key */
528   x = (x >> s) | (x << (64 - s));
529   k_out->key_val = x;
530 }
531
532
533 /**
534  * FIXME.
535  */
536 static void
537 unsalt_key (const struct IBF_Key *k_in,
538             uint32_t salt,
539             struct IBF_Key *k_out)
540 {
541   int s = salt % 64;
542   uint64_t x = k_in->key_val;
543
544   x = (x << s) | (x >> (64 - s));
545   k_out->key_val = x;
546 }
547
548
549 /**
550  * Insert a key into an ibf.
551  *
552  * @param cls the ibf
553  * @param key unused
554  * @param value the key entry to get the key from
555  */
556 static int
557 prepare_ibf_iterator (void *cls,
558                       uint32_t key,
559                       void *value)
560 {
561   struct Operation *op = cls;
562   struct KeyEntry *ke = value;
563   struct IBF_Key salted_key;
564
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "[OP %x] inserting %lx (hash %s) into ibf\n",
567        (void *) op,
568        (unsigned long) ke->ibf_key.key_val,
569        GNUNET_h2s (&ke->element->element_hash));
570   salt_key (&ke->ibf_key,
571             op->state->salt_send,
572             &salted_key);
573   ibf_insert (op->state->local_ibf, salted_key);
574   return GNUNET_YES;
575 }
576
577
578 /**
579  * Iterator for initializing the
580  * key-to-element mapping of a union operation
581  *
582  * @param cls the union operation `struct Operation *`
583  * @param key unused
584  * @param value the `struct ElementEntry *` to insert
585  *        into the key-to-element mapping
586  * @return #GNUNET_YES (to continue iterating)
587  */
588 static int
589 init_key_to_element_iterator (void *cls,
590                               const struct GNUNET_HashCode *key,
591                               void *value)
592 {
593   struct Operation *op = cls;
594   struct ElementEntry *ee = value;
595
596   /* make sure that the element belongs to the set at the time
597    * of creating the operation */
598   if (GNUNET_NO ==
599       _GSS_is_element_of_operation (ee,
600                                     op))
601     return GNUNET_YES;
602   GNUNET_assert (GNUNET_NO == ee->remote);
603   op_register_element (op,
604                        ee,
605                        GNUNET_NO);
606   return GNUNET_YES;
607 }
608
609
610 /**
611  * Initialize the IBF key to element mapping local to this set
612  * operation.
613  *
614  * @param op the set union operation
615  */
616 static void
617 initialize_key_to_element (struct Operation *op)
618 {
619   unsigned int len;
620
621   GNUNET_assert (NULL == op->state->key_to_element);
622   len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
623   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
624   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
625                                          &init_key_to_element_iterator,
626                                          op);
627 }
628
629
630 /**
631  * Create an ibf with the operation's elements
632  * of the specified size
633  *
634  * @param op the union operation
635  * @param size size of the ibf to create
636  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
637  */
638 static int
639 prepare_ibf (struct Operation *op,
640              uint32_t size)
641 {
642   GNUNET_assert (NULL != op->state->key_to_element);
643
644   if (NULL != op->state->local_ibf)
645     ibf_destroy (op->state->local_ibf);
646   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
647   if (NULL == op->state->local_ibf)
648   {
649     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
650                 "Failed to allocate local IBF\n");
651     return GNUNET_SYSERR;
652   }
653   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
654                                            &prepare_ibf_iterator,
655                                            op);
656   return GNUNET_OK;
657 }
658
659
660 /**
661  * Send an ibf of appropriate size.
662  *
663  * Fragments the IBF into multiple messages if necessary.
664  *
665  * @param op the union operation
666  * @param ibf_order order of the ibf to send, size=2^order
667  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
668  */
669 static int
670 send_ibf (struct Operation *op,
671           uint16_t ibf_order)
672 {
673   unsigned int buckets_sent = 0;
674   struct InvertibleBloomFilter *ibf;
675
676   if (GNUNET_OK !=
677       prepare_ibf (op, 1 << ibf_order))
678   {
679     /* allocation failed */
680     return GNUNET_SYSERR;
681   }
682
683   LOG (GNUNET_ERROR_TYPE_DEBUG,
684        "sending ibf of size %u\n",
685        1 << ibf_order);
686
687   {
688     char name[64] = { 0 };
689     snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
690     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
691   }
692
693   ibf = op->state->local_ibf;
694
695   while (buckets_sent < (1 << ibf_order))
696   {
697     unsigned int buckets_in_message;
698     struct GNUNET_MQ_Envelope *ev;
699     struct IBFMessage *msg;
700
701     buckets_in_message = (1 << ibf_order) - buckets_sent;
702     /* limit to maximum */
703     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
704       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
705
706     ev = GNUNET_MQ_msg_extra (msg,
707                               buckets_in_message * IBF_BUCKET_SIZE,
708                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
709     msg->reserved1 = 0;
710     msg->reserved2 = 0;
711     msg->order = ibf_order;
712     msg->offset = htonl (buckets_sent);
713     msg->salt = htonl (op->state->salt_send);
714     ibf_write_slice (ibf, buckets_sent,
715                      buckets_in_message, &msg[1]);
716     buckets_sent += buckets_in_message;
717     LOG (GNUNET_ERROR_TYPE_DEBUG,
718          "ibf chunk size %u, %u/%u sent\n",
719          buckets_in_message,
720          buckets_sent,
721          1 << ibf_order);
722     GNUNET_MQ_send (op->mq, ev);
723   }
724
725   /* The other peer must decode the IBF, so
726    * we're passive. */
727   op->state->phase = PHASE_INVENTORY_PASSIVE;
728   return GNUNET_OK;
729 }
730
731
732 /**
733  * Compute the necessary order of an ibf
734  * from the size of the symmetric set difference.
735  *
736  * @param diff the difference
737  * @return the required size of the ibf
738  */
739 static unsigned int
740 get_order_from_difference (unsigned int diff)
741 {
742   unsigned int ibf_order;
743
744   ibf_order = 2;
745   while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
746           ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
747          (ibf_order < MAX_IBF_ORDER))
748     ibf_order++;
749   // add one for correction
750   return ibf_order + 1;
751 }
752
753
754 /**
755  * Send a set element.
756  *
757  * @param cls the union operation `struct Operation *`
758  * @param key unused
759  * @param value the `struct ElementEntry *` to insert
760  *        into the key-to-element mapping
761  * @return #GNUNET_YES (to continue iterating)
762  */
763 static int
764 send_full_element_iterator (void *cls,
765                             const struct GNUNET_HashCode *key,
766                             void *value)
767 {
768   struct Operation *op = cls;
769   struct GNUNET_SET_ElementMessage *emsg;
770   struct ElementEntry *ee = value;
771   struct GNUNET_SET_Element *el = &ee->element;
772   struct GNUNET_MQ_Envelope *ev;
773
774   LOG (GNUNET_ERROR_TYPE_DEBUG,
775        "Sending element %s\n",
776        GNUNET_h2s (key));
777   ev = GNUNET_MQ_msg_extra (emsg,
778                             el->size,
779                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
780   emsg->element_type = htons (el->element_type);
781   GNUNET_memcpy (&emsg[1],
782                  el->data,
783                  el->size);
784   GNUNET_MQ_send (op->mq,
785                   ev);
786   return GNUNET_YES;
787 }
788
789
790 /**
791  * Switch to full set transmission for @a op.
792  *
793  * @param op operation to switch to full set transmission.
794  */
795 static void
796 send_full_set (struct Operation *op)
797 {
798   struct GNUNET_MQ_Envelope *ev;
799
800   op->state->phase = PHASE_FULL_SENDING;
801   LOG (GNUNET_ERROR_TYPE_DEBUG,
802        "Dedicing to transmit the full set\n");
803   /* FIXME: use a more memory-friendly way of doing this with an
804      iterator, just as we do in the non-full case! */
805   (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
806                                                 &send_full_element_iterator,
807                                                 op);
808   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
809   GNUNET_MQ_send (op->mq,
810                   ev);
811 }
812
813
814 /**
815  * Handle a strata estimator from a remote peer
816  *
817  * @param cls the union operation
818  * @param msg the message
819  */
820 int
821 check_union_p2p_strata_estimator (void *cls,
822                                   const struct StrataEstimatorMessage *msg)
823 {
824   struct Operation *op = cls;
825   int is_compressed;
826   size_t len;
827
828   if (op->state->phase != PHASE_EXPECT_SE)
829   {
830     GNUNET_break (0);
831     return GNUNET_SYSERR;
832   }
833   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
834                      msg->header.type));
835   len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
836   if ((GNUNET_NO == is_compressed) &&
837       (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
838   {
839     GNUNET_break (0);
840     return GNUNET_SYSERR;
841   }
842   return GNUNET_OK;
843 }
844
845
846 /**
847  * Handle a strata estimator from a remote peer
848  *
849  * @param cls the union operation
850  * @param msg the message
851  */
852 void
853 handle_union_p2p_strata_estimator (void *cls,
854                                    const struct StrataEstimatorMessage *msg)
855 {
856   struct Operation *op = cls;
857   struct StrataEstimator *remote_se;
858   unsigned int diff;
859   uint64_t other_size;
860   size_t len;
861   int is_compressed;
862
863   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
864                      msg->header.type));
865   GNUNET_STATISTICS_update (_GSS_statistics,
866                             "# bytes of SE received",
867                             ntohs (msg->header.size),
868                             GNUNET_NO);
869   len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
870   other_size = GNUNET_ntohll (msg->set_size);
871   remote_se = strata_estimator_create (SE_STRATA_COUNT,
872                                        SE_IBF_SIZE,
873                                        SE_IBF_HASH_NUM);
874   if (NULL == remote_se)
875   {
876     /* insufficient resources, fail */
877     fail_union_operation (op);
878     return;
879   }
880   if (GNUNET_OK !=
881       strata_estimator_read (&msg[1],
882                              len,
883                              is_compressed,
884                              remote_se))
885   {
886     /* decompression failed */
887     strata_estimator_destroy (remote_se);
888     fail_union_operation (op);
889     return;
890   }
891   GNUNET_assert (NULL != op->state->se);
892   diff = strata_estimator_difference (remote_se,
893                                       op->state->se);
894
895   if (diff > 200)
896     diff = diff * 3 / 2;
897
898   strata_estimator_destroy (remote_se);
899   strata_estimator_destroy (op->state->se);
900   op->state->se = NULL;
901   LOG (GNUNET_ERROR_TYPE_DEBUG,
902        "got se diff=%d, using ibf size %d\n",
903        diff,
904        1U << get_order_from_difference (diff));
905
906   {
907     char *set_debug;
908
909     set_debug = getenv ("GNUNET_SET_BENCHMARK");
910     if ((NULL != set_debug) &&
911         (0 == strcmp (set_debug, "1")))
912     {
913       FILE *f = fopen ("set.log", "a");
914       fprintf (f, "%llu\n", (unsigned long long) diff);
915       fclose (f);
916     }
917   }
918
919   if ((GNUNET_YES == op->byzantine) &&
920       (other_size < op->byzantine_lower_bound))
921   {
922     GNUNET_break (0);
923     fail_union_operation (op);
924     return;
925   }
926
927   if ((GNUNET_YES == op->force_full) ||
928       (diff > op->state->initial_size / 4) ||
929       (0 == other_size))
930   {
931     LOG (GNUNET_ERROR_TYPE_DEBUG,
932          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
933          diff,
934          op->state->initial_size);
935     GNUNET_STATISTICS_update (_GSS_statistics,
936                               "# of full sends",
937                               1,
938                               GNUNET_NO);
939     if ((op->state->initial_size <= other_size) ||
940         (0 == other_size))
941     {
942       send_full_set (op);
943     }
944     else
945     {
946       struct GNUNET_MQ_Envelope *ev;
947
948       LOG (GNUNET_ERROR_TYPE_DEBUG,
949            "Telling other peer that we expect its full set\n");
950       op->state->phase = PHASE_EXPECT_IBF;
951       ev = GNUNET_MQ_msg_header (
952         GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
953       GNUNET_MQ_send (op->mq,
954                       ev);
955     }
956   }
957   else
958   {
959     GNUNET_STATISTICS_update (_GSS_statistics,
960                               "# of ibf sends",
961                               1,
962                               GNUNET_NO);
963     if (GNUNET_OK !=
964         send_ibf (op,
965                   get_order_from_difference (diff)))
966     {
967       /* Internal error, best we can do is shut the connection */
968       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
969                   "Failed to send IBF, closing connection\n");
970       fail_union_operation (op);
971       return;
972     }
973   }
974   GNUNET_CADET_receive_done (op->channel);
975 }
976
977
978 /**
979  * Iterator to send elements to a remote peer
980  *
981  * @param cls closure with the element key and the union operation
982  * @param key ignored
983  * @param value the key entry
984  */
985 static int
986 send_offers_iterator (void *cls,
987                       uint32_t key,
988                       void *value)
989 {
990   struct SendElementClosure *sec = cls;
991   struct Operation *op = sec->op;
992   struct KeyEntry *ke = value;
993   struct GNUNET_MQ_Envelope *ev;
994   struct GNUNET_MessageHeader *mh;
995
996   /* Detect 32-bit key collision for the 64-bit IBF keys. */
997   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
998     return GNUNET_YES;
999
1000   ev = GNUNET_MQ_msg_header_extra (mh,
1001                                    sizeof(struct GNUNET_HashCode),
1002                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
1003
1004   GNUNET_assert (NULL != ev);
1005   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1006   LOG (GNUNET_ERROR_TYPE_DEBUG,
1007        "[OP %x] sending element offer (%s) to peer\n",
1008        (void *) op,
1009        GNUNET_h2s (&ke->element->element_hash));
1010   GNUNET_MQ_send (op->mq, ev);
1011   return GNUNET_YES;
1012 }
1013
1014
1015 /**
1016  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1017  *
1018  * @param op union operation
1019  * @param ibf_key IBF key of interest
1020  */
1021 static void
1022 send_offers_for_key (struct Operation *op,
1023                      struct IBF_Key ibf_key)
1024 {
1025   struct SendElementClosure send_cls;
1026
1027   send_cls.ibf_key = ibf_key;
1028   send_cls.op = op;
1029   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1030     op->state->key_to_element,
1031     (uint32_t) ibf_key.
1032     key_val,
1033     &send_offers_iterator,
1034     &send_cls);
1035 }
1036
1037
1038 /**
1039  * Decode which elements are missing on each side, and
1040  * send the appropriate offers and inquiries.
1041  *
1042  * @param op union operation
1043  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1044  */
1045 static int
1046 decode_and_send (struct Operation *op)
1047 {
1048   struct IBF_Key key;
1049   struct IBF_Key last_key;
1050   int side;
1051   unsigned int num_decoded;
1052   struct InvertibleBloomFilter *diff_ibf;
1053
1054   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1055
1056   if (GNUNET_OK !=
1057       prepare_ibf (op,
1058                    op->state->remote_ibf->size))
1059   {
1060     GNUNET_break (0);
1061     /* allocation failed */
1062     return GNUNET_SYSERR;
1063   }
1064   diff_ibf = ibf_dup (op->state->local_ibf);
1065   ibf_subtract (diff_ibf,
1066                 op->state->remote_ibf);
1067
1068   ibf_destroy (op->state->remote_ibf);
1069   op->state->remote_ibf = NULL;
1070
1071   LOG (GNUNET_ERROR_TYPE_DEBUG,
1072        "decoding IBF (size=%u)\n",
1073        diff_ibf->size);
1074
1075   num_decoded = 0;
1076   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1077
1078   while (1)
1079   {
1080     int res;
1081     int cycle_detected = GNUNET_NO;
1082
1083     last_key = key;
1084
1085     res = ibf_decode (diff_ibf, &side, &key);
1086     if (res == GNUNET_OK)
1087     {
1088       LOG (GNUNET_ERROR_TYPE_DEBUG,
1089            "decoded ibf key %lx\n",
1090            (unsigned long) key.key_val);
1091       num_decoded += 1;
1092       if ((num_decoded > diff_ibf->size) ||
1093           ((num_decoded > 1) &&
1094            (last_key.key_val == key.key_val)))
1095       {
1096         LOG (GNUNET_ERROR_TYPE_DEBUG,
1097              "detected cyclic ibf (decoded %u/%u)\n",
1098              num_decoded,
1099              diff_ibf->size);
1100         cycle_detected = GNUNET_YES;
1101       }
1102     }
1103     if ((GNUNET_SYSERR == res) ||
1104         (GNUNET_YES == cycle_detected))
1105     {
1106       int next_order;
1107       next_order = 0;
1108       while (1 << next_order < diff_ibf->size)
1109         next_order++;
1110       next_order++;
1111       if (next_order <= MAX_IBF_ORDER)
1112       {
1113         LOG (GNUNET_ERROR_TYPE_DEBUG,
1114              "decoding failed, sending larger ibf (size %u)\n",
1115              1 << next_order);
1116         GNUNET_STATISTICS_update (_GSS_statistics,
1117                                   "# of IBF retries",
1118                                   1,
1119                                   GNUNET_NO);
1120         op->state->salt_send++;
1121         if (GNUNET_OK !=
1122             send_ibf (op, next_order))
1123         {
1124           /* Internal error, best we can do is shut the connection */
1125           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1126                       "Failed to send IBF, closing connection\n");
1127           fail_union_operation (op);
1128           ibf_destroy (diff_ibf);
1129           return GNUNET_SYSERR;
1130         }
1131       }
1132       else
1133       {
1134         GNUNET_STATISTICS_update (_GSS_statistics,
1135                                   "# of failed union operations (too large)",
1136                                   1,
1137                                   GNUNET_NO);
1138         // XXX: Send the whole set, element-by-element
1139         LOG (GNUNET_ERROR_TYPE_ERROR,
1140              "set union failed: reached ibf limit\n");
1141         fail_union_operation (op);
1142         ibf_destroy (diff_ibf);
1143         return GNUNET_SYSERR;
1144       }
1145       break;
1146     }
1147     if (GNUNET_NO == res)
1148     {
1149       struct GNUNET_MQ_Envelope *ev;
1150
1151       LOG (GNUNET_ERROR_TYPE_DEBUG,
1152            "transmitted all values, sending DONE\n");
1153       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1154       GNUNET_MQ_send (op->mq, ev);
1155       /* We now wait until we get a DONE message back
1156        * and then wait for our MQ to be flushed and all our
1157        * demands be delivered. */
1158       break;
1159     }
1160     if (1 == side)
1161     {
1162       struct IBF_Key unsalted_key;
1163
1164       unsalt_key (&key,
1165                   op->state->salt_receive,
1166                   &unsalted_key);
1167       send_offers_for_key (op,
1168                            unsalted_key);
1169     }
1170     else if (-1 == side)
1171     {
1172       struct GNUNET_MQ_Envelope *ev;
1173       struct InquiryMessage *msg;
1174
1175       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1176        * the effort additional complexity. */
1177       ev = GNUNET_MQ_msg_extra (msg,
1178                                 sizeof(struct IBF_Key),
1179                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1180       msg->salt = htonl (op->state->salt_receive);
1181       GNUNET_memcpy (&msg[1],
1182                      &key,
1183                      sizeof(struct IBF_Key));
1184       LOG (GNUNET_ERROR_TYPE_DEBUG,
1185            "sending element inquiry for IBF key %lx\n",
1186            (unsigned long) key.key_val);
1187       GNUNET_MQ_send (op->mq, ev);
1188     }
1189     else
1190     {
1191       GNUNET_assert (0);
1192     }
1193   }
1194   ibf_destroy (diff_ibf);
1195   return GNUNET_OK;
1196 }
1197
1198
1199 /**
1200  * Check an IBF message from a remote peer.
1201  *
1202  * Reassemble the IBF from multiple pieces, and
1203  * process the whole IBF once possible.
1204  *
1205  * @param cls the union operation
1206  * @param msg the header of the message
1207  * @return #GNUNET_OK if @a msg is well-formed
1208  */
1209 int
1210 check_union_p2p_ibf (void *cls,
1211                      const struct IBFMessage *msg)
1212 {
1213   struct Operation *op = cls;
1214   unsigned int buckets_in_message;
1215
1216   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1217   {
1218     GNUNET_break_op (0);
1219     return GNUNET_SYSERR;
1220   }
1221   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1222                        / IBF_BUCKET_SIZE;
1223   if (0 == buckets_in_message)
1224   {
1225     GNUNET_break_op (0);
1226     return GNUNET_SYSERR;
1227   }
1228   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1229       * IBF_BUCKET_SIZE)
1230   {
1231     GNUNET_break_op (0);
1232     return GNUNET_SYSERR;
1233   }
1234   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1235   {
1236     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1237     {
1238       GNUNET_break_op (0);
1239       return GNUNET_SYSERR;
1240     }
1241     if (1 << msg->order != op->state->remote_ibf->size)
1242     {
1243       GNUNET_break_op (0);
1244       return GNUNET_SYSERR;
1245     }
1246     if (ntohl (msg->salt) != op->state->salt_receive)
1247     {
1248       GNUNET_break_op (0);
1249       return GNUNET_SYSERR;
1250     }
1251   }
1252   else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1253            (op->state->phase != PHASE_EXPECT_IBF))
1254   {
1255     GNUNET_break_op (0);
1256     return GNUNET_SYSERR;
1257   }
1258
1259   return GNUNET_OK;
1260 }
1261
1262
1263 /**
1264  * Handle an IBF message from a remote peer.
1265  *
1266  * Reassemble the IBF from multiple pieces, and
1267  * process the whole IBF once possible.
1268  *
1269  * @param cls the union operation
1270  * @param msg the header of the message
1271  */
1272 void
1273 handle_union_p2p_ibf (void *cls,
1274                       const struct IBFMessage *msg)
1275 {
1276   struct Operation *op = cls;
1277   unsigned int buckets_in_message;
1278
1279   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1280                        / IBF_BUCKET_SIZE;
1281   if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1282       (op->state->phase == PHASE_EXPECT_IBF))
1283   {
1284     op->state->phase = PHASE_EXPECT_IBF_CONT;
1285     GNUNET_assert (NULL == op->state->remote_ibf);
1286     LOG (GNUNET_ERROR_TYPE_DEBUG,
1287          "Creating new ibf of size %u\n",
1288          1 << msg->order);
1289     op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1290     op->state->salt_receive = ntohl (msg->salt);
1291     LOG (GNUNET_ERROR_TYPE_DEBUG,
1292          "Receiving new IBF with salt %u\n",
1293          op->state->salt_receive);
1294     if (NULL == op->state->remote_ibf)
1295     {
1296       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1297                   "Failed to parse remote IBF, closing connection\n");
1298       fail_union_operation (op);
1299       return;
1300     }
1301     op->state->ibf_buckets_received = 0;
1302     if (0 != ntohl (msg->offset))
1303     {
1304       GNUNET_break_op (0);
1305       fail_union_operation (op);
1306       return;
1307     }
1308   }
1309   else
1310   {
1311     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1312     LOG (GNUNET_ERROR_TYPE_DEBUG,
1313          "Received more of IBF\n");
1314   }
1315   GNUNET_assert (NULL != op->state->remote_ibf);
1316
1317   ibf_read_slice (&msg[1],
1318                   op->state->ibf_buckets_received,
1319                   buckets_in_message,
1320                   op->state->remote_ibf);
1321   op->state->ibf_buckets_received += buckets_in_message;
1322
1323   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1324   {
1325     LOG (GNUNET_ERROR_TYPE_DEBUG,
1326          "received full ibf\n");
1327     op->state->phase = PHASE_INVENTORY_ACTIVE;
1328     if (GNUNET_OK !=
1329         decode_and_send (op))
1330     {
1331       /* Internal error, best we can do is shut down */
1332       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1333                   "Failed to decode IBF, closing connection\n");
1334       fail_union_operation (op);
1335       return;
1336     }
1337   }
1338   GNUNET_CADET_receive_done (op->channel);
1339 }
1340
1341
1342 /**
1343  * Send a result message to the client indicating
1344  * that there is a new element.
1345  *
1346  * @param op union operation
1347  * @param element element to send
1348  * @param status status to send with the new element
1349  */
1350 static void
1351 send_client_element (struct Operation *op,
1352                      struct GNUNET_SET_Element *element,
1353                      int status)
1354 {
1355   struct GNUNET_MQ_Envelope *ev;
1356   struct GNUNET_SET_ResultMessage *rm;
1357
1358   LOG (GNUNET_ERROR_TYPE_DEBUG,
1359        "sending element (size %u) to client\n",
1360        element->size);
1361   GNUNET_assert (0 != op->client_request_id);
1362   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1363   if (NULL == ev)
1364   {
1365     GNUNET_MQ_discard (ev);
1366     GNUNET_break (0);
1367     return;
1368   }
1369   rm->result_status = htons (status);
1370   rm->request_id = htonl (op->client_request_id);
1371   rm->element_type = htons (element->element_type);
1372   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1373                                       op->state->key_to_element));
1374   GNUNET_memcpy (&rm[1],
1375                  element->data,
1376                  element->size);
1377   GNUNET_MQ_send (op->set->cs->mq,
1378                   ev);
1379 }
1380
1381
1382 /**
1383  * Signal to the client that the operation has finished and
1384  * destroy the operation.
1385  *
1386  * @param cls operation to destroy
1387  */
1388 static void
1389 send_client_done (void *cls)
1390 {
1391   struct Operation *op = cls;
1392   struct GNUNET_MQ_Envelope *ev;
1393   struct GNUNET_SET_ResultMessage *rm;
1394
1395   if (GNUNET_YES == op->state->client_done_sent)
1396   {
1397     return;
1398   }
1399
1400   if (PHASE_DONE != op->state->phase)
1401   {
1402     LOG (GNUNET_ERROR_TYPE_WARNING,
1403          "Union operation failed\n");
1404     GNUNET_STATISTICS_update (_GSS_statistics,
1405                               "# Union operations failed",
1406                               1,
1407                               GNUNET_NO);
1408     ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1409     rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1410     rm->request_id = htonl (op->client_request_id);
1411     rm->element_type = htons (0);
1412     GNUNET_MQ_send (op->set->cs->mq,
1413                     ev);
1414     return;
1415   }
1416
1417   op->state->client_done_sent = GNUNET_YES;
1418
1419   GNUNET_STATISTICS_update (_GSS_statistics,
1420                             "# Union operations succeeded",
1421                             1,
1422                             GNUNET_NO);
1423   LOG (GNUNET_ERROR_TYPE_INFO,
1424        "Signalling client that union operation is done\n");
1425   ev = GNUNET_MQ_msg (rm,
1426                       GNUNET_MESSAGE_TYPE_SET_RESULT);
1427   rm->request_id = htonl (op->client_request_id);
1428   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1429   rm->element_type = htons (0);
1430   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1431                                       op->state->key_to_element));
1432   GNUNET_MQ_send (op->set->cs->mq,
1433                   ev);
1434 }
1435
1436
1437 /**
1438  * Tests if the operation is finished, and if so notify.
1439  *
1440  * @param op operation to check
1441  */
1442 static void
1443 maybe_finish (struct Operation *op)
1444 {
1445   unsigned int num_demanded;
1446
1447   num_demanded = GNUNET_CONTAINER_multihashmap_size (
1448     op->state->demanded_hashes);
1449
1450   if (PHASE_FINISH_WAITING == op->state->phase)
1451   {
1452     LOG (GNUNET_ERROR_TYPE_DEBUG,
1453          "In PHASE_FINISH_WAITING, pending %u demands\n",
1454          num_demanded);
1455     if (0 == num_demanded)
1456     {
1457       struct GNUNET_MQ_Envelope *ev;
1458
1459       op->state->phase = PHASE_DONE;
1460       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1461       GNUNET_MQ_send (op->mq,
1462                       ev);
1463       /* We now wait until the other peer sends P2P_OVER
1464        * after it got all elements from us. */
1465     }
1466   }
1467   if (PHASE_FINISH_CLOSING == op->state->phase)
1468   {
1469     LOG (GNUNET_ERROR_TYPE_DEBUG,
1470          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1471          num_demanded);
1472     if (0 == num_demanded)
1473     {
1474       op->state->phase = PHASE_DONE;
1475       send_client_done (op);
1476       _GSS_operation_destroy2 (op);
1477     }
1478   }
1479 }
1480
1481
1482 /**
1483  * Check an element message from a remote peer.
1484  *
1485  * @param cls the union operation
1486  * @param emsg the message
1487  */
1488 int
1489 check_union_p2p_elements (void *cls,
1490                           const struct GNUNET_SET_ElementMessage *emsg)
1491 {
1492   struct Operation *op = cls;
1493
1494   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1495   {
1496     GNUNET_break_op (0);
1497     return GNUNET_SYSERR;
1498   }
1499   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1500   {
1501     GNUNET_break_op (0);
1502     return GNUNET_SYSERR;
1503   }
1504   return GNUNET_OK;
1505 }
1506
1507
1508 /**
1509  * Handle an element message from a remote peer.
1510  * Sent by the other peer either because we decoded an IBF and placed a demand,
1511  * or because the other peer switched to full set transmission.
1512  *
1513  * @param cls the union operation
1514  * @param emsg the message
1515  */
1516 void
1517 handle_union_p2p_elements (void *cls,
1518                            const struct GNUNET_SET_ElementMessage *emsg)
1519 {
1520   struct Operation *op = cls;
1521   struct ElementEntry *ee;
1522   struct KeyEntry *ke;
1523   uint16_t element_size;
1524
1525   element_size = ntohs (emsg->header.size) - sizeof(struct
1526                                                     GNUNET_SET_ElementMessage);
1527   ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1528   GNUNET_memcpy (&ee[1],
1529                  &emsg[1],
1530                  element_size);
1531   ee->element.size = element_size;
1532   ee->element.data = &ee[1];
1533   ee->element.element_type = ntohs (emsg->element_type);
1534   ee->remote = GNUNET_YES;
1535   GNUNET_SET_element_hash (&ee->element,
1536                            &ee->element_hash);
1537   if (GNUNET_NO ==
1538       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1539                                             &ee->element_hash,
1540                                             NULL))
1541   {
1542     /* We got something we didn't demand, since it's not in our map. */
1543     GNUNET_break_op (0);
1544     fail_union_operation (op);
1545     return;
1546   }
1547
1548   LOG (GNUNET_ERROR_TYPE_DEBUG,
1549        "Got element (size %u, hash %s) from peer\n",
1550        (unsigned int) element_size,
1551        GNUNET_h2s (&ee->element_hash));
1552
1553   GNUNET_STATISTICS_update (_GSS_statistics,
1554                             "# received elements",
1555                             1,
1556                             GNUNET_NO);
1557   GNUNET_STATISTICS_update (_GSS_statistics,
1558                             "# exchanged elements",
1559                             1,
1560                             GNUNET_NO);
1561
1562   op->state->received_total++;
1563
1564   ke = op_get_element (op, &ee->element_hash);
1565   if (NULL != ke)
1566   {
1567     /* Got repeated element.  Should not happen since
1568      * we track demands. */
1569     GNUNET_STATISTICS_update (_GSS_statistics,
1570                               "# repeated elements",
1571                               1,
1572                               GNUNET_NO);
1573     ke->received = GNUNET_YES;
1574     GNUNET_free (ee);
1575   }
1576   else
1577   {
1578     LOG (GNUNET_ERROR_TYPE_DEBUG,
1579          "Registering new element from remote peer\n");
1580     op->state->received_fresh++;
1581     op_register_element (op, ee, GNUNET_YES);
1582     /* only send results immediately if the client wants it */
1583     switch (op->result_mode)
1584     {
1585     case GNUNET_SET_RESULT_ADDED:
1586       send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1587       break;
1588
1589     case GNUNET_SET_RESULT_SYMMETRIC:
1590       send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1591       break;
1592
1593     default:
1594       /* Result mode not supported, should have been caught earlier. */
1595       GNUNET_break (0);
1596       break;
1597     }
1598   }
1599
1600   if ((op->state->received_total > 8) &&
1601       (op->state->received_fresh < op->state->received_total / 3))
1602   {
1603     /* The other peer gave us lots of old elements, there's something wrong. */
1604     GNUNET_break_op (0);
1605     fail_union_operation (op);
1606     return;
1607   }
1608   GNUNET_CADET_receive_done (op->channel);
1609   maybe_finish (op);
1610 }
1611
1612
1613 /**
1614  * Check a full element message from a remote peer.
1615  *
1616  * @param cls the union operation
1617  * @param emsg the message
1618  */
1619 int
1620 check_union_p2p_full_element (void *cls,
1621                               const struct GNUNET_SET_ElementMessage *emsg)
1622 {
1623   struct Operation *op = cls;
1624
1625   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1626   {
1627     GNUNET_break_op (0);
1628     return GNUNET_SYSERR;
1629   }
1630   // FIXME: check that we expect full elements here?
1631   return GNUNET_OK;
1632 }
1633
1634
1635 /**
1636  * Handle an element message from a remote peer.
1637  *
1638  * @param cls the union operation
1639  * @param emsg the message
1640  */
1641 void
1642 handle_union_p2p_full_element (void *cls,
1643                                const struct GNUNET_SET_ElementMessage *emsg)
1644 {
1645   struct Operation *op = cls;
1646   struct ElementEntry *ee;
1647   struct KeyEntry *ke;
1648   uint16_t element_size;
1649
1650   element_size = ntohs (emsg->header.size) - sizeof(struct
1651                                                     GNUNET_SET_ElementMessage);
1652   ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1653   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1654   ee->element.size = element_size;
1655   ee->element.data = &ee[1];
1656   ee->element.element_type = ntohs (emsg->element_type);
1657   ee->remote = GNUNET_YES;
1658   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1659
1660   LOG (GNUNET_ERROR_TYPE_DEBUG,
1661        "Got element (full diff, size %u, hash %s) from peer\n",
1662        (unsigned int) element_size,
1663        GNUNET_h2s (&ee->element_hash));
1664
1665   GNUNET_STATISTICS_update (_GSS_statistics,
1666                             "# received elements",
1667                             1,
1668                             GNUNET_NO);
1669   GNUNET_STATISTICS_update (_GSS_statistics,
1670                             "# exchanged elements",
1671                             1,
1672                             GNUNET_NO);
1673
1674   op->state->received_total++;
1675
1676   ke = op_get_element (op, &ee->element_hash);
1677   if (NULL != ke)
1678   {
1679     /* Got repeated element.  Should not happen since
1680      * we track demands. */
1681     GNUNET_STATISTICS_update (_GSS_statistics,
1682                               "# repeated elements",
1683                               1,
1684                               GNUNET_NO);
1685     ke->received = GNUNET_YES;
1686     GNUNET_free (ee);
1687   }
1688   else
1689   {
1690     LOG (GNUNET_ERROR_TYPE_DEBUG,
1691          "Registering new element from remote peer\n");
1692     op->state->received_fresh++;
1693     op_register_element (op, ee, GNUNET_YES);
1694     /* only send results immediately if the client wants it */
1695     switch (op->result_mode)
1696     {
1697     case GNUNET_SET_RESULT_ADDED:
1698       send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1699       break;
1700
1701     case GNUNET_SET_RESULT_SYMMETRIC:
1702       send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1703       break;
1704
1705     default:
1706       /* Result mode not supported, should have been caught earlier. */
1707       GNUNET_break (0);
1708       break;
1709     }
1710   }
1711
1712   if ((GNUNET_YES == op->byzantine) &&
1713       (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1714       (op->state->received_fresh < op->state->received_total / 6))
1715   {
1716     /* The other peer gave us lots of old elements, there's something wrong. */
1717     LOG (GNUNET_ERROR_TYPE_ERROR,
1718          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1719          (unsigned long long) op->state->received_fresh,
1720          (unsigned long long) op->state->received_total);
1721     GNUNET_break_op (0);
1722     fail_union_operation (op);
1723     return;
1724   }
1725   GNUNET_CADET_receive_done (op->channel);
1726 }
1727
1728
1729 /**
1730  * Send offers (for GNUNET_Hash-es) in response
1731  * to inquiries (for IBF_Key-s).
1732  *
1733  * @param cls the union operation
1734  * @param msg the message
1735  */
1736 int
1737 check_union_p2p_inquiry (void *cls,
1738                          const struct InquiryMessage *msg)
1739 {
1740   struct Operation *op = cls;
1741   unsigned int num_keys;
1742
1743   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1744   {
1745     GNUNET_break_op (0);
1746     return GNUNET_SYSERR;
1747   }
1748   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1749   {
1750     GNUNET_break_op (0);
1751     return GNUNET_SYSERR;
1752   }
1753   num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1754              / sizeof(struct IBF_Key);
1755   if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1756       != num_keys * sizeof(struct IBF_Key))
1757   {
1758     GNUNET_break_op (0);
1759     return GNUNET_SYSERR;
1760   }
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Send offers (for GNUNET_Hash-es) in response
1767  * to inquiries (for IBF_Key-s).
1768  *
1769  * @param cls the union operation
1770  * @param msg the message
1771  */
1772 void
1773 handle_union_p2p_inquiry (void *cls,
1774                           const struct InquiryMessage *msg)
1775 {
1776   struct Operation *op = cls;
1777   const struct IBF_Key *ibf_key;
1778   unsigned int num_keys;
1779
1780   LOG (GNUNET_ERROR_TYPE_DEBUG,
1781        "Received union inquiry\n");
1782   num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1783              / sizeof(struct IBF_Key);
1784   ibf_key = (const struct IBF_Key *) &msg[1];
1785   while (0 != num_keys--)
1786   {
1787     struct IBF_Key unsalted_key;
1788
1789     unsalt_key (ibf_key,
1790                 ntohl (msg->salt),
1791                 &unsalted_key);
1792     send_offers_for_key (op,
1793                          unsalted_key);
1794     ibf_key++;
1795   }
1796   GNUNET_CADET_receive_done (op->channel);
1797 }
1798
1799
1800 /**
1801  * Iterator over hash map entries, called to
1802  * destroy the linked list of colliding ibf key entries.
1803  *
1804  * @param cls closure
1805  * @param key current key code
1806  * @param value value in the hash map
1807  * @return #GNUNET_YES if we should continue to iterate,
1808  *         #GNUNET_NO if not.
1809  */
1810 static int
1811 send_missing_full_elements_iter (void *cls,
1812                                  uint32_t key,
1813                                  void *value)
1814 {
1815   struct Operation *op = cls;
1816   struct KeyEntry *ke = value;
1817   struct GNUNET_MQ_Envelope *ev;
1818   struct GNUNET_SET_ElementMessage *emsg;
1819   struct ElementEntry *ee = ke->element;
1820
1821   if (GNUNET_YES == ke->received)
1822     return GNUNET_YES;
1823   ev = GNUNET_MQ_msg_extra (emsg,
1824                             ee->element.size,
1825                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1826   GNUNET_memcpy (&emsg[1],
1827                  ee->element.data,
1828                  ee->element.size);
1829   emsg->element_type = htons (ee->element.element_type);
1830   GNUNET_MQ_send (op->mq,
1831                   ev);
1832   return GNUNET_YES;
1833 }
1834
1835
1836 /**
1837  * Handle a request for full set transmission.
1838  *
1839  * @parem cls closure, a set union operation
1840  * @param mh the demand message
1841  */
1842 void
1843 handle_union_p2p_request_full (void *cls,
1844                                const struct GNUNET_MessageHeader *mh)
1845 {
1846   struct Operation *op = cls;
1847
1848   LOG (GNUNET_ERROR_TYPE_DEBUG,
1849        "Received request for full set transmission\n");
1850   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1851   {
1852     GNUNET_break_op (0);
1853     fail_union_operation (op);
1854     return;
1855   }
1856   if (PHASE_EXPECT_IBF != op->state->phase)
1857   {
1858     GNUNET_break_op (0);
1859     fail_union_operation (op);
1860     return;
1861   }
1862
1863   // FIXME: we need to check that our set is larger than the
1864   // byzantine_lower_bound by some threshold
1865   send_full_set (op);
1866   GNUNET_CADET_receive_done (op->channel);
1867 }
1868
1869
1870 /**
1871  * Handle a "full done" message.
1872  *
1873  * @parem cls closure, a set union operation
1874  * @param mh the demand message
1875  */
1876 void
1877 handle_union_p2p_full_done (void *cls,
1878                             const struct GNUNET_MessageHeader *mh)
1879 {
1880   struct Operation *op = cls;
1881
1882   switch (op->state->phase)
1883   {
1884   case PHASE_EXPECT_IBF:
1885     {
1886       struct GNUNET_MQ_Envelope *ev;
1887
1888       LOG (GNUNET_ERROR_TYPE_DEBUG,
1889            "got FULL DONE, sending elements that other peer is missing\n");
1890
1891       /* send all the elements that did not come from the remote peer */
1892       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1893                                                &send_missing_full_elements_iter,
1894                                                op);
1895
1896       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1897       GNUNET_MQ_send (op->mq,
1898                       ev);
1899       op->state->phase = PHASE_DONE;
1900       /* we now wait until the other peer sends us the OVER message*/
1901     }
1902     break;
1903
1904   case PHASE_FULL_SENDING:
1905     {
1906       LOG (GNUNET_ERROR_TYPE_DEBUG,
1907            "got FULL DONE, finishing\n");
1908       /* We sent the full set, and got the response for that.  We're done. */
1909       op->state->phase = PHASE_DONE;
1910       GNUNET_CADET_receive_done (op->channel);
1911       send_client_done (op);
1912       _GSS_operation_destroy2 (op);
1913       return;
1914     }
1915     break;
1916
1917   default:
1918     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919                 "Handle full done phase is %u\n",
1920                 (unsigned) op->state->phase);
1921     GNUNET_break_op (0);
1922     fail_union_operation (op);
1923     return;
1924   }
1925   GNUNET_CADET_receive_done (op->channel);
1926 }
1927
1928
1929 /**
1930  * Check a demand by the other peer for elements based on a list
1931  * of `struct GNUNET_HashCode`s.
1932  *
1933  * @parem cls closure, a set union operation
1934  * @param mh the demand message
1935  * @return #GNUNET_OK if @a mh is well-formed
1936  */
1937 int
1938 check_union_p2p_demand (void *cls,
1939                         const struct GNUNET_MessageHeader *mh)
1940 {
1941   struct Operation *op = cls;
1942   unsigned int num_hashes;
1943
1944   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1945   {
1946     GNUNET_break_op (0);
1947     return GNUNET_SYSERR;
1948   }
1949   num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1950                / sizeof(struct GNUNET_HashCode);
1951   if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1952       != num_hashes * sizeof(struct GNUNET_HashCode))
1953   {
1954     GNUNET_break_op (0);
1955     return GNUNET_SYSERR;
1956   }
1957   return GNUNET_OK;
1958 }
1959
1960
1961 /**
1962  * Handle a demand by the other peer for elements based on a list
1963  * of `struct GNUNET_HashCode`s.
1964  *
1965  * @parem cls closure, a set union operation
1966  * @param mh the demand message
1967  */
1968 void
1969 handle_union_p2p_demand (void *cls,
1970                          const struct GNUNET_MessageHeader *mh)
1971 {
1972   struct Operation *op = cls;
1973   struct ElementEntry *ee;
1974   struct GNUNET_SET_ElementMessage *emsg;
1975   const struct GNUNET_HashCode *hash;
1976   unsigned int num_hashes;
1977   struct GNUNET_MQ_Envelope *ev;
1978
1979   num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1980                / sizeof(struct GNUNET_HashCode);
1981   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1982        num_hashes > 0;
1983        hash++, num_hashes--)
1984   {
1985     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1986                                             hash);
1987     if (NULL == ee)
1988     {
1989       /* Demand for non-existing element. */
1990       GNUNET_break_op (0);
1991       fail_union_operation (op);
1992       return;
1993     }
1994     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1995     {
1996       /* Probably confused lazily copied sets. */
1997       GNUNET_break_op (0);
1998       fail_union_operation (op);
1999       return;
2000     }
2001     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
2002                               GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
2003     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2004     emsg->reserved = htons (0);
2005     emsg->element_type = htons (ee->element.element_type);
2006     LOG (GNUNET_ERROR_TYPE_DEBUG,
2007          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2008          (void *) op,
2009          (unsigned int) ee->element.size,
2010          GNUNET_h2s (&ee->element_hash));
2011     GNUNET_MQ_send (op->mq, ev);
2012     GNUNET_STATISTICS_update (_GSS_statistics,
2013                               "# exchanged elements",
2014                               1,
2015                               GNUNET_NO);
2016
2017     switch (op->result_mode)
2018     {
2019     case GNUNET_SET_RESULT_ADDED:
2020       /* Nothing to do. */
2021       break;
2022
2023     case GNUNET_SET_RESULT_SYMMETRIC:
2024       send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2025       break;
2026
2027     default:
2028       /* Result mode not supported, should have been caught earlier. */
2029       GNUNET_break (0);
2030       break;
2031     }
2032   }
2033   GNUNET_CADET_receive_done (op->channel);
2034 }
2035
2036
2037 /**
2038  * Check offer (of `struct GNUNET_HashCode`s).
2039  *
2040  * @param cls the union operation
2041  * @param mh the message
2042  * @return #GNUNET_OK if @a mh is well-formed
2043  */
2044 int
2045 check_union_p2p_offer (void *cls,
2046                        const struct GNUNET_MessageHeader *mh)
2047 {
2048   struct Operation *op = cls;
2049   unsigned int num_hashes;
2050
2051   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2052   {
2053     GNUNET_break_op (0);
2054     return GNUNET_SYSERR;
2055   }
2056   /* look up elements and send them */
2057   if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2058       (op->state->phase != PHASE_INVENTORY_ACTIVE))
2059   {
2060     GNUNET_break_op (0);
2061     return GNUNET_SYSERR;
2062   }
2063   num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2064                / sizeof(struct GNUNET_HashCode);
2065   if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2066       num_hashes * sizeof(struct GNUNET_HashCode))
2067   {
2068     GNUNET_break_op (0);
2069     return GNUNET_SYSERR;
2070   }
2071   return GNUNET_OK;
2072 }
2073
2074
2075 /**
2076  * Handle offers (of `struct GNUNET_HashCode`s) and
2077  * respond with demands (of `struct GNUNET_HashCode`s).
2078  *
2079  * @param cls the union operation
2080  * @param mh the message
2081  */
2082 void
2083 handle_union_p2p_offer (void *cls,
2084                         const struct GNUNET_MessageHeader *mh)
2085 {
2086   struct Operation *op = cls;
2087   const struct GNUNET_HashCode *hash;
2088   unsigned int num_hashes;
2089
2090   num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2091                / sizeof(struct GNUNET_HashCode);
2092   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2093        num_hashes > 0;
2094        hash++, num_hashes--)
2095   {
2096     struct ElementEntry *ee;
2097     struct GNUNET_MessageHeader *demands;
2098     struct GNUNET_MQ_Envelope *ev;
2099
2100     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2101                                             hash);
2102     if (NULL != ee)
2103       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2104         continue;
2105
2106     if (GNUNET_YES ==
2107         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2108                                                 hash))
2109     {
2110       LOG (GNUNET_ERROR_TYPE_DEBUG,
2111            "Skipped sending duplicate demand\n");
2112       continue;
2113     }
2114
2115     GNUNET_assert (GNUNET_OK ==
2116                    GNUNET_CONTAINER_multihashmap_put (
2117                      op->state->demanded_hashes,
2118                      hash,
2119                      NULL,
2120                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2121
2122     LOG (GNUNET_ERROR_TYPE_DEBUG,
2123          "[OP %x] Requesting element (hash %s)\n",
2124          (void *) op, GNUNET_h2s (hash));
2125     ev = GNUNET_MQ_msg_header_extra (demands,
2126                                      sizeof(struct GNUNET_HashCode),
2127                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2128     GNUNET_memcpy (&demands[1],
2129                    hash,
2130                    sizeof(struct GNUNET_HashCode));
2131     GNUNET_MQ_send (op->mq, ev);
2132   }
2133   GNUNET_CADET_receive_done (op->channel);
2134 }
2135
2136
2137 /**
2138  * Handle a done message from a remote peer
2139  *
2140  * @param cls the union operation
2141  * @param mh the message
2142  */
2143 void
2144 handle_union_p2p_done (void *cls,
2145                        const struct GNUNET_MessageHeader *mh)
2146 {
2147   struct Operation *op = cls;
2148
2149   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2150   {
2151     GNUNET_break_op (0);
2152     fail_union_operation (op);
2153     return;
2154   }
2155   switch (op->state->phase)
2156   {
2157   case PHASE_INVENTORY_PASSIVE:
2158     /* We got all requests, but still have to send our elements in response. */
2159     op->state->phase = PHASE_FINISH_WAITING;
2160
2161     LOG (GNUNET_ERROR_TYPE_DEBUG,
2162          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2163     /* The active peer is done sending offers
2164      * and inquiries.  This means that all
2165      * our responses to that (demands and offers)
2166      * must be in flight (queued or in mesh).
2167      *
2168      * We should notify the active peer once
2169      * all our demands are satisfied, so that the active
2170      * peer can quit if we gave it everything.
2171      */GNUNET_CADET_receive_done (op->channel);
2172     maybe_finish (op);
2173     return;
2174
2175   case PHASE_INVENTORY_ACTIVE:
2176     LOG (GNUNET_ERROR_TYPE_DEBUG,
2177          "got DONE (as active partner), waiting to finish\n");
2178     /* All demands of the other peer are satisfied,
2179      * and we processed all offers, thus we know
2180      * exactly what our demands must be.
2181      *
2182      * We'll close the channel
2183      * to the other peer once our demands are met.
2184      */op->state->phase = PHASE_FINISH_CLOSING;
2185     GNUNET_CADET_receive_done (op->channel);
2186     maybe_finish (op);
2187     return;
2188
2189   default:
2190     GNUNET_break_op (0);
2191     fail_union_operation (op);
2192     return;
2193   }
2194 }
2195
2196
2197 /**
2198  * Handle a over message from a remote peer
2199  *
2200  * @param cls the union operation
2201  * @param mh the message
2202  */
2203 void
2204 handle_union_p2p_over (void *cls,
2205                        const struct GNUNET_MessageHeader *mh)
2206 {
2207   send_client_done (cls);
2208 }
2209
2210
2211 /**
2212  * Initiate operation to evaluate a set union with a remote peer.
2213  *
2214  * @param op operation to perform (to be initialized)
2215  * @param opaque_context message to be transmitted to the listener
2216  *        to convince it to accept, may be NULL
2217  */
2218 static struct OperationState *
2219 union_evaluate (struct Operation *op,
2220                 const struct GNUNET_MessageHeader *opaque_context)
2221 {
2222   struct OperationState *state;
2223   struct GNUNET_MQ_Envelope *ev;
2224   struct OperationRequestMessage *msg;
2225
2226   ev = GNUNET_MQ_msg_nested_mh (msg,
2227                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2228                                 opaque_context);
2229   if (NULL == ev)
2230   {
2231     /* the context message is too large */
2232     GNUNET_break (0);
2233     return NULL;
2234   }
2235   state = GNUNET_new (struct OperationState);
2236   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2237                                                                  GNUNET_NO);
2238   /* copy the current generation's strata estimator for this operation */
2239   state->se = strata_estimator_dup (op->set->state->se);
2240   /* we started the operation, thus we have to send the operation request */
2241   state->phase = PHASE_EXPECT_SE;
2242   state->salt_receive = state->salt_send = 42; // FIXME?????
2243   LOG (GNUNET_ERROR_TYPE_DEBUG,
2244        "Initiating union operation evaluation\n");
2245   GNUNET_STATISTICS_update (_GSS_statistics,
2246                             "# of total union operations",
2247                             1,
2248                             GNUNET_NO);
2249   GNUNET_STATISTICS_update (_GSS_statistics,
2250                             "# of initiated union operations",
2251                             1,
2252                             GNUNET_NO);
2253   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2254   GNUNET_MQ_send (op->mq,
2255                   ev);
2256
2257   if (NULL != opaque_context)
2258     LOG (GNUNET_ERROR_TYPE_DEBUG,
2259          "sent op request with context message\n");
2260   else
2261     LOG (GNUNET_ERROR_TYPE_DEBUG,
2262          "sent op request without context message\n");
2263
2264   op->state = state;
2265   initialize_key_to_element (op);
2266   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2267     state->key_to_element);
2268   return state;
2269 }
2270
2271
2272 /**
2273  * Accept an union operation request from a remote peer.
2274  * Only initializes the private operation state.
2275  *
2276  * @param op operation that will be accepted as a union operation
2277  */
2278 static struct OperationState *
2279 union_accept (struct Operation *op)
2280 {
2281   struct OperationState *state;
2282   const struct StrataEstimator *se;
2283   struct GNUNET_MQ_Envelope *ev;
2284   struct StrataEstimatorMessage *strata_msg;
2285   char *buf;
2286   size_t len;
2287   uint16_t type;
2288
2289   LOG (GNUNET_ERROR_TYPE_DEBUG,
2290        "accepting set union operation\n");
2291   GNUNET_STATISTICS_update (_GSS_statistics,
2292                             "# of accepted union operations",
2293                             1,
2294                             GNUNET_NO);
2295   GNUNET_STATISTICS_update (_GSS_statistics,
2296                             "# of total union operations",
2297                             1,
2298                             GNUNET_NO);
2299
2300   state = GNUNET_new (struct OperationState);
2301   state->se = strata_estimator_dup (op->set->state->se);
2302   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2303                                                                  GNUNET_NO);
2304   state->salt_receive = state->salt_send = 42; // FIXME?????
2305   op->state = state;
2306   initialize_key_to_element (op);
2307   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2308     state->key_to_element);
2309
2310   /* kick off the operation */
2311   se = state->se;
2312   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2313   len = strata_estimator_write (se,
2314                                 buf);
2315   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2316     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2317   else
2318     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2319   ev = GNUNET_MQ_msg_extra (strata_msg,
2320                             len,
2321                             type);
2322   GNUNET_memcpy (&strata_msg[1],
2323                  buf,
2324                  len);
2325   GNUNET_free (buf);
2326   strata_msg->set_size
2327     = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
2328                        op->set->content->elements));
2329   GNUNET_MQ_send (op->mq,
2330                   ev);
2331   state->phase = PHASE_EXPECT_IBF;
2332   return state;
2333 }
2334
2335
2336 /**
2337  * Create a new set supporting the union operation
2338  *
2339  * We maintain one strata estimator per set and then manipulate it over the
2340  * lifetime of the set, as recreating a strata estimator would be expensive.
2341  *
2342  * @return the newly created set, NULL on error
2343  */
2344 static struct SetState *
2345 union_set_create (void)
2346 {
2347   struct SetState *set_state;
2348
2349   LOG (GNUNET_ERROR_TYPE_DEBUG,
2350        "union set created\n");
2351   set_state = GNUNET_new (struct SetState);
2352   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2353                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2354   if (NULL == set_state->se)
2355   {
2356     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2357                 "Failed to allocate strata estimator\n");
2358     GNUNET_free (set_state);
2359     return NULL;
2360   }
2361   return set_state;
2362 }
2363
2364
2365 /**
2366  * Add the element from the given element message to the set.
2367  *
2368  * @param set_state state of the set want to add to
2369  * @param ee the element to add to the set
2370  */
2371 static void
2372 union_add (struct SetState *set_state,
2373            struct ElementEntry *ee)
2374 {
2375   strata_estimator_insert (set_state->se,
2376                            get_ibf_key (&ee->element_hash));
2377 }
2378
2379
2380 /**
2381  * Remove the element given in the element message from the set.
2382  * Only marks the element as removed, so that older set operations can still exchange it.
2383  *
2384  * @param set_state state of the set to remove from
2385  * @param ee set element to remove
2386  */
2387 static void
2388 union_remove (struct SetState *set_state,
2389               struct ElementEntry *ee)
2390 {
2391   strata_estimator_remove (set_state->se,
2392                            get_ibf_key (&ee->element_hash));
2393 }
2394
2395
2396 /**
2397  * Destroy a set that supports the union operation.
2398  *
2399  * @param set_state the set to destroy
2400  */
2401 static void
2402 union_set_destroy (struct SetState *set_state)
2403 {
2404   if (NULL != set_state->se)
2405   {
2406     strata_estimator_destroy (set_state->se);
2407     set_state->se = NULL;
2408   }
2409   GNUNET_free (set_state);
2410 }
2411
2412
2413 /**
2414  * Copy union-specific set state.
2415  *
2416  * @param state source state for copying the union state
2417  * @return a copy of the union-specific set state
2418  */
2419 static struct SetState *
2420 union_copy_state (struct SetState *state)
2421 {
2422   struct SetState *new_state;
2423
2424   GNUNET_assert ((NULL != state) &&
2425                  (NULL != state->se));
2426   new_state = GNUNET_new (struct SetState);
2427   new_state->se = strata_estimator_dup (state->se);
2428
2429   return new_state;
2430 }
2431
2432
2433 /**
2434  * Handle case where channel went down for an operation.
2435  *
2436  * @param op operation that lost the channel
2437  */
2438 static void
2439 union_channel_death (struct Operation *op)
2440 {
2441   send_client_done (op);
2442   _GSS_operation_destroy (op,
2443                           GNUNET_YES);
2444 }
2445
2446
2447 /**
2448  * Get the table with implementing functions for
2449  * set union.
2450  *
2451  * @return the operation specific VTable
2452  */
2453 const struct SetVT *
2454 _GSS_union_vt ()
2455 {
2456   static const struct SetVT union_vt = {
2457     .create = &union_set_create,
2458     .add = &union_add,
2459     .remove = &union_remove,
2460     .destroy_set = &union_set_destroy,
2461     .evaluate = &union_evaluate,
2462     .accept = &union_accept,
2463     .cancel = &union_op_cancel,
2464     .copy_state = &union_copy_state,
2465     .channel_death = &union_channel_death
2466   };
2467
2468   return &union_vt;
2469 }