-comments: the world ain't all male
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14      
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file set/gnunet-service-set_union.c
20  * @brief two-peer set operations
21  * @author Florian Dold
22  * @author Christian Grothoff
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_statistics_service.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union.h"
30 #include "gnunet-service-set_union_strata_estimator.h"
31 #include "gnunet-service-set_protocol.h"
32 #include <gcrypt.h>
33
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
36
37
38 /**
39  * Number of IBFs in a strata estimator.
40  */
41 #define SE_STRATA_COUNT 32
42
43 /**
44  * Size of the IBFs in the strata estimator.
45  */
46 #define SE_IBF_SIZE 80
47
48 /**
49  * The hash num parameter for the difference digests and strata estimators.
50  */
51 #define SE_IBF_HASH_NUM 4
52
53 /**
54  * Number of buckets that can be transmitted in one message.
55  */
56 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
57
58 /**
59  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60  * Choose this value so that computing the IBF is still cheaper
61  * than transmitting all values.
62  */
63 #define MAX_IBF_ORDER (20)
64
65 /**
66  * Number of buckets used in the ibf per estimated
67  * difference.
68  */
69 #define IBF_ALPHA 4
70
71
72 /**
73  * Current phase we are in for a union operation.
74  */
75 enum UnionOperationPhase
76 {
77   /**
78    * We sent the request message, and expect a strata estimator.
79    */
80   PHASE_EXPECT_SE,
81
82   /**
83    * We sent the strata estimator, and expect an IBF. This phase is entered once
84    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
85    *
86    * XXX: could use better wording.
87    * XXX: repurposed to also expect a "request full set" message, should be renamed
88    *
89    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90    */
91   PHASE_EXPECT_IBF,
92
93   /**
94    * Continuation for multi part IBFs.
95    */
96   PHASE_EXPECT_IBF_CONT,
97
98   /**
99    * We are decoding an IBF.
100    */
101   PHASE_INVENTORY_ACTIVE,
102
103   /**
104    * The other peer is decoding the IBF we just sent.
105    */
106   PHASE_INVENTORY_PASSIVE,
107
108   /**
109    * The protocol is almost finished, but we still have to flush our message
110    * queue and/or expect some elements.
111    */
112   PHASE_FINISH_CLOSING,
113
114   /**
115    * In the penultimate phase,
116    * we wait until all our demands
117    * are satisfied.  Then we send a done
118    * message, and wait for another done message.
119    */
120   PHASE_FINISH_WAITING,
121
122   /**
123    * In the ultimate phase, we wait until
124    * our demands are satisfied and then
125    * quit (sending another DONE message).
126    */
127   PHASE_DONE,
128
129   /**
130    * After sending the full set, wait for responses with the elements
131    * that the local peer is missing.
132    */
133   PHASE_FULL_SENDING,
134 };
135
136
137 /**
138  * State of an evaluate operation with another peer.
139  */
140 struct OperationState
141 {
142   /**
143    * Copy of the set's strata estimator at the time of
144    * creation of this operation.
145    */
146   struct StrataEstimator *se;
147
148   /**
149    * The IBF we currently receive.
150    */
151   struct InvertibleBloomFilter *remote_ibf;
152
153   /**
154    * The IBF with the local set's element.
155    */
156   struct InvertibleBloomFilter *local_ibf;
157
158   /**
159    * Maps unsalted IBF-Keys to elements.
160    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
161    * Colliding IBF-Keys are linked.
162    */
163   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
164
165   /**
166    * Current state of the operation.
167    */
168   enum UnionOperationPhase phase;
169
170   /**
171    * Did we send the client that we are done?
172    */
173   int client_done_sent;
174
175   /**
176    * Number of ibf buckets already received into the @a remote_ibf.
177    */
178   unsigned int ibf_buckets_received;
179
180   /**
181    * Hashes for elements that we have demanded from the other peer.
182    */
183   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
184
185   /**
186    * Salt that we're using for sending IBFs
187    */
188   uint32_t salt_send;
189
190   /**
191    * Salt for the IBF we've received and that we're currently decoding.
192    */
193   uint32_t salt_receive;
194
195   /**
196    * Number of elements we received from the other peer
197    * that were not in the local set yet.
198    */
199   uint32_t received_fresh;
200
201   /**
202    * Total number of elements received from the other peer.
203    */
204   uint32_t received_total;
205
206   /**
207    * Initial size of our set, just before
208    * the operation started.
209    */
210   uint64_t initial_size;
211 };
212
213
214 /**
215  * The key entry is used to associate an ibf key with an element.
216  */
217 struct KeyEntry
218 {
219   /**
220    * IBF key for the entry, derived from the current salt.
221    */
222   struct IBF_Key ibf_key;
223
224   /**
225    * The actual element associated with the key.
226    *
227    * Only owned by the union operation if element->operation
228    * is #GNUNET_YES.
229    */
230   struct ElementEntry *element;
231
232   /**
233    * Did we receive this element?
234    * Even if element->is_foreign is false, we might
235    * have received the element, so this indicates that
236    * the other peer has it.
237    */
238   int received;
239 };
240
241
242 /**
243  * Used as a closure for sending elements
244  * with a specific IBF key.
245  */
246 struct SendElementClosure
247 {
248   /**
249    * The IBF key whose matching elements should be
250    * sent.
251    */
252   struct IBF_Key ibf_key;
253
254   /**
255    * Operation for which the elements
256    * should be sent.
257    */
258   struct Operation *op;
259 };
260
261
262 /**
263  * Extra state required for efficient set union.
264  */
265 struct SetState
266 {
267   /**
268    * The strata estimator is only generated once for
269    * each set.
270    * The IBF keys are derived from the element hashes with
271    * salt=0.
272    */
273   struct StrataEstimator *se;
274 };
275
276
277 /**
278  * Iterator over hash map entries, called to
279  * destroy the linked list of colliding ibf key entries.
280  *
281  * @param cls closure
282  * @param key current key code
283  * @param value value in the hash map
284  * @return #GNUNET_YES if we should continue to iterate,
285  *         #GNUNET_NO if not.
286  */
287 static int
288 destroy_key_to_element_iter (void *cls,
289                              uint32_t key,
290                              void *value)
291 {
292   struct KeyEntry *k = value;
293
294   GNUNET_assert (NULL != k);
295   if (GNUNET_YES == k->element->remote)
296   {
297     GNUNET_free (k->element);
298     k->element = NULL;
299   }
300   GNUNET_free (k);
301   return GNUNET_YES;
302 }
303
304
305 /**
306  * Destroy the union operation.  Only things specific to the union
307  * operation are destroyed.
308  *
309  * @param op union operation to destroy
310  */
311 static void
312 union_op_cancel (struct Operation *op)
313 {
314   LOG (GNUNET_ERROR_TYPE_DEBUG,
315        "destroying union op\n");
316   /* check if the op was canceled twice */
317   GNUNET_assert (NULL != op->state);
318   if (NULL != op->state->remote_ibf)
319   {
320     ibf_destroy (op->state->remote_ibf);
321     op->state->remote_ibf = NULL;
322   }
323   if (NULL != op->state->demanded_hashes)
324   {
325     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
326     op->state->demanded_hashes = NULL;
327   }
328   if (NULL != op->state->local_ibf)
329   {
330     ibf_destroy (op->state->local_ibf);
331     op->state->local_ibf = NULL;
332   }
333   if (NULL != op->state->se)
334   {
335     strata_estimator_destroy (op->state->se);
336     op->state->se = NULL;
337   }
338   if (NULL != op->state->key_to_element)
339   {
340     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
341                                              &destroy_key_to_element_iter,
342                                              NULL);
343     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
344     op->state->key_to_element = NULL;
345   }
346   GNUNET_free (op->state);
347   op->state = NULL;
348   LOG (GNUNET_ERROR_TYPE_DEBUG,
349        "destroying union op done\n");
350 }
351
352
353 /**
354  * Inform the client that the union operation has failed,
355  * and proceed to destroy the evaluate operation.
356  *
357  * @param op the union operation to fail
358  */
359 static void
360 fail_union_operation (struct Operation *op)
361 {
362   struct GNUNET_MQ_Envelope *ev;
363   struct GNUNET_SET_ResultMessage *msg;
364
365   LOG (GNUNET_ERROR_TYPE_WARNING,
366        "union operation failed\n");
367   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
368   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
369   msg->request_id = htonl (op->client_request_id);
370   msg->element_type = htons (0);
371   GNUNET_MQ_send (op->set->cs->mq,
372                   ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_assert (GNUNET_OK ==
391                  GNUNET_CRYPTO_kdf (&key, sizeof (key),
392                                     src, sizeof *src,
393                                     &salt, sizeof (salt),
394                                     NULL, 0));
395   return key;
396 }
397
398
399 /**
400  * Context for #op_get_element_iterator
401  */
402 struct GetElementContext
403 {
404   /**
405    * FIXME.
406    */
407   struct GNUNET_HashCode hash;
408
409   /**
410    * FIXME.
411    */
412   struct KeyEntry *k;
413 };
414
415
416 /**
417  * Iterator over the mapping from IBF keys to element entries.  Checks if we
418  * have an element with a given GNUNET_HashCode.
419  *
420  * @param cls closure
421  * @param key current key code
422  * @param value value in the hash map
423  * @return #GNUNET_YES if we should search further,
424  *         #GNUNET_NO if we've found the element.
425  */
426 static int
427 op_get_element_iterator (void *cls,
428                          uint32_t key,
429                          void *value)
430 {
431   struct GetElementContext *ctx = cls;
432   struct KeyEntry *k = value;
433
434   GNUNET_assert (NULL != k);
435   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
436                                    &ctx->hash))
437   {
438     ctx->k = k;
439     return GNUNET_NO;
440   }
441   return GNUNET_YES;
442 }
443
444
445 /**
446  * Determine whether the given element is already in the operation's element
447  * set.
448  *
449  * @param op operation that should be tested for 'element_hash'
450  * @param element_hash hash of the element to look for
451  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
452  */
453 static struct KeyEntry *
454 op_get_element (struct Operation *op,
455                 const struct GNUNET_HashCode *element_hash)
456 {
457   int ret;
458   struct IBF_Key ibf_key;
459   struct GetElementContext ctx = {{{ 0 }} , 0};
460
461   ctx.hash = *element_hash;
462
463   ibf_key = get_ibf_key (element_hash);
464   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
465                                                       (uint32_t) ibf_key.key_val,
466                                                       op_get_element_iterator,
467                                                       &ctx);
468
469   /* was the iteration aborted because we found the element? */
470   if (GNUNET_SYSERR == ret)
471   {
472     GNUNET_assert (NULL != ctx.k);
473     return ctx.k;
474   }
475   return NULL;
476 }
477
478
479 /**
480  * Insert an element into the union operation's
481  * key-to-element mapping. Takes ownership of 'ee'.
482  * Note that this does not insert the element in the set,
483  * only in the operation's key-element mapping.
484  * This is done to speed up re-tried operations, if some elements
485  * were transmitted, and then the IBF fails to decode.
486  *
487  * XXX: clarify ownership, doesn't sound right.
488  *
489  * @param op the union operation
490  * @param ee the element entry
491  * @parem received was this element received from the remote peer?
492  */
493 static void
494 op_register_element (struct Operation *op,
495                      struct ElementEntry *ee,
496                      int received)
497 {
498   struct IBF_Key ibf_key;
499   struct KeyEntry *k;
500
501   ibf_key = get_ibf_key (&ee->element_hash);
502   k = GNUNET_new (struct KeyEntry);
503   k->element = ee;
504   k->ibf_key = ibf_key;
505   k->received = received;
506   GNUNET_assert (GNUNET_OK ==
507                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
508                                                       (uint32_t) ibf_key.key_val,
509                                                       k,
510                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
511 }
512
513
514 /**
515  * FIXME.
516  */
517 static void
518 salt_key (const struct IBF_Key *k_in,
519           uint32_t salt,
520           struct IBF_Key *k_out)
521 {
522   int s = salt % 64;
523   uint64_t x = k_in->key_val;
524   /* rotate ibf key */
525   x = (x >> s) | (x << (64 - s));
526   k_out->key_val = x;
527 }
528
529
530 /**
531  * FIXME.
532  */
533 static void
534 unsalt_key (const struct IBF_Key *k_in,
535             uint32_t salt,
536             struct IBF_Key *k_out)
537 {
538   int s = salt % 64;
539   uint64_t x = k_in->key_val;
540   x = (x << s) | (x >> (64 - s));
541   k_out->key_val = x;
542 }
543
544
545 /**
546  * Insert a key into an ibf.
547  *
548  * @param cls the ibf
549  * @param key unused
550  * @param value the key entry to get the key from
551  */
552 static int
553 prepare_ibf_iterator (void *cls,
554                       uint32_t key,
555                       void *value)
556 {
557   struct Operation *op = cls;
558   struct KeyEntry *ke = value;
559   struct IBF_Key salted_key;
560
561   LOG (GNUNET_ERROR_TYPE_DEBUG,
562        "[OP %x] inserting %lx (hash %s) into ibf\n",
563        (void *) op,
564        (unsigned long) ke->ibf_key.key_val,
565        GNUNET_h2s (&ke->element->element_hash));
566   salt_key (&ke->ibf_key,
567             op->state->salt_send,
568             &salted_key);
569   ibf_insert (op->state->local_ibf, salted_key);
570   return GNUNET_YES;
571 }
572
573
574 /**
575  * Iterator for initializing the
576  * key-to-element mapping of a union operation
577  *
578  * @param cls the union operation `struct Operation *`
579  * @param key unused
580  * @param value the `struct ElementEntry *` to insert
581  *        into the key-to-element mapping
582  * @return #GNUNET_YES (to continue iterating)
583  */
584 static int
585 init_key_to_element_iterator (void *cls,
586                               const struct GNUNET_HashCode *key,
587                               void *value)
588 {
589   struct Operation *op = cls;
590   struct ElementEntry *ee = value;
591
592   /* make sure that the element belongs to the set at the time
593    * of creating the operation */
594   if (GNUNET_NO ==
595       _GSS_is_element_of_operation (ee,
596                                     op))
597     return GNUNET_YES;
598   GNUNET_assert (GNUNET_NO == ee->remote);
599   op_register_element (op,
600                        ee,
601                        GNUNET_NO);
602   return GNUNET_YES;
603 }
604
605
606 /**
607  * Initialize the IBF key to element mapping local to this set
608  * operation.
609  *
610  * @param op the set union operation
611  */
612 static void
613 initialize_key_to_element (struct Operation *op)
614 {
615   unsigned int len;
616
617   GNUNET_assert (NULL == op->state->key_to_element);
618   len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
619   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
620   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
621                                          &init_key_to_element_iterator,
622                                          op);
623 }
624
625
626 /**
627  * Create an ibf with the operation's elements
628  * of the specified size
629  *
630  * @param op the union operation
631  * @param size size of the ibf to create
632  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
633  */
634 static int
635 prepare_ibf (struct Operation *op,
636              uint32_t size)
637 {
638   GNUNET_assert (NULL != op->state->key_to_element);
639
640   if (NULL != op->state->local_ibf)
641     ibf_destroy (op->state->local_ibf);
642   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
643   if (NULL == op->state->local_ibf)
644   {
645     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
646                 "Failed to allocate local IBF\n");
647     return GNUNET_SYSERR;
648   }
649   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
650                                            &prepare_ibf_iterator,
651                                            op);
652   return GNUNET_OK;
653 }
654
655
656 /**
657  * Send an ibf of appropriate size.
658  *
659  * Fragments the IBF into multiple messages if necessary.
660  *
661  * @param op the union operation
662  * @param ibf_order order of the ibf to send, size=2^order
663  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
664  */
665 static int
666 send_ibf (struct Operation *op,
667           uint16_t ibf_order)
668 {
669   unsigned int buckets_sent = 0;
670   struct InvertibleBloomFilter *ibf;
671
672   if (GNUNET_OK !=
673       prepare_ibf (op, 1<<ibf_order))
674   {
675     /* allocation failed */
676     return GNUNET_SYSERR;
677   }
678
679   LOG (GNUNET_ERROR_TYPE_DEBUG,
680        "sending ibf of size %u\n",
681        1<<ibf_order);
682
683   {
684     char name[64] = { 0 };
685     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
686     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
687   }
688
689   ibf = op->state->local_ibf;
690
691   while (buckets_sent < (1 << ibf_order))
692   {
693     unsigned int buckets_in_message;
694     struct GNUNET_MQ_Envelope *ev;
695     struct IBFMessage *msg;
696
697     buckets_in_message = (1 << ibf_order) - buckets_sent;
698     /* limit to maximum */
699     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
700       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
701
702     ev = GNUNET_MQ_msg_extra (msg,
703                               buckets_in_message * IBF_BUCKET_SIZE,
704                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
705     msg->reserved1 = 0;
706     msg->reserved2 = 0;
707     msg->order = ibf_order;
708     msg->offset = htonl (buckets_sent);
709     msg->salt = htonl (op->state->salt_send);
710     ibf_write_slice (ibf, buckets_sent,
711                      buckets_in_message, &msg[1]);
712     buckets_sent += buckets_in_message;
713     LOG (GNUNET_ERROR_TYPE_DEBUG,
714          "ibf chunk size %u, %u/%u sent\n",
715          buckets_in_message,
716          buckets_sent,
717          1<<ibf_order);
718     GNUNET_MQ_send (op->mq, ev);
719   }
720
721   /* The other peer must decode the IBF, so
722    * we're passive. */
723   op->state->phase = PHASE_INVENTORY_PASSIVE;
724   return GNUNET_OK;
725 }
726
727
728 /**
729  * Compute the necessary order of an ibf
730  * from the size of the symmetric set difference.
731  *
732  * @param diff the difference
733  * @return the required size of the ibf
734  */
735 static unsigned int
736 get_order_from_difference (unsigned int diff)
737 {
738   unsigned int ibf_order;
739
740   ibf_order = 2;
741   while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
742             ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
743           (ibf_order < MAX_IBF_ORDER) )
744     ibf_order++;
745   // add one for correction
746   return ibf_order + 1;
747 }
748
749
750 /**
751  * Send a set element.
752  *
753  * @param cls the union operation `struct Operation *`
754  * @param key unused
755  * @param value the `struct ElementEntry *` to insert
756  *        into the key-to-element mapping
757  * @return #GNUNET_YES (to continue iterating)
758  */
759 static int
760 send_full_element_iterator (void *cls,
761                        const struct GNUNET_HashCode *key,
762                        void *value)
763 {
764   struct Operation *op = cls;
765   struct GNUNET_SET_ElementMessage *emsg;
766   struct ElementEntry *ee = value;
767   struct GNUNET_SET_Element *el = &ee->element;
768   struct GNUNET_MQ_Envelope *ev;
769
770   LOG (GNUNET_ERROR_TYPE_DEBUG,
771        "Sending element %s\n",
772        GNUNET_h2s (key));
773   ev = GNUNET_MQ_msg_extra (emsg,
774                             el->size,
775                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
776   emsg->element_type = htons (el->element_type);
777   GNUNET_memcpy (&emsg[1],
778                  el->data,
779                  el->size);
780   GNUNET_MQ_send (op->mq,
781                   ev);
782   return GNUNET_YES;
783 }
784
785
786 /**
787  * Switch to full set transmission for @a op.
788  *
789  * @param op operation to switch to full set transmission.
790  */
791 static void
792 send_full_set (struct Operation *op)
793 {
794   struct GNUNET_MQ_Envelope *ev;
795
796   op->state->phase = PHASE_FULL_SENDING;
797   LOG (GNUNET_ERROR_TYPE_DEBUG,
798        "Dedicing to transmit the full set\n");
799   /* FIXME: use a more memory-friendly way of doing this with an
800      iterator, just as we do in the non-full case! */
801   (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
802                                                 &send_full_element_iterator,
803                                                 op);
804   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
805   GNUNET_MQ_send (op->mq,
806                   ev);
807 }
808
809
810 /**
811  * Handle a strata estimator from a remote peer
812  *
813  * @param cls the union operation
814  * @param msg the message
815  */
816 int
817 check_union_p2p_strata_estimator (void *cls,
818                                   const struct StrataEstimatorMessage *msg)
819 {
820   struct Operation *op = cls;
821   int is_compressed;
822   size_t len;
823
824   if (op->state->phase != PHASE_EXPECT_SE)
825   {
826     GNUNET_break (0);
827     return GNUNET_SYSERR;
828   }
829   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
830   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
831   if ( (GNUNET_NO == is_compressed) &&
832        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
833   {
834     GNUNET_break (0);
835     return GNUNET_SYSERR;
836   }
837   return GNUNET_OK;
838 }
839
840
841 /**
842  * Handle a strata estimator from a remote peer
843  *
844  * @param cls the union operation
845  * @param msg the message
846  */
847 void
848 handle_union_p2p_strata_estimator (void *cls,
849                                    const struct StrataEstimatorMessage *msg)
850 {
851   struct Operation *op = cls;
852   struct StrataEstimator *remote_se;
853   unsigned int diff;
854   uint64_t other_size;
855   size_t len;
856   int is_compressed;
857
858   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
859   GNUNET_STATISTICS_update (_GSS_statistics,
860                             "# bytes of SE received",
861                             ntohs (msg->header.size),
862                             GNUNET_NO);
863   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
864   other_size = GNUNET_ntohll (msg->set_size);
865   remote_se = strata_estimator_create (SE_STRATA_COUNT,
866                                        SE_IBF_SIZE,
867                                        SE_IBF_HASH_NUM);
868   if (NULL == remote_se)
869   {
870     /* insufficient resources, fail */
871     fail_union_operation (op);
872     return;
873   }
874   if (GNUNET_OK !=
875       strata_estimator_read (&msg[1],
876                              len,
877                              is_compressed,
878                              remote_se))
879   {
880     /* decompression failed */
881     strata_estimator_destroy (remote_se);
882     fail_union_operation (op);
883     return;
884   }
885   GNUNET_assert (NULL != op->state->se);
886   diff = strata_estimator_difference (remote_se,
887                                       op->state->se);
888
889   if (diff > 200)
890     diff = diff * 3 / 2;
891
892   strata_estimator_destroy (remote_se);
893   strata_estimator_destroy (op->state->se);
894   op->state->se = NULL;
895   LOG (GNUNET_ERROR_TYPE_DEBUG,
896        "got se diff=%d, using ibf size %d\n",
897        diff,
898        1U << get_order_from_difference (diff));
899
900   {
901     char *set_debug;
902
903     set_debug = getenv ("GNUNET_SET_BENCHMARK");
904     if ( (NULL != set_debug) &&
905          (0 == strcmp (set_debug, "1")) )
906     {
907       FILE *f = fopen ("set.log", "a");
908       fprintf (f, "%llu\n", (unsigned long long) diff);
909       fclose (f);
910     }
911   }
912
913   if ( (GNUNET_YES == op->byzantine) &&
914        (other_size < op->byzantine_lower_bound) )
915   {
916     GNUNET_break (0);
917     fail_union_operation (op);
918     return;
919   }
920
921   if ( (GNUNET_YES == op->force_full) ||
922        (diff > op->state->initial_size / 4) ||
923        (0 == other_size) )
924   {
925     LOG (GNUNET_ERROR_TYPE_DEBUG,
926          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
927          diff,
928          op->state->initial_size);
929     GNUNET_STATISTICS_update (_GSS_statistics,
930                               "# of full sends",
931                               1,
932                               GNUNET_NO);
933     if ( (op->state->initial_size <= other_size) ||
934          (0 == other_size) )
935     {
936       send_full_set (op);
937     }
938     else
939     {
940       struct GNUNET_MQ_Envelope *ev;
941
942       LOG (GNUNET_ERROR_TYPE_DEBUG,
943            "Telling other peer that we expect its full set\n");
944       op->state->phase = PHASE_EXPECT_IBF;
945       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
946       GNUNET_MQ_send (op->mq,
947                       ev);
948     }
949   }
950   else
951   {
952     GNUNET_STATISTICS_update (_GSS_statistics,
953                               "# of ibf sends",
954                               1,
955                               GNUNET_NO);
956     if (GNUNET_OK !=
957         send_ibf (op,
958                   get_order_from_difference (diff)))
959     {
960       /* Internal error, best we can do is shut the connection */
961       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
962                   "Failed to send IBF, closing connection\n");
963       fail_union_operation (op);
964       return;
965     }
966   }
967   GNUNET_CADET_receive_done (op->channel);
968 }
969
970
971 /**
972  * Iterator to send elements to a remote peer
973  *
974  * @param cls closure with the element key and the union operation
975  * @param key ignored
976  * @param value the key entry
977  */
978 static int
979 send_offers_iterator (void *cls,
980                       uint32_t key,
981                       void *value)
982 {
983   struct SendElementClosure *sec = cls;
984   struct Operation *op = sec->op;
985   struct KeyEntry *ke = value;
986   struct GNUNET_MQ_Envelope *ev;
987   struct GNUNET_MessageHeader *mh;
988
989   /* Detect 32-bit key collision for the 64-bit IBF keys. */
990   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
991     return GNUNET_YES;
992
993   ev = GNUNET_MQ_msg_header_extra (mh,
994                                    sizeof (struct GNUNET_HashCode),
995                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
996
997   GNUNET_assert (NULL != ev);
998   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
999   LOG (GNUNET_ERROR_TYPE_DEBUG,
1000        "[OP %x] sending element offer (%s) to peer\n",
1001        (void *) op,
1002        GNUNET_h2s (&ke->element->element_hash));
1003   GNUNET_MQ_send (op->mq, ev);
1004   return GNUNET_YES;
1005 }
1006
1007
1008 /**
1009  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1010  *
1011  * @param op union operation
1012  * @param ibf_key IBF key of interest
1013  */
1014 static void
1015 send_offers_for_key (struct Operation *op,
1016                      struct IBF_Key ibf_key)
1017 {
1018   struct SendElementClosure send_cls;
1019
1020   send_cls.ibf_key = ibf_key;
1021   send_cls.op = op;
1022   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1023                                                        (uint32_t) ibf_key.key_val,
1024                                                        &send_offers_iterator,
1025                                                        &send_cls);
1026 }
1027
1028
1029 /**
1030  * Decode which elements are missing on each side, and
1031  * send the appropriate offers and inquiries.
1032  *
1033  * @param op union operation
1034  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1035  */
1036 static int
1037 decode_and_send (struct Operation *op)
1038 {
1039   struct IBF_Key key;
1040   struct IBF_Key last_key;
1041   int side;
1042   unsigned int num_decoded;
1043   struct InvertibleBloomFilter *diff_ibf;
1044
1045   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1046
1047   if (GNUNET_OK !=
1048       prepare_ibf (op,
1049                    op->state->remote_ibf->size))
1050   {
1051     GNUNET_break (0);
1052     /* allocation failed */
1053     return GNUNET_SYSERR;
1054   }
1055   diff_ibf = ibf_dup (op->state->local_ibf);
1056   ibf_subtract (diff_ibf,
1057                 op->state->remote_ibf);
1058
1059   ibf_destroy (op->state->remote_ibf);
1060   op->state->remote_ibf = NULL;
1061
1062   LOG (GNUNET_ERROR_TYPE_DEBUG,
1063        "decoding IBF (size=%u)\n",
1064        diff_ibf->size);
1065
1066   num_decoded = 0;
1067   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1068
1069   while (1)
1070   {
1071     int res;
1072     int cycle_detected = GNUNET_NO;
1073
1074     last_key = key;
1075
1076     res = ibf_decode (diff_ibf, &side, &key);
1077     if (res == GNUNET_OK)
1078     {
1079       LOG (GNUNET_ERROR_TYPE_DEBUG,
1080            "decoded ibf key %lx\n",
1081            (unsigned long) key.key_val);
1082       num_decoded += 1;
1083       if ( (num_decoded > diff_ibf->size) ||
1084            ( (num_decoded > 1) &&
1085              (last_key.key_val == key.key_val) ) )
1086       {
1087         LOG (GNUNET_ERROR_TYPE_DEBUG,
1088              "detected cyclic ibf (decoded %u/%u)\n",
1089              num_decoded,
1090              diff_ibf->size);
1091         cycle_detected = GNUNET_YES;
1092       }
1093     }
1094     if ( (GNUNET_SYSERR == res) ||
1095          (GNUNET_YES == cycle_detected) )
1096     {
1097       int next_order;
1098       next_order = 0;
1099       while (1<<next_order < diff_ibf->size)
1100         next_order++;
1101       next_order++;
1102       if (next_order <= MAX_IBF_ORDER)
1103       {
1104         LOG (GNUNET_ERROR_TYPE_DEBUG,
1105              "decoding failed, sending larger ibf (size %u)\n",
1106              1<<next_order);
1107         GNUNET_STATISTICS_update (_GSS_statistics,
1108                                   "# of IBF retries",
1109                                   1,
1110                                   GNUNET_NO);
1111         op->state->salt_send++;
1112         if (GNUNET_OK !=
1113             send_ibf (op, next_order))
1114         {
1115           /* Internal error, best we can do is shut the connection */
1116           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1117                       "Failed to send IBF, closing connection\n");
1118           fail_union_operation (op);
1119           ibf_destroy (diff_ibf);
1120           return GNUNET_SYSERR;
1121         }
1122       }
1123       else
1124       {
1125         GNUNET_STATISTICS_update (_GSS_statistics,
1126                                   "# of failed union operations (too large)",
1127                                   1,
1128                                   GNUNET_NO);
1129         // XXX: Send the whole set, element-by-element
1130         LOG (GNUNET_ERROR_TYPE_ERROR,
1131              "set union failed: reached ibf limit\n");
1132         fail_union_operation (op);
1133         ibf_destroy (diff_ibf);
1134         return GNUNET_SYSERR;
1135       }
1136       break;
1137     }
1138     if (GNUNET_NO == res)
1139     {
1140       struct GNUNET_MQ_Envelope *ev;
1141
1142       LOG (GNUNET_ERROR_TYPE_DEBUG,
1143            "transmitted all values, sending DONE\n");
1144       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1145       GNUNET_MQ_send (op->mq, ev);
1146       /* We now wait until we get a DONE message back
1147        * and then wait for our MQ to be flushed and all our
1148        * demands be delivered. */
1149       break;
1150     }
1151     if (1 == side)
1152     {
1153       struct IBF_Key unsalted_key;
1154
1155       unsalt_key (&key,
1156                   op->state->salt_receive,
1157                   &unsalted_key);
1158       send_offers_for_key (op,
1159                            unsalted_key);
1160     }
1161     else if (-1 == side)
1162     {
1163       struct GNUNET_MQ_Envelope *ev;
1164       struct InquiryMessage *msg;
1165
1166       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1167        * the effort additional complexity. */
1168       ev = GNUNET_MQ_msg_extra (msg,
1169                                 sizeof (struct IBF_Key),
1170                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1171       msg->salt = htonl (op->state->salt_receive);
1172       GNUNET_memcpy (&msg[1],
1173               &key,
1174               sizeof (struct IBF_Key));
1175       LOG (GNUNET_ERROR_TYPE_DEBUG,
1176            "sending element inquiry for IBF key %lx\n",
1177            (unsigned long) key.key_val);
1178       GNUNET_MQ_send (op->mq, ev);
1179     }
1180     else
1181     {
1182       GNUNET_assert (0);
1183     }
1184   }
1185   ibf_destroy (diff_ibf);
1186   return GNUNET_OK;
1187 }
1188
1189
1190 /**
1191  * Check an IBF message from a remote peer.
1192  *
1193  * Reassemble the IBF from multiple pieces, and
1194  * process the whole IBF once possible.
1195  *
1196  * @param cls the union operation
1197  * @param msg the header of the message
1198  * @return #GNUNET_OK if @a msg is well-formed
1199  */
1200 int
1201 check_union_p2p_ibf (void *cls,
1202                      const struct IBFMessage *msg)
1203 {
1204   struct Operation *op = cls;
1205   unsigned int buckets_in_message;
1206
1207   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1208   {
1209     GNUNET_break_op (0);
1210     return GNUNET_SYSERR;
1211   }
1212   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1213   if (0 == buckets_in_message)
1214   {
1215     GNUNET_break_op (0);
1216     return GNUNET_SYSERR;
1217   }
1218   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1219   {
1220     GNUNET_break_op (0);
1221     return GNUNET_SYSERR;
1222   }
1223   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1224   {
1225     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1226     {
1227       GNUNET_break_op (0);
1228       return GNUNET_SYSERR;
1229     }
1230     if (1<<msg->order != op->state->remote_ibf->size)
1231     {
1232       GNUNET_break_op (0);
1233       return GNUNET_SYSERR;
1234     }
1235     if (ntohl (msg->salt) != op->state->salt_receive)
1236     {
1237       GNUNET_break_op (0);
1238       return GNUNET_SYSERR;
1239     }
1240   }
1241   else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1242             (op->state->phase != PHASE_EXPECT_IBF) )
1243   {
1244     GNUNET_break_op (0);
1245     return GNUNET_SYSERR;
1246   }
1247
1248   return GNUNET_OK;
1249 }
1250
1251
1252 /**
1253  * Handle an IBF message from a remote peer.
1254  *
1255  * Reassemble the IBF from multiple pieces, and
1256  * process the whole IBF once possible.
1257  *
1258  * @param cls the union operation
1259  * @param msg the header of the message
1260  */
1261 void
1262 handle_union_p2p_ibf (void *cls,
1263                       const struct IBFMessage *msg)
1264 {
1265   struct Operation *op = cls;
1266   unsigned int buckets_in_message;
1267
1268   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1269   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1270        (op->state->phase == PHASE_EXPECT_IBF) )
1271   {
1272     op->state->phase = PHASE_EXPECT_IBF_CONT;
1273     GNUNET_assert (NULL == op->state->remote_ibf);
1274     LOG (GNUNET_ERROR_TYPE_DEBUG,
1275          "Creating new ibf of size %u\n",
1276          1 << msg->order);
1277     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1278     op->state->salt_receive = ntohl (msg->salt);
1279     LOG (GNUNET_ERROR_TYPE_DEBUG,
1280          "Receiving new IBF with salt %u\n",
1281          op->state->salt_receive);
1282     if (NULL == op->state->remote_ibf)
1283     {
1284       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1285                   "Failed to parse remote IBF, closing connection\n");
1286       fail_union_operation (op);
1287       return;
1288     }
1289     op->state->ibf_buckets_received = 0;
1290     if (0 != ntohl (msg->offset))
1291     {
1292       GNUNET_break_op (0);
1293       fail_union_operation (op);
1294       return;
1295     }
1296   }
1297   else
1298   {
1299     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1300     LOG (GNUNET_ERROR_TYPE_DEBUG,
1301          "Received more of IBF\n");
1302   }
1303   GNUNET_assert (NULL != op->state->remote_ibf);
1304
1305   ibf_read_slice (&msg[1],
1306                   op->state->ibf_buckets_received,
1307                   buckets_in_message,
1308                   op->state->remote_ibf);
1309   op->state->ibf_buckets_received += buckets_in_message;
1310
1311   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1312   {
1313     LOG (GNUNET_ERROR_TYPE_DEBUG,
1314          "received full ibf\n");
1315     op->state->phase = PHASE_INVENTORY_ACTIVE;
1316     if (GNUNET_OK !=
1317         decode_and_send (op))
1318     {
1319       /* Internal error, best we can do is shut down */
1320       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1321                   "Failed to decode IBF, closing connection\n");
1322       fail_union_operation (op);
1323       return;
1324     }
1325   }
1326   GNUNET_CADET_receive_done (op->channel);
1327 }
1328
1329
1330 /**
1331  * Send a result message to the client indicating
1332  * that there is a new element.
1333  *
1334  * @param op union operation
1335  * @param element element to send
1336  * @param status status to send with the new element
1337  */
1338 static void
1339 send_client_element (struct Operation *op,
1340                      struct GNUNET_SET_Element *element,
1341                      int status)
1342 {
1343   struct GNUNET_MQ_Envelope *ev;
1344   struct GNUNET_SET_ResultMessage *rm;
1345
1346   LOG (GNUNET_ERROR_TYPE_DEBUG,
1347        "sending element (size %u) to client\n",
1348        element->size);
1349   GNUNET_assert (0 != op->client_request_id);
1350   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1351   if (NULL == ev)
1352   {
1353     GNUNET_MQ_discard (ev);
1354     GNUNET_break (0);
1355     return;
1356   }
1357   rm->result_status = htons (status);
1358   rm->request_id = htonl (op->client_request_id);
1359   rm->element_type = htons (element->element_type);
1360   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1361   GNUNET_memcpy (&rm[1],
1362                  element->data,
1363                  element->size);
1364   GNUNET_MQ_send (op->set->cs->mq,
1365                   ev);
1366 }
1367
1368
1369 /**
1370  * Destroy remote channel.
1371  *
1372  * @param op operation
1373  */
1374 void destroy_channel (struct Operation *op)
1375 {
1376   struct GNUNET_CADET_Channel *channel;
1377
1378   if (NULL != (channel = op->channel))
1379   {
1380     /* This will free op; called conditionally as this helper function
1381        is also called from within the channel disconnect handler. */
1382     op->channel = NULL;
1383     GNUNET_CADET_channel_destroy (channel);
1384   }
1385 }
1386
1387
1388 /**
1389  * Signal to the client that the operation has finished and
1390  * destroy the operation.
1391  *
1392  * @param cls operation to destroy
1393  */
1394 static void
1395 send_client_done (void *cls)
1396 {
1397   struct Operation *op = cls;
1398   struct GNUNET_MQ_Envelope *ev;
1399   struct GNUNET_SET_ResultMessage *rm;
1400
1401   if (GNUNET_YES == op->state->client_done_sent) {
1402     return;
1403   }
1404
1405   if (PHASE_DONE != op->state->phase) {
1406     LOG (GNUNET_ERROR_TYPE_WARNING,
1407          "union operation failed\n");
1408     ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1409     rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1410     rm->request_id = htonl (op->client_request_id);
1411     rm->element_type = htons (0);
1412     GNUNET_MQ_send (op->set->cs->mq,
1413                     ev);
1414     return;
1415   }
1416
1417   op->state->client_done_sent = GNUNET_YES;
1418
1419   LOG (GNUNET_ERROR_TYPE_INFO,
1420        "Signalling client that union operation is done\n");
1421   ev = GNUNET_MQ_msg (rm,
1422                       GNUNET_MESSAGE_TYPE_SET_RESULT);
1423   rm->request_id = htonl (op->client_request_id);
1424   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1425   rm->element_type = htons (0);
1426   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1427   GNUNET_MQ_send (op->set->cs->mq,
1428                   ev);
1429 }
1430
1431
1432 /**
1433  * Tests if the operation is finished, and if so notify.
1434  *
1435  * @param op operation to check
1436  */
1437 static void
1438 maybe_finish (struct Operation *op)
1439 {
1440   unsigned int num_demanded;
1441
1442   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1443
1444   if (PHASE_FINISH_WAITING == op->state->phase)
1445   {
1446     LOG (GNUNET_ERROR_TYPE_DEBUG,
1447          "In PHASE_FINISH_WAITING, pending %u demands\n",
1448          num_demanded);
1449     if (0 == num_demanded)
1450     {
1451       struct GNUNET_MQ_Envelope *ev;
1452
1453       op->state->phase = PHASE_DONE;
1454       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1455       GNUNET_MQ_send (op->mq,
1456                       ev);
1457       /* We now wait until the other peer sends P2P_OVER
1458        * after it got all elements from us. */
1459     }
1460   }
1461   if (PHASE_FINISH_CLOSING == op->state->phase)
1462   {
1463     LOG (GNUNET_ERROR_TYPE_DEBUG,
1464          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1465          num_demanded);
1466     if (0 == num_demanded)
1467     {
1468       op->state->phase = PHASE_DONE;
1469       send_client_done (op);
1470       destroy_channel (op);
1471     }
1472   }
1473 }
1474
1475
1476 /**
1477  * Check an element message from a remote peer.
1478  *
1479  * @param cls the union operation
1480  * @param emsg the message
1481  */
1482 int
1483 check_union_p2p_elements (void *cls,
1484                           const struct GNUNET_SET_ElementMessage *emsg)
1485 {
1486   struct Operation *op = cls;
1487
1488   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1489   {
1490     GNUNET_break_op (0);
1491     return GNUNET_SYSERR;
1492   }
1493   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1494   {
1495     GNUNET_break_op (0);
1496     return GNUNET_SYSERR;
1497   }
1498   return GNUNET_OK;
1499 }
1500
1501
1502 /**
1503  * Handle an element message from a remote peer.
1504  * Sent by the other peer either because we decoded an IBF and placed a demand,
1505  * or because the other peer switched to full set transmission.
1506  *
1507  * @param cls the union operation
1508  * @param emsg the message
1509  */
1510 void
1511 handle_union_p2p_elements (void *cls,
1512                            const struct GNUNET_SET_ElementMessage *emsg)
1513 {
1514   struct Operation *op = cls;
1515   struct ElementEntry *ee;
1516   struct KeyEntry *ke;
1517   uint16_t element_size;
1518
1519   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1520   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1521   GNUNET_memcpy (&ee[1],
1522                  &emsg[1],
1523                  element_size);
1524   ee->element.size = element_size;
1525   ee->element.data = &ee[1];
1526   ee->element.element_type = ntohs (emsg->element_type);
1527   ee->remote = GNUNET_YES;
1528   GNUNET_SET_element_hash (&ee->element,
1529                            &ee->element_hash);
1530   if (GNUNET_NO ==
1531       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1532                                             &ee->element_hash,
1533                                             NULL))
1534   {
1535     /* We got something we didn't demand, since it's not in our map. */
1536     GNUNET_break_op (0);
1537     fail_union_operation (op);
1538     return;
1539   }
1540
1541   LOG (GNUNET_ERROR_TYPE_DEBUG,
1542        "Got element (size %u, hash %s) from peer\n",
1543        (unsigned int) element_size,
1544        GNUNET_h2s (&ee->element_hash));
1545
1546   GNUNET_STATISTICS_update (_GSS_statistics,
1547                             "# received elements",
1548                             1,
1549                             GNUNET_NO);
1550   GNUNET_STATISTICS_update (_GSS_statistics,
1551                             "# exchanged elements",
1552                             1,
1553                             GNUNET_NO);
1554
1555   op->state->received_total++;
1556
1557   ke = op_get_element (op, &ee->element_hash);
1558   if (NULL != ke)
1559   {
1560     /* Got repeated element.  Should not happen since
1561      * we track demands. */
1562     GNUNET_STATISTICS_update (_GSS_statistics,
1563                               "# repeated elements",
1564                               1,
1565                               GNUNET_NO);
1566     ke->received = GNUNET_YES;
1567     GNUNET_free (ee);
1568   }
1569   else
1570   {
1571     LOG (GNUNET_ERROR_TYPE_DEBUG,
1572          "Registering new element from remote peer\n");
1573     op->state->received_fresh++;
1574     op_register_element (op, ee, GNUNET_YES);
1575     /* only send results immediately if the client wants it */
1576     switch (op->result_mode)
1577     {
1578       case GNUNET_SET_RESULT_ADDED:
1579         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1580         break;
1581       case GNUNET_SET_RESULT_SYMMETRIC:
1582         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1583         break;
1584       default:
1585         /* Result mode not supported, should have been caught earlier. */
1586         GNUNET_break (0);
1587         break;
1588     }
1589   }
1590
1591   if ( (op->state->received_total > 8) &&
1592        (op->state->received_fresh < op->state->received_total / 3) )
1593   {
1594     /* The other peer gave us lots of old elements, there's something wrong. */
1595     GNUNET_break_op (0);
1596     fail_union_operation (op);
1597     return;
1598   }
1599   GNUNET_CADET_receive_done (op->channel);
1600   maybe_finish (op);
1601 }
1602
1603
1604 /**
1605  * Check a full element message from a remote peer.
1606  *
1607  * @param cls the union operation
1608  * @param emsg the message
1609  */
1610 int
1611 check_union_p2p_full_element (void *cls,
1612                               const struct GNUNET_SET_ElementMessage *emsg)
1613 {
1614   struct Operation *op = cls;
1615
1616   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1617   {
1618     GNUNET_break_op (0);
1619     return GNUNET_SYSERR;
1620   }
1621   // FIXME: check that we expect full elements here?
1622   return GNUNET_OK;
1623 }
1624
1625
1626 /**
1627  * Handle an element message from a remote peer.
1628  *
1629  * @param cls the union operation
1630  * @param emsg the message
1631  */
1632 void
1633 handle_union_p2p_full_element (void *cls,
1634                                const struct GNUNET_SET_ElementMessage *emsg)
1635 {
1636   struct Operation *op = cls;
1637   struct ElementEntry *ee;
1638   struct KeyEntry *ke;
1639   uint16_t element_size;
1640
1641   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1642   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1643   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1644   ee->element.size = element_size;
1645   ee->element.data = &ee[1];
1646   ee->element.element_type = ntohs (emsg->element_type);
1647   ee->remote = GNUNET_YES;
1648   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1649
1650   LOG (GNUNET_ERROR_TYPE_DEBUG,
1651        "Got element (full diff, size %u, hash %s) from peer\n",
1652        (unsigned int) element_size,
1653        GNUNET_h2s (&ee->element_hash));
1654
1655   GNUNET_STATISTICS_update (_GSS_statistics,
1656                             "# received elements",
1657                             1,
1658                             GNUNET_NO);
1659   GNUNET_STATISTICS_update (_GSS_statistics,
1660                             "# exchanged elements",
1661                             1,
1662                             GNUNET_NO);
1663
1664   op->state->received_total++;
1665
1666   ke = op_get_element (op, &ee->element_hash);
1667   if (NULL != ke)
1668   {
1669     /* Got repeated element.  Should not happen since
1670      * we track demands. */
1671     GNUNET_STATISTICS_update (_GSS_statistics,
1672                               "# repeated elements",
1673                               1,
1674                               GNUNET_NO);
1675     ke->received = GNUNET_YES;
1676     GNUNET_free (ee);
1677   }
1678   else
1679   {
1680     LOG (GNUNET_ERROR_TYPE_DEBUG,
1681          "Registering new element from remote peer\n");
1682     op->state->received_fresh++;
1683     op_register_element (op, ee, GNUNET_YES);
1684     /* only send results immediately if the client wants it */
1685     switch (op->result_mode)
1686     {
1687       case GNUNET_SET_RESULT_ADDED:
1688         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1689         break;
1690       case GNUNET_SET_RESULT_SYMMETRIC:
1691         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1692         break;
1693       default:
1694         /* Result mode not supported, should have been caught earlier. */
1695         GNUNET_break (0);
1696         break;
1697     }
1698   }
1699
1700   if ( (GNUNET_YES == op->byzantine) &&
1701        (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1702        (op->state->received_fresh < op->state->received_total / 6) )
1703   {
1704     /* The other peer gave us lots of old elements, there's something wrong. */
1705     LOG (GNUNET_ERROR_TYPE_ERROR,
1706          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1707          (unsigned long long) op->state->received_fresh,
1708          (unsigned long long) op->state->received_total);
1709     GNUNET_break_op (0);
1710     fail_union_operation (op);
1711     return;
1712   }
1713   GNUNET_CADET_receive_done (op->channel);
1714 }
1715
1716
1717 /**
1718  * Send offers (for GNUNET_Hash-es) in response
1719  * to inquiries (for IBF_Key-s).
1720  *
1721  * @param cls the union operation
1722  * @param msg the message
1723  */
1724 int
1725 check_union_p2p_inquiry (void *cls,
1726                          const struct InquiryMessage *msg)
1727 {
1728   struct Operation *op = cls;
1729   unsigned int num_keys;
1730
1731   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1732   {
1733     GNUNET_break_op (0);
1734     return GNUNET_SYSERR;
1735   }
1736   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1737   {
1738     GNUNET_break_op (0);
1739     return GNUNET_SYSERR;
1740   }
1741   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1742     / sizeof (struct IBF_Key);
1743   if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1744       != num_keys * sizeof (struct IBF_Key))
1745   {
1746     GNUNET_break_op (0);
1747     return GNUNET_SYSERR;
1748   }
1749   return GNUNET_OK;
1750 }
1751
1752
1753 /**
1754  * Send offers (for GNUNET_Hash-es) in response
1755  * to inquiries (for IBF_Key-s).
1756  *
1757  * @param cls the union operation
1758  * @param msg the message
1759  */
1760 void
1761 handle_union_p2p_inquiry (void *cls,
1762                           const struct InquiryMessage *msg)
1763 {
1764   struct Operation *op = cls;
1765   const struct IBF_Key *ibf_key;
1766   unsigned int num_keys;
1767
1768   LOG (GNUNET_ERROR_TYPE_DEBUG,
1769        "Received union inquiry\n");
1770   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1771     / sizeof (struct IBF_Key);
1772   ibf_key = (const struct IBF_Key *) &msg[1];
1773   while (0 != num_keys--)
1774   {
1775     struct IBF_Key unsalted_key;
1776
1777     unsalt_key (ibf_key,
1778                 ntohl (msg->salt),
1779                 &unsalted_key);
1780     send_offers_for_key (op,
1781                          unsalted_key);
1782     ibf_key++;
1783   }
1784   GNUNET_CADET_receive_done (op->channel);
1785 }
1786
1787
1788 /**
1789  * Iterator over hash map entries, called to
1790  * destroy the linked list of colliding ibf key entries.
1791  *
1792  * @param cls closure
1793  * @param key current key code
1794  * @param value value in the hash map
1795  * @return #GNUNET_YES if we should continue to iterate,
1796  *         #GNUNET_NO if not.
1797  */
1798 static int
1799 send_missing_full_elements_iter (void *cls,
1800                                  uint32_t key,
1801                                  void *value)
1802 {
1803   struct Operation *op = cls;
1804   struct KeyEntry *ke = value;
1805   struct GNUNET_MQ_Envelope *ev;
1806   struct GNUNET_SET_ElementMessage *emsg;
1807   struct ElementEntry *ee = ke->element;
1808
1809   if (GNUNET_YES == ke->received)
1810     return GNUNET_YES;
1811   ev = GNUNET_MQ_msg_extra (emsg,
1812                             ee->element.size,
1813                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1814   GNUNET_memcpy (&emsg[1],
1815                  ee->element.data,
1816                  ee->element.size);
1817   emsg->element_type = htons (ee->element.element_type);
1818   GNUNET_MQ_send (op->mq,
1819                   ev);
1820   return GNUNET_YES;
1821 }
1822
1823
1824 /**
1825  * Handle a request for full set transmission.
1826  *
1827  * @parem cls closure, a set union operation
1828  * @param mh the demand message
1829  */
1830 void
1831 handle_union_p2p_request_full (void *cls,
1832                                const struct GNUNET_MessageHeader *mh)
1833 {
1834   struct Operation *op = cls;
1835
1836   LOG (GNUNET_ERROR_TYPE_DEBUG,
1837        "Received request for full set transmission\n");
1838   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1839   {
1840     GNUNET_break_op (0);
1841     fail_union_operation (op);
1842     return;
1843   }
1844   if (PHASE_EXPECT_IBF != op->state->phase)
1845   {
1846     GNUNET_break_op (0);
1847     fail_union_operation (op);
1848     return;
1849   }
1850
1851   // FIXME: we need to check that our set is larger than the
1852   // byzantine_lower_bound by some threshold
1853   send_full_set (op);
1854   GNUNET_CADET_receive_done (op->channel);
1855 }
1856
1857
1858 /**
1859  * Handle a "full done" message.
1860  *
1861  * @parem cls closure, a set union operation
1862  * @param mh the demand message
1863  */
1864 void
1865 handle_union_p2p_full_done (void *cls,
1866                             const struct GNUNET_MessageHeader *mh)
1867 {
1868   struct Operation *op = cls;
1869
1870   switch (op->state->phase)
1871   {
1872   case PHASE_EXPECT_IBF:
1873     {
1874       struct GNUNET_MQ_Envelope *ev;
1875
1876       LOG (GNUNET_ERROR_TYPE_DEBUG,
1877            "got FULL DONE, sending elements that other peer is missing\n");
1878
1879       /* send all the elements that did not come from the remote peer */
1880       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1881                                                &send_missing_full_elements_iter,
1882                                                op);
1883
1884       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1885       GNUNET_MQ_send (op->mq,
1886                       ev);
1887       op->state->phase = PHASE_DONE;
1888       /* we now wait until the other peer sends us the OVER message*/
1889     }
1890     break;
1891   case PHASE_FULL_SENDING:
1892     {
1893       LOG (GNUNET_ERROR_TYPE_DEBUG,
1894            "got FULL DONE, finishing\n");
1895       /* We sent the full set, and got the response for that.  We're done. */
1896       op->state->phase = PHASE_DONE;
1897       GNUNET_CADET_receive_done (op->channel);
1898       send_client_done (op);
1899       destroy_channel (op);
1900       return;
1901     }
1902     break;
1903   default:
1904     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1905                 "Handle full done phase is %u\n",
1906                 (unsigned) op->state->phase);
1907     GNUNET_break_op (0);
1908     fail_union_operation (op);
1909     return;
1910   }
1911   GNUNET_CADET_receive_done (op->channel);
1912 }
1913
1914
1915 /**
1916  * Check a demand by the other peer for elements based on a list
1917  * of `struct GNUNET_HashCode`s.
1918  *
1919  * @parem cls closure, a set union operation
1920  * @param mh the demand message
1921  * @return #GNUNET_OK if @a mh is well-formed
1922  */
1923 int
1924 check_union_p2p_demand (void *cls,
1925                         const struct GNUNET_MessageHeader *mh)
1926 {
1927   struct Operation *op = cls;
1928   unsigned int num_hashes;
1929
1930   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1931   {
1932     GNUNET_break_op (0);
1933     return GNUNET_SYSERR;
1934   }
1935   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1936     / sizeof (struct GNUNET_HashCode);
1937   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1938       != num_hashes * sizeof (struct GNUNET_HashCode))
1939   {
1940     GNUNET_break_op (0);
1941     return GNUNET_SYSERR;
1942   }
1943   return GNUNET_OK;
1944 }
1945
1946
1947 /**
1948  * Handle a demand by the other peer for elements based on a list
1949  * of `struct GNUNET_HashCode`s.
1950  *
1951  * @parem cls closure, a set union operation
1952  * @param mh the demand message
1953  */
1954 void
1955 handle_union_p2p_demand (void *cls,
1956                          const struct GNUNET_MessageHeader *mh)
1957 {
1958   struct Operation *op = cls;
1959   struct ElementEntry *ee;
1960   struct GNUNET_SET_ElementMessage *emsg;
1961   const struct GNUNET_HashCode *hash;
1962   unsigned int num_hashes;
1963   struct GNUNET_MQ_Envelope *ev;
1964
1965   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1966     / sizeof (struct GNUNET_HashCode);
1967   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1968        num_hashes > 0;
1969        hash++, num_hashes--)
1970   {
1971     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1972                                             hash);
1973     if (NULL == ee)
1974     {
1975       /* Demand for non-existing element. */
1976       GNUNET_break_op (0);
1977       fail_union_operation (op);
1978       return;
1979     }
1980     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1981     {
1982       /* Probably confused lazily copied sets. */
1983       GNUNET_break_op (0);
1984       fail_union_operation (op);
1985       return;
1986     }
1987     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1988     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1989     emsg->reserved = htons (0);
1990     emsg->element_type = htons (ee->element.element_type);
1991     LOG (GNUNET_ERROR_TYPE_DEBUG,
1992          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1993          (void *) op,
1994          (unsigned int) ee->element.size,
1995          GNUNET_h2s (&ee->element_hash));
1996     GNUNET_MQ_send (op->mq, ev);
1997     GNUNET_STATISTICS_update (_GSS_statistics,
1998                               "# exchanged elements",
1999                               1,
2000                               GNUNET_NO);
2001
2002     switch (op->result_mode)
2003     {
2004       case GNUNET_SET_RESULT_ADDED:
2005         /* Nothing to do. */
2006         break;
2007       case GNUNET_SET_RESULT_SYMMETRIC:
2008         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2009         break;
2010       default:
2011         /* Result mode not supported, should have been caught earlier. */
2012         GNUNET_break (0);
2013         break;
2014     }
2015   }
2016   GNUNET_CADET_receive_done (op->channel);
2017 }
2018
2019
2020 /**
2021  * Check offer (of `struct GNUNET_HashCode`s).
2022  *
2023  * @param cls the union operation
2024  * @param mh the message
2025  * @return #GNUNET_OK if @a mh is well-formed
2026  */
2027 int
2028 check_union_p2p_offer (void *cls,
2029                         const struct GNUNET_MessageHeader *mh)
2030 {
2031   struct Operation *op = cls;
2032   unsigned int num_hashes;
2033
2034   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2035   {
2036     GNUNET_break_op (0);
2037     return GNUNET_SYSERR;
2038   }
2039   /* look up elements and send them */
2040   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2041        (op->state->phase != PHASE_INVENTORY_ACTIVE))
2042   {
2043     GNUNET_break_op (0);
2044     return GNUNET_SYSERR;
2045   }
2046   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2047     / sizeof (struct GNUNET_HashCode);
2048   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2049       num_hashes * sizeof (struct GNUNET_HashCode))
2050   {
2051     GNUNET_break_op (0);
2052     return GNUNET_SYSERR;
2053   }
2054   return GNUNET_OK;
2055 }
2056
2057
2058 /**
2059  * Handle offers (of `struct GNUNET_HashCode`s) and
2060  * respond with demands (of `struct GNUNET_HashCode`s).
2061  *
2062  * @param cls the union operation
2063  * @param mh the message
2064  */
2065 void
2066 handle_union_p2p_offer (void *cls,
2067                         const struct GNUNET_MessageHeader *mh)
2068 {
2069   struct Operation *op = cls;
2070   const struct GNUNET_HashCode *hash;
2071   unsigned int num_hashes;
2072
2073   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2074     / sizeof (struct GNUNET_HashCode);
2075   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2076        num_hashes > 0;
2077        hash++, num_hashes--)
2078   {
2079     struct ElementEntry *ee;
2080     struct GNUNET_MessageHeader *demands;
2081     struct GNUNET_MQ_Envelope *ev;
2082
2083     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2084                                             hash);
2085     if (NULL != ee)
2086       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2087         continue;
2088
2089     if (GNUNET_YES ==
2090         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2091                                                 hash))
2092     {
2093       LOG (GNUNET_ERROR_TYPE_DEBUG,
2094            "Skipped sending duplicate demand\n");
2095       continue;
2096     }
2097
2098     GNUNET_assert (GNUNET_OK ==
2099                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2100                                                       hash,
2101                                                       NULL,
2102                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2103
2104     LOG (GNUNET_ERROR_TYPE_DEBUG,
2105          "[OP %x] Requesting element (hash %s)\n",
2106          (void *) op, GNUNET_h2s (hash));
2107     ev = GNUNET_MQ_msg_header_extra (demands,
2108                                      sizeof (struct GNUNET_HashCode),
2109                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2110     GNUNET_memcpy (&demands[1],
2111                    hash,
2112                    sizeof (struct GNUNET_HashCode));
2113     GNUNET_MQ_send (op->mq, ev);
2114   }
2115   GNUNET_CADET_receive_done (op->channel);
2116 }
2117
2118
2119 /**
2120  * Handle a done message from a remote peer
2121  *
2122  * @param cls the union operation
2123  * @param mh the message
2124  */
2125 void
2126 handle_union_p2p_done (void *cls,
2127                        const struct GNUNET_MessageHeader *mh)
2128 {
2129   struct Operation *op = cls;
2130
2131   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2132   {
2133     GNUNET_break_op (0);
2134     fail_union_operation (op);
2135     return;
2136   }
2137   switch (op->state->phase)
2138   {
2139   case PHASE_INVENTORY_PASSIVE:
2140     /* We got all requests, but still have to send our elements in response. */
2141     op->state->phase = PHASE_FINISH_WAITING;
2142
2143     LOG (GNUNET_ERROR_TYPE_DEBUG,
2144          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2145     /* The active peer is done sending offers
2146      * and inquiries.  This means that all
2147      * our responses to that (demands and offers)
2148      * must be in flight (queued or in mesh).
2149      *
2150      * We should notify the active peer once
2151      * all our demands are satisfied, so that the active
2152      * peer can quit if we gave it everything.
2153      */
2154     GNUNET_CADET_receive_done (op->channel);
2155     maybe_finish (op);
2156     return;
2157   case PHASE_INVENTORY_ACTIVE:
2158     LOG (GNUNET_ERROR_TYPE_DEBUG,
2159          "got DONE (as active partner), waiting to finish\n");
2160     /* All demands of the other peer are satisfied,
2161      * and we processed all offers, thus we know
2162      * exactly what our demands must be.
2163      *
2164      * We'll close the channel
2165      * to the other peer once our demands are met.
2166      */
2167     op->state->phase = PHASE_FINISH_CLOSING;
2168     GNUNET_CADET_receive_done (op->channel);
2169     maybe_finish (op);
2170     return;
2171   default:
2172     GNUNET_break_op (0);
2173     fail_union_operation (op);
2174     return;
2175   }
2176 }
2177
2178 /**
2179  * Handle a over message from a remote peer
2180  *
2181  * @param cls the union operation
2182  * @param mh the message
2183  */
2184 void
2185 handle_union_p2p_over (void *cls,
2186                        const struct GNUNET_MessageHeader *mh)
2187 {
2188   send_client_done (cls);
2189 }
2190
2191
2192 /**
2193  * Initiate operation to evaluate a set union with a remote peer.
2194  *
2195  * @param op operation to perform (to be initialized)
2196  * @param opaque_context message to be transmitted to the listener
2197  *        to convince it to accept, may be NULL
2198  */
2199 static struct OperationState *
2200 union_evaluate (struct Operation *op,
2201                 const struct GNUNET_MessageHeader *opaque_context)
2202 {
2203   struct OperationState *state;
2204   struct GNUNET_MQ_Envelope *ev;
2205   struct OperationRequestMessage *msg;
2206
2207   ev = GNUNET_MQ_msg_nested_mh (msg,
2208                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2209                                 opaque_context);
2210   if (NULL == ev)
2211   {
2212     /* the context message is too large */
2213     GNUNET_break (0);
2214     return NULL;
2215   }
2216   state = GNUNET_new (struct OperationState);
2217   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2218                                                                  GNUNET_NO);
2219   /* copy the current generation's strata estimator for this operation */
2220   state->se = strata_estimator_dup (op->set->state->se);
2221   /* we started the operation, thus we have to send the operation request */
2222   state->phase = PHASE_EXPECT_SE;
2223   state->salt_receive = state->salt_send = 42; // FIXME?????
2224   LOG (GNUNET_ERROR_TYPE_DEBUG,
2225        "Initiating union operation evaluation\n");
2226   GNUNET_STATISTICS_update (_GSS_statistics,
2227                             "# of total union operations",
2228                             1,
2229                             GNUNET_NO);
2230   GNUNET_STATISTICS_update (_GSS_statistics,
2231                             "# of initiated union operations",
2232                             1,
2233                             GNUNET_NO);
2234   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2235   GNUNET_MQ_send (op->mq,
2236                   ev);
2237
2238   if (NULL != opaque_context)
2239     LOG (GNUNET_ERROR_TYPE_DEBUG,
2240          "sent op request with context message\n");
2241   else
2242     LOG (GNUNET_ERROR_TYPE_DEBUG,
2243          "sent op request without context message\n");
2244
2245   op->state = state;
2246   initialize_key_to_element (op);
2247   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2248   return state;
2249 }
2250
2251
2252 /**
2253  * Accept an union operation request from a remote peer.
2254  * Only initializes the private operation state.
2255  *
2256  * @param op operation that will be accepted as a union operation
2257  */
2258 static struct OperationState *
2259 union_accept (struct Operation *op)
2260 {
2261   struct OperationState *state;
2262   const struct StrataEstimator *se;
2263   struct GNUNET_MQ_Envelope *ev;
2264   struct StrataEstimatorMessage *strata_msg;
2265   char *buf;
2266   size_t len;
2267   uint16_t type;
2268
2269   LOG (GNUNET_ERROR_TYPE_DEBUG,
2270        "accepting set union operation\n");
2271   GNUNET_STATISTICS_update (_GSS_statistics,
2272                             "# of accepted union operations",
2273                             1,
2274                             GNUNET_NO);
2275   GNUNET_STATISTICS_update (_GSS_statistics,
2276                             "# of total union operations",
2277                             1,
2278                             GNUNET_NO);
2279
2280   state = GNUNET_new (struct OperationState);
2281   state->se = strata_estimator_dup (op->set->state->se);
2282   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2283                                                                  GNUNET_NO);
2284   state->salt_receive = state->salt_send = 42; // FIXME?????
2285   op->state = state;
2286   initialize_key_to_element (op);
2287   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2288
2289   /* kick off the operation */
2290   se = state->se;
2291   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2292   len = strata_estimator_write (se,
2293                                 buf);
2294   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2295     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2296   else
2297     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2298   ev = GNUNET_MQ_msg_extra (strata_msg,
2299                             len,
2300                             type);
2301   GNUNET_memcpy (&strata_msg[1],
2302                  buf,
2303                  len);
2304   GNUNET_free (buf);
2305   strata_msg->set_size
2306     = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2307   GNUNET_MQ_send (op->mq,
2308                   ev);
2309   state->phase = PHASE_EXPECT_IBF;
2310   return state;
2311 }
2312
2313
2314 /**
2315  * Create a new set supporting the union operation
2316  *
2317  * We maintain one strata estimator per set and then manipulate it over the
2318  * lifetime of the set, as recreating a strata estimator would be expensive.
2319  *
2320  * @return the newly created set, NULL on error
2321  */
2322 static struct SetState *
2323 union_set_create (void)
2324 {
2325   struct SetState *set_state;
2326
2327   LOG (GNUNET_ERROR_TYPE_DEBUG,
2328        "union set created\n");
2329   set_state = GNUNET_new (struct SetState);
2330   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2331                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2332   if (NULL == set_state->se)
2333   {
2334     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2335                 "Failed to allocate strata estimator\n");
2336     GNUNET_free (set_state);
2337     return NULL;
2338   }
2339   return set_state;
2340 }
2341
2342
2343 /**
2344  * Add the element from the given element message to the set.
2345  *
2346  * @param set_state state of the set want to add to
2347  * @param ee the element to add to the set
2348  */
2349 static void
2350 union_add (struct SetState *set_state,
2351            struct ElementEntry *ee)
2352 {
2353   strata_estimator_insert (set_state->se,
2354                            get_ibf_key (&ee->element_hash));
2355 }
2356
2357
2358 /**
2359  * Remove the element given in the element message from the set.
2360  * Only marks the element as removed, so that older set operations can still exchange it.
2361  *
2362  * @param set_state state of the set to remove from
2363  * @param ee set element to remove
2364  */
2365 static void
2366 union_remove (struct SetState *set_state,
2367               struct ElementEntry *ee)
2368 {
2369   strata_estimator_remove (set_state->se,
2370                            get_ibf_key (&ee->element_hash));
2371 }
2372
2373
2374 /**
2375  * Destroy a set that supports the union operation.
2376  *
2377  * @param set_state the set to destroy
2378  */
2379 static void
2380 union_set_destroy (struct SetState *set_state)
2381 {
2382   if (NULL != set_state->se)
2383   {
2384     strata_estimator_destroy (set_state->se);
2385     set_state->se = NULL;
2386   }
2387   GNUNET_free (set_state);
2388 }
2389
2390
2391 /**
2392  * Copy union-specific set state.
2393  *
2394  * @param state source state for copying the union state
2395  * @return a copy of the union-specific set state
2396  */
2397 static struct SetState *
2398 union_copy_state (struct SetState *state)
2399 {
2400   struct SetState *new_state;
2401
2402   GNUNET_assert ( (NULL != state) &&
2403                   (NULL != state->se) );
2404   new_state = GNUNET_new (struct SetState);
2405   new_state->se = strata_estimator_dup (state->se);
2406
2407   return new_state;
2408 }
2409
2410
2411 /**
2412  * Handle case where channel went down for an operation.
2413  *
2414  * @param op operation that lost the channel
2415  */
2416 static void
2417 union_channel_death (struct Operation *op)
2418 {
2419   send_client_done (op);
2420   _GSS_operation_destroy (op,
2421                           GNUNET_YES);
2422 }
2423
2424
2425 /**
2426  * Get the table with implementing functions for
2427  * set union.
2428  *
2429  * @return the operation specific VTable
2430  */
2431 const struct SetVT *
2432 _GSS_union_vt ()
2433 {
2434   static const struct SetVT union_vt = {
2435     .create = &union_set_create,
2436     .add = &union_add,
2437     .remove = &union_remove,
2438     .destroy_set = &union_set_destroy,
2439     .evaluate = &union_evaluate,
2440     .accept = &union_accept,
2441     .cancel = &union_op_cancel,
2442     .copy_state = &union_copy_state,
2443     .channel_death = &union_channel_death
2444   };
2445
2446   return &union_vt;
2447 }