improve logging, indentation
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
34 #include <gcrypt.h>
35
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define SE_STRATA_COUNT 32
44
45 /**
46  * Size of the IBFs in the strata estimator.
47  */
48 #define SE_IBF_SIZE 80
49
50 /**
51  * The hash num parameter for the difference digests and strata estimators.
52  */
53 #define SE_IBF_HASH_NUM 4
54
55 /**
56  * Number of buckets that can be transmitted in one message.
57  */
58 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59
60 /**
61  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62  * Choose this value so that computing the IBF is still cheaper
63  * than transmitting all values.
64  */
65 #define MAX_IBF_ORDER (20)
66
67 /**
68  * Number of buckets used in the ibf per estimated
69  * difference.
70  */
71 #define IBF_ALPHA 4
72
73
74 /**
75  * Current phase we are in for a union operation.
76  */
77 enum UnionOperationPhase
78 {
79   /**
80    * We sent the request message, and expect a strata estimator.
81    */
82   PHASE_EXPECT_SE,
83
84   /**
85    * We sent the strata estimator, and expect an IBF. This phase is entered once
86    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87    *
88    * XXX: could use better wording.
89    * XXX: repurposed to also expect a "request full set" message, should be renamed
90    *
91    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
92    */
93   PHASE_EXPECT_IBF,
94
95   /**
96    * Continuation for multi part IBFs.
97    */
98   PHASE_EXPECT_IBF_CONT,
99
100   /**
101    * We are decoding an IBF.
102    */
103   PHASE_INVENTORY_ACTIVE,
104
105   /**
106    * The other peer is decoding the IBF we just sent.
107    */
108   PHASE_INVENTORY_PASSIVE,
109
110   /**
111    * The protocol is almost finished, but we still have to flush our message
112    * queue and/or expect some elements.
113    */
114   PHASE_FINISH_CLOSING,
115
116   /**
117    * In the penultimate phase,
118    * we wait until all our demands
119    * are satisfied.  Then we send a done
120    * message, and wait for another done message.
121    */
122   PHASE_FINISH_WAITING,
123
124   /**
125    * In the ultimate phase, we wait until
126    * our demands are satisfied and then
127    * quit (sending another DONE message).
128    */
129   PHASE_DONE,
130
131   /**
132    * After sending the full set, wait for responses with the elements
133    * that the local peer is missing.
134    */
135   PHASE_FULL_SENDING,
136 };
137
138
139 /**
140  * State of an evaluate operation with another peer.
141  */
142 struct OperationState
143 {
144   /**
145    * Copy of the set's strata estimator at the time of
146    * creation of this operation.
147    */
148   struct StrataEstimator *se;
149
150   /**
151    * The IBF we currently receive.
152    */
153   struct InvertibleBloomFilter *remote_ibf;
154
155   /**
156    * The IBF with the local set's element.
157    */
158   struct InvertibleBloomFilter *local_ibf;
159
160   /**
161    * Maps unsalted IBF-Keys to elements.
162    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163    * Colliding IBF-Keys are linked.
164    */
165   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
166
167   /**
168    * Current state of the operation.
169    */
170   enum UnionOperationPhase phase;
171
172   /**
173    * Did we send the client that we are done?
174    */
175   int client_done_sent;
176
177   /**
178    * Number of ibf buckets already received into the @a remote_ibf.
179    */
180   unsigned int ibf_buckets_received;
181
182   /**
183    * Hashes for elements that we have demanded from the other peer.
184    */
185   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
186
187   /**
188    * Salt that we're using for sending IBFs
189    */
190   uint32_t salt_send;
191
192   /**
193    * Salt for the IBF we've received and that we're currently decoding.
194    */
195   uint32_t salt_receive;
196
197   /**
198    * Number of elements we received from the other peer
199    * that were not in the local set yet.
200    */
201   uint32_t received_fresh;
202
203   /**
204    * Total number of elements received from the other peer.
205    */
206   uint32_t received_total;
207
208   /**
209    * Initial size of our set, just before
210    * the operation started.
211    */
212   uint64_t initial_size;
213 };
214
215
216 /**
217  * The key entry is used to associate an ibf key with an element.
218  */
219 struct KeyEntry
220 {
221   /**
222    * IBF key for the entry, derived from the current salt.
223    */
224   struct IBF_Key ibf_key;
225
226   /**
227    * The actual element associated with the key.
228    *
229    * Only owned by the union operation if element->operation
230    * is #GNUNET_YES.
231    */
232   struct ElementEntry *element;
233
234   /**
235    * Did we receive this element?
236    * Even if element->is_foreign is false, we might
237    * have received the element, so this indicates that
238    * the other peer has it.
239    */
240   int received;
241 };
242
243
244 /**
245  * Used as a closure for sending elements
246  * with a specific IBF key.
247  */
248 struct SendElementClosure
249 {
250   /**
251    * The IBF key whose matching elements should be
252    * sent.
253    */
254   struct IBF_Key ibf_key;
255
256   /**
257    * Operation for which the elements
258    * should be sent.
259    */
260   struct Operation *op;
261 };
262
263
264 /**
265  * Extra state required for efficient set union.
266  */
267 struct SetState
268 {
269   /**
270    * The strata estimator is only generated once for
271    * each set.
272    * The IBF keys are derived from the element hashes with
273    * salt=0.
274    */
275   struct StrataEstimator *se;
276 };
277
278
279 /**
280  * Iterator over hash map entries, called to
281  * destroy the linked list of colliding ibf key entries.
282  *
283  * @param cls closure
284  * @param key current key code
285  * @param value value in the hash map
286  * @return #GNUNET_YES if we should continue to iterate,
287  *         #GNUNET_NO if not.
288  */
289 static int
290 destroy_key_to_element_iter (void *cls,
291                              uint32_t key,
292                              void *value)
293 {
294   struct KeyEntry *k = value;
295
296   GNUNET_assert (NULL != k);
297   if (GNUNET_YES == k->element->remote)
298   {
299     GNUNET_free (k->element);
300     k->element = NULL;
301   }
302   GNUNET_free (k);
303   return GNUNET_YES;
304 }
305
306
307 /**
308  * Destroy the union operation.  Only things specific to the union
309  * operation are destroyed.
310  *
311  * @param op union operation to destroy
312  */
313 static void
314 union_op_cancel (struct Operation *op)
315 {
316   LOG (GNUNET_ERROR_TYPE_DEBUG,
317        "destroying union op\n");
318   /* check if the op was canceled twice */
319   GNUNET_assert (NULL != op->state);
320   if (NULL != op->state->remote_ibf)
321   {
322     ibf_destroy (op->state->remote_ibf);
323     op->state->remote_ibf = NULL;
324   }
325   if (NULL != op->state->demanded_hashes)
326   {
327     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328     op->state->demanded_hashes = NULL;
329   }
330   if (NULL != op->state->local_ibf)
331   {
332     ibf_destroy (op->state->local_ibf);
333     op->state->local_ibf = NULL;
334   }
335   if (NULL != op->state->se)
336   {
337     strata_estimator_destroy (op->state->se);
338     op->state->se = NULL;
339   }
340   if (NULL != op->state->key_to_element)
341   {
342     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343                                              &destroy_key_to_element_iter,
344                                              NULL);
345     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346     op->state->key_to_element = NULL;
347   }
348   GNUNET_free (op->state);
349   op->state = NULL;
350   LOG (GNUNET_ERROR_TYPE_DEBUG,
351        "destroying union op done\n");
352 }
353
354
355 /**
356  * Inform the client that the union operation has failed,
357  * and proceed to destroy the evaluate operation.
358  *
359  * @param op the union operation to fail
360  */
361 static void
362 fail_union_operation (struct Operation *op)
363 {
364   struct GNUNET_MQ_Envelope *ev;
365   struct GNUNET_SET_ResultMessage *msg;
366
367   LOG (GNUNET_ERROR_TYPE_ERROR,
368        "union operation failed\n");
369   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371   msg->request_id = htonl (op->spec->client_request_id);
372   msg->element_type = htons (0);
373   GNUNET_MQ_send (op->spec->set->client_mq, ev);
374   _GSS_operation_destroy (op, GNUNET_YES);
375 }
376
377
378 /**
379  * Derive the IBF key from a hash code and
380  * a salt.
381  *
382  * @param src the hash code
383  * @return the derived IBF key
384  */
385 static struct IBF_Key
386 get_ibf_key (const struct GNUNET_HashCode *src)
387 {
388   struct IBF_Key key;
389   uint16_t salt = 0;
390
391   GNUNET_CRYPTO_kdf (&key, sizeof (key),
392                      src, sizeof *src,
393                      &salt, sizeof (salt),
394                      NULL, 0);
395   return key;
396 }
397
398
399 /**
400  * Context for #op_get_element_iterator
401  */
402 struct GetElementContext
403 {
404   struct GNUNET_HashCode hash;
405   struct KeyEntry *k;
406 };
407
408
409 /**
410  * Iterator over the mapping from IBF keys to element entries.  Checks if we
411  * have an element with a given GNUNET_HashCode.
412  *
413  * @param cls closure
414  * @param key current key code
415  * @param value value in the hash map
416  * @return #GNUNET_YES if we should search further,
417  *         #GNUNET_NO if we've found the element.
418  */
419 static int
420 op_get_element_iterator (void *cls,
421                          uint32_t key,
422                          void *value)
423 {
424   struct GetElementContext *ctx = cls;
425   struct KeyEntry *k = value;
426
427   GNUNET_assert (NULL != k);
428   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
429                                    &ctx->hash))
430   {
431     ctx->k = k;
432     return GNUNET_NO;
433   }
434   return GNUNET_YES;
435 }
436
437
438 /**
439  * Determine whether the given element is already in the operation's element
440  * set.
441  *
442  * @param op operation that should be tested for 'element_hash'
443  * @param element_hash hash of the element to look for
444  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
445  */
446 static struct KeyEntry *
447 op_get_element (struct Operation *op,
448                 const struct GNUNET_HashCode *element_hash)
449 {
450   int ret;
451   struct IBF_Key ibf_key;
452   struct GetElementContext ctx = {{{ 0 }} , 0};
453
454   ctx.hash = *element_hash;
455
456   ibf_key = get_ibf_key (element_hash);
457   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
458                                                       (uint32_t) ibf_key.key_val,
459                                                       op_get_element_iterator,
460                                                       &ctx);
461
462   /* was the iteration aborted because we found the element? */
463   if (GNUNET_SYSERR == ret)
464   {
465     GNUNET_assert (NULL != ctx.k);
466     return ctx.k;
467   }
468   return NULL;
469 }
470
471
472 /**
473  * Insert an element into the union operation's
474  * key-to-element mapping. Takes ownership of 'ee'.
475  * Note that this does not insert the element in the set,
476  * only in the operation's key-element mapping.
477  * This is done to speed up re-tried operations, if some elements
478  * were transmitted, and then the IBF fails to decode.
479  *
480  * XXX: clarify ownership, doesn't sound right.
481  *
482  * @param op the union operation
483  * @param ee the element entry
484  * @parem received was this element received from the remote peer?
485  */
486 static void
487 op_register_element (struct Operation *op,
488                      struct ElementEntry *ee,
489                      int received)
490 {
491   struct IBF_Key ibf_key;
492   struct KeyEntry *k;
493
494   ibf_key = get_ibf_key (&ee->element_hash);
495   k = GNUNET_new (struct KeyEntry);
496   k->element = ee;
497   k->ibf_key = ibf_key;
498   k->received = received;
499   GNUNET_assert (GNUNET_OK ==
500                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
501                                                       (uint32_t) ibf_key.key_val,
502                                                       k,
503                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
504 }
505
506
507 static void
508 salt_key (const struct IBF_Key *k_in,
509           uint32_t salt,
510           struct IBF_Key *k_out)
511 {
512   int s = salt % 64;
513   uint64_t x = k_in->key_val;
514   /* rotate ibf key */
515   x = (x >> s) | (x << (64 - s));
516   k_out->key_val = x;
517 }
518
519
520 static void
521 unsalt_key (const struct IBF_Key *k_in,
522             uint32_t salt,
523             struct IBF_Key *k_out)
524 {
525   int s = salt % 64;
526   uint64_t x = k_in->key_val;
527   x = (x << s) | (x >> (64 - s));
528   k_out->key_val = x;
529 }
530
531
532 /**
533  * Insert a key into an ibf.
534  *
535  * @param cls the ibf
536  * @param key unused
537  * @param value the key entry to get the key from
538  */
539 static int
540 prepare_ibf_iterator (void *cls,
541                       uint32_t key,
542                       void *value)
543 {
544   struct Operation *op = cls;
545   struct KeyEntry *ke = value;
546   struct IBF_Key salted_key;
547
548   LOG (GNUNET_ERROR_TYPE_DEBUG,
549        "[OP %x] inserting %lx (hash %s) into ibf\n",
550        (void *) op,
551        (unsigned long) ke->ibf_key.key_val,
552        GNUNET_h2s (&ke->element->element_hash));
553   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
554   ibf_insert (op->state->local_ibf, salted_key);
555   return GNUNET_YES;
556 }
557
558
559 /**
560  * Iterator for initializing the
561  * key-to-element mapping of a union operation
562  *
563  * @param cls the union operation `struct Operation *`
564  * @param key unused
565  * @param value the `struct ElementEntry *` to insert
566  *        into the key-to-element mapping
567  * @return #GNUNET_YES (to continue iterating)
568  */
569 static int
570 init_key_to_element_iterator (void *cls,
571                               const struct GNUNET_HashCode *key,
572                               void *value)
573 {
574   struct Operation *op = cls;
575   struct ElementEntry *ee = value;
576
577   /* make sure that the element belongs to the set at the time
578    * of creating the operation */
579   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
580     return GNUNET_YES;
581
582   GNUNET_assert (GNUNET_NO == ee->remote);
583
584   op_register_element (op, ee, GNUNET_NO);
585   return GNUNET_YES;
586 }
587
588
589 /**
590  * Initialize the IBF key to element mapping local to this set
591  * operation.
592  *
593  * @param op the set union operation
594  */
595 static void
596 initialize_key_to_element (struct Operation *op)
597 {
598   unsigned int len;
599
600   GNUNET_assert (NULL == op->state->key_to_element);
601   len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
602   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
603   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
604 }
605
606
607 /**
608  * Create an ibf with the operation's elements
609  * of the specified size
610  *
611  * @param op the union operation
612  * @param size size of the ibf to create
613  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
614  */
615 static int
616 prepare_ibf (struct Operation *op,
617              uint32_t size)
618 {
619   GNUNET_assert (NULL != op->state->key_to_element);
620
621   if (NULL != op->state->local_ibf)
622     ibf_destroy (op->state->local_ibf);
623   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
624   if (NULL == op->state->local_ibf)
625   {
626     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
627                 "Failed to allocate local IBF\n");
628     return GNUNET_SYSERR;
629   }
630   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
631                                            &prepare_ibf_iterator,
632                                            op);
633   return GNUNET_OK;
634 }
635
636
637 /**
638  * Send an ibf of appropriate size.
639  *
640  * Fragments the IBF into multiple messages if necessary.
641  *
642  * @param op the union operation
643  * @param ibf_order order of the ibf to send, size=2^order
644  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
645  */
646 static int
647 send_ibf (struct Operation *op,
648           uint16_t ibf_order)
649 {
650   unsigned int buckets_sent = 0;
651   struct InvertibleBloomFilter *ibf;
652
653   if (GNUNET_OK !=
654       prepare_ibf (op, 1<<ibf_order))
655   {
656     /* allocation failed */
657     return GNUNET_SYSERR;
658   }
659
660   LOG (GNUNET_ERROR_TYPE_DEBUG,
661        "sending ibf of size %u\n",
662        1<<ibf_order);
663
664   {
665     char name[64] = { 0 };
666     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
667     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
668   }
669
670   ibf = op->state->local_ibf;
671
672   while (buckets_sent < (1 << ibf_order))
673   {
674     unsigned int buckets_in_message;
675     struct GNUNET_MQ_Envelope *ev;
676     struct IBFMessage *msg;
677
678     buckets_in_message = (1 << ibf_order) - buckets_sent;
679     /* limit to maximum */
680     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
681       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
682
683     ev = GNUNET_MQ_msg_extra (msg,
684                               buckets_in_message * IBF_BUCKET_SIZE,
685                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
686     msg->reserved1 = 0;
687     msg->reserved2 = 0;
688     msg->order = ibf_order;
689     msg->offset = htonl (buckets_sent);
690     msg->salt = htonl (op->state->salt_send);
691     ibf_write_slice (ibf, buckets_sent,
692                      buckets_in_message, &msg[1]);
693     buckets_sent += buckets_in_message;
694     LOG (GNUNET_ERROR_TYPE_DEBUG,
695          "ibf chunk size %u, %u/%u sent\n",
696          buckets_in_message,
697          buckets_sent,
698          1<<ibf_order);
699     GNUNET_MQ_send (op->mq, ev);
700   }
701
702   /* The other peer must decode the IBF, so
703    * we're passive. */
704   op->state->phase = PHASE_INVENTORY_PASSIVE;
705   return GNUNET_OK;
706 }
707
708
709 /**
710  * Send a strata estimator to the remote peer.
711  *
712  * @param op the union operation with the remote peer
713  */
714 static void
715 send_strata_estimator (struct Operation *op)
716 {
717   const struct StrataEstimator *se = op->state->se;
718   struct GNUNET_MQ_Envelope *ev;
719   struct StrataEstimatorMessage *strata_msg;
720   char *buf;
721   size_t len;
722   uint16_t type;
723
724   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
725   len = strata_estimator_write (op->state->se,
726                                 buf);
727   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
728     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729   else
730     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
731   ev = GNUNET_MQ_msg_extra (strata_msg,
732                             len,
733                             type);
734   GNUNET_memcpy (&strata_msg[1],
735           buf,
736           len);
737   GNUNET_free (buf);
738   strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
739   GNUNET_MQ_send (op->mq,
740                   ev);
741   op->state->phase = PHASE_EXPECT_IBF;
742   LOG (GNUNET_ERROR_TYPE_DEBUG,
743        "sent SE, expecting IBF\n");
744 }
745
746
747 /**
748  * Compute the necessary order of an ibf
749  * from the size of the symmetric set difference.
750  *
751  * @param diff the difference
752  * @return the required size of the ibf
753  */
754 static unsigned int
755 get_order_from_difference (unsigned int diff)
756 {
757   unsigned int ibf_order;
758
759   ibf_order = 2;
760   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
761           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
762     ibf_order++;
763   if (ibf_order > MAX_IBF_ORDER)
764     ibf_order = MAX_IBF_ORDER;
765   // add one for correction
766   return ibf_order + 1;
767 }
768
769
770 /**
771  * Send a set element.
772  *
773  * @param cls the union operation `struct Operation *`
774  * @param key unused
775  * @param value the `struct ElementEntry *` to insert
776  *        into the key-to-element mapping
777  * @return #GNUNET_YES (to continue iterating)
778  */
779 static int
780 send_element_iterator (void *cls,
781                        const struct GNUNET_HashCode *key,
782                        void *value)
783 {
784   struct Operation *op = cls;
785   struct GNUNET_SET_ElementMessage *emsg;
786   struct ElementEntry *ee = value;
787   struct GNUNET_SET_Element *el = &ee->element;
788   struct GNUNET_MQ_Envelope *ev;
789
790   LOG (GNUNET_ERROR_TYPE_INFO,
791        "Sending element %s\n",
792        GNUNET_h2s (key));
793   ev = GNUNET_MQ_msg_extra (emsg,
794                             el->size,
795                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
796   emsg->element_type = htons (el->element_type);
797   GNUNET_memcpy (&emsg[1],
798                  el->data,
799                  el->size);
800   GNUNET_MQ_send (op->mq,
801                   ev);
802   return GNUNET_YES;
803 }
804
805
806 static void
807 send_full_set (struct Operation *op)
808 {
809   struct GNUNET_MQ_Envelope *ev;
810
811   op->state->phase = PHASE_FULL_SENDING;
812   /* FIXME: use a more memory-friendly way of doing this with an
813      iterator, just as we do in the non-full case! */
814   (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
815                                                 &send_element_iterator,
816                                                 op);
817   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
818   GNUNET_MQ_send (op->mq,
819                   ev);
820 }
821
822
823 /**
824  * Handle a strata estimator from a remote peer
825  *
826  * @param cls the union operation
827  * @param msg the message
828  */
829 int
830 check_union_p2p_strata_estimator (void *cls,
831                                   const struct StrataEstimatorMessage *msg)
832 {
833   struct Operation *op = cls;
834   int is_compressed;
835   size_t len;
836
837   if (op->state->phase != PHASE_EXPECT_SE)
838   {
839     GNUNET_break (0);
840     return GNUNET_SYSERR;
841   }
842   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
843   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
844   if ( (GNUNET_NO == is_compressed) &&
845        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
846   {
847     GNUNET_break (0);
848     return GNUNET_SYSERR;
849   }
850   return GNUNET_OK;
851 }
852
853
854 /**
855  * Handle a strata estimator from a remote peer
856  *
857  * @param cls the union operation
858  * @param msg the message
859  */
860 void
861 handle_union_p2p_strata_estimator (void *cls,
862                                    const struct StrataEstimatorMessage *msg)
863 {
864   struct Operation *op = cls;
865   struct StrataEstimator *remote_se;
866   unsigned int diff;
867   uint64_t other_size;
868   size_t len;
869   int is_compressed;
870
871   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
872   GNUNET_STATISTICS_update (_GSS_statistics,
873                             "# bytes of SE received",
874                             ntohs (msg->header.size),
875                             GNUNET_NO);
876   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
877   other_size = GNUNET_ntohll (msg->set_size);
878   remote_se = strata_estimator_create (SE_STRATA_COUNT,
879                                        SE_IBF_SIZE,
880                                        SE_IBF_HASH_NUM);
881   if (NULL == remote_se)
882   {
883     /* insufficient resources, fail */
884     fail_union_operation (op);
885     return;
886   }
887   if (GNUNET_OK !=
888       strata_estimator_read (&msg[1],
889                              len,
890                              is_compressed,
891                              remote_se))
892   {
893     /* decompression failed */
894     strata_estimator_destroy (remote_se);
895     fail_union_operation (op);
896     return;
897   }
898   GNUNET_assert (NULL != op->state->se);
899   diff = strata_estimator_difference (remote_se,
900                                       op->state->se);
901
902   if (diff > 200)
903     diff = diff * 3 / 2;
904
905   strata_estimator_destroy (remote_se);
906   strata_estimator_destroy (op->state->se);
907   op->state->se = NULL;
908   LOG (GNUNET_ERROR_TYPE_DEBUG,
909        "got se diff=%d, using ibf size %d\n",
910        diff,
911        1U << get_order_from_difference (diff));
912
913   {
914     char *set_debug;
915
916     set_debug = getenv ("GNUNET_SET_BENCHMARK");
917     if ( (NULL != set_debug) &&
918          (0 == strcmp (set_debug, "1")) )
919     {
920       FILE *f = fopen ("set.log", "a");
921       fprintf (f, "%llu\n", (unsigned long long) diff);
922       fclose (f);
923     }
924   }
925
926   if ( (GNUNET_YES == op->spec->byzantine) &&
927        (other_size < op->spec->byzantine_lower_bound) )
928   {
929     GNUNET_break (0);
930     fail_union_operation (op);
931     return;
932   }
933
934   if ( (GNUNET_YES == op->spec->force_full) ||
935        (diff > op->state->initial_size / 4) ||
936        (0 == other_size) )
937   {
938     LOG (GNUNET_ERROR_TYPE_INFO,
939          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
940          diff,
941          op->state->initial_size);
942     GNUNET_STATISTICS_update (_GSS_statistics,
943                               "# of full sends",
944                               1,
945                               GNUNET_NO);
946     if ( (op->state->initial_size <= other_size) ||
947          (0 == other_size) )
948     {
949       send_full_set (op);
950     }
951     else
952     {
953       struct GNUNET_MQ_Envelope *ev;
954
955       LOG (GNUNET_ERROR_TYPE_INFO,
956            "Telling other peer that we expect its full set\n");
957       op->state->phase = PHASE_EXPECT_IBF;
958       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
959       GNUNET_MQ_send (op->mq,
960                       ev);
961     }
962   }
963   else
964   {
965     GNUNET_STATISTICS_update (_GSS_statistics,
966                               "# of ibf sends",
967                               1,
968                               GNUNET_NO);
969     if (GNUNET_OK !=
970         send_ibf (op,
971                   get_order_from_difference (diff)))
972     {
973       /* Internal error, best we can do is shut the connection */
974       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
975                   "Failed to send IBF, closing connection\n");
976       fail_union_operation (op);
977       return;
978     }
979   }
980   GNUNET_CADET_receive_done (op->channel);
981 }
982
983
984 /**
985  * Iterator to send elements to a remote peer
986  *
987  * @param cls closure with the element key and the union operation
988  * @param key ignored
989  * @param value the key entry
990  */
991 static int
992 send_offers_iterator (void *cls,
993                       uint32_t key,
994                       void *value)
995 {
996   struct SendElementClosure *sec = cls;
997   struct Operation *op = sec->op;
998   struct KeyEntry *ke = value;
999   struct GNUNET_MQ_Envelope *ev;
1000   struct GNUNET_MessageHeader *mh;
1001
1002   /* Detect 32-bit key collision for the 64-bit IBF keys. */
1003   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1004     return GNUNET_YES;
1005
1006   ev = GNUNET_MQ_msg_header_extra (mh,
1007                                    sizeof (struct GNUNET_HashCode),
1008                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
1009
1010   GNUNET_assert (NULL != ev);
1011   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1012   LOG (GNUNET_ERROR_TYPE_DEBUG,
1013        "[OP %x] sending element offer (%s) to peer\n",
1014        (void *) op,
1015        GNUNET_h2s (&ke->element->element_hash));
1016   GNUNET_MQ_send (op->mq, ev);
1017   return GNUNET_YES;
1018 }
1019
1020
1021 /**
1022  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1023  *
1024  * @param op union operation
1025  * @param ibf_key IBF key of interest
1026  */
1027 static void
1028 send_offers_for_key (struct Operation *op,
1029                      struct IBF_Key ibf_key)
1030 {
1031   struct SendElementClosure send_cls;
1032
1033   send_cls.ibf_key = ibf_key;
1034   send_cls.op = op;
1035   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1036                                                        (uint32_t) ibf_key.key_val,
1037                                                        &send_offers_iterator,
1038                                                        &send_cls);
1039 }
1040
1041
1042 /**
1043  * Decode which elements are missing on each side, and
1044  * send the appropriate offers and inquiries.
1045  *
1046  * @param op union operation
1047  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1048  */
1049 static int
1050 decode_and_send (struct Operation *op)
1051 {
1052   struct IBF_Key key;
1053   struct IBF_Key last_key;
1054   int side;
1055   unsigned int num_decoded;
1056   struct InvertibleBloomFilter *diff_ibf;
1057
1058   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1059
1060   if (GNUNET_OK !=
1061       prepare_ibf (op, op->state->remote_ibf->size))
1062   {
1063     GNUNET_break (0);
1064     /* allocation failed */
1065     return GNUNET_SYSERR;
1066   }
1067   diff_ibf = ibf_dup (op->state->local_ibf);
1068   ibf_subtract (diff_ibf, op->state->remote_ibf);
1069
1070   ibf_destroy (op->state->remote_ibf);
1071   op->state->remote_ibf = NULL;
1072
1073   LOG (GNUNET_ERROR_TYPE_DEBUG,
1074        "decoding IBF (size=%u)\n",
1075        diff_ibf->size);
1076
1077   num_decoded = 0;
1078   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1079
1080   while (1)
1081   {
1082     int res;
1083     int cycle_detected = GNUNET_NO;
1084
1085     last_key = key;
1086
1087     res = ibf_decode (diff_ibf, &side, &key);
1088     if (res == GNUNET_OK)
1089     {
1090       LOG (GNUNET_ERROR_TYPE_DEBUG,
1091            "decoded ibf key %lx\n",
1092            (unsigned long) key.key_val);
1093       num_decoded += 1;
1094       if ( (num_decoded > diff_ibf->size) ||
1095            ( (num_decoded > 1) &&
1096              (last_key.key_val == key.key_val) ) )
1097       {
1098         LOG (GNUNET_ERROR_TYPE_DEBUG,
1099              "detected cyclic ibf (decoded %u/%u)\n",
1100              num_decoded,
1101              diff_ibf->size);
1102         cycle_detected = GNUNET_YES;
1103       }
1104     }
1105     if ( (GNUNET_SYSERR == res) ||
1106          (GNUNET_YES == cycle_detected) )
1107     {
1108       int next_order;
1109       next_order = 0;
1110       while (1<<next_order < diff_ibf->size)
1111         next_order++;
1112       next_order++;
1113       if (next_order <= MAX_IBF_ORDER)
1114       {
1115         LOG (GNUNET_ERROR_TYPE_DEBUG,
1116              "decoding failed, sending larger ibf (size %u)\n",
1117              1<<next_order);
1118         GNUNET_STATISTICS_update (_GSS_statistics,
1119                                   "# of IBF retries",
1120                                   1,
1121                                   GNUNET_NO);
1122         op->state->salt_send++;
1123         if (GNUNET_OK !=
1124             send_ibf (op, next_order))
1125         {
1126           /* Internal error, best we can do is shut the connection */
1127           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1128                       "Failed to send IBF, closing connection\n");
1129           fail_union_operation (op);
1130           ibf_destroy (diff_ibf);
1131           return GNUNET_SYSERR;
1132         }
1133       }
1134       else
1135       {
1136         GNUNET_STATISTICS_update (_GSS_statistics,
1137                                   "# of failed union operations (too large)",
1138                                   1,
1139                                   GNUNET_NO);
1140         // XXX: Send the whole set, element-by-element
1141         LOG (GNUNET_ERROR_TYPE_ERROR,
1142              "set union failed: reached ibf limit\n");
1143         fail_union_operation (op);
1144         ibf_destroy (diff_ibf);
1145         return GNUNET_SYSERR;
1146       }
1147       break;
1148     }
1149     if (GNUNET_NO == res)
1150     {
1151       struct GNUNET_MQ_Envelope *ev;
1152
1153       LOG (GNUNET_ERROR_TYPE_DEBUG,
1154            "transmitted all values, sending DONE\n");
1155       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1156       GNUNET_MQ_send (op->mq, ev);
1157       /* We now wait until we get a DONE message back
1158        * and then wait for our MQ to be flushed and all our
1159        * demands be delivered. */
1160       break;
1161     }
1162     if (1 == side)
1163     {
1164       struct IBF_Key unsalted_key;
1165       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1166       send_offers_for_key (op, unsalted_key);
1167     }
1168     else if (-1 == side)
1169     {
1170       struct GNUNET_MQ_Envelope *ev;
1171       struct InquiryMessage *msg;
1172
1173       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1174        * the effort additional complexity. */
1175       ev = GNUNET_MQ_msg_extra (msg,
1176                                 sizeof (struct IBF_Key),
1177                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1178       msg->salt = htonl (op->state->salt_receive);
1179       GNUNET_memcpy (&msg[1],
1180               &key,
1181               sizeof (struct IBF_Key));
1182       LOG (GNUNET_ERROR_TYPE_DEBUG,
1183            "sending element inquiry for IBF key %lx\n",
1184            (unsigned long) key.key_val);
1185       GNUNET_MQ_send (op->mq, ev);
1186     }
1187     else
1188     {
1189       GNUNET_assert (0);
1190     }
1191   }
1192   ibf_destroy (diff_ibf);
1193   return GNUNET_OK;
1194 }
1195
1196
1197 /**
1198  * Check an IBF message from a remote peer.
1199  *
1200  * Reassemble the IBF from multiple pieces, and
1201  * process the whole IBF once possible.
1202  *
1203  * @param cls the union operation
1204  * @param msg the header of the message
1205  * @return #GNUNET_OK if @a msg is well-formed
1206  */
1207 int
1208 check_union_p2p_ibf (void *cls,
1209                      const struct IBFMessage *msg)
1210 {
1211   struct Operation *op = cls;
1212   unsigned int buckets_in_message;
1213
1214   if (GNUNET_SET_OPERATION_UNION != op->operation)
1215   {
1216     GNUNET_break_op (0);
1217     return GNUNET_SYSERR;
1218   }
1219   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1220   if (0 == buckets_in_message)
1221   {
1222     GNUNET_break_op (0);
1223     return GNUNET_SYSERR;
1224   }
1225   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1226   {
1227     GNUNET_break_op (0);
1228     return GNUNET_SYSERR;
1229   }
1230   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1231   {
1232     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1233     {
1234       GNUNET_break_op (0);
1235       return GNUNET_SYSERR;
1236     }
1237     if (1<<msg->order != op->state->remote_ibf->size)
1238     {
1239       GNUNET_break_op (0);
1240       return GNUNET_SYSERR;
1241     }
1242     if (ntohl (msg->salt) != op->state->salt_receive)
1243     {
1244       GNUNET_break_op (0);
1245       return GNUNET_SYSERR;
1246     }
1247   }
1248   else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1249             (op->state->phase != PHASE_EXPECT_IBF) )
1250   {
1251     GNUNET_break_op (0);
1252     return GNUNET_SYSERR;
1253   }
1254
1255   return GNUNET_OK;
1256 }
1257
1258
1259 /**
1260  * Handle an IBF message from a remote peer.
1261  *
1262  * Reassemble the IBF from multiple pieces, and
1263  * process the whole IBF once possible.
1264  *
1265  * @param cls the union operation
1266  * @param msg the header of the message
1267  */
1268 void
1269 handle_union_p2p_ibf (void *cls,
1270                       const struct IBFMessage *msg)
1271 {
1272   struct Operation *op = cls;
1273   unsigned int buckets_in_message;
1274
1275   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1276   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1277        (op->state->phase == PHASE_EXPECT_IBF) )
1278   {
1279     op->state->phase = PHASE_EXPECT_IBF_CONT;
1280     GNUNET_assert (NULL == op->state->remote_ibf);
1281     LOG (GNUNET_ERROR_TYPE_DEBUG,
1282          "Creating new ibf of size %u\n",
1283          1 << msg->order);
1284     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1285     op->state->salt_receive = ntohl (msg->salt);
1286     LOG (GNUNET_ERROR_TYPE_DEBUG,
1287          "Receiving new IBF with salt %u\n",
1288          op->state->salt_receive);
1289     if (NULL == op->state->remote_ibf)
1290     {
1291       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1292                   "Failed to parse remote IBF, closing connection\n");
1293       fail_union_operation (op);
1294       return;
1295     }
1296     op->state->ibf_buckets_received = 0;
1297     if (0 != ntohl (msg->offset))
1298     {
1299       GNUNET_break_op (0);
1300       fail_union_operation (op);
1301       return;
1302     }
1303   }
1304   else
1305   {
1306     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1307   }
1308   GNUNET_assert (NULL != op->state->remote_ibf);
1309
1310   ibf_read_slice (&msg[1],
1311                   op->state->ibf_buckets_received,
1312                   buckets_in_message,
1313                   op->state->remote_ibf);
1314   op->state->ibf_buckets_received += buckets_in_message;
1315
1316   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1317   {
1318     LOG (GNUNET_ERROR_TYPE_DEBUG,
1319          "received full ibf\n");
1320     op->state->phase = PHASE_INVENTORY_ACTIVE;
1321     if (GNUNET_OK !=
1322         decode_and_send (op))
1323     {
1324       /* Internal error, best we can do is shut down */
1325       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1326                   "Failed to decode IBF, closing connection\n");
1327       fail_union_operation (op);
1328       return;
1329     }
1330   }
1331   GNUNET_CADET_receive_done (op->channel);
1332 }
1333
1334
1335 /**
1336  * Send a result message to the client indicating
1337  * that there is a new element.
1338  *
1339  * @param op union operation
1340  * @param element element to send
1341  * @param status status to send with the new element
1342  */
1343 static void
1344 send_client_element (struct Operation *op,
1345                      struct GNUNET_SET_Element *element,
1346                      int status)
1347 {
1348   struct GNUNET_MQ_Envelope *ev;
1349   struct GNUNET_SET_ResultMessage *rm;
1350
1351   LOG (GNUNET_ERROR_TYPE_DEBUG,
1352        "sending element (size %u) to client\n",
1353        element->size);
1354   GNUNET_assert (0 != op->spec->client_request_id);
1355   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1356   if (NULL == ev)
1357   {
1358     GNUNET_MQ_discard (ev);
1359     GNUNET_break (0);
1360     return;
1361   }
1362   rm->result_status = htons (status);
1363   rm->request_id = htonl (op->spec->client_request_id);
1364   rm->element_type = htons (element->element_type);
1365   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1366   GNUNET_memcpy (&rm[1], element->data, element->size);
1367   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1368 }
1369
1370
1371 /**
1372  * Signal to the client that the operation has finished and
1373  * destroy the operation.
1374  *
1375  * @param cls operation to destroy
1376  */
1377 static void
1378 send_done_and_destroy (void *cls)
1379 {
1380   struct Operation *op = cls;
1381   struct GNUNET_MQ_Envelope *ev;
1382   struct GNUNET_SET_ResultMessage *rm;
1383
1384   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1385   rm->request_id = htonl (op->spec->client_request_id);
1386   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1387   rm->element_type = htons (0);
1388   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1389   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1390   /* Will also call the union-specific cancel function. */
1391   _GSS_operation_destroy (op, GNUNET_YES);
1392 }
1393
1394
1395 /**
1396  * Tests if the operation is finished, and if so notify.
1397  *
1398  * @param op operation to check
1399  */
1400 static void
1401 maybe_finish (struct Operation *op)
1402 {
1403   unsigned int num_demanded;
1404
1405   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1406
1407   if (PHASE_FINISH_WAITING == op->state->phase)
1408   {
1409     LOG (GNUNET_ERROR_TYPE_DEBUG,
1410          "In PHASE_FINISH_WAITING, pending %u demands\n",
1411          num_demanded);
1412     if (0 == num_demanded)
1413     {
1414       struct GNUNET_MQ_Envelope *ev;
1415
1416       op->state->phase = PHASE_DONE;
1417       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1418       GNUNET_MQ_send (op->mq, ev);
1419
1420       /* We now wait until the other peer closes the channel
1421        * after it got all elements from us. */
1422     }
1423   }
1424   if (PHASE_FINISH_CLOSING == op->state->phase)
1425   {
1426     LOG (GNUNET_ERROR_TYPE_DEBUG,
1427          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1428          num_demanded);
1429     if (0 == num_demanded)
1430     {
1431       op->state->phase = PHASE_DONE;
1432       send_done_and_destroy (op);
1433     }
1434   }
1435 }
1436
1437
1438 /**
1439  * Check an element message from a remote peer.
1440  *
1441  * @param cls the union operation
1442  * @param emsg the message
1443  */
1444 int
1445 check_union_p2p_elements (void *cls,
1446                           const struct GNUNET_SET_ElementMessage *emsg)
1447 {
1448   struct Operation *op = cls;
1449
1450   if (GNUNET_SET_OPERATION_UNION != op->operation)
1451   {
1452     GNUNET_break_op (0);
1453     return GNUNET_SYSERR;
1454   }
1455   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1456   {
1457     GNUNET_break_op (0);
1458     return GNUNET_SYSERR;
1459   }
1460   return GNUNET_OK;
1461 }
1462
1463
1464 /**
1465  * Handle an element message from a remote peer.
1466  * Sent by the other peer either because we decoded an IBF and placed a demand,
1467  * or because the other peer switched to full set transmission.
1468  *
1469  * @param cls the union operation
1470  * @param emsg the message
1471  */
1472 void
1473 handle_union_p2p_elements (void *cls,
1474                            const struct GNUNET_SET_ElementMessage *emsg)
1475 {
1476   struct Operation *op = cls;
1477   struct ElementEntry *ee;
1478   struct KeyEntry *ke;
1479   uint16_t element_size;
1480
1481   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1482   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1483   GNUNET_memcpy (&ee[1],
1484                  &emsg[1],
1485                  element_size);
1486   ee->element.size = element_size;
1487   ee->element.data = &ee[1];
1488   ee->element.element_type = ntohs (emsg->element_type);
1489   ee->remote = GNUNET_YES;
1490   GNUNET_SET_element_hash (&ee->element,
1491                            &ee->element_hash);
1492   if (GNUNET_NO ==
1493       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1494                                             &ee->element_hash,
1495                                             NULL))
1496   {
1497     /* We got something we didn't demand, since it's not in our map. */
1498     GNUNET_break_op (0);
1499     fail_union_operation (op);
1500     return;
1501   }
1502
1503   LOG (GNUNET_ERROR_TYPE_DEBUG,
1504        "Got element (size %u, hash %s) from peer\n",
1505        (unsigned int) element_size,
1506        GNUNET_h2s (&ee->element_hash));
1507
1508   GNUNET_STATISTICS_update (_GSS_statistics,
1509                             "# received elements",
1510                             1,
1511                             GNUNET_NO);
1512   GNUNET_STATISTICS_update (_GSS_statistics,
1513                             "# exchanged elements",
1514                             1,
1515                             GNUNET_NO);
1516
1517   op->state->received_total++;
1518
1519   ke = op_get_element (op, &ee->element_hash);
1520   if (NULL != ke)
1521   {
1522     /* Got repeated element.  Should not happen since
1523      * we track demands. */
1524     GNUNET_STATISTICS_update (_GSS_statistics,
1525                               "# repeated elements",
1526                               1,
1527                               GNUNET_NO);
1528     ke->received = GNUNET_YES;
1529     GNUNET_free (ee);
1530   }
1531   else
1532   {
1533     LOG (GNUNET_ERROR_TYPE_DEBUG,
1534          "Registering new element from remote peer\n");
1535     op->state->received_fresh++;
1536     op_register_element (op, ee, GNUNET_YES);
1537     /* only send results immediately if the client wants it */
1538     switch (op->spec->result_mode)
1539     {
1540       case GNUNET_SET_RESULT_ADDED:
1541         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1542         break;
1543       case GNUNET_SET_RESULT_SYMMETRIC:
1544         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1545         break;
1546       default:
1547         /* Result mode not supported, should have been caught earlier. */
1548         GNUNET_break (0);
1549         break;
1550     }
1551   }
1552
1553   if ( (op->state->received_total > 8) &&
1554        (op->state->received_fresh < op->state->received_total / 3) )
1555   {
1556     /* The other peer gave us lots of old elements, there's something wrong. */
1557     GNUNET_break_op (0);
1558     fail_union_operation (op);
1559     return;
1560   }
1561   GNUNET_CADET_receive_done (op->channel);
1562   maybe_finish (op);
1563 }
1564
1565
1566 /**
1567  * Check a full element message from a remote peer.
1568  *
1569  * @param cls the union operation
1570  * @param emsg the message
1571  */
1572 int
1573 check_union_p2p_full_element (void *cls,
1574                               const struct GNUNET_SET_ElementMessage *emsg)
1575 {
1576   struct Operation *op = cls;
1577
1578   if (GNUNET_SET_OPERATION_UNION != op->operation)
1579   {
1580     GNUNET_break_op (0);
1581     return GNUNET_SYSERR;
1582   }
1583   // FIXME: check that we expect full elements here?
1584   return GNUNET_OK;
1585 }
1586
1587
1588 /**
1589  * Handle an element message from a remote peer.
1590  *
1591  * @param cls the union operation
1592  * @param emsg the message
1593  */
1594 void
1595 handle_union_p2p_full_element (void *cls,
1596                                const struct GNUNET_SET_ElementMessage *emsg)
1597 {
1598   struct Operation *op = cls;
1599   struct ElementEntry *ee;
1600   struct KeyEntry *ke;
1601   uint16_t element_size;
1602
1603   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1604   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1605   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1606   ee->element.size = element_size;
1607   ee->element.data = &ee[1];
1608   ee->element.element_type = ntohs (emsg->element_type);
1609   ee->remote = GNUNET_YES;
1610   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1611
1612   LOG (GNUNET_ERROR_TYPE_DEBUG,
1613        "Got element (full diff, size %u, hash %s) from peer\n",
1614        (unsigned int) element_size,
1615        GNUNET_h2s (&ee->element_hash));
1616
1617   GNUNET_STATISTICS_update (_GSS_statistics,
1618                             "# received elements",
1619                             1,
1620                             GNUNET_NO);
1621   GNUNET_STATISTICS_update (_GSS_statistics,
1622                             "# exchanged elements",
1623                             1,
1624                             GNUNET_NO);
1625
1626   op->state->received_total++;
1627
1628   ke = op_get_element (op, &ee->element_hash);
1629   if (NULL != ke)
1630   {
1631     /* Got repeated element.  Should not happen since
1632      * we track demands. */
1633     GNUNET_STATISTICS_update (_GSS_statistics,
1634                               "# repeated elements",
1635                               1,
1636                               GNUNET_NO);
1637     ke->received = GNUNET_YES;
1638     GNUNET_free (ee);
1639   }
1640   else
1641   {
1642     LOG (GNUNET_ERROR_TYPE_DEBUG,
1643          "Registering new element from remote peer\n");
1644     op->state->received_fresh++;
1645     op_register_element (op, ee, GNUNET_YES);
1646     /* only send results immediately if the client wants it */
1647     switch (op->spec->result_mode)
1648     {
1649       case GNUNET_SET_RESULT_ADDED:
1650         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1651         break;
1652       case GNUNET_SET_RESULT_SYMMETRIC:
1653         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1654         break;
1655       default:
1656         /* Result mode not supported, should have been caught earlier. */
1657         GNUNET_break (0);
1658         break;
1659     }
1660   }
1661
1662   if ( (GNUNET_YES == op->spec->byzantine) &&
1663        (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1664        (op->state->received_fresh < op->state->received_total / 6) )
1665   {
1666     /* The other peer gave us lots of old elements, there's something wrong. */
1667     LOG (GNUNET_ERROR_TYPE_ERROR,
1668          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1669          (unsigned long long) op->state->received_fresh,
1670          (unsigned long long) op->state->received_total);
1671     GNUNET_break_op (0);
1672     fail_union_operation (op);
1673     return;
1674   }
1675   GNUNET_CADET_receive_done (op->channel);
1676 }
1677
1678
1679 /**
1680  * Send offers (for GNUNET_Hash-es) in response
1681  * to inquiries (for IBF_Key-s).
1682  *
1683  * @param cls the union operation
1684  * @param msg the message
1685  */
1686 int
1687 check_union_p2p_inquiry (void *cls,
1688                          const struct InquiryMessage *msg)
1689 {
1690   struct Operation *op = cls;
1691   unsigned int num_keys;
1692
1693   if (GNUNET_SET_OPERATION_UNION != op->operation)
1694   {
1695     GNUNET_break_op (0);
1696     return GNUNET_SYSERR;
1697   }
1698   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1699   {
1700     GNUNET_break_op (0);
1701     return GNUNET_SYSERR;
1702   }
1703   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1704     / sizeof (struct IBF_Key);
1705   if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1706       != num_keys * sizeof (struct IBF_Key))
1707   {
1708     GNUNET_break_op (0);
1709     return GNUNET_SYSERR;
1710   }
1711   return GNUNET_OK;
1712 }
1713
1714
1715 /**
1716  * Send offers (for GNUNET_Hash-es) in response
1717  * to inquiries (for IBF_Key-s).
1718  *
1719  * @param cls the union operation
1720  * @param msg the message
1721  */
1722 void
1723 handle_union_p2p_inquiry (void *cls,
1724                           const struct InquiryMessage *msg)
1725 {
1726   struct Operation *op = cls;
1727   const struct IBF_Key *ibf_key;
1728   unsigned int num_keys;
1729
1730   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1731     / sizeof (struct IBF_Key);
1732   ibf_key = (const struct IBF_Key *) &msg[1];
1733   while (0 != num_keys--)
1734   {
1735     struct IBF_Key unsalted_key;
1736
1737     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1738     send_offers_for_key (op, unsalted_key);
1739     ibf_key++;
1740   }
1741   GNUNET_CADET_receive_done (op->channel);
1742 }
1743
1744
1745 /**
1746  * Iterator over hash map entries, called to
1747  * destroy the linked list of colliding ibf key entries.
1748  *
1749  * @param cls closure
1750  * @param key current key code
1751  * @param value value in the hash map
1752  * @return #GNUNET_YES if we should continue to iterate,
1753  *         #GNUNET_NO if not.
1754  */
1755 static int
1756 send_missing_elements_iter (void *cls,
1757                             uint32_t key,
1758                             void *value)
1759 {
1760   struct Operation *op = cls;
1761   struct KeyEntry *ke = value;
1762   struct GNUNET_MQ_Envelope *ev;
1763   struct GNUNET_SET_ElementMessage *emsg;
1764   struct ElementEntry *ee = ke->element;
1765
1766   if (GNUNET_YES == ke->received)
1767     return GNUNET_YES;
1768
1769   ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1770   GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1771   emsg->reserved = htons (0);
1772   emsg->element_type = htons (ee->element.element_type);
1773   GNUNET_MQ_send (op->mq, ev);
1774
1775   return GNUNET_YES;
1776 }
1777
1778
1779 /**
1780  * Handle a request for full set transmission.
1781  *
1782  * @parem cls closure, a set union operation
1783  * @param mh the demand message
1784  */
1785 void
1786 handle_union_p2p_request_full (void *cls,
1787                                const struct GNUNET_MessageHeader *mh)
1788 {
1789   struct Operation *op = cls;
1790
1791   LOG (GNUNET_ERROR_TYPE_INFO,
1792        "Received request for full set transmission\n");
1793   if (GNUNET_SET_OPERATION_UNION != op->operation)
1794   {
1795     GNUNET_break_op (0);
1796     fail_union_operation (op);
1797     return;
1798   }
1799   if (PHASE_EXPECT_IBF != op->state->phase)
1800   {
1801     GNUNET_break_op (0);
1802     fail_union_operation (op);
1803     return;
1804   }
1805
1806   // FIXME: we need to check that our set is larger than the
1807   // byzantine_lower_bound by some threshold
1808   send_full_set (op);
1809   GNUNET_CADET_receive_done (op->channel);
1810 }
1811
1812
1813 /**
1814  * Handle a "full done" message.
1815  *
1816  * @parem cls closure, a set union operation
1817  * @param mh the demand message
1818  */
1819 void
1820 handle_union_p2p_full_done (void *cls,
1821                             const struct GNUNET_MessageHeader *mh)
1822 {
1823   struct Operation *op = cls;
1824
1825   switch (op->state->phase)
1826   {
1827   case PHASE_EXPECT_IBF:
1828     {
1829       struct GNUNET_MQ_Envelope *ev;
1830
1831       LOG (GNUNET_ERROR_TYPE_DEBUG,
1832            "got FULL DONE, sending elements that other peer is missing\n");
1833
1834       /* send all the elements that did not come from the remote peer */
1835       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1836                                                &send_missing_elements_iter,
1837                                                op);
1838
1839       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1840       GNUNET_MQ_send (op->mq, ev);
1841       op->state->phase = PHASE_DONE;
1842       /* we now wait until the other peer shuts the tunnel down*/
1843     }
1844     break;
1845   case PHASE_FULL_SENDING:
1846     {
1847       LOG (GNUNET_ERROR_TYPE_DEBUG,
1848            "got FULL DONE, finishing\n");
1849       /* We sent the full set, and got the response for that.  We're done. */
1850       op->state->phase = PHASE_DONE;
1851       GNUNET_CADET_receive_done (op->channel);
1852       send_done_and_destroy (op);
1853       return;
1854     }
1855     break;
1856   default:
1857     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1858                 "Handle full done phase is %u\n",
1859                 (unsigned) op->state->phase);
1860     GNUNET_break_op (0);
1861     fail_union_operation (op);
1862     return;
1863   }
1864   GNUNET_CADET_receive_done (op->channel);
1865 }
1866
1867
1868 /**
1869  * Check a demand by the other peer for elements based on a list
1870  * of `struct GNUNET_HashCode`s.
1871  *
1872  * @parem cls closure, a set union operation
1873  * @param mh the demand message
1874  * @return #GNUNET_OK if @a mh is well-formed
1875  */
1876 int
1877 check_union_p2p_demand (void *cls,
1878                         const struct GNUNET_MessageHeader *mh)
1879 {
1880   struct Operation *op = cls;
1881   unsigned int num_hashes;
1882
1883   if (GNUNET_SET_OPERATION_UNION != op->operation)
1884   {
1885     GNUNET_break_op (0);
1886     return GNUNET_SYSERR;
1887   }
1888   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1889     / sizeof (struct GNUNET_HashCode);
1890   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1891       != num_hashes * sizeof (struct GNUNET_HashCode))
1892   {
1893     GNUNET_break_op (0);
1894     return GNUNET_SYSERR;
1895   }
1896   return GNUNET_OK;
1897 }
1898
1899
1900 /**
1901  * Handle a demand by the other peer for elements based on a list
1902  * of `struct GNUNET_HashCode`s.
1903  *
1904  * @parem cls closure, a set union operation
1905  * @param mh the demand message
1906  */
1907 void
1908 handle_union_p2p_demand (void *cls,
1909                          const struct GNUNET_MessageHeader *mh)
1910 {
1911   struct Operation *op = cls;
1912   struct ElementEntry *ee;
1913   struct GNUNET_SET_ElementMessage *emsg;
1914   const struct GNUNET_HashCode *hash;
1915   unsigned int num_hashes;
1916   struct GNUNET_MQ_Envelope *ev;
1917
1918   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1919     / sizeof (struct GNUNET_HashCode);
1920   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1921        num_hashes > 0;
1922        hash++, num_hashes--)
1923   {
1924     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1925                                             hash);
1926     if (NULL == ee)
1927     {
1928       /* Demand for non-existing element. */
1929       GNUNET_break_op (0);
1930       fail_union_operation (op);
1931       return;
1932     }
1933     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1934     {
1935       /* Probably confused lazily copied sets. */
1936       GNUNET_break_op (0);
1937       fail_union_operation (op);
1938       return;
1939     }
1940     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1941     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1942     emsg->reserved = htons (0);
1943     emsg->element_type = htons (ee->element.element_type);
1944     LOG (GNUNET_ERROR_TYPE_DEBUG,
1945          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1946          (void *) op,
1947          (unsigned int) ee->element.size,
1948          GNUNET_h2s (&ee->element_hash));
1949     GNUNET_MQ_send (op->mq, ev);
1950     GNUNET_STATISTICS_update (_GSS_statistics,
1951                               "# exchanged elements",
1952                               1,
1953                               GNUNET_NO);
1954
1955     switch (op->spec->result_mode)
1956     {
1957       case GNUNET_SET_RESULT_ADDED:
1958         /* Nothing to do. */
1959         break;
1960       case GNUNET_SET_RESULT_SYMMETRIC:
1961         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1962         break;
1963       default:
1964         /* Result mode not supported, should have been caught earlier. */
1965         GNUNET_break (0);
1966         break;
1967     }
1968   }
1969   GNUNET_CADET_receive_done (op->channel);
1970 }
1971
1972
1973 /**
1974  * Check offer (of `struct GNUNET_HashCode`s).
1975  *
1976  * @param cls the union operation
1977  * @param mh the message
1978  * @return #GNUNET_OK if @a mh is well-formed
1979  */
1980 int
1981 check_union_p2p_offer (void *cls,
1982                         const struct GNUNET_MessageHeader *mh)
1983 {
1984   struct Operation *op = cls;
1985   unsigned int num_hashes;
1986
1987   if (GNUNET_SET_OPERATION_UNION != op->operation)
1988   {
1989     GNUNET_break_op (0);
1990     return GNUNET_SYSERR;
1991   }
1992   /* look up elements and send them */
1993   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1994        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1995   {
1996     GNUNET_break_op (0);
1997     return GNUNET_SYSERR;
1998   }
1999   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2000     / sizeof (struct GNUNET_HashCode);
2001   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2002       != num_hashes * sizeof (struct GNUNET_HashCode))
2003   {
2004     GNUNET_break_op (0);
2005     return GNUNET_SYSERR;
2006   }
2007   return GNUNET_OK;
2008 }
2009
2010
2011 /**
2012  * Handle offers (of `struct GNUNET_HashCode`s) and
2013  * respond with demands (of `struct GNUNET_HashCode`s).
2014  *
2015  * @param cls the union operation
2016  * @param mh the message
2017  */
2018 void
2019 handle_union_p2p_offer (void *cls,
2020                         const struct GNUNET_MessageHeader *mh)
2021 {
2022   struct Operation *op = cls;
2023   const struct GNUNET_HashCode *hash;
2024   unsigned int num_hashes;
2025
2026   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2027     / sizeof (struct GNUNET_HashCode);
2028   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2029        num_hashes > 0;
2030        hash++, num_hashes--)
2031   {
2032     struct ElementEntry *ee;
2033     struct GNUNET_MessageHeader *demands;
2034     struct GNUNET_MQ_Envelope *ev;
2035
2036     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
2037                                             hash);
2038     if (NULL != ee)
2039       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2040         continue;
2041
2042     if (GNUNET_YES ==
2043         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2044                                                 hash))
2045     {
2046       LOG (GNUNET_ERROR_TYPE_DEBUG,
2047            "Skipped sending duplicate demand\n");
2048       continue;
2049     }
2050
2051     GNUNET_assert (GNUNET_OK ==
2052                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2053                                                       hash,
2054                                                       NULL,
2055                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2056
2057     LOG (GNUNET_ERROR_TYPE_DEBUG,
2058          "[OP %x] Requesting element (hash %s)\n",
2059          (void *) op, GNUNET_h2s (hash));
2060     ev = GNUNET_MQ_msg_header_extra (demands,
2061                                      sizeof (struct GNUNET_HashCode),
2062                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2063     *(struct GNUNET_HashCode *) &demands[1] = *hash;
2064     GNUNET_MQ_send (op->mq, ev);
2065   }
2066   GNUNET_CADET_receive_done (op->channel);
2067 }
2068
2069
2070 /**
2071  * Handle a done message from a remote peer
2072  *
2073  * @param cls the union operation
2074  * @param mh the message
2075  */
2076 void
2077 handle_union_p2p_done (void *cls,
2078                        const struct GNUNET_MessageHeader *mh)
2079 {
2080   struct Operation *op = cls;
2081
2082   if (GNUNET_SET_OPERATION_UNION != op->operation)
2083   {
2084     GNUNET_break_op (0);
2085     fail_union_operation (op);
2086     return;
2087   }
2088   switch (op->state->phase)
2089   {
2090   case PHASE_INVENTORY_PASSIVE:
2091     /* We got all requests, but still have to send our elements in response. */
2092     op->state->phase = PHASE_FINISH_WAITING;
2093
2094     LOG (GNUNET_ERROR_TYPE_DEBUG,
2095          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2096     /* The active peer is done sending offers
2097      * and inquiries.  This means that all
2098      * our responses to that (demands and offers)
2099      * must be in flight (queued or in mesh).
2100      *
2101      * We should notify the active peer once
2102      * all our demands are satisfied, so that the active
2103      * peer can quit if we gave him everything.
2104      */
2105     GNUNET_CADET_receive_done (op->channel);
2106     maybe_finish (op);
2107     return;
2108   case PHASE_INVENTORY_ACTIVE:
2109     LOG (GNUNET_ERROR_TYPE_DEBUG,
2110          "got DONE (as active partner), waiting to finish\n");
2111     /* All demands of the other peer are satisfied,
2112      * and we processed all offers, thus we know
2113      * exactly what our demands must be.
2114      *
2115      * We'll close the channel
2116      * to the other peer once our demands are met.
2117      */
2118     op->state->phase = PHASE_FINISH_CLOSING;
2119     GNUNET_CADET_receive_done (op->channel);
2120     maybe_finish (op);
2121     return;
2122   default:
2123     GNUNET_break_op (0);
2124     fail_union_operation (op);
2125     return;
2126   }
2127 }
2128
2129
2130 /**
2131  * Initiate operation to evaluate a set union with a remote peer.
2132  *
2133  * @param op operation to perform (to be initialized)
2134  * @param opaque_context message to be transmitted to the listener
2135  *        to convince him to accept, may be NULL
2136  */
2137 static void
2138 union_evaluate (struct Operation *op,
2139                 const struct GNUNET_MessageHeader *opaque_context)
2140 {
2141   struct GNUNET_MQ_Envelope *ev;
2142   struct OperationRequestMessage *msg;
2143
2144   GNUNET_assert (NULL == op->state);
2145   op->state = GNUNET_new (struct OperationState);
2146   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2147   /* copy the current generation's strata estimator for this operation */
2148   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2149   /* we started the operation, thus we have to send the operation request */
2150   op->state->phase = PHASE_EXPECT_SE;
2151   op->state->salt_receive = op->state->salt_send = 42;
2152   LOG (GNUNET_ERROR_TYPE_DEBUG,
2153        "Initiating union operation evaluation\n");
2154   GNUNET_STATISTICS_update (_GSS_statistics,
2155                             "# of total union operations",
2156                             1,
2157                             GNUNET_NO);
2158   GNUNET_STATISTICS_update (_GSS_statistics,
2159                             "# of initiated union operations",
2160                             1,
2161                             GNUNET_NO);
2162   ev = GNUNET_MQ_msg_nested_mh (msg,
2163                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2164                                 opaque_context);
2165   if (NULL == ev)
2166   {
2167     /* the context message is too large */
2168     GNUNET_break (0);
2169     GNUNET_SERVICE_client_drop (op->spec->set->client);
2170     return;
2171   }
2172   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2173   GNUNET_MQ_send (op->mq,
2174                   ev);
2175
2176   if (NULL != opaque_context)
2177     LOG (GNUNET_ERROR_TYPE_DEBUG,
2178          "sent op request with context message\n");
2179   else
2180     LOG (GNUNET_ERROR_TYPE_DEBUG,
2181          "sent op request without context message\n");
2182
2183   initialize_key_to_element (op);
2184   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2185 }
2186
2187
2188 /**
2189  * Accept an union operation request from a remote peer.
2190  * Only initializes the private operation state.
2191  *
2192  * @param op operation that will be accepted as a union operation
2193  */
2194 static void
2195 union_accept (struct Operation *op)
2196 {
2197   LOG (GNUNET_ERROR_TYPE_DEBUG,
2198        "accepting set union operation\n");
2199   GNUNET_assert (NULL == op->state);
2200
2201   GNUNET_STATISTICS_update (_GSS_statistics,
2202                             "# of accepted union operations",
2203                             1,
2204                             GNUNET_NO);
2205   GNUNET_STATISTICS_update (_GSS_statistics,
2206                             "# of total union operations",
2207                             1,
2208                             GNUNET_NO);
2209
2210   op->state = GNUNET_new (struct OperationState);
2211   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2212   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2213   op->state->salt_receive = op->state->salt_send = 42;
2214   initialize_key_to_element (op);
2215   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2216   /* kick off the operation */
2217   send_strata_estimator (op);
2218 }
2219
2220
2221 /**
2222  * Create a new set supporting the union operation
2223  *
2224  * We maintain one strata estimator per set and then manipulate it over the
2225  * lifetime of the set, as recreating a strata estimator would be expensive.
2226  *
2227  * @return the newly created set, NULL on error
2228  */
2229 static struct SetState *
2230 union_set_create (void)
2231 {
2232   struct SetState *set_state;
2233
2234   LOG (GNUNET_ERROR_TYPE_DEBUG,
2235        "union set created\n");
2236   set_state = GNUNET_new (struct SetState);
2237   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2238                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2239   if (NULL == set_state->se)
2240   {
2241     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2242                 "Failed to allocate strata estimator\n");
2243     GNUNET_free (set_state);
2244     return NULL;
2245   }
2246   return set_state;
2247 }
2248
2249
2250 /**
2251  * Add the element from the given element message to the set.
2252  *
2253  * @param set_state state of the set want to add to
2254  * @param ee the element to add to the set
2255  */
2256 static void
2257 union_add (struct SetState *set_state, struct ElementEntry *ee)
2258 {
2259   strata_estimator_insert (set_state->se,
2260                            get_ibf_key (&ee->element_hash));
2261 }
2262
2263
2264 /**
2265  * Remove the element given in the element message from the set.
2266  * Only marks the element as removed, so that older set operations can still exchange it.
2267  *
2268  * @param set_state state of the set to remove from
2269  * @param ee set element to remove
2270  */
2271 static void
2272 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2273 {
2274   strata_estimator_remove (set_state->se,
2275                            get_ibf_key (&ee->element_hash));
2276 }
2277
2278
2279 /**
2280  * Destroy a set that supports the union operation.
2281  *
2282  * @param set_state the set to destroy
2283  */
2284 static void
2285 union_set_destroy (struct SetState *set_state)
2286 {
2287   if (NULL != set_state->se)
2288   {
2289     strata_estimator_destroy (set_state->se);
2290     set_state->se = NULL;
2291   }
2292   GNUNET_free (set_state);
2293 }
2294
2295
2296 /**
2297  * Handler for peer-disconnects, notifies the client
2298  * about the aborted operation in case the op was not concluded.
2299  *
2300  * @param op the destroyed operation
2301  */
2302 static void
2303 union_peer_disconnect (struct Operation *op)
2304 {
2305   if (PHASE_DONE != op->state->phase)
2306   {
2307     struct GNUNET_MQ_Envelope *ev;
2308     struct GNUNET_SET_ResultMessage *msg;
2309
2310     ev = GNUNET_MQ_msg (msg,
2311                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2312     msg->request_id = htonl (op->spec->client_request_id);
2313     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2314     msg->element_type = htons (0);
2315     GNUNET_MQ_send (op->spec->set->client_mq,
2316                     ev);
2317     LOG (GNUNET_ERROR_TYPE_WARNING,
2318          "other peer disconnected prematurely, phase %u\n",
2319          op->state->phase);
2320     _GSS_operation_destroy (op,
2321                             GNUNET_YES);
2322     return;
2323   }
2324   // else: the session has already been concluded
2325   LOG (GNUNET_ERROR_TYPE_DEBUG,
2326        "other peer disconnected (finished)\n");
2327   if (GNUNET_NO == op->state->client_done_sent)
2328     send_done_and_destroy (op);
2329 }
2330
2331
2332 /**
2333  * Copy union-specific set state.
2334  *
2335  * @param set source set for copying the union state
2336  * @return a copy of the union-specific set state
2337  */
2338 static struct SetState *
2339 union_copy_state (struct Set *set)
2340 {
2341   struct SetState *new_state;
2342
2343   new_state = GNUNET_new (struct SetState);
2344   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2345   new_state->se = strata_estimator_dup (set->state->se);
2346
2347   return new_state;
2348 }
2349
2350
2351 /**
2352  * Get the table with implementing functions for
2353  * set union.
2354  *
2355  * @return the operation specific VTable
2356  */
2357 const struct SetVT *
2358 _GSS_union_vt ()
2359 {
2360   static const struct SetVT union_vt = {
2361     .create = &union_set_create,
2362     .add = &union_add,
2363     .remove = &union_remove,
2364     .destroy_set = &union_set_destroy,
2365     .evaluate = &union_evaluate,
2366     .accept = &union_accept,
2367     .peer_disconnect = &union_peer_disconnect,
2368     .cancel = &union_op_cancel,
2369     .copy_state = &union_copy_state,
2370   };
2371
2372   return &union_vt;
2373 }