use fprintf, change output style
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/gnunet-service-set_union.c
20  * @brief two-peer set operations
21  * @author Florian Dold
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_statistics_service.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union.h"
30 #include "gnunet-service-set_union_strata_estimator.h"
31 #include "gnunet-service-set_protocol.h"
32 #include <gcrypt.h>
33
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
36
37
38 /**
39  * Number of IBFs in a strata estimator.
40  */
41 #define SE_STRATA_COUNT 32
42
43 /**
44  * Size of the IBFs in the strata estimator.
45  */
46 #define SE_IBF_SIZE 80
47
48 /**
49  * The hash num parameter for the difference digests and strata estimators.
50  */
51 #define SE_IBF_HASH_NUM 4
52
53 /**
54  * Number of buckets that can be transmitted in one message.
55  */
56 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
57
58 /**
59  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60  * Choose this value so that computing the IBF is still cheaper
61  * than transmitting all values.
62  */
63 #define MAX_IBF_ORDER (20)
64
65 /**
66  * Number of buckets used in the ibf per estimated
67  * difference.
68  */
69 #define IBF_ALPHA 4
70
71
72 /**
73  * Current phase we are in for a union operation.
74  */
75 enum UnionOperationPhase
76 {
77   /**
78    * We sent the request message, and expect a strata estimator.
79    */
80   PHASE_EXPECT_SE,
81
82   /**
83    * We sent the strata estimator, and expect an IBF. This phase is entered once
84    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
85    *
86    * XXX: could use better wording.
87    * XXX: repurposed to also expect a "request full set" message, should be renamed
88    *
89    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90    */
91   PHASE_EXPECT_IBF,
92
93   /**
94    * Continuation for multi part IBFs.
95    */
96   PHASE_EXPECT_IBF_CONT,
97
98   /**
99    * We are decoding an IBF.
100    */
101   PHASE_INVENTORY_ACTIVE,
102
103   /**
104    * The other peer is decoding the IBF we just sent.
105    */
106   PHASE_INVENTORY_PASSIVE,
107
108   /**
109    * The protocol is almost finished, but we still have to flush our message
110    * queue and/or expect some elements.
111    */
112   PHASE_FINISH_CLOSING,
113
114   /**
115    * In the penultimate phase,
116    * we wait until all our demands
117    * are satisfied.  Then we send a done
118    * message, and wait for another done message.
119    */
120   PHASE_FINISH_WAITING,
121
122   /**
123    * In the ultimate phase, we wait until
124    * our demands are satisfied and then
125    * quit (sending another DONE message).
126    */
127   PHASE_DONE,
128
129   /**
130    * After sending the full set, wait for responses with the elements
131    * that the local peer is missing.
132    */
133   PHASE_FULL_SENDING,
134 };
135
136
137 /**
138  * State of an evaluate operation with another peer.
139  */
140 struct OperationState
141 {
142   /**
143    * Copy of the set's strata estimator at the time of
144    * creation of this operation.
145    */
146   struct StrataEstimator *se;
147
148   /**
149    * The IBF we currently receive.
150    */
151   struct InvertibleBloomFilter *remote_ibf;
152
153   /**
154    * The IBF with the local set's element.
155    */
156   struct InvertibleBloomFilter *local_ibf;
157
158   /**
159    * Maps unsalted IBF-Keys to elements.
160    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
161    * Colliding IBF-Keys are linked.
162    */
163   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
164
165   /**
166    * Current state of the operation.
167    */
168   enum UnionOperationPhase phase;
169
170   /**
171    * Did we send the client that we are done?
172    */
173   int client_done_sent;
174
175   /**
176    * Number of ibf buckets already received into the @a remote_ibf.
177    */
178   unsigned int ibf_buckets_received;
179
180   /**
181    * Hashes for elements that we have demanded from the other peer.
182    */
183   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
184
185   /**
186    * Salt that we're using for sending IBFs
187    */
188   uint32_t salt_send;
189
190   /**
191    * Salt for the IBF we've received and that we're currently decoding.
192    */
193   uint32_t salt_receive;
194
195   /**
196    * Number of elements we received from the other peer
197    * that were not in the local set yet.
198    */
199   uint32_t received_fresh;
200
201   /**
202    * Total number of elements received from the other peer.
203    */
204   uint32_t received_total;
205
206   /**
207    * Initial size of our set, just before
208    * the operation started.
209    */
210   uint64_t initial_size;
211 };
212
213
214 /**
215  * The key entry is used to associate an ibf key with an element.
216  */
217 struct KeyEntry
218 {
219   /**
220    * IBF key for the entry, derived from the current salt.
221    */
222   struct IBF_Key ibf_key;
223
224   /**
225    * The actual element associated with the key.
226    *
227    * Only owned by the union operation if element->operation
228    * is #GNUNET_YES.
229    */
230   struct ElementEntry *element;
231
232   /**
233    * Did we receive this element?
234    * Even if element->is_foreign is false, we might
235    * have received the element, so this indicates that
236    * the other peer has it.
237    */
238   int received;
239 };
240
241
242 /**
243  * Used as a closure for sending elements
244  * with a specific IBF key.
245  */
246 struct SendElementClosure
247 {
248   /**
249    * The IBF key whose matching elements should be
250    * sent.
251    */
252   struct IBF_Key ibf_key;
253
254   /**
255    * Operation for which the elements
256    * should be sent.
257    */
258   struct Operation *op;
259 };
260
261
262 /**
263  * Extra state required for efficient set union.
264  */
265 struct SetState
266 {
267   /**
268    * The strata estimator is only generated once for
269    * each set.
270    * The IBF keys are derived from the element hashes with
271    * salt=0.
272    */
273   struct StrataEstimator *se;
274 };
275
276
277 /**
278  * Iterator over hash map entries, called to
279  * destroy the linked list of colliding ibf key entries.
280  *
281  * @param cls closure
282  * @param key current key code
283  * @param value value in the hash map
284  * @return #GNUNET_YES if we should continue to iterate,
285  *         #GNUNET_NO if not.
286  */
287 static int
288 destroy_key_to_element_iter (void *cls,
289                              uint32_t key,
290                              void *value)
291 {
292   struct KeyEntry *k = value;
293
294   GNUNET_assert (NULL != k);
295   if (GNUNET_YES == k->element->remote)
296   {
297     GNUNET_free (k->element);
298     k->element = NULL;
299   }
300   GNUNET_free (k);
301   return GNUNET_YES;
302 }
303
304
305 /**
306  * Destroy the union operation.  Only things specific to the union
307  * operation are destroyed.
308  *
309  * @param op union operation to destroy
310  */
311 static void
312 union_op_cancel (struct Operation *op)
313 {
314   LOG (GNUNET_ERROR_TYPE_DEBUG,
315        "destroying union op\n");
316   /* check if the op was canceled twice */
317   GNUNET_assert (NULL != op->state);
318   if (NULL != op->state->remote_ibf)
319   {
320     ibf_destroy (op->state->remote_ibf);
321     op->state->remote_ibf = NULL;
322   }
323   if (NULL != op->state->demanded_hashes)
324   {
325     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
326     op->state->demanded_hashes = NULL;
327   }
328   if (NULL != op->state->local_ibf)
329   {
330     ibf_destroy (op->state->local_ibf);
331     op->state->local_ibf = NULL;
332   }
333   if (NULL != op->state->se)
334   {
335     strata_estimator_destroy (op->state->se);
336     op->state->se = NULL;
337   }
338   if (NULL != op->state->key_to_element)
339   {
340     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
341                                              &destroy_key_to_element_iter,
342                                              NULL);
343     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
344     op->state->key_to_element = NULL;
345   }
346   GNUNET_free (op->state);
347   op->state = NULL;
348   LOG (GNUNET_ERROR_TYPE_DEBUG,
349        "destroying union op done\n");
350 }
351
352
353 /**
354  * Inform the client that the union operation has failed,
355  * and proceed to destroy the evaluate operation.
356  *
357  * @param op the union operation to fail
358  */
359 static void
360 fail_union_operation (struct Operation *op)
361 {
362   struct GNUNET_MQ_Envelope *ev;
363   struct GNUNET_SET_ResultMessage *msg;
364
365   LOG (GNUNET_ERROR_TYPE_WARNING,
366        "union operation failed\n");
367   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
368   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
369   msg->request_id = htonl (op->client_request_id);
370   msg->element_type = htons (0);
371   GNUNET_MQ_send (op->set->cs->mq,
372                   ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_assert (GNUNET_OK ==
391                  GNUNET_CRYPTO_kdf (&key, sizeof (key),
392                                     src, sizeof *src,
393                                     &salt, sizeof (salt),
394                                     NULL, 0));
395   return key;
396 }
397
398
399 /**
400  * Context for #op_get_element_iterator
401  */
402 struct GetElementContext
403 {
404   /**
405    * FIXME.
406    */
407   struct GNUNET_HashCode hash;
408
409   /**
410    * FIXME.
411    */
412   struct KeyEntry *k;
413 };
414
415
416 /**
417  * Iterator over the mapping from IBF keys to element entries.  Checks if we
418  * have an element with a given GNUNET_HashCode.
419  *
420  * @param cls closure
421  * @param key current key code
422  * @param value value in the hash map
423  * @return #GNUNET_YES if we should search further,
424  *         #GNUNET_NO if we've found the element.
425  */
426 static int
427 op_get_element_iterator (void *cls,
428                          uint32_t key,
429                          void *value)
430 {
431   struct GetElementContext *ctx = cls;
432   struct KeyEntry *k = value;
433
434   GNUNET_assert (NULL != k);
435   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
436                                    &ctx->hash))
437   {
438     ctx->k = k;
439     return GNUNET_NO;
440   }
441   return GNUNET_YES;
442 }
443
444
445 /**
446  * Determine whether the given element is already in the operation's element
447  * set.
448  *
449  * @param op operation that should be tested for 'element_hash'
450  * @param element_hash hash of the element to look for
451  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
452  */
453 static struct KeyEntry *
454 op_get_element (struct Operation *op,
455                 const struct GNUNET_HashCode *element_hash)
456 {
457   int ret;
458   struct IBF_Key ibf_key;
459   struct GetElementContext ctx = {{{ 0 }} , 0};
460
461   ctx.hash = *element_hash;
462
463   ibf_key = get_ibf_key (element_hash);
464   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
465                                                       (uint32_t) ibf_key.key_val,
466                                                       op_get_element_iterator,
467                                                       &ctx);
468
469   /* was the iteration aborted because we found the element? */
470   if (GNUNET_SYSERR == ret)
471   {
472     GNUNET_assert (NULL != ctx.k);
473     return ctx.k;
474   }
475   return NULL;
476 }
477
478
479 /**
480  * Insert an element into the union operation's
481  * key-to-element mapping. Takes ownership of 'ee'.
482  * Note that this does not insert the element in the set,
483  * only in the operation's key-element mapping.
484  * This is done to speed up re-tried operations, if some elements
485  * were transmitted, and then the IBF fails to decode.
486  *
487  * XXX: clarify ownership, doesn't sound right.
488  *
489  * @param op the union operation
490  * @param ee the element entry
491  * @parem received was this element received from the remote peer?
492  */
493 static void
494 op_register_element (struct Operation *op,
495                      struct ElementEntry *ee,
496                      int received)
497 {
498   struct IBF_Key ibf_key;
499   struct KeyEntry *k;
500
501   ibf_key = get_ibf_key (&ee->element_hash);
502   k = GNUNET_new (struct KeyEntry);
503   k->element = ee;
504   k->ibf_key = ibf_key;
505   k->received = received;
506   GNUNET_assert (GNUNET_OK ==
507                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
508                                                       (uint32_t) ibf_key.key_val,
509                                                       k,
510                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
511 }
512
513
514 /**
515  * FIXME.
516  */
517 static void
518 salt_key (const struct IBF_Key *k_in,
519           uint32_t salt,
520           struct IBF_Key *k_out)
521 {
522   int s = salt % 64;
523   uint64_t x = k_in->key_val;
524   /* rotate ibf key */
525   x = (x >> s) | (x << (64 - s));
526   k_out->key_val = x;
527 }
528
529
530 /**
531  * FIXME.
532  */
533 static void
534 unsalt_key (const struct IBF_Key *k_in,
535             uint32_t salt,
536             struct IBF_Key *k_out)
537 {
538   int s = salt % 64;
539   uint64_t x = k_in->key_val;
540   x = (x << s) | (x >> (64 - s));
541   k_out->key_val = x;
542 }
543
544
545 /**
546  * Insert a key into an ibf.
547  *
548  * @param cls the ibf
549  * @param key unused
550  * @param value the key entry to get the key from
551  */
552 static int
553 prepare_ibf_iterator (void *cls,
554                       uint32_t key,
555                       void *value)
556 {
557   struct Operation *op = cls;
558   struct KeyEntry *ke = value;
559   struct IBF_Key salted_key;
560
561   LOG (GNUNET_ERROR_TYPE_DEBUG,
562        "[OP %x] inserting %lx (hash %s) into ibf\n",
563        (void *) op,
564        (unsigned long) ke->ibf_key.key_val,
565        GNUNET_h2s (&ke->element->element_hash));
566   salt_key (&ke->ibf_key,
567             op->state->salt_send,
568             &salted_key);
569   ibf_insert (op->state->local_ibf, salted_key);
570   return GNUNET_YES;
571 }
572
573
574 /**
575  * Iterator for initializing the
576  * key-to-element mapping of a union operation
577  *
578  * @param cls the union operation `struct Operation *`
579  * @param key unused
580  * @param value the `struct ElementEntry *` to insert
581  *        into the key-to-element mapping
582  * @return #GNUNET_YES (to continue iterating)
583  */
584 static int
585 init_key_to_element_iterator (void *cls,
586                               const struct GNUNET_HashCode *key,
587                               void *value)
588 {
589   struct Operation *op = cls;
590   struct ElementEntry *ee = value;
591
592   /* make sure that the element belongs to the set at the time
593    * of creating the operation */
594   if (GNUNET_NO ==
595       _GSS_is_element_of_operation (ee,
596                                     op))
597     return GNUNET_YES;
598   GNUNET_assert (GNUNET_NO == ee->remote);
599   op_register_element (op,
600                        ee,
601                        GNUNET_NO);
602   return GNUNET_YES;
603 }
604
605
606 /**
607  * Initialize the IBF key to element mapping local to this set
608  * operation.
609  *
610  * @param op the set union operation
611  */
612 static void
613 initialize_key_to_element (struct Operation *op)
614 {
615   unsigned int len;
616
617   GNUNET_assert (NULL == op->state->key_to_element);
618   len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
619   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
620   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
621                                          &init_key_to_element_iterator,
622                                          op);
623 }
624
625
626 /**
627  * Create an ibf with the operation's elements
628  * of the specified size
629  *
630  * @param op the union operation
631  * @param size size of the ibf to create
632  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
633  */
634 static int
635 prepare_ibf (struct Operation *op,
636              uint32_t size)
637 {
638   GNUNET_assert (NULL != op->state->key_to_element);
639
640   if (NULL != op->state->local_ibf)
641     ibf_destroy (op->state->local_ibf);
642   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
643   if (NULL == op->state->local_ibf)
644   {
645     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
646                 "Failed to allocate local IBF\n");
647     return GNUNET_SYSERR;
648   }
649   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
650                                            &prepare_ibf_iterator,
651                                            op);
652   return GNUNET_OK;
653 }
654
655
656 /**
657  * Send an ibf of appropriate size.
658  *
659  * Fragments the IBF into multiple messages if necessary.
660  *
661  * @param op the union operation
662  * @param ibf_order order of the ibf to send, size=2^order
663  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
664  */
665 static int
666 send_ibf (struct Operation *op,
667           uint16_t ibf_order)
668 {
669   unsigned int buckets_sent = 0;
670   struct InvertibleBloomFilter *ibf;
671
672   if (GNUNET_OK !=
673       prepare_ibf (op, 1<<ibf_order))
674   {
675     /* allocation failed */
676     return GNUNET_SYSERR;
677   }
678
679   LOG (GNUNET_ERROR_TYPE_DEBUG,
680        "sending ibf of size %u\n",
681        1<<ibf_order);
682
683   {
684     char name[64] = { 0 };
685     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
686     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
687   }
688
689   ibf = op->state->local_ibf;
690
691   while (buckets_sent < (1 << ibf_order))
692   {
693     unsigned int buckets_in_message;
694     struct GNUNET_MQ_Envelope *ev;
695     struct IBFMessage *msg;
696
697     buckets_in_message = (1 << ibf_order) - buckets_sent;
698     /* limit to maximum */
699     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
700       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
701
702     ev = GNUNET_MQ_msg_extra (msg,
703                               buckets_in_message * IBF_BUCKET_SIZE,
704                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
705     msg->reserved1 = 0;
706     msg->reserved2 = 0;
707     msg->order = ibf_order;
708     msg->offset = htonl (buckets_sent);
709     msg->salt = htonl (op->state->salt_send);
710     ibf_write_slice (ibf, buckets_sent,
711                      buckets_in_message, &msg[1]);
712     buckets_sent += buckets_in_message;
713     LOG (GNUNET_ERROR_TYPE_DEBUG,
714          "ibf chunk size %u, %u/%u sent\n",
715          buckets_in_message,
716          buckets_sent,
717          1<<ibf_order);
718     GNUNET_MQ_send (op->mq, ev);
719   }
720
721   /* The other peer must decode the IBF, so
722    * we're passive. */
723   op->state->phase = PHASE_INVENTORY_PASSIVE;
724   return GNUNET_OK;
725 }
726
727
728 /**
729  * Compute the necessary order of an ibf
730  * from the size of the symmetric set difference.
731  *
732  * @param diff the difference
733  * @return the required size of the ibf
734  */
735 static unsigned int
736 get_order_from_difference (unsigned int diff)
737 {
738   unsigned int ibf_order;
739
740   ibf_order = 2;
741   while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
742             ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
743           (ibf_order < MAX_IBF_ORDER) )
744     ibf_order++;
745   // add one for correction
746   return ibf_order + 1;
747 }
748
749
750 /**
751  * Send a set element.
752  *
753  * @param cls the union operation `struct Operation *`
754  * @param key unused
755  * @param value the `struct ElementEntry *` to insert
756  *        into the key-to-element mapping
757  * @return #GNUNET_YES (to continue iterating)
758  */
759 static int
760 send_full_element_iterator (void *cls,
761                             const struct GNUNET_HashCode *key,
762                             void *value)
763 {
764   struct Operation *op = cls;
765   struct GNUNET_SET_ElementMessage *emsg;
766   struct ElementEntry *ee = value;
767   struct GNUNET_SET_Element *el = &ee->element;
768   struct GNUNET_MQ_Envelope *ev;
769
770   LOG (GNUNET_ERROR_TYPE_DEBUG,
771        "Sending element %s\n",
772        GNUNET_h2s (key));
773   ev = GNUNET_MQ_msg_extra (emsg,
774                             el->size,
775                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
776   emsg->element_type = htons (el->element_type);
777   GNUNET_memcpy (&emsg[1],
778                  el->data,
779                  el->size);
780   GNUNET_MQ_send (op->mq,
781                   ev);
782   return GNUNET_YES;
783 }
784
785
786 /**
787  * Switch to full set transmission for @a op.
788  *
789  * @param op operation to switch to full set transmission.
790  */
791 static void
792 send_full_set (struct Operation *op)
793 {
794   struct GNUNET_MQ_Envelope *ev;
795
796   op->state->phase = PHASE_FULL_SENDING;
797   LOG (GNUNET_ERROR_TYPE_DEBUG,
798        "Dedicing to transmit the full set\n");
799   /* FIXME: use a more memory-friendly way of doing this with an
800      iterator, just as we do in the non-full case! */
801   (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
802                                                 &send_full_element_iterator,
803                                                 op);
804   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
805   GNUNET_MQ_send (op->mq,
806                   ev);
807 }
808
809
810 /**
811  * Handle a strata estimator from a remote peer
812  *
813  * @param cls the union operation
814  * @param msg the message
815  */
816 int
817 check_union_p2p_strata_estimator (void *cls,
818                                   const struct StrataEstimatorMessage *msg)
819 {
820   struct Operation *op = cls;
821   int is_compressed;
822   size_t len;
823
824   if (op->state->phase != PHASE_EXPECT_SE)
825   {
826     GNUNET_break (0);
827     return GNUNET_SYSERR;
828   }
829   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
830   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
831   if ( (GNUNET_NO == is_compressed) &&
832        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
833   {
834     GNUNET_break (0);
835     return GNUNET_SYSERR;
836   }
837   return GNUNET_OK;
838 }
839
840
841 /**
842  * Handle a strata estimator from a remote peer
843  *
844  * @param cls the union operation
845  * @param msg the message
846  */
847 void
848 handle_union_p2p_strata_estimator (void *cls,
849                                    const struct StrataEstimatorMessage *msg)
850 {
851   struct Operation *op = cls;
852   struct StrataEstimator *remote_se;
853   unsigned int diff;
854   uint64_t other_size;
855   size_t len;
856   int is_compressed;
857
858   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
859   GNUNET_STATISTICS_update (_GSS_statistics,
860                             "# bytes of SE received",
861                             ntohs (msg->header.size),
862                             GNUNET_NO);
863   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
864   other_size = GNUNET_ntohll (msg->set_size);
865   remote_se = strata_estimator_create (SE_STRATA_COUNT,
866                                        SE_IBF_SIZE,
867                                        SE_IBF_HASH_NUM);
868   if (NULL == remote_se)
869   {
870     /* insufficient resources, fail */
871     fail_union_operation (op);
872     return;
873   }
874   if (GNUNET_OK !=
875       strata_estimator_read (&msg[1],
876                              len,
877                              is_compressed,
878                              remote_se))
879   {
880     /* decompression failed */
881     strata_estimator_destroy (remote_se);
882     fail_union_operation (op);
883     return;
884   }
885   GNUNET_assert (NULL != op->state->se);
886   diff = strata_estimator_difference (remote_se,
887                                       op->state->se);
888
889   if (diff > 200)
890     diff = diff * 3 / 2;
891
892   strata_estimator_destroy (remote_se);
893   strata_estimator_destroy (op->state->se);
894   op->state->se = NULL;
895   LOG (GNUNET_ERROR_TYPE_DEBUG,
896        "got se diff=%d, using ibf size %d\n",
897        diff,
898        1U << get_order_from_difference (diff));
899
900   {
901     char *set_debug;
902
903     set_debug = getenv ("GNUNET_SET_BENCHMARK");
904     if ( (NULL != set_debug) &&
905          (0 == strcmp (set_debug, "1")) )
906     {
907       FILE *f = fopen ("set.log", "a");
908       fprintf (f, "%llu\n", (unsigned long long) diff);
909       fclose (f);
910     }
911   }
912
913   if ( (GNUNET_YES == op->byzantine) &&
914        (other_size < op->byzantine_lower_bound) )
915   {
916     GNUNET_break (0);
917     fail_union_operation (op);
918     return;
919   }
920
921   if ( (GNUNET_YES == op->force_full) ||
922        (diff > op->state->initial_size / 4) ||
923        (0 == other_size) )
924   {
925     LOG (GNUNET_ERROR_TYPE_DEBUG,
926          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
927          diff,
928          op->state->initial_size);
929     GNUNET_STATISTICS_update (_GSS_statistics,
930                               "# of full sends",
931                               1,
932                               GNUNET_NO);
933     if ( (op->state->initial_size <= other_size) ||
934          (0 == other_size) )
935     {
936       send_full_set (op);
937     }
938     else
939     {
940       struct GNUNET_MQ_Envelope *ev;
941
942       LOG (GNUNET_ERROR_TYPE_DEBUG,
943            "Telling other peer that we expect its full set\n");
944       op->state->phase = PHASE_EXPECT_IBF;
945       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
946       GNUNET_MQ_send (op->mq,
947                       ev);
948     }
949   }
950   else
951   {
952     GNUNET_STATISTICS_update (_GSS_statistics,
953                               "# of ibf sends",
954                               1,
955                               GNUNET_NO);
956     if (GNUNET_OK !=
957         send_ibf (op,
958                   get_order_from_difference (diff)))
959     {
960       /* Internal error, best we can do is shut the connection */
961       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
962                   "Failed to send IBF, closing connection\n");
963       fail_union_operation (op);
964       return;
965     }
966   }
967   GNUNET_CADET_receive_done (op->channel);
968 }
969
970
971 /**
972  * Iterator to send elements to a remote peer
973  *
974  * @param cls closure with the element key and the union operation
975  * @param key ignored
976  * @param value the key entry
977  */
978 static int
979 send_offers_iterator (void *cls,
980                       uint32_t key,
981                       void *value)
982 {
983   struct SendElementClosure *sec = cls;
984   struct Operation *op = sec->op;
985   struct KeyEntry *ke = value;
986   struct GNUNET_MQ_Envelope *ev;
987   struct GNUNET_MessageHeader *mh;
988
989   /* Detect 32-bit key collision for the 64-bit IBF keys. */
990   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
991     return GNUNET_YES;
992
993   ev = GNUNET_MQ_msg_header_extra (mh,
994                                    sizeof (struct GNUNET_HashCode),
995                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
996
997   GNUNET_assert (NULL != ev);
998   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
999   LOG (GNUNET_ERROR_TYPE_DEBUG,
1000        "[OP %x] sending element offer (%s) to peer\n",
1001        (void *) op,
1002        GNUNET_h2s (&ke->element->element_hash));
1003   GNUNET_MQ_send (op->mq, ev);
1004   return GNUNET_YES;
1005 }
1006
1007
1008 /**
1009  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1010  *
1011  * @param op union operation
1012  * @param ibf_key IBF key of interest
1013  */
1014 static void
1015 send_offers_for_key (struct Operation *op,
1016                      struct IBF_Key ibf_key)
1017 {
1018   struct SendElementClosure send_cls;
1019
1020   send_cls.ibf_key = ibf_key;
1021   send_cls.op = op;
1022   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1023                                                        (uint32_t) ibf_key.key_val,
1024                                                        &send_offers_iterator,
1025                                                        &send_cls);
1026 }
1027
1028
1029 /**
1030  * Decode which elements are missing on each side, and
1031  * send the appropriate offers and inquiries.
1032  *
1033  * @param op union operation
1034  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1035  */
1036 static int
1037 decode_and_send (struct Operation *op)
1038 {
1039   struct IBF_Key key;
1040   struct IBF_Key last_key;
1041   int side;
1042   unsigned int num_decoded;
1043   struct InvertibleBloomFilter *diff_ibf;
1044
1045   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1046
1047   if (GNUNET_OK !=
1048       prepare_ibf (op,
1049                    op->state->remote_ibf->size))
1050   {
1051     GNUNET_break (0);
1052     /* allocation failed */
1053     return GNUNET_SYSERR;
1054   }
1055   diff_ibf = ibf_dup (op->state->local_ibf);
1056   ibf_subtract (diff_ibf,
1057                 op->state->remote_ibf);
1058
1059   ibf_destroy (op->state->remote_ibf);
1060   op->state->remote_ibf = NULL;
1061
1062   LOG (GNUNET_ERROR_TYPE_DEBUG,
1063        "decoding IBF (size=%u)\n",
1064        diff_ibf->size);
1065
1066   num_decoded = 0;
1067   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1068
1069   while (1)
1070   {
1071     int res;
1072     int cycle_detected = GNUNET_NO;
1073
1074     last_key = key;
1075
1076     res = ibf_decode (diff_ibf, &side, &key);
1077     if (res == GNUNET_OK)
1078     {
1079       LOG (GNUNET_ERROR_TYPE_DEBUG,
1080            "decoded ibf key %lx\n",
1081            (unsigned long) key.key_val);
1082       num_decoded += 1;
1083       if ( (num_decoded > diff_ibf->size) ||
1084            ( (num_decoded > 1) &&
1085              (last_key.key_val == key.key_val) ) )
1086       {
1087         LOG (GNUNET_ERROR_TYPE_DEBUG,
1088              "detected cyclic ibf (decoded %u/%u)\n",
1089              num_decoded,
1090              diff_ibf->size);
1091         cycle_detected = GNUNET_YES;
1092       }
1093     }
1094     if ( (GNUNET_SYSERR == res) ||
1095          (GNUNET_YES == cycle_detected) )
1096     {
1097       int next_order;
1098       next_order = 0;
1099       while (1<<next_order < diff_ibf->size)
1100         next_order++;
1101       next_order++;
1102       if (next_order <= MAX_IBF_ORDER)
1103       {
1104         LOG (GNUNET_ERROR_TYPE_DEBUG,
1105              "decoding failed, sending larger ibf (size %u)\n",
1106              1<<next_order);
1107         GNUNET_STATISTICS_update (_GSS_statistics,
1108                                   "# of IBF retries",
1109                                   1,
1110                                   GNUNET_NO);
1111         op->state->salt_send++;
1112         if (GNUNET_OK !=
1113             send_ibf (op, next_order))
1114         {
1115           /* Internal error, best we can do is shut the connection */
1116           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1117                       "Failed to send IBF, closing connection\n");
1118           fail_union_operation (op);
1119           ibf_destroy (diff_ibf);
1120           return GNUNET_SYSERR;
1121         }
1122       }
1123       else
1124       {
1125         GNUNET_STATISTICS_update (_GSS_statistics,
1126                                   "# of failed union operations (too large)",
1127                                   1,
1128                                   GNUNET_NO);
1129         // XXX: Send the whole set, element-by-element
1130         LOG (GNUNET_ERROR_TYPE_ERROR,
1131              "set union failed: reached ibf limit\n");
1132         fail_union_operation (op);
1133         ibf_destroy (diff_ibf);
1134         return GNUNET_SYSERR;
1135       }
1136       break;
1137     }
1138     if (GNUNET_NO == res)
1139     {
1140       struct GNUNET_MQ_Envelope *ev;
1141
1142       LOG (GNUNET_ERROR_TYPE_DEBUG,
1143            "transmitted all values, sending DONE\n");
1144       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1145       GNUNET_MQ_send (op->mq, ev);
1146       /* We now wait until we get a DONE message back
1147        * and then wait for our MQ to be flushed and all our
1148        * demands be delivered. */
1149       break;
1150     }
1151     if (1 == side)
1152     {
1153       struct IBF_Key unsalted_key;
1154
1155       unsalt_key (&key,
1156                   op->state->salt_receive,
1157                   &unsalted_key);
1158       send_offers_for_key (op,
1159                            unsalted_key);
1160     }
1161     else if (-1 == side)
1162     {
1163       struct GNUNET_MQ_Envelope *ev;
1164       struct InquiryMessage *msg;
1165
1166       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1167        * the effort additional complexity. */
1168       ev = GNUNET_MQ_msg_extra (msg,
1169                                 sizeof (struct IBF_Key),
1170                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1171       msg->salt = htonl (op->state->salt_receive);
1172       GNUNET_memcpy (&msg[1],
1173               &key,
1174               sizeof (struct IBF_Key));
1175       LOG (GNUNET_ERROR_TYPE_DEBUG,
1176            "sending element inquiry for IBF key %lx\n",
1177            (unsigned long) key.key_val);
1178       GNUNET_MQ_send (op->mq, ev);
1179     }
1180     else
1181     {
1182       GNUNET_assert (0);
1183     }
1184   }
1185   ibf_destroy (diff_ibf);
1186   return GNUNET_OK;
1187 }
1188
1189
1190 /**
1191  * Check an IBF message from a remote peer.
1192  *
1193  * Reassemble the IBF from multiple pieces, and
1194  * process the whole IBF once possible.
1195  *
1196  * @param cls the union operation
1197  * @param msg the header of the message
1198  * @return #GNUNET_OK if @a msg is well-formed
1199  */
1200 int
1201 check_union_p2p_ibf (void *cls,
1202                      const struct IBFMessage *msg)
1203 {
1204   struct Operation *op = cls;
1205   unsigned int buckets_in_message;
1206
1207   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1208   {
1209     GNUNET_break_op (0);
1210     return GNUNET_SYSERR;
1211   }
1212   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1213   if (0 == buckets_in_message)
1214   {
1215     GNUNET_break_op (0);
1216     return GNUNET_SYSERR;
1217   }
1218   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1219   {
1220     GNUNET_break_op (0);
1221     return GNUNET_SYSERR;
1222   }
1223   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1224   {
1225     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1226     {
1227       GNUNET_break_op (0);
1228       return GNUNET_SYSERR;
1229     }
1230     if (1<<msg->order != op->state->remote_ibf->size)
1231     {
1232       GNUNET_break_op (0);
1233       return GNUNET_SYSERR;
1234     }
1235     if (ntohl (msg->salt) != op->state->salt_receive)
1236     {
1237       GNUNET_break_op (0);
1238       return GNUNET_SYSERR;
1239     }
1240   }
1241   else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1242             (op->state->phase != PHASE_EXPECT_IBF) )
1243   {
1244     GNUNET_break_op (0);
1245     return GNUNET_SYSERR;
1246   }
1247
1248   return GNUNET_OK;
1249 }
1250
1251
1252 /**
1253  * Handle an IBF message from a remote peer.
1254  *
1255  * Reassemble the IBF from multiple pieces, and
1256  * process the whole IBF once possible.
1257  *
1258  * @param cls the union operation
1259  * @param msg the header of the message
1260  */
1261 void
1262 handle_union_p2p_ibf (void *cls,
1263                       const struct IBFMessage *msg)
1264 {
1265   struct Operation *op = cls;
1266   unsigned int buckets_in_message;
1267
1268   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1269   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1270        (op->state->phase == PHASE_EXPECT_IBF) )
1271   {
1272     op->state->phase = PHASE_EXPECT_IBF_CONT;
1273     GNUNET_assert (NULL == op->state->remote_ibf);
1274     LOG (GNUNET_ERROR_TYPE_DEBUG,
1275          "Creating new ibf of size %u\n",
1276          1 << msg->order);
1277     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1278     op->state->salt_receive = ntohl (msg->salt);
1279     LOG (GNUNET_ERROR_TYPE_DEBUG,
1280          "Receiving new IBF with salt %u\n",
1281          op->state->salt_receive);
1282     if (NULL == op->state->remote_ibf)
1283     {
1284       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1285                   "Failed to parse remote IBF, closing connection\n");
1286       fail_union_operation (op);
1287       return;
1288     }
1289     op->state->ibf_buckets_received = 0;
1290     if (0 != ntohl (msg->offset))
1291     {
1292       GNUNET_break_op (0);
1293       fail_union_operation (op);
1294       return;
1295     }
1296   }
1297   else
1298   {
1299     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1300     LOG (GNUNET_ERROR_TYPE_DEBUG,
1301          "Received more of IBF\n");
1302   }
1303   GNUNET_assert (NULL != op->state->remote_ibf);
1304
1305   ibf_read_slice (&msg[1],
1306                   op->state->ibf_buckets_received,
1307                   buckets_in_message,
1308                   op->state->remote_ibf);
1309   op->state->ibf_buckets_received += buckets_in_message;
1310
1311   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1312   {
1313     LOG (GNUNET_ERROR_TYPE_DEBUG,
1314          "received full ibf\n");
1315     op->state->phase = PHASE_INVENTORY_ACTIVE;
1316     if (GNUNET_OK !=
1317         decode_and_send (op))
1318     {
1319       /* Internal error, best we can do is shut down */
1320       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1321                   "Failed to decode IBF, closing connection\n");
1322       fail_union_operation (op);
1323       return;
1324     }
1325   }
1326   GNUNET_CADET_receive_done (op->channel);
1327 }
1328
1329
1330 /**
1331  * Send a result message to the client indicating
1332  * that there is a new element.
1333  *
1334  * @param op union operation
1335  * @param element element to send
1336  * @param status status to send with the new element
1337  */
1338 static void
1339 send_client_element (struct Operation *op,
1340                      struct GNUNET_SET_Element *element,
1341                      int status)
1342 {
1343   struct GNUNET_MQ_Envelope *ev;
1344   struct GNUNET_SET_ResultMessage *rm;
1345
1346   LOG (GNUNET_ERROR_TYPE_DEBUG,
1347        "sending element (size %u) to client\n",
1348        element->size);
1349   GNUNET_assert (0 != op->client_request_id);
1350   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1351   if (NULL == ev)
1352   {
1353     GNUNET_MQ_discard (ev);
1354     GNUNET_break (0);
1355     return;
1356   }
1357   rm->result_status = htons (status);
1358   rm->request_id = htonl (op->client_request_id);
1359   rm->element_type = htons (element->element_type);
1360   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1361   GNUNET_memcpy (&rm[1],
1362                  element->data,
1363                  element->size);
1364   GNUNET_MQ_send (op->set->cs->mq,
1365                   ev);
1366 }
1367
1368
1369 /**
1370  * Destroy remote channel.
1371  *
1372  * @param op operation
1373  */
1374 static void
1375 destroy_channel (struct Operation *op)
1376 {
1377   struct GNUNET_CADET_Channel *channel;
1378
1379   if (NULL != (channel = op->channel))
1380   {
1381     /* This will free op; called conditionally as this helper function
1382        is also called from within the channel disconnect handler. */
1383     op->channel = NULL;
1384     GNUNET_CADET_channel_destroy (channel);
1385   }
1386 }
1387
1388
1389 /**
1390  * Signal to the client that the operation has finished and
1391  * destroy the operation.
1392  *
1393  * @param cls operation to destroy
1394  */
1395 static void
1396 send_client_done (void *cls)
1397 {
1398   struct Operation *op = cls;
1399   struct GNUNET_MQ_Envelope *ev;
1400   struct GNUNET_SET_ResultMessage *rm;
1401
1402   if (GNUNET_YES == op->state->client_done_sent) {
1403     return;
1404   }
1405
1406   if (PHASE_DONE != op->state->phase) {
1407     LOG (GNUNET_ERROR_TYPE_WARNING,
1408          "Union operation failed\n");
1409     GNUNET_STATISTICS_update (_GSS_statistics,
1410                               "# Union operations failed",
1411                               1,
1412                               GNUNET_NO);
1413     ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1414     rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1415     rm->request_id = htonl (op->client_request_id);
1416     rm->element_type = htons (0);
1417     GNUNET_MQ_send (op->set->cs->mq,
1418                     ev);
1419     return;
1420   }
1421
1422   op->state->client_done_sent = GNUNET_YES;
1423
1424   GNUNET_STATISTICS_update (_GSS_statistics,
1425                             "# Union operations succeeded",
1426                             1,
1427                             GNUNET_NO);
1428   LOG (GNUNET_ERROR_TYPE_INFO,
1429        "Signalling client that union operation is done\n");
1430   ev = GNUNET_MQ_msg (rm,
1431                       GNUNET_MESSAGE_TYPE_SET_RESULT);
1432   rm->request_id = htonl (op->client_request_id);
1433   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1434   rm->element_type = htons (0);
1435   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1436   GNUNET_MQ_send (op->set->cs->mq,
1437                   ev);
1438 }
1439
1440
1441 /**
1442  * Tests if the operation is finished, and if so notify.
1443  *
1444  * @param op operation to check
1445  */
1446 static void
1447 maybe_finish (struct Operation *op)
1448 {
1449   unsigned int num_demanded;
1450
1451   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1452
1453   if (PHASE_FINISH_WAITING == op->state->phase)
1454   {
1455     LOG (GNUNET_ERROR_TYPE_DEBUG,
1456          "In PHASE_FINISH_WAITING, pending %u demands\n",
1457          num_demanded);
1458     if (0 == num_demanded)
1459     {
1460       struct GNUNET_MQ_Envelope *ev;
1461
1462       op->state->phase = PHASE_DONE;
1463       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1464       GNUNET_MQ_send (op->mq,
1465                       ev);
1466       /* We now wait until the other peer sends P2P_OVER
1467        * after it got all elements from us. */
1468     }
1469   }
1470   if (PHASE_FINISH_CLOSING == op->state->phase)
1471   {
1472     LOG (GNUNET_ERROR_TYPE_DEBUG,
1473          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1474          num_demanded);
1475     if (0 == num_demanded)
1476     {
1477       op->state->phase = PHASE_DONE;
1478       send_client_done (op);
1479       destroy_channel (op);
1480     }
1481   }
1482 }
1483
1484
1485 /**
1486  * Check an element message from a remote peer.
1487  *
1488  * @param cls the union operation
1489  * @param emsg the message
1490  */
1491 int
1492 check_union_p2p_elements (void *cls,
1493                           const struct GNUNET_SET_ElementMessage *emsg)
1494 {
1495   struct Operation *op = cls;
1496
1497   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1498   {
1499     GNUNET_break_op (0);
1500     return GNUNET_SYSERR;
1501   }
1502   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1503   {
1504     GNUNET_break_op (0);
1505     return GNUNET_SYSERR;
1506   }
1507   return GNUNET_OK;
1508 }
1509
1510
1511 /**
1512  * Handle an element message from a remote peer.
1513  * Sent by the other peer either because we decoded an IBF and placed a demand,
1514  * or because the other peer switched to full set transmission.
1515  *
1516  * @param cls the union operation
1517  * @param emsg the message
1518  */
1519 void
1520 handle_union_p2p_elements (void *cls,
1521                            const struct GNUNET_SET_ElementMessage *emsg)
1522 {
1523   struct Operation *op = cls;
1524   struct ElementEntry *ee;
1525   struct KeyEntry *ke;
1526   uint16_t element_size;
1527
1528   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1529   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1530   GNUNET_memcpy (&ee[1],
1531                  &emsg[1],
1532                  element_size);
1533   ee->element.size = element_size;
1534   ee->element.data = &ee[1];
1535   ee->element.element_type = ntohs (emsg->element_type);
1536   ee->remote = GNUNET_YES;
1537   GNUNET_SET_element_hash (&ee->element,
1538                            &ee->element_hash);
1539   if (GNUNET_NO ==
1540       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1541                                             &ee->element_hash,
1542                                             NULL))
1543   {
1544     /* We got something we didn't demand, since it's not in our map. */
1545     GNUNET_break_op (0);
1546     fail_union_operation (op);
1547     return;
1548   }
1549
1550   LOG (GNUNET_ERROR_TYPE_DEBUG,
1551        "Got element (size %u, hash %s) from peer\n",
1552        (unsigned int) element_size,
1553        GNUNET_h2s (&ee->element_hash));
1554
1555   GNUNET_STATISTICS_update (_GSS_statistics,
1556                             "# received elements",
1557                             1,
1558                             GNUNET_NO);
1559   GNUNET_STATISTICS_update (_GSS_statistics,
1560                             "# exchanged elements",
1561                             1,
1562                             GNUNET_NO);
1563
1564   op->state->received_total++;
1565
1566   ke = op_get_element (op, &ee->element_hash);
1567   if (NULL != ke)
1568   {
1569     /* Got repeated element.  Should not happen since
1570      * we track demands. */
1571     GNUNET_STATISTICS_update (_GSS_statistics,
1572                               "# repeated elements",
1573                               1,
1574                               GNUNET_NO);
1575     ke->received = GNUNET_YES;
1576     GNUNET_free (ee);
1577   }
1578   else
1579   {
1580     LOG (GNUNET_ERROR_TYPE_DEBUG,
1581          "Registering new element from remote peer\n");
1582     op->state->received_fresh++;
1583     op_register_element (op, ee, GNUNET_YES);
1584     /* only send results immediately if the client wants it */
1585     switch (op->result_mode)
1586     {
1587       case GNUNET_SET_RESULT_ADDED:
1588         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1589         break;
1590       case GNUNET_SET_RESULT_SYMMETRIC:
1591         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1592         break;
1593       default:
1594         /* Result mode not supported, should have been caught earlier. */
1595         GNUNET_break (0);
1596         break;
1597     }
1598   }
1599
1600   if ( (op->state->received_total > 8) &&
1601        (op->state->received_fresh < op->state->received_total / 3) )
1602   {
1603     /* The other peer gave us lots of old elements, there's something wrong. */
1604     GNUNET_break_op (0);
1605     fail_union_operation (op);
1606     return;
1607   }
1608   GNUNET_CADET_receive_done (op->channel);
1609   maybe_finish (op);
1610 }
1611
1612
1613 /**
1614  * Check a full element message from a remote peer.
1615  *
1616  * @param cls the union operation
1617  * @param emsg the message
1618  */
1619 int
1620 check_union_p2p_full_element (void *cls,
1621                               const struct GNUNET_SET_ElementMessage *emsg)
1622 {
1623   struct Operation *op = cls;
1624
1625   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1626   {
1627     GNUNET_break_op (0);
1628     return GNUNET_SYSERR;
1629   }
1630   // FIXME: check that we expect full elements here?
1631   return GNUNET_OK;
1632 }
1633
1634
1635 /**
1636  * Handle an element message from a remote peer.
1637  *
1638  * @param cls the union operation
1639  * @param emsg the message
1640  */
1641 void
1642 handle_union_p2p_full_element (void *cls,
1643                                const struct GNUNET_SET_ElementMessage *emsg)
1644 {
1645   struct Operation *op = cls;
1646   struct ElementEntry *ee;
1647   struct KeyEntry *ke;
1648   uint16_t element_size;
1649
1650   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1651   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1652   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1653   ee->element.size = element_size;
1654   ee->element.data = &ee[1];
1655   ee->element.element_type = ntohs (emsg->element_type);
1656   ee->remote = GNUNET_YES;
1657   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1658
1659   LOG (GNUNET_ERROR_TYPE_DEBUG,
1660        "Got element (full diff, size %u, hash %s) from peer\n",
1661        (unsigned int) element_size,
1662        GNUNET_h2s (&ee->element_hash));
1663
1664   GNUNET_STATISTICS_update (_GSS_statistics,
1665                             "# received elements",
1666                             1,
1667                             GNUNET_NO);
1668   GNUNET_STATISTICS_update (_GSS_statistics,
1669                             "# exchanged elements",
1670                             1,
1671                             GNUNET_NO);
1672
1673   op->state->received_total++;
1674
1675   ke = op_get_element (op, &ee->element_hash);
1676   if (NULL != ke)
1677   {
1678     /* Got repeated element.  Should not happen since
1679      * we track demands. */
1680     GNUNET_STATISTICS_update (_GSS_statistics,
1681                               "# repeated elements",
1682                               1,
1683                               GNUNET_NO);
1684     ke->received = GNUNET_YES;
1685     GNUNET_free (ee);
1686   }
1687   else
1688   {
1689     LOG (GNUNET_ERROR_TYPE_DEBUG,
1690          "Registering new element from remote peer\n");
1691     op->state->received_fresh++;
1692     op_register_element (op, ee, GNUNET_YES);
1693     /* only send results immediately if the client wants it */
1694     switch (op->result_mode)
1695     {
1696       case GNUNET_SET_RESULT_ADDED:
1697         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1698         break;
1699       case GNUNET_SET_RESULT_SYMMETRIC:
1700         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1701         break;
1702       default:
1703         /* Result mode not supported, should have been caught earlier. */
1704         GNUNET_break (0);
1705         break;
1706     }
1707   }
1708
1709   if ( (GNUNET_YES == op->byzantine) &&
1710        (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1711        (op->state->received_fresh < op->state->received_total / 6) )
1712   {
1713     /* The other peer gave us lots of old elements, there's something wrong. */
1714     LOG (GNUNET_ERROR_TYPE_ERROR,
1715          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1716          (unsigned long long) op->state->received_fresh,
1717          (unsigned long long) op->state->received_total);
1718     GNUNET_break_op (0);
1719     fail_union_operation (op);
1720     return;
1721   }
1722   GNUNET_CADET_receive_done (op->channel);
1723 }
1724
1725
1726 /**
1727  * Send offers (for GNUNET_Hash-es) in response
1728  * to inquiries (for IBF_Key-s).
1729  *
1730  * @param cls the union operation
1731  * @param msg the message
1732  */
1733 int
1734 check_union_p2p_inquiry (void *cls,
1735                          const struct InquiryMessage *msg)
1736 {
1737   struct Operation *op = cls;
1738   unsigned int num_keys;
1739
1740   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1741   {
1742     GNUNET_break_op (0);
1743     return GNUNET_SYSERR;
1744   }
1745   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1746   {
1747     GNUNET_break_op (0);
1748     return GNUNET_SYSERR;
1749   }
1750   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1751     / sizeof (struct IBF_Key);
1752   if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1753       != num_keys * sizeof (struct IBF_Key))
1754   {
1755     GNUNET_break_op (0);
1756     return GNUNET_SYSERR;
1757   }
1758   return GNUNET_OK;
1759 }
1760
1761
1762 /**
1763  * Send offers (for GNUNET_Hash-es) in response
1764  * to inquiries (for IBF_Key-s).
1765  *
1766  * @param cls the union operation
1767  * @param msg the message
1768  */
1769 void
1770 handle_union_p2p_inquiry (void *cls,
1771                           const struct InquiryMessage *msg)
1772 {
1773   struct Operation *op = cls;
1774   const struct IBF_Key *ibf_key;
1775   unsigned int num_keys;
1776
1777   LOG (GNUNET_ERROR_TYPE_DEBUG,
1778        "Received union inquiry\n");
1779   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1780     / sizeof (struct IBF_Key);
1781   ibf_key = (const struct IBF_Key *) &msg[1];
1782   while (0 != num_keys--)
1783   {
1784     struct IBF_Key unsalted_key;
1785
1786     unsalt_key (ibf_key,
1787                 ntohl (msg->salt),
1788                 &unsalted_key);
1789     send_offers_for_key (op,
1790                          unsalted_key);
1791     ibf_key++;
1792   }
1793   GNUNET_CADET_receive_done (op->channel);
1794 }
1795
1796
1797 /**
1798  * Iterator over hash map entries, called to
1799  * destroy the linked list of colliding ibf key entries.
1800  *
1801  * @param cls closure
1802  * @param key current key code
1803  * @param value value in the hash map
1804  * @return #GNUNET_YES if we should continue to iterate,
1805  *         #GNUNET_NO if not.
1806  */
1807 static int
1808 send_missing_full_elements_iter (void *cls,
1809                                  uint32_t key,
1810                                  void *value)
1811 {
1812   struct Operation *op = cls;
1813   struct KeyEntry *ke = value;
1814   struct GNUNET_MQ_Envelope *ev;
1815   struct GNUNET_SET_ElementMessage *emsg;
1816   struct ElementEntry *ee = ke->element;
1817
1818   if (GNUNET_YES == ke->received)
1819     return GNUNET_YES;
1820   ev = GNUNET_MQ_msg_extra (emsg,
1821                             ee->element.size,
1822                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1823   GNUNET_memcpy (&emsg[1],
1824                  ee->element.data,
1825                  ee->element.size);
1826   emsg->element_type = htons (ee->element.element_type);
1827   GNUNET_MQ_send (op->mq,
1828                   ev);
1829   return GNUNET_YES;
1830 }
1831
1832
1833 /**
1834  * Handle a request for full set transmission.
1835  *
1836  * @parem cls closure, a set union operation
1837  * @param mh the demand message
1838  */
1839 void
1840 handle_union_p2p_request_full (void *cls,
1841                                const struct GNUNET_MessageHeader *mh)
1842 {
1843   struct Operation *op = cls;
1844
1845   LOG (GNUNET_ERROR_TYPE_DEBUG,
1846        "Received request for full set transmission\n");
1847   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1848   {
1849     GNUNET_break_op (0);
1850     fail_union_operation (op);
1851     return;
1852   }
1853   if (PHASE_EXPECT_IBF != op->state->phase)
1854   {
1855     GNUNET_break_op (0);
1856     fail_union_operation (op);
1857     return;
1858   }
1859
1860   // FIXME: we need to check that our set is larger than the
1861   // byzantine_lower_bound by some threshold
1862   send_full_set (op);
1863   GNUNET_CADET_receive_done (op->channel);
1864 }
1865
1866
1867 /**
1868  * Handle a "full done" message.
1869  *
1870  * @parem cls closure, a set union operation
1871  * @param mh the demand message
1872  */
1873 void
1874 handle_union_p2p_full_done (void *cls,
1875                             const struct GNUNET_MessageHeader *mh)
1876 {
1877   struct Operation *op = cls;
1878
1879   switch (op->state->phase)
1880   {
1881   case PHASE_EXPECT_IBF:
1882     {
1883       struct GNUNET_MQ_Envelope *ev;
1884
1885       LOG (GNUNET_ERROR_TYPE_DEBUG,
1886            "got FULL DONE, sending elements that other peer is missing\n");
1887
1888       /* send all the elements that did not come from the remote peer */
1889       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1890                                                &send_missing_full_elements_iter,
1891                                                op);
1892
1893       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1894       GNUNET_MQ_send (op->mq,
1895                       ev);
1896       op->state->phase = PHASE_DONE;
1897       /* we now wait until the other peer sends us the OVER message*/
1898     }
1899     break;
1900   case PHASE_FULL_SENDING:
1901     {
1902       LOG (GNUNET_ERROR_TYPE_DEBUG,
1903            "got FULL DONE, finishing\n");
1904       /* We sent the full set, and got the response for that.  We're done. */
1905       op->state->phase = PHASE_DONE;
1906       GNUNET_CADET_receive_done (op->channel);
1907       send_client_done (op);
1908       destroy_channel (op);
1909       return;
1910     }
1911     break;
1912   default:
1913     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914                 "Handle full done phase is %u\n",
1915                 (unsigned) op->state->phase);
1916     GNUNET_break_op (0);
1917     fail_union_operation (op);
1918     return;
1919   }
1920   GNUNET_CADET_receive_done (op->channel);
1921 }
1922
1923
1924 /**
1925  * Check a demand by the other peer for elements based on a list
1926  * of `struct GNUNET_HashCode`s.
1927  *
1928  * @parem cls closure, a set union operation
1929  * @param mh the demand message
1930  * @return #GNUNET_OK if @a mh is well-formed
1931  */
1932 int
1933 check_union_p2p_demand (void *cls,
1934                         const struct GNUNET_MessageHeader *mh)
1935 {
1936   struct Operation *op = cls;
1937   unsigned int num_hashes;
1938
1939   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1940   {
1941     GNUNET_break_op (0);
1942     return GNUNET_SYSERR;
1943   }
1944   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1945     / sizeof (struct GNUNET_HashCode);
1946   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1947       != num_hashes * sizeof (struct GNUNET_HashCode))
1948   {
1949     GNUNET_break_op (0);
1950     return GNUNET_SYSERR;
1951   }
1952   return GNUNET_OK;
1953 }
1954
1955
1956 /**
1957  * Handle a demand by the other peer for elements based on a list
1958  * of `struct GNUNET_HashCode`s.
1959  *
1960  * @parem cls closure, a set union operation
1961  * @param mh the demand message
1962  */
1963 void
1964 handle_union_p2p_demand (void *cls,
1965                          const struct GNUNET_MessageHeader *mh)
1966 {
1967   struct Operation *op = cls;
1968   struct ElementEntry *ee;
1969   struct GNUNET_SET_ElementMessage *emsg;
1970   const struct GNUNET_HashCode *hash;
1971   unsigned int num_hashes;
1972   struct GNUNET_MQ_Envelope *ev;
1973
1974   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1975     / sizeof (struct GNUNET_HashCode);
1976   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1977        num_hashes > 0;
1978        hash++, num_hashes--)
1979   {
1980     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1981                                             hash);
1982     if (NULL == ee)
1983     {
1984       /* Demand for non-existing element. */
1985       GNUNET_break_op (0);
1986       fail_union_operation (op);
1987       return;
1988     }
1989     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1990     {
1991       /* Probably confused lazily copied sets. */
1992       GNUNET_break_op (0);
1993       fail_union_operation (op);
1994       return;
1995     }
1996     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1997     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1998     emsg->reserved = htons (0);
1999     emsg->element_type = htons (ee->element.element_type);
2000     LOG (GNUNET_ERROR_TYPE_DEBUG,
2001          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2002          (void *) op,
2003          (unsigned int) ee->element.size,
2004          GNUNET_h2s (&ee->element_hash));
2005     GNUNET_MQ_send (op->mq, ev);
2006     GNUNET_STATISTICS_update (_GSS_statistics,
2007                               "# exchanged elements",
2008                               1,
2009                               GNUNET_NO);
2010
2011     switch (op->result_mode)
2012     {
2013       case GNUNET_SET_RESULT_ADDED:
2014         /* Nothing to do. */
2015         break;
2016       case GNUNET_SET_RESULT_SYMMETRIC:
2017         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2018         break;
2019       default:
2020         /* Result mode not supported, should have been caught earlier. */
2021         GNUNET_break (0);
2022         break;
2023     }
2024   }
2025   GNUNET_CADET_receive_done (op->channel);
2026 }
2027
2028
2029 /**
2030  * Check offer (of `struct GNUNET_HashCode`s).
2031  *
2032  * @param cls the union operation
2033  * @param mh the message
2034  * @return #GNUNET_OK if @a mh is well-formed
2035  */
2036 int
2037 check_union_p2p_offer (void *cls,
2038                         const struct GNUNET_MessageHeader *mh)
2039 {
2040   struct Operation *op = cls;
2041   unsigned int num_hashes;
2042
2043   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2044   {
2045     GNUNET_break_op (0);
2046     return GNUNET_SYSERR;
2047   }
2048   /* look up elements and send them */
2049   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2050        (op->state->phase != PHASE_INVENTORY_ACTIVE))
2051   {
2052     GNUNET_break_op (0);
2053     return GNUNET_SYSERR;
2054   }
2055   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2056     / sizeof (struct GNUNET_HashCode);
2057   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2058       num_hashes * sizeof (struct GNUNET_HashCode))
2059   {
2060     GNUNET_break_op (0);
2061     return GNUNET_SYSERR;
2062   }
2063   return GNUNET_OK;
2064 }
2065
2066
2067 /**
2068  * Handle offers (of `struct GNUNET_HashCode`s) and
2069  * respond with demands (of `struct GNUNET_HashCode`s).
2070  *
2071  * @param cls the union operation
2072  * @param mh the message
2073  */
2074 void
2075 handle_union_p2p_offer (void *cls,
2076                         const struct GNUNET_MessageHeader *mh)
2077 {
2078   struct Operation *op = cls;
2079   const struct GNUNET_HashCode *hash;
2080   unsigned int num_hashes;
2081
2082   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2083     / sizeof (struct GNUNET_HashCode);
2084   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2085        num_hashes > 0;
2086        hash++, num_hashes--)
2087   {
2088     struct ElementEntry *ee;
2089     struct GNUNET_MessageHeader *demands;
2090     struct GNUNET_MQ_Envelope *ev;
2091
2092     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2093                                             hash);
2094     if (NULL != ee)
2095       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2096         continue;
2097
2098     if (GNUNET_YES ==
2099         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2100                                                 hash))
2101     {
2102       LOG (GNUNET_ERROR_TYPE_DEBUG,
2103            "Skipped sending duplicate demand\n");
2104       continue;
2105     }
2106
2107     GNUNET_assert (GNUNET_OK ==
2108                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2109                                                       hash,
2110                                                       NULL,
2111                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2112
2113     LOG (GNUNET_ERROR_TYPE_DEBUG,
2114          "[OP %x] Requesting element (hash %s)\n",
2115          (void *) op, GNUNET_h2s (hash));
2116     ev = GNUNET_MQ_msg_header_extra (demands,
2117                                      sizeof (struct GNUNET_HashCode),
2118                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2119     GNUNET_memcpy (&demands[1],
2120                    hash,
2121                    sizeof (struct GNUNET_HashCode));
2122     GNUNET_MQ_send (op->mq, ev);
2123   }
2124   GNUNET_CADET_receive_done (op->channel);
2125 }
2126
2127
2128 /**
2129  * Handle a done message from a remote peer
2130  *
2131  * @param cls the union operation
2132  * @param mh the message
2133  */
2134 void
2135 handle_union_p2p_done (void *cls,
2136                        const struct GNUNET_MessageHeader *mh)
2137 {
2138   struct Operation *op = cls;
2139
2140   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2141   {
2142     GNUNET_break_op (0);
2143     fail_union_operation (op);
2144     return;
2145   }
2146   switch (op->state->phase)
2147   {
2148   case PHASE_INVENTORY_PASSIVE:
2149     /* We got all requests, but still have to send our elements in response. */
2150     op->state->phase = PHASE_FINISH_WAITING;
2151
2152     LOG (GNUNET_ERROR_TYPE_DEBUG,
2153          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2154     /* The active peer is done sending offers
2155      * and inquiries.  This means that all
2156      * our responses to that (demands and offers)
2157      * must be in flight (queued or in mesh).
2158      *
2159      * We should notify the active peer once
2160      * all our demands are satisfied, so that the active
2161      * peer can quit if we gave it everything.
2162      */
2163     GNUNET_CADET_receive_done (op->channel);
2164     maybe_finish (op);
2165     return;
2166   case PHASE_INVENTORY_ACTIVE:
2167     LOG (GNUNET_ERROR_TYPE_DEBUG,
2168          "got DONE (as active partner), waiting to finish\n");
2169     /* All demands of the other peer are satisfied,
2170      * and we processed all offers, thus we know
2171      * exactly what our demands must be.
2172      *
2173      * We'll close the channel
2174      * to the other peer once our demands are met.
2175      */
2176     op->state->phase = PHASE_FINISH_CLOSING;
2177     GNUNET_CADET_receive_done (op->channel);
2178     maybe_finish (op);
2179     return;
2180   default:
2181     GNUNET_break_op (0);
2182     fail_union_operation (op);
2183     return;
2184   }
2185 }
2186
2187 /**
2188  * Handle a over message from a remote peer
2189  *
2190  * @param cls the union operation
2191  * @param mh the message
2192  */
2193 void
2194 handle_union_p2p_over (void *cls,
2195                        const struct GNUNET_MessageHeader *mh)
2196 {
2197   send_client_done (cls);
2198 }
2199
2200
2201 /**
2202  * Initiate operation to evaluate a set union with a remote peer.
2203  *
2204  * @param op operation to perform (to be initialized)
2205  * @param opaque_context message to be transmitted to the listener
2206  *        to convince it to accept, may be NULL
2207  */
2208 static struct OperationState *
2209 union_evaluate (struct Operation *op,
2210                 const struct GNUNET_MessageHeader *opaque_context)
2211 {
2212   struct OperationState *state;
2213   struct GNUNET_MQ_Envelope *ev;
2214   struct OperationRequestMessage *msg;
2215
2216   ev = GNUNET_MQ_msg_nested_mh (msg,
2217                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2218                                 opaque_context);
2219   if (NULL == ev)
2220   {
2221     /* the context message is too large */
2222     GNUNET_break (0);
2223     return NULL;
2224   }
2225   state = GNUNET_new (struct OperationState);
2226   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2227                                                                  GNUNET_NO);
2228   /* copy the current generation's strata estimator for this operation */
2229   state->se = strata_estimator_dup (op->set->state->se);
2230   /* we started the operation, thus we have to send the operation request */
2231   state->phase = PHASE_EXPECT_SE;
2232   state->salt_receive = state->salt_send = 42; // FIXME?????
2233   LOG (GNUNET_ERROR_TYPE_DEBUG,
2234        "Initiating union operation evaluation\n");
2235   GNUNET_STATISTICS_update (_GSS_statistics,
2236                             "# of total union operations",
2237                             1,
2238                             GNUNET_NO);
2239   GNUNET_STATISTICS_update (_GSS_statistics,
2240                             "# of initiated union operations",
2241                             1,
2242                             GNUNET_NO);
2243   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2244   GNUNET_MQ_send (op->mq,
2245                   ev);
2246
2247   if (NULL != opaque_context)
2248     LOG (GNUNET_ERROR_TYPE_DEBUG,
2249          "sent op request with context message\n");
2250   else
2251     LOG (GNUNET_ERROR_TYPE_DEBUG,
2252          "sent op request without context message\n");
2253
2254   op->state = state;
2255   initialize_key_to_element (op);
2256   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2257   return state;
2258 }
2259
2260
2261 /**
2262  * Accept an union operation request from a remote peer.
2263  * Only initializes the private operation state.
2264  *
2265  * @param op operation that will be accepted as a union operation
2266  */
2267 static struct OperationState *
2268 union_accept (struct Operation *op)
2269 {
2270   struct OperationState *state;
2271   const struct StrataEstimator *se;
2272   struct GNUNET_MQ_Envelope *ev;
2273   struct StrataEstimatorMessage *strata_msg;
2274   char *buf;
2275   size_t len;
2276   uint16_t type;
2277
2278   LOG (GNUNET_ERROR_TYPE_DEBUG,
2279        "accepting set union operation\n");
2280   GNUNET_STATISTICS_update (_GSS_statistics,
2281                             "# of accepted union operations",
2282                             1,
2283                             GNUNET_NO);
2284   GNUNET_STATISTICS_update (_GSS_statistics,
2285                             "# of total union operations",
2286                             1,
2287                             GNUNET_NO);
2288
2289   state = GNUNET_new (struct OperationState);
2290   state->se = strata_estimator_dup (op->set->state->se);
2291   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2292                                                                  GNUNET_NO);
2293   state->salt_receive = state->salt_send = 42; // FIXME?????
2294   op->state = state;
2295   initialize_key_to_element (op);
2296   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2297
2298   /* kick off the operation */
2299   se = state->se;
2300   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2301   len = strata_estimator_write (se,
2302                                 buf);
2303   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2304     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2305   else
2306     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2307   ev = GNUNET_MQ_msg_extra (strata_msg,
2308                             len,
2309                             type);
2310   GNUNET_memcpy (&strata_msg[1],
2311                  buf,
2312                  len);
2313   GNUNET_free (buf);
2314   strata_msg->set_size
2315     = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2316   GNUNET_MQ_send (op->mq,
2317                   ev);
2318   state->phase = PHASE_EXPECT_IBF;
2319   return state;
2320 }
2321
2322
2323 /**
2324  * Create a new set supporting the union operation
2325  *
2326  * We maintain one strata estimator per set and then manipulate it over the
2327  * lifetime of the set, as recreating a strata estimator would be expensive.
2328  *
2329  * @return the newly created set, NULL on error
2330  */
2331 static struct SetState *
2332 union_set_create (void)
2333 {
2334   struct SetState *set_state;
2335
2336   LOG (GNUNET_ERROR_TYPE_DEBUG,
2337        "union set created\n");
2338   set_state = GNUNET_new (struct SetState);
2339   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2340                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2341   if (NULL == set_state->se)
2342   {
2343     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2344                 "Failed to allocate strata estimator\n");
2345     GNUNET_free (set_state);
2346     return NULL;
2347   }
2348   return set_state;
2349 }
2350
2351
2352 /**
2353  * Add the element from the given element message to the set.
2354  *
2355  * @param set_state state of the set want to add to
2356  * @param ee the element to add to the set
2357  */
2358 static void
2359 union_add (struct SetState *set_state,
2360            struct ElementEntry *ee)
2361 {
2362   strata_estimator_insert (set_state->se,
2363                            get_ibf_key (&ee->element_hash));
2364 }
2365
2366
2367 /**
2368  * Remove the element given in the element message from the set.
2369  * Only marks the element as removed, so that older set operations can still exchange it.
2370  *
2371  * @param set_state state of the set to remove from
2372  * @param ee set element to remove
2373  */
2374 static void
2375 union_remove (struct SetState *set_state,
2376               struct ElementEntry *ee)
2377 {
2378   strata_estimator_remove (set_state->se,
2379                            get_ibf_key (&ee->element_hash));
2380 }
2381
2382
2383 /**
2384  * Destroy a set that supports the union operation.
2385  *
2386  * @param set_state the set to destroy
2387  */
2388 static void
2389 union_set_destroy (struct SetState *set_state)
2390 {
2391   if (NULL != set_state->se)
2392   {
2393     strata_estimator_destroy (set_state->se);
2394     set_state->se = NULL;
2395   }
2396   GNUNET_free (set_state);
2397 }
2398
2399
2400 /**
2401  * Copy union-specific set state.
2402  *
2403  * @param state source state for copying the union state
2404  * @return a copy of the union-specific set state
2405  */
2406 static struct SetState *
2407 union_copy_state (struct SetState *state)
2408 {
2409   struct SetState *new_state;
2410
2411   GNUNET_assert ( (NULL != state) &&
2412                   (NULL != state->se) );
2413   new_state = GNUNET_new (struct SetState);
2414   new_state->se = strata_estimator_dup (state->se);
2415
2416   return new_state;
2417 }
2418
2419
2420 /**
2421  * Handle case where channel went down for an operation.
2422  *
2423  * @param op operation that lost the channel
2424  */
2425 static void
2426 union_channel_death (struct Operation *op)
2427 {
2428   send_client_done (op);
2429   _GSS_operation_destroy (op,
2430                           GNUNET_YES);
2431 }
2432
2433
2434 /**
2435  * Get the table with implementing functions for
2436  * set union.
2437  *
2438  * @return the operation specific VTable
2439  */
2440 const struct SetVT *
2441 _GSS_union_vt ()
2442 {
2443   static const struct SetVT union_vt = {
2444     .create = &union_set_create,
2445     .add = &union_add,
2446     .remove = &union_remove,
2447     .destroy_set = &union_set_destroy,
2448     .evaluate = &union_evaluate,
2449     .accept = &union_accept,
2450     .cancel = &union_op_cancel,
2451     .copy_state = &union_copy_state,
2452     .channel_death = &union_channel_death
2453   };
2454
2455   return &union_vt;
2456 }