83201872387a82e54b12f9a9024af1684b7ede12
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    *
89    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90    */
91   PHASE_EXPECT_IBF,
92
93   /**
94    * Continuation for multi part IBFs.
95    */
96   PHASE_EXPECT_IBF_CONT,
97
98   /**
99    * We are decoding an IBF.
100    */
101   PHASE_INVENTORY_ACTIVE,
102
103   /**
104    * The other peer is decoding the IBF we just sent.
105    */
106   PHASE_INVENTORY_PASSIVE,
107
108   /**
109    * The protocol is almost finished, but we still have to flush our message
110    * queue and/or expect some elements.
111    */
112   PHASE_FINISH_CLOSING,
113
114   /**
115    * In the penultimate phase,
116    * we wait until all our demands
117    * are satisfied.  Then we send a done
118    * message, and wait for another done message.*/
119   PHASE_FINISH_WAITING,
120
121   /**
122    * In the ultimate phase, we wait until
123    * our demands are satisfied and then
124    * quit (sending another DONE message). */
125   PHASE_DONE
126 };
127
128
129 /**
130  * State of an evaluate operation with another peer.
131  */
132 struct OperationState
133 {
134   /**
135    * Copy of the set's strata estimator at the time of
136    * creation of this operation.
137    */
138   struct StrataEstimator *se;
139
140   /**
141    * The IBF we currently receive.
142    */
143   struct InvertibleBloomFilter *remote_ibf;
144
145   /**
146    * The IBF with the local set's element.
147    */
148   struct InvertibleBloomFilter *local_ibf;
149
150   /**
151    * Maps IBF-Keys (specific to the current salt) to elements.
152    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153    * Colliding IBF-Keys are linked.
154    */
155   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
156
157   /**
158    * Current state of the operation.
159    */
160   enum UnionOperationPhase phase;
161
162   /**
163    * Did we send the client that we are done?
164    */
165   int client_done_sent;
166
167   /**
168    * Number of ibf buckets already received into the @a remote_ibf.
169    */
170   unsigned int ibf_buckets_received;
171
172   /**
173    * Hashes for elements that we have demanded from the other peer.
174    */
175   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
176
177   /**
178    * Salt that we're using for sending IBFs
179    */
180   uint32_t salt_send;
181
182   /**
183    * Salt for the IBF we've received and that we're currently decoding.
184    */
185   uint32_t salt_receive;
186
187   /**
188    * Number of elements we received from the other peer
189    * that were not in the local set yet.
190    */
191   uint32_t received_fresh;
192
193   /**
194    * Total number of elements received from the other peer.
195    */
196   uint32_t received_total;
197 };
198
199
200 /**
201  * The key entry is used to associate an ibf key with an element.
202  */
203 struct KeyEntry
204 {
205   /**
206    * IBF key for the entry, derived from the current salt.
207    */
208   struct IBF_Key ibf_key;
209
210   /**
211    * The actual element associated with the key.
212    *
213    * Only owned by the union operation if element->operation
214    * is #GNUNET_YES.
215    */
216   struct ElementEntry *element;
217 };
218
219
220 /**
221  * Used as a closure for sending elements
222  * with a specific IBF key.
223  */
224 struct SendElementClosure
225 {
226   /**
227    * The IBF key whose matching elements should be
228    * sent.
229    */
230   struct IBF_Key ibf_key;
231
232   /**
233    * Operation for which the elements
234    * should be sent.
235    */
236   struct Operation *op;
237 };
238
239
240 /**
241  * Extra state required for efficient set union.
242  */
243 struct SetState
244 {
245   /**
246    * The strata estimator is only generated once for
247    * each set.
248    * The IBF keys are derived from the element hashes with
249    * salt=0.
250    */
251   struct StrataEstimator *se;
252 };
253
254
255 /**
256  * Iterator over hash map entries, called to
257  * destroy the linked list of colliding ibf key entries.
258  *
259  * @param cls closure
260  * @param key current key code
261  * @param value value in the hash map
262  * @return #GNUNET_YES if we should continue to iterate,
263  *         #GNUNET_NO if not.
264  */
265 static int
266 destroy_key_to_element_iter (void *cls,
267                              uint32_t key,
268                              void *value)
269 {
270   struct KeyEntry *k = value;
271
272   GNUNET_assert (NULL != k);
273   if (GNUNET_YES == k->element->remote)
274   {
275     GNUNET_free (k->element);
276     k->element = NULL;
277   }
278   GNUNET_free (k);
279   return GNUNET_YES;
280 }
281
282
283 /**
284  * Destroy the union operation.  Only things specific to the union
285  * operation are destroyed.
286  *
287  * @param op union operation to destroy
288  */
289 static void
290 union_op_cancel (struct Operation *op)
291 {
292   LOG (GNUNET_ERROR_TYPE_DEBUG,
293        "destroying union op\n");
294   /* check if the op was canceled twice */
295   GNUNET_assert (NULL != op->state);
296   if (NULL != op->state->remote_ibf)
297   {
298     ibf_destroy (op->state->remote_ibf);
299     op->state->remote_ibf = NULL;
300   }
301   if (NULL != op->state->demanded_hashes)
302   {
303     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
304     op->state->demanded_hashes = NULL;
305   }
306   if (NULL != op->state->local_ibf)
307   {
308     ibf_destroy (op->state->local_ibf);
309     op->state->local_ibf = NULL;
310   }
311   if (NULL != op->state->se)
312   {
313     strata_estimator_destroy (op->state->se);
314     op->state->se = NULL;
315   }
316   if (NULL != op->state->key_to_element)
317   {
318     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
319                                              &destroy_key_to_element_iter,
320                                              NULL);
321     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
322     op->state->key_to_element = NULL;
323   }
324   GNUNET_free (op->state);
325   op->state = NULL;
326   LOG (GNUNET_ERROR_TYPE_DEBUG,
327        "destroying union op done\n");
328 }
329
330
331 /**
332  * Inform the client that the union operation has failed,
333  * and proceed to destroy the evaluate operation.
334  *
335  * @param op the union operation to fail
336  */
337 static void
338 fail_union_operation (struct Operation *op)
339 {
340   struct GNUNET_MQ_Envelope *ev;
341   struct GNUNET_SET_ResultMessage *msg;
342
343   LOG (GNUNET_ERROR_TYPE_ERROR,
344        "union operation failed\n");
345   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
346   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
347   msg->request_id = htonl (op->spec->client_request_id);
348   msg->element_type = htons (0);
349   GNUNET_MQ_send (op->spec->set->client_mq, ev);
350   _GSS_operation_destroy (op, GNUNET_YES);
351 }
352
353
354 /**
355  * Derive the IBF key from a hash code and
356  * a salt.
357  *
358  * @param src the hash code
359  * @return the derived IBF key
360  */
361 static struct IBF_Key
362 get_ibf_key (const struct GNUNET_HashCode *src)
363 {
364   struct IBF_Key key;
365   uint16_t salt = 0;
366
367   GNUNET_CRYPTO_kdf (&key, sizeof (key),
368                      src, sizeof *src,
369                      &salt, sizeof (salt),
370                      NULL, 0);
371   return key;
372 }
373
374
375 /**
376  * Iterator over the mapping from IBF keys to element entries.  Checks if we
377  * have an element with a given GNUNET_HashCode.
378  *
379  * @param cls closure
380  * @param key current key code
381  * @param value value in the hash map
382  * @return #GNUNET_YES if we should search further,
383  *         #GNUNET_NO if we've found the element.
384  */
385 static int
386 op_has_element_iterator (void *cls,
387                          uint32_t key,
388                          void *value)
389 {
390   struct GNUNET_HashCode *element_hash = cls;
391   struct KeyEntry *k = value;
392
393   GNUNET_assert (NULL != k);
394   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
395                                    element_hash))
396     return GNUNET_NO;
397   return GNUNET_YES;
398 }
399
400
401 /**
402  * Determine whether the given element is already in the operation's element
403  * set.
404  *
405  * @param op operation that should be tested for 'element_hash'
406  * @param element_hash hash of the element to look for
407  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
408  */
409 static int
410 op_has_element (struct Operation *op,
411                 const struct GNUNET_HashCode *element_hash)
412 {
413   int ret;
414   struct IBF_Key ibf_key;
415
416   ibf_key = get_ibf_key (element_hash);
417   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
418                                                       (uint32_t) ibf_key.key_val,
419                                                       op_has_element_iterator,
420                                                       (void *) element_hash);
421
422   /* was the iteration aborted because we found the element? */
423   if (GNUNET_SYSERR == ret)
424     return GNUNET_YES;
425   return GNUNET_NO;
426 }
427
428
429 /**
430  * Insert an element into the union operation's
431  * key-to-element mapping. Takes ownership of 'ee'.
432  * Note that this does not insert the element in the set,
433  * only in the operation's key-element mapping.
434  * This is done to speed up re-tried operations, if some elements
435  * were transmitted, and then the IBF fails to decode.
436  *
437  * XXX: clarify ownership, doesn't sound right.
438  *
439  * @param op the union operation
440  * @param ee the element entry
441  */
442 static void
443 op_register_element (struct Operation *op,
444                      struct ElementEntry *ee)
445 {
446   struct IBF_Key ibf_key;
447   struct KeyEntry *k;
448
449   ibf_key = get_ibf_key (&ee->element_hash);
450   k = GNUNET_new (struct KeyEntry);
451   k->element = ee;
452   k->ibf_key = ibf_key;
453   GNUNET_assert (GNUNET_OK ==
454                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
455                                                       (uint32_t) ibf_key.key_val,
456                                                       k,
457                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
458 }
459
460
461 static void
462 salt_key (const struct IBF_Key *k_in,
463           uint32_t salt,
464           struct IBF_Key *k_out)
465 {
466   int s = salt % 64;
467   uint64_t x = k_in->key_val;
468   /* rotate ibf key */
469   x = (x >> s) | (x << (64 - s));
470   k_out->key_val = x;
471 }
472
473
474 static void
475 unsalt_key (const struct IBF_Key *k_in,
476             uint32_t salt,
477             struct IBF_Key *k_out)
478 {
479   int s = salt % 64;
480   uint64_t x = k_in->key_val;
481   x = (x << s) | (x >> (64 - s));
482   k_out->key_val = x;
483 }
484
485
486 /**
487  * Insert a key into an ibf.
488  *
489  * @param cls the ibf
490  * @param key unused
491  * @param value the key entry to get the key from
492  */
493 static int
494 prepare_ibf_iterator (void *cls,
495                       uint32_t key,
496                       void *value)
497 {
498   struct Operation *op = cls;
499   struct KeyEntry *ke = value;
500   struct IBF_Key salted_key;
501
502   LOG (GNUNET_ERROR_TYPE_DEBUG,
503        "[OP %x] inserting %lx (hash %s) into ibf\n",
504        (void *) op,
505        (unsigned long) ke->ibf_key.key_val,
506        GNUNET_h2s (&ke->element->element_hash));
507   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
508   ibf_insert (op->state->local_ibf, salted_key);
509   return GNUNET_YES;
510 }
511
512
513 /**
514  * Iterator for initializing the
515  * key-to-element mapping of a union operation
516  *
517  * @param cls the union operation `struct Operation *`
518  * @param key unused
519  * @param value the `struct ElementEntry *` to insert
520  *        into the key-to-element mapping
521  * @return #GNUNET_YES (to continue iterating)
522  */
523 static int
524 init_key_to_element_iterator (void *cls,
525                               const struct GNUNET_HashCode *key,
526                               void *value)
527 {
528   struct Operation *op = cls;
529   struct ElementEntry *ee = value;
530
531   /* make sure that the element belongs to the set at the time
532    * of creating the operation */
533   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
534     return GNUNET_YES;
535
536   GNUNET_assert (GNUNET_NO == ee->remote);
537
538   op_register_element (op, ee);
539   return GNUNET_YES;
540 }
541
542
543 /**
544  * Create an ibf with the operation's elements
545  * of the specified size
546  *
547  * @param op the union operation
548  * @param size size of the ibf to create
549  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
550  */
551 static int
552 prepare_ibf (struct Operation *op,
553              uint32_t size)
554 {
555   if (NULL == op->state->key_to_element)
556   {
557     unsigned int len;
558
559     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
560     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
561     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
562                                            init_key_to_element_iterator, op);
563   }
564   if (NULL != op->state->local_ibf)
565     ibf_destroy (op->state->local_ibf);
566   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
567   if (NULL == op->state->local_ibf)
568   {
569     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
570                 "Failed to allocate local IBF\n");
571     return GNUNET_SYSERR;
572   }
573   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
574                                            &prepare_ibf_iterator,
575                                            op);
576   return GNUNET_OK;
577 }
578
579
580 /**
581  * Send an ibf of appropriate size.
582  *
583  * Fragments the IBF into multiple messages if necessary.
584  *
585  * @param op the union operation
586  * @param ibf_order order of the ibf to send, size=2^order
587  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
588  */
589 static int
590 send_ibf (struct Operation *op,
591           uint16_t ibf_order)
592 {
593   unsigned int buckets_sent = 0;
594   struct InvertibleBloomFilter *ibf;
595
596   if (GNUNET_OK !=
597       prepare_ibf (op, 1<<ibf_order))
598   {
599     /* allocation failed */
600     return GNUNET_SYSERR;
601   }
602
603   LOG (GNUNET_ERROR_TYPE_DEBUG,
604        "sending ibf of size %u\n",
605        1<<ibf_order);
606
607   {
608     char name[64] = { 0 };
609     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
610     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
611   }
612
613   ibf = op->state->local_ibf;
614
615   while (buckets_sent < (1 << ibf_order))
616   {
617     unsigned int buckets_in_message;
618     struct GNUNET_MQ_Envelope *ev;
619     struct IBFMessage *msg;
620
621     buckets_in_message = (1 << ibf_order) - buckets_sent;
622     /* limit to maximum */
623     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
624       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
625
626     ev = GNUNET_MQ_msg_extra (msg,
627                               buckets_in_message * IBF_BUCKET_SIZE,
628                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
629     msg->reserved1 = 0;
630     msg->reserved2 = 0;
631     msg->order = ibf_order;
632     msg->offset = htonl (buckets_sent);
633     msg->salt = htonl (op->state->salt_send);
634     ibf_write_slice (ibf, buckets_sent,
635                      buckets_in_message, &msg[1]);
636     buckets_sent += buckets_in_message;
637     LOG (GNUNET_ERROR_TYPE_DEBUG,
638          "ibf chunk size %u, %u/%u sent\n",
639          buckets_in_message,
640          buckets_sent,
641          1<<ibf_order);
642     GNUNET_MQ_send (op->mq, ev);
643   }
644
645   /* The other peer must decode the IBF, so
646    * we're passive. */
647   op->state->phase = PHASE_INVENTORY_PASSIVE;
648   return GNUNET_OK;
649 }
650
651
652 /**
653  * Send a strata estimator to the remote peer.
654  *
655  * @param op the union operation with the remote peer
656  */
657 static void
658 send_strata_estimator (struct Operation *op)
659 {
660   const struct StrataEstimator *se = op->state->se;
661   struct GNUNET_MQ_Envelope *ev;
662   struct GNUNET_MessageHeader *strata_msg;
663   char *buf;
664   size_t len;
665   uint16_t type;
666
667   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
668   len = strata_estimator_write (op->state->se,
669                                 buf);
670   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
671     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
672   else
673     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
674   ev = GNUNET_MQ_msg_header_extra (strata_msg,
675                                    len,
676                                    type);
677   GNUNET_memcpy (&strata_msg[1],
678           buf,
679           len);
680   GNUNET_free (buf);
681   GNUNET_MQ_send (op->mq,
682                   ev);
683   op->state->phase = PHASE_EXPECT_IBF;
684   LOG (GNUNET_ERROR_TYPE_DEBUG,
685        "sent SE, expecting IBF\n");
686 }
687
688
689 /**
690  * Compute the necessary order of an ibf
691  * from the size of the symmetric set difference.
692  *
693  * @param diff the difference
694  * @return the required size of the ibf
695  */
696 static unsigned int
697 get_order_from_difference (unsigned int diff)
698 {
699   unsigned int ibf_order;
700
701   ibf_order = 2;
702   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
703           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
704     ibf_order++;
705   if (ibf_order > MAX_IBF_ORDER)
706     ibf_order = MAX_IBF_ORDER;
707   return ibf_order;
708 }
709
710
711 /**
712  * Handle a strata estimator from a remote peer
713  *
714  * @param cls the union operation
715  * @param mh the message
716  * @param is_compressed #GNUNET_YES if the estimator is compressed
717  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
718  *         #GNUNET_OK otherwise
719  */
720 static int
721 handle_p2p_strata_estimator (void *cls,
722                              const struct GNUNET_MessageHeader *mh,
723                              int is_compressed)
724 {
725   struct Operation *op = cls;
726   struct StrataEstimator *remote_se;
727   int diff;
728   size_t len;
729
730   GNUNET_STATISTICS_update (_GSS_statistics,
731                             "# bytes of SE received",
732                             ntohs (mh->size),
733                             GNUNET_NO);
734
735   if (op->state->phase != PHASE_EXPECT_SE)
736   {
737     fail_union_operation (op);
738     GNUNET_break (0);
739     return GNUNET_SYSERR;
740   }
741   len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
742   if ( (GNUNET_NO == is_compressed) &&
743        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
744   {
745     fail_union_operation (op);
746     GNUNET_break (0);
747     return GNUNET_SYSERR;
748   }
749   remote_se = strata_estimator_create (SE_STRATA_COUNT,
750                                        SE_IBF_SIZE,
751                                        SE_IBF_HASH_NUM);
752   if (NULL == remote_se)
753   {
754     /* insufficient resources, fail */
755     fail_union_operation (op);
756     return GNUNET_SYSERR;
757   }
758   if (GNUNET_OK !=
759       strata_estimator_read (&mh[1],
760                              len,
761                              is_compressed,
762                              remote_se))
763   {
764     /* decompression failed */
765     fail_union_operation (op);
766     strata_estimator_destroy (remote_se);
767     return GNUNET_SYSERR;
768   }
769   GNUNET_assert (NULL != op->state->se);
770   diff = strata_estimator_difference (remote_se,
771                                       op->state->se);
772   strata_estimator_destroy (remote_se);
773   strata_estimator_destroy (op->state->se);
774   op->state->se = NULL;
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "got se diff=%d, using ibf size %d\n",
777        diff,
778        1<<get_order_from_difference (diff));
779   if (GNUNET_OK !=
780       send_ibf (op,
781                 get_order_from_difference (diff)))
782   {
783     /* Internal error, best we can do is shut the connection */
784     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
785                 "Failed to send IBF, closing connection\n");
786     fail_union_operation (op);
787     return GNUNET_SYSERR;
788   }
789   return GNUNET_OK;
790 }
791
792
793 /**
794  * Iterator to send elements to a remote peer
795  *
796  * @param cls closure with the element key and the union operation
797  * @param key ignored
798  * @param value the key entry
799  */
800 static int
801 send_offers_iterator (void *cls,
802                       uint32_t key,
803                       void *value)
804 {
805   struct SendElementClosure *sec = cls;
806   struct Operation *op = sec->op;
807   struct KeyEntry *ke = value;
808   struct GNUNET_MQ_Envelope *ev;
809   struct GNUNET_MessageHeader *mh;
810
811   /* Detect 32-bit key collision for the 64-bit IBF keys. */
812   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
813     return GNUNET_YES;
814
815   ev = GNUNET_MQ_msg_header_extra (mh,
816                                    sizeof (struct GNUNET_HashCode),
817                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
818
819   GNUNET_assert (NULL != ev);
820   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
821   LOG (GNUNET_ERROR_TYPE_DEBUG,
822        "[OP %x] sending element offer (%s) to peer\n",
823        (void *) op,
824        GNUNET_h2s (&ke->element->element_hash));
825   GNUNET_MQ_send (op->mq, ev);
826   return GNUNET_YES;
827 }
828
829
830 /**
831  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
832  *
833  * @param op union operation
834  * @param ibf_key IBF key of interest
835  */
836 static void
837 send_offers_for_key (struct Operation *op,
838                      struct IBF_Key ibf_key)
839 {
840   struct SendElementClosure send_cls;
841
842   send_cls.ibf_key = ibf_key;
843   send_cls.op = op;
844   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
845                                                        (uint32_t) ibf_key.key_val,
846                                                        &send_offers_iterator,
847                                                        &send_cls);
848 }
849
850
851 /**
852  * Decode which elements are missing on each side, and
853  * send the appropriate offers and inquiries.
854  *
855  * @param op union operation
856  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
857  */
858 static int
859 decode_and_send (struct Operation *op)
860 {
861   struct IBF_Key key;
862   struct IBF_Key last_key;
863   int side;
864   unsigned int num_decoded;
865   struct InvertibleBloomFilter *diff_ibf;
866
867   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
868
869   if (GNUNET_OK !=
870       prepare_ibf (op, op->state->remote_ibf->size))
871   {
872     GNUNET_break (0);
873     /* allocation failed */
874     return GNUNET_SYSERR;
875   }
876   diff_ibf = ibf_dup (op->state->local_ibf);
877   ibf_subtract (diff_ibf, op->state->remote_ibf);
878
879   ibf_destroy (op->state->remote_ibf);
880   op->state->remote_ibf = NULL;
881
882   LOG (GNUNET_ERROR_TYPE_DEBUG,
883        "decoding IBF (size=%u)\n",
884        diff_ibf->size);
885
886   num_decoded = 0;
887   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
888
889   while (1)
890   {
891     int res;
892     int cycle_detected = GNUNET_NO;
893
894     last_key = key;
895
896     res = ibf_decode (diff_ibf, &side, &key);
897     if (res == GNUNET_OK)
898     {
899       LOG (GNUNET_ERROR_TYPE_DEBUG,
900            "decoded ibf key %lx\n",
901            (unsigned long) key.key_val);
902       num_decoded += 1;
903       if ( (num_decoded > diff_ibf->size) ||
904            ( (num_decoded > 1) &&
905              (last_key.key_val == key.key_val) ) )
906       {
907         LOG (GNUNET_ERROR_TYPE_DEBUG,
908              "detected cyclic ibf (decoded %u/%u)\n",
909              num_decoded,
910              diff_ibf->size);
911         cycle_detected = GNUNET_YES;
912       }
913     }
914     if ( (GNUNET_SYSERR == res) ||
915          (GNUNET_YES == cycle_detected) )
916     {
917       int next_order;
918       next_order = 0;
919       while (1<<next_order < diff_ibf->size)
920         next_order++;
921       next_order++;
922       if (next_order <= MAX_IBF_ORDER)
923       {
924         LOG (GNUNET_ERROR_TYPE_DEBUG,
925              "decoding failed, sending larger ibf (size %u)\n",
926              1<<next_order);
927         GNUNET_STATISTICS_update (_GSS_statistics,
928                                   "# of IBF retries",
929                                   1,
930                                   GNUNET_NO);
931         op->state->salt_send++;
932         if (GNUNET_OK !=
933             send_ibf (op, next_order))
934         {
935           /* Internal error, best we can do is shut the connection */
936           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
937                       "Failed to send IBF, closing connection\n");
938           fail_union_operation (op);
939           ibf_destroy (diff_ibf);
940           return GNUNET_SYSERR;
941         }
942       }
943       else
944       {
945         GNUNET_STATISTICS_update (_GSS_statistics,
946                                   "# of failed union operations (too large)",
947                                   1,
948                                   GNUNET_NO);
949         // XXX: Send the whole set, element-by-element
950         LOG (GNUNET_ERROR_TYPE_ERROR,
951              "set union failed: reached ibf limit\n");
952         fail_union_operation (op);
953         ibf_destroy (diff_ibf);
954         return GNUNET_SYSERR;
955       }
956       break;
957     }
958     if (GNUNET_NO == res)
959     {
960       struct GNUNET_MQ_Envelope *ev;
961
962       LOG (GNUNET_ERROR_TYPE_DEBUG,
963            "transmitted all values, sending DONE\n");
964       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
965       GNUNET_MQ_send (op->mq, ev);
966       /* We now wait until we get a DONE message back
967        * and then wait for our MQ to be flushed and all our
968        * demands be delivered. */
969       break;
970     }
971     if (1 == side)
972     {
973       struct IBF_Key unsalted_key;
974       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
975       send_offers_for_key (op, unsalted_key);
976     }
977     else if (-1 == side)
978     {
979       struct GNUNET_MQ_Envelope *ev;
980       struct InquiryMessage *msg;
981
982       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
983        * the effort additional complexity. */
984       ev = GNUNET_MQ_msg_extra (msg,
985                                 sizeof (struct IBF_Key),
986                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
987       msg->salt = htonl (op->state->salt_receive);
988       GNUNET_memcpy (&msg[1],
989               &key,
990               sizeof (struct IBF_Key));
991       LOG (GNUNET_ERROR_TYPE_DEBUG,
992            "sending element inquiry for IBF key %lx\n",
993            (unsigned long) key.key_val);
994       GNUNET_MQ_send (op->mq, ev);
995     }
996     else
997     {
998       GNUNET_assert (0);
999     }
1000   }
1001   ibf_destroy (diff_ibf);
1002   return GNUNET_OK;
1003 }
1004
1005
1006 /**
1007  * Handle an IBF message from a remote peer.
1008  *
1009  * Reassemble the IBF from multiple pieces, and
1010  * process the whole IBF once possible.
1011  *
1012  * @param cls the union operation
1013  * @param mh the header of the message
1014  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1015  *         #GNUNET_OK otherwise
1016  */
1017 static int
1018 handle_p2p_ibf (void *cls,
1019                 const struct GNUNET_MessageHeader *mh)
1020 {
1021   struct Operation *op = cls;
1022   const struct IBFMessage *msg;
1023   unsigned int buckets_in_message;
1024
1025   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1026   {
1027     GNUNET_break_op (0);
1028     fail_union_operation (op);
1029     return GNUNET_SYSERR;
1030   }
1031   msg = (const struct IBFMessage *) mh;
1032   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1033        (op->state->phase == PHASE_EXPECT_IBF) )
1034   {
1035     op->state->phase = PHASE_EXPECT_IBF_CONT;
1036     GNUNET_assert (NULL == op->state->remote_ibf);
1037     LOG (GNUNET_ERROR_TYPE_DEBUG,
1038          "Creating new ibf of size %u\n",
1039          1 << msg->order);
1040     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1041     op->state->salt_receive = ntohl (msg->salt);
1042     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1043     if (NULL == op->state->remote_ibf)
1044     {
1045       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1046                   "Failed to parse remote IBF, closing connection\n");
1047       fail_union_operation (op);
1048       return GNUNET_SYSERR;
1049     }
1050     op->state->ibf_buckets_received = 0;
1051     if (0 != ntohl (msg->offset))
1052     {
1053       GNUNET_break_op (0);
1054       fail_union_operation (op);
1055       return GNUNET_SYSERR;
1056     }
1057   }
1058   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1059   {
1060     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1061     {
1062       GNUNET_break_op (0);
1063       fail_union_operation (op);
1064       return GNUNET_SYSERR;
1065     }
1066     if (1<<msg->order != op->state->remote_ibf->size)
1067     {
1068       GNUNET_break_op (0);
1069       fail_union_operation (op);
1070       return GNUNET_SYSERR;
1071     }
1072     if (ntohl (msg->salt) != op->state->salt_receive)
1073     {
1074       GNUNET_break_op (0);
1075       fail_union_operation (op);
1076       return GNUNET_SYSERR;
1077     }
1078   }
1079   else
1080   {
1081     GNUNET_assert (0);
1082   }
1083
1084   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1085
1086   if (0 == buckets_in_message)
1087   {
1088     GNUNET_break_op (0);
1089     fail_union_operation (op);
1090     return GNUNET_SYSERR;
1091   }
1092
1093   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1094   {
1095     GNUNET_break_op (0);
1096     fail_union_operation (op);
1097     return GNUNET_SYSERR;
1098   }
1099
1100   GNUNET_assert (NULL != op->state->remote_ibf);
1101
1102   ibf_read_slice (&msg[1],
1103                   op->state->ibf_buckets_received,
1104                   buckets_in_message,
1105                   op->state->remote_ibf);
1106   op->state->ibf_buckets_received += buckets_in_message;
1107
1108   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1109   {
1110     LOG (GNUNET_ERROR_TYPE_DEBUG,
1111          "received full ibf\n");
1112     op->state->phase = PHASE_INVENTORY_ACTIVE;
1113     if (GNUNET_OK !=
1114         decode_and_send (op))
1115     {
1116       /* Internal error, best we can do is shut down */
1117       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1118                   "Failed to decode IBF, closing connection\n");
1119       return GNUNET_SYSERR;
1120     }
1121   }
1122   return GNUNET_OK;
1123 }
1124
1125
1126 /**
1127  * Send a result message to the client indicating
1128  * that there is a new element.
1129  *
1130  * @param op union operation
1131  * @param element element to send
1132  * @param status status to send with the new element
1133  */
1134 static void
1135 send_client_element (struct Operation *op,
1136                      struct GNUNET_SET_Element *element,
1137                      int status)
1138 {
1139   struct GNUNET_MQ_Envelope *ev;
1140   struct GNUNET_SET_ResultMessage *rm;
1141
1142   LOG (GNUNET_ERROR_TYPE_DEBUG,
1143        "sending element (size %u) to client\n",
1144        element->size);
1145   GNUNET_assert (0 != op->spec->client_request_id);
1146   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1147   if (NULL == ev)
1148   {
1149     GNUNET_MQ_discard (ev);
1150     GNUNET_break (0);
1151     return;
1152   }
1153   rm->result_status = htons (status);
1154   rm->request_id = htonl (op->spec->client_request_id);
1155   rm->element_type = element->element_type;
1156   GNUNET_memcpy (&rm[1], element->data, element->size);
1157   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1158 }
1159
1160
1161 /**
1162  * Signal to the client that the operation has finished and
1163  * destroy the operation.
1164  *
1165  * @param cls operation to destroy
1166  */
1167 static void
1168 send_done_and_destroy (void *cls)
1169 {
1170   struct Operation *op = cls;
1171   struct GNUNET_MQ_Envelope *ev;
1172   struct GNUNET_SET_ResultMessage *rm;
1173
1174   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1175   rm->request_id = htonl (op->spec->client_request_id);
1176   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1177   rm->element_type = htons (0);
1178   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1179   /* Will also call the union-specific cancel function. */
1180   _GSS_operation_destroy (op, GNUNET_YES);
1181 }
1182
1183
1184 static void
1185 maybe_finish (struct Operation *op)
1186 {
1187   unsigned int num_demanded;
1188
1189   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1190
1191   if (PHASE_FINISH_WAITING == op->state->phase)
1192   {
1193     LOG (GNUNET_ERROR_TYPE_DEBUG,
1194          "In PHASE_FINISH_WAITING, pending %u demands\n",
1195          num_demanded);
1196     if (0 == num_demanded)
1197     {
1198       struct GNUNET_MQ_Envelope *ev;
1199
1200       op->state->phase = PHASE_DONE;
1201       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1202       GNUNET_MQ_send (op->mq, ev);
1203
1204       /* We now wait until the other peer closes the channel
1205        * after it got all elements from us. */
1206     }
1207   }
1208   if (PHASE_FINISH_CLOSING == op->state->phase)
1209   {
1210     LOG (GNUNET_ERROR_TYPE_DEBUG,
1211          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1212          num_demanded);
1213     if (0 == num_demanded)
1214     {
1215       op->state->phase = PHASE_DONE;
1216       send_done_and_destroy (op);
1217     }
1218   }
1219 }
1220
1221
1222 /**
1223  * Handle an element message from a remote peer.
1224  * Sent by the other peer either because we decoded an IBF and placed a demand,
1225  * or because the other peer switched to full set transmission.
1226  *
1227  * @param cls the union operation
1228  * @param mh the message
1229  */
1230 static void
1231 handle_p2p_elements (void *cls,
1232                      const struct GNUNET_MessageHeader *mh)
1233 {
1234   struct Operation *op = cls;
1235   struct ElementEntry *ee;
1236   const struct GNUNET_SET_ElementMessage *emsg;
1237   uint16_t element_size;
1238
1239   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1240   {
1241     GNUNET_break_op (0);
1242     fail_union_operation (op);
1243     return;
1244   }
1245   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1246   {
1247     GNUNET_break_op (0);
1248     fail_union_operation (op);
1249     return;
1250   }
1251
1252   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1253
1254   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1255   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1256   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1257   ee->element.size = element_size;
1258   ee->element.data = &ee[1];
1259   ee->element.element_type = ntohs (emsg->element_type);
1260   ee->remote = GNUNET_YES;
1261   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1262
1263   if (GNUNET_NO ==
1264       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1265                                             &ee->element_hash,
1266                                             NULL))
1267   {
1268     /* We got something we didn't demand, since it's not in our map. */
1269     GNUNET_break_op (0);
1270     GNUNET_free (ee);
1271     fail_union_operation (op);
1272     return;
1273   }
1274
1275   LOG (GNUNET_ERROR_TYPE_DEBUG,
1276        "Got element (size %u, hash %s) from peer\n",
1277        (unsigned int) element_size,
1278        GNUNET_h2s (&ee->element_hash));
1279
1280   GNUNET_STATISTICS_update (_GSS_statistics,
1281                             "# received elements",
1282                             1,
1283                             GNUNET_NO);
1284   GNUNET_STATISTICS_update (_GSS_statistics,
1285                             "# exchanged elements",
1286                             1,
1287                             GNUNET_NO);
1288
1289   op->state->received_total += 1;
1290
1291   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1292   {
1293     /* Got repeated element.  Should not happen since
1294      * we track demands. */
1295     GNUNET_STATISTICS_update (_GSS_statistics,
1296                               "# repeated elements",
1297                               1,
1298                               GNUNET_NO);
1299     GNUNET_free (ee);
1300   }
1301   else
1302   {
1303     LOG (GNUNET_ERROR_TYPE_DEBUG,
1304          "Registering new element from remote peer\n");
1305     op->state->received_fresh += 1;
1306     op_register_element (op, ee);
1307     /* only send results immediately if the client wants it */
1308     switch (op->spec->result_mode)
1309     {
1310       case GNUNET_SET_RESULT_ADDED:
1311         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1312         break;
1313       case GNUNET_SET_RESULT_SYMMETRIC:
1314         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1315         break;
1316       default:
1317         /* Result mode not supported, should have been caught earlier. */
1318         GNUNET_break (0);
1319         break;
1320     }
1321   }
1322
1323   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1324   {
1325     /* The other peer gave us lots of old elements, there's something wrong. */
1326     GNUNET_break_op (0);
1327     fail_union_operation (op);
1328     return;
1329   }
1330
1331   maybe_finish (op);
1332 }
1333
1334
1335 /**
1336  * Send offers (for GNUNET_Hash-es) in response
1337  * to inquiries (for IBF_Key-s).
1338  *
1339  * @param cls the union operation
1340  * @param mh the message
1341  */
1342 static void
1343 handle_p2p_inquiry (void *cls,
1344                     const struct GNUNET_MessageHeader *mh)
1345 {
1346   struct Operation *op = cls;
1347   const struct IBF_Key *ibf_key;
1348   unsigned int num_keys;
1349   struct InquiryMessage *msg;
1350
1351   /* look up elements and send them */
1352   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1353   {
1354     GNUNET_break_op (0);
1355     fail_union_operation (op);
1356     return;
1357   }
1358   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1359       / sizeof (struct IBF_Key);
1360   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1361       != num_keys * sizeof (struct IBF_Key))
1362   {
1363     GNUNET_break_op (0);
1364     fail_union_operation (op);
1365     return;
1366   }
1367
1368   msg = (struct InquiryMessage *) mh;
1369
1370   ibf_key = (const struct IBF_Key *) &msg[1];
1371   while (0 != num_keys--)
1372   {
1373     struct IBF_Key unsalted_key;
1374     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1375     send_offers_for_key (op, unsalted_key);
1376     ibf_key++;
1377   }
1378 }
1379
1380
1381 /**
1382  * FIXME
1383  */
1384 static void
1385 handle_p2p_demand (void *cls,
1386                    const struct GNUNET_MessageHeader *mh)
1387 {
1388   struct Operation *op = cls;
1389   struct ElementEntry *ee;
1390   struct GNUNET_SET_ElementMessage *emsg;
1391   const struct GNUNET_HashCode *hash;
1392   unsigned int num_hashes;
1393   struct GNUNET_MQ_Envelope *ev;
1394
1395   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1396     / sizeof (struct GNUNET_HashCode);
1397   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1398       != num_hashes * sizeof (struct GNUNET_HashCode))
1399   {
1400     GNUNET_break_op (0);
1401     fail_union_operation (op);
1402     return;
1403   }
1404
1405   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1406        num_hashes > 0;
1407        hash++, num_hashes--)
1408   {
1409     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1410     if (NULL == ee)
1411     {
1412       /* Demand for non-existing element. */
1413       GNUNET_break_op (0);
1414       fail_union_operation (op);
1415       return;
1416     }
1417     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1418     {
1419       /* Probably confused lazily copied sets. */
1420       GNUNET_break_op (0);
1421       fail_union_operation (op);
1422       return;
1423     }
1424     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1425     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1426     emsg->reserved = htons (0);
1427     emsg->element_type = htons (ee->element.element_type);
1428     LOG (GNUNET_ERROR_TYPE_DEBUG,
1429          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1430          (void *) op,
1431          (unsigned int) ee->element.size,
1432          GNUNET_h2s (&ee->element_hash));
1433     GNUNET_MQ_send (op->mq, ev);
1434     GNUNET_STATISTICS_update (_GSS_statistics,
1435                               "# exchanged elements",
1436                               1,
1437                               GNUNET_NO);
1438
1439     switch (op->spec->result_mode)
1440     {
1441       case GNUNET_SET_RESULT_ADDED:
1442         /* Nothing to do. */
1443         break;
1444       case GNUNET_SET_RESULT_SYMMETRIC:
1445         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1446         break;
1447       default:
1448         /* Result mode not supported, should have been caught earlier. */
1449         GNUNET_break (0);
1450         break;
1451     }
1452   }
1453 }
1454
1455
1456 /**
1457  * Handle offers (of GNUNET_HashCode-s) and
1458  * respond with demands (of GNUNET_HashCode-s).
1459  *
1460  * @param cls the union operation
1461  * @param mh the message
1462  */
1463 static void
1464 handle_p2p_offer (void *cls,
1465                     const struct GNUNET_MessageHeader *mh)
1466 {
1467   struct Operation *op = cls;
1468   const struct GNUNET_HashCode *hash;
1469   unsigned int num_hashes;
1470
1471   /* look up elements and send them */
1472   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1473        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1474   {
1475     GNUNET_break_op (0);
1476     fail_union_operation (op);
1477     return;
1478   }
1479   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1480     / sizeof (struct GNUNET_HashCode);
1481   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1482       != num_hashes * sizeof (struct GNUNET_HashCode))
1483   {
1484     GNUNET_break_op (0);
1485     fail_union_operation (op);
1486     return;
1487   }
1488
1489   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1490        num_hashes > 0;
1491        hash++, num_hashes--)
1492   {
1493     struct ElementEntry *ee;
1494     struct GNUNET_MessageHeader *demands;
1495     struct GNUNET_MQ_Envelope *ev;
1496
1497     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1498                                             hash);
1499     if (NULL != ee)
1500       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1501         continue;
1502
1503     if (GNUNET_YES ==
1504         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1505                                                 hash))
1506     {
1507       LOG (GNUNET_ERROR_TYPE_DEBUG,
1508            "Skipped sending duplicate demand\n");
1509       continue;
1510     }
1511
1512     GNUNET_assert (GNUNET_OK ==
1513                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1514                                                       hash,
1515                                                       NULL,
1516                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1517
1518     LOG (GNUNET_ERROR_TYPE_DEBUG,
1519          "[OP %x] Requesting element (hash %s)\n",
1520          (void *) op, GNUNET_h2s (hash));
1521     ev = GNUNET_MQ_msg_header_extra (demands,
1522                                      sizeof (struct GNUNET_HashCode),
1523                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1524     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1525     GNUNET_MQ_send (op->mq, ev);
1526   }
1527 }
1528
1529
1530 /**
1531  * Handle a done message from a remote peer
1532  *
1533  * @param cls the union operation
1534  * @param mh the message
1535  */
1536 static void
1537 handle_p2p_done (void *cls,
1538                  const struct GNUNET_MessageHeader *mh)
1539 {
1540   struct Operation *op = cls;
1541
1542   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1543   {
1544     /* We got all requests, but still have to send our elements in response. */
1545
1546     op->state->phase = PHASE_FINISH_WAITING;
1547
1548     LOG (GNUNET_ERROR_TYPE_DEBUG,
1549          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1550     /* The active peer is done sending offers
1551      * and inquiries.  This means that all
1552      * our responses to that (demands and offers)
1553      * must be in flight (queued or in mesh).
1554      *
1555      * We should notify the active peer once
1556      * all our demands are satisfied, so that the active
1557      * peer can quit if we gave him everything.
1558      */
1559     maybe_finish (op);
1560     return;
1561   }
1562   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1563   {
1564     LOG (GNUNET_ERROR_TYPE_DEBUG,
1565          "got DONE (as active partner), waiting to finish\n");
1566     /* All demands of the other peer are satisfied,
1567      * and we processed all offers, thus we know
1568      * exactly what our demands must be.
1569      *
1570      * We'll close the channel
1571      * to the other peer once our demands are met.
1572      */
1573     op->state->phase = PHASE_FINISH_CLOSING;
1574     maybe_finish (op);
1575     return;
1576   }
1577   GNUNET_break_op (0);
1578   fail_union_operation (op);
1579 }
1580
1581
1582 /**
1583  * Initiate operation to evaluate a set union with a remote peer.
1584  *
1585  * @param op operation to perform (to be initialized)
1586  * @param opaque_context message to be transmitted to the listener
1587  *        to convince him to accept, may be NULL
1588  */
1589 static void
1590 union_evaluate (struct Operation *op,
1591                 const struct GNUNET_MessageHeader *opaque_context)
1592 {
1593   struct GNUNET_MQ_Envelope *ev;
1594   struct OperationRequestMessage *msg;
1595
1596   GNUNET_assert (NULL == op->state);
1597   op->state = GNUNET_new (struct OperationState);
1598   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1599   /* copy the current generation's strata estimator for this operation */
1600   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1601   /* we started the operation, thus we have to send the operation request */
1602   op->state->phase = PHASE_EXPECT_SE;
1603   op->state->salt_receive = op->state->salt_send = 42;
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605        "Initiating union operation evaluation\n");
1606   GNUNET_STATISTICS_update (_GSS_statistics,
1607                             "# of total union operations",
1608                             1,
1609                             GNUNET_NO);
1610   GNUNET_STATISTICS_update (_GSS_statistics,
1611                             "# of initiated union operations",
1612                             1,
1613                             GNUNET_NO);
1614   ev = GNUNET_MQ_msg_nested_mh (msg,
1615                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1616                                 opaque_context);
1617   if (NULL == ev)
1618   {
1619     /* the context message is too large */
1620     GNUNET_break (0);
1621     GNUNET_SERVICE_client_drop (op->spec->set->client);
1622     return;
1623   }
1624   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1625   GNUNET_MQ_send (op->mq,
1626                   ev);
1627
1628   if (NULL != opaque_context)
1629     LOG (GNUNET_ERROR_TYPE_DEBUG,
1630          "sent op request with context message\n");
1631   else
1632     LOG (GNUNET_ERROR_TYPE_DEBUG,
1633          "sent op request without context message\n");
1634 }
1635
1636
1637 /**
1638  * Accept an union operation request from a remote peer.
1639  * Only initializes the private operation state.
1640  *
1641  * @param op operation that will be accepted as a union operation
1642  */
1643 static void
1644 union_accept (struct Operation *op)
1645 {
1646   LOG (GNUNET_ERROR_TYPE_DEBUG,
1647        "accepting set union operation\n");
1648   GNUNET_assert (NULL == op->state);
1649
1650   GNUNET_STATISTICS_update (_GSS_statistics,
1651                             "# of accepted union operations",
1652                             1,
1653                             GNUNET_NO);
1654   GNUNET_STATISTICS_update (_GSS_statistics,
1655                             "# of total union operations",
1656                             1,
1657                             GNUNET_NO);
1658
1659   op->state = GNUNET_new (struct OperationState);
1660   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1661   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1662   op->state->salt_receive = op->state->salt_send = 42;
1663   /* kick off the operation */
1664   send_strata_estimator (op);
1665 }
1666
1667
1668 /**
1669  * Create a new set supporting the union operation
1670  *
1671  * We maintain one strata estimator per set and then manipulate it over the
1672  * lifetime of the set, as recreating a strata estimator would be expensive.
1673  *
1674  * @return the newly created set, NULL on error
1675  */
1676 static struct SetState *
1677 union_set_create (void)
1678 {
1679   struct SetState *set_state;
1680
1681   LOG (GNUNET_ERROR_TYPE_DEBUG,
1682        "union set created\n");
1683   set_state = GNUNET_new (struct SetState);
1684   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1685                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1686   if (NULL == set_state->se)
1687   {
1688     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1689                 "Failed to allocate strata estimator\n");
1690     GNUNET_free (set_state);
1691     return NULL;
1692   }
1693   return set_state;
1694 }
1695
1696
1697 /**
1698  * Add the element from the given element message to the set.
1699  *
1700  * @param set_state state of the set want to add to
1701  * @param ee the element to add to the set
1702  */
1703 static void
1704 union_add (struct SetState *set_state, struct ElementEntry *ee)
1705 {
1706   strata_estimator_insert (set_state->se,
1707                            get_ibf_key (&ee->element_hash));
1708 }
1709
1710
1711 /**
1712  * Remove the element given in the element message from the set.
1713  * Only marks the element as removed, so that older set operations can still exchange it.
1714  *
1715  * @param set_state state of the set to remove from
1716  * @param ee set element to remove
1717  */
1718 static void
1719 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1720 {
1721   strata_estimator_remove (set_state->se,
1722                            get_ibf_key (&ee->element_hash));
1723 }
1724
1725
1726 /**
1727  * Destroy a set that supports the union operation.
1728  *
1729  * @param set_state the set to destroy
1730  */
1731 static void
1732 union_set_destroy (struct SetState *set_state)
1733 {
1734   if (NULL != set_state->se)
1735   {
1736     strata_estimator_destroy (set_state->se);
1737     set_state->se = NULL;
1738   }
1739   GNUNET_free (set_state);
1740 }
1741
1742
1743 /**
1744  * Dispatch messages for a union operation.
1745  *
1746  * @param op the state of the union evaluate operation
1747  * @param mh the received message
1748  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1749  *         #GNUNET_OK otherwise
1750  */
1751 int
1752 union_handle_p2p_message (struct Operation *op,
1753                           const struct GNUNET_MessageHeader *mh)
1754 {
1755   //LOG (GNUNET_ERROR_TYPE_DEBUG,
1756   //            "received p2p message (t: %u, s: %u)\n",
1757   //            ntohs (mh->type),
1758   //            ntohs (mh->size));
1759   switch (ntohs (mh->type))
1760   {
1761     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1762       return handle_p2p_ibf (op, mh);
1763     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1764       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1765     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1766       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1767     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1768       handle_p2p_elements (op, mh);
1769       break;
1770     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1771       handle_p2p_inquiry (op, mh);
1772       break;
1773     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1774       handle_p2p_done (op, mh);
1775       break;
1776     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1777       handle_p2p_offer (op, mh);
1778       break;
1779     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1780       handle_p2p_demand (op, mh);
1781       break;
1782     default:
1783       /* Something wrong with cadet's message handlers? */
1784       GNUNET_assert (0);
1785   }
1786   return GNUNET_OK;
1787 }
1788
1789
1790 /**
1791  * Handler for peer-disconnects, notifies the client
1792  * about the aborted operation in case the op was not concluded.
1793  *
1794  * @param op the destroyed operation
1795  */
1796 static void
1797 union_peer_disconnect (struct Operation *op)
1798 {
1799   if (PHASE_DONE != op->state->phase)
1800   {
1801     struct GNUNET_MQ_Envelope *ev;
1802     struct GNUNET_SET_ResultMessage *msg;
1803
1804     ev = GNUNET_MQ_msg (msg,
1805                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1806     msg->request_id = htonl (op->spec->client_request_id);
1807     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1808     msg->element_type = htons (0);
1809     GNUNET_MQ_send (op->spec->set->client_mq,
1810                     ev);
1811     LOG (GNUNET_ERROR_TYPE_WARNING,
1812          "other peer disconnected prematurely, phase %u\n",
1813          op->state->phase);
1814     _GSS_operation_destroy (op,
1815                             GNUNET_YES);
1816     return;
1817   }
1818   // else: the session has already been concluded
1819   LOG (GNUNET_ERROR_TYPE_DEBUG,
1820        "other peer disconnected (finished)\n");
1821   if (GNUNET_NO == op->state->client_done_sent)
1822     send_done_and_destroy (op);
1823 }
1824
1825
1826 /**
1827  * Copy union-specific set state.
1828  *
1829  * @param set source set for copying the union state
1830  * @return a copy of the union-specific set state
1831  */
1832 static struct SetState *
1833 union_copy_state (struct Set *set)
1834 {
1835   struct SetState *new_state;
1836
1837   new_state = GNUNET_new (struct SetState);
1838   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1839   new_state->se = strata_estimator_dup (set->state->se);
1840
1841   return new_state;
1842 }
1843
1844
1845 /**
1846  * Get the table with implementing functions for
1847  * set union.
1848  *
1849  * @return the operation specific VTable
1850  */
1851 const struct SetVT *
1852 _GSS_union_vt ()
1853 {
1854   static const struct SetVT union_vt = {
1855     .create = &union_set_create,
1856     .msg_handler = &union_handle_p2p_message,
1857     .add = &union_add,
1858     .remove = &union_remove,
1859     .destroy_set = &union_set_destroy,
1860     .evaluate = &union_evaluate,
1861     .accept = &union_accept,
1862     .peer_disconnect = &union_peer_disconnect,
1863     .cancel = &union_op_cancel,
1864     .copy_state = &union_copy_state,
1865   };
1866
1867   return &union_vt;
1868 }