uncrustify as demanded.
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
34 #include <gcrypt.h>
35
36
37 #define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__)
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define SE_STRATA_COUNT 32
44
45 /**
46  * Size of the IBFs in the strata estimator.
47  */
48 #define SE_IBF_SIZE 80
49
50 /**
51  * The hash num parameter for the difference digests and strata estimators.
52  */
53 #define SE_IBF_HASH_NUM 4
54
55 /**
56  * Number of buckets that can be transmitted in one message.
57  */
58 #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
59
60 /**
61  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62  * Choose this value so that computing the IBF is still cheaper
63  * than transmitting all values.
64  */
65 #define MAX_IBF_ORDER (20)
66
67 /**
68  * Number of buckets used in the ibf per estimated
69  * difference.
70  */
71 #define IBF_ALPHA 4
72
73
74 /**
75  * Current phase we are in for a union operation.
76  */
77 enum UnionOperationPhase {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    * XXX: repurposed to also expect a "request full set" message, should be renamed
89    *
90    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91    */
92   PHASE_EXPECT_IBF,
93
94   /**
95    * Continuation for multi part IBFs.
96    */
97   PHASE_EXPECT_IBF_CONT,
98
99   /**
100    * We are decoding an IBF.
101    */
102   PHASE_INVENTORY_ACTIVE,
103
104   /**
105    * The other peer is decoding the IBF we just sent.
106    */
107   PHASE_INVENTORY_PASSIVE,
108
109   /**
110    * The protocol is almost finished, but we still have to flush our message
111    * queue and/or expect some elements.
112    */
113   PHASE_FINISH_CLOSING,
114
115   /**
116    * In the penultimate phase,
117    * we wait until all our demands
118    * are satisfied.  Then we send a done
119    * message, and wait for another done message.
120    */
121   PHASE_FINISH_WAITING,
122
123   /**
124    * In the ultimate phase, we wait until
125    * our demands are satisfied and then
126    * quit (sending another DONE message).
127    */
128   PHASE_DONE,
129
130   /**
131    * After sending the full set, wait for responses with the elements
132    * that the local peer is missing.
133    */
134   PHASE_FULL_SENDING,
135 };
136
137
138 /**
139  * State of an evaluate operation with another peer.
140  */
141 struct OperationState {
142   /**
143    * Copy of the set's strata estimator at the time of
144    * creation of this operation.
145    */
146   struct StrataEstimator *se;
147
148   /**
149    * The IBF we currently receive.
150    */
151   struct InvertibleBloomFilter *remote_ibf;
152
153   /**
154    * The IBF with the local set's element.
155    */
156   struct InvertibleBloomFilter *local_ibf;
157
158   /**
159    * Maps unsalted IBF-Keys to elements.
160    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
161    * Colliding IBF-Keys are linked.
162    */
163   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
164
165   /**
166    * Current state of the operation.
167    */
168   enum UnionOperationPhase phase;
169
170   /**
171    * Did we send the client that we are done?
172    */
173   int client_done_sent;
174
175   /**
176    * Number of ibf buckets already received into the @a remote_ibf.
177    */
178   unsigned int ibf_buckets_received;
179
180   /**
181    * Hashes for elements that we have demanded from the other peer.
182    */
183   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
184
185   /**
186    * Salt that we're using for sending IBFs
187    */
188   uint32_t salt_send;
189
190   /**
191    * Salt for the IBF we've received and that we're currently decoding.
192    */
193   uint32_t salt_receive;
194
195   /**
196    * Number of elements we received from the other peer
197    * that were not in the local set yet.
198    */
199   uint32_t received_fresh;
200
201   /**
202    * Total number of elements received from the other peer.
203    */
204   uint32_t received_total;
205
206   /**
207    * Initial size of our set, just before
208    * the operation started.
209    */
210   uint64_t initial_size;
211 };
212
213
214 /**
215  * The key entry is used to associate an ibf key with an element.
216  */
217 struct KeyEntry {
218   /**
219    * IBF key for the entry, derived from the current salt.
220    */
221   struct IBF_Key ibf_key;
222
223   /**
224    * The actual element associated with the key.
225    *
226    * Only owned by the union operation if element->operation
227    * is #GNUNET_YES.
228    */
229   struct ElementEntry *element;
230
231   /**
232    * Did we receive this element?
233    * Even if element->is_foreign is false, we might
234    * have received the element, so this indicates that
235    * the other peer has it.
236    */
237   int received;
238 };
239
240
241 /**
242  * Used as a closure for sending elements
243  * with a specific IBF key.
244  */
245 struct SendElementClosure {
246   /**
247    * The IBF key whose matching elements should be
248    * sent.
249    */
250   struct IBF_Key ibf_key;
251
252   /**
253    * Operation for which the elements
254    * should be sent.
255    */
256   struct Operation *op;
257 };
258
259
260 /**
261  * Extra state required for efficient set union.
262  */
263 struct SetState {
264   /**
265    * The strata estimator is only generated once for
266    * each set.
267    * The IBF keys are derived from the element hashes with
268    * salt=0.
269    */
270   struct StrataEstimator *se;
271 };
272
273
274 /**
275  * Iterator over hash map entries, called to
276  * destroy the linked list of colliding ibf key entries.
277  *
278  * @param cls closure
279  * @param key current key code
280  * @param value value in the hash map
281  * @return #GNUNET_YES if we should continue to iterate,
282  *         #GNUNET_NO if not.
283  */
284 static int
285 destroy_key_to_element_iter(void *cls,
286                             uint32_t key,
287                             void *value)
288 {
289   struct KeyEntry *k = value;
290
291   GNUNET_assert(NULL != k);
292   if (GNUNET_YES == k->element->remote)
293     {
294       GNUNET_free(k->element);
295       k->element = NULL;
296     }
297   GNUNET_free(k);
298   return GNUNET_YES;
299 }
300
301
302 /**
303  * Destroy the union operation.  Only things specific to the union
304  * operation are destroyed.
305  *
306  * @param op union operation to destroy
307  */
308 static void
309 union_op_cancel(struct Operation *op)
310 {
311   LOG(GNUNET_ERROR_TYPE_DEBUG,
312       "destroying union op\n");
313   /* check if the op was canceled twice */
314   GNUNET_assert(NULL != op->state);
315   if (NULL != op->state->remote_ibf)
316     {
317       ibf_destroy(op->state->remote_ibf);
318       op->state->remote_ibf = NULL;
319     }
320   if (NULL != op->state->demanded_hashes)
321     {
322       GNUNET_CONTAINER_multihashmap_destroy(op->state->demanded_hashes);
323       op->state->demanded_hashes = NULL;
324     }
325   if (NULL != op->state->local_ibf)
326     {
327       ibf_destroy(op->state->local_ibf);
328       op->state->local_ibf = NULL;
329     }
330   if (NULL != op->state->se)
331     {
332       strata_estimator_destroy(op->state->se);
333       op->state->se = NULL;
334     }
335   if (NULL != op->state->key_to_element)
336     {
337       GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
338                                               &destroy_key_to_element_iter,
339                                               NULL);
340       GNUNET_CONTAINER_multihashmap32_destroy(op->state->key_to_element);
341       op->state->key_to_element = NULL;
342     }
343   GNUNET_free(op->state);
344   op->state = NULL;
345   LOG(GNUNET_ERROR_TYPE_DEBUG,
346       "destroying union op done\n");
347 }
348
349
350 /**
351  * Inform the client that the union operation has failed,
352  * and proceed to destroy the evaluate operation.
353  *
354  * @param op the union operation to fail
355  */
356 static void
357 fail_union_operation(struct Operation *op)
358 {
359   struct GNUNET_MQ_Envelope *ev;
360   struct GNUNET_SET_ResultMessage *msg;
361
362   LOG(GNUNET_ERROR_TYPE_WARNING,
363       "union operation failed\n");
364   ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365   msg->result_status = htons(GNUNET_SET_STATUS_FAILURE);
366   msg->request_id = htonl(op->client_request_id);
367   msg->element_type = htons(0);
368   GNUNET_MQ_send(op->set->cs->mq,
369                  ev);
370   _GSS_operation_destroy(op, GNUNET_YES);
371 }
372
373
374 /**
375  * Derive the IBF key from a hash code and
376  * a salt.
377  *
378  * @param src the hash code
379  * @return the derived IBF key
380  */
381 static struct IBF_Key
382 get_ibf_key(const struct GNUNET_HashCode *src)
383 {
384   struct IBF_Key key;
385   uint16_t salt = 0;
386
387   GNUNET_assert(GNUNET_OK ==
388                 GNUNET_CRYPTO_kdf(&key, sizeof(key),
389                                   src, sizeof *src,
390                                   &salt, sizeof(salt),
391                                   NULL, 0));
392   return key;
393 }
394
395
396 /**
397  * Context for #op_get_element_iterator
398  */
399 struct GetElementContext {
400   /**
401    * FIXME.
402    */
403   struct GNUNET_HashCode hash;
404
405   /**
406    * FIXME.
407    */
408   struct KeyEntry *k;
409 };
410
411
412 /**
413  * Iterator over the mapping from IBF keys to element entries.  Checks if we
414  * have an element with a given GNUNET_HashCode.
415  *
416  * @param cls closure
417  * @param key current key code
418  * @param value value in the hash map
419  * @return #GNUNET_YES if we should search further,
420  *         #GNUNET_NO if we've found the element.
421  */
422 static int
423 op_get_element_iterator(void *cls,
424                         uint32_t key,
425                         void *value)
426 {
427   struct GetElementContext *ctx = cls;
428   struct KeyEntry *k = value;
429
430   GNUNET_assert(NULL != k);
431   if (0 == GNUNET_CRYPTO_hash_cmp(&k->element->element_hash,
432                                   &ctx->hash))
433     {
434       ctx->k = k;
435       return GNUNET_NO;
436     }
437   return GNUNET_YES;
438 }
439
440
441 /**
442  * Determine whether the given element is already in the operation's element
443  * set.
444  *
445  * @param op operation that should be tested for 'element_hash'
446  * @param element_hash hash of the element to look for
447  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
448  */
449 static struct KeyEntry *
450 op_get_element(struct Operation *op,
451                const struct GNUNET_HashCode *element_hash)
452 {
453   int ret;
454   struct IBF_Key ibf_key;
455   struct GetElementContext ctx = { { { 0 } }, 0 };
456
457   ctx.hash = *element_hash;
458
459   ibf_key = get_ibf_key(element_hash);
460   ret = GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element,
461                                                      (uint32_t)ibf_key.key_val,
462                                                      op_get_element_iterator,
463                                                      &ctx);
464
465   /* was the iteration aborted because we found the element? */
466   if (GNUNET_SYSERR == ret)
467     {
468       GNUNET_assert(NULL != ctx.k);
469       return ctx.k;
470     }
471   return NULL;
472 }
473
474
475 /**
476  * Insert an element into the union operation's
477  * key-to-element mapping. Takes ownership of 'ee'.
478  * Note that this does not insert the element in the set,
479  * only in the operation's key-element mapping.
480  * This is done to speed up re-tried operations, if some elements
481  * were transmitted, and then the IBF fails to decode.
482  *
483  * XXX: clarify ownership, doesn't sound right.
484  *
485  * @param op the union operation
486  * @param ee the element entry
487  * @parem received was this element received from the remote peer?
488  */
489 static void
490 op_register_element(struct Operation *op,
491                     struct ElementEntry *ee,
492                     int received)
493 {
494   struct IBF_Key ibf_key;
495   struct KeyEntry *k;
496
497   ibf_key = get_ibf_key(&ee->element_hash);
498   k = GNUNET_new(struct KeyEntry);
499   k->element = ee;
500   k->ibf_key = ibf_key;
501   k->received = received;
502   GNUNET_assert(GNUNET_OK ==
503                 GNUNET_CONTAINER_multihashmap32_put(op->state->key_to_element,
504                                                     (uint32_t)ibf_key.key_val,
505                                                     k,
506                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507 }
508
509
510 /**
511  * FIXME.
512  */
513 static void
514 salt_key(const struct IBF_Key *k_in,
515          uint32_t salt,
516          struct IBF_Key *k_out)
517 {
518   int s = salt % 64;
519   uint64_t x = k_in->key_val;
520
521   /* rotate ibf key */
522   x = (x >> s) | (x << (64 - s));
523   k_out->key_val = x;
524 }
525
526
527 /**
528  * FIXME.
529  */
530 static void
531 unsalt_key(const struct IBF_Key *k_in,
532            uint32_t salt,
533            struct IBF_Key *k_out)
534 {
535   int s = salt % 64;
536   uint64_t x = k_in->key_val;
537
538   x = (x << s) | (x >> (64 - s));
539   k_out->key_val = x;
540 }
541
542
543 /**
544  * Insert a key into an ibf.
545  *
546  * @param cls the ibf
547  * @param key unused
548  * @param value the key entry to get the key from
549  */
550 static int
551 prepare_ibf_iterator(void *cls,
552                      uint32_t key,
553                      void *value)
554 {
555   struct Operation *op = cls;
556   struct KeyEntry *ke = value;
557   struct IBF_Key salted_key;
558
559   LOG(GNUNET_ERROR_TYPE_DEBUG,
560       "[OP %x] inserting %lx (hash %s) into ibf\n",
561       (void *)op,
562       (unsigned long)ke->ibf_key.key_val,
563       GNUNET_h2s(&ke->element->element_hash));
564   salt_key(&ke->ibf_key,
565            op->state->salt_send,
566            &salted_key);
567   ibf_insert(op->state->local_ibf, salted_key);
568   return GNUNET_YES;
569 }
570
571
572 /**
573  * Iterator for initializing the
574  * key-to-element mapping of a union operation
575  *
576  * @param cls the union operation `struct Operation *`
577  * @param key unused
578  * @param value the `struct ElementEntry *` to insert
579  *        into the key-to-element mapping
580  * @return #GNUNET_YES (to continue iterating)
581  */
582 static int
583 init_key_to_element_iterator(void *cls,
584                              const struct GNUNET_HashCode *key,
585                              void *value)
586 {
587   struct Operation *op = cls;
588   struct ElementEntry *ee = value;
589
590   /* make sure that the element belongs to the set at the time
591    * of creating the operation */
592   if (GNUNET_NO ==
593       _GSS_is_element_of_operation(ee,
594                                    op))
595     return GNUNET_YES;
596   GNUNET_assert(GNUNET_NO == ee->remote);
597   op_register_element(op,
598                       ee,
599                       GNUNET_NO);
600   return GNUNET_YES;
601 }
602
603
604 /**
605  * Initialize the IBF key to element mapping local to this set
606  * operation.
607  *
608  * @param op the set union operation
609  */
610 static void
611 initialize_key_to_element(struct Operation *op)
612 {
613   unsigned int len;
614
615   GNUNET_assert(NULL == op->state->key_to_element);
616   len = GNUNET_CONTAINER_multihashmap_size(op->set->content->elements);
617   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create(len + 1);
618   GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements,
619                                         &init_key_to_element_iterator,
620                                         op);
621 }
622
623
624 /**
625  * Create an ibf with the operation's elements
626  * of the specified size
627  *
628  * @param op the union operation
629  * @param size size of the ibf to create
630  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
631  */
632 static int
633 prepare_ibf(struct Operation *op,
634             uint32_t size)
635 {
636   GNUNET_assert(NULL != op->state->key_to_element);
637
638   if (NULL != op->state->local_ibf)
639     ibf_destroy(op->state->local_ibf);
640   op->state->local_ibf = ibf_create(size, SE_IBF_HASH_NUM);
641   if (NULL == op->state->local_ibf)
642     {
643       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
644                  "Failed to allocate local IBF\n");
645       return GNUNET_SYSERR;
646     }
647   GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
648                                           &prepare_ibf_iterator,
649                                           op);
650   return GNUNET_OK;
651 }
652
653
654 /**
655  * Send an ibf of appropriate size.
656  *
657  * Fragments the IBF into multiple messages if necessary.
658  *
659  * @param op the union operation
660  * @param ibf_order order of the ibf to send, size=2^order
661  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
662  */
663 static int
664 send_ibf(struct Operation *op,
665          uint16_t ibf_order)
666 {
667   unsigned int buckets_sent = 0;
668   struct InvertibleBloomFilter *ibf;
669
670   if (GNUNET_OK !=
671       prepare_ibf(op, 1 << ibf_order))
672     {
673       /* allocation failed */
674       return GNUNET_SYSERR;
675     }
676
677   LOG(GNUNET_ERROR_TYPE_DEBUG,
678       "sending ibf of size %u\n",
679       1 << ibf_order);
680
681   {
682     char name[64] = { 0 };
683     snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order);
684     GNUNET_STATISTICS_update(_GSS_statistics, name, 1, GNUNET_NO);
685   }
686
687   ibf = op->state->local_ibf;
688
689   while (buckets_sent < (1 << ibf_order))
690     {
691       unsigned int buckets_in_message;
692       struct GNUNET_MQ_Envelope *ev;
693       struct IBFMessage *msg;
694
695       buckets_in_message = (1 << ibf_order) - buckets_sent;
696       /* limit to maximum */
697       if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
698         buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
699
700       ev = GNUNET_MQ_msg_extra(msg,
701                                buckets_in_message * IBF_BUCKET_SIZE,
702                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
703       msg->reserved1 = 0;
704       msg->reserved2 = 0;
705       msg->order = ibf_order;
706       msg->offset = htonl(buckets_sent);
707       msg->salt = htonl(op->state->salt_send);
708       ibf_write_slice(ibf, buckets_sent,
709                       buckets_in_message, &msg[1]);
710       buckets_sent += buckets_in_message;
711       LOG(GNUNET_ERROR_TYPE_DEBUG,
712           "ibf chunk size %u, %u/%u sent\n",
713           buckets_in_message,
714           buckets_sent,
715           1 << ibf_order);
716       GNUNET_MQ_send(op->mq, ev);
717     }
718
719   /* The other peer must decode the IBF, so
720    * we're passive. */
721   op->state->phase = PHASE_INVENTORY_PASSIVE;
722   return GNUNET_OK;
723 }
724
725
726 /**
727  * Compute the necessary order of an ibf
728  * from the size of the symmetric set difference.
729  *
730  * @param diff the difference
731  * @return the required size of the ibf
732  */
733 static unsigned int
734 get_order_from_difference(unsigned int diff)
735 {
736   unsigned int ibf_order;
737
738   ibf_order = 2;
739   while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
740           ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
741          (ibf_order < MAX_IBF_ORDER))
742     ibf_order++;
743   // add one for correction
744   return ibf_order + 1;
745 }
746
747
748 /**
749  * Send a set element.
750  *
751  * @param cls the union operation `struct Operation *`
752  * @param key unused
753  * @param value the `struct ElementEntry *` to insert
754  *        into the key-to-element mapping
755  * @return #GNUNET_YES (to continue iterating)
756  */
757 static int
758 send_full_element_iterator(void *cls,
759                            const struct GNUNET_HashCode *key,
760                            void *value)
761 {
762   struct Operation *op = cls;
763   struct GNUNET_SET_ElementMessage *emsg;
764   struct ElementEntry *ee = value;
765   struct GNUNET_SET_Element *el = &ee->element;
766   struct GNUNET_MQ_Envelope *ev;
767
768   LOG(GNUNET_ERROR_TYPE_DEBUG,
769       "Sending element %s\n",
770       GNUNET_h2s(key));
771   ev = GNUNET_MQ_msg_extra(emsg,
772                            el->size,
773                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
774   emsg->element_type = htons(el->element_type);
775   GNUNET_memcpy(&emsg[1],
776                 el->data,
777                 el->size);
778   GNUNET_MQ_send(op->mq,
779                  ev);
780   return GNUNET_YES;
781 }
782
783
784 /**
785  * Switch to full set transmission for @a op.
786  *
787  * @param op operation to switch to full set transmission.
788  */
789 static void
790 send_full_set(struct Operation *op)
791 {
792   struct GNUNET_MQ_Envelope *ev;
793
794   op->state->phase = PHASE_FULL_SENDING;
795   LOG(GNUNET_ERROR_TYPE_DEBUG,
796       "Dedicing to transmit the full set\n");
797   /* FIXME: use a more memory-friendly way of doing this with an
798      iterator, just as we do in the non-full case! */
799   (void)GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements,
800                                               &send_full_element_iterator,
801                                               op);
802   ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
803   GNUNET_MQ_send(op->mq,
804                  ev);
805 }
806
807
808 /**
809  * Handle a strata estimator from a remote peer
810  *
811  * @param cls the union operation
812  * @param msg the message
813  */
814 int
815 check_union_p2p_strata_estimator(void *cls,
816                                  const struct StrataEstimatorMessage *msg)
817 {
818   struct Operation *op = cls;
819   int is_compressed;
820   size_t len;
821
822   if (op->state->phase != PHASE_EXPECT_SE)
823     {
824       GNUNET_break(0);
825       return GNUNET_SYSERR;
826     }
827   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
828   len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
829   if ((GNUNET_NO == is_compressed) &&
830       (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
831     {
832       GNUNET_break(0);
833       return GNUNET_SYSERR;
834     }
835   return GNUNET_OK;
836 }
837
838
839 /**
840  * Handle a strata estimator from a remote peer
841  *
842  * @param cls the union operation
843  * @param msg the message
844  */
845 void
846 handle_union_p2p_strata_estimator(void *cls,
847                                   const struct StrataEstimatorMessage *msg)
848 {
849   struct Operation *op = cls;
850   struct StrataEstimator *remote_se;
851   unsigned int diff;
852   uint64_t other_size;
853   size_t len;
854   int is_compressed;
855
856   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
857   GNUNET_STATISTICS_update(_GSS_statistics,
858                            "# bytes of SE received",
859                            ntohs(msg->header.size),
860                            GNUNET_NO);
861   len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
862   other_size = GNUNET_ntohll(msg->set_size);
863   remote_se = strata_estimator_create(SE_STRATA_COUNT,
864                                       SE_IBF_SIZE,
865                                       SE_IBF_HASH_NUM);
866   if (NULL == remote_se)
867     {
868       /* insufficient resources, fail */
869       fail_union_operation(op);
870       return;
871     }
872   if (GNUNET_OK !=
873       strata_estimator_read(&msg[1],
874                             len,
875                             is_compressed,
876                             remote_se))
877     {
878       /* decompression failed */
879       strata_estimator_destroy(remote_se);
880       fail_union_operation(op);
881       return;
882     }
883   GNUNET_assert(NULL != op->state->se);
884   diff = strata_estimator_difference(remote_se,
885                                      op->state->se);
886
887   if (diff > 200)
888     diff = diff * 3 / 2;
889
890   strata_estimator_destroy(remote_se);
891   strata_estimator_destroy(op->state->se);
892   op->state->se = NULL;
893   LOG(GNUNET_ERROR_TYPE_DEBUG,
894       "got se diff=%d, using ibf size %d\n",
895       diff,
896       1U << get_order_from_difference(diff));
897
898   {
899     char *set_debug;
900
901     set_debug = getenv("GNUNET_SET_BENCHMARK");
902     if ((NULL != set_debug) &&
903         (0 == strcmp(set_debug, "1")))
904       {
905         FILE *f = fopen("set.log", "a");
906         fprintf(f, "%llu\n", (unsigned long long)diff);
907         fclose(f);
908       }
909   }
910
911   if ((GNUNET_YES == op->byzantine) &&
912       (other_size < op->byzantine_lower_bound))
913     {
914       GNUNET_break(0);
915       fail_union_operation(op);
916       return;
917     }
918
919   if ((GNUNET_YES == op->force_full) ||
920       (diff > op->state->initial_size / 4) ||
921       (0 == other_size))
922     {
923       LOG(GNUNET_ERROR_TYPE_DEBUG,
924           "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
925           diff,
926           op->state->initial_size);
927       GNUNET_STATISTICS_update(_GSS_statistics,
928                                "# of full sends",
929                                1,
930                                GNUNET_NO);
931       if ((op->state->initial_size <= other_size) ||
932           (0 == other_size))
933         {
934           send_full_set(op);
935         }
936       else
937         {
938           struct GNUNET_MQ_Envelope *ev;
939
940           LOG(GNUNET_ERROR_TYPE_DEBUG,
941               "Telling other peer that we expect its full set\n");
942           op->state->phase = PHASE_EXPECT_IBF;
943           ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
944           GNUNET_MQ_send(op->mq,
945                          ev);
946         }
947     }
948   else
949     {
950       GNUNET_STATISTICS_update(_GSS_statistics,
951                                "# of ibf sends",
952                                1,
953                                GNUNET_NO);
954       if (GNUNET_OK !=
955           send_ibf(op,
956                    get_order_from_difference(diff)))
957         {
958           /* Internal error, best we can do is shut the connection */
959           GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
960                      "Failed to send IBF, closing connection\n");
961           fail_union_operation(op);
962           return;
963         }
964     }
965   GNUNET_CADET_receive_done(op->channel);
966 }
967
968
969 /**
970  * Iterator to send elements to a remote peer
971  *
972  * @param cls closure with the element key and the union operation
973  * @param key ignored
974  * @param value the key entry
975  */
976 static int
977 send_offers_iterator(void *cls,
978                      uint32_t key,
979                      void *value)
980 {
981   struct SendElementClosure *sec = cls;
982   struct Operation *op = sec->op;
983   struct KeyEntry *ke = value;
984   struct GNUNET_MQ_Envelope *ev;
985   struct GNUNET_MessageHeader *mh;
986
987   /* Detect 32-bit key collision for the 64-bit IBF keys. */
988   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
989     return GNUNET_YES;
990
991   ev = GNUNET_MQ_msg_header_extra(mh,
992                                   sizeof(struct GNUNET_HashCode),
993                                   GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
994
995   GNUNET_assert(NULL != ev);
996   *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash;
997   LOG(GNUNET_ERROR_TYPE_DEBUG,
998       "[OP %x] sending element offer (%s) to peer\n",
999       (void *)op,
1000       GNUNET_h2s(&ke->element->element_hash));
1001   GNUNET_MQ_send(op->mq, ev);
1002   return GNUNET_YES;
1003 }
1004
1005
1006 /**
1007  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1008  *
1009  * @param op union operation
1010  * @param ibf_key IBF key of interest
1011  */
1012 static void
1013 send_offers_for_key(struct Operation *op,
1014                     struct IBF_Key ibf_key)
1015 {
1016   struct SendElementClosure send_cls;
1017
1018   send_cls.ibf_key = ibf_key;
1019   send_cls.op = op;
1020   (void)GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element,
1021                                                      (uint32_t)ibf_key.key_val,
1022                                                      &send_offers_iterator,
1023                                                      &send_cls);
1024 }
1025
1026
1027 /**
1028  * Decode which elements are missing on each side, and
1029  * send the appropriate offers and inquiries.
1030  *
1031  * @param op union operation
1032  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1033  */
1034 static int
1035 decode_and_send(struct Operation *op)
1036 {
1037   struct IBF_Key key;
1038   struct IBF_Key last_key;
1039   int side;
1040   unsigned int num_decoded;
1041   struct InvertibleBloomFilter *diff_ibf;
1042
1043   GNUNET_assert(PHASE_INVENTORY_ACTIVE == op->state->phase);
1044
1045   if (GNUNET_OK !=
1046       prepare_ibf(op,
1047                   op->state->remote_ibf->size))
1048     {
1049       GNUNET_break(0);
1050       /* allocation failed */
1051       return GNUNET_SYSERR;
1052     }
1053   diff_ibf = ibf_dup(op->state->local_ibf);
1054   ibf_subtract(diff_ibf,
1055                op->state->remote_ibf);
1056
1057   ibf_destroy(op->state->remote_ibf);
1058   op->state->remote_ibf = NULL;
1059
1060   LOG(GNUNET_ERROR_TYPE_DEBUG,
1061       "decoding IBF (size=%u)\n",
1062       diff_ibf->size);
1063
1064   num_decoded = 0;
1065   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1066
1067   while (1)
1068     {
1069       int res;
1070       int cycle_detected = GNUNET_NO;
1071
1072       last_key = key;
1073
1074       res = ibf_decode(diff_ibf, &side, &key);
1075       if (res == GNUNET_OK)
1076         {
1077           LOG(GNUNET_ERROR_TYPE_DEBUG,
1078               "decoded ibf key %lx\n",
1079               (unsigned long)key.key_val);
1080           num_decoded += 1;
1081           if ((num_decoded > diff_ibf->size) ||
1082               ((num_decoded > 1) &&
1083                (last_key.key_val == key.key_val)))
1084             {
1085               LOG(GNUNET_ERROR_TYPE_DEBUG,
1086                   "detected cyclic ibf (decoded %u/%u)\n",
1087                   num_decoded,
1088                   diff_ibf->size);
1089               cycle_detected = GNUNET_YES;
1090             }
1091         }
1092       if ((GNUNET_SYSERR == res) ||
1093           (GNUNET_YES == cycle_detected))
1094         {
1095           int next_order;
1096           next_order = 0;
1097           while (1 << next_order < diff_ibf->size)
1098             next_order++;
1099           next_order++;
1100           if (next_order <= MAX_IBF_ORDER)
1101             {
1102               LOG(GNUNET_ERROR_TYPE_DEBUG,
1103                   "decoding failed, sending larger ibf (size %u)\n",
1104                   1 << next_order);
1105               GNUNET_STATISTICS_update(_GSS_statistics,
1106                                        "# of IBF retries",
1107                                        1,
1108                                        GNUNET_NO);
1109               op->state->salt_send++;
1110               if (GNUNET_OK !=
1111                   send_ibf(op, next_order))
1112                 {
1113                   /* Internal error, best we can do is shut the connection */
1114                   GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1115                              "Failed to send IBF, closing connection\n");
1116                   fail_union_operation(op);
1117                   ibf_destroy(diff_ibf);
1118                   return GNUNET_SYSERR;
1119                 }
1120             }
1121           else
1122             {
1123               GNUNET_STATISTICS_update(_GSS_statistics,
1124                                        "# of failed union operations (too large)",
1125                                        1,
1126                                        GNUNET_NO);
1127               // XXX: Send the whole set, element-by-element
1128               LOG(GNUNET_ERROR_TYPE_ERROR,
1129                   "set union failed: reached ibf limit\n");
1130               fail_union_operation(op);
1131               ibf_destroy(diff_ibf);
1132               return GNUNET_SYSERR;
1133             }
1134           break;
1135         }
1136       if (GNUNET_NO == res)
1137         {
1138           struct GNUNET_MQ_Envelope *ev;
1139
1140           LOG(GNUNET_ERROR_TYPE_DEBUG,
1141               "transmitted all values, sending DONE\n");
1142           ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1143           GNUNET_MQ_send(op->mq, ev);
1144           /* We now wait until we get a DONE message back
1145            * and then wait for our MQ to be flushed and all our
1146            * demands be delivered. */
1147           break;
1148         }
1149       if (1 == side)
1150         {
1151           struct IBF_Key unsalted_key;
1152
1153           unsalt_key(&key,
1154                      op->state->salt_receive,
1155                      &unsalted_key);
1156           send_offers_for_key(op,
1157                               unsalted_key);
1158         }
1159       else if (-1 == side)
1160         {
1161           struct GNUNET_MQ_Envelope *ev;
1162           struct InquiryMessage *msg;
1163
1164           /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1165            * the effort additional complexity. */
1166           ev = GNUNET_MQ_msg_extra(msg,
1167                                    sizeof(struct IBF_Key),
1168                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1169           msg->salt = htonl(op->state->salt_receive);
1170           GNUNET_memcpy(&msg[1],
1171                         &key,
1172                         sizeof(struct IBF_Key));
1173           LOG(GNUNET_ERROR_TYPE_DEBUG,
1174               "sending element inquiry for IBF key %lx\n",
1175               (unsigned long)key.key_val);
1176           GNUNET_MQ_send(op->mq, ev);
1177         }
1178       else
1179         {
1180           GNUNET_assert(0);
1181         }
1182     }
1183   ibf_destroy(diff_ibf);
1184   return GNUNET_OK;
1185 }
1186
1187
1188 /**
1189  * Check an IBF message from a remote peer.
1190  *
1191  * Reassemble the IBF from multiple pieces, and
1192  * process the whole IBF once possible.
1193  *
1194  * @param cls the union operation
1195  * @param msg the header of the message
1196  * @return #GNUNET_OK if @a msg is well-formed
1197  */
1198 int
1199 check_union_p2p_ibf(void *cls,
1200                     const struct IBFMessage *msg)
1201 {
1202   struct Operation *op = cls;
1203   unsigned int buckets_in_message;
1204
1205   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1206     {
1207       GNUNET_break_op(0);
1208       return GNUNET_SYSERR;
1209     }
1210   buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1211   if (0 == buckets_in_message)
1212     {
1213       GNUNET_break_op(0);
1214       return GNUNET_SYSERR;
1215     }
1216   if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1217     {
1218       GNUNET_break_op(0);
1219       return GNUNET_SYSERR;
1220     }
1221   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1222     {
1223       if (ntohl(msg->offset) != op->state->ibf_buckets_received)
1224         {
1225           GNUNET_break_op(0);
1226           return GNUNET_SYSERR;
1227         }
1228       if (1 << msg->order != op->state->remote_ibf->size)
1229         {
1230           GNUNET_break_op(0);
1231           return GNUNET_SYSERR;
1232         }
1233       if (ntohl(msg->salt) != op->state->salt_receive)
1234         {
1235           GNUNET_break_op(0);
1236           return GNUNET_SYSERR;
1237         }
1238     }
1239   else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1240            (op->state->phase != PHASE_EXPECT_IBF))
1241     {
1242       GNUNET_break_op(0);
1243       return GNUNET_SYSERR;
1244     }
1245
1246   return GNUNET_OK;
1247 }
1248
1249
1250 /**
1251  * Handle an IBF message from a remote peer.
1252  *
1253  * Reassemble the IBF from multiple pieces, and
1254  * process the whole IBF once possible.
1255  *
1256  * @param cls the union operation
1257  * @param msg the header of the message
1258  */
1259 void
1260 handle_union_p2p_ibf(void *cls,
1261                      const struct IBFMessage *msg)
1262 {
1263   struct Operation *op = cls;
1264   unsigned int buckets_in_message;
1265
1266   buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1267   if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1268       (op->state->phase == PHASE_EXPECT_IBF))
1269     {
1270       op->state->phase = PHASE_EXPECT_IBF_CONT;
1271       GNUNET_assert(NULL == op->state->remote_ibf);
1272       LOG(GNUNET_ERROR_TYPE_DEBUG,
1273           "Creating new ibf of size %u\n",
1274           1 << msg->order);
1275       op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM);
1276       op->state->salt_receive = ntohl(msg->salt);
1277       LOG(GNUNET_ERROR_TYPE_DEBUG,
1278           "Receiving new IBF with salt %u\n",
1279           op->state->salt_receive);
1280       if (NULL == op->state->remote_ibf)
1281         {
1282           GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1283                      "Failed to parse remote IBF, closing connection\n");
1284           fail_union_operation(op);
1285           return;
1286         }
1287       op->state->ibf_buckets_received = 0;
1288       if (0 != ntohl(msg->offset))
1289         {
1290           GNUNET_break_op(0);
1291           fail_union_operation(op);
1292           return;
1293         }
1294     }
1295   else
1296     {
1297       GNUNET_assert(op->state->phase == PHASE_EXPECT_IBF_CONT);
1298       LOG(GNUNET_ERROR_TYPE_DEBUG,
1299           "Received more of IBF\n");
1300     }
1301   GNUNET_assert(NULL != op->state->remote_ibf);
1302
1303   ibf_read_slice(&msg[1],
1304                  op->state->ibf_buckets_received,
1305                  buckets_in_message,
1306                  op->state->remote_ibf);
1307   op->state->ibf_buckets_received += buckets_in_message;
1308
1309   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1310     {
1311       LOG(GNUNET_ERROR_TYPE_DEBUG,
1312           "received full ibf\n");
1313       op->state->phase = PHASE_INVENTORY_ACTIVE;
1314       if (GNUNET_OK !=
1315           decode_and_send(op))
1316         {
1317           /* Internal error, best we can do is shut down */
1318           GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1319                      "Failed to decode IBF, closing connection\n");
1320           fail_union_operation(op);
1321           return;
1322         }
1323     }
1324   GNUNET_CADET_receive_done(op->channel);
1325 }
1326
1327
1328 /**
1329  * Send a result message to the client indicating
1330  * that there is a new element.
1331  *
1332  * @param op union operation
1333  * @param element element to send
1334  * @param status status to send with the new element
1335  */
1336 static void
1337 send_client_element(struct Operation *op,
1338                     struct GNUNET_SET_Element *element,
1339                     int status)
1340 {
1341   struct GNUNET_MQ_Envelope *ev;
1342   struct GNUNET_SET_ResultMessage *rm;
1343
1344   LOG(GNUNET_ERROR_TYPE_DEBUG,
1345       "sending element (size %u) to client\n",
1346       element->size);
1347   GNUNET_assert(0 != op->client_request_id);
1348   ev = GNUNET_MQ_msg_extra(rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1349   if (NULL == ev)
1350     {
1351       GNUNET_MQ_discard(ev);
1352       GNUNET_break(0);
1353       return;
1354     }
1355   rm->result_status = htons(status);
1356   rm->request_id = htonl(op->client_request_id);
1357   rm->element_type = htons(element->element_type);
1358   rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element));
1359   GNUNET_memcpy(&rm[1],
1360                 element->data,
1361                 element->size);
1362   GNUNET_MQ_send(op->set->cs->mq,
1363                  ev);
1364 }
1365
1366
1367 /**
1368  * Signal to the client that the operation has finished and
1369  * destroy the operation.
1370  *
1371  * @param cls operation to destroy
1372  */
1373 static void
1374 send_client_done(void *cls)
1375 {
1376   struct Operation *op = cls;
1377   struct GNUNET_MQ_Envelope *ev;
1378   struct GNUNET_SET_ResultMessage *rm;
1379
1380   if (GNUNET_YES == op->state->client_done_sent)
1381     {
1382       return;
1383     }
1384
1385   if (PHASE_DONE != op->state->phase)
1386     {
1387       LOG(GNUNET_ERROR_TYPE_WARNING,
1388           "Union operation failed\n");
1389       GNUNET_STATISTICS_update(_GSS_statistics,
1390                                "# Union operations failed",
1391                                1,
1392                                GNUNET_NO);
1393       ev = GNUNET_MQ_msg(rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1394       rm->result_status = htons(GNUNET_SET_STATUS_FAILURE);
1395       rm->request_id = htonl(op->client_request_id);
1396       rm->element_type = htons(0);
1397       GNUNET_MQ_send(op->set->cs->mq,
1398                      ev);
1399       return;
1400     }
1401
1402   op->state->client_done_sent = GNUNET_YES;
1403
1404   GNUNET_STATISTICS_update(_GSS_statistics,
1405                            "# Union operations succeeded",
1406                            1,
1407                            GNUNET_NO);
1408   LOG(GNUNET_ERROR_TYPE_INFO,
1409       "Signalling client that union operation is done\n");
1410   ev = GNUNET_MQ_msg(rm,
1411                      GNUNET_MESSAGE_TYPE_SET_RESULT);
1412   rm->request_id = htonl(op->client_request_id);
1413   rm->result_status = htons(GNUNET_SET_STATUS_DONE);
1414   rm->element_type = htons(0);
1415   rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element));
1416   GNUNET_MQ_send(op->set->cs->mq,
1417                  ev);
1418 }
1419
1420
1421 /**
1422  * Tests if the operation is finished, and if so notify.
1423  *
1424  * @param op operation to check
1425  */
1426 static void
1427 maybe_finish(struct Operation *op)
1428 {
1429   unsigned int num_demanded;
1430
1431   num_demanded = GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes);
1432
1433   if (PHASE_FINISH_WAITING == op->state->phase)
1434     {
1435       LOG(GNUNET_ERROR_TYPE_DEBUG,
1436           "In PHASE_FINISH_WAITING, pending %u demands\n",
1437           num_demanded);
1438       if (0 == num_demanded)
1439         {
1440           struct GNUNET_MQ_Envelope *ev;
1441
1442           op->state->phase = PHASE_DONE;
1443           ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1444           GNUNET_MQ_send(op->mq,
1445                          ev);
1446           /* We now wait until the other peer sends P2P_OVER
1447            * after it got all elements from us. */
1448         }
1449     }
1450   if (PHASE_FINISH_CLOSING == op->state->phase)
1451     {
1452       LOG(GNUNET_ERROR_TYPE_DEBUG,
1453           "In PHASE_FINISH_CLOSING, pending %u demands\n",
1454           num_demanded);
1455       if (0 == num_demanded)
1456         {
1457           op->state->phase = PHASE_DONE;
1458           send_client_done(op);
1459           _GSS_operation_destroy2(op);
1460         }
1461     }
1462 }
1463
1464
1465 /**
1466  * Check an element message from a remote peer.
1467  *
1468  * @param cls the union operation
1469  * @param emsg the message
1470  */
1471 int
1472 check_union_p2p_elements(void *cls,
1473                          const struct GNUNET_SET_ElementMessage *emsg)
1474 {
1475   struct Operation *op = cls;
1476
1477   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1478     {
1479       GNUNET_break_op(0);
1480       return GNUNET_SYSERR;
1481     }
1482   if (0 == GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes))
1483     {
1484       GNUNET_break_op(0);
1485       return GNUNET_SYSERR;
1486     }
1487   return GNUNET_OK;
1488 }
1489
1490
1491 /**
1492  * Handle an element message from a remote peer.
1493  * Sent by the other peer either because we decoded an IBF and placed a demand,
1494  * or because the other peer switched to full set transmission.
1495  *
1496  * @param cls the union operation
1497  * @param emsg the message
1498  */
1499 void
1500 handle_union_p2p_elements(void *cls,
1501                           const struct GNUNET_SET_ElementMessage *emsg)
1502 {
1503   struct Operation *op = cls;
1504   struct ElementEntry *ee;
1505   struct KeyEntry *ke;
1506   uint16_t element_size;
1507
1508   element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1509   ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1510   GNUNET_memcpy(&ee[1],
1511                 &emsg[1],
1512                 element_size);
1513   ee->element.size = element_size;
1514   ee->element.data = &ee[1];
1515   ee->element.element_type = ntohs(emsg->element_type);
1516   ee->remote = GNUNET_YES;
1517   GNUNET_SET_element_hash(&ee->element,
1518                           &ee->element_hash);
1519   if (GNUNET_NO ==
1520       GNUNET_CONTAINER_multihashmap_remove(op->state->demanded_hashes,
1521                                            &ee->element_hash,
1522                                            NULL))
1523     {
1524       /* We got something we didn't demand, since it's not in our map. */
1525       GNUNET_break_op(0);
1526       fail_union_operation(op);
1527       return;
1528     }
1529
1530   LOG(GNUNET_ERROR_TYPE_DEBUG,
1531       "Got element (size %u, hash %s) from peer\n",
1532       (unsigned int)element_size,
1533       GNUNET_h2s(&ee->element_hash));
1534
1535   GNUNET_STATISTICS_update(_GSS_statistics,
1536                            "# received elements",
1537                            1,
1538                            GNUNET_NO);
1539   GNUNET_STATISTICS_update(_GSS_statistics,
1540                            "# exchanged elements",
1541                            1,
1542                            GNUNET_NO);
1543
1544   op->state->received_total++;
1545
1546   ke = op_get_element(op, &ee->element_hash);
1547   if (NULL != ke)
1548     {
1549       /* Got repeated element.  Should not happen since
1550        * we track demands. */
1551       GNUNET_STATISTICS_update(_GSS_statistics,
1552                                "# repeated elements",
1553                                1,
1554                                GNUNET_NO);
1555       ke->received = GNUNET_YES;
1556       GNUNET_free(ee);
1557     }
1558   else
1559     {
1560       LOG(GNUNET_ERROR_TYPE_DEBUG,
1561           "Registering new element from remote peer\n");
1562       op->state->received_fresh++;
1563       op_register_element(op, ee, GNUNET_YES);
1564       /* only send results immediately if the client wants it */
1565       switch (op->result_mode)
1566         {
1567         case GNUNET_SET_RESULT_ADDED:
1568           send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK);
1569           break;
1570
1571         case GNUNET_SET_RESULT_SYMMETRIC:
1572           send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1573           break;
1574
1575         default:
1576           /* Result mode not supported, should have been caught earlier. */
1577           GNUNET_break(0);
1578           break;
1579         }
1580     }
1581
1582   if ((op->state->received_total > 8) &&
1583       (op->state->received_fresh < op->state->received_total / 3))
1584     {
1585       /* The other peer gave us lots of old elements, there's something wrong. */
1586       GNUNET_break_op(0);
1587       fail_union_operation(op);
1588       return;
1589     }
1590   GNUNET_CADET_receive_done(op->channel);
1591   maybe_finish(op);
1592 }
1593
1594
1595 /**
1596  * Check a full element message from a remote peer.
1597  *
1598  * @param cls the union operation
1599  * @param emsg the message
1600  */
1601 int
1602 check_union_p2p_full_element(void *cls,
1603                              const struct GNUNET_SET_ElementMessage *emsg)
1604 {
1605   struct Operation *op = cls;
1606
1607   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1608     {
1609       GNUNET_break_op(0);
1610       return GNUNET_SYSERR;
1611     }
1612   // FIXME: check that we expect full elements here?
1613   return GNUNET_OK;
1614 }
1615
1616
1617 /**
1618  * Handle an element message from a remote peer.
1619  *
1620  * @param cls the union operation
1621  * @param emsg the message
1622  */
1623 void
1624 handle_union_p2p_full_element(void *cls,
1625                               const struct GNUNET_SET_ElementMessage *emsg)
1626 {
1627   struct Operation *op = cls;
1628   struct ElementEntry *ee;
1629   struct KeyEntry *ke;
1630   uint16_t element_size;
1631
1632   element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1633   ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1634   GNUNET_memcpy(&ee[1], &emsg[1], element_size);
1635   ee->element.size = element_size;
1636   ee->element.data = &ee[1];
1637   ee->element.element_type = ntohs(emsg->element_type);
1638   ee->remote = GNUNET_YES;
1639   GNUNET_SET_element_hash(&ee->element, &ee->element_hash);
1640
1641   LOG(GNUNET_ERROR_TYPE_DEBUG,
1642       "Got element (full diff, size %u, hash %s) from peer\n",
1643       (unsigned int)element_size,
1644       GNUNET_h2s(&ee->element_hash));
1645
1646   GNUNET_STATISTICS_update(_GSS_statistics,
1647                            "# received elements",
1648                            1,
1649                            GNUNET_NO);
1650   GNUNET_STATISTICS_update(_GSS_statistics,
1651                            "# exchanged elements",
1652                            1,
1653                            GNUNET_NO);
1654
1655   op->state->received_total++;
1656
1657   ke = op_get_element(op, &ee->element_hash);
1658   if (NULL != ke)
1659     {
1660       /* Got repeated element.  Should not happen since
1661        * we track demands. */
1662       GNUNET_STATISTICS_update(_GSS_statistics,
1663                                "# repeated elements",
1664                                1,
1665                                GNUNET_NO);
1666       ke->received = GNUNET_YES;
1667       GNUNET_free(ee);
1668     }
1669   else
1670     {
1671       LOG(GNUNET_ERROR_TYPE_DEBUG,
1672           "Registering new element from remote peer\n");
1673       op->state->received_fresh++;
1674       op_register_element(op, ee, GNUNET_YES);
1675       /* only send results immediately if the client wants it */
1676       switch (op->result_mode)
1677         {
1678         case GNUNET_SET_RESULT_ADDED:
1679           send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK);
1680           break;
1681
1682         case GNUNET_SET_RESULT_SYMMETRIC:
1683           send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1684           break;
1685
1686         default:
1687           /* Result mode not supported, should have been caught earlier. */
1688           GNUNET_break(0);
1689           break;
1690         }
1691     }
1692
1693   if ((GNUNET_YES == op->byzantine) &&
1694       (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1695       (op->state->received_fresh < op->state->received_total / 6))
1696     {
1697       /* The other peer gave us lots of old elements, there's something wrong. */
1698       LOG(GNUNET_ERROR_TYPE_ERROR,
1699           "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1700           (unsigned long long)op->state->received_fresh,
1701           (unsigned long long)op->state->received_total);
1702       GNUNET_break_op(0);
1703       fail_union_operation(op);
1704       return;
1705     }
1706   GNUNET_CADET_receive_done(op->channel);
1707 }
1708
1709
1710 /**
1711  * Send offers (for GNUNET_Hash-es) in response
1712  * to inquiries (for IBF_Key-s).
1713  *
1714  * @param cls the union operation
1715  * @param msg the message
1716  */
1717 int
1718 check_union_p2p_inquiry(void *cls,
1719                         const struct InquiryMessage *msg)
1720 {
1721   struct Operation *op = cls;
1722   unsigned int num_keys;
1723
1724   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1725     {
1726       GNUNET_break_op(0);
1727       return GNUNET_SYSERR;
1728     }
1729   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1730     {
1731       GNUNET_break_op(0);
1732       return GNUNET_SYSERR;
1733     }
1734   num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1735              / sizeof(struct IBF_Key);
1736   if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1737       != num_keys * sizeof(struct IBF_Key))
1738     {
1739       GNUNET_break_op(0);
1740       return GNUNET_SYSERR;
1741     }
1742   return GNUNET_OK;
1743 }
1744
1745
1746 /**
1747  * Send offers (for GNUNET_Hash-es) in response
1748  * to inquiries (for IBF_Key-s).
1749  *
1750  * @param cls the union operation
1751  * @param msg the message
1752  */
1753 void
1754 handle_union_p2p_inquiry(void *cls,
1755                          const struct InquiryMessage *msg)
1756 {
1757   struct Operation *op = cls;
1758   const struct IBF_Key *ibf_key;
1759   unsigned int num_keys;
1760
1761   LOG(GNUNET_ERROR_TYPE_DEBUG,
1762       "Received union inquiry\n");
1763   num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1764              / sizeof(struct IBF_Key);
1765   ibf_key = (const struct IBF_Key *)&msg[1];
1766   while (0 != num_keys--)
1767     {
1768       struct IBF_Key unsalted_key;
1769
1770       unsalt_key(ibf_key,
1771                  ntohl(msg->salt),
1772                  &unsalted_key);
1773       send_offers_for_key(op,
1774                           unsalted_key);
1775       ibf_key++;
1776     }
1777   GNUNET_CADET_receive_done(op->channel);
1778 }
1779
1780
1781 /**
1782  * Iterator over hash map entries, called to
1783  * destroy the linked list of colliding ibf key entries.
1784  *
1785  * @param cls closure
1786  * @param key current key code
1787  * @param value value in the hash map
1788  * @return #GNUNET_YES if we should continue to iterate,
1789  *         #GNUNET_NO if not.
1790  */
1791 static int
1792 send_missing_full_elements_iter(void *cls,
1793                                 uint32_t key,
1794                                 void *value)
1795 {
1796   struct Operation *op = cls;
1797   struct KeyEntry *ke = value;
1798   struct GNUNET_MQ_Envelope *ev;
1799   struct GNUNET_SET_ElementMessage *emsg;
1800   struct ElementEntry *ee = ke->element;
1801
1802   if (GNUNET_YES == ke->received)
1803     return GNUNET_YES;
1804   ev = GNUNET_MQ_msg_extra(emsg,
1805                            ee->element.size,
1806                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1807   GNUNET_memcpy(&emsg[1],
1808                 ee->element.data,
1809                 ee->element.size);
1810   emsg->element_type = htons(ee->element.element_type);
1811   GNUNET_MQ_send(op->mq,
1812                  ev);
1813   return GNUNET_YES;
1814 }
1815
1816
1817 /**
1818  * Handle a request for full set transmission.
1819  *
1820  * @parem cls closure, a set union operation
1821  * @param mh the demand message
1822  */
1823 void
1824 handle_union_p2p_request_full(void *cls,
1825                               const struct GNUNET_MessageHeader *mh)
1826 {
1827   struct Operation *op = cls;
1828
1829   LOG(GNUNET_ERROR_TYPE_DEBUG,
1830       "Received request for full set transmission\n");
1831   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1832     {
1833       GNUNET_break_op(0);
1834       fail_union_operation(op);
1835       return;
1836     }
1837   if (PHASE_EXPECT_IBF != op->state->phase)
1838     {
1839       GNUNET_break_op(0);
1840       fail_union_operation(op);
1841       return;
1842     }
1843
1844   // FIXME: we need to check that our set is larger than the
1845   // byzantine_lower_bound by some threshold
1846   send_full_set(op);
1847   GNUNET_CADET_receive_done(op->channel);
1848 }
1849
1850
1851 /**
1852  * Handle a "full done" message.
1853  *
1854  * @parem cls closure, a set union operation
1855  * @param mh the demand message
1856  */
1857 void
1858 handle_union_p2p_full_done(void *cls,
1859                            const struct GNUNET_MessageHeader *mh)
1860 {
1861   struct Operation *op = cls;
1862
1863   switch (op->state->phase)
1864     {
1865     case PHASE_EXPECT_IBF:
1866     {
1867       struct GNUNET_MQ_Envelope *ev;
1868
1869       LOG(GNUNET_ERROR_TYPE_DEBUG,
1870           "got FULL DONE, sending elements that other peer is missing\n");
1871
1872       /* send all the elements that did not come from the remote peer */
1873       GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
1874                                               &send_missing_full_elements_iter,
1875                                               op);
1876
1877       ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1878       GNUNET_MQ_send(op->mq,
1879                      ev);
1880       op->state->phase = PHASE_DONE;
1881       /* we now wait until the other peer sends us the OVER message*/
1882     }
1883     break;
1884
1885     case PHASE_FULL_SENDING:
1886     {
1887       LOG(GNUNET_ERROR_TYPE_DEBUG,
1888           "got FULL DONE, finishing\n");
1889       /* We sent the full set, and got the response for that.  We're done. */
1890       op->state->phase = PHASE_DONE;
1891       GNUNET_CADET_receive_done(op->channel);
1892       send_client_done(op);
1893       _GSS_operation_destroy2(op);
1894       return;
1895     }
1896     break;
1897
1898     default:
1899       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1900                  "Handle full done phase is %u\n",
1901                  (unsigned)op->state->phase);
1902       GNUNET_break_op(0);
1903       fail_union_operation(op);
1904       return;
1905     }
1906   GNUNET_CADET_receive_done(op->channel);
1907 }
1908
1909
1910 /**
1911  * Check a demand by the other peer for elements based on a list
1912  * of `struct GNUNET_HashCode`s.
1913  *
1914  * @parem cls closure, a set union operation
1915  * @param mh the demand message
1916  * @return #GNUNET_OK if @a mh is well-formed
1917  */
1918 int
1919 check_union_p2p_demand(void *cls,
1920                        const struct GNUNET_MessageHeader *mh)
1921 {
1922   struct Operation *op = cls;
1923   unsigned int num_hashes;
1924
1925   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1926     {
1927       GNUNET_break_op(0);
1928       return GNUNET_SYSERR;
1929     }
1930   num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1931                / sizeof(struct GNUNET_HashCode);
1932   if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1933       != num_hashes * sizeof(struct GNUNET_HashCode))
1934     {
1935       GNUNET_break_op(0);
1936       return GNUNET_SYSERR;
1937     }
1938   return GNUNET_OK;
1939 }
1940
1941
1942 /**
1943  * Handle a demand by the other peer for elements based on a list
1944  * of `struct GNUNET_HashCode`s.
1945  *
1946  * @parem cls closure, a set union operation
1947  * @param mh the demand message
1948  */
1949 void
1950 handle_union_p2p_demand(void *cls,
1951                         const struct GNUNET_MessageHeader *mh)
1952 {
1953   struct Operation *op = cls;
1954   struct ElementEntry *ee;
1955   struct GNUNET_SET_ElementMessage *emsg;
1956   const struct GNUNET_HashCode *hash;
1957   unsigned int num_hashes;
1958   struct GNUNET_MQ_Envelope *ev;
1959
1960   num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1961                / sizeof(struct GNUNET_HashCode);
1962   for (hash = (const struct GNUNET_HashCode *)&mh[1];
1963        num_hashes > 0;
1964        hash++, num_hashes--)
1965     {
1966       ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements,
1967                                              hash);
1968       if (NULL == ee)
1969         {
1970           /* Demand for non-existing element. */
1971           GNUNET_break_op(0);
1972           fail_union_operation(op);
1973           return;
1974         }
1975       if (GNUNET_NO == _GSS_is_element_of_operation(ee, op))
1976         {
1977           /* Probably confused lazily copied sets. */
1978           GNUNET_break_op(0);
1979           fail_union_operation(op);
1980           return;
1981         }
1982       ev = GNUNET_MQ_msg_extra(emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1983       GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size);
1984       emsg->reserved = htons(0);
1985       emsg->element_type = htons(ee->element.element_type);
1986       LOG(GNUNET_ERROR_TYPE_DEBUG,
1987           "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1988           (void *)op,
1989           (unsigned int)ee->element.size,
1990           GNUNET_h2s(&ee->element_hash));
1991       GNUNET_MQ_send(op->mq, ev);
1992       GNUNET_STATISTICS_update(_GSS_statistics,
1993                                "# exchanged elements",
1994                                1,
1995                                GNUNET_NO);
1996
1997       switch (op->result_mode)
1998         {
1999         case GNUNET_SET_RESULT_ADDED:
2000           /* Nothing to do. */
2001           break;
2002
2003         case GNUNET_SET_RESULT_SYMMETRIC:
2004           send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2005           break;
2006
2007         default:
2008           /* Result mode not supported, should have been caught earlier. */
2009           GNUNET_break(0);
2010           break;
2011         }
2012     }
2013   GNUNET_CADET_receive_done(op->channel);
2014 }
2015
2016
2017 /**
2018  * Check offer (of `struct GNUNET_HashCode`s).
2019  *
2020  * @param cls the union operation
2021  * @param mh the message
2022  * @return #GNUNET_OK if @a mh is well-formed
2023  */
2024 int
2025 check_union_p2p_offer(void *cls,
2026                       const struct GNUNET_MessageHeader *mh)
2027 {
2028   struct Operation *op = cls;
2029   unsigned int num_hashes;
2030
2031   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2032     {
2033       GNUNET_break_op(0);
2034       return GNUNET_SYSERR;
2035     }
2036   /* look up elements and send them */
2037   if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2038       (op->state->phase != PHASE_INVENTORY_ACTIVE))
2039     {
2040       GNUNET_break_op(0);
2041       return GNUNET_SYSERR;
2042     }
2043   num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2044                / sizeof(struct GNUNET_HashCode);
2045   if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2046       num_hashes * sizeof(struct GNUNET_HashCode))
2047     {
2048       GNUNET_break_op(0);
2049       return GNUNET_SYSERR;
2050     }
2051   return GNUNET_OK;
2052 }
2053
2054
2055 /**
2056  * Handle offers (of `struct GNUNET_HashCode`s) and
2057  * respond with demands (of `struct GNUNET_HashCode`s).
2058  *
2059  * @param cls the union operation
2060  * @param mh the message
2061  */
2062 void
2063 handle_union_p2p_offer(void *cls,
2064                        const struct GNUNET_MessageHeader *mh)
2065 {
2066   struct Operation *op = cls;
2067   const struct GNUNET_HashCode *hash;
2068   unsigned int num_hashes;
2069
2070   num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2071                / sizeof(struct GNUNET_HashCode);
2072   for (hash = (const struct GNUNET_HashCode *)&mh[1];
2073        num_hashes > 0;
2074        hash++, num_hashes--)
2075     {
2076       struct ElementEntry *ee;
2077       struct GNUNET_MessageHeader *demands;
2078       struct GNUNET_MQ_Envelope *ev;
2079
2080       ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements,
2081                                              hash);
2082       if (NULL != ee)
2083         if (GNUNET_YES == _GSS_is_element_of_operation(ee, op))
2084           continue;
2085
2086       if (GNUNET_YES ==
2087           GNUNET_CONTAINER_multihashmap_contains(op->state->demanded_hashes,
2088                                                  hash))
2089         {
2090           LOG(GNUNET_ERROR_TYPE_DEBUG,
2091               "Skipped sending duplicate demand\n");
2092           continue;
2093         }
2094
2095       GNUNET_assert(GNUNET_OK ==
2096                     GNUNET_CONTAINER_multihashmap_put(op->state->demanded_hashes,
2097                                                       hash,
2098                                                       NULL,
2099                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2100
2101       LOG(GNUNET_ERROR_TYPE_DEBUG,
2102           "[OP %x] Requesting element (hash %s)\n",
2103           (void *)op, GNUNET_h2s(hash));
2104       ev = GNUNET_MQ_msg_header_extra(demands,
2105                                       sizeof(struct GNUNET_HashCode),
2106                                       GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2107       GNUNET_memcpy(&demands[1],
2108                     hash,
2109                     sizeof(struct GNUNET_HashCode));
2110       GNUNET_MQ_send(op->mq, ev);
2111     }
2112   GNUNET_CADET_receive_done(op->channel);
2113 }
2114
2115
2116 /**
2117  * Handle a done message from a remote peer
2118  *
2119  * @param cls the union operation
2120  * @param mh the message
2121  */
2122 void
2123 handle_union_p2p_done(void *cls,
2124                       const struct GNUNET_MessageHeader *mh)
2125 {
2126   struct Operation *op = cls;
2127
2128   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2129     {
2130       GNUNET_break_op(0);
2131       fail_union_operation(op);
2132       return;
2133     }
2134   switch (op->state->phase)
2135     {
2136     case PHASE_INVENTORY_PASSIVE:
2137       /* We got all requests, but still have to send our elements in response. */
2138       op->state->phase = PHASE_FINISH_WAITING;
2139
2140       LOG(GNUNET_ERROR_TYPE_DEBUG,
2141           "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2142       /* The active peer is done sending offers
2143        * and inquiries.  This means that all
2144        * our responses to that (demands and offers)
2145        * must be in flight (queued or in mesh).
2146        *
2147        * We should notify the active peer once
2148        * all our demands are satisfied, so that the active
2149        * peer can quit if we gave it everything.
2150        */
2151       GNUNET_CADET_receive_done(op->channel);
2152       maybe_finish(op);
2153       return;
2154
2155     case PHASE_INVENTORY_ACTIVE:
2156       LOG(GNUNET_ERROR_TYPE_DEBUG,
2157           "got DONE (as active partner), waiting to finish\n");
2158       /* All demands of the other peer are satisfied,
2159        * and we processed all offers, thus we know
2160        * exactly what our demands must be.
2161        *
2162        * We'll close the channel
2163        * to the other peer once our demands are met.
2164        */
2165       op->state->phase = PHASE_FINISH_CLOSING;
2166       GNUNET_CADET_receive_done(op->channel);
2167       maybe_finish(op);
2168       return;
2169
2170     default:
2171       GNUNET_break_op(0);
2172       fail_union_operation(op);
2173       return;
2174     }
2175 }
2176
2177 /**
2178  * Handle a over message from a remote peer
2179  *
2180  * @param cls the union operation
2181  * @param mh the message
2182  */
2183 void
2184 handle_union_p2p_over(void *cls,
2185                       const struct GNUNET_MessageHeader *mh)
2186 {
2187   send_client_done(cls);
2188 }
2189
2190
2191 /**
2192  * Initiate operation to evaluate a set union with a remote peer.
2193  *
2194  * @param op operation to perform (to be initialized)
2195  * @param opaque_context message to be transmitted to the listener
2196  *        to convince it to accept, may be NULL
2197  */
2198 static struct OperationState *
2199 union_evaluate(struct Operation *op,
2200                const struct GNUNET_MessageHeader *opaque_context)
2201 {
2202   struct OperationState *state;
2203   struct GNUNET_MQ_Envelope *ev;
2204   struct OperationRequestMessage *msg;
2205
2206   ev = GNUNET_MQ_msg_nested_mh(msg,
2207                                GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2208                                opaque_context);
2209   if (NULL == ev)
2210     {
2211       /* the context message is too large */
2212       GNUNET_break(0);
2213       return NULL;
2214     }
2215   state = GNUNET_new(struct OperationState);
2216   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32,
2217                                                                 GNUNET_NO);
2218   /* copy the current generation's strata estimator for this operation */
2219   state->se = strata_estimator_dup(op->set->state->se);
2220   /* we started the operation, thus we have to send the operation request */
2221   state->phase = PHASE_EXPECT_SE;
2222   state->salt_receive = state->salt_send = 42; // FIXME?????
2223   LOG(GNUNET_ERROR_TYPE_DEBUG,
2224       "Initiating union operation evaluation\n");
2225   GNUNET_STATISTICS_update(_GSS_statistics,
2226                            "# of total union operations",
2227                            1,
2228                            GNUNET_NO);
2229   GNUNET_STATISTICS_update(_GSS_statistics,
2230                            "# of initiated union operations",
2231                            1,
2232                            GNUNET_NO);
2233   msg->operation = htonl(GNUNET_SET_OPERATION_UNION);
2234   GNUNET_MQ_send(op->mq,
2235                  ev);
2236
2237   if (NULL != opaque_context)
2238     LOG(GNUNET_ERROR_TYPE_DEBUG,
2239         "sent op request with context message\n");
2240   else
2241     LOG(GNUNET_ERROR_TYPE_DEBUG,
2242         "sent op request without context message\n");
2243
2244   op->state = state;
2245   initialize_key_to_element(op);
2246   state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element);
2247   return state;
2248 }
2249
2250
2251 /**
2252  * Accept an union operation request from a remote peer.
2253  * Only initializes the private operation state.
2254  *
2255  * @param op operation that will be accepted as a union operation
2256  */
2257 static struct OperationState *
2258 union_accept(struct Operation *op)
2259 {
2260   struct OperationState *state;
2261   const struct StrataEstimator *se;
2262   struct GNUNET_MQ_Envelope *ev;
2263   struct StrataEstimatorMessage *strata_msg;
2264   char *buf;
2265   size_t len;
2266   uint16_t type;
2267
2268   LOG(GNUNET_ERROR_TYPE_DEBUG,
2269       "accepting set union operation\n");
2270   GNUNET_STATISTICS_update(_GSS_statistics,
2271                            "# of accepted union operations",
2272                            1,
2273                            GNUNET_NO);
2274   GNUNET_STATISTICS_update(_GSS_statistics,
2275                            "# of total union operations",
2276                            1,
2277                            GNUNET_NO);
2278
2279   state = GNUNET_new(struct OperationState);
2280   state->se = strata_estimator_dup(op->set->state->se);
2281   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32,
2282                                                                 GNUNET_NO);
2283   state->salt_receive = state->salt_send = 42; // FIXME?????
2284   op->state = state;
2285   initialize_key_to_element(op);
2286   state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element);
2287
2288   /* kick off the operation */
2289   se = state->se;
2290   buf = GNUNET_malloc(se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2291   len = strata_estimator_write(se,
2292                                buf);
2293   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2294     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2295   else
2296     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2297   ev = GNUNET_MQ_msg_extra(strata_msg,
2298                            len,
2299                            type);
2300   GNUNET_memcpy(&strata_msg[1],
2301                 buf,
2302                 len);
2303   GNUNET_free(buf);
2304   strata_msg->set_size
2305     = GNUNET_htonll(GNUNET_CONTAINER_multihashmap_size(op->set->content->elements));
2306   GNUNET_MQ_send(op->mq,
2307                  ev);
2308   state->phase = PHASE_EXPECT_IBF;
2309   return state;
2310 }
2311
2312
2313 /**
2314  * Create a new set supporting the union operation
2315  *
2316  * We maintain one strata estimator per set and then manipulate it over the
2317  * lifetime of the set, as recreating a strata estimator would be expensive.
2318  *
2319  * @return the newly created set, NULL on error
2320  */
2321 static struct SetState *
2322 union_set_create(void)
2323 {
2324   struct SetState *set_state;
2325
2326   LOG(GNUNET_ERROR_TYPE_DEBUG,
2327       "union set created\n");
2328   set_state = GNUNET_new(struct SetState);
2329   set_state->se = strata_estimator_create(SE_STRATA_COUNT,
2330                                           SE_IBF_SIZE, SE_IBF_HASH_NUM);
2331   if (NULL == set_state->se)
2332     {
2333       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
2334                  "Failed to allocate strata estimator\n");
2335       GNUNET_free(set_state);
2336       return NULL;
2337     }
2338   return set_state;
2339 }
2340
2341
2342 /**
2343  * Add the element from the given element message to the set.
2344  *
2345  * @param set_state state of the set want to add to
2346  * @param ee the element to add to the set
2347  */
2348 static void
2349 union_add(struct SetState *set_state,
2350           struct ElementEntry *ee)
2351 {
2352   strata_estimator_insert(set_state->se,
2353                           get_ibf_key(&ee->element_hash));
2354 }
2355
2356
2357 /**
2358  * Remove the element given in the element message from the set.
2359  * Only marks the element as removed, so that older set operations can still exchange it.
2360  *
2361  * @param set_state state of the set to remove from
2362  * @param ee set element to remove
2363  */
2364 static void
2365 union_remove(struct SetState *set_state,
2366              struct ElementEntry *ee)
2367 {
2368   strata_estimator_remove(set_state->se,
2369                           get_ibf_key(&ee->element_hash));
2370 }
2371
2372
2373 /**
2374  * Destroy a set that supports the union operation.
2375  *
2376  * @param set_state the set to destroy
2377  */
2378 static void
2379 union_set_destroy(struct SetState *set_state)
2380 {
2381   if (NULL != set_state->se)
2382     {
2383       strata_estimator_destroy(set_state->se);
2384       set_state->se = NULL;
2385     }
2386   GNUNET_free(set_state);
2387 }
2388
2389
2390 /**
2391  * Copy union-specific set state.
2392  *
2393  * @param state source state for copying the union state
2394  * @return a copy of the union-specific set state
2395  */
2396 static struct SetState *
2397 union_copy_state(struct SetState *state)
2398 {
2399   struct SetState *new_state;
2400
2401   GNUNET_assert((NULL != state) &&
2402                 (NULL != state->se));
2403   new_state = GNUNET_new(struct SetState);
2404   new_state->se = strata_estimator_dup(state->se);
2405
2406   return new_state;
2407 }
2408
2409
2410 /**
2411  * Handle case where channel went down for an operation.
2412  *
2413  * @param op operation that lost the channel
2414  */
2415 static void
2416 union_channel_death(struct Operation *op)
2417 {
2418   send_client_done(op);
2419   _GSS_operation_destroy(op,
2420                          GNUNET_YES);
2421 }
2422
2423
2424 /**
2425  * Get the table with implementing functions for
2426  * set union.
2427  *
2428  * @return the operation specific VTable
2429  */
2430 const struct SetVT *
2431 _GSS_union_vt()
2432 {
2433   static const struct SetVT union_vt = {
2434     .create = &union_set_create,
2435     .add = &union_add,
2436     .remove = &union_remove,
2437     .destroy_set = &union_set_destroy,
2438     .evaluate = &union_evaluate,
2439     .accept = &union_accept,
2440     .cancel = &union_op_cancel,
2441     .copy_state = &union_copy_state,
2442     .channel_death = &union_channel_death
2443   };
2444
2445   return &union_vt;
2446 }