log error when timed out
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2015 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    *
89    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90    */
91   PHASE_EXPECT_IBF,
92
93   /**
94    * Continuation for multi part IBFs.
95    */
96   PHASE_EXPECT_IBF_CONT,
97
98   /**
99    * We are decoding an IBF.
100    */
101   PHASE_INVENTORY_ACTIVE,
102
103   /**
104    * The other peer is decoding the IBF we just sent.
105    */
106   PHASE_INVENTORY_PASSIVE,
107
108   /**
109    * The protocol is almost finished, but we still have to flush our message
110    * queue and/or expect some elements.
111    */
112   PHASE_FINISH_CLOSING,
113
114   /**
115    * In the penultimate phase,
116    * we wait until all our demands
117    * are satisfied.  Then we send a done
118    * message, and wait for another done message.*/
119   PHASE_FINISH_WAITING,
120
121   /**
122    * In the ultimate phase, we wait until
123    * our demands are satisfied and then
124    * quit (sending another DONE message). */
125   PHASE_DONE
126 };
127
128
129 /**
130  * State of an evaluate operation with another peer.
131  */
132 struct OperationState
133 {
134   /**
135    * Copy of the set's strata estimator at the time of
136    * creation of this operation.
137    */
138   struct StrataEstimator *se;
139
140   /**
141    * The IBF we currently receive.
142    */
143   struct InvertibleBloomFilter *remote_ibf;
144
145   /**
146    * The IBF with the local set's element.
147    */
148   struct InvertibleBloomFilter *local_ibf;
149
150   /**
151    * Maps IBF-Keys (specific to the current salt) to elements.
152    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153    * Colliding IBF-Keys are linked.
154    */
155   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
156
157   /**
158    * Current state of the operation.
159    */
160   enum UnionOperationPhase phase;
161
162   /**
163    * Did we send the client that we are done?
164    */
165   int client_done_sent;
166
167   /**
168    * Number of ibf buckets already received into the @a remote_ibf.
169    */
170   unsigned int ibf_buckets_received;
171
172   /**
173    * Hashes for elements that we have demanded from the other peer.
174    */
175   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
176
177   /**
178    * Salt that we're using for sending IBFs
179    */
180   uint32_t salt_send;
181
182   /**
183    * Salt for the IBF we've received and that we're currently decoding.
184    */
185   uint32_t salt_receive;
186 };
187
188
189 /**
190  * The key entry is used to associate an ibf key with an element.
191  */
192 struct KeyEntry
193 {
194   /**
195    * IBF key for the entry, derived from the current salt.
196    */
197   struct IBF_Key ibf_key;
198
199   /**
200    * The actual element associated with the key.
201    *
202    * Only owned by the union operation if element->operation
203    * is #GNUNET_YES.
204    */
205   struct ElementEntry *element;
206 };
207
208
209 /**
210  * Used as a closure for sending elements
211  * with a specific IBF key.
212  */
213 struct SendElementClosure
214 {
215   /**
216    * The IBF key whose matching elements should be
217    * sent.
218    */
219   struct IBF_Key ibf_key;
220
221   /**
222    * Operation for which the elements
223    * should be sent.
224    */
225   struct Operation *op;
226 };
227
228
229 /**
230  * Extra state required for efficient set union.
231  */
232 struct SetState
233 {
234   /**
235    * The strata estimator is only generated once for
236    * each set.
237    * The IBF keys are derived from the element hashes with
238    * salt=0.
239    */
240   struct StrataEstimator *se;
241 };
242
243
244 /**
245  * Iterator over hash map entries, called to
246  * destroy the linked list of colliding ibf key entries.
247  *
248  * @param cls closure
249  * @param key current key code
250  * @param value value in the hash map
251  * @return #GNUNET_YES if we should continue to iterate,
252  *         #GNUNET_NO if not.
253  */
254 static int
255 destroy_key_to_element_iter (void *cls,
256                              uint32_t key,
257                              void *value)
258 {
259   struct KeyEntry *k = value;
260
261   GNUNET_assert (NULL != k);
262   if (GNUNET_YES == k->element->remote)
263   {
264     GNUNET_free (k->element);
265     k->element = NULL;
266   }
267   GNUNET_free (k);
268   return GNUNET_YES;
269 }
270
271
272 /**
273  * Destroy the union operation.  Only things specific to the union
274  * operation are destroyed.
275  *
276  * @param op union operation to destroy
277  */
278 static void
279 union_op_cancel (struct Operation *op)
280 {
281   LOG (GNUNET_ERROR_TYPE_DEBUG,
282        "destroying union op\n");
283   /* check if the op was canceled twice */
284   GNUNET_assert (NULL != op->state);
285   if (NULL != op->state->remote_ibf)
286   {
287     ibf_destroy (op->state->remote_ibf);
288     op->state->remote_ibf = NULL;
289   }
290   if (NULL != op->state->demanded_hashes)
291   {
292     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
293     op->state->demanded_hashes = NULL;
294   }
295   if (NULL != op->state->local_ibf)
296   {
297     ibf_destroy (op->state->local_ibf);
298     op->state->local_ibf = NULL;
299   }
300   if (NULL != op->state->se)
301   {
302     strata_estimator_destroy (op->state->se);
303     op->state->se = NULL;
304   }
305   if (NULL != op->state->key_to_element)
306   {
307     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
308                                              &destroy_key_to_element_iter,
309                                              NULL);
310     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
311     op->state->key_to_element = NULL;
312   }
313   GNUNET_free (op->state);
314   op->state = NULL;
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op done\n");
317 }
318
319
320 /**
321  * Inform the client that the union operation has failed,
322  * and proceed to destroy the evaluate operation.
323  *
324  * @param op the union operation to fail
325  */
326 static void
327 fail_union_operation (struct Operation *op)
328 {
329   struct GNUNET_MQ_Envelope *ev;
330   struct GNUNET_SET_ResultMessage *msg;
331
332   LOG (GNUNET_ERROR_TYPE_ERROR,
333        "union operation failed\n");
334   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
335   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
336   msg->request_id = htonl (op->spec->client_request_id);
337   msg->element_type = htons (0);
338   GNUNET_MQ_send (op->spec->set->client_mq, ev);
339   _GSS_operation_destroy (op, GNUNET_YES);
340 }
341
342
343 /**
344  * Derive the IBF key from a hash code and
345  * a salt.
346  *
347  * @param src the hash code
348  * @return the derived IBF key
349  */
350 static struct IBF_Key
351 get_ibf_key (const struct GNUNET_HashCode *src)
352 {
353   struct IBF_Key key;
354   uint16_t salt = 0;
355
356   GNUNET_CRYPTO_kdf (&key, sizeof (key),
357                      src, sizeof *src,
358                      &salt, sizeof (salt),
359                      NULL, 0);
360   return key;
361 }
362
363
364 /**
365  * Iterator over the mapping from IBF keys to element entries.  Checks if we
366  * have an element with a given GNUNET_HashCode.
367  *
368  * @param cls closure
369  * @param key current key code
370  * @param value value in the hash map
371  * @return #GNUNET_YES if we should search further,
372  *         #GNUNET_NO if we've found the element.
373  */
374 static int
375 op_has_element_iterator (void *cls,
376                          uint32_t key,
377                          void *value)
378 {
379   struct GNUNET_HashCode *element_hash = cls;
380   struct KeyEntry *k = value;
381
382   GNUNET_assert (NULL != k);
383   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
384                                    element_hash))
385     return GNUNET_NO;
386   return GNUNET_YES;
387 }
388
389
390 /**
391  * Determine whether the given element is already in the operation's element
392  * set.
393  *
394  * @param op operation that should be tested for 'element_hash'
395  * @param element_hash hash of the element to look for
396  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
397  */
398 static int
399 op_has_element (struct Operation *op,
400                 const struct GNUNET_HashCode *element_hash)
401 {
402   int ret;
403   struct IBF_Key ibf_key;
404
405   ibf_key = get_ibf_key (element_hash);
406   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
407                                                       (uint32_t) ibf_key.key_val,
408                                                       op_has_element_iterator,
409                                                       (void *) element_hash);
410
411   /* was the iteration aborted because we found the element? */
412   if (GNUNET_SYSERR == ret)
413     return GNUNET_YES;
414   return GNUNET_NO;
415 }
416
417
418 /**
419  * Insert an element into the union operation's
420  * key-to-element mapping. Takes ownership of 'ee'.
421  * Note that this does not insert the element in the set,
422  * only in the operation's key-element mapping.
423  * This is done to speed up re-tried operations, if some elements
424  * were transmitted, and then the IBF fails to decode.
425  *
426  * XXX: clarify ownership, doesn't sound right.
427  *
428  * @param op the union operation
429  * @param ee the element entry
430  */
431 static void
432 op_register_element (struct Operation *op,
433                      struct ElementEntry *ee)
434 {
435   struct IBF_Key ibf_key;
436   struct KeyEntry *k;
437
438   ibf_key = get_ibf_key (&ee->element_hash);
439   k = GNUNET_new (struct KeyEntry);
440   k->element = ee;
441   k->ibf_key = ibf_key;
442   GNUNET_assert (GNUNET_OK ==
443                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
444                                                       (uint32_t) ibf_key.key_val,
445                                                       k,
446                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
447 }
448
449
450 static void
451 salt_key (const struct IBF_Key *k_in,
452           uint32_t salt, 
453           struct IBF_Key *k_out)
454 {
455   int s = salt % 64;
456   uint64_t x = k_in->key_val;
457   /* rotate ibf key */
458   x = (x >> s) | (x << (64 - s));
459   k_out->key_val = x;
460 }
461
462
463 static void
464 unsalt_key (const struct IBF_Key *k_in,
465             uint32_t salt, 
466             struct IBF_Key *k_out)
467 {
468   int s = salt % 64;
469   uint64_t x = k_in->key_val;
470   x = (x << s) | (x >> (64 - s));
471   k_out->key_val = x;
472 }
473
474
475 /**
476  * Insert a key into an ibf.
477  *
478  * @param cls the ibf
479  * @param key unused
480  * @param value the key entry to get the key from
481  */
482 static int
483 prepare_ibf_iterator (void *cls,
484                       uint32_t key,
485                       void *value)
486 {
487   struct Operation *op = cls;
488   struct KeyEntry *ke = value;
489   struct IBF_Key salted_key;
490
491   LOG (GNUNET_ERROR_TYPE_DEBUG,
492        "[OP %x] inserting %lx (hash %s) into ibf\n",
493        (void *) op,
494        (unsigned long) ke->ibf_key.key_val,
495        GNUNET_h2s (&ke->element->element_hash));
496   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
497   ibf_insert (op->state->local_ibf, salted_key);
498   return GNUNET_YES;
499 }
500
501
502 /**
503  * Iterator for initializing the
504  * key-to-element mapping of a union operation
505  *
506  * @param cls the union operation `struct Operation *`
507  * @param key unused
508  * @param value the `struct ElementEntry *` to insert
509  *        into the key-to-element mapping
510  * @return #GNUNET_YES (to continue iterating)
511  */
512 static int
513 init_key_to_element_iterator (void *cls,
514                               const struct GNUNET_HashCode *key,
515                               void *value)
516 {
517   struct Operation *op = cls;
518   struct ElementEntry *ee = value;
519
520   /* make sure that the element belongs to the set at the time
521    * of creating the operation */
522   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
523     return GNUNET_YES;
524
525   GNUNET_assert (GNUNET_NO == ee->remote);
526
527   op_register_element (op, ee);
528   return GNUNET_YES;
529 }
530
531
532 /**
533  * Create an ibf with the operation's elements
534  * of the specified size
535  *
536  * @param op the union operation
537  * @param size size of the ibf to create
538  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
539  */
540 static int
541 prepare_ibf (struct Operation *op,
542              uint32_t size)
543 {
544   if (NULL == op->state->key_to_element)
545   {
546     unsigned int len;
547
548     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
549     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
551                                            init_key_to_element_iterator, op);
552   }
553   if (NULL != op->state->local_ibf)
554     ibf_destroy (op->state->local_ibf);
555   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
556   if (NULL == op->state->local_ibf)
557   {
558     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
559                 "Failed to allocate local IBF\n");
560     return GNUNET_SYSERR;
561   }
562   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
563                                            &prepare_ibf_iterator,
564                                            op);
565   return GNUNET_OK;
566 }
567
568
569 /**
570  * Send an ibf of appropriate size.
571  *
572  * Fragments the IBF into multiple messages if necessary.
573  *
574  * @param op the union operation
575  * @param ibf_order order of the ibf to send, size=2^order
576  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
577  */
578 static int
579 send_ibf (struct Operation *op,
580           uint16_t ibf_order)
581 {
582   unsigned int buckets_sent = 0;
583   struct InvertibleBloomFilter *ibf;
584
585   if (GNUNET_OK !=
586       prepare_ibf (op, 1<<ibf_order))
587   {
588     /* allocation failed */
589     return GNUNET_SYSERR;
590   }
591
592   LOG (GNUNET_ERROR_TYPE_DEBUG,
593        "sending ibf of size %u\n",
594        1<<ibf_order);
595
596   {
597     char name[64] = { 0 };
598     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
599     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
600   }
601
602   ibf = op->state->local_ibf;
603
604   while (buckets_sent < (1 << ibf_order))
605   {
606     unsigned int buckets_in_message;
607     struct GNUNET_MQ_Envelope *ev;
608     struct IBFMessage *msg;
609
610     buckets_in_message = (1 << ibf_order) - buckets_sent;
611     /* limit to maximum */
612     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
613       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
614
615     ev = GNUNET_MQ_msg_extra (msg,
616                               buckets_in_message * IBF_BUCKET_SIZE,
617                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
618     msg->reserved1 = 0;
619     msg->reserved2 = 0;
620     msg->order = ibf_order;
621     msg->offset = htonl (buckets_sent);
622     msg->salt = htonl (op->state->salt_send);
623     ibf_write_slice (ibf, buckets_sent,
624                      buckets_in_message, &msg[1]);
625     buckets_sent += buckets_in_message;
626     LOG (GNUNET_ERROR_TYPE_DEBUG,
627          "ibf chunk size %u, %u/%u sent\n",
628          buckets_in_message,
629          buckets_sent,
630          1<<ibf_order);
631     GNUNET_MQ_send (op->mq, ev);
632   }
633
634   /* The other peer must decode the IBF, so
635    * we're passive. */
636   op->state->phase = PHASE_INVENTORY_PASSIVE;
637   return GNUNET_OK;
638 }
639
640
641 /**
642  * Send a strata estimator to the remote peer.
643  *
644  * @param op the union operation with the remote peer
645  */
646 static void
647 send_strata_estimator (struct Operation *op)
648 {
649   const struct StrataEstimator *se = op->state->se;
650   struct GNUNET_MQ_Envelope *ev;
651   struct GNUNET_MessageHeader *strata_msg;
652   char *buf;
653   size_t len;
654   uint16_t type;
655
656   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
657   len = strata_estimator_write (op->state->se,
658                                 buf);
659   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
660     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
661   else
662     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
663   ev = GNUNET_MQ_msg_header_extra (strata_msg,
664                                    len,
665                                    type);
666   memcpy (&strata_msg[1],
667           buf,
668           len);
669   GNUNET_free (buf);
670   GNUNET_MQ_send (op->mq,
671                   ev);
672   op->state->phase = PHASE_EXPECT_IBF;
673   LOG (GNUNET_ERROR_TYPE_DEBUG,
674        "sent SE, expecting IBF\n");
675 }
676
677
678 /**
679  * Compute the necessary order of an ibf
680  * from the size of the symmetric set difference.
681  *
682  * @param diff the difference
683  * @return the required size of the ibf
684  */
685 static unsigned int
686 get_order_from_difference (unsigned int diff)
687 {
688   unsigned int ibf_order;
689
690   ibf_order = 2;
691   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
692           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
693     ibf_order++;
694   if (ibf_order > MAX_IBF_ORDER)
695     ibf_order = MAX_IBF_ORDER;
696   return ibf_order;
697 }
698
699
700 /**
701  * Handle a strata estimator from a remote peer
702  *
703  * @param cls the union operation
704  * @param mh the message
705  * @param is_compressed #GNUNET_YES if the estimator is compressed
706  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
707  *         #GNUNET_OK otherwise
708  */
709 static int
710 handle_p2p_strata_estimator (void *cls,
711                              const struct GNUNET_MessageHeader *mh,
712                              int is_compressed)
713 {
714   struct Operation *op = cls;
715   struct StrataEstimator *remote_se;
716   int diff;
717   size_t len;
718
719   GNUNET_STATISTICS_update (_GSS_statistics,
720                             "# bytes of SE received",
721                             ntohs (mh->size),
722                             GNUNET_NO);
723
724   if (op->state->phase != PHASE_EXPECT_SE)
725   {
726     fail_union_operation (op);
727     GNUNET_break (0);
728     return GNUNET_SYSERR;
729   }
730   len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
731   if ( (GNUNET_NO == is_compressed) &&
732        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
733   {
734     fail_union_operation (op);
735     GNUNET_break (0);
736     return GNUNET_SYSERR;
737   }
738   remote_se = strata_estimator_create (SE_STRATA_COUNT,
739                                        SE_IBF_SIZE,
740                                        SE_IBF_HASH_NUM);
741   if (NULL == remote_se)
742   {
743     /* insufficient resources, fail */
744     fail_union_operation (op);
745     return GNUNET_SYSERR;
746   }
747   if (GNUNET_OK !=
748       strata_estimator_read (&mh[1],
749                              len,
750                              is_compressed,
751                              remote_se))
752   {
753     /* decompression failed */
754     fail_union_operation (op);
755     return GNUNET_SYSERR;
756   }
757   GNUNET_assert (NULL != op->state->se);
758   diff = strata_estimator_difference (remote_se,
759                                       op->state->se);
760   strata_estimator_destroy (remote_se);
761   strata_estimator_destroy (op->state->se);
762   op->state->se = NULL;
763   LOG (GNUNET_ERROR_TYPE_DEBUG,
764        "got se diff=%d, using ibf size %d\n",
765        diff,
766        1<<get_order_from_difference (diff));
767   if (GNUNET_OK !=
768       send_ibf (op,
769                 get_order_from_difference (diff)))
770   {
771     /* Internal error, best we can do is shut the connection */
772     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
773                 "Failed to send IBF, closing connection\n");
774     fail_union_operation (op);
775     return GNUNET_SYSERR;
776   }
777   return GNUNET_OK;
778 }
779
780
781 /**
782  * Iterator to send elements to a remote peer
783  *
784  * @param cls closure with the element key and the union operation
785  * @param key ignored
786  * @param value the key entry
787  */
788 static int
789 send_offers_iterator (void *cls,
790                       uint32_t key,
791                       void *value)
792 {
793   struct SendElementClosure *sec = cls;
794   struct Operation *op = sec->op;
795   struct KeyEntry *ke = value;
796   struct GNUNET_MQ_Envelope *ev;
797   struct GNUNET_MessageHeader *mh;
798
799   /* Detect 32-bit key collision for the 64-bit IBF keys. */
800   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
801     return GNUNET_YES;
802
803   ev = GNUNET_MQ_msg_header_extra (mh,
804                                    sizeof (struct GNUNET_HashCode),
805                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
806
807   GNUNET_assert (NULL != ev);
808   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
809   LOG (GNUNET_ERROR_TYPE_DEBUG,
810        "[OP %x] sending element offer (%s) to peer\n",
811        (void *) op,
812        GNUNET_h2s (&ke->element->element_hash));
813   GNUNET_MQ_send (op->mq, ev);
814   return GNUNET_YES;
815 }
816
817
818 /**
819  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
820  *
821  * @param op union operation
822  * @param ibf_key IBF key of interest
823  */
824 static void
825 send_offers_for_key (struct Operation *op,
826                      struct IBF_Key ibf_key)
827 {
828   struct SendElementClosure send_cls;
829
830   send_cls.ibf_key = ibf_key;
831   send_cls.op = op;
832   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
833                                                        (uint32_t) ibf_key.key_val,
834                                                        &send_offers_iterator,
835                                                        &send_cls);
836 }
837
838
839 /**
840  * Decode which elements are missing on each side, and
841  * send the appropriate offers and inquiries.
842  *
843  * @param op union operation
844  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
845  */
846 static int
847 decode_and_send (struct Operation *op)
848 {
849   struct IBF_Key key;
850   struct IBF_Key last_key;
851   int side;
852   unsigned int num_decoded;
853   struct InvertibleBloomFilter *diff_ibf;
854
855   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
856
857   if (GNUNET_OK !=
858       prepare_ibf (op, op->state->remote_ibf->size))
859   {
860     GNUNET_break (0);
861     /* allocation failed */
862     return GNUNET_SYSERR;
863   }
864   diff_ibf = ibf_dup (op->state->local_ibf);
865   ibf_subtract (diff_ibf, op->state->remote_ibf);
866
867   ibf_destroy (op->state->remote_ibf);
868   op->state->remote_ibf = NULL;
869
870   LOG (GNUNET_ERROR_TYPE_DEBUG,
871        "decoding IBF (size=%u)\n",
872        diff_ibf->size);
873
874   num_decoded = 0;
875   last_key.key_val = 0;
876
877   while (1)
878   {
879     int res;
880     int cycle_detected = GNUNET_NO;
881
882     last_key = key;
883
884     res = ibf_decode (diff_ibf, &side, &key);
885     if (res == GNUNET_OK)
886     {
887       LOG (GNUNET_ERROR_TYPE_DEBUG,
888            "decoded ibf key %lx\n",
889            (unsigned long) key.key_val);
890       num_decoded += 1;
891       if ( (num_decoded > diff_ibf->size) ||
892            (num_decoded > 1 && last_key.key_val == key.key_val) )
893       {
894         LOG (GNUNET_ERROR_TYPE_DEBUG,
895              "detected cyclic ibf (decoded %u/%u)\n",
896              num_decoded,
897              diff_ibf->size);
898         cycle_detected = GNUNET_YES;
899       }
900     }
901     if ( (GNUNET_SYSERR == res) ||
902          (GNUNET_YES == cycle_detected) )
903     {
904       int next_order;
905       next_order = 0;
906       while (1<<next_order < diff_ibf->size)
907         next_order++;
908       next_order++;
909       if (next_order <= MAX_IBF_ORDER)
910       {
911         LOG (GNUNET_ERROR_TYPE_DEBUG,
912              "decoding failed, sending larger ibf (size %u)\n",
913              1<<next_order);
914         GNUNET_STATISTICS_update (_GSS_statistics,
915                                   "# of IBF retries",
916                                   1,
917                                   GNUNET_NO);
918         op->state->salt_send++;
919         if (GNUNET_OK !=
920             send_ibf (op, next_order))
921         {
922           /* Internal error, best we can do is shut the connection */
923           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
924                       "Failed to send IBF, closing connection\n");
925           fail_union_operation (op);
926           ibf_destroy (diff_ibf);
927           return GNUNET_SYSERR;
928         }
929       }
930       else
931       {
932         GNUNET_STATISTICS_update (_GSS_statistics,
933                                   "# of failed union operations (too large)",
934                                   1,
935                                   GNUNET_NO);
936         // XXX: Send the whole set, element-by-element
937         LOG (GNUNET_ERROR_TYPE_ERROR,
938              "set union failed: reached ibf limit\n");
939         fail_union_operation (op);
940         ibf_destroy (diff_ibf);
941         return GNUNET_SYSERR;
942       }
943       break;
944     }
945     if (GNUNET_NO == res)
946     {
947       struct GNUNET_MQ_Envelope *ev;
948
949       LOG (GNUNET_ERROR_TYPE_DEBUG,
950            "transmitted all values, sending DONE\n");
951       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
952       GNUNET_MQ_send (op->mq, ev);
953       /* We now wait until we get a DONE message back
954        * and then wait for our MQ to be flushed and all our
955        * demands be delivered. */
956       break;
957     }
958     if (1 == side)
959     {
960       struct IBF_Key unsalted_key;
961       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
962       send_offers_for_key (op, unsalted_key);
963     }
964     else if (-1 == side)
965     {
966       struct GNUNET_MQ_Envelope *ev;
967       struct InquiryMessage *msg;
968
969       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
970        * the effort additional complexity. */
971       ev = GNUNET_MQ_msg_extra (msg,
972                                 sizeof (struct IBF_Key),
973                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
974       msg->salt = htonl (op->state->salt_receive);
975       memcpy (&msg[1],
976               &key,
977               sizeof (struct IBF_Key));
978       LOG (GNUNET_ERROR_TYPE_DEBUG,
979            "sending element inquiry for IBF key %lx\n",
980            (unsigned long) key.key_val);
981       GNUNET_MQ_send (op->mq, ev);
982     }
983     else
984     {
985       GNUNET_assert (0);
986     }
987   }
988   ibf_destroy (diff_ibf);
989   return GNUNET_OK;
990 }
991
992
993 /**
994  * Handle an IBF message from a remote peer.
995  *
996  * Reassemble the IBF from multiple pieces, and
997  * process the whole IBF once possible.
998  *
999  * @param cls the union operation
1000  * @param mh the header of the message
1001  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1002  *         #GNUNET_OK otherwise
1003  */
1004 static int
1005 handle_p2p_ibf (void *cls,
1006                 const struct GNUNET_MessageHeader *mh)
1007 {
1008   struct Operation *op = cls;
1009   const struct IBFMessage *msg;
1010   unsigned int buckets_in_message;
1011
1012   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1013   {
1014     GNUNET_break_op (0);
1015     fail_union_operation (op);
1016     return GNUNET_SYSERR;
1017   }
1018   msg = (const struct IBFMessage *) mh;
1019   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1020        (op->state->phase == PHASE_EXPECT_IBF) )
1021   {
1022     op->state->phase = PHASE_EXPECT_IBF_CONT;
1023     GNUNET_assert (NULL == op->state->remote_ibf);
1024     LOG (GNUNET_ERROR_TYPE_DEBUG,
1025          "Creating new ibf of size %u\n",
1026          1 << msg->order);
1027     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1028     op->state->salt_receive = ntohl (msg->salt);
1029     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1030     if (NULL == op->state->remote_ibf)
1031     {
1032       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1033                   "Failed to parse remote IBF, closing connection\n");
1034       fail_union_operation (op);
1035       return GNUNET_SYSERR;
1036     }
1037     op->state->ibf_buckets_received = 0;
1038     if (0 != ntohl (msg->offset))
1039     {
1040       GNUNET_break_op (0);
1041       fail_union_operation (op);
1042       return GNUNET_SYSERR;
1043     }
1044   }
1045   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1046   {
1047     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1048     {
1049       GNUNET_break_op (0);
1050       fail_union_operation (op);
1051       return GNUNET_SYSERR;
1052     }
1053     if (1<<msg->order != op->state->remote_ibf->size)
1054     {
1055       GNUNET_break_op (0);
1056       fail_union_operation (op);
1057       return GNUNET_SYSERR;
1058     }
1059     if (ntohl (msg->salt) != op->state->salt_receive)
1060     {
1061       GNUNET_break_op (0);
1062       fail_union_operation (op);
1063       return GNUNET_SYSERR;
1064     }
1065   }
1066   else
1067   {
1068     GNUNET_assert (0);
1069   }
1070
1071   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1072
1073   if (0 == buckets_in_message)
1074   {
1075     GNUNET_break_op (0);
1076     fail_union_operation (op);
1077     return GNUNET_SYSERR;
1078   }
1079
1080   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1081   {
1082     GNUNET_break_op (0);
1083     fail_union_operation (op);
1084     return GNUNET_SYSERR;
1085   }
1086
1087   GNUNET_assert (NULL != op->state->remote_ibf);
1088
1089   ibf_read_slice (&msg[1],
1090                   op->state->ibf_buckets_received,
1091                   buckets_in_message,
1092                   op->state->remote_ibf);
1093   op->state->ibf_buckets_received += buckets_in_message;
1094
1095   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1096   {
1097     LOG (GNUNET_ERROR_TYPE_DEBUG,
1098          "received full ibf\n");
1099     op->state->phase = PHASE_INVENTORY_ACTIVE;
1100     if (GNUNET_OK !=
1101         decode_and_send (op))
1102     {
1103       /* Internal error, best we can do is shut down */
1104       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1105                   "Failed to decode IBF, closing connection\n");
1106       return GNUNET_SYSERR;
1107     }
1108   }
1109   return GNUNET_OK;
1110 }
1111
1112
1113 /**
1114  * Send a result message to the client indicating
1115  * that there is a new element.
1116  *
1117  * @param op union operation
1118  * @param element element to send
1119  * @param status status to send with the new element
1120  */
1121 static void
1122 send_client_element (struct Operation *op,
1123                      struct GNUNET_SET_Element *element,
1124                      int status)
1125 {
1126   struct GNUNET_MQ_Envelope *ev;
1127   struct GNUNET_SET_ResultMessage *rm;
1128
1129   LOG (GNUNET_ERROR_TYPE_DEBUG,
1130        "sending element (size %u) to client\n",
1131        element->size);
1132   GNUNET_assert (0 != op->spec->client_request_id);
1133   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1134   if (NULL == ev)
1135   {
1136     GNUNET_MQ_discard (ev);
1137     GNUNET_break (0);
1138     return;
1139   }
1140   rm->result_status = htons (status);
1141   rm->request_id = htonl (op->spec->client_request_id);
1142   rm->element_type = element->element_type;
1143   memcpy (&rm[1], element->data, element->size);
1144   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1145 }
1146
1147
1148 /**
1149  * Signal to the client that the operation has finished and
1150  * destroy the operation.
1151  *
1152  * @param cls operation to destroy
1153  */
1154 static void
1155 send_done_and_destroy (void *cls)
1156 {
1157   struct Operation *op = cls;
1158   struct GNUNET_MQ_Envelope *ev;
1159   struct GNUNET_SET_ResultMessage *rm;
1160
1161   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1162   rm->request_id = htonl (op->spec->client_request_id);
1163   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1164   rm->element_type = htons (0);
1165   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1166   /* Will also call the union-specific cancel function. */
1167   _GSS_operation_destroy (op, GNUNET_YES);
1168 }
1169
1170
1171 static void
1172 maybe_finish (struct Operation *op)
1173 {
1174   unsigned int num_demanded;
1175
1176   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1177
1178   if (PHASE_FINISH_WAITING == op->state->phase)
1179   {
1180     LOG (GNUNET_ERROR_TYPE_DEBUG,
1181          "In PHASE_FINISH_WAITING, pending %u demands\n",
1182          num_demanded);
1183     if (0 == num_demanded)
1184     {
1185       struct GNUNET_MQ_Envelope *ev;
1186
1187       op->state->phase = PHASE_DONE;
1188       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1189       GNUNET_MQ_send (op->mq, ev);
1190
1191       /* We now wait until the other peer closes the channel
1192        * after it got all elements from us. */
1193     }
1194   }
1195   if (PHASE_FINISH_CLOSING == op->state->phase)
1196   {
1197     LOG (GNUNET_ERROR_TYPE_DEBUG,
1198          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1199          num_demanded);
1200     if (0 == num_demanded)
1201     {
1202       op->state->phase = PHASE_DONE;
1203       send_done_and_destroy (op);
1204     }
1205   }
1206 }
1207
1208
1209 /**
1210  * Handle an element message from a remote peer.
1211  *
1212  * @param cls the union operation
1213  * @param mh the message
1214  */
1215 static void
1216 handle_p2p_elements (void *cls,
1217                      const struct GNUNET_MessageHeader *mh)
1218 {
1219   struct Operation *op = cls;
1220   struct ElementEntry *ee;
1221   const struct GNUNET_SET_ElementMessage *emsg;
1222   uint16_t element_size;
1223
1224   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1225   {
1226     GNUNET_break_op (0);
1227     fail_union_operation (op);
1228     return;
1229   }
1230   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1231   {
1232     GNUNET_break_op (0);
1233     fail_union_operation (op);
1234     return;
1235   }
1236
1237   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1238
1239   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1240   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1241   memcpy (&ee[1], &emsg[1], element_size);
1242   ee->element.size = element_size;
1243   ee->element.data = &ee[1];
1244   ee->element.element_type = ntohs (emsg->element_type);
1245   ee->remote = GNUNET_YES;
1246   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1247
1248   if (GNUNET_NO ==
1249       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1250                                             &ee->element_hash,
1251                                             NULL))
1252   {
1253     /* We got something we didn't demand, since it's not in our map. */
1254     GNUNET_break_op (0);
1255     GNUNET_free (ee);
1256     fail_union_operation (op);
1257     return;
1258   }
1259
1260   LOG (GNUNET_ERROR_TYPE_DEBUG,
1261        "Got element (size %u, hash %s) from peer\n",
1262        (unsigned int) element_size,
1263        GNUNET_h2s (&ee->element_hash));
1264
1265   GNUNET_STATISTICS_update (_GSS_statistics,
1266                             "# received elements",
1267                             1,
1268                             GNUNET_NO);
1269   GNUNET_STATISTICS_update (_GSS_statistics,
1270                             "# exchanged elements",
1271                             1,
1272                             GNUNET_NO);
1273
1274   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1275   {
1276     /* Got repeated element.  Should not happen since
1277      * we track demands. */
1278     GNUNET_STATISTICS_update (_GSS_statistics,
1279                               "# repeated elements",
1280                               1,
1281                               GNUNET_NO);
1282     GNUNET_free (ee);
1283   }
1284   else
1285   {
1286     LOG (GNUNET_ERROR_TYPE_DEBUG,
1287          "Registering new element from remote peer\n");
1288     op_register_element (op, ee);
1289     /* only send results immediately if the client wants it */
1290     switch (op->spec->result_mode)
1291     {
1292       case GNUNET_SET_RESULT_ADDED:
1293         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1294         break;
1295       case GNUNET_SET_RESULT_SYMMETRIC:
1296         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1297         break;
1298       default:
1299         /* Result mode not supported, should have been caught earlier. */
1300         GNUNET_break (0);
1301         break;
1302     }
1303   }
1304
1305   maybe_finish (op);
1306 }
1307
1308
1309 /**
1310  * Send offers (for GNUNET_Hash-es) in response
1311  * to inquiries (for IBF_Key-s).
1312  *
1313  * @param cls the union operation
1314  * @param mh the message
1315  */
1316 static void
1317 handle_p2p_inquiry (void *cls,
1318                     const struct GNUNET_MessageHeader *mh)
1319 {
1320   struct Operation *op = cls;
1321   const struct IBF_Key *ibf_key;
1322   unsigned int num_keys;
1323   struct InquiryMessage *msg;
1324
1325   /* look up elements and send them */
1326   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1327   {
1328     GNUNET_break_op (0);
1329     fail_union_operation (op);
1330     return;
1331   }
1332   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1333       / sizeof (struct IBF_Key);
1334   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1335       != num_keys * sizeof (struct IBF_Key))
1336   {
1337     GNUNET_break_op (0);
1338     fail_union_operation (op);
1339     return;
1340   }
1341
1342   msg = (struct InquiryMessage *) mh;
1343
1344   ibf_key = (const struct IBF_Key *) &msg[1];
1345   while (0 != num_keys--)
1346   {
1347     struct IBF_Key unsalted_key;
1348     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1349     send_offers_for_key (op, unsalted_key);
1350     ibf_key++;
1351   }
1352 }
1353
1354
1355 /**
1356  * FIXME
1357  */
1358 static void
1359 handle_p2p_demand (void *cls,
1360                    const struct GNUNET_MessageHeader *mh)
1361 {
1362   struct Operation *op = cls;
1363   struct ElementEntry *ee;
1364   struct GNUNET_SET_ElementMessage *emsg;
1365   const struct GNUNET_HashCode *hash;
1366   unsigned int num_hashes;
1367   struct GNUNET_MQ_Envelope *ev;
1368
1369   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1370     / sizeof (struct GNUNET_HashCode);
1371   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1372       != num_hashes * sizeof (struct GNUNET_HashCode))
1373   {
1374     GNUNET_break_op (0);
1375     fail_union_operation (op);
1376     return;
1377   }
1378
1379   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1380        num_hashes > 0;
1381        hash++, num_hashes--)
1382   {
1383     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1384     if (NULL == ee)
1385     {
1386       /* Demand for non-existing element. */
1387       GNUNET_break_op (0);
1388       fail_union_operation (op);
1389       return;
1390     }
1391     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1392     {
1393       /* Probably confused lazily copied sets. */
1394       GNUNET_break_op (0);
1395       fail_union_operation (op);
1396       return;
1397     }
1398     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1399     memcpy (&emsg[1], ee->element.data, ee->element.size);
1400     emsg->reserved = htons (0);
1401     emsg->element_type = htons (ee->element.element_type);
1402     LOG (GNUNET_ERROR_TYPE_DEBUG,
1403          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1404          (void *) op,
1405          (unsigned int) ee->element.size,
1406          GNUNET_h2s (&ee->element_hash));
1407     GNUNET_MQ_send (op->mq, ev);
1408     GNUNET_STATISTICS_update (_GSS_statistics,
1409                               "# exchanged elements",
1410                               1,
1411                               GNUNET_NO);
1412
1413     switch (op->spec->result_mode)
1414     {
1415       case GNUNET_SET_RESULT_ADDED:
1416         /* Nothing to do. */
1417         break;
1418       case GNUNET_SET_RESULT_SYMMETRIC:
1419         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1420         break;
1421       default:
1422         /* Result mode not supported, should have been caught earlier. */
1423         GNUNET_break (0);
1424         break;
1425     }
1426   }
1427 }
1428
1429
1430 /**
1431  * Handle offers (of GNUNET_HashCode-s) and
1432  * respond with demands (of GNUNET_HashCode-s).
1433  *
1434  * @param cls the union operation
1435  * @param mh the message
1436  */
1437 static void
1438 handle_p2p_offer (void *cls,
1439                     const struct GNUNET_MessageHeader *mh)
1440 {
1441   struct Operation *op = cls;
1442   const struct GNUNET_HashCode *hash;
1443   unsigned int num_hashes;
1444
1445   /* look up elements and send them */
1446   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1447        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1448   {
1449     GNUNET_break_op (0);
1450     fail_union_operation (op);
1451     return;
1452   }
1453   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1454     / sizeof (struct GNUNET_HashCode);
1455   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1456       != num_hashes * sizeof (struct GNUNET_HashCode))
1457   {
1458     GNUNET_break_op (0);
1459     fail_union_operation (op);
1460     return;
1461   }
1462
1463   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1464        num_hashes > 0;
1465        hash++, num_hashes--)
1466   {
1467     struct ElementEntry *ee;
1468     struct GNUNET_MessageHeader *demands;
1469     struct GNUNET_MQ_Envelope *ev;
1470
1471     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1472                                             hash);
1473     if (NULL != ee)
1474       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1475         continue;
1476
1477     if (GNUNET_YES ==
1478         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1479                                                 hash))
1480     {
1481       LOG (GNUNET_ERROR_TYPE_DEBUG,
1482            "Skipped sending duplicate demand\n");
1483       continue;
1484     }
1485
1486     GNUNET_assert (GNUNET_OK ==
1487                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1488                                                       hash,
1489                                                       NULL,
1490                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1491
1492     LOG (GNUNET_ERROR_TYPE_DEBUG,
1493          "[OP %x] Requesting element (hash %s)\n",
1494          (void *) op, GNUNET_h2s (hash));
1495     ev = GNUNET_MQ_msg_header_extra (demands,
1496                                      sizeof (struct GNUNET_HashCode),
1497                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1498     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1499     GNUNET_MQ_send (op->mq, ev);
1500   }
1501 }
1502
1503
1504 /**
1505  * Handle a done message from a remote peer
1506  *
1507  * @param cls the union operation
1508  * @param mh the message
1509  */
1510 static void
1511 handle_p2p_done (void *cls,
1512                  const struct GNUNET_MessageHeader *mh)
1513 {
1514   struct Operation *op = cls;
1515
1516   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1517   {
1518     /* We got all requests, but still have to send our elements in response. */
1519
1520     op->state->phase = PHASE_FINISH_WAITING;
1521
1522     LOG (GNUNET_ERROR_TYPE_DEBUG,
1523          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1524     /* The active peer is done sending offers
1525      * and inquiries.  This means that all
1526      * our responses to that (demands and offers)
1527      * must be in flight (queued or in mesh).
1528      *
1529      * We should notify the active peer once
1530      * all our demands are satisfied, so that the active
1531      * peer can quit if we gave him everything.
1532      */
1533     maybe_finish (op);
1534     return;
1535   }
1536   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1537   {
1538     LOG (GNUNET_ERROR_TYPE_DEBUG,
1539          "got DONE (as active partner), waiting to finish\n");
1540     /* All demands of the other peer are satisfied,
1541      * and we processed all offers, thus we know
1542      * exactly what our demands must be.
1543      *
1544      * We'll close the channel
1545      * to the other peer once our demands are met.
1546      */
1547     op->state->phase = PHASE_FINISH_CLOSING;
1548     maybe_finish (op);
1549     return;
1550   }
1551   GNUNET_break_op (0);
1552   fail_union_operation (op);
1553 }
1554
1555
1556 /**
1557  * Initiate operation to evaluate a set union with a remote peer.
1558  *
1559  * @param op operation to perform (to be initialized)
1560  * @param opaque_context message to be transmitted to the listener
1561  *        to convince him to accept, may be NULL
1562  */
1563 static void
1564 union_evaluate (struct Operation *op,
1565                 const struct GNUNET_MessageHeader *opaque_context)
1566 {
1567   struct GNUNET_MQ_Envelope *ev;
1568   struct OperationRequestMessage *msg;
1569
1570   GNUNET_assert (NULL == op->state);
1571   op->state = GNUNET_new (struct OperationState);
1572   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1573   /* copy the current generation's strata estimator for this operation */
1574   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1575   /* we started the operation, thus we have to send the operation request */
1576   op->state->phase = PHASE_EXPECT_SE;
1577   op->state->salt_receive = op->state->salt_send = 42;
1578   LOG (GNUNET_ERROR_TYPE_DEBUG,
1579        "Initiating union operation evaluation\n");
1580   GNUNET_STATISTICS_update (_GSS_statistics,
1581                             "# of total union operations",
1582                             1,
1583                             GNUNET_NO);
1584   GNUNET_STATISTICS_update (_GSS_statistics,
1585                             "# of initiated union operations",
1586                             1,
1587                             GNUNET_NO);
1588   ev = GNUNET_MQ_msg_nested_mh (msg,
1589                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1590                                 opaque_context);
1591   if (NULL == ev)
1592   {
1593     /* the context message is too large */
1594     GNUNET_break (0);
1595     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1596     return;
1597   }
1598   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1599   msg->app_id = op->spec->app_id;
1600   GNUNET_MQ_send (op->mq,
1601                   ev);
1602
1603   if (NULL != opaque_context)
1604     LOG (GNUNET_ERROR_TYPE_DEBUG,
1605          "sent op request with context message\n");
1606   else
1607     LOG (GNUNET_ERROR_TYPE_DEBUG,
1608          "sent op request without context message\n");
1609 }
1610
1611
1612 /**
1613  * Accept an union operation request from a remote peer.
1614  * Only initializes the private operation state.
1615  *
1616  * @param op operation that will be accepted as a union operation
1617  */
1618 static void
1619 union_accept (struct Operation *op)
1620 {
1621   LOG (GNUNET_ERROR_TYPE_DEBUG,
1622        "accepting set union operation\n");
1623   GNUNET_assert (NULL == op->state);
1624
1625   GNUNET_STATISTICS_update (_GSS_statistics,
1626                             "# of accepted union operations",
1627                             1,
1628                             GNUNET_NO);
1629   GNUNET_STATISTICS_update (_GSS_statistics,
1630                             "# of total union operations",
1631                             1,
1632                             GNUNET_NO);
1633
1634   op->state = GNUNET_new (struct OperationState);
1635   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1636   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1637   op->state->salt_receive = op->state->salt_send = 42;
1638   /* kick off the operation */
1639   send_strata_estimator (op);
1640 }
1641
1642
1643 /**
1644  * Create a new set supporting the union operation
1645  *
1646  * We maintain one strata estimator per set and then manipulate it over the
1647  * lifetime of the set, as recreating a strata estimator would be expensive.
1648  *
1649  * @return the newly created set, NULL on error
1650  */
1651 static struct SetState *
1652 union_set_create (void)
1653 {
1654   struct SetState *set_state;
1655
1656   LOG (GNUNET_ERROR_TYPE_DEBUG,
1657        "union set created\n");
1658   set_state = GNUNET_new (struct SetState);
1659   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1660                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1661   if (NULL == set_state->se)
1662   {
1663     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1664                 "Failed to allocate strata estimator\n");
1665     GNUNET_free (set_state);
1666     return NULL;
1667   }
1668   return set_state;
1669 }
1670
1671
1672 /**
1673  * Add the element from the given element message to the set.
1674  *
1675  * @param set_state state of the set want to add to
1676  * @param ee the element to add to the set
1677  */
1678 static void
1679 union_add (struct SetState *set_state, struct ElementEntry *ee)
1680 {
1681   strata_estimator_insert (set_state->se,
1682                            get_ibf_key (&ee->element_hash));
1683 }
1684
1685
1686 /**
1687  * Remove the element given in the element message from the set.
1688  * Only marks the element as removed, so that older set operations can still exchange it.
1689  *
1690  * @param set_state state of the set to remove from
1691  * @param ee set element to remove
1692  */
1693 static void
1694 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1695 {
1696   strata_estimator_remove (set_state->se,
1697                            get_ibf_key (&ee->element_hash));
1698 }
1699
1700
1701 /**
1702  * Destroy a set that supports the union operation.
1703  *
1704  * @param set_state the set to destroy
1705  */
1706 static void
1707 union_set_destroy (struct SetState *set_state)
1708 {
1709   if (NULL != set_state->se)
1710   {
1711     strata_estimator_destroy (set_state->se);
1712     set_state->se = NULL;
1713   }
1714   GNUNET_free (set_state);
1715 }
1716
1717
1718 /**
1719  * Dispatch messages for a union operation.
1720  *
1721  * @param op the state of the union evaluate operation
1722  * @param mh the received message
1723  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1724  *         #GNUNET_OK otherwise
1725  */
1726 int
1727 union_handle_p2p_message (struct Operation *op,
1728                           const struct GNUNET_MessageHeader *mh)
1729 {
1730   //LOG (GNUNET_ERROR_TYPE_DEBUG,
1731   //            "received p2p message (t: %u, s: %u)\n",
1732   //            ntohs (mh->type),
1733   //            ntohs (mh->size));
1734   switch (ntohs (mh->type))
1735   {
1736     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1737       return handle_p2p_ibf (op, mh);
1738     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1739       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1740     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1741       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1742     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1743       handle_p2p_elements (op, mh);
1744       break;
1745     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1746       handle_p2p_inquiry (op, mh);
1747       break;
1748     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1749       handle_p2p_done (op, mh);
1750       break;
1751     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1752       handle_p2p_offer (op, mh);
1753       break;
1754     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1755       handle_p2p_demand (op, mh);
1756       break;
1757     default:
1758       /* Something wrong with cadet's message handlers? */
1759       GNUNET_assert (0);
1760   }
1761   return GNUNET_OK;
1762 }
1763
1764
1765 /**
1766  * Handler for peer-disconnects, notifies the client
1767  * about the aborted operation in case the op was not concluded.
1768  *
1769  * @param op the destroyed operation
1770  */
1771 static void
1772 union_peer_disconnect (struct Operation *op)
1773 {
1774   if (PHASE_DONE != op->state->phase)
1775   {
1776     struct GNUNET_MQ_Envelope *ev;
1777     struct GNUNET_SET_ResultMessage *msg;
1778
1779     ev = GNUNET_MQ_msg (msg,
1780                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1781     msg->request_id = htonl (op->spec->client_request_id);
1782     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1783     msg->element_type = htons (0);
1784     GNUNET_MQ_send (op->spec->set->client_mq,
1785                     ev);
1786     LOG (GNUNET_ERROR_TYPE_WARNING,
1787          "other peer disconnected prematurely, phase %u\n",
1788          op->state->phase);
1789     _GSS_operation_destroy (op,
1790                             GNUNET_YES);
1791     return;
1792   }
1793   // else: the session has already been concluded
1794   LOG (GNUNET_ERROR_TYPE_DEBUG,
1795        "other peer disconnected (finished)\n");
1796   if (GNUNET_NO == op->state->client_done_sent)
1797     send_done_and_destroy (op);
1798 }
1799
1800
1801 /**
1802  * Copy union-specific set state.
1803  *
1804  * @param set source set for copying the union state
1805  * @return a copy of the union-specific set state
1806  */
1807 static struct SetState *
1808 union_copy_state (struct Set *set)
1809 {
1810   struct SetState *new_state;
1811
1812   new_state = GNUNET_new (struct SetState);
1813   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1814   new_state->se = strata_estimator_dup (set->state->se);
1815
1816   return new_state;
1817 }
1818
1819
1820 /**
1821  * Get the table with implementing functions for
1822  * set union.
1823  *
1824  * @return the operation specific VTable
1825  */
1826 const struct SetVT *
1827 _GSS_union_vt ()
1828 {
1829   static const struct SetVT union_vt = {
1830     .create = &union_set_create,
1831     .msg_handler = &union_handle_p2p_message,
1832     .add = &union_add,
1833     .remove = &union_remove,
1834     .destroy_set = &union_set_destroy,
1835     .evaluate = &union_evaluate,
1836     .accept = &union_accept,
1837     .peer_disconnect = &union_peer_disconnect,
1838     .cancel = &union_op_cancel,
1839     .copy_state = &union_copy_state,
1840   };
1841
1842   return &union_vt;
1843 }