Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    * XXX: repurposed to also expect a "request full set" message, should be renamed
89    *
90    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91    */
92   PHASE_EXPECT_IBF,
93
94   /**
95    * Continuation for multi part IBFs.
96    */
97   PHASE_EXPECT_IBF_CONT,
98
99   /**
100    * We are decoding an IBF.
101    */
102   PHASE_INVENTORY_ACTIVE,
103
104   /**
105    * The other peer is decoding the IBF we just sent.
106    */
107   PHASE_INVENTORY_PASSIVE,
108
109   /**
110    * The protocol is almost finished, but we still have to flush our message
111    * queue and/or expect some elements.
112    */
113   PHASE_FINISH_CLOSING,
114
115   /**
116    * In the penultimate phase,
117    * we wait until all our demands
118    * are satisfied.  Then we send a done
119    * message, and wait for another done message.
120    */
121   PHASE_FINISH_WAITING,
122
123   /**
124    * In the ultimate phase, we wait until
125    * our demands are satisfied and then
126    * quit (sending another DONE message).
127    */
128   PHASE_DONE,
129
130   /**
131    * After sending the full set, wait for responses with the elements
132    * that the local peer is missing.
133    */
134   PHASE_FULL_SENDING,
135 };
136
137
138 /**
139  * State of an evaluate operation with another peer.
140  */
141 struct OperationState
142 {
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation.
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The IBF we currently receive.
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * The IBF with the local set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps unsalted IBF-Keys to elements.
161    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162    * Colliding IBF-Keys are linked.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
165
166   /**
167    * Current state of the operation.
168    */
169   enum UnionOperationPhase phase;
170
171   /**
172    * Did we send the client that we are done?
173    */
174   int client_done_sent;
175
176   /**
177    * Number of ibf buckets already received into the @a remote_ibf.
178    */
179   unsigned int ibf_buckets_received;
180
181   /**
182    * Hashes for elements that we have demanded from the other peer.
183    */
184   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
185
186   /**
187    * Salt that we're using for sending IBFs
188    */
189   uint32_t salt_send;
190
191   /**
192    * Salt for the IBF we've received and that we're currently decoding.
193    */
194   uint32_t salt_receive;
195
196   /**
197    * Number of elements we received from the other peer
198    * that were not in the local set yet.
199    */
200   uint32_t received_fresh;
201
202   /**
203    * Total number of elements received from the other peer.
204    */
205   uint32_t received_total;
206
207   /**
208    * Initial size of our set, just before
209    * the operation started.
210    */
211   uint64_t initial_size;
212 };
213
214
215 /**
216  * The key entry is used to associate an ibf key with an element.
217  */
218 struct KeyEntry
219 {
220   /**
221    * IBF key for the entry, derived from the current salt.
222    */
223   struct IBF_Key ibf_key;
224
225   /**
226    * The actual element associated with the key.
227    *
228    * Only owned by the union operation if element->operation
229    * is #GNUNET_YES.
230    */
231   struct ElementEntry *element;
232
233   /**
234    * Did we receive this element?
235    * Even if element->is_foreign is false, we might
236    * have received the element, so this indicates that
237    * the other peer has it.
238    */
239   int received;
240 };
241
242
243 /**
244  * Used as a closure for sending elements
245  * with a specific IBF key.
246  */
247 struct SendElementClosure
248 {
249   /**
250    * The IBF key whose matching elements should be
251    * sent.
252    */
253   struct IBF_Key ibf_key;
254
255   /**
256    * Operation for which the elements
257    * should be sent.
258    */
259   struct Operation *op;
260 };
261
262
263 /**
264  * Extra state required for efficient set union.
265  */
266 struct SetState
267 {
268   /**
269    * The strata estimator is only generated once for
270    * each set.
271    * The IBF keys are derived from the element hashes with
272    * salt=0.
273    */
274   struct StrataEstimator *se;
275 };
276
277
278 /**
279  * Iterator over hash map entries, called to
280  * destroy the linked list of colliding ibf key entries.
281  *
282  * @param cls closure
283  * @param key current key code
284  * @param value value in the hash map
285  * @return #GNUNET_YES if we should continue to iterate,
286  *         #GNUNET_NO if not.
287  */
288 static int
289 destroy_key_to_element_iter (void *cls,
290                              uint32_t key,
291                              void *value)
292 {
293   struct KeyEntry *k = value;
294
295   GNUNET_assert (NULL != k);
296   if (GNUNET_YES == k->element->remote)
297   {
298     GNUNET_free (k->element);
299     k->element = NULL;
300   }
301   GNUNET_free (k);
302   return GNUNET_YES;
303 }
304
305
306 /**
307  * Destroy the union operation.  Only things specific to the union
308  * operation are destroyed.
309  *
310  * @param op union operation to destroy
311  */
312 static void
313 union_op_cancel (struct Operation *op)
314 {
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op\n");
317   /* check if the op was canceled twice */
318   GNUNET_assert (NULL != op->state);
319   if (NULL != op->state->remote_ibf)
320   {
321     ibf_destroy (op->state->remote_ibf);
322     op->state->remote_ibf = NULL;
323   }
324   if (NULL != op->state->demanded_hashes)
325   {
326     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327     op->state->demanded_hashes = NULL;
328   }
329   if (NULL != op->state->local_ibf)
330   {
331     ibf_destroy (op->state->local_ibf);
332     op->state->local_ibf = NULL;
333   }
334   if (NULL != op->state->se)
335   {
336     strata_estimator_destroy (op->state->se);
337     op->state->se = NULL;
338   }
339   if (NULL != op->state->key_to_element)
340   {
341     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342                                              &destroy_key_to_element_iter,
343                                              NULL);
344     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345     op->state->key_to_element = NULL;
346   }
347   GNUNET_free (op->state);
348   op->state = NULL;
349   LOG (GNUNET_ERROR_TYPE_DEBUG,
350        "destroying union op done\n");
351 }
352
353
354 /**
355  * Inform the client that the union operation has failed,
356  * and proceed to destroy the evaluate operation.
357  *
358  * @param op the union operation to fail
359  */
360 static void
361 fail_union_operation (struct Operation *op)
362 {
363   struct GNUNET_MQ_Envelope *ev;
364   struct GNUNET_SET_ResultMessage *msg;
365
366   LOG (GNUNET_ERROR_TYPE_ERROR,
367        "union operation failed\n");
368   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370   msg->request_id = htonl (op->spec->client_request_id);
371   msg->element_type = htons (0);
372   GNUNET_MQ_send (op->spec->set->client_mq, ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_CRYPTO_kdf (&key, sizeof (key),
391                      src, sizeof *src,
392                      &salt, sizeof (salt),
393                      NULL, 0);
394   return key;
395 }
396
397
398 /**
399  * Context for #op_get_element_iterator
400  */
401 struct GetElementContext
402 {
403   struct GNUNET_HashCode hash;
404   struct KeyEntry *k;
405 };
406
407
408 /**
409  * Iterator over the mapping from IBF keys to element entries.  Checks if we
410  * have an element with a given GNUNET_HashCode.
411  *
412  * @param cls closure
413  * @param key current key code
414  * @param value value in the hash map
415  * @return #GNUNET_YES if we should search further,
416  *         #GNUNET_NO if we've found the element.
417  */
418 static int
419 op_get_element_iterator (void *cls,
420                          uint32_t key,
421                          void *value)
422 {
423   struct GetElementContext *ctx = cls;
424   struct KeyEntry *k = value;
425
426   GNUNET_assert (NULL != k);
427   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
428                                    &ctx->hash))
429   {
430     ctx->k = k;
431     return GNUNET_NO;
432   }
433   return GNUNET_YES;
434 }
435
436
437 /**
438  * Determine whether the given element is already in the operation's element
439  * set.
440  *
441  * @param op operation that should be tested for 'element_hash'
442  * @param element_hash hash of the element to look for
443  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
444  */
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447                 const struct GNUNET_HashCode *element_hash)
448 {
449   int ret;
450   struct IBF_Key ibf_key;
451   struct GetElementContext ctx = {{{ 0 }} , 0};
452
453   ctx.hash = *element_hash;
454
455   ibf_key = get_ibf_key (element_hash);
456   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457                                                       (uint32_t) ibf_key.key_val,
458                                                       op_get_element_iterator,
459                                                       &ctx);
460
461   /* was the iteration aborted because we found the element? */
462   if (GNUNET_SYSERR == ret)
463   {
464     GNUNET_assert (NULL != ctx.k);
465     return ctx.k;
466   }
467   return NULL;
468 }
469
470
471 /**
472  * Insert an element into the union operation's
473  * key-to-element mapping. Takes ownership of 'ee'.
474  * Note that this does not insert the element in the set,
475  * only in the operation's key-element mapping.
476  * This is done to speed up re-tried operations, if some elements
477  * were transmitted, and then the IBF fails to decode.
478  *
479  * XXX: clarify ownership, doesn't sound right.
480  *
481  * @param op the union operation
482  * @param ee the element entry
483  * @parem received was this element received from the remote peer?
484  */
485 static void
486 op_register_element (struct Operation *op,
487                      struct ElementEntry *ee,
488                      int received)
489 {
490   struct IBF_Key ibf_key;
491   struct KeyEntry *k;
492
493   ibf_key = get_ibf_key (&ee->element_hash);
494   k = GNUNET_new (struct KeyEntry);
495   k->element = ee;
496   k->ibf_key = ibf_key;
497   k->received = received;
498   GNUNET_assert (GNUNET_OK ==
499                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500                                                       (uint32_t) ibf_key.key_val,
501                                                       k,
502                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
503 }
504
505
506 static void
507 salt_key (const struct IBF_Key *k_in,
508           uint32_t salt,
509           struct IBF_Key *k_out)
510 {
511   int s = salt % 64;
512   uint64_t x = k_in->key_val;
513   /* rotate ibf key */
514   x = (x >> s) | (x << (64 - s));
515   k_out->key_val = x;
516 }
517
518
519 static void
520 unsalt_key (const struct IBF_Key *k_in,
521             uint32_t salt,
522             struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526   x = (x << s) | (x >> (64 - s));
527   k_out->key_val = x;
528 }
529
530
531 /**
532  * Insert a key into an ibf.
533  *
534  * @param cls the ibf
535  * @param key unused
536  * @param value the key entry to get the key from
537  */
538 static int
539 prepare_ibf_iterator (void *cls,
540                       uint32_t key,
541                       void *value)
542 {
543   struct Operation *op = cls;
544   struct KeyEntry *ke = value;
545   struct IBF_Key salted_key;
546
547   LOG (GNUNET_ERROR_TYPE_DEBUG,
548        "[OP %x] inserting %lx (hash %s) into ibf\n",
549        (void *) op,
550        (unsigned long) ke->ibf_key.key_val,
551        GNUNET_h2s (&ke->element->element_hash));
552   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553   ibf_insert (op->state->local_ibf, salted_key);
554   return GNUNET_YES;
555 }
556
557
558 /**
559  * Iterator for initializing the
560  * key-to-element mapping of a union operation
561  *
562  * @param cls the union operation `struct Operation *`
563  * @param key unused
564  * @param value the `struct ElementEntry *` to insert
565  *        into the key-to-element mapping
566  * @return #GNUNET_YES (to continue iterating)
567  */
568 static int
569 init_key_to_element_iterator (void *cls,
570                               const struct GNUNET_HashCode *key,
571                               void *value)
572 {
573   struct Operation *op = cls;
574   struct ElementEntry *ee = value;
575
576   /* make sure that the element belongs to the set at the time
577    * of creating the operation */
578   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
579     return GNUNET_YES;
580
581   GNUNET_assert (GNUNET_NO == ee->remote);
582
583   op_register_element (op, ee, GNUNET_NO);
584   return GNUNET_YES;
585 }
586
587
588 /**
589  * Initialize the IBF key to element mapping local to this set
590  * operation.
591  *
592  * @param op the set union operation
593  */
594 static void
595 initialize_key_to_element (struct Operation *op)
596 {
597   unsigned int len;
598
599   GNUNET_assert (NULL == op->state->key_to_element);
600   len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603 }
604
605
606 /**
607  * Create an ibf with the operation's elements
608  * of the specified size
609  *
610  * @param op the union operation
611  * @param size size of the ibf to create
612  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
613  */
614 static int
615 prepare_ibf (struct Operation *op,
616              uint32_t size)
617 {
618   GNUNET_assert (NULL != op->state->key_to_element);
619
620   if (NULL != op->state->local_ibf)
621     ibf_destroy (op->state->local_ibf);
622   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623   if (NULL == op->state->local_ibf)
624   {
625     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626                 "Failed to allocate local IBF\n");
627     return GNUNET_SYSERR;
628   }
629   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630                                            &prepare_ibf_iterator,
631                                            op);
632   return GNUNET_OK;
633 }
634
635
636 /**
637  * Send an ibf of appropriate size.
638  *
639  * Fragments the IBF into multiple messages if necessary.
640  *
641  * @param op the union operation
642  * @param ibf_order order of the ibf to send, size=2^order
643  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
644  */
645 static int
646 send_ibf (struct Operation *op,
647           uint16_t ibf_order)
648 {
649   unsigned int buckets_sent = 0;
650   struct InvertibleBloomFilter *ibf;
651
652   if (GNUNET_OK !=
653       prepare_ibf (op, 1<<ibf_order))
654   {
655     /* allocation failed */
656     return GNUNET_SYSERR;
657   }
658
659   LOG (GNUNET_ERROR_TYPE_DEBUG,
660        "sending ibf of size %u\n",
661        1<<ibf_order);
662
663   {
664     char name[64] = { 0 };
665     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
667   }
668
669   ibf = op->state->local_ibf;
670
671   while (buckets_sent < (1 << ibf_order))
672   {
673     unsigned int buckets_in_message;
674     struct GNUNET_MQ_Envelope *ev;
675     struct IBFMessage *msg;
676
677     buckets_in_message = (1 << ibf_order) - buckets_sent;
678     /* limit to maximum */
679     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
681
682     ev = GNUNET_MQ_msg_extra (msg,
683                               buckets_in_message * IBF_BUCKET_SIZE,
684                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
685     msg->reserved1 = 0;
686     msg->reserved2 = 0;
687     msg->order = ibf_order;
688     msg->offset = htonl (buckets_sent);
689     msg->salt = htonl (op->state->salt_send);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     LOG (GNUNET_ERROR_TYPE_DEBUG,
694          "ibf chunk size %u, %u/%u sent\n",
695          buckets_in_message,
696          buckets_sent,
697          1<<ibf_order);
698     GNUNET_MQ_send (op->mq, ev);
699   }
700
701   /* The other peer must decode the IBF, so
702    * we're passive. */
703   op->state->phase = PHASE_INVENTORY_PASSIVE;
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Send a strata estimator to the remote peer.
710  *
711  * @param op the union operation with the remote peer
712  */
713 static void
714 send_strata_estimator (struct Operation *op)
715 {
716   const struct StrataEstimator *se = op->state->se;
717   struct GNUNET_MQ_Envelope *ev;
718   struct StrataEstimatorMessage *strata_msg;
719   char *buf;
720   size_t len;
721   uint16_t type;
722
723   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724   len = strata_estimator_write (op->state->se,
725                                 buf);
726   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
728   else
729     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730   ev = GNUNET_MQ_msg_extra (strata_msg,
731                             len,
732                             type);
733   GNUNET_memcpy (&strata_msg[1],
734           buf,
735           len);
736   GNUNET_free (buf);
737   strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738   GNUNET_MQ_send (op->mq,
739                   ev);
740   op->state->phase = PHASE_EXPECT_IBF;
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "sent SE, expecting IBF\n");
743 }
744
745
746 /**
747  * Compute the necessary order of an ibf
748  * from the size of the symmetric set difference.
749  *
750  * @param diff the difference
751  * @return the required size of the ibf
752  */
753 static unsigned int
754 get_order_from_difference (unsigned int diff)
755 {
756   unsigned int ibf_order;
757
758   ibf_order = 2;
759   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
761     ibf_order++;
762   if (ibf_order > MAX_IBF_ORDER)
763     ibf_order = MAX_IBF_ORDER;
764   // add one for correction
765   return ibf_order + 1;
766 }
767
768
769 /**
770  * Send a set element.
771  *
772  * @param cls the union operation `struct Operation *`
773  * @param key unused
774  * @param value the `struct ElementEntry *` to insert
775  *        into the key-to-element mapping
776  * @return #GNUNET_YES (to continue iterating)
777  */
778 static int
779 send_element_iterator (void *cls,
780                        const struct GNUNET_HashCode *key,
781                        void *value)
782 {
783   struct Operation *op = cls;
784   struct GNUNET_SET_ElementMessage *emsg;
785   struct ElementEntry *ee = value;
786   struct GNUNET_SET_Element *el = &ee->element;
787   struct GNUNET_MQ_Envelope *ev;
788
789
790   ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791   emsg->element_type = htons (el->element_type);
792   GNUNET_memcpy (&emsg[1], el->data, el->size);
793   GNUNET_MQ_send (op->mq, ev);
794   return GNUNET_YES;
795 }
796
797
798 static void
799 send_full_set (struct Operation *op)
800 {
801   struct GNUNET_MQ_Envelope *ev;
802
803   op->state->phase = PHASE_FULL_SENDING;
804
805   (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806                                                 &send_element_iterator, op);
807   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808   GNUNET_MQ_send (op->mq, ev);
809 }
810
811
812 /**
813  * Handle a strata estimator from a remote peer
814  *
815  * @param cls the union operation
816  * @param mh the message
817  * @param is_compressed #GNUNET_YES if the estimator is compressed
818  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819  *         #GNUNET_OK otherwise
820  */
821 static int
822 handle_p2p_strata_estimator (void *cls,
823                              const struct GNUNET_MessageHeader *mh,
824                              int is_compressed)
825 {
826   struct Operation *op = cls;
827   struct StrataEstimator *remote_se;
828   struct StrataEstimatorMessage *msg = (void *) mh;
829   unsigned int diff;
830   uint64_t other_size;
831   size_t len;
832
833   GNUNET_STATISTICS_update (_GSS_statistics,
834                             "# bytes of SE received",
835                             ntohs (mh->size),
836                             GNUNET_NO);
837
838   if (op->state->phase != PHASE_EXPECT_SE)
839   {
840     GNUNET_break (0);
841     fail_union_operation (op);
842     return GNUNET_SYSERR;
843   }
844   len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
845   if ( (GNUNET_NO == is_compressed) &&
846        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
847   {
848     fail_union_operation (op);
849     GNUNET_break (0);
850     return GNUNET_SYSERR;
851   }
852   other_size = GNUNET_ntohll (msg->set_size);
853   remote_se = strata_estimator_create (SE_STRATA_COUNT,
854                                        SE_IBF_SIZE,
855                                        SE_IBF_HASH_NUM);
856   if (NULL == remote_se)
857   {
858     /* insufficient resources, fail */
859     fail_union_operation (op);
860     return GNUNET_SYSERR;
861   }
862   if (GNUNET_OK !=
863       strata_estimator_read (&msg[1],
864                              len,
865                              is_compressed,
866                              remote_se))
867   {
868     /* decompression failed */
869     fail_union_operation (op);
870     strata_estimator_destroy (remote_se);
871     return GNUNET_SYSERR;
872   }
873   GNUNET_assert (NULL != op->state->se);
874   diff = strata_estimator_difference (remote_se,
875                                       op->state->se);
876
877   if (diff > 200)
878     diff = diff * 3 / 2; 
879
880   strata_estimator_destroy (remote_se);
881   strata_estimator_destroy (op->state->se);
882   op->state->se = NULL;
883   LOG (GNUNET_ERROR_TYPE_DEBUG,
884        "got se diff=%d, using ibf size %d\n",
885        diff,
886        1<<get_order_from_difference (diff));
887
888   if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
889   {
890     GNUNET_break (0);
891     fail_union_operation (op);
892     return GNUNET_SYSERR;
893   }
894
895
896   if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
897   {
898     LOG (GNUNET_ERROR_TYPE_INFO,
899          "Sending full set (diff=%d, own set=%u)\n",
900          diff,
901          op->state->initial_size);
902     GNUNET_STATISTICS_update (_GSS_statistics,
903                               "# of full sends",
904                               1,
905                               GNUNET_NO);
906     if (op->state->initial_size <= other_size)
907     {
908       send_full_set (op);
909     }
910     else
911     {
912       struct GNUNET_MQ_Envelope *ev;
913       op->state->phase = PHASE_EXPECT_IBF;
914       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
915       GNUNET_MQ_send (op->mq, ev);
916     }
917   }
918   else
919   {
920     GNUNET_STATISTICS_update (_GSS_statistics,
921                               "# of ibf sends",
922                               1,
923                               GNUNET_NO);
924     if (GNUNET_OK !=
925         send_ibf (op,
926                   get_order_from_difference (diff)))
927     {
928       /* Internal error, best we can do is shut the connection */
929       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
930                   "Failed to send IBF, closing connection\n");
931       fail_union_operation (op);
932       return GNUNET_SYSERR;
933     }
934   }
935
936   return GNUNET_OK;
937 }
938
939
940 /**
941  * Iterator to send elements to a remote peer
942  *
943  * @param cls closure with the element key and the union operation
944  * @param key ignored
945  * @param value the key entry
946  */
947 static int
948 send_offers_iterator (void *cls,
949                       uint32_t key,
950                       void *value)
951 {
952   struct SendElementClosure *sec = cls;
953   struct Operation *op = sec->op;
954   struct KeyEntry *ke = value;
955   struct GNUNET_MQ_Envelope *ev;
956   struct GNUNET_MessageHeader *mh;
957
958   /* Detect 32-bit key collision for the 64-bit IBF keys. */
959   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
960     return GNUNET_YES;
961
962   ev = GNUNET_MQ_msg_header_extra (mh,
963                                    sizeof (struct GNUNET_HashCode),
964                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
965
966   GNUNET_assert (NULL != ev);
967   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
968   LOG (GNUNET_ERROR_TYPE_DEBUG,
969        "[OP %x] sending element offer (%s) to peer\n",
970        (void *) op,
971        GNUNET_h2s (&ke->element->element_hash));
972   GNUNET_MQ_send (op->mq, ev);
973   return GNUNET_YES;
974 }
975
976
977 /**
978  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
979  *
980  * @param op union operation
981  * @param ibf_key IBF key of interest
982  */
983 static void
984 send_offers_for_key (struct Operation *op,
985                      struct IBF_Key ibf_key)
986 {
987   struct SendElementClosure send_cls;
988
989   send_cls.ibf_key = ibf_key;
990   send_cls.op = op;
991   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
992                                                        (uint32_t) ibf_key.key_val,
993                                                        &send_offers_iterator,
994                                                        &send_cls);
995 }
996
997
998 /**
999  * Decode which elements are missing on each side, and
1000  * send the appropriate offers and inquiries.
1001  *
1002  * @param op union operation
1003  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1004  */
1005 static int
1006 decode_and_send (struct Operation *op)
1007 {
1008   struct IBF_Key key;
1009   struct IBF_Key last_key;
1010   int side;
1011   unsigned int num_decoded;
1012   struct InvertibleBloomFilter *diff_ibf;
1013
1014   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1015
1016   if (GNUNET_OK !=
1017       prepare_ibf (op, op->state->remote_ibf->size))
1018   {
1019     GNUNET_break (0);
1020     /* allocation failed */
1021     return GNUNET_SYSERR;
1022   }
1023   diff_ibf = ibf_dup (op->state->local_ibf);
1024   ibf_subtract (diff_ibf, op->state->remote_ibf);
1025
1026   ibf_destroy (op->state->remote_ibf);
1027   op->state->remote_ibf = NULL;
1028
1029   LOG (GNUNET_ERROR_TYPE_DEBUG,
1030        "decoding IBF (size=%u)\n",
1031        diff_ibf->size);
1032
1033   num_decoded = 0;
1034   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1035
1036   while (1)
1037   {
1038     int res;
1039     int cycle_detected = GNUNET_NO;
1040
1041     last_key = key;
1042
1043     res = ibf_decode (diff_ibf, &side, &key);
1044     if (res == GNUNET_OK)
1045     {
1046       LOG (GNUNET_ERROR_TYPE_DEBUG,
1047            "decoded ibf key %lx\n",
1048            (unsigned long) key.key_val);
1049       num_decoded += 1;
1050       if ( (num_decoded > diff_ibf->size) ||
1051            ( (num_decoded > 1) &&
1052              (last_key.key_val == key.key_val) ) )
1053       {
1054         LOG (GNUNET_ERROR_TYPE_DEBUG,
1055              "detected cyclic ibf (decoded %u/%u)\n",
1056              num_decoded,
1057              diff_ibf->size);
1058         cycle_detected = GNUNET_YES;
1059       }
1060     }
1061     if ( (GNUNET_SYSERR == res) ||
1062          (GNUNET_YES == cycle_detected) )
1063     {
1064       int next_order;
1065       next_order = 0;
1066       while (1<<next_order < diff_ibf->size)
1067         next_order++;
1068       next_order++;
1069       if (next_order <= MAX_IBF_ORDER)
1070       {
1071         LOG (GNUNET_ERROR_TYPE_DEBUG,
1072              "decoding failed, sending larger ibf (size %u)\n",
1073              1<<next_order);
1074         GNUNET_STATISTICS_update (_GSS_statistics,
1075                                   "# of IBF retries",
1076                                   1,
1077                                   GNUNET_NO);
1078         op->state->salt_send++;
1079         if (GNUNET_OK !=
1080             send_ibf (op, next_order))
1081         {
1082           /* Internal error, best we can do is shut the connection */
1083           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1084                       "Failed to send IBF, closing connection\n");
1085           fail_union_operation (op);
1086           ibf_destroy (diff_ibf);
1087           return GNUNET_SYSERR;
1088         }
1089       }
1090       else
1091       {
1092         GNUNET_STATISTICS_update (_GSS_statistics,
1093                                   "# of failed union operations (too large)",
1094                                   1,
1095                                   GNUNET_NO);
1096         // XXX: Send the whole set, element-by-element
1097         LOG (GNUNET_ERROR_TYPE_ERROR,
1098              "set union failed: reached ibf limit\n");
1099         fail_union_operation (op);
1100         ibf_destroy (diff_ibf);
1101         return GNUNET_SYSERR;
1102       }
1103       break;
1104     }
1105     if (GNUNET_NO == res)
1106     {
1107       struct GNUNET_MQ_Envelope *ev;
1108
1109       LOG (GNUNET_ERROR_TYPE_DEBUG,
1110            "transmitted all values, sending DONE\n");
1111       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1112       GNUNET_MQ_send (op->mq, ev);
1113       /* We now wait until we get a DONE message back
1114        * and then wait for our MQ to be flushed and all our
1115        * demands be delivered. */
1116       break;
1117     }
1118     if (1 == side)
1119     {
1120       struct IBF_Key unsalted_key;
1121       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1122       send_offers_for_key (op, unsalted_key);
1123     }
1124     else if (-1 == side)
1125     {
1126       struct GNUNET_MQ_Envelope *ev;
1127       struct InquiryMessage *msg;
1128
1129       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1130        * the effort additional complexity. */
1131       ev = GNUNET_MQ_msg_extra (msg,
1132                                 sizeof (struct IBF_Key),
1133                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1134       msg->salt = htonl (op->state->salt_receive);
1135       GNUNET_memcpy (&msg[1],
1136               &key,
1137               sizeof (struct IBF_Key));
1138       LOG (GNUNET_ERROR_TYPE_DEBUG,
1139            "sending element inquiry for IBF key %lx\n",
1140            (unsigned long) key.key_val);
1141       GNUNET_MQ_send (op->mq, ev);
1142     }
1143     else
1144     {
1145       GNUNET_assert (0);
1146     }
1147   }
1148   ibf_destroy (diff_ibf);
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Handle an IBF message from a remote peer.
1155  *
1156  * Reassemble the IBF from multiple pieces, and
1157  * process the whole IBF once possible.
1158  *
1159  * @param cls the union operation
1160  * @param mh the header of the message
1161  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1162  *         #GNUNET_OK otherwise
1163  */
1164 static int
1165 handle_p2p_ibf (void *cls,
1166                 const struct GNUNET_MessageHeader *mh)
1167 {
1168   struct Operation *op = cls;
1169   const struct IBFMessage *msg;
1170   unsigned int buckets_in_message;
1171
1172   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1173   {
1174     GNUNET_break_op (0);
1175     fail_union_operation (op);
1176     return GNUNET_SYSERR;
1177   }
1178   msg = (const struct IBFMessage *) mh;
1179   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1180        (op->state->phase == PHASE_EXPECT_IBF) )
1181   {
1182     op->state->phase = PHASE_EXPECT_IBF_CONT;
1183     GNUNET_assert (NULL == op->state->remote_ibf);
1184     LOG (GNUNET_ERROR_TYPE_DEBUG,
1185          "Creating new ibf of size %u\n",
1186          1 << msg->order);
1187     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1188     op->state->salt_receive = ntohl (msg->salt);
1189     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1190     if (NULL == op->state->remote_ibf)
1191     {
1192       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1193                   "Failed to parse remote IBF, closing connection\n");
1194       fail_union_operation (op);
1195       return GNUNET_SYSERR;
1196     }
1197     op->state->ibf_buckets_received = 0;
1198     if (0 != ntohl (msg->offset))
1199     {
1200       GNUNET_break_op (0);
1201       fail_union_operation (op);
1202       return GNUNET_SYSERR;
1203     }
1204   }
1205   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1206   {
1207     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1208     {
1209       GNUNET_break_op (0);
1210       fail_union_operation (op);
1211       return GNUNET_SYSERR;
1212     }
1213     if (1<<msg->order != op->state->remote_ibf->size)
1214     {
1215       GNUNET_break_op (0);
1216       fail_union_operation (op);
1217       return GNUNET_SYSERR;
1218     }
1219     if (ntohl (msg->salt) != op->state->salt_receive)
1220     {
1221       GNUNET_break_op (0);
1222       fail_union_operation (op);
1223       return GNUNET_SYSERR;
1224     }
1225   }
1226   else
1227   {
1228     GNUNET_assert (0);
1229   }
1230
1231   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1232
1233   if (0 == buckets_in_message)
1234   {
1235     GNUNET_break_op (0);
1236     fail_union_operation (op);
1237     return GNUNET_SYSERR;
1238   }
1239
1240   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1241   {
1242     GNUNET_break_op (0);
1243     fail_union_operation (op);
1244     return GNUNET_SYSERR;
1245   }
1246
1247   GNUNET_assert (NULL != op->state->remote_ibf);
1248
1249   ibf_read_slice (&msg[1],
1250                   op->state->ibf_buckets_received,
1251                   buckets_in_message,
1252                   op->state->remote_ibf);
1253   op->state->ibf_buckets_received += buckets_in_message;
1254
1255   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1256   {
1257     LOG (GNUNET_ERROR_TYPE_DEBUG,
1258          "received full ibf\n");
1259     op->state->phase = PHASE_INVENTORY_ACTIVE;
1260     if (GNUNET_OK !=
1261         decode_and_send (op))
1262     {
1263       /* Internal error, best we can do is shut down */
1264       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1265                   "Failed to decode IBF, closing connection\n");
1266       return GNUNET_SYSERR;
1267     }
1268   }
1269   return GNUNET_OK;
1270 }
1271
1272
1273 /**
1274  * Send a result message to the client indicating
1275  * that there is a new element.
1276  *
1277  * @param op union operation
1278  * @param element element to send
1279  * @param status status to send with the new element
1280  */
1281 static void
1282 send_client_element (struct Operation *op,
1283                      struct GNUNET_SET_Element *element,
1284                      int status)
1285 {
1286   struct GNUNET_MQ_Envelope *ev;
1287   struct GNUNET_SET_ResultMessage *rm;
1288
1289   LOG (GNUNET_ERROR_TYPE_DEBUG,
1290        "sending element (size %u) to client\n",
1291        element->size);
1292   GNUNET_assert (0 != op->spec->client_request_id);
1293   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1294   if (NULL == ev)
1295   {
1296     GNUNET_MQ_discard (ev);
1297     GNUNET_break (0);
1298     return;
1299   }
1300   rm->result_status = htons (status);
1301   rm->request_id = htonl (op->spec->client_request_id);
1302   rm->element_type = htons (element->element_type);
1303   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1304   GNUNET_memcpy (&rm[1], element->data, element->size);
1305   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1306 }
1307
1308
1309 /**
1310  * Signal to the client that the operation has finished and
1311  * destroy the operation.
1312  *
1313  * @param cls operation to destroy
1314  */
1315 static void
1316 send_done_and_destroy (void *cls)
1317 {
1318   struct Operation *op = cls;
1319   struct GNUNET_MQ_Envelope *ev;
1320   struct GNUNET_SET_ResultMessage *rm;
1321
1322   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1323   rm->request_id = htonl (op->spec->client_request_id);
1324   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1325   rm->element_type = htons (0);
1326   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1327   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1328   /* Will also call the union-specific cancel function. */
1329   _GSS_operation_destroy (op, GNUNET_YES);
1330 }
1331
1332
1333 static void
1334 maybe_finish (struct Operation *op)
1335 {
1336   unsigned int num_demanded;
1337
1338   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1339
1340   if (PHASE_FINISH_WAITING == op->state->phase)
1341   {
1342     LOG (GNUNET_ERROR_TYPE_DEBUG,
1343          "In PHASE_FINISH_WAITING, pending %u demands\n",
1344          num_demanded);
1345     if (0 == num_demanded)
1346     {
1347       struct GNUNET_MQ_Envelope *ev;
1348
1349       op->state->phase = PHASE_DONE;
1350       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1351       GNUNET_MQ_send (op->mq, ev);
1352
1353       /* We now wait until the other peer closes the channel
1354        * after it got all elements from us. */
1355     }
1356   }
1357   if (PHASE_FINISH_CLOSING == op->state->phase)
1358   {
1359     LOG (GNUNET_ERROR_TYPE_DEBUG,
1360          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1361          num_demanded);
1362     if (0 == num_demanded)
1363     {
1364       op->state->phase = PHASE_DONE;
1365       send_done_and_destroy (op);
1366     }
1367   }
1368 }
1369
1370
1371 /**
1372  * Handle an element message from a remote peer.
1373  * Sent by the other peer either because we decoded an IBF and placed a demand,
1374  * or because the other peer switched to full set transmission.
1375  *
1376  * @param cls the union operation
1377  * @param mh the message
1378  */
1379 static void
1380 handle_p2p_elements (void *cls,
1381                      const struct GNUNET_MessageHeader *mh)
1382 {
1383   struct Operation *op = cls;
1384   struct ElementEntry *ee;
1385   const struct GNUNET_SET_ElementMessage *emsg;
1386   uint16_t element_size;
1387
1388   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1389   {
1390     GNUNET_break_op (0);
1391     fail_union_operation (op);
1392     return;
1393   }
1394   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1395   {
1396     GNUNET_break_op (0);
1397     fail_union_operation (op);
1398     return;
1399   }
1400
1401   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1402
1403   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1404   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1405   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1406   ee->element.size = element_size;
1407   ee->element.data = &ee[1];
1408   ee->element.element_type = ntohs (emsg->element_type);
1409   ee->remote = GNUNET_YES;
1410   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1411
1412   if (GNUNET_NO ==
1413       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1414                                             &ee->element_hash,
1415                                             NULL))
1416   {
1417     /* We got something we didn't demand, since it's not in our map. */
1418     GNUNET_break_op (0);
1419     GNUNET_free (ee);
1420     fail_union_operation (op);
1421     return;
1422   }
1423
1424   LOG (GNUNET_ERROR_TYPE_DEBUG,
1425        "Got element (size %u, hash %s) from peer\n",
1426        (unsigned int) element_size,
1427        GNUNET_h2s (&ee->element_hash));
1428
1429   GNUNET_STATISTICS_update (_GSS_statistics,
1430                             "# received elements",
1431                             1,
1432                             GNUNET_NO);
1433   GNUNET_STATISTICS_update (_GSS_statistics,
1434                             "# exchanged elements",
1435                             1,
1436                             GNUNET_NO);
1437
1438   op->state->received_total += 1;
1439
1440   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1441
1442   if (NULL != ke)
1443   {
1444     /* Got repeated element.  Should not happen since
1445      * we track demands. */
1446     GNUNET_STATISTICS_update (_GSS_statistics,
1447                               "# repeated elements",
1448                               1,
1449                               GNUNET_NO);
1450     ke->received = GNUNET_YES;
1451     GNUNET_free (ee);
1452   }
1453   else
1454   {
1455     LOG (GNUNET_ERROR_TYPE_DEBUG,
1456          "Registering new element from remote peer\n");
1457     op->state->received_fresh += 1;
1458     op_register_element (op, ee, GNUNET_YES);
1459     /* only send results immediately if the client wants it */
1460     switch (op->spec->result_mode)
1461     {
1462       case GNUNET_SET_RESULT_ADDED:
1463         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1464         break;
1465       case GNUNET_SET_RESULT_SYMMETRIC:
1466         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1467         break;
1468       default:
1469         /* Result mode not supported, should have been caught earlier. */
1470         GNUNET_break (0);
1471         break;
1472     }
1473   }
1474
1475   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1476   {
1477     /* The other peer gave us lots of old elements, there's something wrong. */
1478     GNUNET_break_op (0);
1479     fail_union_operation (op);
1480     return;
1481   }
1482
1483   maybe_finish (op);
1484 }
1485
1486
1487 /**
1488  * Handle an element message from a remote peer.
1489  *
1490  * @param cls the union operation
1491  * @param mh the message
1492  */
1493 static void
1494 handle_p2p_full_element (void *cls,
1495                          const struct GNUNET_MessageHeader *mh)
1496 {
1497   struct Operation *op = cls;
1498   struct ElementEntry *ee;
1499   const struct GNUNET_SET_ElementMessage *emsg;
1500   uint16_t element_size;
1501
1502   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1503   {
1504     GNUNET_break_op (0);
1505     fail_union_operation (op);
1506     return;
1507   }
1508
1509   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1510
1511   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1512   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1513   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1514   ee->element.size = element_size;
1515   ee->element.data = &ee[1];
1516   ee->element.element_type = ntohs (emsg->element_type);
1517   ee->remote = GNUNET_YES;
1518   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1519
1520   LOG (GNUNET_ERROR_TYPE_DEBUG,
1521        "Got element (full diff, size %u, hash %s) from peer\n",
1522        (unsigned int) element_size,
1523        GNUNET_h2s (&ee->element_hash));
1524
1525   GNUNET_STATISTICS_update (_GSS_statistics,
1526                             "# received elements",
1527                             1,
1528                             GNUNET_NO);
1529   GNUNET_STATISTICS_update (_GSS_statistics,
1530                             "# exchanged elements",
1531                             1,
1532                             GNUNET_NO);
1533
1534   op->state->received_total += 1;
1535
1536   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1537
1538   if (NULL != ke)
1539   {
1540     /* Got repeated element.  Should not happen since
1541      * we track demands. */
1542     GNUNET_STATISTICS_update (_GSS_statistics,
1543                               "# repeated elements",
1544                               1,
1545                               GNUNET_NO);
1546     ke->received = GNUNET_YES;
1547     GNUNET_free (ee);
1548   }
1549   else
1550   {
1551     LOG (GNUNET_ERROR_TYPE_DEBUG,
1552          "Registering new element from remote peer\n");
1553     op->state->received_fresh += 1;
1554     op_register_element (op, ee, GNUNET_YES);
1555     /* only send results immediately if the client wants it */
1556     switch (op->spec->result_mode)
1557     {
1558       case GNUNET_SET_RESULT_ADDED:
1559         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1560         break;
1561       case GNUNET_SET_RESULT_SYMMETRIC:
1562         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1563         break;
1564       default:
1565         /* Result mode not supported, should have been caught earlier. */
1566         GNUNET_break (0);
1567         break;
1568     }
1569   }
1570
1571   if ( (GNUNET_YES == op->spec->byzantine) && 
1572        (op->state->received_total > 384 + op->state->received_fresh * 4) && 
1573        (op->state->received_fresh < op->state->received_total / 6) )
1574   {
1575     /* The other peer gave us lots of old elements, there's something wrong. */
1576     LOG (GNUNET_ERROR_TYPE_ERROR,
1577          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1578          (unsigned long long) op->state->received_fresh,
1579          (unsigned long long) op->state->received_total);
1580     GNUNET_break_op (0);
1581     fail_union_operation (op);
1582     return;
1583   }
1584 }
1585
1586 /**
1587  * Send offers (for GNUNET_Hash-es) in response
1588  * to inquiries (for IBF_Key-s).
1589  *
1590  * @param cls the union operation
1591  * @param mh the message
1592  */
1593 static void
1594 handle_p2p_inquiry (void *cls,
1595                     const struct GNUNET_MessageHeader *mh)
1596 {
1597   struct Operation *op = cls;
1598   const struct IBF_Key *ibf_key;
1599   unsigned int num_keys;
1600   struct InquiryMessage *msg;
1601
1602   /* look up elements and send them */
1603   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1604   {
1605     GNUNET_break_op (0);
1606     fail_union_operation (op);
1607     return;
1608   }
1609   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1610       / sizeof (struct IBF_Key);
1611   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1612       != num_keys * sizeof (struct IBF_Key))
1613   {
1614     GNUNET_break_op (0);
1615     fail_union_operation (op);
1616     return;
1617   }
1618
1619   msg = (struct InquiryMessage *) mh;
1620
1621   ibf_key = (const struct IBF_Key *) &msg[1];
1622   while (0 != num_keys--)
1623   {
1624     struct IBF_Key unsalted_key;
1625     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1626     send_offers_for_key (op, unsalted_key);
1627     ibf_key++;
1628   }
1629 }
1630
1631
1632 /**
1633  * Iterator over hash map entries, called to
1634  * destroy the linked list of colliding ibf key entries.
1635  *
1636  * @param cls closure
1637  * @param key current key code
1638  * @param value value in the hash map
1639  * @return #GNUNET_YES if we should continue to iterate,
1640  *         #GNUNET_NO if not.
1641  */
1642 static int
1643 send_missing_elements_iter (void *cls,
1644                             uint32_t key,
1645                             void *value)
1646 {
1647   struct Operation *op = cls;
1648   struct KeyEntry *ke = value;
1649   struct GNUNET_MQ_Envelope *ev;
1650   struct GNUNET_SET_ElementMessage *emsg;
1651   struct ElementEntry *ee = ke->element;
1652
1653   if (GNUNET_YES == ke->received)
1654     return GNUNET_YES;
1655
1656   ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1657   GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1658   emsg->reserved = htons (0);
1659   emsg->element_type = htons (ee->element.element_type);
1660   GNUNET_MQ_send (op->mq, ev);
1661
1662   return GNUNET_YES;
1663 }
1664
1665
1666 /**
1667  * Handle a 
1668  *
1669  * @parem cls closure, a set union operation
1670  * @param mh the demand message
1671  */
1672 static void
1673 handle_p2p_request_full (void *cls,
1674                          const struct GNUNET_MessageHeader *mh)
1675 {
1676   struct Operation *op = cls;
1677
1678   if (PHASE_EXPECT_IBF != op->state->phase)
1679   {
1680     fail_union_operation (op);
1681     GNUNET_break_op (0);
1682     return;
1683   }
1684
1685   // FIXME: we need to check that our set is larger than the
1686   // byzantine_lower_bound by some threshold
1687   send_full_set (op);
1688 }
1689
1690
1691 /**
1692  * Handle a "full done" message.
1693  *
1694  * @parem cls closure, a set union operation
1695  * @param mh the demand message
1696  */
1697 static void
1698 handle_p2p_full_done (void *cls,
1699                       const struct GNUNET_MessageHeader *mh)
1700 {
1701   struct Operation *op = cls;
1702
1703   if (PHASE_EXPECT_IBF == op->state->phase)
1704   {
1705     struct GNUNET_MQ_Envelope *ev;
1706
1707     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1708
1709     /* send all the elements that did not come from the remote peer */
1710     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1711                                              &send_missing_elements_iter,
1712                                              op);
1713
1714     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1715     GNUNET_MQ_send (op->mq, ev);
1716     op->state->phase = PHASE_DONE;
1717
1718     /* we now wait until the other peer shuts the tunnel down*/
1719   }
1720   else if (PHASE_FULL_SENDING == op->state->phase)
1721   {
1722     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1723     /* We sent the full set, and got the response for that.  We're done. */
1724     op->state->phase = PHASE_DONE;
1725     send_done_and_destroy (op);
1726   }
1727   else
1728   {
1729     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1730     GNUNET_break_op (0);
1731     fail_union_operation (op);
1732     return;
1733   }
1734 }
1735
1736
1737 /**
1738  * Handle a demand by the other peer for elements based on a list
1739  * of GNUNET_HashCode-s.
1740  *
1741  * @parem cls closure, a set union operation
1742  * @param mh the demand message
1743  */
1744 static void
1745 handle_p2p_demand (void *cls,
1746                    const struct GNUNET_MessageHeader *mh)
1747 {
1748   struct Operation *op = cls;
1749   struct ElementEntry *ee;
1750   struct GNUNET_SET_ElementMessage *emsg;
1751   const struct GNUNET_HashCode *hash;
1752   unsigned int num_hashes;
1753   struct GNUNET_MQ_Envelope *ev;
1754
1755   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1756     / sizeof (struct GNUNET_HashCode);
1757   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1758       != num_hashes * sizeof (struct GNUNET_HashCode))
1759   {
1760     GNUNET_break_op (0);
1761     fail_union_operation (op);
1762     return;
1763   }
1764
1765   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1766        num_hashes > 0;
1767        hash++, num_hashes--)
1768   {
1769     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1770     if (NULL == ee)
1771     {
1772       /* Demand for non-existing element. */
1773       GNUNET_break_op (0);
1774       fail_union_operation (op);
1775       return;
1776     }
1777     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1778     {
1779       /* Probably confused lazily copied sets. */
1780       GNUNET_break_op (0);
1781       fail_union_operation (op);
1782       return;
1783     }
1784     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1785     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1786     emsg->reserved = htons (0);
1787     emsg->element_type = htons (ee->element.element_type);
1788     LOG (GNUNET_ERROR_TYPE_DEBUG,
1789          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1790          (void *) op,
1791          (unsigned int) ee->element.size,
1792          GNUNET_h2s (&ee->element_hash));
1793     GNUNET_MQ_send (op->mq, ev);
1794     GNUNET_STATISTICS_update (_GSS_statistics,
1795                               "# exchanged elements",
1796                               1,
1797                               GNUNET_NO);
1798
1799     switch (op->spec->result_mode)
1800     {
1801       case GNUNET_SET_RESULT_ADDED:
1802         /* Nothing to do. */
1803         break;
1804       case GNUNET_SET_RESULT_SYMMETRIC:
1805         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1806         break;
1807       default:
1808         /* Result mode not supported, should have been caught earlier. */
1809         GNUNET_break (0);
1810         break;
1811     }
1812   }
1813 }
1814
1815
1816 /**
1817  * Handle offers (of GNUNET_HashCode-s) and
1818  * respond with demands (of GNUNET_HashCode-s).
1819  *
1820  * @param cls the union operation
1821  * @param mh the message
1822  */
1823 static void
1824 handle_p2p_offer (void *cls,
1825                     const struct GNUNET_MessageHeader *mh)
1826 {
1827   struct Operation *op = cls;
1828   const struct GNUNET_HashCode *hash;
1829   unsigned int num_hashes;
1830
1831   /* look up elements and send them */
1832   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1833        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1834   {
1835     GNUNET_break_op (0);
1836     fail_union_operation (op);
1837     return;
1838   }
1839   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1840     / sizeof (struct GNUNET_HashCode);
1841   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1842       != num_hashes * sizeof (struct GNUNET_HashCode))
1843   {
1844     GNUNET_break_op (0);
1845     fail_union_operation (op);
1846     return;
1847   }
1848
1849   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1850        num_hashes > 0;
1851        hash++, num_hashes--)
1852   {
1853     struct ElementEntry *ee;
1854     struct GNUNET_MessageHeader *demands;
1855     struct GNUNET_MQ_Envelope *ev;
1856
1857     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1858                                             hash);
1859     if (NULL != ee)
1860       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1861         continue;
1862
1863     if (GNUNET_YES ==
1864         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1865                                                 hash))
1866     {
1867       LOG (GNUNET_ERROR_TYPE_DEBUG,
1868            "Skipped sending duplicate demand\n");
1869       continue;
1870     }
1871
1872     GNUNET_assert (GNUNET_OK ==
1873                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1874                                                       hash,
1875                                                       NULL,
1876                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1877
1878     LOG (GNUNET_ERROR_TYPE_DEBUG,
1879          "[OP %x] Requesting element (hash %s)\n",
1880          (void *) op, GNUNET_h2s (hash));
1881     ev = GNUNET_MQ_msg_header_extra (demands,
1882                                      sizeof (struct GNUNET_HashCode),
1883                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1884     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1885     GNUNET_MQ_send (op->mq, ev);
1886   }
1887 }
1888
1889
1890 /**
1891  * Handle a done message from a remote peer
1892  *
1893  * @param cls the union operation
1894  * @param mh the message
1895  */
1896 static void
1897 handle_p2p_done (void *cls,
1898                  const struct GNUNET_MessageHeader *mh)
1899 {
1900   struct Operation *op = cls;
1901
1902   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1903   {
1904     /* We got all requests, but still have to send our elements in response. */
1905
1906     op->state->phase = PHASE_FINISH_WAITING;
1907
1908     LOG (GNUNET_ERROR_TYPE_DEBUG,
1909          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1910     /* The active peer is done sending offers
1911      * and inquiries.  This means that all
1912      * our responses to that (demands and offers)
1913      * must be in flight (queued or in mesh).
1914      *
1915      * We should notify the active peer once
1916      * all our demands are satisfied, so that the active
1917      * peer can quit if we gave him everything.
1918      */
1919     maybe_finish (op);
1920     return;
1921   }
1922   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1923   {
1924     LOG (GNUNET_ERROR_TYPE_DEBUG,
1925          "got DONE (as active partner), waiting to finish\n");
1926     /* All demands of the other peer are satisfied,
1927      * and we processed all offers, thus we know
1928      * exactly what our demands must be.
1929      *
1930      * We'll close the channel
1931      * to the other peer once our demands are met.
1932      */
1933     op->state->phase = PHASE_FINISH_CLOSING;
1934     maybe_finish (op);
1935     return;
1936   }
1937   GNUNET_break_op (0);
1938   fail_union_operation (op);
1939 }
1940
1941
1942 /**
1943  * Initiate operation to evaluate a set union with a remote peer.
1944  *
1945  * @param op operation to perform (to be initialized)
1946  * @param opaque_context message to be transmitted to the listener
1947  *        to convince him to accept, may be NULL
1948  */
1949 static void
1950 union_evaluate (struct Operation *op,
1951                 const struct GNUNET_MessageHeader *opaque_context)
1952 {
1953   struct GNUNET_MQ_Envelope *ev;
1954   struct OperationRequestMessage *msg;
1955
1956   GNUNET_assert (NULL == op->state);
1957   op->state = GNUNET_new (struct OperationState);
1958   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1959   /* copy the current generation's strata estimator for this operation */
1960   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1961   /* we started the operation, thus we have to send the operation request */
1962   op->state->phase = PHASE_EXPECT_SE;
1963   op->state->salt_receive = op->state->salt_send = 42;
1964   LOG (GNUNET_ERROR_TYPE_DEBUG,
1965        "Initiating union operation evaluation\n");
1966   GNUNET_STATISTICS_update (_GSS_statistics,
1967                             "# of total union operations",
1968                             1,
1969                             GNUNET_NO);
1970   GNUNET_STATISTICS_update (_GSS_statistics,
1971                             "# of initiated union operations",
1972                             1,
1973                             GNUNET_NO);
1974   ev = GNUNET_MQ_msg_nested_mh (msg,
1975                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1976                                 opaque_context);
1977   if (NULL == ev)
1978   {
1979     /* the context message is too large */
1980     GNUNET_break (0);
1981     GNUNET_SERVICE_client_drop (op->spec->set->client);
1982     return;
1983   }
1984   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1985   GNUNET_MQ_send (op->mq,
1986                   ev);
1987
1988   if (NULL != opaque_context)
1989     LOG (GNUNET_ERROR_TYPE_DEBUG,
1990          "sent op request with context message\n");
1991   else
1992     LOG (GNUNET_ERROR_TYPE_DEBUG,
1993          "sent op request without context message\n");
1994
1995   initialize_key_to_element (op);
1996   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1997 }
1998
1999
2000 /**
2001  * Accept an union operation request from a remote peer.
2002  * Only initializes the private operation state.
2003  *
2004  * @param op operation that will be accepted as a union operation
2005  */
2006 static void
2007 union_accept (struct Operation *op)
2008 {
2009   LOG (GNUNET_ERROR_TYPE_DEBUG,
2010        "accepting set union operation\n");
2011   GNUNET_assert (NULL == op->state);
2012
2013   GNUNET_STATISTICS_update (_GSS_statistics,
2014                             "# of accepted union operations",
2015                             1,
2016                             GNUNET_NO);
2017   GNUNET_STATISTICS_update (_GSS_statistics,
2018                             "# of total union operations",
2019                             1,
2020                             GNUNET_NO);
2021
2022   op->state = GNUNET_new (struct OperationState);
2023   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2024   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2025   op->state->salt_receive = op->state->salt_send = 42;
2026   initialize_key_to_element (op);
2027   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2028   /* kick off the operation */
2029   send_strata_estimator (op);
2030 }
2031
2032
2033 /**
2034  * Create a new set supporting the union operation
2035  *
2036  * We maintain one strata estimator per set and then manipulate it over the
2037  * lifetime of the set, as recreating a strata estimator would be expensive.
2038  *
2039  * @return the newly created set, NULL on error
2040  */
2041 static struct SetState *
2042 union_set_create (void)
2043 {
2044   struct SetState *set_state;
2045
2046   LOG (GNUNET_ERROR_TYPE_DEBUG,
2047        "union set created\n");
2048   set_state = GNUNET_new (struct SetState);
2049   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2050                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2051   if (NULL == set_state->se)
2052   {
2053     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2054                 "Failed to allocate strata estimator\n");
2055     GNUNET_free (set_state);
2056     return NULL;
2057   }
2058   return set_state;
2059 }
2060
2061
2062 /**
2063  * Add the element from the given element message to the set.
2064  *
2065  * @param set_state state of the set want to add to
2066  * @param ee the element to add to the set
2067  */
2068 static void
2069 union_add (struct SetState *set_state, struct ElementEntry *ee)
2070 {
2071   strata_estimator_insert (set_state->se,
2072                            get_ibf_key (&ee->element_hash));
2073 }
2074
2075
2076 /**
2077  * Remove the element given in the element message from the set.
2078  * Only marks the element as removed, so that older set operations can still exchange it.
2079  *
2080  * @param set_state state of the set to remove from
2081  * @param ee set element to remove
2082  */
2083 static void
2084 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2085 {
2086   strata_estimator_remove (set_state->se,
2087                            get_ibf_key (&ee->element_hash));
2088 }
2089
2090
2091 /**
2092  * Destroy a set that supports the union operation.
2093  *
2094  * @param set_state the set to destroy
2095  */
2096 static void
2097 union_set_destroy (struct SetState *set_state)
2098 {
2099   if (NULL != set_state->se)
2100   {
2101     strata_estimator_destroy (set_state->se);
2102     set_state->se = NULL;
2103   }
2104   GNUNET_free (set_state);
2105 }
2106
2107
2108 /**
2109  * Dispatch messages for a union operation.
2110  *
2111  * @param op the state of the union evaluate operation
2112  * @param mh the received message
2113  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2114  *         #GNUNET_OK otherwise
2115  */
2116 int
2117 union_handle_p2p_message (struct Operation *op,
2118                           const struct GNUNET_MessageHeader *mh)
2119 {
2120   //LOG (GNUNET_ERROR_TYPE_DEBUG,
2121   //            "received p2p message (t: %u, s: %u)\n",
2122   //            ntohs (mh->type),
2123   //            ntohs (mh->size));
2124   switch (ntohs (mh->type))
2125   {
2126     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2127       return handle_p2p_ibf (op, mh);
2128     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2129       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2130     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2131       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2132     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2133       handle_p2p_elements (op, mh);
2134       break;
2135     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2136       handle_p2p_full_element (op, mh);
2137       break;
2138     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2139       handle_p2p_inquiry (op, mh);
2140       break;
2141     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2142       handle_p2p_done (op, mh);
2143       break;
2144     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2145       handle_p2p_offer (op, mh);
2146       break;
2147     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2148       handle_p2p_demand (op, mh);
2149       break;
2150     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2151       handle_p2p_full_done (op, mh);
2152       break;
2153     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2154       handle_p2p_request_full (op, mh);
2155       break;
2156     default:
2157       /* Something wrong with cadet's message handlers? */
2158       GNUNET_assert (0);
2159   }
2160   return GNUNET_OK;
2161 }
2162
2163
2164 /**
2165  * Handler for peer-disconnects, notifies the client
2166  * about the aborted operation in case the op was not concluded.
2167  *
2168  * @param op the destroyed operation
2169  */
2170 static void
2171 union_peer_disconnect (struct Operation *op)
2172 {
2173   if (PHASE_DONE != op->state->phase)
2174   {
2175     struct GNUNET_MQ_Envelope *ev;
2176     struct GNUNET_SET_ResultMessage *msg;
2177
2178     ev = GNUNET_MQ_msg (msg,
2179                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2180     msg->request_id = htonl (op->spec->client_request_id);
2181     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2182     msg->element_type = htons (0);
2183     GNUNET_MQ_send (op->spec->set->client_mq,
2184                     ev);
2185     LOG (GNUNET_ERROR_TYPE_WARNING,
2186          "other peer disconnected prematurely, phase %u\n",
2187          op->state->phase);
2188     _GSS_operation_destroy (op,
2189                             GNUNET_YES);
2190     return;
2191   }
2192   // else: the session has already been concluded
2193   LOG (GNUNET_ERROR_TYPE_DEBUG,
2194        "other peer disconnected (finished)\n");
2195   if (GNUNET_NO == op->state->client_done_sent)
2196     send_done_and_destroy (op);
2197 }
2198
2199
2200 /**
2201  * Copy union-specific set state.
2202  *
2203  * @param set source set for copying the union state
2204  * @return a copy of the union-specific set state
2205  */
2206 static struct SetState *
2207 union_copy_state (struct Set *set)
2208 {
2209   struct SetState *new_state;
2210
2211   new_state = GNUNET_new (struct SetState);
2212   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2213   new_state->se = strata_estimator_dup (set->state->se);
2214
2215   return new_state;
2216 }
2217
2218
2219 /**
2220  * Get the table with implementing functions for
2221  * set union.
2222  *
2223  * @return the operation specific VTable
2224  */
2225 const struct SetVT *
2226 _GSS_union_vt ()
2227 {
2228   static const struct SetVT union_vt = {
2229     .create = &union_set_create,
2230     .msg_handler = &union_handle_p2p_message,
2231     .add = &union_add,
2232     .remove = &union_remove,
2233     .destroy_set = &union_set_destroy,
2234     .evaluate = &union_evaluate,
2235     .accept = &union_accept,
2236     .peer_disconnect = &union_peer_disconnect,
2237     .cancel = &union_op_cancel,
2238     .copy_state = &union_copy_state,
2239   };
2240
2241   return &union_vt;
2242 }