440d0d1b0d9974399df6fd9575e46958a1aa9c9f
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14 */
15 /**
16  * @file set/gnunet-service-set_union.c
17  * @brief two-peer set operations
18  * @author Florian Dold
19  * @author Christian Grothoff
20  */
21 #include "platform.h"
22 #include "gnunet_util_lib.h"
23 #include "gnunet_statistics_service.h"
24 #include "gnunet-service-set.h"
25 #include "ibf.h"
26 #include "gnunet-service-set_union.h"
27 #include "gnunet-service-set_union_strata_estimator.h"
28 #include "gnunet-service-set_protocol.h"
29 #include <gcrypt.h>
30
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
33
34
35 /**
36  * Number of IBFs in a strata estimator.
37  */
38 #define SE_STRATA_COUNT 32
39
40 /**
41  * Size of the IBFs in the strata estimator.
42  */
43 #define SE_IBF_SIZE 80
44
45 /**
46  * The hash num parameter for the difference digests and strata estimators.
47  */
48 #define SE_IBF_HASH_NUM 4
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (20)
61
62 /**
63  * Number of buckets used in the ibf per estimated
64  * difference.
65  */
66 #define IBF_ALPHA 4
67
68
69 /**
70  * Current phase we are in for a union operation.
71  */
72 enum UnionOperationPhase
73 {
74   /**
75    * We sent the request message, and expect a strata estimator.
76    */
77   PHASE_EXPECT_SE,
78
79   /**
80    * We sent the strata estimator, and expect an IBF. This phase is entered once
81    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
82    *
83    * XXX: could use better wording.
84    * XXX: repurposed to also expect a "request full set" message, should be renamed
85    *
86    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
87    */
88   PHASE_EXPECT_IBF,
89
90   /**
91    * Continuation for multi part IBFs.
92    */
93   PHASE_EXPECT_IBF_CONT,
94
95   /**
96    * We are decoding an IBF.
97    */
98   PHASE_INVENTORY_ACTIVE,
99
100   /**
101    * The other peer is decoding the IBF we just sent.
102    */
103   PHASE_INVENTORY_PASSIVE,
104
105   /**
106    * The protocol is almost finished, but we still have to flush our message
107    * queue and/or expect some elements.
108    */
109   PHASE_FINISH_CLOSING,
110
111   /**
112    * In the penultimate phase,
113    * we wait until all our demands
114    * are satisfied.  Then we send a done
115    * message, and wait for another done message.
116    */
117   PHASE_FINISH_WAITING,
118
119   /**
120    * In the ultimate phase, we wait until
121    * our demands are satisfied and then
122    * quit (sending another DONE message).
123    */
124   PHASE_DONE,
125
126   /**
127    * After sending the full set, wait for responses with the elements
128    * that the local peer is missing.
129    */
130   PHASE_FULL_SENDING,
131 };
132
133
134 /**
135  * State of an evaluate operation with another peer.
136  */
137 struct OperationState
138 {
139   /**
140    * Copy of the set's strata estimator at the time of
141    * creation of this operation.
142    */
143   struct StrataEstimator *se;
144
145   /**
146    * The IBF we currently receive.
147    */
148   struct InvertibleBloomFilter *remote_ibf;
149
150   /**
151    * The IBF with the local set's element.
152    */
153   struct InvertibleBloomFilter *local_ibf;
154
155   /**
156    * Maps unsalted IBF-Keys to elements.
157    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
158    * Colliding IBF-Keys are linked.
159    */
160   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
161
162   /**
163    * Current state of the operation.
164    */
165   enum UnionOperationPhase phase;
166
167   /**
168    * Did we send the client that we are done?
169    */
170   int client_done_sent;
171
172   /**
173    * Number of ibf buckets already received into the @a remote_ibf.
174    */
175   unsigned int ibf_buckets_received;
176
177   /**
178    * Hashes for elements that we have demanded from the other peer.
179    */
180   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
181
182   /**
183    * Salt that we're using for sending IBFs
184    */
185   uint32_t salt_send;
186
187   /**
188    * Salt for the IBF we've received and that we're currently decoding.
189    */
190   uint32_t salt_receive;
191
192   /**
193    * Number of elements we received from the other peer
194    * that were not in the local set yet.
195    */
196   uint32_t received_fresh;
197
198   /**
199    * Total number of elements received from the other peer.
200    */
201   uint32_t received_total;
202
203   /**
204    * Initial size of our set, just before
205    * the operation started.
206    */
207   uint64_t initial_size;
208 };
209
210
211 /**
212  * The key entry is used to associate an ibf key with an element.
213  */
214 struct KeyEntry
215 {
216   /**
217    * IBF key for the entry, derived from the current salt.
218    */
219   struct IBF_Key ibf_key;
220
221   /**
222    * The actual element associated with the key.
223    *
224    * Only owned by the union operation if element->operation
225    * is #GNUNET_YES.
226    */
227   struct ElementEntry *element;
228
229   /**
230    * Did we receive this element?
231    * Even if element->is_foreign is false, we might
232    * have received the element, so this indicates that
233    * the other peer has it.
234    */
235   int received;
236 };
237
238
239 /**
240  * Used as a closure for sending elements
241  * with a specific IBF key.
242  */
243 struct SendElementClosure
244 {
245   /**
246    * The IBF key whose matching elements should be
247    * sent.
248    */
249   struct IBF_Key ibf_key;
250
251   /**
252    * Operation for which the elements
253    * should be sent.
254    */
255   struct Operation *op;
256 };
257
258
259 /**
260  * Extra state required for efficient set union.
261  */
262 struct SetState
263 {
264   /**
265    * The strata estimator is only generated once for
266    * each set.
267    * The IBF keys are derived from the element hashes with
268    * salt=0.
269    */
270   struct StrataEstimator *se;
271 };
272
273
274 /**
275  * Iterator over hash map entries, called to
276  * destroy the linked list of colliding ibf key entries.
277  *
278  * @param cls closure
279  * @param key current key code
280  * @param value value in the hash map
281  * @return #GNUNET_YES if we should continue to iterate,
282  *         #GNUNET_NO if not.
283  */
284 static int
285 destroy_key_to_element_iter (void *cls,
286                              uint32_t key,
287                              void *value)
288 {
289   struct KeyEntry *k = value;
290
291   GNUNET_assert (NULL != k);
292   if (GNUNET_YES == k->element->remote)
293   {
294     GNUNET_free (k->element);
295     k->element = NULL;
296   }
297   GNUNET_free (k);
298   return GNUNET_YES;
299 }
300
301
302 /**
303  * Destroy the union operation.  Only things specific to the union
304  * operation are destroyed.
305  *
306  * @param op union operation to destroy
307  */
308 static void
309 union_op_cancel (struct Operation *op)
310 {
311   LOG (GNUNET_ERROR_TYPE_DEBUG,
312        "destroying union op\n");
313   /* check if the op was canceled twice */
314   GNUNET_assert (NULL != op->state);
315   if (NULL != op->state->remote_ibf)
316   {
317     ibf_destroy (op->state->remote_ibf);
318     op->state->remote_ibf = NULL;
319   }
320   if (NULL != op->state->demanded_hashes)
321   {
322     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
323     op->state->demanded_hashes = NULL;
324   }
325   if (NULL != op->state->local_ibf)
326   {
327     ibf_destroy (op->state->local_ibf);
328     op->state->local_ibf = NULL;
329   }
330   if (NULL != op->state->se)
331   {
332     strata_estimator_destroy (op->state->se);
333     op->state->se = NULL;
334   }
335   if (NULL != op->state->key_to_element)
336   {
337     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
338                                              &destroy_key_to_element_iter,
339                                              NULL);
340     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
341     op->state->key_to_element = NULL;
342   }
343   GNUNET_free (op->state);
344   op->state = NULL;
345   LOG (GNUNET_ERROR_TYPE_DEBUG,
346        "destroying union op done\n");
347 }
348
349
350 /**
351  * Inform the client that the union operation has failed,
352  * and proceed to destroy the evaluate operation.
353  *
354  * @param op the union operation to fail
355  */
356 static void
357 fail_union_operation (struct Operation *op)
358 {
359   struct GNUNET_MQ_Envelope *ev;
360   struct GNUNET_SET_ResultMessage *msg;
361
362   LOG (GNUNET_ERROR_TYPE_WARNING,
363        "union operation failed\n");
364   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366   msg->request_id = htonl (op->client_request_id);
367   msg->element_type = htons (0);
368   GNUNET_MQ_send (op->set->cs->mq,
369                   ev);
370   _GSS_operation_destroy (op, GNUNET_YES);
371 }
372
373
374 /**
375  * Derive the IBF key from a hash code and
376  * a salt.
377  *
378  * @param src the hash code
379  * @return the derived IBF key
380  */
381 static struct IBF_Key
382 get_ibf_key (const struct GNUNET_HashCode *src)
383 {
384   struct IBF_Key key;
385   uint16_t salt = 0;
386
387   GNUNET_assert (GNUNET_OK ==
388                  GNUNET_CRYPTO_kdf (&key, sizeof (key),
389                                     src, sizeof *src,
390                                     &salt, sizeof (salt),
391                                     NULL, 0));
392   return key;
393 }
394
395
396 /**
397  * Context for #op_get_element_iterator
398  */
399 struct GetElementContext
400 {
401   /**
402    * FIXME.
403    */
404   struct GNUNET_HashCode hash;
405
406   /**
407    * FIXME.
408    */
409   struct KeyEntry *k;
410 };
411
412
413 /**
414  * Iterator over the mapping from IBF keys to element entries.  Checks if we
415  * have an element with a given GNUNET_HashCode.
416  *
417  * @param cls closure
418  * @param key current key code
419  * @param value value in the hash map
420  * @return #GNUNET_YES if we should search further,
421  *         #GNUNET_NO if we've found the element.
422  */
423 static int
424 op_get_element_iterator (void *cls,
425                          uint32_t key,
426                          void *value)
427 {
428   struct GetElementContext *ctx = cls;
429   struct KeyEntry *k = value;
430
431   GNUNET_assert (NULL != k);
432   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
433                                    &ctx->hash))
434   {
435     ctx->k = k;
436     return GNUNET_NO;
437   }
438   return GNUNET_YES;
439 }
440
441
442 /**
443  * Determine whether the given element is already in the operation's element
444  * set.
445  *
446  * @param op operation that should be tested for 'element_hash'
447  * @param element_hash hash of the element to look for
448  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
449  */
450 static struct KeyEntry *
451 op_get_element (struct Operation *op,
452                 const struct GNUNET_HashCode *element_hash)
453 {
454   int ret;
455   struct IBF_Key ibf_key;
456   struct GetElementContext ctx = {{{ 0 }} , 0};
457
458   ctx.hash = *element_hash;
459
460   ibf_key = get_ibf_key (element_hash);
461   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
462                                                       (uint32_t) ibf_key.key_val,
463                                                       op_get_element_iterator,
464                                                       &ctx);
465
466   /* was the iteration aborted because we found the element? */
467   if (GNUNET_SYSERR == ret)
468   {
469     GNUNET_assert (NULL != ctx.k);
470     return ctx.k;
471   }
472   return NULL;
473 }
474
475
476 /**
477  * Insert an element into the union operation's
478  * key-to-element mapping. Takes ownership of 'ee'.
479  * Note that this does not insert the element in the set,
480  * only in the operation's key-element mapping.
481  * This is done to speed up re-tried operations, if some elements
482  * were transmitted, and then the IBF fails to decode.
483  *
484  * XXX: clarify ownership, doesn't sound right.
485  *
486  * @param op the union operation
487  * @param ee the element entry
488  * @parem received was this element received from the remote peer?
489  */
490 static void
491 op_register_element (struct Operation *op,
492                      struct ElementEntry *ee,
493                      int received)
494 {
495   struct IBF_Key ibf_key;
496   struct KeyEntry *k;
497
498   ibf_key = get_ibf_key (&ee->element_hash);
499   k = GNUNET_new (struct KeyEntry);
500   k->element = ee;
501   k->ibf_key = ibf_key;
502   k->received = received;
503   GNUNET_assert (GNUNET_OK ==
504                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
505                                                       (uint32_t) ibf_key.key_val,
506                                                       k,
507                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
508 }
509
510
511 /**
512  * FIXME.
513  */
514 static void
515 salt_key (const struct IBF_Key *k_in,
516           uint32_t salt,
517           struct IBF_Key *k_out)
518 {
519   int s = salt % 64;
520   uint64_t x = k_in->key_val;
521   /* rotate ibf key */
522   x = (x >> s) | (x << (64 - s));
523   k_out->key_val = x;
524 }
525
526
527 /**
528  * FIXME.
529  */
530 static void
531 unsalt_key (const struct IBF_Key *k_in,
532             uint32_t salt,
533             struct IBF_Key *k_out)
534 {
535   int s = salt % 64;
536   uint64_t x = k_in->key_val;
537   x = (x << s) | (x >> (64 - s));
538   k_out->key_val = x;
539 }
540
541
542 /**
543  * Insert a key into an ibf.
544  *
545  * @param cls the ibf
546  * @param key unused
547  * @param value the key entry to get the key from
548  */
549 static int
550 prepare_ibf_iterator (void *cls,
551                       uint32_t key,
552                       void *value)
553 {
554   struct Operation *op = cls;
555   struct KeyEntry *ke = value;
556   struct IBF_Key salted_key;
557
558   LOG (GNUNET_ERROR_TYPE_DEBUG,
559        "[OP %x] inserting %lx (hash %s) into ibf\n",
560        (void *) op,
561        (unsigned long) ke->ibf_key.key_val,
562        GNUNET_h2s (&ke->element->element_hash));
563   salt_key (&ke->ibf_key,
564             op->state->salt_send,
565             &salted_key);
566   ibf_insert (op->state->local_ibf, salted_key);
567   return GNUNET_YES;
568 }
569
570
571 /**
572  * Iterator for initializing the
573  * key-to-element mapping of a union operation
574  *
575  * @param cls the union operation `struct Operation *`
576  * @param key unused
577  * @param value the `struct ElementEntry *` to insert
578  *        into the key-to-element mapping
579  * @return #GNUNET_YES (to continue iterating)
580  */
581 static int
582 init_key_to_element_iterator (void *cls,
583                               const struct GNUNET_HashCode *key,
584                               void *value)
585 {
586   struct Operation *op = cls;
587   struct ElementEntry *ee = value;
588
589   /* make sure that the element belongs to the set at the time
590    * of creating the operation */
591   if (GNUNET_NO ==
592       _GSS_is_element_of_operation (ee,
593                                     op))
594     return GNUNET_YES;
595   GNUNET_assert (GNUNET_NO == ee->remote);
596   op_register_element (op,
597                        ee,
598                        GNUNET_NO);
599   return GNUNET_YES;
600 }
601
602
603 /**
604  * Initialize the IBF key to element mapping local to this set
605  * operation.
606  *
607  * @param op the set union operation
608  */
609 static void
610 initialize_key_to_element (struct Operation *op)
611 {
612   unsigned int len;
613
614   GNUNET_assert (NULL == op->state->key_to_element);
615   len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
616   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
617   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
618                                          &init_key_to_element_iterator,
619                                          op);
620 }
621
622
623 /**
624  * Create an ibf with the operation's elements
625  * of the specified size
626  *
627  * @param op the union operation
628  * @param size size of the ibf to create
629  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
630  */
631 static int
632 prepare_ibf (struct Operation *op,
633              uint32_t size)
634 {
635   GNUNET_assert (NULL != op->state->key_to_element);
636
637   if (NULL != op->state->local_ibf)
638     ibf_destroy (op->state->local_ibf);
639   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
640   if (NULL == op->state->local_ibf)
641   {
642     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
643                 "Failed to allocate local IBF\n");
644     return GNUNET_SYSERR;
645   }
646   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
647                                            &prepare_ibf_iterator,
648                                            op);
649   return GNUNET_OK;
650 }
651
652
653 /**
654  * Send an ibf of appropriate size.
655  *
656  * Fragments the IBF into multiple messages if necessary.
657  *
658  * @param op the union operation
659  * @param ibf_order order of the ibf to send, size=2^order
660  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
661  */
662 static int
663 send_ibf (struct Operation *op,
664           uint16_t ibf_order)
665 {
666   unsigned int buckets_sent = 0;
667   struct InvertibleBloomFilter *ibf;
668
669   if (GNUNET_OK !=
670       prepare_ibf (op, 1<<ibf_order))
671   {
672     /* allocation failed */
673     return GNUNET_SYSERR;
674   }
675
676   LOG (GNUNET_ERROR_TYPE_DEBUG,
677        "sending ibf of size %u\n",
678        1<<ibf_order);
679
680   {
681     char name[64] = { 0 };
682     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
683     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
684   }
685
686   ibf = op->state->local_ibf;
687
688   while (buckets_sent < (1 << ibf_order))
689   {
690     unsigned int buckets_in_message;
691     struct GNUNET_MQ_Envelope *ev;
692     struct IBFMessage *msg;
693
694     buckets_in_message = (1 << ibf_order) - buckets_sent;
695     /* limit to maximum */
696     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
697       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
698
699     ev = GNUNET_MQ_msg_extra (msg,
700                               buckets_in_message * IBF_BUCKET_SIZE,
701                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
702     msg->reserved1 = 0;
703     msg->reserved2 = 0;
704     msg->order = ibf_order;
705     msg->offset = htonl (buckets_sent);
706     msg->salt = htonl (op->state->salt_send);
707     ibf_write_slice (ibf, buckets_sent,
708                      buckets_in_message, &msg[1]);
709     buckets_sent += buckets_in_message;
710     LOG (GNUNET_ERROR_TYPE_DEBUG,
711          "ibf chunk size %u, %u/%u sent\n",
712          buckets_in_message,
713          buckets_sent,
714          1<<ibf_order);
715     GNUNET_MQ_send (op->mq, ev);
716   }
717
718   /* The other peer must decode the IBF, so
719    * we're passive. */
720   op->state->phase = PHASE_INVENTORY_PASSIVE;
721   return GNUNET_OK;
722 }
723
724
725 /**
726  * Compute the necessary order of an ibf
727  * from the size of the symmetric set difference.
728  *
729  * @param diff the difference
730  * @return the required size of the ibf
731  */
732 static unsigned int
733 get_order_from_difference (unsigned int diff)
734 {
735   unsigned int ibf_order;
736
737   ibf_order = 2;
738   while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
739             ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
740           (ibf_order < MAX_IBF_ORDER) )
741     ibf_order++;
742   // add one for correction
743   return ibf_order + 1;
744 }
745
746
747 /**
748  * Send a set element.
749  *
750  * @param cls the union operation `struct Operation *`
751  * @param key unused
752  * @param value the `struct ElementEntry *` to insert
753  *        into the key-to-element mapping
754  * @return #GNUNET_YES (to continue iterating)
755  */
756 static int
757 send_full_element_iterator (void *cls,
758                        const struct GNUNET_HashCode *key,
759                        void *value)
760 {
761   struct Operation *op = cls;
762   struct GNUNET_SET_ElementMessage *emsg;
763   struct ElementEntry *ee = value;
764   struct GNUNET_SET_Element *el = &ee->element;
765   struct GNUNET_MQ_Envelope *ev;
766
767   LOG (GNUNET_ERROR_TYPE_DEBUG,
768        "Sending element %s\n",
769        GNUNET_h2s (key));
770   ev = GNUNET_MQ_msg_extra (emsg,
771                             el->size,
772                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
773   emsg->element_type = htons (el->element_type);
774   GNUNET_memcpy (&emsg[1],
775                  el->data,
776                  el->size);
777   GNUNET_MQ_send (op->mq,
778                   ev);
779   return GNUNET_YES;
780 }
781
782
783 /**
784  * Switch to full set transmission for @a op.
785  *
786  * @param op operation to switch to full set transmission.
787  */
788 static void
789 send_full_set (struct Operation *op)
790 {
791   struct GNUNET_MQ_Envelope *ev;
792
793   op->state->phase = PHASE_FULL_SENDING;
794   LOG (GNUNET_ERROR_TYPE_DEBUG,
795        "Dedicing to transmit the full set\n");
796   /* FIXME: use a more memory-friendly way of doing this with an
797      iterator, just as we do in the non-full case! */
798   (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
799                                                 &send_full_element_iterator,
800                                                 op);
801   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
802   GNUNET_MQ_send (op->mq,
803                   ev);
804 }
805
806
807 /**
808  * Handle a strata estimator from a remote peer
809  *
810  * @param cls the union operation
811  * @param msg the message
812  */
813 int
814 check_union_p2p_strata_estimator (void *cls,
815                                   const struct StrataEstimatorMessage *msg)
816 {
817   struct Operation *op = cls;
818   int is_compressed;
819   size_t len;
820
821   if (op->state->phase != PHASE_EXPECT_SE)
822   {
823     GNUNET_break (0);
824     return GNUNET_SYSERR;
825   }
826   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
827   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
828   if ( (GNUNET_NO == is_compressed) &&
829        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
830   {
831     GNUNET_break (0);
832     return GNUNET_SYSERR;
833   }
834   return GNUNET_OK;
835 }
836
837
838 /**
839  * Handle a strata estimator from a remote peer
840  *
841  * @param cls the union operation
842  * @param msg the message
843  */
844 void
845 handle_union_p2p_strata_estimator (void *cls,
846                                    const struct StrataEstimatorMessage *msg)
847 {
848   struct Operation *op = cls;
849   struct StrataEstimator *remote_se;
850   unsigned int diff;
851   uint64_t other_size;
852   size_t len;
853   int is_compressed;
854
855   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
856   GNUNET_STATISTICS_update (_GSS_statistics,
857                             "# bytes of SE received",
858                             ntohs (msg->header.size),
859                             GNUNET_NO);
860   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
861   other_size = GNUNET_ntohll (msg->set_size);
862   remote_se = strata_estimator_create (SE_STRATA_COUNT,
863                                        SE_IBF_SIZE,
864                                        SE_IBF_HASH_NUM);
865   if (NULL == remote_se)
866   {
867     /* insufficient resources, fail */
868     fail_union_operation (op);
869     return;
870   }
871   if (GNUNET_OK !=
872       strata_estimator_read (&msg[1],
873                              len,
874                              is_compressed,
875                              remote_se))
876   {
877     /* decompression failed */
878     strata_estimator_destroy (remote_se);
879     fail_union_operation (op);
880     return;
881   }
882   GNUNET_assert (NULL != op->state->se);
883   diff = strata_estimator_difference (remote_se,
884                                       op->state->se);
885
886   if (diff > 200)
887     diff = diff * 3 / 2;
888
889   strata_estimator_destroy (remote_se);
890   strata_estimator_destroy (op->state->se);
891   op->state->se = NULL;
892   LOG (GNUNET_ERROR_TYPE_DEBUG,
893        "got se diff=%d, using ibf size %d\n",
894        diff,
895        1U << get_order_from_difference (diff));
896
897   {
898     char *set_debug;
899
900     set_debug = getenv ("GNUNET_SET_BENCHMARK");
901     if ( (NULL != set_debug) &&
902          (0 == strcmp (set_debug, "1")) )
903     {
904       FILE *f = fopen ("set.log", "a");
905       fprintf (f, "%llu\n", (unsigned long long) diff);
906       fclose (f);
907     }
908   }
909
910   if ( (GNUNET_YES == op->byzantine) &&
911        (other_size < op->byzantine_lower_bound) )
912   {
913     GNUNET_break (0);
914     fail_union_operation (op);
915     return;
916   }
917
918   if ( (GNUNET_YES == op->force_full) ||
919        (diff > op->state->initial_size / 4) ||
920        (0 == other_size) )
921   {
922     LOG (GNUNET_ERROR_TYPE_DEBUG,
923          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
924          diff,
925          op->state->initial_size);
926     GNUNET_STATISTICS_update (_GSS_statistics,
927                               "# of full sends",
928                               1,
929                               GNUNET_NO);
930     if ( (op->state->initial_size <= other_size) ||
931          (0 == other_size) )
932     {
933       send_full_set (op);
934     }
935     else
936     {
937       struct GNUNET_MQ_Envelope *ev;
938
939       LOG (GNUNET_ERROR_TYPE_DEBUG,
940            "Telling other peer that we expect its full set\n");
941       op->state->phase = PHASE_EXPECT_IBF;
942       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
943       GNUNET_MQ_send (op->mq,
944                       ev);
945     }
946   }
947   else
948   {
949     GNUNET_STATISTICS_update (_GSS_statistics,
950                               "# of ibf sends",
951                               1,
952                               GNUNET_NO);
953     if (GNUNET_OK !=
954         send_ibf (op,
955                   get_order_from_difference (diff)))
956     {
957       /* Internal error, best we can do is shut the connection */
958       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
959                   "Failed to send IBF, closing connection\n");
960       fail_union_operation (op);
961       return;
962     }
963   }
964   GNUNET_CADET_receive_done (op->channel);
965 }
966
967
968 /**
969  * Iterator to send elements to a remote peer
970  *
971  * @param cls closure with the element key and the union operation
972  * @param key ignored
973  * @param value the key entry
974  */
975 static int
976 send_offers_iterator (void *cls,
977                       uint32_t key,
978                       void *value)
979 {
980   struct SendElementClosure *sec = cls;
981   struct Operation *op = sec->op;
982   struct KeyEntry *ke = value;
983   struct GNUNET_MQ_Envelope *ev;
984   struct GNUNET_MessageHeader *mh;
985
986   /* Detect 32-bit key collision for the 64-bit IBF keys. */
987   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
988     return GNUNET_YES;
989
990   ev = GNUNET_MQ_msg_header_extra (mh,
991                                    sizeof (struct GNUNET_HashCode),
992                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
993
994   GNUNET_assert (NULL != ev);
995   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
996   LOG (GNUNET_ERROR_TYPE_DEBUG,
997        "[OP %x] sending element offer (%s) to peer\n",
998        (void *) op,
999        GNUNET_h2s (&ke->element->element_hash));
1000   GNUNET_MQ_send (op->mq, ev);
1001   return GNUNET_YES;
1002 }
1003
1004
1005 /**
1006  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1007  *
1008  * @param op union operation
1009  * @param ibf_key IBF key of interest
1010  */
1011 static void
1012 send_offers_for_key (struct Operation *op,
1013                      struct IBF_Key ibf_key)
1014 {
1015   struct SendElementClosure send_cls;
1016
1017   send_cls.ibf_key = ibf_key;
1018   send_cls.op = op;
1019   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1020                                                        (uint32_t) ibf_key.key_val,
1021                                                        &send_offers_iterator,
1022                                                        &send_cls);
1023 }
1024
1025
1026 /**
1027  * Decode which elements are missing on each side, and
1028  * send the appropriate offers and inquiries.
1029  *
1030  * @param op union operation
1031  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1032  */
1033 static int
1034 decode_and_send (struct Operation *op)
1035 {
1036   struct IBF_Key key;
1037   struct IBF_Key last_key;
1038   int side;
1039   unsigned int num_decoded;
1040   struct InvertibleBloomFilter *diff_ibf;
1041
1042   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1043
1044   if (GNUNET_OK !=
1045       prepare_ibf (op,
1046                    op->state->remote_ibf->size))
1047   {
1048     GNUNET_break (0);
1049     /* allocation failed */
1050     return GNUNET_SYSERR;
1051   }
1052   diff_ibf = ibf_dup (op->state->local_ibf);
1053   ibf_subtract (diff_ibf,
1054                 op->state->remote_ibf);
1055
1056   ibf_destroy (op->state->remote_ibf);
1057   op->state->remote_ibf = NULL;
1058
1059   LOG (GNUNET_ERROR_TYPE_DEBUG,
1060        "decoding IBF (size=%u)\n",
1061        diff_ibf->size);
1062
1063   num_decoded = 0;
1064   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1065
1066   while (1)
1067   {
1068     int res;
1069     int cycle_detected = GNUNET_NO;
1070
1071     last_key = key;
1072
1073     res = ibf_decode (diff_ibf, &side, &key);
1074     if (res == GNUNET_OK)
1075     {
1076       LOG (GNUNET_ERROR_TYPE_DEBUG,
1077            "decoded ibf key %lx\n",
1078            (unsigned long) key.key_val);
1079       num_decoded += 1;
1080       if ( (num_decoded > diff_ibf->size) ||
1081            ( (num_decoded > 1) &&
1082              (last_key.key_val == key.key_val) ) )
1083       {
1084         LOG (GNUNET_ERROR_TYPE_DEBUG,
1085              "detected cyclic ibf (decoded %u/%u)\n",
1086              num_decoded,
1087              diff_ibf->size);
1088         cycle_detected = GNUNET_YES;
1089       }
1090     }
1091     if ( (GNUNET_SYSERR == res) ||
1092          (GNUNET_YES == cycle_detected) )
1093     {
1094       int next_order;
1095       next_order = 0;
1096       while (1<<next_order < diff_ibf->size)
1097         next_order++;
1098       next_order++;
1099       if (next_order <= MAX_IBF_ORDER)
1100       {
1101         LOG (GNUNET_ERROR_TYPE_DEBUG,
1102              "decoding failed, sending larger ibf (size %u)\n",
1103              1<<next_order);
1104         GNUNET_STATISTICS_update (_GSS_statistics,
1105                                   "# of IBF retries",
1106                                   1,
1107                                   GNUNET_NO);
1108         op->state->salt_send++;
1109         if (GNUNET_OK !=
1110             send_ibf (op, next_order))
1111         {
1112           /* Internal error, best we can do is shut the connection */
1113           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1114                       "Failed to send IBF, closing connection\n");
1115           fail_union_operation (op);
1116           ibf_destroy (diff_ibf);
1117           return GNUNET_SYSERR;
1118         }
1119       }
1120       else
1121       {
1122         GNUNET_STATISTICS_update (_GSS_statistics,
1123                                   "# of failed union operations (too large)",
1124                                   1,
1125                                   GNUNET_NO);
1126         // XXX: Send the whole set, element-by-element
1127         LOG (GNUNET_ERROR_TYPE_ERROR,
1128              "set union failed: reached ibf limit\n");
1129         fail_union_operation (op);
1130         ibf_destroy (diff_ibf);
1131         return GNUNET_SYSERR;
1132       }
1133       break;
1134     }
1135     if (GNUNET_NO == res)
1136     {
1137       struct GNUNET_MQ_Envelope *ev;
1138
1139       LOG (GNUNET_ERROR_TYPE_DEBUG,
1140            "transmitted all values, sending DONE\n");
1141       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1142       GNUNET_MQ_send (op->mq, ev);
1143       /* We now wait until we get a DONE message back
1144        * and then wait for our MQ to be flushed and all our
1145        * demands be delivered. */
1146       break;
1147     }
1148     if (1 == side)
1149     {
1150       struct IBF_Key unsalted_key;
1151
1152       unsalt_key (&key,
1153                   op->state->salt_receive,
1154                   &unsalted_key);
1155       send_offers_for_key (op,
1156                            unsalted_key);
1157     }
1158     else if (-1 == side)
1159     {
1160       struct GNUNET_MQ_Envelope *ev;
1161       struct InquiryMessage *msg;
1162
1163       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1164        * the effort additional complexity. */
1165       ev = GNUNET_MQ_msg_extra (msg,
1166                                 sizeof (struct IBF_Key),
1167                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1168       msg->salt = htonl (op->state->salt_receive);
1169       GNUNET_memcpy (&msg[1],
1170               &key,
1171               sizeof (struct IBF_Key));
1172       LOG (GNUNET_ERROR_TYPE_DEBUG,
1173            "sending element inquiry for IBF key %lx\n",
1174            (unsigned long) key.key_val);
1175       GNUNET_MQ_send (op->mq, ev);
1176     }
1177     else
1178     {
1179       GNUNET_assert (0);
1180     }
1181   }
1182   ibf_destroy (diff_ibf);
1183   return GNUNET_OK;
1184 }
1185
1186
1187 /**
1188  * Check an IBF message from a remote peer.
1189  *
1190  * Reassemble the IBF from multiple pieces, and
1191  * process the whole IBF once possible.
1192  *
1193  * @param cls the union operation
1194  * @param msg the header of the message
1195  * @return #GNUNET_OK if @a msg is well-formed
1196  */
1197 int
1198 check_union_p2p_ibf (void *cls,
1199                      const struct IBFMessage *msg)
1200 {
1201   struct Operation *op = cls;
1202   unsigned int buckets_in_message;
1203
1204   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1205   {
1206     GNUNET_break_op (0);
1207     return GNUNET_SYSERR;
1208   }
1209   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1210   if (0 == buckets_in_message)
1211   {
1212     GNUNET_break_op (0);
1213     return GNUNET_SYSERR;
1214   }
1215   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1216   {
1217     GNUNET_break_op (0);
1218     return GNUNET_SYSERR;
1219   }
1220   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1221   {
1222     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1223     {
1224       GNUNET_break_op (0);
1225       return GNUNET_SYSERR;
1226     }
1227     if (1<<msg->order != op->state->remote_ibf->size)
1228     {
1229       GNUNET_break_op (0);
1230       return GNUNET_SYSERR;
1231     }
1232     if (ntohl (msg->salt) != op->state->salt_receive)
1233     {
1234       GNUNET_break_op (0);
1235       return GNUNET_SYSERR;
1236     }
1237   }
1238   else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1239             (op->state->phase != PHASE_EXPECT_IBF) )
1240   {
1241     GNUNET_break_op (0);
1242     return GNUNET_SYSERR;
1243   }
1244
1245   return GNUNET_OK;
1246 }
1247
1248
1249 /**
1250  * Handle an IBF message from a remote peer.
1251  *
1252  * Reassemble the IBF from multiple pieces, and
1253  * process the whole IBF once possible.
1254  *
1255  * @param cls the union operation
1256  * @param msg the header of the message
1257  */
1258 void
1259 handle_union_p2p_ibf (void *cls,
1260                       const struct IBFMessage *msg)
1261 {
1262   struct Operation *op = cls;
1263   unsigned int buckets_in_message;
1264
1265   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1266   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1267        (op->state->phase == PHASE_EXPECT_IBF) )
1268   {
1269     op->state->phase = PHASE_EXPECT_IBF_CONT;
1270     GNUNET_assert (NULL == op->state->remote_ibf);
1271     LOG (GNUNET_ERROR_TYPE_DEBUG,
1272          "Creating new ibf of size %u\n",
1273          1 << msg->order);
1274     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1275     op->state->salt_receive = ntohl (msg->salt);
1276     LOG (GNUNET_ERROR_TYPE_DEBUG,
1277          "Receiving new IBF with salt %u\n",
1278          op->state->salt_receive);
1279     if (NULL == op->state->remote_ibf)
1280     {
1281       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1282                   "Failed to parse remote IBF, closing connection\n");
1283       fail_union_operation (op);
1284       return;
1285     }
1286     op->state->ibf_buckets_received = 0;
1287     if (0 != ntohl (msg->offset))
1288     {
1289       GNUNET_break_op (0);
1290       fail_union_operation (op);
1291       return;
1292     }
1293   }
1294   else
1295   {
1296     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1297     LOG (GNUNET_ERROR_TYPE_DEBUG,
1298          "Received more of IBF\n");
1299   }
1300   GNUNET_assert (NULL != op->state->remote_ibf);
1301
1302   ibf_read_slice (&msg[1],
1303                   op->state->ibf_buckets_received,
1304                   buckets_in_message,
1305                   op->state->remote_ibf);
1306   op->state->ibf_buckets_received += buckets_in_message;
1307
1308   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1309   {
1310     LOG (GNUNET_ERROR_TYPE_DEBUG,
1311          "received full ibf\n");
1312     op->state->phase = PHASE_INVENTORY_ACTIVE;
1313     if (GNUNET_OK !=
1314         decode_and_send (op))
1315     {
1316       /* Internal error, best we can do is shut down */
1317       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1318                   "Failed to decode IBF, closing connection\n");
1319       fail_union_operation (op);
1320       return;
1321     }
1322   }
1323   GNUNET_CADET_receive_done (op->channel);
1324 }
1325
1326
1327 /**
1328  * Send a result message to the client indicating
1329  * that there is a new element.
1330  *
1331  * @param op union operation
1332  * @param element element to send
1333  * @param status status to send with the new element
1334  */
1335 static void
1336 send_client_element (struct Operation *op,
1337                      struct GNUNET_SET_Element *element,
1338                      int status)
1339 {
1340   struct GNUNET_MQ_Envelope *ev;
1341   struct GNUNET_SET_ResultMessage *rm;
1342
1343   LOG (GNUNET_ERROR_TYPE_DEBUG,
1344        "sending element (size %u) to client\n",
1345        element->size);
1346   GNUNET_assert (0 != op->client_request_id);
1347   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1348   if (NULL == ev)
1349   {
1350     GNUNET_MQ_discard (ev);
1351     GNUNET_break (0);
1352     return;
1353   }
1354   rm->result_status = htons (status);
1355   rm->request_id = htonl (op->client_request_id);
1356   rm->element_type = htons (element->element_type);
1357   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1358   GNUNET_memcpy (&rm[1],
1359                  element->data,
1360                  element->size);
1361   GNUNET_MQ_send (op->set->cs->mq,
1362                   ev);
1363 }
1364
1365
1366 /**
1367  * Destroy remote channel.
1368  *
1369  * @param op operation
1370  */
1371 void destroy_channel (struct Operation *op)
1372 {
1373   struct GNUNET_CADET_Channel *channel;
1374
1375   if (NULL != (channel = op->channel))
1376   {
1377     /* This will free op; called conditionally as this helper function
1378        is also called from within the channel disconnect handler. */
1379     op->channel = NULL;
1380     GNUNET_CADET_channel_destroy (channel);
1381   }
1382 }
1383
1384
1385 /**
1386  * Signal to the client that the operation has finished and
1387  * destroy the operation.
1388  *
1389  * @param cls operation to destroy
1390  */
1391 static void
1392 send_client_done (void *cls)
1393 {
1394   struct Operation *op = cls;
1395   struct GNUNET_MQ_Envelope *ev;
1396   struct GNUNET_SET_ResultMessage *rm;
1397
1398   if (GNUNET_YES == op->state->client_done_sent) {
1399     return;
1400   }
1401
1402   if (PHASE_DONE != op->state->phase) {
1403     LOG (GNUNET_ERROR_TYPE_WARNING,
1404          "union operation failed\n");
1405     ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1406     rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1407     rm->request_id = htonl (op->client_request_id);
1408     rm->element_type = htons (0);
1409     GNUNET_MQ_send (op->set->cs->mq,
1410                     ev);
1411     return;
1412   }
1413
1414   op->state->client_done_sent = GNUNET_YES;
1415
1416   LOG (GNUNET_ERROR_TYPE_INFO,
1417        "Signalling client that union operation is done\n");
1418   ev = GNUNET_MQ_msg (rm,
1419                       GNUNET_MESSAGE_TYPE_SET_RESULT);
1420   rm->request_id = htonl (op->client_request_id);
1421   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1422   rm->element_type = htons (0);
1423   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1424   GNUNET_MQ_send (op->set->cs->mq,
1425                   ev);
1426 }
1427
1428
1429 /**
1430  * Tests if the operation is finished, and if so notify.
1431  *
1432  * @param op operation to check
1433  */
1434 static void
1435 maybe_finish (struct Operation *op)
1436 {
1437   unsigned int num_demanded;
1438
1439   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1440
1441   if (PHASE_FINISH_WAITING == op->state->phase)
1442   {
1443     LOG (GNUNET_ERROR_TYPE_DEBUG,
1444          "In PHASE_FINISH_WAITING, pending %u demands\n",
1445          num_demanded);
1446     if (0 == num_demanded)
1447     {
1448       struct GNUNET_MQ_Envelope *ev;
1449
1450       op->state->phase = PHASE_DONE;
1451       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1452       GNUNET_MQ_send (op->mq,
1453                       ev);
1454       /* We now wait until the other peer sends P2P_OVER
1455        * after it got all elements from us. */
1456     }
1457   }
1458   if (PHASE_FINISH_CLOSING == op->state->phase)
1459   {
1460     LOG (GNUNET_ERROR_TYPE_DEBUG,
1461          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1462          num_demanded);
1463     if (0 == num_demanded)
1464     {
1465       op->state->phase = PHASE_DONE;
1466       send_client_done (op);
1467       destroy_channel (op);
1468     }
1469   }
1470 }
1471
1472
1473 /**
1474  * Check an element message from a remote peer.
1475  *
1476  * @param cls the union operation
1477  * @param emsg the message
1478  */
1479 int
1480 check_union_p2p_elements (void *cls,
1481                           const struct GNUNET_SET_ElementMessage *emsg)
1482 {
1483   struct Operation *op = cls;
1484
1485   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1486   {
1487     GNUNET_break_op (0);
1488     return GNUNET_SYSERR;
1489   }
1490   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1491   {
1492     GNUNET_break_op (0);
1493     return GNUNET_SYSERR;
1494   }
1495   return GNUNET_OK;
1496 }
1497
1498
1499 /**
1500  * Handle an element message from a remote peer.
1501  * Sent by the other peer either because we decoded an IBF and placed a demand,
1502  * or because the other peer switched to full set transmission.
1503  *
1504  * @param cls the union operation
1505  * @param emsg the message
1506  */
1507 void
1508 handle_union_p2p_elements (void *cls,
1509                            const struct GNUNET_SET_ElementMessage *emsg)
1510 {
1511   struct Operation *op = cls;
1512   struct ElementEntry *ee;
1513   struct KeyEntry *ke;
1514   uint16_t element_size;
1515
1516   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1517   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1518   GNUNET_memcpy (&ee[1],
1519                  &emsg[1],
1520                  element_size);
1521   ee->element.size = element_size;
1522   ee->element.data = &ee[1];
1523   ee->element.element_type = ntohs (emsg->element_type);
1524   ee->remote = GNUNET_YES;
1525   GNUNET_SET_element_hash (&ee->element,
1526                            &ee->element_hash);
1527   if (GNUNET_NO ==
1528       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1529                                             &ee->element_hash,
1530                                             NULL))
1531   {
1532     /* We got something we didn't demand, since it's not in our map. */
1533     GNUNET_break_op (0);
1534     fail_union_operation (op);
1535     return;
1536   }
1537
1538   LOG (GNUNET_ERROR_TYPE_DEBUG,
1539        "Got element (size %u, hash %s) from peer\n",
1540        (unsigned int) element_size,
1541        GNUNET_h2s (&ee->element_hash));
1542
1543   GNUNET_STATISTICS_update (_GSS_statistics,
1544                             "# received elements",
1545                             1,
1546                             GNUNET_NO);
1547   GNUNET_STATISTICS_update (_GSS_statistics,
1548                             "# exchanged elements",
1549                             1,
1550                             GNUNET_NO);
1551
1552   op->state->received_total++;
1553
1554   ke = op_get_element (op, &ee->element_hash);
1555   if (NULL != ke)
1556   {
1557     /* Got repeated element.  Should not happen since
1558      * we track demands. */
1559     GNUNET_STATISTICS_update (_GSS_statistics,
1560                               "# repeated elements",
1561                               1,
1562                               GNUNET_NO);
1563     ke->received = GNUNET_YES;
1564     GNUNET_free (ee);
1565   }
1566   else
1567   {
1568     LOG (GNUNET_ERROR_TYPE_DEBUG,
1569          "Registering new element from remote peer\n");
1570     op->state->received_fresh++;
1571     op_register_element (op, ee, GNUNET_YES);
1572     /* only send results immediately if the client wants it */
1573     switch (op->result_mode)
1574     {
1575       case GNUNET_SET_RESULT_ADDED:
1576         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1577         break;
1578       case GNUNET_SET_RESULT_SYMMETRIC:
1579         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1580         break;
1581       default:
1582         /* Result mode not supported, should have been caught earlier. */
1583         GNUNET_break (0);
1584         break;
1585     }
1586   }
1587
1588   if ( (op->state->received_total > 8) &&
1589        (op->state->received_fresh < op->state->received_total / 3) )
1590   {
1591     /* The other peer gave us lots of old elements, there's something wrong. */
1592     GNUNET_break_op (0);
1593     fail_union_operation (op);
1594     return;
1595   }
1596   GNUNET_CADET_receive_done (op->channel);
1597   maybe_finish (op);
1598 }
1599
1600
1601 /**
1602  * Check a full element message from a remote peer.
1603  *
1604  * @param cls the union operation
1605  * @param emsg the message
1606  */
1607 int
1608 check_union_p2p_full_element (void *cls,
1609                               const struct GNUNET_SET_ElementMessage *emsg)
1610 {
1611   struct Operation *op = cls;
1612
1613   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1614   {
1615     GNUNET_break_op (0);
1616     return GNUNET_SYSERR;
1617   }
1618   // FIXME: check that we expect full elements here?
1619   return GNUNET_OK;
1620 }
1621
1622
1623 /**
1624  * Handle an element message from a remote peer.
1625  *
1626  * @param cls the union operation
1627  * @param emsg the message
1628  */
1629 void
1630 handle_union_p2p_full_element (void *cls,
1631                                const struct GNUNET_SET_ElementMessage *emsg)
1632 {
1633   struct Operation *op = cls;
1634   struct ElementEntry *ee;
1635   struct KeyEntry *ke;
1636   uint16_t element_size;
1637
1638   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1639   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1640   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1641   ee->element.size = element_size;
1642   ee->element.data = &ee[1];
1643   ee->element.element_type = ntohs (emsg->element_type);
1644   ee->remote = GNUNET_YES;
1645   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1646
1647   LOG (GNUNET_ERROR_TYPE_DEBUG,
1648        "Got element (full diff, size %u, hash %s) from peer\n",
1649        (unsigned int) element_size,
1650        GNUNET_h2s (&ee->element_hash));
1651
1652   GNUNET_STATISTICS_update (_GSS_statistics,
1653                             "# received elements",
1654                             1,
1655                             GNUNET_NO);
1656   GNUNET_STATISTICS_update (_GSS_statistics,
1657                             "# exchanged elements",
1658                             1,
1659                             GNUNET_NO);
1660
1661   op->state->received_total++;
1662
1663   ke = op_get_element (op, &ee->element_hash);
1664   if (NULL != ke)
1665   {
1666     /* Got repeated element.  Should not happen since
1667      * we track demands. */
1668     GNUNET_STATISTICS_update (_GSS_statistics,
1669                               "# repeated elements",
1670                               1,
1671                               GNUNET_NO);
1672     ke->received = GNUNET_YES;
1673     GNUNET_free (ee);
1674   }
1675   else
1676   {
1677     LOG (GNUNET_ERROR_TYPE_DEBUG,
1678          "Registering new element from remote peer\n");
1679     op->state->received_fresh++;
1680     op_register_element (op, ee, GNUNET_YES);
1681     /* only send results immediately if the client wants it */
1682     switch (op->result_mode)
1683     {
1684       case GNUNET_SET_RESULT_ADDED:
1685         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1686         break;
1687       case GNUNET_SET_RESULT_SYMMETRIC:
1688         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1689         break;
1690       default:
1691         /* Result mode not supported, should have been caught earlier. */
1692         GNUNET_break (0);
1693         break;
1694     }
1695   }
1696
1697   if ( (GNUNET_YES == op->byzantine) &&
1698        (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1699        (op->state->received_fresh < op->state->received_total / 6) )
1700   {
1701     /* The other peer gave us lots of old elements, there's something wrong. */
1702     LOG (GNUNET_ERROR_TYPE_ERROR,
1703          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1704          (unsigned long long) op->state->received_fresh,
1705          (unsigned long long) op->state->received_total);
1706     GNUNET_break_op (0);
1707     fail_union_operation (op);
1708     return;
1709   }
1710   GNUNET_CADET_receive_done (op->channel);
1711 }
1712
1713
1714 /**
1715  * Send offers (for GNUNET_Hash-es) in response
1716  * to inquiries (for IBF_Key-s).
1717  *
1718  * @param cls the union operation
1719  * @param msg the message
1720  */
1721 int
1722 check_union_p2p_inquiry (void *cls,
1723                          const struct InquiryMessage *msg)
1724 {
1725   struct Operation *op = cls;
1726   unsigned int num_keys;
1727
1728   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1729   {
1730     GNUNET_break_op (0);
1731     return GNUNET_SYSERR;
1732   }
1733   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1734   {
1735     GNUNET_break_op (0);
1736     return GNUNET_SYSERR;
1737   }
1738   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1739     / sizeof (struct IBF_Key);
1740   if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1741       != num_keys * sizeof (struct IBF_Key))
1742   {
1743     GNUNET_break_op (0);
1744     return GNUNET_SYSERR;
1745   }
1746   return GNUNET_OK;
1747 }
1748
1749
1750 /**
1751  * Send offers (for GNUNET_Hash-es) in response
1752  * to inquiries (for IBF_Key-s).
1753  *
1754  * @param cls the union operation
1755  * @param msg the message
1756  */
1757 void
1758 handle_union_p2p_inquiry (void *cls,
1759                           const struct InquiryMessage *msg)
1760 {
1761   struct Operation *op = cls;
1762   const struct IBF_Key *ibf_key;
1763   unsigned int num_keys;
1764
1765   LOG (GNUNET_ERROR_TYPE_DEBUG,
1766        "Received union inquiry\n");
1767   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1768     / sizeof (struct IBF_Key);
1769   ibf_key = (const struct IBF_Key *) &msg[1];
1770   while (0 != num_keys--)
1771   {
1772     struct IBF_Key unsalted_key;
1773
1774     unsalt_key (ibf_key,
1775                 ntohl (msg->salt),
1776                 &unsalted_key);
1777     send_offers_for_key (op,
1778                          unsalted_key);
1779     ibf_key++;
1780   }
1781   GNUNET_CADET_receive_done (op->channel);
1782 }
1783
1784
1785 /**
1786  * Iterator over hash map entries, called to
1787  * destroy the linked list of colliding ibf key entries.
1788  *
1789  * @param cls closure
1790  * @param key current key code
1791  * @param value value in the hash map
1792  * @return #GNUNET_YES if we should continue to iterate,
1793  *         #GNUNET_NO if not.
1794  */
1795 static int
1796 send_missing_full_elements_iter (void *cls,
1797                                  uint32_t key,
1798                                  void *value)
1799 {
1800   struct Operation *op = cls;
1801   struct KeyEntry *ke = value;
1802   struct GNUNET_MQ_Envelope *ev;
1803   struct GNUNET_SET_ElementMessage *emsg;
1804   struct ElementEntry *ee = ke->element;
1805
1806   if (GNUNET_YES == ke->received)
1807     return GNUNET_YES;
1808   ev = GNUNET_MQ_msg_extra (emsg,
1809                             ee->element.size,
1810                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1811   GNUNET_memcpy (&emsg[1],
1812                  ee->element.data,
1813                  ee->element.size);
1814   emsg->element_type = htons (ee->element.element_type);
1815   GNUNET_MQ_send (op->mq,
1816                   ev);
1817   return GNUNET_YES;
1818 }
1819
1820
1821 /**
1822  * Handle a request for full set transmission.
1823  *
1824  * @parem cls closure, a set union operation
1825  * @param mh the demand message
1826  */
1827 void
1828 handle_union_p2p_request_full (void *cls,
1829                                const struct GNUNET_MessageHeader *mh)
1830 {
1831   struct Operation *op = cls;
1832
1833   LOG (GNUNET_ERROR_TYPE_DEBUG,
1834        "Received request for full set transmission\n");
1835   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1836   {
1837     GNUNET_break_op (0);
1838     fail_union_operation (op);
1839     return;
1840   }
1841   if (PHASE_EXPECT_IBF != op->state->phase)
1842   {
1843     GNUNET_break_op (0);
1844     fail_union_operation (op);
1845     return;
1846   }
1847
1848   // FIXME: we need to check that our set is larger than the
1849   // byzantine_lower_bound by some threshold
1850   send_full_set (op);
1851   GNUNET_CADET_receive_done (op->channel);
1852 }
1853
1854
1855 /**
1856  * Handle a "full done" message.
1857  *
1858  * @parem cls closure, a set union operation
1859  * @param mh the demand message
1860  */
1861 void
1862 handle_union_p2p_full_done (void *cls,
1863                             const struct GNUNET_MessageHeader *mh)
1864 {
1865   struct Operation *op = cls;
1866
1867   switch (op->state->phase)
1868   {
1869   case PHASE_EXPECT_IBF:
1870     {
1871       struct GNUNET_MQ_Envelope *ev;
1872
1873       LOG (GNUNET_ERROR_TYPE_DEBUG,
1874            "got FULL DONE, sending elements that other peer is missing\n");
1875
1876       /* send all the elements that did not come from the remote peer */
1877       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1878                                                &send_missing_full_elements_iter,
1879                                                op);
1880
1881       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1882       GNUNET_MQ_send (op->mq,
1883                       ev);
1884       op->state->phase = PHASE_DONE;
1885       /* we now wait until the other peer sends us the OVER message*/
1886     }
1887     break;
1888   case PHASE_FULL_SENDING:
1889     {
1890       LOG (GNUNET_ERROR_TYPE_DEBUG,
1891            "got FULL DONE, finishing\n");
1892       /* We sent the full set, and got the response for that.  We're done. */
1893       op->state->phase = PHASE_DONE;
1894       GNUNET_CADET_receive_done (op->channel);
1895       send_client_done (op);
1896       destroy_channel (op);
1897       return;
1898     }
1899     break;
1900   default:
1901     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902                 "Handle full done phase is %u\n",
1903                 (unsigned) op->state->phase);
1904     GNUNET_break_op (0);
1905     fail_union_operation (op);
1906     return;
1907   }
1908   GNUNET_CADET_receive_done (op->channel);
1909 }
1910
1911
1912 /**
1913  * Check a demand by the other peer for elements based on a list
1914  * of `struct GNUNET_HashCode`s.
1915  *
1916  * @parem cls closure, a set union operation
1917  * @param mh the demand message
1918  * @return #GNUNET_OK if @a mh is well-formed
1919  */
1920 int
1921 check_union_p2p_demand (void *cls,
1922                         const struct GNUNET_MessageHeader *mh)
1923 {
1924   struct Operation *op = cls;
1925   unsigned int num_hashes;
1926
1927   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1928   {
1929     GNUNET_break_op (0);
1930     return GNUNET_SYSERR;
1931   }
1932   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1933     / sizeof (struct GNUNET_HashCode);
1934   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1935       != num_hashes * sizeof (struct GNUNET_HashCode))
1936   {
1937     GNUNET_break_op (0);
1938     return GNUNET_SYSERR;
1939   }
1940   return GNUNET_OK;
1941 }
1942
1943
1944 /**
1945  * Handle a demand by the other peer for elements based on a list
1946  * of `struct GNUNET_HashCode`s.
1947  *
1948  * @parem cls closure, a set union operation
1949  * @param mh the demand message
1950  */
1951 void
1952 handle_union_p2p_demand (void *cls,
1953                          const struct GNUNET_MessageHeader *mh)
1954 {
1955   struct Operation *op = cls;
1956   struct ElementEntry *ee;
1957   struct GNUNET_SET_ElementMessage *emsg;
1958   const struct GNUNET_HashCode *hash;
1959   unsigned int num_hashes;
1960   struct GNUNET_MQ_Envelope *ev;
1961
1962   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1963     / sizeof (struct GNUNET_HashCode);
1964   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1965        num_hashes > 0;
1966        hash++, num_hashes--)
1967   {
1968     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1969                                             hash);
1970     if (NULL == ee)
1971     {
1972       /* Demand for non-existing element. */
1973       GNUNET_break_op (0);
1974       fail_union_operation (op);
1975       return;
1976     }
1977     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1978     {
1979       /* Probably confused lazily copied sets. */
1980       GNUNET_break_op (0);
1981       fail_union_operation (op);
1982       return;
1983     }
1984     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1985     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1986     emsg->reserved = htons (0);
1987     emsg->element_type = htons (ee->element.element_type);
1988     LOG (GNUNET_ERROR_TYPE_DEBUG,
1989          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1990          (void *) op,
1991          (unsigned int) ee->element.size,
1992          GNUNET_h2s (&ee->element_hash));
1993     GNUNET_MQ_send (op->mq, ev);
1994     GNUNET_STATISTICS_update (_GSS_statistics,
1995                               "# exchanged elements",
1996                               1,
1997                               GNUNET_NO);
1998
1999     switch (op->result_mode)
2000     {
2001       case GNUNET_SET_RESULT_ADDED:
2002         /* Nothing to do. */
2003         break;
2004       case GNUNET_SET_RESULT_SYMMETRIC:
2005         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2006         break;
2007       default:
2008         /* Result mode not supported, should have been caught earlier. */
2009         GNUNET_break (0);
2010         break;
2011     }
2012   }
2013   GNUNET_CADET_receive_done (op->channel);
2014 }
2015
2016
2017 /**
2018  * Check offer (of `struct GNUNET_HashCode`s).
2019  *
2020  * @param cls the union operation
2021  * @param mh the message
2022  * @return #GNUNET_OK if @a mh is well-formed
2023  */
2024 int
2025 check_union_p2p_offer (void *cls,
2026                         const struct GNUNET_MessageHeader *mh)
2027 {
2028   struct Operation *op = cls;
2029   unsigned int num_hashes;
2030
2031   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2032   {
2033     GNUNET_break_op (0);
2034     return GNUNET_SYSERR;
2035   }
2036   /* look up elements and send them */
2037   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2038        (op->state->phase != PHASE_INVENTORY_ACTIVE))
2039   {
2040     GNUNET_break_op (0);
2041     return GNUNET_SYSERR;
2042   }
2043   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2044     / sizeof (struct GNUNET_HashCode);
2045   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2046       num_hashes * sizeof (struct GNUNET_HashCode))
2047   {
2048     GNUNET_break_op (0);
2049     return GNUNET_SYSERR;
2050   }
2051   return GNUNET_OK;
2052 }
2053
2054
2055 /**
2056  * Handle offers (of `struct GNUNET_HashCode`s) and
2057  * respond with demands (of `struct GNUNET_HashCode`s).
2058  *
2059  * @param cls the union operation
2060  * @param mh the message
2061  */
2062 void
2063 handle_union_p2p_offer (void *cls,
2064                         const struct GNUNET_MessageHeader *mh)
2065 {
2066   struct Operation *op = cls;
2067   const struct GNUNET_HashCode *hash;
2068   unsigned int num_hashes;
2069
2070   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2071     / sizeof (struct GNUNET_HashCode);
2072   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2073        num_hashes > 0;
2074        hash++, num_hashes--)
2075   {
2076     struct ElementEntry *ee;
2077     struct GNUNET_MessageHeader *demands;
2078     struct GNUNET_MQ_Envelope *ev;
2079
2080     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2081                                             hash);
2082     if (NULL != ee)
2083       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2084         continue;
2085
2086     if (GNUNET_YES ==
2087         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2088                                                 hash))
2089     {
2090       LOG (GNUNET_ERROR_TYPE_DEBUG,
2091            "Skipped sending duplicate demand\n");
2092       continue;
2093     }
2094
2095     GNUNET_assert (GNUNET_OK ==
2096                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2097                                                       hash,
2098                                                       NULL,
2099                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2100
2101     LOG (GNUNET_ERROR_TYPE_DEBUG,
2102          "[OP %x] Requesting element (hash %s)\n",
2103          (void *) op, GNUNET_h2s (hash));
2104     ev = GNUNET_MQ_msg_header_extra (demands,
2105                                      sizeof (struct GNUNET_HashCode),
2106                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2107     GNUNET_memcpy (&demands[1],
2108                    hash,
2109                    sizeof (struct GNUNET_HashCode));
2110     GNUNET_MQ_send (op->mq, ev);
2111   }
2112   GNUNET_CADET_receive_done (op->channel);
2113 }
2114
2115
2116 /**
2117  * Handle a done message from a remote peer
2118  *
2119  * @param cls the union operation
2120  * @param mh the message
2121  */
2122 void
2123 handle_union_p2p_done (void *cls,
2124                        const struct GNUNET_MessageHeader *mh)
2125 {
2126   struct Operation *op = cls;
2127
2128   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2129   {
2130     GNUNET_break_op (0);
2131     fail_union_operation (op);
2132     return;
2133   }
2134   switch (op->state->phase)
2135   {
2136   case PHASE_INVENTORY_PASSIVE:
2137     /* We got all requests, but still have to send our elements in response. */
2138     op->state->phase = PHASE_FINISH_WAITING;
2139
2140     LOG (GNUNET_ERROR_TYPE_DEBUG,
2141          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2142     /* The active peer is done sending offers
2143      * and inquiries.  This means that all
2144      * our responses to that (demands and offers)
2145      * must be in flight (queued or in mesh).
2146      *
2147      * We should notify the active peer once
2148      * all our demands are satisfied, so that the active
2149      * peer can quit if we gave him everything.
2150      */
2151     GNUNET_CADET_receive_done (op->channel);
2152     maybe_finish (op);
2153     return;
2154   case PHASE_INVENTORY_ACTIVE:
2155     LOG (GNUNET_ERROR_TYPE_DEBUG,
2156          "got DONE (as active partner), waiting to finish\n");
2157     /* All demands of the other peer are satisfied,
2158      * and we processed all offers, thus we know
2159      * exactly what our demands must be.
2160      *
2161      * We'll close the channel
2162      * to the other peer once our demands are met.
2163      */
2164     op->state->phase = PHASE_FINISH_CLOSING;
2165     GNUNET_CADET_receive_done (op->channel);
2166     maybe_finish (op);
2167     return;
2168   default:
2169     GNUNET_break_op (0);
2170     fail_union_operation (op);
2171     return;
2172   }
2173 }
2174
2175 /**
2176  * Handle a over message from a remote peer
2177  *
2178  * @param cls the union operation
2179  * @param mh the message
2180  */
2181 void
2182 handle_union_p2p_over (void *cls,
2183                        const struct GNUNET_MessageHeader *mh)
2184 {
2185   send_client_done (cls);
2186 }
2187
2188
2189 /**
2190  * Initiate operation to evaluate a set union with a remote peer.
2191  *
2192  * @param op operation to perform (to be initialized)
2193  * @param opaque_context message to be transmitted to the listener
2194  *        to convince him to accept, may be NULL
2195  */
2196 static struct OperationState *
2197 union_evaluate (struct Operation *op,
2198                 const struct GNUNET_MessageHeader *opaque_context)
2199 {
2200   struct OperationState *state;
2201   struct GNUNET_MQ_Envelope *ev;
2202   struct OperationRequestMessage *msg;
2203
2204   ev = GNUNET_MQ_msg_nested_mh (msg,
2205                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2206                                 opaque_context);
2207   if (NULL == ev)
2208   {
2209     /* the context message is too large */
2210     GNUNET_break (0);
2211     return NULL;
2212   }
2213   state = GNUNET_new (struct OperationState);
2214   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2215                                                                  GNUNET_NO);
2216   /* copy the current generation's strata estimator for this operation */
2217   state->se = strata_estimator_dup (op->set->state->se);
2218   /* we started the operation, thus we have to send the operation request */
2219   state->phase = PHASE_EXPECT_SE;
2220   state->salt_receive = state->salt_send = 42; // FIXME?????
2221   LOG (GNUNET_ERROR_TYPE_DEBUG,
2222        "Initiating union operation evaluation\n");
2223   GNUNET_STATISTICS_update (_GSS_statistics,
2224                             "# of total union operations",
2225                             1,
2226                             GNUNET_NO);
2227   GNUNET_STATISTICS_update (_GSS_statistics,
2228                             "# of initiated union operations",
2229                             1,
2230                             GNUNET_NO);
2231   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2232   GNUNET_MQ_send (op->mq,
2233                   ev);
2234
2235   if (NULL != opaque_context)
2236     LOG (GNUNET_ERROR_TYPE_DEBUG,
2237          "sent op request with context message\n");
2238   else
2239     LOG (GNUNET_ERROR_TYPE_DEBUG,
2240          "sent op request without context message\n");
2241
2242   op->state = state;
2243   initialize_key_to_element (op);
2244   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2245   return state;
2246 }
2247
2248
2249 /**
2250  * Accept an union operation request from a remote peer.
2251  * Only initializes the private operation state.
2252  *
2253  * @param op operation that will be accepted as a union operation
2254  */
2255 static struct OperationState *
2256 union_accept (struct Operation *op)
2257 {
2258   struct OperationState *state;
2259   const struct StrataEstimator *se;
2260   struct GNUNET_MQ_Envelope *ev;
2261   struct StrataEstimatorMessage *strata_msg;
2262   char *buf;
2263   size_t len;
2264   uint16_t type;
2265
2266   LOG (GNUNET_ERROR_TYPE_DEBUG,
2267        "accepting set union operation\n");
2268   GNUNET_STATISTICS_update (_GSS_statistics,
2269                             "# of accepted union operations",
2270                             1,
2271                             GNUNET_NO);
2272   GNUNET_STATISTICS_update (_GSS_statistics,
2273                             "# of total union operations",
2274                             1,
2275                             GNUNET_NO);
2276
2277   state = GNUNET_new (struct OperationState);
2278   state->se = strata_estimator_dup (op->set->state->se);
2279   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2280                                                                  GNUNET_NO);
2281   state->salt_receive = state->salt_send = 42; // FIXME?????
2282   op->state = state;
2283   initialize_key_to_element (op);
2284   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2285
2286   /* kick off the operation */
2287   se = state->se;
2288   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2289   len = strata_estimator_write (se,
2290                                 buf);
2291   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2292     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2293   else
2294     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2295   ev = GNUNET_MQ_msg_extra (strata_msg,
2296                             len,
2297                             type);
2298   GNUNET_memcpy (&strata_msg[1],
2299                  buf,
2300                  len);
2301   GNUNET_free (buf);
2302   strata_msg->set_size
2303     = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2304   GNUNET_MQ_send (op->mq,
2305                   ev);
2306   state->phase = PHASE_EXPECT_IBF;
2307   return state;
2308 }
2309
2310
2311 /**
2312  * Create a new set supporting the union operation
2313  *
2314  * We maintain one strata estimator per set and then manipulate it over the
2315  * lifetime of the set, as recreating a strata estimator would be expensive.
2316  *
2317  * @return the newly created set, NULL on error
2318  */
2319 static struct SetState *
2320 union_set_create (void)
2321 {
2322   struct SetState *set_state;
2323
2324   LOG (GNUNET_ERROR_TYPE_DEBUG,
2325        "union set created\n");
2326   set_state = GNUNET_new (struct SetState);
2327   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2328                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2329   if (NULL == set_state->se)
2330   {
2331     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2332                 "Failed to allocate strata estimator\n");
2333     GNUNET_free (set_state);
2334     return NULL;
2335   }
2336   return set_state;
2337 }
2338
2339
2340 /**
2341  * Add the element from the given element message to the set.
2342  *
2343  * @param set_state state of the set want to add to
2344  * @param ee the element to add to the set
2345  */
2346 static void
2347 union_add (struct SetState *set_state,
2348            struct ElementEntry *ee)
2349 {
2350   strata_estimator_insert (set_state->se,
2351                            get_ibf_key (&ee->element_hash));
2352 }
2353
2354
2355 /**
2356  * Remove the element given in the element message from the set.
2357  * Only marks the element as removed, so that older set operations can still exchange it.
2358  *
2359  * @param set_state state of the set to remove from
2360  * @param ee set element to remove
2361  */
2362 static void
2363 union_remove (struct SetState *set_state,
2364               struct ElementEntry *ee)
2365 {
2366   strata_estimator_remove (set_state->se,
2367                            get_ibf_key (&ee->element_hash));
2368 }
2369
2370
2371 /**
2372  * Destroy a set that supports the union operation.
2373  *
2374  * @param set_state the set to destroy
2375  */
2376 static void
2377 union_set_destroy (struct SetState *set_state)
2378 {
2379   if (NULL != set_state->se)
2380   {
2381     strata_estimator_destroy (set_state->se);
2382     set_state->se = NULL;
2383   }
2384   GNUNET_free (set_state);
2385 }
2386
2387
2388 /**
2389  * Copy union-specific set state.
2390  *
2391  * @param state source state for copying the union state
2392  * @return a copy of the union-specific set state
2393  */
2394 static struct SetState *
2395 union_copy_state (struct SetState *state)
2396 {
2397   struct SetState *new_state;
2398
2399   GNUNET_assert ( (NULL != state) &&
2400                   (NULL != state->se) );
2401   new_state = GNUNET_new (struct SetState);
2402   new_state->se = strata_estimator_dup (state->se);
2403
2404   return new_state;
2405 }
2406
2407
2408 /**
2409  * Handle case where channel went down for an operation.
2410  *
2411  * @param op operation that lost the channel
2412  */
2413 static void
2414 union_channel_death (struct Operation *op)
2415 {
2416   send_client_done (op);
2417   _GSS_operation_destroy (op,
2418                           GNUNET_YES);
2419 }
2420
2421
2422 /**
2423  * Get the table with implementing functions for
2424  * set union.
2425  *
2426  * @return the operation specific VTable
2427  */
2428 const struct SetVT *
2429 _GSS_union_vt ()
2430 {
2431   static const struct SetVT union_vt = {
2432     .create = &union_set_create,
2433     .add = &union_add,
2434     .remove = &union_remove,
2435     .destroy_set = &union_set_destroy,
2436     .evaluate = &union_evaluate,
2437     .accept = &union_accept,
2438     .cancel = &union_op_cancel,
2439     .copy_state = &union_copy_state,
2440     .channel_death = &union_channel_death
2441   };
2442
2443   return &union_vt;
2444 }