src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2017 GNUnet e.V.
4
5       GNUnet is free software: you can redistribute it and/or modify it
6       under the terms of the GNU Affero General Public License as published
7       by the Free Software Foundation, either version 3 of the License,
8       or (at your option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       Affero General Public License for more details.
14
15       You should have received a copy of the GNU Affero General Public License
16       along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
34 #include <gcrypt.h>
35
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
38
39
40 /**
41  * Number of IBFs in a strata estimator.
42  */
43 #define SE_STRATA_COUNT 32
44
45 /**
46  * Size of the IBFs in the strata estimator.
47  */
48 #define SE_IBF_SIZE 80
49
50 /**
51  * The hash num parameter for the difference digests and strata estimators.
52  */
53 #define SE_IBF_HASH_NUM 4
54
55 /**
56  * Number of buckets that can be transmitted in one message.
57  */
58 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59
60 /**
61  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62  * Choose this value so that computing the IBF is still cheaper
63  * than transmitting all values.
64  */
65 #define MAX_IBF_ORDER (20)
66
67 /**
68  * Number of buckets used in the ibf per estimated
69  * difference.
70  */
71 #define IBF_ALPHA 4
72
73
74 /**
75  * Current phase we are in for a union operation.
76  */
77 enum UnionOperationPhase
78 {
79   /**
80    * We sent the request message, and expect a strata estimator.
81    */
82   PHASE_EXPECT_SE,
83
84   /**
85    * We sent the strata estimator, and expect an IBF. This phase is entered once
86    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87    *
88    * XXX: could use better wording.
89    * XXX: repurposed to also expect a "request full set" message, should be renamed
90    *
91    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
92    */
93   PHASE_EXPECT_IBF,
94
95   /**
96    * Continuation for multi part IBFs.
97    */
98   PHASE_EXPECT_IBF_CONT,
99
100   /**
101    * We are decoding an IBF.
102    */
103   PHASE_INVENTORY_ACTIVE,
104
105   /**
106    * The other peer is decoding the IBF we just sent.
107    */
108   PHASE_INVENTORY_PASSIVE,
109
110   /**
111    * The protocol is almost finished, but we still have to flush our message
112    * queue and/or expect some elements.
113    */
114   PHASE_FINISH_CLOSING,
115
116   /**
117    * In the penultimate phase,
118    * we wait until all our demands
119    * are satisfied.  Then we send a done
120    * message, and wait for another done message.
121    */
122   PHASE_FINISH_WAITING,
123
124   /**
125    * In the ultimate phase, we wait until
126    * our demands are satisfied and then
127    * quit (sending another DONE message).
128    */
129   PHASE_DONE,
130
131   /**
132    * After sending the full set, wait for responses with the elements
133    * that the local peer is missing.
134    */
135   PHASE_FULL_SENDING,
136 };
137
138
139 /**
140  * State of an evaluate operation with another peer.
141  */
142 struct OperationState
143 {
144   /**
145    * Copy of the set's strata estimator at the time of
146    * creation of this operation.
147    */
148   struct StrataEstimator *se;
149
150   /**
151    * The IBF we currently receive.
152    */
153   struct InvertibleBloomFilter *remote_ibf;
154
155   /**
156    * The IBF with the local set's element.
157    */
158   struct InvertibleBloomFilter *local_ibf;
159
160   /**
161    * Maps unsalted IBF-Keys to elements.
162    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163    * Colliding IBF-Keys are linked.
164    */
165   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
166
167   /**
168    * Current state of the operation.
169    */
170   enum UnionOperationPhase phase;
171
172   /**
173    * Did we send the client that we are done?
174    */
175   int client_done_sent;
176
177   /**
178    * Number of ibf buckets already received into the @a remote_ibf.
179    */
180   unsigned int ibf_buckets_received;
181
182   /**
183    * Hashes for elements that we have demanded from the other peer.
184    */
185   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
186
187   /**
188    * Salt that we're using for sending IBFs
189    */
190   uint32_t salt_send;
191
192   /**
193    * Salt for the IBF we've received and that we're currently decoding.
194    */
195   uint32_t salt_receive;
196
197   /**
198    * Number of elements we received from the other peer
199    * that were not in the local set yet.
200    */
201   uint32_t received_fresh;
202
203   /**
204    * Total number of elements received from the other peer.
205    */
206   uint32_t received_total;
207
208   /**
209    * Initial size of our set, just before
210    * the operation started.
211    */
212   uint64_t initial_size;
213 };
214
215
216 /**
217  * The key entry is used to associate an ibf key with an element.
218  */
219 struct KeyEntry
220 {
221   /**
222    * IBF key for the entry, derived from the current salt.
223    */
224   struct IBF_Key ibf_key;
225
226   /**
227    * The actual element associated with the key.
228    *
229    * Only owned by the union operation if element->operation
230    * is #GNUNET_YES.
231    */
232   struct ElementEntry *element;
233
234   /**
235    * Did we receive this element?
236    * Even if element->is_foreign is false, we might
237    * have received the element, so this indicates that
238    * the other peer has it.
239    */
240   int received;
241 };
242
243
244 /**
245  * Used as a closure for sending elements
246  * with a specific IBF key.
247  */
248 struct SendElementClosure
249 {
250   /**
251    * The IBF key whose matching elements should be
252    * sent.
253    */
254   struct IBF_Key ibf_key;
255
256   /**
257    * Operation for which the elements
258    * should be sent.
259    */
260   struct Operation *op;
261 };
262
263
264 /**
265  * Extra state required for efficient set union.
266  */
267 struct SetState
268 {
269   /**
270    * The strata estimator is only generated once for
271    * each set.
272    * The IBF keys are derived from the element hashes with
273    * salt=0.
274    */
275   struct StrataEstimator *se;
276 };
277
278
279 /**
280  * Iterator over hash map entries, called to
281  * destroy the linked list of colliding ibf key entries.
282  *
283  * @param cls closure
284  * @param key current key code
285  * @param value value in the hash map
286  * @return #GNUNET_YES if we should continue to iterate,
287  *         #GNUNET_NO if not.
288  */
289 static int
290 destroy_key_to_element_iter (void *cls,
291                              uint32_t key,
292                              void *value)
293 {
294   struct KeyEntry *k = value;
295
296   GNUNET_assert (NULL != k);
297   if (GNUNET_YES == k->element->remote)
298   {
299     GNUNET_free (k->element);
300     k->element = NULL;
301   }
302   GNUNET_free (k);
303   return GNUNET_YES;
304 }
305
306
307 /**
308  * Destroy the union operation.  Only things specific to the union
309  * operation are destroyed.
310  *
311  * @param op union operation to destroy
312  */
313 static void
314 union_op_cancel (struct Operation *op)
315 {
316   LOG (GNUNET_ERROR_TYPE_DEBUG,
317        "destroying union op\n");
318   /* check if the op was canceled twice */
319   GNUNET_assert (NULL != op->state);
320   if (NULL != op->state->remote_ibf)
321   {
322     ibf_destroy (op->state->remote_ibf);
323     op->state->remote_ibf = NULL;
324   }
325   if (NULL != op->state->demanded_hashes)
326   {
327     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328     op->state->demanded_hashes = NULL;
329   }
330   if (NULL != op->state->local_ibf)
331   {
332     ibf_destroy (op->state->local_ibf);
333     op->state->local_ibf = NULL;
334   }
335   if (NULL != op->state->se)
336   {
337     strata_estimator_destroy (op->state->se);
338     op->state->se = NULL;
339   }
340   if (NULL != op->state->key_to_element)
341   {
342     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343                                              &destroy_key_to_element_iter,
344                                              NULL);
345     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346     op->state->key_to_element = NULL;
347   }
348   GNUNET_free (op->state);
349   op->state = NULL;
350   LOG (GNUNET_ERROR_TYPE_DEBUG,
351        "destroying union op done\n");
352 }
353
354
355 /**
356  * Inform the client that the union operation has failed,
357  * and proceed to destroy the evaluate operation.
358  *
359  * @param op the union operation to fail
360  */
361 static void
362 fail_union_operation (struct Operation *op)
363 {
364   struct GNUNET_MQ_Envelope *ev;
365   struct GNUNET_SET_ResultMessage *msg;
366
367   LOG (GNUNET_ERROR_TYPE_WARNING,
368        "union operation failed\n");
369   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371   msg->request_id = htonl (op->client_request_id);
372   msg->element_type = htons (0);
373   GNUNET_MQ_send (op->set->cs->mq,
374                   ev);
375   _GSS_operation_destroy (op, GNUNET_YES);
376 }
377
378
379 /**
380  * Derive the IBF key from a hash code and
381  * a salt.
382  *
383  * @param src the hash code
384  * @return the derived IBF key
385  */
386 static struct IBF_Key
387 get_ibf_key (const struct GNUNET_HashCode *src)
388 {
389   struct IBF_Key key;
390   uint16_t salt = 0;
391
392   GNUNET_assert (GNUNET_OK ==
393                  GNUNET_CRYPTO_kdf (&key, sizeof (key),
394                                     src, sizeof *src,
395                                     &salt, sizeof (salt),
396                                     NULL, 0));
397   return key;
398 }
399
400
401 /**
402  * Context for #op_get_element_iterator
403  */
404 struct GetElementContext
405 {
406   /**
407    * FIXME.
408    */
409   struct GNUNET_HashCode hash;
410
411   /**
412    * FIXME.
413    */
414   struct KeyEntry *k;
415 };
416
417
418 /**
419  * Iterator over the mapping from IBF keys to element entries.  Checks if we
420  * have an element with a given GNUNET_HashCode.
421  *
422  * @param cls closure
423  * @param key current key code
424  * @param value value in the hash map
425  * @return #GNUNET_YES if we should search further,
426  *         #GNUNET_NO if we've found the element.
427  */
428 static int
429 op_get_element_iterator (void *cls,
430                          uint32_t key,
431                          void *value)
432 {
433   struct GetElementContext *ctx = cls;
434   struct KeyEntry *k = value;
435
436   GNUNET_assert (NULL != k);
437   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438                                    &ctx->hash))
439   {
440     ctx->k = k;
441     return GNUNET_NO;
442   }
443   return GNUNET_YES;
444 }
445
446
447 /**
448  * Determine whether the given element is already in the operation's element
449  * set.
450  *
451  * @param op operation that should be tested for 'element_hash'
452  * @param element_hash hash of the element to look for
453  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
454  */
455 static struct KeyEntry *
456 op_get_element (struct Operation *op,
457                 const struct GNUNET_HashCode *element_hash)
458 {
459   int ret;
460   struct IBF_Key ibf_key;
461   struct GetElementContext ctx = {{{ 0 }} , 0};
462
463   ctx.hash = *element_hash;
464
465   ibf_key = get_ibf_key (element_hash);
466   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
467                                                       (uint32_t) ibf_key.key_val,
468                                                       op_get_element_iterator,
469                                                       &ctx);
470
471   /* was the iteration aborted because we found the element? */
472   if (GNUNET_SYSERR == ret)
473   {
474     GNUNET_assert (NULL != ctx.k);
475     return ctx.k;
476   }
477   return NULL;
478 }
479
480
481 /**
482  * Insert an element into the union operation's
483  * key-to-element mapping. Takes ownership of 'ee'.
484  * Note that this does not insert the element in the set,
485  * only in the operation's key-element mapping.
486  * This is done to speed up re-tried operations, if some elements
487  * were transmitted, and then the IBF fails to decode.
488  *
489  * XXX: clarify ownership, doesn't sound right.
490  *
491  * @param op the union operation
492  * @param ee the element entry
493  * @parem received was this element received from the remote peer?
494  */
495 static void
496 op_register_element (struct Operation *op,
497                      struct ElementEntry *ee,
498                      int received)
499 {
500   struct IBF_Key ibf_key;
501   struct KeyEntry *k;
502
503   ibf_key = get_ibf_key (&ee->element_hash);
504   k = GNUNET_new (struct KeyEntry);
505   k->element = ee;
506   k->ibf_key = ibf_key;
507   k->received = received;
508   GNUNET_assert (GNUNET_OK ==
509                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
510                                                       (uint32_t) ibf_key.key_val,
511                                                       k,
512                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
513 }
514
515
516 /**
517  * FIXME.
518  */
519 static void
520 salt_key (const struct IBF_Key *k_in,
521           uint32_t salt,
522           struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526   /* rotate ibf key */
527   x = (x >> s) | (x << (64 - s));
528   k_out->key_val = x;
529 }
530
531
532 /**
533  * FIXME.
534  */
535 static void
536 unsalt_key (const struct IBF_Key *k_in,
537             uint32_t salt,
538             struct IBF_Key *k_out)
539 {
540   int s = salt % 64;
541   uint64_t x = k_in->key_val;
542   x = (x << s) | (x >> (64 - s));
543   k_out->key_val = x;
544 }
545
546
547 /**
548  * Insert a key into an ibf.
549  *
550  * @param cls the ibf
551  * @param key unused
552  * @param value the key entry to get the key from
553  */
554 static int
555 prepare_ibf_iterator (void *cls,
556                       uint32_t key,
557                       void *value)
558 {
559   struct Operation *op = cls;
560   struct KeyEntry *ke = value;
561   struct IBF_Key salted_key;
562
563   LOG (GNUNET_ERROR_TYPE_DEBUG,
564        "[OP %x] inserting %lx (hash %s) into ibf\n",
565        (void *) op,
566        (unsigned long) ke->ibf_key.key_val,
567        GNUNET_h2s (&ke->element->element_hash));
568   salt_key (&ke->ibf_key,
569             op->state->salt_send,
570             &salted_key);
571   ibf_insert (op->state->local_ibf, salted_key);
572   return GNUNET_YES;
573 }
574
575
576 /**
577  * Iterator for initializing the
578  * key-to-element mapping of a union operation
579  *
580  * @param cls the union operation `struct Operation *`
581  * @param key unused
582  * @param value the `struct ElementEntry *` to insert
583  *        into the key-to-element mapping
584  * @return #GNUNET_YES (to continue iterating)
585  */
586 static int
587 init_key_to_element_iterator (void *cls,
588                               const struct GNUNET_HashCode *key,
589                               void *value)
590 {
591   struct Operation *op = cls;
592   struct ElementEntry *ee = value;
593
594   /* make sure that the element belongs to the set at the time
595    * of creating the operation */
596   if (GNUNET_NO ==
597       _GSS_is_element_of_operation (ee,
598                                     op))
599     return GNUNET_YES;
600   GNUNET_assert (GNUNET_NO == ee->remote);
601   op_register_element (op,
602                        ee,
603                        GNUNET_NO);
604   return GNUNET_YES;
605 }
606
607
608 /**
609  * Initialize the IBF key to element mapping local to this set
610  * operation.
611  *
612  * @param op the set union operation
613  */
614 static void
615 initialize_key_to_element (struct Operation *op)
616 {
617   unsigned int len;
618
619   GNUNET_assert (NULL == op->state->key_to_element);
620   len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
621   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
622   GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
623                                          &init_key_to_element_iterator,
624                                          op);
625 }
626
627
628 /**
629  * Create an ibf with the operation's elements
630  * of the specified size
631  *
632  * @param op the union operation
633  * @param size size of the ibf to create
634  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
635  */
636 static int
637 prepare_ibf (struct Operation *op,
638              uint32_t size)
639 {
640   GNUNET_assert (NULL != op->state->key_to_element);
641
642   if (NULL != op->state->local_ibf)
643     ibf_destroy (op->state->local_ibf);
644   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
645   if (NULL == op->state->local_ibf)
646   {
647     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
648                 "Failed to allocate local IBF\n");
649     return GNUNET_SYSERR;
650   }
651   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
652                                            &prepare_ibf_iterator,
653                                            op);
654   return GNUNET_OK;
655 }
656
657
658 /**
659  * Send an ibf of appropriate size.
660  *
661  * Fragments the IBF into multiple messages if necessary.
662  *
663  * @param op the union operation
664  * @param ibf_order order of the ibf to send, size=2^order
665  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
666  */
667 static int
668 send_ibf (struct Operation *op,
669           uint16_t ibf_order)
670 {
671   unsigned int buckets_sent = 0;
672   struct InvertibleBloomFilter *ibf;
673
674   if (GNUNET_OK !=
675       prepare_ibf (op, 1<<ibf_order))
676   {
677     /* allocation failed */
678     return GNUNET_SYSERR;
679   }
680
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "sending ibf of size %u\n",
683        1<<ibf_order);
684
685   {
686     char name[64] = { 0 };
687     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
688     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
689   }
690
691   ibf = op->state->local_ibf;
692
693   while (buckets_sent < (1 << ibf_order))
694   {
695     unsigned int buckets_in_message;
696     struct GNUNET_MQ_Envelope *ev;
697     struct IBFMessage *msg;
698
699     buckets_in_message = (1 << ibf_order) - buckets_sent;
700     /* limit to maximum */
701     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
702       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
703
704     ev = GNUNET_MQ_msg_extra (msg,
705                               buckets_in_message * IBF_BUCKET_SIZE,
706                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
707     msg->reserved1 = 0;
708     msg->reserved2 = 0;
709     msg->order = ibf_order;
710     msg->offset = htonl (buckets_sent);
711     msg->salt = htonl (op->state->salt_send);
712     ibf_write_slice (ibf, buckets_sent,
713                      buckets_in_message, &msg[1]);
714     buckets_sent += buckets_in_message;
715     LOG (GNUNET_ERROR_TYPE_DEBUG,
716          "ibf chunk size %u, %u/%u sent\n",
717          buckets_in_message,
718          buckets_sent,
719          1<<ibf_order);
720     GNUNET_MQ_send (op->mq, ev);
721   }
722
723   /* The other peer must decode the IBF, so
724    * we're passive. */
725   op->state->phase = PHASE_INVENTORY_PASSIVE;
726   return GNUNET_OK;
727 }
728
729
730 /**
731  * Compute the necessary order of an ibf
732  * from the size of the symmetric set difference.
733  *
734  * @param diff the difference
735  * @return the required size of the ibf
736  */
737 static unsigned int
738 get_order_from_difference (unsigned int diff)
739 {
740   unsigned int ibf_order;
741
742   ibf_order = 2;
743   while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
744             ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
745           (ibf_order < MAX_IBF_ORDER) )
746     ibf_order++;
747   // add one for correction
748   return ibf_order + 1;
749 }
750
751
752 /**
753  * Send a set element.
754  *
755  * @param cls the union operation `struct Operation *`
756  * @param key unused
757  * @param value the `struct ElementEntry *` to insert
758  *        into the key-to-element mapping
759  * @return #GNUNET_YES (to continue iterating)
760  */
761 static int
762 send_full_element_iterator (void *cls,
763                             const struct GNUNET_HashCode *key,
764                             void *value)
765 {
766   struct Operation *op = cls;
767   struct GNUNET_SET_ElementMessage *emsg;
768   struct ElementEntry *ee = value;
769   struct GNUNET_SET_Element *el = &ee->element;
770   struct GNUNET_MQ_Envelope *ev;
771
772   LOG (GNUNET_ERROR_TYPE_DEBUG,
773        "Sending element %s\n",
774        GNUNET_h2s (key));
775   ev = GNUNET_MQ_msg_extra (emsg,
776                             el->size,
777                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
778   emsg->element_type = htons (el->element_type);
779   GNUNET_memcpy (&emsg[1],
780                  el->data,
781                  el->size);
782   GNUNET_MQ_send (op->mq,
783                   ev);
784   return GNUNET_YES;
785 }
786
787
788 /**
789  * Switch to full set transmission for @a op.
790  *
791  * @param op operation to switch to full set transmission.
792  */
793 static void
794 send_full_set (struct Operation *op)
795 {
796   struct GNUNET_MQ_Envelope *ev;
797
798   op->state->phase = PHASE_FULL_SENDING;
799   LOG (GNUNET_ERROR_TYPE_DEBUG,
800        "Dedicing to transmit the full set\n");
801   /* FIXME: use a more memory-friendly way of doing this with an
802      iterator, just as we do in the non-full case! */
803   (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
804                                                 &send_full_element_iterator,
805                                                 op);
806   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
807   GNUNET_MQ_send (op->mq,
808                   ev);
809 }
810
811
812 /**
813  * Handle a strata estimator from a remote peer
814  *
815  * @param cls the union operation
816  * @param msg the message
817  */
818 int
819 check_union_p2p_strata_estimator (void *cls,
820                                   const struct StrataEstimatorMessage *msg)
821 {
822   struct Operation *op = cls;
823   int is_compressed;
824   size_t len;
825
826   if (op->state->phase != PHASE_EXPECT_SE)
827   {
828     GNUNET_break (0);
829     return GNUNET_SYSERR;
830   }
831   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
832   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
833   if ( (GNUNET_NO == is_compressed) &&
834        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
835   {
836     GNUNET_break (0);
837     return GNUNET_SYSERR;
838   }
839   return GNUNET_OK;
840 }
841
842
843 /**
844  * Handle a strata estimator from a remote peer
845  *
846  * @param cls the union operation
847  * @param msg the message
848  */
849 void
850 handle_union_p2p_strata_estimator (void *cls,
851                                    const struct StrataEstimatorMessage *msg)
852 {
853   struct Operation *op = cls;
854   struct StrataEstimator *remote_se;
855   unsigned int diff;
856   uint64_t other_size;
857   size_t len;
858   int is_compressed;
859
860   is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
861   GNUNET_STATISTICS_update (_GSS_statistics,
862                             "# bytes of SE received",
863                             ntohs (msg->header.size),
864                             GNUNET_NO);
865   len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
866   other_size = GNUNET_ntohll (msg->set_size);
867   remote_se = strata_estimator_create (SE_STRATA_COUNT,
868                                        SE_IBF_SIZE,
869                                        SE_IBF_HASH_NUM);
870   if (NULL == remote_se)
871   {
872     /* insufficient resources, fail */
873     fail_union_operation (op);
874     return;
875   }
876   if (GNUNET_OK !=
877       strata_estimator_read (&msg[1],
878                              len,
879                              is_compressed,
880                              remote_se))
881   {
882     /* decompression failed */
883     strata_estimator_destroy (remote_se);
884     fail_union_operation (op);
885     return;
886   }
887   GNUNET_assert (NULL != op->state->se);
888   diff = strata_estimator_difference (remote_se,
889                                       op->state->se);
890
891   if (diff > 200)
892     diff = diff * 3 / 2;
893
894   strata_estimator_destroy (remote_se);
895   strata_estimator_destroy (op->state->se);
896   op->state->se = NULL;
897   LOG (GNUNET_ERROR_TYPE_DEBUG,
898        "got se diff=%d, using ibf size %d\n",
899        diff,
900        1U << get_order_from_difference (diff));
901
902   {
903     char *set_debug;
904
905     set_debug = getenv ("GNUNET_SET_BENCHMARK");
906     if ( (NULL != set_debug) &&
907          (0 == strcmp (set_debug, "1")) )
908     {
909       FILE *f = fopen ("set.log", "a");
910       fprintf (f, "%llu\n", (unsigned long long) diff);
911       fclose (f);
912     }
913   }
914
915   if ( (GNUNET_YES == op->byzantine) &&
916        (other_size < op->byzantine_lower_bound) )
917   {
918     GNUNET_break (0);
919     fail_union_operation (op);
920     return;
921   }
922
923   if ( (GNUNET_YES == op->force_full) ||
924        (diff > op->state->initial_size / 4) ||
925        (0 == other_size) )
926   {
927     LOG (GNUNET_ERROR_TYPE_DEBUG,
928          "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
929          diff,
930          op->state->initial_size);
931     GNUNET_STATISTICS_update (_GSS_statistics,
932                               "# of full sends",
933                               1,
934                               GNUNET_NO);
935     if ( (op->state->initial_size <= other_size) ||
936          (0 == other_size) )
937     {
938       send_full_set (op);
939     }
940     else
941     {
942       struct GNUNET_MQ_Envelope *ev;
943
944       LOG (GNUNET_ERROR_TYPE_DEBUG,
945            "Telling other peer that we expect its full set\n");
946       op->state->phase = PHASE_EXPECT_IBF;
947       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
948       GNUNET_MQ_send (op->mq,
949                       ev);
950     }
951   }
952   else
953   {
954     GNUNET_STATISTICS_update (_GSS_statistics,
955                               "# of ibf sends",
956                               1,
957                               GNUNET_NO);
958     if (GNUNET_OK !=
959         send_ibf (op,
960                   get_order_from_difference (diff)))
961     {
962       /* Internal error, best we can do is shut the connection */
963       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
964                   "Failed to send IBF, closing connection\n");
965       fail_union_operation (op);
966       return;
967     }
968   }
969   GNUNET_CADET_receive_done (op->channel);
970 }
971
972
973 /**
974  * Iterator to send elements to a remote peer
975  *
976  * @param cls closure with the element key and the union operation
977  * @param key ignored
978  * @param value the key entry
979  */
980 static int
981 send_offers_iterator (void *cls,
982                       uint32_t key,
983                       void *value)
984 {
985   struct SendElementClosure *sec = cls;
986   struct Operation *op = sec->op;
987   struct KeyEntry *ke = value;
988   struct GNUNET_MQ_Envelope *ev;
989   struct GNUNET_MessageHeader *mh;
990
991   /* Detect 32-bit key collision for the 64-bit IBF keys. */
992   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
993     return GNUNET_YES;
994
995   ev = GNUNET_MQ_msg_header_extra (mh,
996                                    sizeof (struct GNUNET_HashCode),
997                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
998
999   GNUNET_assert (NULL != ev);
1000   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1001   LOG (GNUNET_ERROR_TYPE_DEBUG,
1002        "[OP %x] sending element offer (%s) to peer\n",
1003        (void *) op,
1004        GNUNET_h2s (&ke->element->element_hash));
1005   GNUNET_MQ_send (op->mq, ev);
1006   return GNUNET_YES;
1007 }
1008
1009
1010 /**
1011  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1012  *
1013  * @param op union operation
1014  * @param ibf_key IBF key of interest
1015  */
1016 static void
1017 send_offers_for_key (struct Operation *op,
1018                      struct IBF_Key ibf_key)
1019 {
1020   struct SendElementClosure send_cls;
1021
1022   send_cls.ibf_key = ibf_key;
1023   send_cls.op = op;
1024   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1025                                                        (uint32_t) ibf_key.key_val,
1026                                                        &send_offers_iterator,
1027                                                        &send_cls);
1028 }
1029
1030
1031 /**
1032  * Decode which elements are missing on each side, and
1033  * send the appropriate offers and inquiries.
1034  *
1035  * @param op union operation
1036  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1037  */
1038 static int
1039 decode_and_send (struct Operation *op)
1040 {
1041   struct IBF_Key key;
1042   struct IBF_Key last_key;
1043   int side;
1044   unsigned int num_decoded;
1045   struct InvertibleBloomFilter *diff_ibf;
1046
1047   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1048
1049   if (GNUNET_OK !=
1050       prepare_ibf (op,
1051                    op->state->remote_ibf->size))
1052   {
1053     GNUNET_break (0);
1054     /* allocation failed */
1055     return GNUNET_SYSERR;
1056   }
1057   diff_ibf = ibf_dup (op->state->local_ibf);
1058   ibf_subtract (diff_ibf,
1059                 op->state->remote_ibf);
1060
1061   ibf_destroy (op->state->remote_ibf);
1062   op->state->remote_ibf = NULL;
1063
1064   LOG (GNUNET_ERROR_TYPE_DEBUG,
1065        "decoding IBF (size=%u)\n",
1066        diff_ibf->size);
1067
1068   num_decoded = 0;
1069   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1070
1071   while (1)
1072   {
1073     int res;
1074     int cycle_detected = GNUNET_NO;
1075
1076     last_key = key;
1077
1078     res = ibf_decode (diff_ibf, &side, &key);
1079     if (res == GNUNET_OK)
1080     {
1081       LOG (GNUNET_ERROR_TYPE_DEBUG,
1082            "decoded ibf key %lx\n",
1083            (unsigned long) key.key_val);
1084       num_decoded += 1;
1085       if ( (num_decoded > diff_ibf->size) ||
1086            ( (num_decoded > 1) &&
1087              (last_key.key_val == key.key_val) ) )
1088       {
1089         LOG (GNUNET_ERROR_TYPE_DEBUG,
1090              "detected cyclic ibf (decoded %u/%u)\n",
1091              num_decoded,
1092              diff_ibf->size);
1093         cycle_detected = GNUNET_YES;
1094       }
1095     }
1096     if ( (GNUNET_SYSERR == res) ||
1097          (GNUNET_YES == cycle_detected) )
1098     {
1099       int next_order;
1100       next_order = 0;
1101       while (1<<next_order < diff_ibf->size)
1102         next_order++;
1103       next_order++;
1104       if (next_order <= MAX_IBF_ORDER)
1105       {
1106         LOG (GNUNET_ERROR_TYPE_DEBUG,
1107              "decoding failed, sending larger ibf (size %u)\n",
1108              1<<next_order);
1109         GNUNET_STATISTICS_update (_GSS_statistics,
1110                                   "# of IBF retries",
1111                                   1,
1112                                   GNUNET_NO);
1113         op->state->salt_send++;
1114         if (GNUNET_OK !=
1115             send_ibf (op, next_order))
1116         {
1117           /* Internal error, best we can do is shut the connection */
1118           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1119                       "Failed to send IBF, closing connection\n");
1120           fail_union_operation (op);
1121           ibf_destroy (diff_ibf);
1122           return GNUNET_SYSERR;
1123         }
1124       }
1125       else
1126       {
1127         GNUNET_STATISTICS_update (_GSS_statistics,
1128                                   "# of failed union operations (too large)",
1129                                   1,
1130                                   GNUNET_NO);
1131         // XXX: Send the whole set, element-by-element
1132         LOG (GNUNET_ERROR_TYPE_ERROR,
1133              "set union failed: reached ibf limit\n");
1134         fail_union_operation (op);
1135         ibf_destroy (diff_ibf);
1136         return GNUNET_SYSERR;
1137       }
1138       break;
1139     }
1140     if (GNUNET_NO == res)
1141     {
1142       struct GNUNET_MQ_Envelope *ev;
1143
1144       LOG (GNUNET_ERROR_TYPE_DEBUG,
1145            "transmitted all values, sending DONE\n");
1146       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1147       GNUNET_MQ_send (op->mq, ev);
1148       /* We now wait until we get a DONE message back
1149        * and then wait for our MQ to be flushed and all our
1150        * demands be delivered. */
1151       break;
1152     }
1153     if (1 == side)
1154     {
1155       struct IBF_Key unsalted_key;
1156
1157       unsalt_key (&key,
1158                   op->state->salt_receive,
1159                   &unsalted_key);
1160       send_offers_for_key (op,
1161                            unsalted_key);
1162     }
1163     else if (-1 == side)
1164     {
1165       struct GNUNET_MQ_Envelope *ev;
1166       struct InquiryMessage *msg;
1167
1168       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1169        * the effort additional complexity. */
1170       ev = GNUNET_MQ_msg_extra (msg,
1171                                 sizeof (struct IBF_Key),
1172                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1173       msg->salt = htonl (op->state->salt_receive);
1174       GNUNET_memcpy (&msg[1],
1175               &key,
1176               sizeof (struct IBF_Key));
1177       LOG (GNUNET_ERROR_TYPE_DEBUG,
1178            "sending element inquiry for IBF key %lx\n",
1179            (unsigned long) key.key_val);
1180       GNUNET_MQ_send (op->mq, ev);
1181     }
1182     else
1183     {
1184       GNUNET_assert (0);
1185     }
1186   }
1187   ibf_destroy (diff_ibf);
1188   return GNUNET_OK;
1189 }
1190
1191
1192 /**
1193  * Check an IBF message from a remote peer.
1194  *
1195  * Reassemble the IBF from multiple pieces, and
1196  * process the whole IBF once possible.
1197  *
1198  * @param cls the union operation
1199  * @param msg the header of the message
1200  * @return #GNUNET_OK if @a msg is well-formed
1201  */
1202 int
1203 check_union_p2p_ibf (void *cls,
1204                      const struct IBFMessage *msg)
1205 {
1206   struct Operation *op = cls;
1207   unsigned int buckets_in_message;
1208
1209   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1210   {
1211     GNUNET_break_op (0);
1212     return GNUNET_SYSERR;
1213   }
1214   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1215   if (0 == buckets_in_message)
1216   {
1217     GNUNET_break_op (0);
1218     return GNUNET_SYSERR;
1219   }
1220   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1221   {
1222     GNUNET_break_op (0);
1223     return GNUNET_SYSERR;
1224   }
1225   if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1226   {
1227     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1228     {
1229       GNUNET_break_op (0);
1230       return GNUNET_SYSERR;
1231     }
1232     if (1<<msg->order != op->state->remote_ibf->size)
1233     {
1234       GNUNET_break_op (0);
1235       return GNUNET_SYSERR;
1236     }
1237     if (ntohl (msg->salt) != op->state->salt_receive)
1238     {
1239       GNUNET_break_op (0);
1240       return GNUNET_SYSERR;
1241     }
1242   }
1243   else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1244             (op->state->phase != PHASE_EXPECT_IBF) )
1245   {
1246     GNUNET_break_op (0);
1247     return GNUNET_SYSERR;
1248   }
1249
1250   return GNUNET_OK;
1251 }
1252
1253
1254 /**
1255  * Handle an IBF message from a remote peer.
1256  *
1257  * Reassemble the IBF from multiple pieces, and
1258  * process the whole IBF once possible.
1259  *
1260  * @param cls the union operation
1261  * @param msg the header of the message
1262  */
1263 void
1264 handle_union_p2p_ibf (void *cls,
1265                       const struct IBFMessage *msg)
1266 {
1267   struct Operation *op = cls;
1268   unsigned int buckets_in_message;
1269
1270   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1271   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1272        (op->state->phase == PHASE_EXPECT_IBF) )
1273   {
1274     op->state->phase = PHASE_EXPECT_IBF_CONT;
1275     GNUNET_assert (NULL == op->state->remote_ibf);
1276     LOG (GNUNET_ERROR_TYPE_DEBUG,
1277          "Creating new ibf of size %u\n",
1278          1 << msg->order);
1279     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1280     op->state->salt_receive = ntohl (msg->salt);
1281     LOG (GNUNET_ERROR_TYPE_DEBUG,
1282          "Receiving new IBF with salt %u\n",
1283          op->state->salt_receive);
1284     if (NULL == op->state->remote_ibf)
1285     {
1286       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1287                   "Failed to parse remote IBF, closing connection\n");
1288       fail_union_operation (op);
1289       return;
1290     }
1291     op->state->ibf_buckets_received = 0;
1292     if (0 != ntohl (msg->offset))
1293     {
1294       GNUNET_break_op (0);
1295       fail_union_operation (op);
1296       return;
1297     }
1298   }
1299   else
1300   {
1301     GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1302     LOG (GNUNET_ERROR_TYPE_DEBUG,
1303          "Received more of IBF\n");
1304   }
1305   GNUNET_assert (NULL != op->state->remote_ibf);
1306
1307   ibf_read_slice (&msg[1],
1308                   op->state->ibf_buckets_received,
1309                   buckets_in_message,
1310                   op->state->remote_ibf);
1311   op->state->ibf_buckets_received += buckets_in_message;
1312
1313   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1314   {
1315     LOG (GNUNET_ERROR_TYPE_DEBUG,
1316          "received full ibf\n");
1317     op->state->phase = PHASE_INVENTORY_ACTIVE;
1318     if (GNUNET_OK !=
1319         decode_and_send (op))
1320     {
1321       /* Internal error, best we can do is shut down */
1322       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1323                   "Failed to decode IBF, closing connection\n");
1324       fail_union_operation (op);
1325       return;
1326     }
1327   }
1328   GNUNET_CADET_receive_done (op->channel);
1329 }
1330
1331
1332 /**
1333  * Send a result message to the client indicating
1334  * that there is a new element.
1335  *
1336  * @param op union operation
1337  * @param element element to send
1338  * @param status status to send with the new element
1339  */
1340 static void
1341 send_client_element (struct Operation *op,
1342                      struct GNUNET_SET_Element *element,
1343                      int status)
1344 {
1345   struct GNUNET_MQ_Envelope *ev;
1346   struct GNUNET_SET_ResultMessage *rm;
1347
1348   LOG (GNUNET_ERROR_TYPE_DEBUG,
1349        "sending element (size %u) to client\n",
1350        element->size);
1351   GNUNET_assert (0 != op->client_request_id);
1352   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1353   if (NULL == ev)
1354   {
1355     GNUNET_MQ_discard (ev);
1356     GNUNET_break (0);
1357     return;
1358   }
1359   rm->result_status = htons (status);
1360   rm->request_id = htonl (op->client_request_id);
1361   rm->element_type = htons (element->element_type);
1362   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1363   GNUNET_memcpy (&rm[1],
1364                  element->data,
1365                  element->size);
1366   GNUNET_MQ_send (op->set->cs->mq,
1367                   ev);
1368 }
1369
1370
1371 /**
1372  * Destroy remote channel.
1373  *
1374  * @param op operation
1375  */
1376 static void
1377 destroy_channel (struct Operation *op)
1378 {
1379   struct GNUNET_CADET_Channel *channel;
1380
1381   if (NULL != (channel = op->channel))
1382   {
1383     /* This will free op; called conditionally as this helper function
1384        is also called from within the channel disconnect handler. */
1385     op->channel = NULL;
1386     GNUNET_CADET_channel_destroy (channel);
1387   }
1388 }
1389
1390
1391 /**
1392  * Signal to the client that the operation has finished and
1393  * destroy the operation.
1394  *
1395  * @param cls operation to destroy
1396  */
1397 static void
1398 send_client_done (void *cls)
1399 {
1400   struct Operation *op = cls;
1401   struct GNUNET_MQ_Envelope *ev;
1402   struct GNUNET_SET_ResultMessage *rm;
1403
1404   if (GNUNET_YES == op->state->client_done_sent)
1405   {
1406     return;
1407   }
1408
1409   if (PHASE_DONE != op->state->phase) {
1410     LOG (GNUNET_ERROR_TYPE_WARNING,
1411          "Union operation failed\n");
1412     GNUNET_STATISTICS_update (_GSS_statistics,
1413                               "# Union operations failed",
1414                               1,
1415                               GNUNET_NO);
1416     ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1417     rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1418     rm->request_id = htonl (op->client_request_id);
1419     rm->element_type = htons (0);
1420     GNUNET_MQ_send (op->set->cs->mq,
1421                     ev);
1422     return;
1423   }
1424
1425   op->state->client_done_sent = GNUNET_YES;
1426
1427   GNUNET_STATISTICS_update (_GSS_statistics,
1428                             "# Union operations succeeded",
1429                             1,
1430                             GNUNET_NO);
1431   LOG (GNUNET_ERROR_TYPE_INFO,
1432        "Signalling client that union operation is done\n");
1433   ev = GNUNET_MQ_msg (rm,
1434                       GNUNET_MESSAGE_TYPE_SET_RESULT);
1435   rm->request_id = htonl (op->client_request_id);
1436   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1437   rm->element_type = htons (0);
1438   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1439   GNUNET_MQ_send (op->set->cs->mq,
1440                   ev);
1441 }
1442
1443
1444 /**
1445  * Tests if the operation is finished, and if so notify.
1446  *
1447  * @param op operation to check
1448  */
1449 static void
1450 maybe_finish (struct Operation *op)
1451 {
1452   unsigned int num_demanded;
1453
1454   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1455
1456   if (PHASE_FINISH_WAITING == op->state->phase)
1457   {
1458     LOG (GNUNET_ERROR_TYPE_DEBUG,
1459          "In PHASE_FINISH_WAITING, pending %u demands\n",
1460          num_demanded);
1461     if (0 == num_demanded)
1462     {
1463       struct GNUNET_MQ_Envelope *ev;
1464
1465       op->state->phase = PHASE_DONE;
1466       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1467       GNUNET_MQ_send (op->mq,
1468                       ev);
1469       /* We now wait until the other peer sends P2P_OVER
1470        * after it got all elements from us. */
1471     }
1472   }
1473   if (PHASE_FINISH_CLOSING == op->state->phase)
1474   {
1475     LOG (GNUNET_ERROR_TYPE_DEBUG,
1476          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1477          num_demanded);
1478     if (0 == num_demanded)
1479     {
1480       op->state->phase = PHASE_DONE;
1481       send_client_done (op);
1482       _GSS_operation_destroy2 (op);
1483     }
1484   }
1485 }
1486
1487
1488 /**
1489  * Check an element message from a remote peer.
1490  *
1491  * @param cls the union operation
1492  * @param emsg the message
1493  */
1494 int
1495 check_union_p2p_elements (void *cls,
1496                           const struct GNUNET_SET_ElementMessage *emsg)
1497 {
1498   struct Operation *op = cls;
1499
1500   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1501   {
1502     GNUNET_break_op (0);
1503     return GNUNET_SYSERR;
1504   }
1505   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1506   {
1507     GNUNET_break_op (0);
1508     return GNUNET_SYSERR;
1509   }
1510   return GNUNET_OK;
1511 }
1512
1513
1514 /**
1515  * Handle an element message from a remote peer.
1516  * Sent by the other peer either because we decoded an IBF and placed a demand,
1517  * or because the other peer switched to full set transmission.
1518  *
1519  * @param cls the union operation
1520  * @param emsg the message
1521  */
1522 void
1523 handle_union_p2p_elements (void *cls,
1524                            const struct GNUNET_SET_ElementMessage *emsg)
1525 {
1526   struct Operation *op = cls;
1527   struct ElementEntry *ee;
1528   struct KeyEntry *ke;
1529   uint16_t element_size;
1530
1531   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1532   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1533   GNUNET_memcpy (&ee[1],
1534                  &emsg[1],
1535                  element_size);
1536   ee->element.size = element_size;
1537   ee->element.data = &ee[1];
1538   ee->element.element_type = ntohs (emsg->element_type);
1539   ee->remote = GNUNET_YES;
1540   GNUNET_SET_element_hash (&ee->element,
1541                            &ee->element_hash);
1542   if (GNUNET_NO ==
1543       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1544                                             &ee->element_hash,
1545                                             NULL))
1546   {
1547     /* We got something we didn't demand, since it's not in our map. */
1548     GNUNET_break_op (0);
1549     fail_union_operation (op);
1550     return;
1551   }
1552
1553   LOG (GNUNET_ERROR_TYPE_DEBUG,
1554        "Got element (size %u, hash %s) from peer\n",
1555        (unsigned int) element_size,
1556        GNUNET_h2s (&ee->element_hash));
1557
1558   GNUNET_STATISTICS_update (_GSS_statistics,
1559                             "# received elements",
1560                             1,
1561                             GNUNET_NO);
1562   GNUNET_STATISTICS_update (_GSS_statistics,
1563                             "# exchanged elements",
1564                             1,
1565                             GNUNET_NO);
1566
1567   op->state->received_total++;
1568
1569   ke = op_get_element (op, &ee->element_hash);
1570   if (NULL != ke)
1571   {
1572     /* Got repeated element.  Should not happen since
1573      * we track demands. */
1574     GNUNET_STATISTICS_update (_GSS_statistics,
1575                               "# repeated elements",
1576                               1,
1577                               GNUNET_NO);
1578     ke->received = GNUNET_YES;
1579     GNUNET_free (ee);
1580   }
1581   else
1582   {
1583     LOG (GNUNET_ERROR_TYPE_DEBUG,
1584          "Registering new element from remote peer\n");
1585     op->state->received_fresh++;
1586     op_register_element (op, ee, GNUNET_YES);
1587     /* only send results immediately if the client wants it */
1588     switch (op->result_mode)
1589     {
1590       case GNUNET_SET_RESULT_ADDED:
1591         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1592         break;
1593       case GNUNET_SET_RESULT_SYMMETRIC:
1594         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1595         break;
1596       default:
1597         /* Result mode not supported, should have been caught earlier. */
1598         GNUNET_break (0);
1599         break;
1600     }
1601   }
1602
1603   if ( (op->state->received_total > 8) &&
1604        (op->state->received_fresh < op->state->received_total / 3) )
1605   {
1606     /* The other peer gave us lots of old elements, there's something wrong. */
1607     GNUNET_break_op (0);
1608     fail_union_operation (op);
1609     return;
1610   }
1611   GNUNET_CADET_receive_done (op->channel);
1612   maybe_finish (op);
1613 }
1614
1615
1616 /**
1617  * Check a full element message from a remote peer.
1618  *
1619  * @param cls the union operation
1620  * @param emsg the message
1621  */
1622 int
1623 check_union_p2p_full_element (void *cls,
1624                               const struct GNUNET_SET_ElementMessage *emsg)
1625 {
1626   struct Operation *op = cls;
1627
1628   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1629   {
1630     GNUNET_break_op (0);
1631     return GNUNET_SYSERR;
1632   }
1633   // FIXME: check that we expect full elements here?
1634   return GNUNET_OK;
1635 }
1636
1637
1638 /**
1639  * Handle an element message from a remote peer.
1640  *
1641  * @param cls the union operation
1642  * @param emsg the message
1643  */
1644 void
1645 handle_union_p2p_full_element (void *cls,
1646                                const struct GNUNET_SET_ElementMessage *emsg)
1647 {
1648   struct Operation *op = cls;
1649   struct ElementEntry *ee;
1650   struct KeyEntry *ke;
1651   uint16_t element_size;
1652
1653   element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1654   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1655   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1656   ee->element.size = element_size;
1657   ee->element.data = &ee[1];
1658   ee->element.element_type = ntohs (emsg->element_type);
1659   ee->remote = GNUNET_YES;
1660   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1661
1662   LOG (GNUNET_ERROR_TYPE_DEBUG,
1663        "Got element (full diff, size %u, hash %s) from peer\n",
1664        (unsigned int) element_size,
1665        GNUNET_h2s (&ee->element_hash));
1666
1667   GNUNET_STATISTICS_update (_GSS_statistics,
1668                             "# received elements",
1669                             1,
1670                             GNUNET_NO);
1671   GNUNET_STATISTICS_update (_GSS_statistics,
1672                             "# exchanged elements",
1673                             1,
1674                             GNUNET_NO);
1675
1676   op->state->received_total++;
1677
1678   ke = op_get_element (op, &ee->element_hash);
1679   if (NULL != ke)
1680   {
1681     /* Got repeated element.  Should not happen since
1682      * we track demands. */
1683     GNUNET_STATISTICS_update (_GSS_statistics,
1684                               "# repeated elements",
1685                               1,
1686                               GNUNET_NO);
1687     ke->received = GNUNET_YES;
1688     GNUNET_free (ee);
1689   }
1690   else
1691   {
1692     LOG (GNUNET_ERROR_TYPE_DEBUG,
1693          "Registering new element from remote peer\n");
1694     op->state->received_fresh++;
1695     op_register_element (op, ee, GNUNET_YES);
1696     /* only send results immediately if the client wants it */
1697     switch (op->result_mode)
1698     {
1699       case GNUNET_SET_RESULT_ADDED:
1700         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1701         break;
1702       case GNUNET_SET_RESULT_SYMMETRIC:
1703         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1704         break;
1705       default:
1706         /* Result mode not supported, should have been caught earlier. */
1707         GNUNET_break (0);
1708         break;
1709     }
1710   }
1711
1712   if ( (GNUNET_YES == op->byzantine) &&
1713        (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1714        (op->state->received_fresh < op->state->received_total / 6) )
1715   {
1716     /* The other peer gave us lots of old elements, there's something wrong. */
1717     LOG (GNUNET_ERROR_TYPE_ERROR,
1718          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1719          (unsigned long long) op->state->received_fresh,
1720          (unsigned long long) op->state->received_total);
1721     GNUNET_break_op (0);
1722     fail_union_operation (op);
1723     return;
1724   }
1725   GNUNET_CADET_receive_done (op->channel);
1726 }
1727
1728
1729 /**
1730  * Send offers (for GNUNET_Hash-es) in response
1731  * to inquiries (for IBF_Key-s).
1732  *
1733  * @param cls the union operation
1734  * @param msg the message
1735  */
1736 int
1737 check_union_p2p_inquiry (void *cls,
1738                          const struct InquiryMessage *msg)
1739 {
1740   struct Operation *op = cls;
1741   unsigned int num_keys;
1742
1743   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1744   {
1745     GNUNET_break_op (0);
1746     return GNUNET_SYSERR;
1747   }
1748   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1749   {
1750     GNUNET_break_op (0);
1751     return GNUNET_SYSERR;
1752   }
1753   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1754     / sizeof (struct IBF_Key);
1755   if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1756       != num_keys * sizeof (struct IBF_Key))
1757   {
1758     GNUNET_break_op (0);
1759     return GNUNET_SYSERR;
1760   }
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Send offers (for GNUNET_Hash-es) in response
1767  * to inquiries (for IBF_Key-s).
1768  *
1769  * @param cls the union operation
1770  * @param msg the message
1771  */
1772 void
1773 handle_union_p2p_inquiry (void *cls,
1774                           const struct InquiryMessage *msg)
1775 {
1776   struct Operation *op = cls;
1777   const struct IBF_Key *ibf_key;
1778   unsigned int num_keys;
1779
1780   LOG (GNUNET_ERROR_TYPE_DEBUG,
1781        "Received union inquiry\n");
1782   num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1783     / sizeof (struct IBF_Key);
1784   ibf_key = (const struct IBF_Key *) &msg[1];
1785   while (0 != num_keys--)
1786   {
1787     struct IBF_Key unsalted_key;
1788
1789     unsalt_key (ibf_key,
1790                 ntohl (msg->salt),
1791                 &unsalted_key);
1792     send_offers_for_key (op,
1793                          unsalted_key);
1794     ibf_key++;
1795   }
1796   GNUNET_CADET_receive_done (op->channel);
1797 }
1798
1799
1800 /**
1801  * Iterator over hash map entries, called to
1802  * destroy the linked list of colliding ibf key entries.
1803  *
1804  * @param cls closure
1805  * @param key current key code
1806  * @param value value in the hash map
1807  * @return #GNUNET_YES if we should continue to iterate,
1808  *         #GNUNET_NO if not.
1809  */
1810 static int
1811 send_missing_full_elements_iter (void *cls,
1812                                  uint32_t key,
1813                                  void *value)
1814 {
1815   struct Operation *op = cls;
1816   struct KeyEntry *ke = value;
1817   struct GNUNET_MQ_Envelope *ev;
1818   struct GNUNET_SET_ElementMessage *emsg;
1819   struct ElementEntry *ee = ke->element;
1820
1821   if (GNUNET_YES == ke->received)
1822     return GNUNET_YES;
1823   ev = GNUNET_MQ_msg_extra (emsg,
1824                             ee->element.size,
1825                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1826   GNUNET_memcpy (&emsg[1],
1827                  ee->element.data,
1828                  ee->element.size);
1829   emsg->element_type = htons (ee->element.element_type);
1830   GNUNET_MQ_send (op->mq,
1831                   ev);
1832   return GNUNET_YES;
1833 }
1834
1835
1836 /**
1837  * Handle a request for full set transmission.
1838  *
1839  * @parem cls closure, a set union operation
1840  * @param mh the demand message
1841  */
1842 void
1843 handle_union_p2p_request_full (void *cls,
1844                                const struct GNUNET_MessageHeader *mh)
1845 {
1846   struct Operation *op = cls;
1847
1848   LOG (GNUNET_ERROR_TYPE_DEBUG,
1849        "Received request for full set transmission\n");
1850   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1851   {
1852     GNUNET_break_op (0);
1853     fail_union_operation (op);
1854     return;
1855   }
1856   if (PHASE_EXPECT_IBF != op->state->phase)
1857   {
1858     GNUNET_break_op (0);
1859     fail_union_operation (op);
1860     return;
1861   }
1862
1863   // FIXME: we need to check that our set is larger than the
1864   // byzantine_lower_bound by some threshold
1865   send_full_set (op);
1866   GNUNET_CADET_receive_done (op->channel);
1867 }
1868
1869
1870 /**
1871  * Handle a "full done" message.
1872  *
1873  * @parem cls closure, a set union operation
1874  * @param mh the demand message
1875  */
1876 void
1877 handle_union_p2p_full_done (void *cls,
1878                             const struct GNUNET_MessageHeader *mh)
1879 {
1880   struct Operation *op = cls;
1881
1882   switch (op->state->phase)
1883   {
1884   case PHASE_EXPECT_IBF:
1885     {
1886       struct GNUNET_MQ_Envelope *ev;
1887
1888       LOG (GNUNET_ERROR_TYPE_DEBUG,
1889            "got FULL DONE, sending elements that other peer is missing\n");
1890
1891       /* send all the elements that did not come from the remote peer */
1892       GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1893                                                &send_missing_full_elements_iter,
1894                                                op);
1895
1896       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1897       GNUNET_MQ_send (op->mq,
1898                       ev);
1899       op->state->phase = PHASE_DONE;
1900       /* we now wait until the other peer sends us the OVER message*/
1901     }
1902     break;
1903   case PHASE_FULL_SENDING:
1904     {
1905       LOG (GNUNET_ERROR_TYPE_DEBUG,
1906            "got FULL DONE, finishing\n");
1907       /* We sent the full set, and got the response for that.  We're done. */
1908       op->state->phase = PHASE_DONE;
1909       GNUNET_CADET_receive_done (op->channel);
1910       send_client_done (op);
1911       _GSS_operation_destroy2 (op);
1912       return;
1913     }
1914     break;
1915   default:
1916     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                 "Handle full done phase is %u\n",
1918                 (unsigned) op->state->phase);
1919     GNUNET_break_op (0);
1920     fail_union_operation (op);
1921     return;
1922   }
1923   GNUNET_CADET_receive_done (op->channel);
1924 }
1925
1926
1927 /**
1928  * Check a demand by the other peer for elements based on a list
1929  * of `struct GNUNET_HashCode`s.
1930  *
1931  * @parem cls closure, a set union operation
1932  * @param mh the demand message
1933  * @return #GNUNET_OK if @a mh is well-formed
1934  */
1935 int
1936 check_union_p2p_demand (void *cls,
1937                         const struct GNUNET_MessageHeader *mh)
1938 {
1939   struct Operation *op = cls;
1940   unsigned int num_hashes;
1941
1942   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1943   {
1944     GNUNET_break_op (0);
1945     return GNUNET_SYSERR;
1946   }
1947   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1948     / sizeof (struct GNUNET_HashCode);
1949   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1950       != num_hashes * sizeof (struct GNUNET_HashCode))
1951   {
1952     GNUNET_break_op (0);
1953     return GNUNET_SYSERR;
1954   }
1955   return GNUNET_OK;
1956 }
1957
1958
1959 /**
1960  * Handle a demand by the other peer for elements based on a list
1961  * of `struct GNUNET_HashCode`s.
1962  *
1963  * @parem cls closure, a set union operation
1964  * @param mh the demand message
1965  */
1966 void
1967 handle_union_p2p_demand (void *cls,
1968                          const struct GNUNET_MessageHeader *mh)
1969 {
1970   struct Operation *op = cls;
1971   struct ElementEntry *ee;
1972   struct GNUNET_SET_ElementMessage *emsg;
1973   const struct GNUNET_HashCode *hash;
1974   unsigned int num_hashes;
1975   struct GNUNET_MQ_Envelope *ev;
1976
1977   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1978     / sizeof (struct GNUNET_HashCode);
1979   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1980        num_hashes > 0;
1981        hash++, num_hashes--)
1982   {
1983     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1984                                             hash);
1985     if (NULL == ee)
1986     {
1987       /* Demand for non-existing element. */
1988       GNUNET_break_op (0);
1989       fail_union_operation (op);
1990       return;
1991     }
1992     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1993     {
1994       /* Probably confused lazily copied sets. */
1995       GNUNET_break_op (0);
1996       fail_union_operation (op);
1997       return;
1998     }
1999     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
2000     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2001     emsg->reserved = htons (0);
2002     emsg->element_type = htons (ee->element.element_type);
2003     LOG (GNUNET_ERROR_TYPE_DEBUG,
2004          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2005          (void *) op,
2006          (unsigned int) ee->element.size,
2007          GNUNET_h2s (&ee->element_hash));
2008     GNUNET_MQ_send (op->mq, ev);
2009     GNUNET_STATISTICS_update (_GSS_statistics,
2010                               "# exchanged elements",
2011                               1,
2012                               GNUNET_NO);
2013
2014     switch (op->result_mode)
2015     {
2016       case GNUNET_SET_RESULT_ADDED:
2017         /* Nothing to do. */
2018         break;
2019       case GNUNET_SET_RESULT_SYMMETRIC:
2020         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2021         break;
2022       default:
2023         /* Result mode not supported, should have been caught earlier. */
2024         GNUNET_break (0);
2025         break;
2026     }
2027   }
2028   GNUNET_CADET_receive_done (op->channel);
2029 }
2030
2031
2032 /**
2033  * Check offer (of `struct GNUNET_HashCode`s).
2034  *
2035  * @param cls the union operation
2036  * @param mh the message
2037  * @return #GNUNET_OK if @a mh is well-formed
2038  */
2039 int
2040 check_union_p2p_offer (void *cls,
2041                         const struct GNUNET_MessageHeader *mh)
2042 {
2043   struct Operation *op = cls;
2044   unsigned int num_hashes;
2045
2046   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2047   {
2048     GNUNET_break_op (0);
2049     return GNUNET_SYSERR;
2050   }
2051   /* look up elements and send them */
2052   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2053        (op->state->phase != PHASE_INVENTORY_ACTIVE))
2054   {
2055     GNUNET_break_op (0);
2056     return GNUNET_SYSERR;
2057   }
2058   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2059     / sizeof (struct GNUNET_HashCode);
2060   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2061       num_hashes * sizeof (struct GNUNET_HashCode))
2062   {
2063     GNUNET_break_op (0);
2064     return GNUNET_SYSERR;
2065   }
2066   return GNUNET_OK;
2067 }
2068
2069
2070 /**
2071  * Handle offers (of `struct GNUNET_HashCode`s) and
2072  * respond with demands (of `struct GNUNET_HashCode`s).
2073  *
2074  * @param cls the union operation
2075  * @param mh the message
2076  */
2077 void
2078 handle_union_p2p_offer (void *cls,
2079                         const struct GNUNET_MessageHeader *mh)
2080 {
2081   struct Operation *op = cls;
2082   const struct GNUNET_HashCode *hash;
2083   unsigned int num_hashes;
2084
2085   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2086     / sizeof (struct GNUNET_HashCode);
2087   for (hash = (const struct GNUNET_HashCode *) &mh[1];
2088        num_hashes > 0;
2089        hash++, num_hashes--)
2090   {
2091     struct ElementEntry *ee;
2092     struct GNUNET_MessageHeader *demands;
2093     struct GNUNET_MQ_Envelope *ev;
2094
2095     ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2096                                             hash);
2097     if (NULL != ee)
2098       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2099         continue;
2100
2101     if (GNUNET_YES ==
2102         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2103                                                 hash))
2104     {
2105       LOG (GNUNET_ERROR_TYPE_DEBUG,
2106            "Skipped sending duplicate demand\n");
2107       continue;
2108     }
2109
2110     GNUNET_assert (GNUNET_OK ==
2111                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2112                                                       hash,
2113                                                       NULL,
2114                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2115
2116     LOG (GNUNET_ERROR_TYPE_DEBUG,
2117          "[OP %x] Requesting element (hash %s)\n",
2118          (void *) op, GNUNET_h2s (hash));
2119     ev = GNUNET_MQ_msg_header_extra (demands,
2120                                      sizeof (struct GNUNET_HashCode),
2121                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2122     GNUNET_memcpy (&demands[1],
2123                    hash,
2124                    sizeof (struct GNUNET_HashCode));
2125     GNUNET_MQ_send (op->mq, ev);
2126   }
2127   GNUNET_CADET_receive_done (op->channel);
2128 }
2129
2130
2131 /**
2132  * Handle a done message from a remote peer
2133  *
2134  * @param cls the union operation
2135  * @param mh the message
2136  */
2137 void
2138 handle_union_p2p_done (void *cls,
2139                        const struct GNUNET_MessageHeader *mh)
2140 {
2141   struct Operation *op = cls;
2142
2143   if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2144   {
2145     GNUNET_break_op (0);
2146     fail_union_operation (op);
2147     return;
2148   }
2149   switch (op->state->phase)
2150   {
2151   case PHASE_INVENTORY_PASSIVE:
2152     /* We got all requests, but still have to send our elements in response. */
2153     op->state->phase = PHASE_FINISH_WAITING;
2154
2155     LOG (GNUNET_ERROR_TYPE_DEBUG,
2156          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2157     /* The active peer is done sending offers
2158      * and inquiries.  This means that all
2159      * our responses to that (demands and offers)
2160      * must be in flight (queued or in mesh).
2161      *
2162      * We should notify the active peer once
2163      * all our demands are satisfied, so that the active
2164      * peer can quit if we gave it everything.
2165      */
2166     GNUNET_CADET_receive_done (op->channel);
2167     maybe_finish (op);
2168     return;
2169   case PHASE_INVENTORY_ACTIVE:
2170     LOG (GNUNET_ERROR_TYPE_DEBUG,
2171          "got DONE (as active partner), waiting to finish\n");
2172     /* All demands of the other peer are satisfied,
2173      * and we processed all offers, thus we know
2174      * exactly what our demands must be.
2175      *
2176      * We'll close the channel
2177      * to the other peer once our demands are met.
2178      */
2179     op->state->phase = PHASE_FINISH_CLOSING;
2180     GNUNET_CADET_receive_done (op->channel);
2181     maybe_finish (op);
2182     return;
2183   default:
2184     GNUNET_break_op (0);
2185     fail_union_operation (op);
2186     return;
2187   }
2188 }
2189
2190 /**
2191  * Handle a over message from a remote peer
2192  *
2193  * @param cls the union operation
2194  * @param mh the message
2195  */
2196 void
2197 handle_union_p2p_over (void *cls,
2198                        const struct GNUNET_MessageHeader *mh)
2199 {
2200   send_client_done (cls);
2201 }
2202
2203
2204 /**
2205  * Initiate operation to evaluate a set union with a remote peer.
2206  *
2207  * @param op operation to perform (to be initialized)
2208  * @param opaque_context message to be transmitted to the listener
2209  *        to convince it to accept, may be NULL
2210  */
2211 static struct OperationState *
2212 union_evaluate (struct Operation *op,
2213                 const struct GNUNET_MessageHeader *opaque_context)
2214 {
2215   struct OperationState *state;
2216   struct GNUNET_MQ_Envelope *ev;
2217   struct OperationRequestMessage *msg;
2218
2219   ev = GNUNET_MQ_msg_nested_mh (msg,
2220                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2221                                 opaque_context);
2222   if (NULL == ev)
2223   {
2224     /* the context message is too large */
2225     GNUNET_break (0);
2226     return NULL;
2227   }
2228   state = GNUNET_new (struct OperationState);
2229   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2230                                                                  GNUNET_NO);
2231   /* copy the current generation's strata estimator for this operation */
2232   state->se = strata_estimator_dup (op->set->state->se);
2233   /* we started the operation, thus we have to send the operation request */
2234   state->phase = PHASE_EXPECT_SE;
2235   state->salt_receive = state->salt_send = 42; // FIXME?????
2236   LOG (GNUNET_ERROR_TYPE_DEBUG,
2237        "Initiating union operation evaluation\n");
2238   GNUNET_STATISTICS_update (_GSS_statistics,
2239                             "# of total union operations",
2240                             1,
2241                             GNUNET_NO);
2242   GNUNET_STATISTICS_update (_GSS_statistics,
2243                             "# of initiated union operations",
2244                             1,
2245                             GNUNET_NO);
2246   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2247   GNUNET_MQ_send (op->mq,
2248                   ev);
2249
2250   if (NULL != opaque_context)
2251     LOG (GNUNET_ERROR_TYPE_DEBUG,
2252          "sent op request with context message\n");
2253   else
2254     LOG (GNUNET_ERROR_TYPE_DEBUG,
2255          "sent op request without context message\n");
2256
2257   op->state = state;
2258   initialize_key_to_element (op);
2259   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2260   return state;
2261 }
2262
2263
2264 /**
2265  * Accept an union operation request from a remote peer.
2266  * Only initializes the private operation state.
2267  *
2268  * @param op operation that will be accepted as a union operation
2269  */
2270 static struct OperationState *
2271 union_accept (struct Operation *op)
2272 {
2273   struct OperationState *state;
2274   const struct StrataEstimator *se;
2275   struct GNUNET_MQ_Envelope *ev;
2276   struct StrataEstimatorMessage *strata_msg;
2277   char *buf;
2278   size_t len;
2279   uint16_t type;
2280
2281   LOG (GNUNET_ERROR_TYPE_DEBUG,
2282        "accepting set union operation\n");
2283   GNUNET_STATISTICS_update (_GSS_statistics,
2284                             "# of accepted union operations",
2285                             1,
2286                             GNUNET_NO);
2287   GNUNET_STATISTICS_update (_GSS_statistics,
2288                             "# of total union operations",
2289                             1,
2290                             GNUNET_NO);
2291
2292   state = GNUNET_new (struct OperationState);
2293   state->se = strata_estimator_dup (op->set->state->se);
2294   state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2295                                                                  GNUNET_NO);
2296   state->salt_receive = state->salt_send = 42; // FIXME?????
2297   op->state = state;
2298   initialize_key_to_element (op);
2299   state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2300
2301   /* kick off the operation */
2302   se = state->se;
2303   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2304   len = strata_estimator_write (se,
2305                                 buf);
2306   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2307     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2308   else
2309     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2310   ev = GNUNET_MQ_msg_extra (strata_msg,
2311                             len,
2312                             type);
2313   GNUNET_memcpy (&strata_msg[1],
2314                  buf,
2315                  len);
2316   GNUNET_free (buf);
2317   strata_msg->set_size
2318     = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2319   GNUNET_MQ_send (op->mq,
2320                   ev);
2321   state->phase = PHASE_EXPECT_IBF;
2322   return state;
2323 }
2324
2325
2326 /**
2327  * Create a new set supporting the union operation
2328  *
2329  * We maintain one strata estimator per set and then manipulate it over the
2330  * lifetime of the set, as recreating a strata estimator would be expensive.
2331  *
2332  * @return the newly created set, NULL on error
2333  */
2334 static struct SetState *
2335 union_set_create (void)
2336 {
2337   struct SetState *set_state;
2338
2339   LOG (GNUNET_ERROR_TYPE_DEBUG,
2340        "union set created\n");
2341   set_state = GNUNET_new (struct SetState);
2342   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2343                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2344   if (NULL == set_state->se)
2345   {
2346     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2347                 "Failed to allocate strata estimator\n");
2348     GNUNET_free (set_state);
2349     return NULL;
2350   }
2351   return set_state;
2352 }
2353
2354
2355 /**
2356  * Add the element from the given element message to the set.
2357  *
2358  * @param set_state state of the set want to add to
2359  * @param ee the element to add to the set
2360  */
2361 static void
2362 union_add (struct SetState *set_state,
2363            struct ElementEntry *ee)
2364 {
2365   strata_estimator_insert (set_state->se,
2366                            get_ibf_key (&ee->element_hash));
2367 }
2368
2369
2370 /**
2371  * Remove the element given in the element message from the set.
2372  * Only marks the element as removed, so that older set operations can still exchange it.
2373  *
2374  * @param set_state state of the set to remove from
2375  * @param ee set element to remove
2376  */
2377 static void
2378 union_remove (struct SetState *set_state,
2379               struct ElementEntry *ee)
2380 {
2381   strata_estimator_remove (set_state->se,
2382                            get_ibf_key (&ee->element_hash));
2383 }
2384
2385
2386 /**
2387  * Destroy a set that supports the union operation.
2388  *
2389  * @param set_state the set to destroy
2390  */
2391 static void
2392 union_set_destroy (struct SetState *set_state)
2393 {
2394   if (NULL != set_state->se)
2395   {
2396     strata_estimator_destroy (set_state->se);
2397     set_state->se = NULL;
2398   }
2399   GNUNET_free (set_state);
2400 }
2401
2402
2403 /**
2404  * Copy union-specific set state.
2405  *
2406  * @param state source state for copying the union state
2407  * @return a copy of the union-specific set state
2408  */
2409 static struct SetState *
2410 union_copy_state (struct SetState *state)
2411 {
2412   struct SetState *new_state;
2413
2414   GNUNET_assert ( (NULL != state) &&
2415                   (NULL != state->se) );
2416   new_state = GNUNET_new (struct SetState);
2417   new_state->se = strata_estimator_dup (state->se);
2418
2419   return new_state;
2420 }
2421
2422
2423 /**
2424  * Handle case where channel went down for an operation.
2425  *
2426  * @param op operation that lost the channel
2427  */
2428 static void
2429 union_channel_death (struct Operation *op)
2430 {
2431   send_client_done (op);
2432   _GSS_operation_destroy (op,
2433                           GNUNET_YES);
2434 }
2435
2436
2437 /**
2438  * Get the table with implementing functions for
2439  * set union.
2440  *
2441  * @return the operation specific VTable
2442  */
2443 const struct SetVT *
2444 _GSS_union_vt ()
2445 {
2446   static const struct SetVT union_vt = {
2447     .create = &union_set_create,
2448     .add = &union_add,
2449     .remove = &union_remove,
2450     .destroy_set = &union_set_destroy,
2451     .evaluate = &union_evaluate,
2452     .accept = &union_accept,
2453     .cancel = &union_op_cancel,
2454     .copy_state = &union_copy_state,
2455     .channel_death = &union_channel_death
2456   };
2457
2458   return &union_vt;
2459 }