migrating set to new CADET port API - tests now fail due to CADET issues
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2015 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    *
89    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90    */
91   PHASE_EXPECT_IBF,
92
93   /**
94    * Continuation for multi part IBFs.
95    */
96   PHASE_EXPECT_IBF_CONT,
97
98   /**
99    * We are decoding an IBF.
100    */
101   PHASE_INVENTORY_ACTIVE,
102
103   /**
104    * The other peer is decoding the IBF we just sent.
105    */
106   PHASE_INVENTORY_PASSIVE,
107
108   /**
109    * The protocol is almost finished, but we still have to flush our message
110    * queue and/or expect some elements.
111    */
112   PHASE_FINISH_CLOSING,
113
114   /**
115    * In the penultimate phase,
116    * we wait until all our demands
117    * are satisfied.  Then we send a done
118    * message, and wait for another done message.*/
119   PHASE_FINISH_WAITING,
120
121   /**
122    * In the ultimate phase, we wait until
123    * our demands are satisfied and then
124    * quit (sending another DONE message). */
125   PHASE_DONE
126 };
127
128
129 /**
130  * State of an evaluate operation with another peer.
131  */
132 struct OperationState
133 {
134   /**
135    * Copy of the set's strata estimator at the time of
136    * creation of this operation.
137    */
138   struct StrataEstimator *se;
139
140   /**
141    * The IBF we currently receive.
142    */
143   struct InvertibleBloomFilter *remote_ibf;
144
145   /**
146    * The IBF with the local set's element.
147    */
148   struct InvertibleBloomFilter *local_ibf;
149
150   /**
151    * Maps IBF-Keys (specific to the current salt) to elements.
152    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153    * Colliding IBF-Keys are linked.
154    */
155   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
156
157   /**
158    * Current state of the operation.
159    */
160   enum UnionOperationPhase phase;
161
162   /**
163    * Did we send the client that we are done?
164    */
165   int client_done_sent;
166
167   /**
168    * Number of ibf buckets already received into the @a remote_ibf.
169    */
170   unsigned int ibf_buckets_received;
171
172   /**
173    * Hashes for elements that we have demanded from the other peer.
174    */
175   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
176
177   /**
178    * Salt that we're using for sending IBFs
179    */
180   uint32_t salt_send;
181
182   /**
183    * Salt for the IBF we've received and that we're currently decoding.
184    */
185   uint32_t salt_receive;
186 };
187
188
189 /**
190  * The key entry is used to associate an ibf key with an element.
191  */
192 struct KeyEntry
193 {
194   /**
195    * IBF key for the entry, derived from the current salt.
196    */
197   struct IBF_Key ibf_key;
198
199   /**
200    * The actual element associated with the key.
201    *
202    * Only owned by the union operation if element->operation
203    * is #GNUNET_YES.
204    */
205   struct ElementEntry *element;
206 };
207
208
209 /**
210  * Used as a closure for sending elements
211  * with a specific IBF key.
212  */
213 struct SendElementClosure
214 {
215   /**
216    * The IBF key whose matching elements should be
217    * sent.
218    */
219   struct IBF_Key ibf_key;
220
221   /**
222    * Operation for which the elements
223    * should be sent.
224    */
225   struct Operation *op;
226 };
227
228
229 /**
230  * Extra state required for efficient set union.
231  */
232 struct SetState
233 {
234   /**
235    * The strata estimator is only generated once for
236    * each set.
237    * The IBF keys are derived from the element hashes with
238    * salt=0.
239    */
240   struct StrataEstimator *se;
241 };
242
243
244 /**
245  * Iterator over hash map entries, called to
246  * destroy the linked list of colliding ibf key entries.
247  *
248  * @param cls closure
249  * @param key current key code
250  * @param value value in the hash map
251  * @return #GNUNET_YES if we should continue to iterate,
252  *         #GNUNET_NO if not.
253  */
254 static int
255 destroy_key_to_element_iter (void *cls,
256                              uint32_t key,
257                              void *value)
258 {
259   struct KeyEntry *k = value;
260
261   GNUNET_assert (NULL != k);
262   if (GNUNET_YES == k->element->remote)
263   {
264     GNUNET_free (k->element);
265     k->element = NULL;
266   }
267   GNUNET_free (k);
268   return GNUNET_YES;
269 }
270
271
272 /**
273  * Destroy the union operation.  Only things specific to the union
274  * operation are destroyed.
275  *
276  * @param op union operation to destroy
277  */
278 static void
279 union_op_cancel (struct Operation *op)
280 {
281   LOG (GNUNET_ERROR_TYPE_DEBUG,
282        "destroying union op\n");
283   /* check if the op was canceled twice */
284   GNUNET_assert (NULL != op->state);
285   if (NULL != op->state->remote_ibf)
286   {
287     ibf_destroy (op->state->remote_ibf);
288     op->state->remote_ibf = NULL;
289   }
290   if (NULL != op->state->demanded_hashes)
291   {
292     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
293     op->state->demanded_hashes = NULL;
294   }
295   if (NULL != op->state->local_ibf)
296   {
297     ibf_destroy (op->state->local_ibf);
298     op->state->local_ibf = NULL;
299   }
300   if (NULL != op->state->se)
301   {
302     strata_estimator_destroy (op->state->se);
303     op->state->se = NULL;
304   }
305   if (NULL != op->state->key_to_element)
306   {
307     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
308                                              &destroy_key_to_element_iter,
309                                              NULL);
310     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
311     op->state->key_to_element = NULL;
312   }
313   GNUNET_free (op->state);
314   op->state = NULL;
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op done\n");
317 }
318
319
320 /**
321  * Inform the client that the union operation has failed,
322  * and proceed to destroy the evaluate operation.
323  *
324  * @param op the union operation to fail
325  */
326 static void
327 fail_union_operation (struct Operation *op)
328 {
329   struct GNUNET_MQ_Envelope *ev;
330   struct GNUNET_SET_ResultMessage *msg;
331
332   LOG (GNUNET_ERROR_TYPE_ERROR,
333        "union operation failed\n");
334   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
335   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
336   msg->request_id = htonl (op->spec->client_request_id);
337   msg->element_type = htons (0);
338   GNUNET_MQ_send (op->spec->set->client_mq, ev);
339   _GSS_operation_destroy (op, GNUNET_YES);
340 }
341
342
343 /**
344  * Derive the IBF key from a hash code and
345  * a salt.
346  *
347  * @param src the hash code
348  * @return the derived IBF key
349  */
350 static struct IBF_Key
351 get_ibf_key (const struct GNUNET_HashCode *src)
352 {
353   struct IBF_Key key;
354   uint16_t salt = 0;
355
356   GNUNET_CRYPTO_kdf (&key, sizeof (key),
357                      src, sizeof *src,
358                      &salt, sizeof (salt),
359                      NULL, 0);
360   return key;
361 }
362
363
364 /**
365  * Iterator over the mapping from IBF keys to element entries.  Checks if we
366  * have an element with a given GNUNET_HashCode.
367  *
368  * @param cls closure
369  * @param key current key code
370  * @param value value in the hash map
371  * @return #GNUNET_YES if we should search further,
372  *         #GNUNET_NO if we've found the element.
373  */
374 static int
375 op_has_element_iterator (void *cls,
376                          uint32_t key,
377                          void *value)
378 {
379   struct GNUNET_HashCode *element_hash = cls;
380   struct KeyEntry *k = value;
381
382   GNUNET_assert (NULL != k);
383   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
384                                    element_hash))
385     return GNUNET_NO;
386   return GNUNET_YES;
387 }
388
389
390 /**
391  * Determine whether the given element is already in the operation's element
392  * set.
393  *
394  * @param op operation that should be tested for 'element_hash'
395  * @param element_hash hash of the element to look for
396  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
397  */
398 static int
399 op_has_element (struct Operation *op,
400                 const struct GNUNET_HashCode *element_hash)
401 {
402   int ret;
403   struct IBF_Key ibf_key;
404
405   ibf_key = get_ibf_key (element_hash);
406   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
407                                                       (uint32_t) ibf_key.key_val,
408                                                       op_has_element_iterator,
409                                                       (void *) element_hash);
410
411   /* was the iteration aborted because we found the element? */
412   if (GNUNET_SYSERR == ret)
413     return GNUNET_YES;
414   return GNUNET_NO;
415 }
416
417
418 /**
419  * Insert an element into the union operation's
420  * key-to-element mapping. Takes ownership of 'ee'.
421  * Note that this does not insert the element in the set,
422  * only in the operation's key-element mapping.
423  * This is done to speed up re-tried operations, if some elements
424  * were transmitted, and then the IBF fails to decode.
425  *
426  * XXX: clarify ownership, doesn't sound right.
427  *
428  * @param op the union operation
429  * @param ee the element entry
430  */
431 static void
432 op_register_element (struct Operation *op,
433                      struct ElementEntry *ee)
434 {
435   struct IBF_Key ibf_key;
436   struct KeyEntry *k;
437
438   ibf_key = get_ibf_key (&ee->element_hash);
439   k = GNUNET_new (struct KeyEntry);
440   k->element = ee;
441   k->ibf_key = ibf_key;
442   GNUNET_assert (GNUNET_OK ==
443                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
444                                                       (uint32_t) ibf_key.key_val,
445                                                       k,
446                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
447 }
448
449
450 static void
451 salt_key (const struct IBF_Key *k_in,
452           uint32_t salt,
453           struct IBF_Key *k_out)
454 {
455   int s = salt % 64;
456   uint64_t x = k_in->key_val;
457   /* rotate ibf key */
458   x = (x >> s) | (x << (64 - s));
459   k_out->key_val = x;
460 }
461
462
463 static void
464 unsalt_key (const struct IBF_Key *k_in,
465             uint32_t salt,
466             struct IBF_Key *k_out)
467 {
468   int s = salt % 64;
469   uint64_t x = k_in->key_val;
470   x = (x << s) | (x >> (64 - s));
471   k_out->key_val = x;
472 }
473
474
475 /**
476  * Insert a key into an ibf.
477  *
478  * @param cls the ibf
479  * @param key unused
480  * @param value the key entry to get the key from
481  */
482 static int
483 prepare_ibf_iterator (void *cls,
484                       uint32_t key,
485                       void *value)
486 {
487   struct Operation *op = cls;
488   struct KeyEntry *ke = value;
489   struct IBF_Key salted_key;
490
491   LOG (GNUNET_ERROR_TYPE_DEBUG,
492        "[OP %x] inserting %lx (hash %s) into ibf\n",
493        (void *) op,
494        (unsigned long) ke->ibf_key.key_val,
495        GNUNET_h2s (&ke->element->element_hash));
496   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
497   ibf_insert (op->state->local_ibf, salted_key);
498   return GNUNET_YES;
499 }
500
501
502 /**
503  * Iterator for initializing the
504  * key-to-element mapping of a union operation
505  *
506  * @param cls the union operation `struct Operation *`
507  * @param key unused
508  * @param value the `struct ElementEntry *` to insert
509  *        into the key-to-element mapping
510  * @return #GNUNET_YES (to continue iterating)
511  */
512 static int
513 init_key_to_element_iterator (void *cls,
514                               const struct GNUNET_HashCode *key,
515                               void *value)
516 {
517   struct Operation *op = cls;
518   struct ElementEntry *ee = value;
519
520   /* make sure that the element belongs to the set at the time
521    * of creating the operation */
522   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
523     return GNUNET_YES;
524
525   GNUNET_assert (GNUNET_NO == ee->remote);
526
527   op_register_element (op, ee);
528   return GNUNET_YES;
529 }
530
531
532 /**
533  * Create an ibf with the operation's elements
534  * of the specified size
535  *
536  * @param op the union operation
537  * @param size size of the ibf to create
538  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
539  */
540 static int
541 prepare_ibf (struct Operation *op,
542              uint32_t size)
543 {
544   if (NULL == op->state->key_to_element)
545   {
546     unsigned int len;
547
548     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
549     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
551                                            init_key_to_element_iterator, op);
552   }
553   if (NULL != op->state->local_ibf)
554     ibf_destroy (op->state->local_ibf);
555   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
556   if (NULL == op->state->local_ibf)
557   {
558     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
559                 "Failed to allocate local IBF\n");
560     return GNUNET_SYSERR;
561   }
562   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
563                                            &prepare_ibf_iterator,
564                                            op);
565   return GNUNET_OK;
566 }
567
568
569 /**
570  * Send an ibf of appropriate size.
571  *
572  * Fragments the IBF into multiple messages if necessary.
573  *
574  * @param op the union operation
575  * @param ibf_order order of the ibf to send, size=2^order
576  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
577  */
578 static int
579 send_ibf (struct Operation *op,
580           uint16_t ibf_order)
581 {
582   unsigned int buckets_sent = 0;
583   struct InvertibleBloomFilter *ibf;
584
585   if (GNUNET_OK !=
586       prepare_ibf (op, 1<<ibf_order))
587   {
588     /* allocation failed */
589     return GNUNET_SYSERR;
590   }
591
592   LOG (GNUNET_ERROR_TYPE_DEBUG,
593        "sending ibf of size %u\n",
594        1<<ibf_order);
595
596   {
597     char name[64] = { 0 };
598     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
599     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
600   }
601
602   ibf = op->state->local_ibf;
603
604   while (buckets_sent < (1 << ibf_order))
605   {
606     unsigned int buckets_in_message;
607     struct GNUNET_MQ_Envelope *ev;
608     struct IBFMessage *msg;
609
610     buckets_in_message = (1 << ibf_order) - buckets_sent;
611     /* limit to maximum */
612     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
613       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
614
615     ev = GNUNET_MQ_msg_extra (msg,
616                               buckets_in_message * IBF_BUCKET_SIZE,
617                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
618     msg->reserved1 = 0;
619     msg->reserved2 = 0;
620     msg->order = ibf_order;
621     msg->offset = htonl (buckets_sent);
622     msg->salt = htonl (op->state->salt_send);
623     ibf_write_slice (ibf, buckets_sent,
624                      buckets_in_message, &msg[1]);
625     buckets_sent += buckets_in_message;
626     LOG (GNUNET_ERROR_TYPE_DEBUG,
627          "ibf chunk size %u, %u/%u sent\n",
628          buckets_in_message,
629          buckets_sent,
630          1<<ibf_order);
631     GNUNET_MQ_send (op->mq, ev);
632   }
633
634   /* The other peer must decode the IBF, so
635    * we're passive. */
636   op->state->phase = PHASE_INVENTORY_PASSIVE;
637   return GNUNET_OK;
638 }
639
640
641 /**
642  * Send a strata estimator to the remote peer.
643  *
644  * @param op the union operation with the remote peer
645  */
646 static void
647 send_strata_estimator (struct Operation *op)
648 {
649   const struct StrataEstimator *se = op->state->se;
650   struct GNUNET_MQ_Envelope *ev;
651   struct GNUNET_MessageHeader *strata_msg;
652   char *buf;
653   size_t len;
654   uint16_t type;
655
656   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
657   len = strata_estimator_write (op->state->se,
658                                 buf);
659   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
660     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
661   else
662     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
663   ev = GNUNET_MQ_msg_header_extra (strata_msg,
664                                    len,
665                                    type);
666   GNUNET_memcpy (&strata_msg[1],
667           buf,
668           len);
669   GNUNET_free (buf);
670   GNUNET_MQ_send (op->mq,
671                   ev);
672   op->state->phase = PHASE_EXPECT_IBF;
673   LOG (GNUNET_ERROR_TYPE_DEBUG,
674        "sent SE, expecting IBF\n");
675 }
676
677
678 /**
679  * Compute the necessary order of an ibf
680  * from the size of the symmetric set difference.
681  *
682  * @param diff the difference
683  * @return the required size of the ibf
684  */
685 static unsigned int
686 get_order_from_difference (unsigned int diff)
687 {
688   unsigned int ibf_order;
689
690   ibf_order = 2;
691   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
692           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
693     ibf_order++;
694   if (ibf_order > MAX_IBF_ORDER)
695     ibf_order = MAX_IBF_ORDER;
696   return ibf_order;
697 }
698
699
700 /**
701  * Handle a strata estimator from a remote peer
702  *
703  * @param cls the union operation
704  * @param mh the message
705  * @param is_compressed #GNUNET_YES if the estimator is compressed
706  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
707  *         #GNUNET_OK otherwise
708  */
709 static int
710 handle_p2p_strata_estimator (void *cls,
711                              const struct GNUNET_MessageHeader *mh,
712                              int is_compressed)
713 {
714   struct Operation *op = cls;
715   struct StrataEstimator *remote_se;
716   int diff;
717   size_t len;
718
719   GNUNET_STATISTICS_update (_GSS_statistics,
720                             "# bytes of SE received",
721                             ntohs (mh->size),
722                             GNUNET_NO);
723
724   if (op->state->phase != PHASE_EXPECT_SE)
725   {
726     fail_union_operation (op);
727     GNUNET_break (0);
728     return GNUNET_SYSERR;
729   }
730   len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
731   if ( (GNUNET_NO == is_compressed) &&
732        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
733   {
734     fail_union_operation (op);
735     GNUNET_break (0);
736     return GNUNET_SYSERR;
737   }
738   remote_se = strata_estimator_create (SE_STRATA_COUNT,
739                                        SE_IBF_SIZE,
740                                        SE_IBF_HASH_NUM);
741   if (NULL == remote_se)
742   {
743     /* insufficient resources, fail */
744     fail_union_operation (op);
745     return GNUNET_SYSERR;
746   }
747   if (GNUNET_OK !=
748       strata_estimator_read (&mh[1],
749                              len,
750                              is_compressed,
751                              remote_se))
752   {
753     /* decompression failed */
754     fail_union_operation (op);
755     strata_estimator_destroy (remote_se);
756     return GNUNET_SYSERR;
757   }
758   GNUNET_assert (NULL != op->state->se);
759   diff = strata_estimator_difference (remote_se,
760                                       op->state->se);
761   strata_estimator_destroy (remote_se);
762   strata_estimator_destroy (op->state->se);
763   op->state->se = NULL;
764   LOG (GNUNET_ERROR_TYPE_DEBUG,
765        "got se diff=%d, using ibf size %d\n",
766        diff,
767        1<<get_order_from_difference (diff));
768   if (GNUNET_OK !=
769       send_ibf (op,
770                 get_order_from_difference (diff)))
771   {
772     /* Internal error, best we can do is shut the connection */
773     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
774                 "Failed to send IBF, closing connection\n");
775     fail_union_operation (op);
776     return GNUNET_SYSERR;
777   }
778   return GNUNET_OK;
779 }
780
781
782 /**
783  * Iterator to send elements to a remote peer
784  *
785  * @param cls closure with the element key and the union operation
786  * @param key ignored
787  * @param value the key entry
788  */
789 static int
790 send_offers_iterator (void *cls,
791                       uint32_t key,
792                       void *value)
793 {
794   struct SendElementClosure *sec = cls;
795   struct Operation *op = sec->op;
796   struct KeyEntry *ke = value;
797   struct GNUNET_MQ_Envelope *ev;
798   struct GNUNET_MessageHeader *mh;
799
800   /* Detect 32-bit key collision for the 64-bit IBF keys. */
801   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
802     return GNUNET_YES;
803
804   ev = GNUNET_MQ_msg_header_extra (mh,
805                                    sizeof (struct GNUNET_HashCode),
806                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
807
808   GNUNET_assert (NULL != ev);
809   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
810   LOG (GNUNET_ERROR_TYPE_DEBUG,
811        "[OP %x] sending element offer (%s) to peer\n",
812        (void *) op,
813        GNUNET_h2s (&ke->element->element_hash));
814   GNUNET_MQ_send (op->mq, ev);
815   return GNUNET_YES;
816 }
817
818
819 /**
820  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
821  *
822  * @param op union operation
823  * @param ibf_key IBF key of interest
824  */
825 static void
826 send_offers_for_key (struct Operation *op,
827                      struct IBF_Key ibf_key)
828 {
829   struct SendElementClosure send_cls;
830
831   send_cls.ibf_key = ibf_key;
832   send_cls.op = op;
833   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
834                                                        (uint32_t) ibf_key.key_val,
835                                                        &send_offers_iterator,
836                                                        &send_cls);
837 }
838
839
840 /**
841  * Decode which elements are missing on each side, and
842  * send the appropriate offers and inquiries.
843  *
844  * @param op union operation
845  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
846  */
847 static int
848 decode_and_send (struct Operation *op)
849 {
850   struct IBF_Key key;
851   struct IBF_Key last_key;
852   int side;
853   unsigned int num_decoded;
854   struct InvertibleBloomFilter *diff_ibf;
855
856   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
857
858   if (GNUNET_OK !=
859       prepare_ibf (op, op->state->remote_ibf->size))
860   {
861     GNUNET_break (0);
862     /* allocation failed */
863     return GNUNET_SYSERR;
864   }
865   diff_ibf = ibf_dup (op->state->local_ibf);
866   ibf_subtract (diff_ibf, op->state->remote_ibf);
867
868   ibf_destroy (op->state->remote_ibf);
869   op->state->remote_ibf = NULL;
870
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872        "decoding IBF (size=%u)\n",
873        diff_ibf->size);
874
875   num_decoded = 0;
876   last_key.key_val = 0;
877
878   while (1)
879   {
880     int res;
881     int cycle_detected = GNUNET_NO;
882
883     last_key = key;
884
885     res = ibf_decode (diff_ibf, &side, &key);
886     if (res == GNUNET_OK)
887     {
888       LOG (GNUNET_ERROR_TYPE_DEBUG,
889            "decoded ibf key %lx\n",
890            (unsigned long) key.key_val);
891       num_decoded += 1;
892       if ( (num_decoded > diff_ibf->size) ||
893            (num_decoded > 1 && last_key.key_val == key.key_val) )
894       {
895         LOG (GNUNET_ERROR_TYPE_DEBUG,
896              "detected cyclic ibf (decoded %u/%u)\n",
897              num_decoded,
898              diff_ibf->size);
899         cycle_detected = GNUNET_YES;
900       }
901     }
902     if ( (GNUNET_SYSERR == res) ||
903          (GNUNET_YES == cycle_detected) )
904     {
905       int next_order;
906       next_order = 0;
907       while (1<<next_order < diff_ibf->size)
908         next_order++;
909       next_order++;
910       if (next_order <= MAX_IBF_ORDER)
911       {
912         LOG (GNUNET_ERROR_TYPE_DEBUG,
913              "decoding failed, sending larger ibf (size %u)\n",
914              1<<next_order);
915         GNUNET_STATISTICS_update (_GSS_statistics,
916                                   "# of IBF retries",
917                                   1,
918                                   GNUNET_NO);
919         op->state->salt_send++;
920         if (GNUNET_OK !=
921             send_ibf (op, next_order))
922         {
923           /* Internal error, best we can do is shut the connection */
924           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
925                       "Failed to send IBF, closing connection\n");
926           fail_union_operation (op);
927           ibf_destroy (diff_ibf);
928           return GNUNET_SYSERR;
929         }
930       }
931       else
932       {
933         GNUNET_STATISTICS_update (_GSS_statistics,
934                                   "# of failed union operations (too large)",
935                                   1,
936                                   GNUNET_NO);
937         // XXX: Send the whole set, element-by-element
938         LOG (GNUNET_ERROR_TYPE_ERROR,
939              "set union failed: reached ibf limit\n");
940         fail_union_operation (op);
941         ibf_destroy (diff_ibf);
942         return GNUNET_SYSERR;
943       }
944       break;
945     }
946     if (GNUNET_NO == res)
947     {
948       struct GNUNET_MQ_Envelope *ev;
949
950       LOG (GNUNET_ERROR_TYPE_DEBUG,
951            "transmitted all values, sending DONE\n");
952       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
953       GNUNET_MQ_send (op->mq, ev);
954       /* We now wait until we get a DONE message back
955        * and then wait for our MQ to be flushed and all our
956        * demands be delivered. */
957       break;
958     }
959     if (1 == side)
960     {
961       struct IBF_Key unsalted_key;
962       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
963       send_offers_for_key (op, unsalted_key);
964     }
965     else if (-1 == side)
966     {
967       struct GNUNET_MQ_Envelope *ev;
968       struct InquiryMessage *msg;
969
970       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
971        * the effort additional complexity. */
972       ev = GNUNET_MQ_msg_extra (msg,
973                                 sizeof (struct IBF_Key),
974                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
975       msg->salt = htonl (op->state->salt_receive);
976       GNUNET_memcpy (&msg[1],
977               &key,
978               sizeof (struct IBF_Key));
979       LOG (GNUNET_ERROR_TYPE_DEBUG,
980            "sending element inquiry for IBF key %lx\n",
981            (unsigned long) key.key_val);
982       GNUNET_MQ_send (op->mq, ev);
983     }
984     else
985     {
986       GNUNET_assert (0);
987     }
988   }
989   ibf_destroy (diff_ibf);
990   return GNUNET_OK;
991 }
992
993
994 /**
995  * Handle an IBF message from a remote peer.
996  *
997  * Reassemble the IBF from multiple pieces, and
998  * process the whole IBF once possible.
999  *
1000  * @param cls the union operation
1001  * @param mh the header of the message
1002  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1003  *         #GNUNET_OK otherwise
1004  */
1005 static int
1006 handle_p2p_ibf (void *cls,
1007                 const struct GNUNET_MessageHeader *mh)
1008 {
1009   struct Operation *op = cls;
1010   const struct IBFMessage *msg;
1011   unsigned int buckets_in_message;
1012
1013   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1014   {
1015     GNUNET_break_op (0);
1016     fail_union_operation (op);
1017     return GNUNET_SYSERR;
1018   }
1019   msg = (const struct IBFMessage *) mh;
1020   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1021        (op->state->phase == PHASE_EXPECT_IBF) )
1022   {
1023     op->state->phase = PHASE_EXPECT_IBF_CONT;
1024     GNUNET_assert (NULL == op->state->remote_ibf);
1025     LOG (GNUNET_ERROR_TYPE_DEBUG,
1026          "Creating new ibf of size %u\n",
1027          1 << msg->order);
1028     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1029     op->state->salt_receive = ntohl (msg->salt);
1030     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1031     if (NULL == op->state->remote_ibf)
1032     {
1033       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1034                   "Failed to parse remote IBF, closing connection\n");
1035       fail_union_operation (op);
1036       return GNUNET_SYSERR;
1037     }
1038     op->state->ibf_buckets_received = 0;
1039     if (0 != ntohl (msg->offset))
1040     {
1041       GNUNET_break_op (0);
1042       fail_union_operation (op);
1043       return GNUNET_SYSERR;
1044     }
1045   }
1046   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1047   {
1048     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1049     {
1050       GNUNET_break_op (0);
1051       fail_union_operation (op);
1052       return GNUNET_SYSERR;
1053     }
1054     if (1<<msg->order != op->state->remote_ibf->size)
1055     {
1056       GNUNET_break_op (0);
1057       fail_union_operation (op);
1058       return GNUNET_SYSERR;
1059     }
1060     if (ntohl (msg->salt) != op->state->salt_receive)
1061     {
1062       GNUNET_break_op (0);
1063       fail_union_operation (op);
1064       return GNUNET_SYSERR;
1065     }
1066   }
1067   else
1068   {
1069     GNUNET_assert (0);
1070   }
1071
1072   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1073
1074   if (0 == buckets_in_message)
1075   {
1076     GNUNET_break_op (0);
1077     fail_union_operation (op);
1078     return GNUNET_SYSERR;
1079   }
1080
1081   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1082   {
1083     GNUNET_break_op (0);
1084     fail_union_operation (op);
1085     return GNUNET_SYSERR;
1086   }
1087
1088   GNUNET_assert (NULL != op->state->remote_ibf);
1089
1090   ibf_read_slice (&msg[1],
1091                   op->state->ibf_buckets_received,
1092                   buckets_in_message,
1093                   op->state->remote_ibf);
1094   op->state->ibf_buckets_received += buckets_in_message;
1095
1096   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1097   {
1098     LOG (GNUNET_ERROR_TYPE_DEBUG,
1099          "received full ibf\n");
1100     op->state->phase = PHASE_INVENTORY_ACTIVE;
1101     if (GNUNET_OK !=
1102         decode_and_send (op))
1103     {
1104       /* Internal error, best we can do is shut down */
1105       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1106                   "Failed to decode IBF, closing connection\n");
1107       return GNUNET_SYSERR;
1108     }
1109   }
1110   return GNUNET_OK;
1111 }
1112
1113
1114 /**
1115  * Send a result message to the client indicating
1116  * that there is a new element.
1117  *
1118  * @param op union operation
1119  * @param element element to send
1120  * @param status status to send with the new element
1121  */
1122 static void
1123 send_client_element (struct Operation *op,
1124                      struct GNUNET_SET_Element *element,
1125                      int status)
1126 {
1127   struct GNUNET_MQ_Envelope *ev;
1128   struct GNUNET_SET_ResultMessage *rm;
1129
1130   LOG (GNUNET_ERROR_TYPE_DEBUG,
1131        "sending element (size %u) to client\n",
1132        element->size);
1133   GNUNET_assert (0 != op->spec->client_request_id);
1134   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1135   if (NULL == ev)
1136   {
1137     GNUNET_MQ_discard (ev);
1138     GNUNET_break (0);
1139     return;
1140   }
1141   rm->result_status = htons (status);
1142   rm->request_id = htonl (op->spec->client_request_id);
1143   rm->element_type = element->element_type;
1144   GNUNET_memcpy (&rm[1], element->data, element->size);
1145   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1146 }
1147
1148
1149 /**
1150  * Signal to the client that the operation has finished and
1151  * destroy the operation.
1152  *
1153  * @param cls operation to destroy
1154  */
1155 static void
1156 send_done_and_destroy (void *cls)
1157 {
1158   struct Operation *op = cls;
1159   struct GNUNET_MQ_Envelope *ev;
1160   struct GNUNET_SET_ResultMessage *rm;
1161
1162   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1163   rm->request_id = htonl (op->spec->client_request_id);
1164   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1165   rm->element_type = htons (0);
1166   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1167   /* Will also call the union-specific cancel function. */
1168   _GSS_operation_destroy (op, GNUNET_YES);
1169 }
1170
1171
1172 static void
1173 maybe_finish (struct Operation *op)
1174 {
1175   unsigned int num_demanded;
1176
1177   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1178
1179   if (PHASE_FINISH_WAITING == op->state->phase)
1180   {
1181     LOG (GNUNET_ERROR_TYPE_DEBUG,
1182          "In PHASE_FINISH_WAITING, pending %u demands\n",
1183          num_demanded);
1184     if (0 == num_demanded)
1185     {
1186       struct GNUNET_MQ_Envelope *ev;
1187
1188       op->state->phase = PHASE_DONE;
1189       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1190       GNUNET_MQ_send (op->mq, ev);
1191
1192       /* We now wait until the other peer closes the channel
1193        * after it got all elements from us. */
1194     }
1195   }
1196   if (PHASE_FINISH_CLOSING == op->state->phase)
1197   {
1198     LOG (GNUNET_ERROR_TYPE_DEBUG,
1199          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1200          num_demanded);
1201     if (0 == num_demanded)
1202     {
1203       op->state->phase = PHASE_DONE;
1204       send_done_and_destroy (op);
1205     }
1206   }
1207 }
1208
1209
1210 /**
1211  * Handle an element message from a remote peer.
1212  *
1213  * @param cls the union operation
1214  * @param mh the message
1215  */
1216 static void
1217 handle_p2p_elements (void *cls,
1218                      const struct GNUNET_MessageHeader *mh)
1219 {
1220   struct Operation *op = cls;
1221   struct ElementEntry *ee;
1222   const struct GNUNET_SET_ElementMessage *emsg;
1223   uint16_t element_size;
1224
1225   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1226   {
1227     GNUNET_break_op (0);
1228     fail_union_operation (op);
1229     return;
1230   }
1231   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1232   {
1233     GNUNET_break_op (0);
1234     fail_union_operation (op);
1235     return;
1236   }
1237
1238   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1239
1240   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1241   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1242   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1243   ee->element.size = element_size;
1244   ee->element.data = &ee[1];
1245   ee->element.element_type = ntohs (emsg->element_type);
1246   ee->remote = GNUNET_YES;
1247   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1248
1249   if (GNUNET_NO ==
1250       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1251                                             &ee->element_hash,
1252                                             NULL))
1253   {
1254     /* We got something we didn't demand, since it's not in our map. */
1255     GNUNET_break_op (0);
1256     GNUNET_free (ee);
1257     fail_union_operation (op);
1258     return;
1259   }
1260
1261   LOG (GNUNET_ERROR_TYPE_DEBUG,
1262        "Got element (size %u, hash %s) from peer\n",
1263        (unsigned int) element_size,
1264        GNUNET_h2s (&ee->element_hash));
1265
1266   GNUNET_STATISTICS_update (_GSS_statistics,
1267                             "# received elements",
1268                             1,
1269                             GNUNET_NO);
1270   GNUNET_STATISTICS_update (_GSS_statistics,
1271                             "# exchanged elements",
1272                             1,
1273                             GNUNET_NO);
1274
1275   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1276   {
1277     /* Got repeated element.  Should not happen since
1278      * we track demands. */
1279     GNUNET_STATISTICS_update (_GSS_statistics,
1280                               "# repeated elements",
1281                               1,
1282                               GNUNET_NO);
1283     GNUNET_free (ee);
1284   }
1285   else
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288          "Registering new element from remote peer\n");
1289     op_register_element (op, ee);
1290     /* only send results immediately if the client wants it */
1291     switch (op->spec->result_mode)
1292     {
1293       case GNUNET_SET_RESULT_ADDED:
1294         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1295         break;
1296       case GNUNET_SET_RESULT_SYMMETRIC:
1297         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1298         break;
1299       default:
1300         /* Result mode not supported, should have been caught earlier. */
1301         GNUNET_break (0);
1302         break;
1303     }
1304   }
1305
1306   maybe_finish (op);
1307 }
1308
1309
1310 /**
1311  * Send offers (for GNUNET_Hash-es) in response
1312  * to inquiries (for IBF_Key-s).
1313  *
1314  * @param cls the union operation
1315  * @param mh the message
1316  */
1317 static void
1318 handle_p2p_inquiry (void *cls,
1319                     const struct GNUNET_MessageHeader *mh)
1320 {
1321   struct Operation *op = cls;
1322   const struct IBF_Key *ibf_key;
1323   unsigned int num_keys;
1324   struct InquiryMessage *msg;
1325
1326   /* look up elements and send them */
1327   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1328   {
1329     GNUNET_break_op (0);
1330     fail_union_operation (op);
1331     return;
1332   }
1333   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1334       / sizeof (struct IBF_Key);
1335   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1336       != num_keys * sizeof (struct IBF_Key))
1337   {
1338     GNUNET_break_op (0);
1339     fail_union_operation (op);
1340     return;
1341   }
1342
1343   msg = (struct InquiryMessage *) mh;
1344
1345   ibf_key = (const struct IBF_Key *) &msg[1];
1346   while (0 != num_keys--)
1347   {
1348     struct IBF_Key unsalted_key;
1349     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1350     send_offers_for_key (op, unsalted_key);
1351     ibf_key++;
1352   }
1353 }
1354
1355
1356 /**
1357  * FIXME
1358  */
1359 static void
1360 handle_p2p_demand (void *cls,
1361                    const struct GNUNET_MessageHeader *mh)
1362 {
1363   struct Operation *op = cls;
1364   struct ElementEntry *ee;
1365   struct GNUNET_SET_ElementMessage *emsg;
1366   const struct GNUNET_HashCode *hash;
1367   unsigned int num_hashes;
1368   struct GNUNET_MQ_Envelope *ev;
1369
1370   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1371     / sizeof (struct GNUNET_HashCode);
1372   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1373       != num_hashes * sizeof (struct GNUNET_HashCode))
1374   {
1375     GNUNET_break_op (0);
1376     fail_union_operation (op);
1377     return;
1378   }
1379
1380   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1381        num_hashes > 0;
1382        hash++, num_hashes--)
1383   {
1384     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1385     if (NULL == ee)
1386     {
1387       /* Demand for non-existing element. */
1388       GNUNET_break_op (0);
1389       fail_union_operation (op);
1390       return;
1391     }
1392     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1393     {
1394       /* Probably confused lazily copied sets. */
1395       GNUNET_break_op (0);
1396       fail_union_operation (op);
1397       return;
1398     }
1399     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1400     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1401     emsg->reserved = htons (0);
1402     emsg->element_type = htons (ee->element.element_type);
1403     LOG (GNUNET_ERROR_TYPE_DEBUG,
1404          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1405          (void *) op,
1406          (unsigned int) ee->element.size,
1407          GNUNET_h2s (&ee->element_hash));
1408     GNUNET_MQ_send (op->mq, ev);
1409     GNUNET_STATISTICS_update (_GSS_statistics,
1410                               "# exchanged elements",
1411                               1,
1412                               GNUNET_NO);
1413
1414     switch (op->spec->result_mode)
1415     {
1416       case GNUNET_SET_RESULT_ADDED:
1417         /* Nothing to do. */
1418         break;
1419       case GNUNET_SET_RESULT_SYMMETRIC:
1420         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1421         break;
1422       default:
1423         /* Result mode not supported, should have been caught earlier. */
1424         GNUNET_break (0);
1425         break;
1426     }
1427   }
1428 }
1429
1430
1431 /**
1432  * Handle offers (of GNUNET_HashCode-s) and
1433  * respond with demands (of GNUNET_HashCode-s).
1434  *
1435  * @param cls the union operation
1436  * @param mh the message
1437  */
1438 static void
1439 handle_p2p_offer (void *cls,
1440                     const struct GNUNET_MessageHeader *mh)
1441 {
1442   struct Operation *op = cls;
1443   const struct GNUNET_HashCode *hash;
1444   unsigned int num_hashes;
1445
1446   /* look up elements and send them */
1447   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1448        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1449   {
1450     GNUNET_break_op (0);
1451     fail_union_operation (op);
1452     return;
1453   }
1454   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1455     / sizeof (struct GNUNET_HashCode);
1456   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1457       != num_hashes * sizeof (struct GNUNET_HashCode))
1458   {
1459     GNUNET_break_op (0);
1460     fail_union_operation (op);
1461     return;
1462   }
1463
1464   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1465        num_hashes > 0;
1466        hash++, num_hashes--)
1467   {
1468     struct ElementEntry *ee;
1469     struct GNUNET_MessageHeader *demands;
1470     struct GNUNET_MQ_Envelope *ev;
1471
1472     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1473                                             hash);
1474     if (NULL != ee)
1475       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1476         continue;
1477
1478     if (GNUNET_YES ==
1479         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1480                                                 hash))
1481     {
1482       LOG (GNUNET_ERROR_TYPE_DEBUG,
1483            "Skipped sending duplicate demand\n");
1484       continue;
1485     }
1486
1487     GNUNET_assert (GNUNET_OK ==
1488                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1489                                                       hash,
1490                                                       NULL,
1491                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1492
1493     LOG (GNUNET_ERROR_TYPE_DEBUG,
1494          "[OP %x] Requesting element (hash %s)\n",
1495          (void *) op, GNUNET_h2s (hash));
1496     ev = GNUNET_MQ_msg_header_extra (demands,
1497                                      sizeof (struct GNUNET_HashCode),
1498                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1499     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1500     GNUNET_MQ_send (op->mq, ev);
1501   }
1502 }
1503
1504
1505 /**
1506  * Handle a done message from a remote peer
1507  *
1508  * @param cls the union operation
1509  * @param mh the message
1510  */
1511 static void
1512 handle_p2p_done (void *cls,
1513                  const struct GNUNET_MessageHeader *mh)
1514 {
1515   struct Operation *op = cls;
1516
1517   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1518   {
1519     /* We got all requests, but still have to send our elements in response. */
1520
1521     op->state->phase = PHASE_FINISH_WAITING;
1522
1523     LOG (GNUNET_ERROR_TYPE_DEBUG,
1524          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1525     /* The active peer is done sending offers
1526      * and inquiries.  This means that all
1527      * our responses to that (demands and offers)
1528      * must be in flight (queued or in mesh).
1529      *
1530      * We should notify the active peer once
1531      * all our demands are satisfied, so that the active
1532      * peer can quit if we gave him everything.
1533      */
1534     maybe_finish (op);
1535     return;
1536   }
1537   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1538   {
1539     LOG (GNUNET_ERROR_TYPE_DEBUG,
1540          "got DONE (as active partner), waiting to finish\n");
1541     /* All demands of the other peer are satisfied,
1542      * and we processed all offers, thus we know
1543      * exactly what our demands must be.
1544      *
1545      * We'll close the channel
1546      * to the other peer once our demands are met.
1547      */
1548     op->state->phase = PHASE_FINISH_CLOSING;
1549     maybe_finish (op);
1550     return;
1551   }
1552   GNUNET_break_op (0);
1553   fail_union_operation (op);
1554 }
1555
1556
1557 /**
1558  * Initiate operation to evaluate a set union with a remote peer.
1559  *
1560  * @param op operation to perform (to be initialized)
1561  * @param opaque_context message to be transmitted to the listener
1562  *        to convince him to accept, may be NULL
1563  */
1564 static void
1565 union_evaluate (struct Operation *op,
1566                 const struct GNUNET_MessageHeader *opaque_context)
1567 {
1568   struct GNUNET_MQ_Envelope *ev;
1569   struct OperationRequestMessage *msg;
1570
1571   GNUNET_assert (NULL == op->state);
1572   op->state = GNUNET_new (struct OperationState);
1573   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1574   /* copy the current generation's strata estimator for this operation */
1575   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1576   /* we started the operation, thus we have to send the operation request */
1577   op->state->phase = PHASE_EXPECT_SE;
1578   op->state->salt_receive = op->state->salt_send = 42;
1579   LOG (GNUNET_ERROR_TYPE_DEBUG,
1580        "Initiating union operation evaluation\n");
1581   GNUNET_STATISTICS_update (_GSS_statistics,
1582                             "# of total union operations",
1583                             1,
1584                             GNUNET_NO);
1585   GNUNET_STATISTICS_update (_GSS_statistics,
1586                             "# of initiated union operations",
1587                             1,
1588                             GNUNET_NO);
1589   ev = GNUNET_MQ_msg_nested_mh (msg,
1590                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1591                                 opaque_context);
1592   if (NULL == ev)
1593   {
1594     /* the context message is too large */
1595     GNUNET_break (0);
1596     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1597     return;
1598   }
1599   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1600   GNUNET_MQ_send (op->mq,
1601                   ev);
1602
1603   if (NULL != opaque_context)
1604     LOG (GNUNET_ERROR_TYPE_DEBUG,
1605          "sent op request with context message\n");
1606   else
1607     LOG (GNUNET_ERROR_TYPE_DEBUG,
1608          "sent op request without context message\n");
1609 }
1610
1611
1612 /**
1613  * Accept an union operation request from a remote peer.
1614  * Only initializes the private operation state.
1615  *
1616  * @param op operation that will be accepted as a union operation
1617  */
1618 static void
1619 union_accept (struct Operation *op)
1620 {
1621   LOG (GNUNET_ERROR_TYPE_DEBUG,
1622        "accepting set union operation\n");
1623   GNUNET_assert (NULL == op->state);
1624
1625   GNUNET_STATISTICS_update (_GSS_statistics,
1626                             "# of accepted union operations",
1627                             1,
1628                             GNUNET_NO);
1629   GNUNET_STATISTICS_update (_GSS_statistics,
1630                             "# of total union operations",
1631                             1,
1632                             GNUNET_NO);
1633
1634   op->state = GNUNET_new (struct OperationState);
1635   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1636   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1637   op->state->salt_receive = op->state->salt_send = 42;
1638   /* kick off the operation */
1639   send_strata_estimator (op);
1640 }
1641
1642
1643 /**
1644  * Create a new set supporting the union operation
1645  *
1646  * We maintain one strata estimator per set and then manipulate it over the
1647  * lifetime of the set, as recreating a strata estimator would be expensive.
1648  *
1649  * @return the newly created set, NULL on error
1650  */
1651 static struct SetState *
1652 union_set_create (void)
1653 {
1654   struct SetState *set_state;
1655
1656   LOG (GNUNET_ERROR_TYPE_DEBUG,
1657        "union set created\n");
1658   set_state = GNUNET_new (struct SetState);
1659   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1660                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1661   if (NULL == set_state->se)
1662   {
1663     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1664                 "Failed to allocate strata estimator\n");
1665     GNUNET_free (set_state);
1666     return NULL;
1667   }
1668   return set_state;
1669 }
1670
1671
1672 /**
1673  * Add the element from the given element message to the set.
1674  *
1675  * @param set_state state of the set want to add to
1676  * @param ee the element to add to the set
1677  */
1678 static void
1679 union_add (struct SetState *set_state, struct ElementEntry *ee)
1680 {
1681   strata_estimator_insert (set_state->se,
1682                            get_ibf_key (&ee->element_hash));
1683 }
1684
1685
1686 /**
1687  * Remove the element given in the element message from the set.
1688  * Only marks the element as removed, so that older set operations can still exchange it.
1689  *
1690  * @param set_state state of the set to remove from
1691  * @param ee set element to remove
1692  */
1693 static void
1694 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1695 {
1696   strata_estimator_remove (set_state->se,
1697                            get_ibf_key (&ee->element_hash));
1698 }
1699
1700
1701 /**
1702  * Destroy a set that supports the union operation.
1703  *
1704  * @param set_state the set to destroy
1705  */
1706 static void
1707 union_set_destroy (struct SetState *set_state)
1708 {
1709   if (NULL != set_state->se)
1710   {
1711     strata_estimator_destroy (set_state->se);
1712     set_state->se = NULL;
1713   }
1714   GNUNET_free (set_state);
1715 }
1716
1717
1718 /**
1719  * Dispatch messages for a union operation.
1720  *
1721  * @param op the state of the union evaluate operation
1722  * @param mh the received message
1723  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1724  *         #GNUNET_OK otherwise
1725  */
1726 int
1727 union_handle_p2p_message (struct Operation *op,
1728                           const struct GNUNET_MessageHeader *mh)
1729 {
1730   //LOG (GNUNET_ERROR_TYPE_DEBUG,
1731   //            "received p2p message (t: %u, s: %u)\n",
1732   //            ntohs (mh->type),
1733   //            ntohs (mh->size));
1734   switch (ntohs (mh->type))
1735   {
1736     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1737       return handle_p2p_ibf (op, mh);
1738     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1739       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1740     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1741       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1742     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1743       handle_p2p_elements (op, mh);
1744       break;
1745     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1746       handle_p2p_inquiry (op, mh);
1747       break;
1748     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1749       handle_p2p_done (op, mh);
1750       break;
1751     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1752       handle_p2p_offer (op, mh);
1753       break;
1754     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1755       handle_p2p_demand (op, mh);
1756       break;
1757     default:
1758       /* Something wrong with cadet's message handlers? */
1759       GNUNET_assert (0);
1760   }
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Handler for peer-disconnects, notifies the client
1767  * about the aborted operation in case the op was not concluded.
1768  *
1769  * @param op the destroyed operation
1770  */
1771 static void
1772 union_peer_disconnect (struct Operation *op)
1773 {
1774   if (PHASE_DONE != op->state->phase)
1775   {
1776     struct GNUNET_MQ_Envelope *ev;
1777     struct GNUNET_SET_ResultMessage *msg;
1778
1779     ev = GNUNET_MQ_msg (msg,
1780                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1781     msg->request_id = htonl (op->spec->client_request_id);
1782     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1783     msg->element_type = htons (0);
1784     GNUNET_MQ_send (op->spec->set->client_mq,
1785                     ev);
1786     LOG (GNUNET_ERROR_TYPE_WARNING,
1787          "other peer disconnected prematurely, phase %u\n",
1788          op->state->phase);
1789     _GSS_operation_destroy (op,
1790                             GNUNET_YES);
1791     return;
1792   }
1793   // else: the session has already been concluded
1794   LOG (GNUNET_ERROR_TYPE_DEBUG,
1795        "other peer disconnected (finished)\n");
1796   if (GNUNET_NO == op->state->client_done_sent)
1797     send_done_and_destroy (op);
1798 }
1799
1800
1801 /**
1802  * Copy union-specific set state.
1803  *
1804  * @param set source set for copying the union state
1805  * @return a copy of the union-specific set state
1806  */
1807 static struct SetState *
1808 union_copy_state (struct Set *set)
1809 {
1810   struct SetState *new_state;
1811
1812   new_state = GNUNET_new (struct SetState);
1813   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1814   new_state->se = strata_estimator_dup (set->state->se);
1815
1816   return new_state;
1817 }
1818
1819
1820 /**
1821  * Get the table with implementing functions for
1822  * set union.
1823  *
1824  * @return the operation specific VTable
1825  */
1826 const struct SetVT *
1827 _GSS_union_vt ()
1828 {
1829   static const struct SetVT union_vt = {
1830     .create = &union_set_create,
1831     .msg_handler = &union_handle_p2p_message,
1832     .add = &union_add,
1833     .remove = &union_remove,
1834     .destroy_set = &union_set_destroy,
1835     .evaluate = &union_evaluate,
1836     .accept = &union_accept,
1837     .peer_disconnect = &union_peer_disconnect,
1838     .cancel = &union_op_cancel,
1839     .copy_state = &union_copy_state,
1840   };
1841
1842   return &union_vt;
1843 }