- another fix to generation handling and lazy copying
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22  * @brief two-peer set operations
23  * @author Florian Dold
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
28 #include "ibf.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
31 #include <gcrypt.h>
32
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
35
36
37 /**
38  * Number of IBFs in a strata estimator.
39  */
40 #define SE_STRATA_COUNT 32
41 /**
42  * Size of the IBFs in the strata estimator.
43  */
44 #define SE_IBF_SIZE 80
45 /**
46  * The hash num parameter for the difference digests and strata estimators.
47  */
48 #define SE_IBF_HASH_NUM 4
49
50 /**
51  * Number of buckets that can be transmitted in one message.
52  */
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54
55 /**
56  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57  * Choose this value so that computing the IBF is still cheaper
58  * than transmitting all values.
59  */
60 #define MAX_IBF_ORDER (16)
61
62 /**
63  * Number of buckets used in the ibf per estimated
64  * difference.
65  */
66 #define IBF_ALPHA 4
67
68
69 /**
70  * Current phase we are in for a union operation.
71  */
72 enum UnionOperationPhase
73 {
74   /**
75    * We sent the request message, and expect a strata estimator.
76    */
77   PHASE_EXPECT_SE,
78
79   /**
80    * We sent the strata estimator, and expect an IBF. This phase is entered once
81    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
82    *
83    * XXX: could use better wording.
84    *
85    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
86    */
87   PHASE_EXPECT_IBF,
88
89   /**
90    * Continuation for multi part IBFs.
91    */
92   PHASE_EXPECT_IBF_CONT,
93
94   /**
95    * We are decoding an IBF.
96    */
97   PHASE_INVENTORY_ACTIVE,
98
99   /**
100    * The other peer is decoding the IBF we just sent.
101    */
102   PHASE_INVENTORY_PASSIVE,
103
104   /**
105    * The protocol is almost finished, but we still have to flush our message
106    * queue and/or expect some elements.
107    */
108   PHASE_FINISH_CLOSING,
109
110   /**
111    * In the penultimate phase,
112    * we wait until all our demands
113    * are satisfied.  Then we send a done
114    * message, and wait for another done message.*/
115   PHASE_FINISH_WAITING,
116
117   /**
118    * In the ultimate phase, we wait until
119    * our demands are satisfied and then
120    * quit (sending another DONE message). */
121   PHASE_DONE,
122 };
123
124
125 /**
126  * State of an evaluate operation with another peer.
127  */
128 struct OperationState
129 {
130   /**
131    * Copy of the set's strata estimator at the time of
132    * creation of this operation.
133    */
134   struct StrataEstimator *se;
135
136   /**
137    * The IBF we currently receive.
138    */
139   struct InvertibleBloomFilter *remote_ibf;
140
141   /**
142    * The IBF with the local set's element.
143    */
144   struct InvertibleBloomFilter *local_ibf;
145
146   /**
147    * Maps IBF-Keys (specific to the current salt) to elements.
148    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
149    * Colliding IBF-Keys are linked.
150    */
151   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
152
153   /**
154    * Current state of the operation.
155    */
156   enum UnionOperationPhase phase;
157
158   /**
159    * Did we send the client that we are done?
160    */
161   int client_done_sent;
162
163   /**
164    * Number of ibf buckets already received into the @a remote_ibf.
165    */
166   unsigned int ibf_buckets_received;
167
168   /**
169    * Hashes for elements that we have demanded from the other peer.
170    */
171   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
172 };
173
174
175 /**
176  * The key entry is used to associate an ibf key with an element.
177  */
178 struct KeyEntry
179 {
180   /**
181    * IBF key for the entry, derived from the current salt.
182    */
183   struct IBF_Key ibf_key;
184
185   /**
186    * The actual element associated with the key.
187    *
188    * Only owned by the union operation if element->operation
189    * is #GNUNET_YES.
190    */
191   struct ElementEntry *element;
192 };
193
194
195 /**
196  * Used as a closure for sending elements
197  * with a specific IBF key.
198  */
199 struct SendElementClosure
200 {
201   /**
202    * The IBF key whose matching elements should be
203    * sent.
204    */
205   struct IBF_Key ibf_key;
206
207   /**
208    * Operation for which the elements
209    * should be sent.
210    */
211   struct Operation *op;
212 };
213
214
215 /**
216  * Extra state required for efficient set union.
217  */
218   struct SetState
219 {
220   /**
221    * The strata estimator is only generated once for
222    * each set.
223    * The IBF keys are derived from the element hashes with
224    * salt=0.
225    */
226   struct StrataEstimator *se;
227 };
228
229
230 /**
231  * Iterator over hash map entries, called to
232  * destroy the linked list of colliding ibf key entries.
233  *
234  * @param cls closure
235  * @param key current key code
236  * @param value value in the hash map
237  * @return #GNUNET_YES if we should continue to iterate,
238  *         #GNUNET_NO if not.
239  */
240 static int
241 destroy_key_to_element_iter (void *cls,
242                              uint32_t key,
243                              void *value)
244 {
245   struct KeyEntry *k = value;
246
247   GNUNET_assert (NULL != k);
248   if (GNUNET_YES == k->element->remote)
249   {
250     GNUNET_free (k->element);
251     k->element = NULL;
252   }
253   GNUNET_free (k);
254   return GNUNET_YES;
255 }
256
257
258 /**
259  * Destroy the union operation.  Only things specific to the union
260  * operation are destroyed.
261  *
262  * @param op union operation to destroy
263  */
264 static void
265 union_op_cancel (struct Operation *op)
266 {
267   LOG (GNUNET_ERROR_TYPE_DEBUG,
268        "destroying union op\n");
269   /* check if the op was canceled twice */
270   GNUNET_assert (NULL != op->state);
271   if (NULL != op->state->remote_ibf)
272   {
273     ibf_destroy (op->state->remote_ibf);
274     op->state->remote_ibf = NULL;
275   }
276   if (NULL != op->state->demanded_hashes)
277   {
278     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
279     op->state->demanded_hashes = NULL;
280   }
281   if (NULL != op->state->local_ibf)
282   {
283     ibf_destroy (op->state->local_ibf);
284     op->state->local_ibf = NULL;
285   }
286   if (NULL != op->state->se)
287   {
288     strata_estimator_destroy (op->state->se);
289     op->state->se = NULL;
290   }
291   if (NULL != op->state->key_to_element)
292   {
293     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
294                                              &destroy_key_to_element_iter,
295                                              NULL);
296     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
297     op->state->key_to_element = NULL;
298   }
299   GNUNET_free (op->state);
300   op->state = NULL;
301   LOG (GNUNET_ERROR_TYPE_DEBUG,
302        "destroying union op done\n");
303 }
304
305
306 /**
307  * Inform the client that the union operation has failed,
308  * and proceed to destroy the evaluate operation.
309  *
310  * @param op the union operation to fail
311  */
312 static void
313 fail_union_operation (struct Operation *op)
314 {
315   struct GNUNET_MQ_Envelope *ev;
316   struct GNUNET_SET_ResultMessage *msg;
317
318   LOG (GNUNET_ERROR_TYPE_ERROR,
319        "union operation failed\n");
320   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
321   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
322   msg->request_id = htonl (op->spec->client_request_id);
323   msg->element_type = htons (0);
324   GNUNET_MQ_send (op->spec->set->client_mq, ev);
325   _GSS_operation_destroy (op, GNUNET_YES);
326 }
327
328
329 /**
330  * Derive the IBF key from a hash code and
331  * a salt.
332  *
333  * @param src the hash code
334  * @param salt salt to use
335  * @return the derived IBF key
336  */
337 static struct IBF_Key
338 get_ibf_key (const struct GNUNET_HashCode *src,
339              uint16_t salt)
340 {
341   struct IBF_Key key;
342
343   GNUNET_CRYPTO_kdf (&key, sizeof (key),
344                      src, sizeof *src,
345                      &salt, sizeof (salt),
346                      NULL, 0);
347   return key;
348 }
349
350
351 /**
352  * Iterator over the mapping from IBF keys to element entries.  Checks if we
353  * have an element with a given GNUNET_HashCode.
354  *
355  * @param cls closure
356  * @param key current key code
357  * @param value value in the hash map
358  * @return #GNUNET_YES if we should search further,
359  *         #GNUNET_NO if we've found the element.
360  */
361 static int
362 op_has_element_iterator (void *cls,
363                          uint32_t key,
364                          void *value)
365 {
366   struct GNUNET_HashCode *element_hash = cls;
367   struct KeyEntry *k = value;
368
369   GNUNET_assert (NULL != k);
370   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
371                                    element_hash))
372     return GNUNET_NO;
373   return GNUNET_YES;
374 }
375
376
377 /**
378  * Determine whether the given element is already in the operation's element
379  * set.
380  *
381  * @param op operation that should be tested for 'element_hash'
382  * @param element_hash hash of the element to look for
383  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
384  */
385 static int
386 op_has_element (struct Operation *op,
387                 const struct GNUNET_HashCode *element_hash)
388 {
389   int ret;
390   struct IBF_Key ibf_key;
391
392   ibf_key = get_ibf_key (element_hash, op->spec->salt);
393   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
394                                                       (uint32_t) ibf_key.key_val,
395                                                       op_has_element_iterator,
396                                                       (void *) element_hash);
397
398   /* was the iteration aborted because we found the element? */
399   if (GNUNET_SYSERR == ret)
400     return GNUNET_YES;
401   return GNUNET_NO;
402 }
403
404
405 /**
406  * Insert an element into the union operation's
407  * key-to-element mapping. Takes ownership of 'ee'.
408  * Note that this does not insert the element in the set,
409  * only in the operation's key-element mapping.
410  * This is done to speed up re-tried operations, if some elements
411  * were transmitted, and then the IBF fails to decode.
412  *
413  * XXX: clarify ownership, doesn't sound right.
414  *
415  * @param op the union operation
416  * @param ee the element entry
417  */
418 static void
419 op_register_element (struct Operation *op,
420                      struct ElementEntry *ee)
421 {
422   struct IBF_Key ibf_key;
423   struct KeyEntry *k;
424
425   ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
426   k = GNUNET_new (struct KeyEntry);
427   k->element = ee;
428   k->ibf_key = ibf_key;
429   GNUNET_assert (GNUNET_OK ==
430                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
431                                                       (uint32_t) ibf_key.key_val,
432                                                       k,
433                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
434 }
435
436
437 /**
438  * Insert a key into an ibf.
439  *
440  * @param cls the ibf
441  * @param key unused
442  * @param value the key entry to get the key from
443  */
444 static int
445 prepare_ibf_iterator (void *cls,
446                       uint32_t key,
447                       void *value)
448 {
449   struct Operation *op = cls;
450   struct KeyEntry *ke = value;
451
452   LOG (GNUNET_ERROR_TYPE_DEBUG,
453        "[OP %x] inserting %lx (hash %s) into ibf\n",
454        (void *) op,
455        (unsigned long) ke->ibf_key.key_val,
456        GNUNET_h2s (&ke->element->element_hash));
457   ibf_insert (op->state->local_ibf, ke->ibf_key);
458   return GNUNET_YES;
459 }
460
461
462 /**
463  * Iterator for initializing the
464  * key-to-element mapping of a union operation
465  *
466  * @param cls the union operation `struct Operation *`
467  * @param key unused
468  * @param value the `struct ElementEntry *` to insert
469  *        into the key-to-element mapping
470  * @return #GNUNET_YES (to continue iterating)
471  */
472 static int
473 init_key_to_element_iterator (void *cls,
474                               const struct GNUNET_HashCode *key,
475                               void *value)
476 {
477   struct Operation *op = cls;
478   struct ElementEntry *ee = value;
479
480   /* make sure that the element belongs to the set at the time
481    * of creating the operation */
482   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
483     return GNUNET_YES;
484
485   GNUNET_assert (GNUNET_NO == ee->remote);
486
487   op_register_element (op, ee);
488   return GNUNET_YES;
489 }
490
491
492 /**
493  * Create an ibf with the operation's elements
494  * of the specified size
495  *
496  * @param op the union operation
497  * @param size size of the ibf to create
498  */
499 static void
500 prepare_ibf (struct Operation *op,
501              uint16_t size)
502 {
503   if (NULL == op->state->key_to_element)
504   {
505     unsigned int len;
506
507     len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
508     op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
509     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
510                                            init_key_to_element_iterator, op);
511   }
512   if (NULL != op->state->local_ibf)
513     ibf_destroy (op->state->local_ibf);
514   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
515   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
516                                            &prepare_ibf_iterator,
517                                            op);
518 }
519
520
521 /**
522  * Send an ibf of appropriate size.
523  *
524  * Fragments the IBF into multiple messages if necessary.
525  *
526  * @param op the union operation
527  * @param ibf_order order of the ibf to send, size=2^order
528  */
529 static void
530 send_ibf (struct Operation *op,
531           uint16_t ibf_order)
532 {
533   unsigned int buckets_sent = 0;
534   struct InvertibleBloomFilter *ibf;
535
536   prepare_ibf (op, 1<<ibf_order);
537
538   LOG (GNUNET_ERROR_TYPE_DEBUG,
539        "sending ibf of size %u\n",
540        1<<ibf_order);
541
542   ibf = op->state->local_ibf;
543
544   while (buckets_sent < (1 << ibf_order))
545   {
546     unsigned int buckets_in_message;
547     struct GNUNET_MQ_Envelope *ev;
548     struct IBFMessage *msg;
549
550     buckets_in_message = (1 << ibf_order) - buckets_sent;
551     /* limit to maximum */
552     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
553       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
554
555     ev = GNUNET_MQ_msg_extra (msg,
556                               buckets_in_message * IBF_BUCKET_SIZE,
557                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
558     msg->reserved = 0;
559     msg->order = ibf_order;
560     msg->offset = htons (buckets_sent);
561     ibf_write_slice (ibf, buckets_sent,
562                      buckets_in_message, &msg[1]);
563     buckets_sent += buckets_in_message;
564     LOG (GNUNET_ERROR_TYPE_DEBUG,
565          "ibf chunk size %u, %u/%u sent\n",
566          buckets_in_message,
567          buckets_sent,
568          1<<ibf_order);
569     GNUNET_MQ_send (op->mq, ev);
570   }
571
572   /* The other peer must decode the IBF, so
573    * we're passive. */
574   op->state->phase = PHASE_INVENTORY_PASSIVE;
575 }
576
577
578 /**
579  * Send a strata estimator to the remote peer.
580  *
581  * @param op the union operation with the remote peer
582  */
583 static void
584 send_strata_estimator (struct Operation *op)
585 {
586   struct GNUNET_MQ_Envelope *ev;
587   struct GNUNET_MessageHeader *strata_msg;
588
589   ev = GNUNET_MQ_msg_header_extra (strata_msg,
590                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
591                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
592   strata_estimator_write (op->state->se, &strata_msg[1]);
593   GNUNET_MQ_send (op->mq,
594                   ev);
595   op->state->phase = PHASE_EXPECT_IBF;
596   LOG (GNUNET_ERROR_TYPE_DEBUG,
597        "sent SE, expecting IBF\n");
598 }
599
600
601 /**
602  * Compute the necessary order of an ibf
603  * from the size of the symmetric set difference.
604  *
605  * @param diff the difference
606  * @return the required size of the ibf
607  */
608 static unsigned int
609 get_order_from_difference (unsigned int diff)
610 {
611   unsigned int ibf_order;
612
613   ibf_order = 2;
614   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
615           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
616     ibf_order++;
617   if (ibf_order > MAX_IBF_ORDER)
618     ibf_order = MAX_IBF_ORDER;
619   return ibf_order;
620 }
621
622
623 /**
624  * Handle a strata estimator from a remote peer
625  *
626  * @param cls the union operation
627  * @param mh the message
628  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
629  *         #GNUNET_OK otherwise
630  */
631 static int
632 handle_p2p_strata_estimator (void *cls,
633                              const struct GNUNET_MessageHeader *mh)
634 {
635   struct Operation *op = cls;
636   struct StrataEstimator *remote_se;
637   int diff;
638
639   if (op->state->phase != PHASE_EXPECT_SE)
640   {
641     fail_union_operation (op);
642     GNUNET_break (0);
643     return GNUNET_SYSERR;
644   }
645   if (ntohs (mh->size) !=
646       SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE +
647       sizeof (struct GNUNET_MessageHeader))
648   {
649     fail_union_operation (op);
650     GNUNET_break (0);
651     return GNUNET_SYSERR;
652   }
653   remote_se = strata_estimator_create (SE_STRATA_COUNT,
654                                        SE_IBF_SIZE,
655                                        SE_IBF_HASH_NUM);
656   strata_estimator_read (&mh[1], remote_se);
657   GNUNET_assert (NULL != op->state->se);
658   diff = strata_estimator_difference (remote_se,
659                                       op->state->se);
660   strata_estimator_destroy (remote_se);
661   strata_estimator_destroy (op->state->se);
662   op->state->se = NULL;
663   LOG (GNUNET_ERROR_TYPE_DEBUG,
664        "got se diff=%d, using ibf size %d\n",
665        diff,
666        1<<get_order_from_difference (diff));
667   send_ibf (op,
668             get_order_from_difference (diff));
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Iterator to send elements to a remote peer
675  *
676  * @param cls closure with the element key and the union operation
677  * @param key ignored
678  * @param value the key entry
679  */
680 static int
681 send_offers_iterator (void *cls,
682                       uint32_t key,
683                       void *value)
684 {
685   struct SendElementClosure *sec = cls;
686   struct Operation *op = sec->op;
687   struct KeyEntry *ke = value;
688   struct GNUNET_MQ_Envelope *ev;
689   struct GNUNET_MessageHeader *mh;
690
691   /* Detect 32-bit key collision for the 64-bit IBF keys. */
692   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
693     return GNUNET_YES;
694
695   ev = GNUNET_MQ_msg_header_extra (mh,
696                                    sizeof (struct GNUNET_HashCode),
697                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
698
699   GNUNET_assert (NULL != ev);
700   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
701   LOG (GNUNET_ERROR_TYPE_DEBUG,
702        "[OP %x] sending element offer (%s) to peer\n",
703        (void *) op,
704        GNUNET_h2s (&ke->element->element_hash));
705   GNUNET_MQ_send (op->mq, ev);
706   return GNUNET_YES;
707 }
708
709
710 /**
711  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
712  *
713  * @param op union operation
714  * @param ibf_key IBF key of interest
715  */
716 static void
717 send_offers_for_key (struct Operation *op,
718                      struct IBF_Key ibf_key)
719 {
720   struct SendElementClosure send_cls;
721
722   send_cls.ibf_key = ibf_key;
723   send_cls.op = op;
724   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
725                                                        (uint32_t) ibf_key.key_val,
726                                                        &send_offers_iterator,
727                                                        &send_cls);
728 }
729
730
731 /**
732  * Decode which elements are missing on each side, and
733  * send the appropriate offers and inquiries.
734  *
735  * @param op union operation
736  */
737 static void
738 decode_and_send (struct Operation *op)
739 {
740   struct IBF_Key key;
741   struct IBF_Key last_key;
742   int side;
743   unsigned int num_decoded;
744   struct InvertibleBloomFilter *diff_ibf;
745
746   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
747
748   prepare_ibf (op, op->state->remote_ibf->size);
749   diff_ibf = ibf_dup (op->state->local_ibf);
750   ibf_subtract (diff_ibf, op->state->remote_ibf);
751
752   ibf_destroy (op->state->remote_ibf);
753   op->state->remote_ibf = NULL;
754
755   LOG (GNUNET_ERROR_TYPE_DEBUG,
756        "decoding IBF (size=%u)\n",
757        diff_ibf->size);
758
759   num_decoded = 0;
760   last_key.key_val = 0;
761
762   while (1)
763   {
764     int res;
765     int cycle_detected = GNUNET_NO;
766
767     last_key = key;
768
769     res = ibf_decode (diff_ibf, &side, &key);
770     if (res == GNUNET_OK)
771     {
772       LOG (GNUNET_ERROR_TYPE_DEBUG,
773            "decoded ibf key %lx\n",
774            (unsigned long) key.key_val);
775       num_decoded += 1;
776       if ( (num_decoded > diff_ibf->size) ||
777            (num_decoded > 1 && last_key.key_val == key.key_val) )
778       {
779         LOG (GNUNET_ERROR_TYPE_DEBUG,
780              "detected cyclic ibf (decoded %u/%u)\n",
781              num_decoded,
782              diff_ibf->size);
783         cycle_detected = GNUNET_YES;
784       }
785     }
786     if ( (GNUNET_SYSERR == res) ||
787          (GNUNET_YES == cycle_detected) )
788     {
789       int next_order;
790       next_order = 0;
791       while (1<<next_order < diff_ibf->size)
792         next_order++;
793       next_order++;
794       if (next_order <= MAX_IBF_ORDER)
795       {
796         LOG (GNUNET_ERROR_TYPE_DEBUG,
797              "decoding failed, sending larger ibf (size %u)\n",
798              1<<next_order);
799         send_ibf (op, next_order);
800       }
801       else
802       {
803         // XXX: Send the whole set, element-by-element
804         LOG (GNUNET_ERROR_TYPE_ERROR,
805              "set union failed: reached ibf limit\n");
806       }
807       break;
808     }
809     if (GNUNET_NO == res)
810     {
811       struct GNUNET_MQ_Envelope *ev;
812
813       LOG (GNUNET_ERROR_TYPE_DEBUG,
814            "transmitted all values, sending DONE\n");
815       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
816       GNUNET_MQ_send (op->mq, ev);
817       /* We now wait until we get a DONE message back
818        * and then wait for our MQ to be flushed and all our
819        * demands be delivered. */
820       break;
821     }
822     if (1 == side)
823     {
824       send_offers_for_key (op, key);
825     }
826     else if (-1 == side)
827     {
828       struct GNUNET_MQ_Envelope *ev;
829       struct GNUNET_MessageHeader *msg;
830
831       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
832        * the effort additional complexity. */
833       ev = GNUNET_MQ_msg_header_extra (msg,
834                                        sizeof (struct IBF_Key),
835                                        GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
836
837       memcpy (&msg[1],
838               &key,
839               sizeof (struct IBF_Key));
840       LOG (GNUNET_ERROR_TYPE_DEBUG,
841            "sending element inquiry for IBF key %lx\n",
842            (unsigned long) key.key_val);
843       GNUNET_MQ_send (op->mq, ev);
844     }
845     else
846     {
847       GNUNET_assert (0);
848     }
849   }
850   ibf_destroy (diff_ibf);
851 }
852
853
854 /**
855  * Handle an IBF message from a remote peer.
856  *
857  * Reassemble the IBF from multiple pieces, and
858  * process the whole IBF once possible.
859  *
860  * @param cls the union operation
861  * @param mh the header of the message
862  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
863  *         #GNUNET_OK otherwise
864  */
865 static int
866 handle_p2p_ibf (void *cls,
867                 const struct GNUNET_MessageHeader *mh)
868 {
869   struct Operation *op = cls;
870   const struct IBFMessage *msg;
871   unsigned int buckets_in_message;
872
873   if (ntohs (mh->size) < sizeof (struct IBFMessage))
874   {
875     GNUNET_break_op (0);
876     fail_union_operation (op);
877     return GNUNET_SYSERR;
878   }
879   msg = (const struct IBFMessage *) mh;
880   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
881        (op->state->phase == PHASE_EXPECT_IBF) )
882   {
883     op->state->phase = PHASE_EXPECT_IBF_CONT;
884     GNUNET_assert (NULL == op->state->remote_ibf);
885     LOG (GNUNET_ERROR_TYPE_DEBUG,
886          "Creating new ibf of size %u\n",
887          1 << msg->order);
888     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
889     op->state->ibf_buckets_received = 0;
890     if (0 != ntohs (msg->offset))
891     {
892       GNUNET_break_op (0);
893       fail_union_operation (op);
894       return GNUNET_SYSERR;
895     }
896   }
897   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
898   {
899     if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
900          (1<<msg->order != op->state->remote_ibf->size) )
901     {
902       GNUNET_break_op (0);
903       fail_union_operation (op);
904       return GNUNET_SYSERR;
905     }
906   }
907   else
908   {
909     GNUNET_assert (0);
910   }
911
912   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
913
914   if (0 == buckets_in_message)
915   {
916     GNUNET_break_op (0);
917     fail_union_operation (op);
918     return GNUNET_SYSERR;
919   }
920
921   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
922   {
923     GNUNET_break_op (0);
924     fail_union_operation (op);
925     return GNUNET_SYSERR;
926   }
927
928   GNUNET_assert (NULL != op->state->remote_ibf);
929
930   ibf_read_slice (&msg[1],
931                   op->state->ibf_buckets_received,
932                   buckets_in_message,
933                   op->state->remote_ibf);
934   op->state->ibf_buckets_received += buckets_in_message;
935
936   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
937   {
938     LOG (GNUNET_ERROR_TYPE_DEBUG,
939          "received full ibf\n");
940     op->state->phase = PHASE_INVENTORY_ACTIVE;
941     decode_and_send (op);
942   }
943   return GNUNET_OK;
944 }
945
946
947 /**
948  * Send a result message to the client indicating
949  * that there is a new element.
950  *
951  * @param op union operation
952  * @param element element to send
953  * @param status status to send with the new element
954  */
955 static void
956 send_client_element (struct Operation *op,
957                      struct GNUNET_SET_Element *element,
958                      int status)
959 {
960   struct GNUNET_MQ_Envelope *ev;
961   struct GNUNET_SET_ResultMessage *rm;
962
963   LOG (GNUNET_ERROR_TYPE_DEBUG,
964        "sending element (size %u) to client\n",
965        element->size);
966   GNUNET_assert (0 != op->spec->client_request_id);
967   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
968   if (NULL == ev)
969   {
970     GNUNET_MQ_discard (ev);
971     GNUNET_break (0);
972     return;
973   }
974   rm->result_status = htons (status);
975   rm->request_id = htonl (op->spec->client_request_id);
976   rm->element_type = element->element_type;
977   memcpy (&rm[1], element->data, element->size);
978   GNUNET_MQ_send (op->spec->set->client_mq, ev);
979 }
980
981
982 /**
983  * Signal to the client that the operation has finished and
984  * destroy the operation.
985  *
986  * @param cls operation to destroy
987  */
988 static void
989 send_done_and_destroy (void *cls)
990 {
991   struct Operation *op = cls;
992   struct GNUNET_MQ_Envelope *ev;
993   struct GNUNET_SET_ResultMessage *rm;
994
995   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996   rm->request_id = htonl (op->spec->client_request_id);
997   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
998   rm->element_type = htons (0);
999   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1000   /* Will also call the union-specific cancel function. */
1001   _GSS_operation_destroy (op, GNUNET_YES);
1002 }
1003
1004
1005 static void
1006 maybe_finish (struct Operation *op)
1007 {
1008   unsigned int num_demanded;
1009
1010   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1011
1012   if (PHASE_FINISH_WAITING == op->state->phase)
1013   {
1014     LOG (GNUNET_ERROR_TYPE_DEBUG,
1015          "In PHASE_FINISH_WAITING, pending %u demands\n",
1016          num_demanded);
1017     if (0 == num_demanded)
1018     {
1019       struct GNUNET_MQ_Envelope *ev;
1020
1021       op->state->phase = PHASE_DONE;
1022       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1023       GNUNET_MQ_send (op->mq, ev);
1024
1025       /* We now wait until the other peer closes the channel
1026        * after it got all elements from us. */
1027     }
1028   }
1029   if (PHASE_FINISH_CLOSING == op->state->phase)
1030   {
1031     LOG (GNUNET_ERROR_TYPE_DEBUG,
1032          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1033          num_demanded);
1034     if (0 == num_demanded)
1035     {
1036       op->state->phase = PHASE_DONE;
1037       send_done_and_destroy (op);
1038     }
1039   }
1040 }
1041
1042
1043 /**
1044  * Handle an element message from a remote peer.
1045  *
1046  * @param cls the union operation
1047  * @param mh the message
1048  */
1049 static void
1050 handle_p2p_elements (void *cls,
1051                      const struct GNUNET_MessageHeader *mh)
1052 {
1053   struct Operation *op = cls;
1054   struct ElementEntry *ee;
1055   const struct GNUNET_SET_ElementMessage *emsg;
1056   uint16_t element_size;
1057
1058   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1059   {
1060     GNUNET_break_op (0);
1061     fail_union_operation (op);
1062     return;
1063   }
1064
1065   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1066   {
1067     GNUNET_break_op (0);
1068     fail_union_operation (op);
1069     return;
1070   }
1071
1072   emsg = (struct GNUNET_SET_ElementMessage *) mh;
1073
1074   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1075   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1076   memcpy (&ee[1], &emsg[1], element_size);
1077   ee->element.size = element_size;
1078   ee->element.data = &ee[1];
1079   ee->element.element_type = ntohs (emsg->element_type);
1080   ee->remote = GNUNET_YES;
1081   GNUNET_CRYPTO_hash (ee->element.data,
1082                       ee->element.size,
1083                       &ee->element_hash);
1084
1085   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, NULL))
1086   {
1087     /* We got something we didn't demand, since it's not in our map. */
1088     GNUNET_break_op (0);
1089     GNUNET_free (ee);
1090     fail_union_operation (op);
1091     return;
1092   }
1093
1094   LOG (GNUNET_ERROR_TYPE_DEBUG,
1095        "Got element (size %u, hash %s) from peer\n",
1096        (unsigned int) element_size,
1097        GNUNET_h2s (&ee->element_hash));
1098
1099   if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1100   {
1101     /* Got repeated element.  Should not happen since
1102      * we track demands. */
1103     GNUNET_break (0);
1104     GNUNET_free (ee);
1105   }
1106   else
1107   {
1108     LOG (GNUNET_ERROR_TYPE_DEBUG,
1109          "Registering new element from remote peer\n");
1110     op_register_element (op, ee);
1111     /* only send results immediately if the client wants it */
1112     switch (op->spec->result_mode)
1113     {
1114       case GNUNET_SET_RESULT_ADDED:
1115         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1116         break;
1117       case GNUNET_SET_RESULT_SYMMETRIC:
1118         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1119         break;
1120       default:
1121         /* Result mode not supported, should have been caught earlier. */
1122         GNUNET_break (0);
1123         break;
1124     }
1125   }
1126
1127   maybe_finish (op);
1128 }
1129
1130
1131 /**
1132  * Send offers (for GNUNET_Hash-es) in response
1133  * to inquiries (for IBF_Key-s).
1134  *
1135  * @param cls the union operation
1136  * @param mh the message
1137  */
1138 static void
1139 handle_p2p_inquiry (void *cls,
1140                     const struct GNUNET_MessageHeader *mh)
1141 {
1142   struct Operation *op = cls;
1143   const struct IBF_Key *ibf_key;
1144   unsigned int num_keys;
1145
1146   /* look up elements and send them */
1147   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1148   {
1149     GNUNET_break_op (0);
1150     fail_union_operation (op);
1151     return;
1152   }
1153   num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1154       / sizeof (struct IBF_Key);
1155   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1156       != num_keys * sizeof (struct IBF_Key))
1157   {
1158     GNUNET_break_op (0);
1159     fail_union_operation (op);
1160     return;
1161   }
1162
1163   ibf_key = (const struct IBF_Key *) &mh[1];
1164   while (0 != num_keys--)
1165   {
1166     send_offers_for_key (op, *ibf_key);
1167     ibf_key++;
1168   }
1169 }
1170
1171
1172
1173 static void
1174 handle_p2p_demand (void *cls,
1175                     const struct GNUNET_MessageHeader *mh)
1176 {
1177   struct Operation *op = cls;
1178   struct ElementEntry *ee;
1179   struct GNUNET_SET_ElementMessage *emsg;
1180   const struct GNUNET_HashCode *hash;
1181   unsigned int num_hashes;
1182   struct GNUNET_MQ_Envelope *ev;
1183
1184   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1185     / sizeof (struct GNUNET_HashCode);
1186   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1187       != num_hashes * sizeof (struct GNUNET_HashCode))
1188   {
1189     GNUNET_break_op (0);
1190     fail_union_operation (op);
1191     return;
1192   }
1193
1194   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1195        num_hashes > 0;
1196        hash++, num_hashes--)
1197   {
1198     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1199     if (NULL == ee)
1200     {
1201       /* Demand for non-existing element. */
1202       GNUNET_break_op (0);
1203       fail_union_operation (op);
1204       return;
1205     }
1206     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1207     {
1208       /* Probably confused lazily copied sets. */
1209       GNUNET_break_op (0);
1210       fail_union_operation (op);
1211       return;
1212     }
1213     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1214     memcpy (&emsg[1], ee->element.data, ee->element.size);
1215     emsg->reserved = htons (0);
1216     emsg->element_type = htons (ee->element.element_type);
1217     LOG (GNUNET_ERROR_TYPE_DEBUG,
1218          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1219          (void *) op,
1220          (unsigned int) ee->element.size,
1221          GNUNET_h2s (&ee->element_hash));
1222     GNUNET_MQ_send (op->mq, ev);
1223
1224     switch (op->spec->result_mode)
1225     {
1226       case GNUNET_SET_RESULT_ADDED:
1227         /* Nothing to do. */
1228         break;
1229       case GNUNET_SET_RESULT_SYMMETRIC:
1230         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1231         break;
1232       default:
1233         /* Result mode not supported, should have been caught earlier. */
1234         GNUNET_break (0);
1235         break;
1236     }
1237   }
1238 }
1239
1240
1241 /**
1242  * Handle offers (of GNUNET_HashCode-s) and
1243  * respond with demands (of GNUNET_HashCode-s).
1244  *
1245  * @param cls the union operation
1246  * @param mh the message
1247  */
1248 static void
1249 handle_p2p_offer (void *cls,
1250                     const struct GNUNET_MessageHeader *mh)
1251 {
1252   struct Operation *op = cls;
1253   const struct GNUNET_HashCode *hash;
1254   unsigned int num_hashes;
1255
1256   /* look up elements and send them */
1257   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1258        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1259   {
1260     GNUNET_break_op (0);
1261     fail_union_operation (op);
1262     return;
1263   }
1264   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1265     / sizeof (struct GNUNET_HashCode);
1266   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1267       != num_hashes * sizeof (struct GNUNET_HashCode))
1268   {
1269     GNUNET_break_op (0);
1270     fail_union_operation (op);
1271     return;
1272   }
1273
1274   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1275        num_hashes > 0;
1276        hash++, num_hashes--)
1277   {
1278     struct ElementEntry *ee;
1279     struct GNUNET_MessageHeader *demands;
1280     struct GNUNET_MQ_Envelope *ev;
1281     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1282     if (NULL != ee)
1283       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1284         continue;
1285
1286     if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, hash))
1287     {
1288       LOG (GNUNET_ERROR_TYPE_DEBUG,
1289            "Skipped sending duplicate demand\n");
1290       continue;
1291     }
1292
1293     GNUNET_assert (GNUNET_OK ==
1294                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1295                                                       hash,
1296                                                       NULL,
1297                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1298
1299     LOG (GNUNET_ERROR_TYPE_DEBUG,
1300          "[OP %x] Requesting element (hash %s)\n",
1301          (void *) op, GNUNET_h2s (hash));
1302     ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1303     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1304     GNUNET_MQ_send (op->mq, ev);
1305   }
1306 }
1307
1308
1309 /**
1310  * Handle a done message from a remote peer
1311  *
1312  * @param cls the union operation
1313  * @param mh the message
1314  */
1315 static void
1316 handle_p2p_done (void *cls,
1317                  const struct GNUNET_MessageHeader *mh)
1318 {
1319   struct Operation *op = cls;
1320
1321   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1322   {
1323     /* We got all requests, but still have to send our elements in response. */
1324
1325     op->state->phase = PHASE_FINISH_WAITING;
1326
1327     LOG (GNUNET_ERROR_TYPE_DEBUG,
1328          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1329     /* The active peer is done sending offers
1330      * and inquiries.  This means that all
1331      * our responses to that (demands and offers)
1332      * must be in flight (queued or in mesh).
1333      *
1334      * We should notify the active peer once
1335      * all our demands are satisfied, so that the active
1336      * peer can quit if we gave him everything.
1337      */
1338     maybe_finish (op);
1339     return;
1340   }
1341   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1342   {
1343     LOG (GNUNET_ERROR_TYPE_DEBUG,
1344          "got DONE (as active partner), waiting to finish\n");
1345     /* All demands of the other peer are satisfied,
1346      * and we processed all offers, thus we know
1347      * exactly what our demands must be.
1348      *
1349      * We'll close the channel
1350      * to the other peer once our demands are met.
1351      */
1352     op->state->phase = PHASE_FINISH_CLOSING;
1353     maybe_finish (op);
1354     return;
1355   }
1356   GNUNET_break_op (0);
1357   fail_union_operation (op);
1358 }
1359
1360
1361 /**
1362  * Initiate operation to evaluate a set union with a remote peer.
1363  *
1364  * @param op operation to perform (to be initialized)
1365  * @param opaque_context message to be transmitted to the listener
1366  *        to convince him to accept, may be NULL
1367  */
1368 static void
1369 union_evaluate (struct Operation *op,
1370                 const struct GNUNET_MessageHeader *opaque_context)
1371 {
1372   struct GNUNET_MQ_Envelope *ev;
1373   struct OperationRequestMessage *msg;
1374
1375   GNUNET_assert (NULL == op->state);
1376   op->state = GNUNET_new (struct OperationState);
1377   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1378   /* copy the current generation's strata estimator for this operation */
1379   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1380   /* we started the operation, thus we have to send the operation request */
1381   op->state->phase = PHASE_EXPECT_SE;
1382   LOG (GNUNET_ERROR_TYPE_DEBUG,
1383        "Initiating union operation evaluation\n");
1384   ev = GNUNET_MQ_msg_nested_mh (msg,
1385                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1386                                 opaque_context);
1387   if (NULL == ev)
1388   {
1389     /* the context message is too large */
1390     GNUNET_break (0);
1391     GNUNET_SERVER_client_disconnect (op->spec->set->client);
1392     return;
1393   }
1394   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1395   msg->app_id = op->spec->app_id;
1396   GNUNET_MQ_send (op->mq,
1397                   ev);
1398
1399   if (NULL != opaque_context)
1400     LOG (GNUNET_ERROR_TYPE_DEBUG,
1401          "sent op request with context message\n");
1402   else
1403     LOG (GNUNET_ERROR_TYPE_DEBUG,
1404          "sent op request without context message\n");
1405 }
1406
1407
1408 /**
1409  * Accept an union operation request from a remote peer.
1410  * Only initializes the private operation state.
1411  *
1412  * @param op operation that will be accepted as a union operation
1413  */
1414 static void
1415 union_accept (struct Operation *op)
1416 {
1417   LOG (GNUNET_ERROR_TYPE_DEBUG,
1418        "accepting set union operation\n");
1419   GNUNET_assert (NULL == op->state);
1420   op->state = GNUNET_new (struct OperationState);
1421   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1422   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1423   /* kick off the operation */
1424   send_strata_estimator (op);
1425 }
1426
1427
1428 /**
1429  * Create a new set supporting the union operation
1430  *
1431  * We maintain one strata estimator per set and then manipulate it over the
1432  * lifetime of the set, as recreating a strata estimator would be expensive.
1433  *
1434  * @return the newly created set
1435  */
1436 static struct SetState *
1437 union_set_create (void)
1438 {
1439   struct SetState *set_state;
1440
1441   LOG (GNUNET_ERROR_TYPE_DEBUG,
1442        "union set created\n");
1443   set_state = GNUNET_new (struct SetState);
1444   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1445                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
1446   return set_state;
1447 }
1448
1449
1450 /**
1451  * Add the element from the given element message to the set.
1452  *
1453  * @param set_state state of the set want to add to
1454  * @param ee the element to add to the set
1455  */
1456 static void
1457 union_add (struct SetState *set_state, struct ElementEntry *ee)
1458 {
1459   strata_estimator_insert (set_state->se,
1460                            get_ibf_key (&ee->element_hash, 0));
1461 }
1462
1463
1464 /**
1465  * Remove the element given in the element message from the set.
1466  * Only marks the element as removed, so that older set operations can still exchange it.
1467  *
1468  * @param set_state state of the set to remove from
1469  * @param ee set element to remove
1470  */
1471 static void
1472 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1473 {
1474   strata_estimator_remove (set_state->se,
1475                            get_ibf_key (&ee->element_hash, 0));
1476 }
1477
1478
1479 /**
1480  * Destroy a set that supports the union operation.
1481  *
1482  * @param set_state the set to destroy
1483  */
1484 static void
1485 union_set_destroy (struct SetState *set_state)
1486 {
1487   if (NULL != set_state->se)
1488   {
1489     strata_estimator_destroy (set_state->se);
1490     set_state->se = NULL;
1491   }
1492   GNUNET_free (set_state);
1493 }
1494
1495
1496 /**
1497  * Dispatch messages for a union operation.
1498  *
1499  * @param op the state of the union evaluate operation
1500  * @param mh the received message
1501  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1502  *         #GNUNET_OK otherwise
1503  */
1504 int
1505 union_handle_p2p_message (struct Operation *op,
1506                           const struct GNUNET_MessageHeader *mh)
1507 {
1508   //LOG (GNUNET_ERROR_TYPE_DEBUG,
1509   //            "received p2p message (t: %u, s: %u)\n",
1510   //            ntohs (mh->type),
1511   //            ntohs (mh->size));
1512   switch (ntohs (mh->type))
1513   {
1514     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1515       return handle_p2p_ibf (op, mh);
1516     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1517       return handle_p2p_strata_estimator (op, mh);
1518     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1519       handle_p2p_elements (op, mh);
1520       break;
1521     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1522       handle_p2p_inquiry (op, mh);
1523       break;
1524     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1525       handle_p2p_done (op, mh);
1526       break;
1527     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1528       handle_p2p_offer (op, mh);
1529       break;
1530     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1531       handle_p2p_demand (op, mh);
1532       break;
1533     default:
1534       /* Something wrong with cadet's message handlers? */
1535       GNUNET_assert (0);
1536   }
1537   return GNUNET_OK;
1538 }
1539
1540
1541 /**
1542  * Handler for peer-disconnects, notifies the client
1543  * about the aborted operation in case the op was not concluded.
1544  *
1545  * @param op the destroyed operation
1546  */
1547 static void
1548 union_peer_disconnect (struct Operation *op)
1549 {
1550   if (PHASE_DONE != op->state->phase)
1551   {
1552     struct GNUNET_MQ_Envelope *ev;
1553     struct GNUNET_SET_ResultMessage *msg;
1554
1555     ev = GNUNET_MQ_msg (msg,
1556                         GNUNET_MESSAGE_TYPE_SET_RESULT);
1557     msg->request_id = htonl (op->spec->client_request_id);
1558     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1559     msg->element_type = htons (0);
1560     GNUNET_MQ_send (op->spec->set->client_mq,
1561                     ev);
1562     LOG (GNUNET_ERROR_TYPE_WARNING,
1563          "other peer disconnected prematurely, phase %u\n",
1564          op->state->phase);
1565     _GSS_operation_destroy (op,
1566                             GNUNET_YES);
1567     return;
1568   }
1569   // else: the session has already been concluded
1570   LOG (GNUNET_ERROR_TYPE_DEBUG,
1571        "other peer disconnected (finished)\n");
1572   if (GNUNET_NO == op->state->client_done_sent)
1573     send_done_and_destroy (op);
1574 }
1575
1576
1577 /**
1578  * Copy union-specific set state.
1579  *
1580  * @param set source set for copying the union state
1581  * @return a copy of the union-specific set state
1582  */
1583 static struct SetState *
1584 union_copy_state (struct Set *set)
1585 {
1586   struct SetState *new_state;
1587
1588   new_state = GNUNET_new (struct SetState);
1589   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1590   new_state->se = strata_estimator_dup (set->state->se);
1591
1592   return new_state;
1593 }
1594
1595
1596 /**
1597  * Get the table with implementing functions for
1598  * set union.
1599  *
1600  * @return the operation specific VTable
1601  */
1602 const struct SetVT *
1603 _GSS_union_vt ()
1604 {
1605   static const struct SetVT union_vt = {
1606     .create = &union_set_create,
1607     .msg_handler = &union_handle_p2p_message,
1608     .add = &union_add,
1609     .remove = &union_remove,
1610     .destroy_set = &union_set_destroy,
1611     .evaluate = &union_evaluate,
1612     .accept = &union_accept,
1613     .peer_disconnect = &union_peer_disconnect,
1614     .cancel = &union_op_cancel,
1615     .copy_state = &union_copy_state,
1616   };
1617
1618   return &union_vt;
1619 }