1a421063eb2aa450a87f9340c3477fa553a13040
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    * XXX: repurposed to also expect a "request full set" message, should be renamed
89    *
90    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91    */
92   PHASE_EXPECT_IBF,
93
94   /**
95    * Continuation for multi part IBFs.
96    */
97   PHASE_EXPECT_IBF_CONT,
98
99   /**
100    * We are decoding an IBF.
101    */
102   PHASE_INVENTORY_ACTIVE,
103
104   /**
105    * The other peer is decoding the IBF we just sent.
106    */
107   PHASE_INVENTORY_PASSIVE,
108
109   /**
110    * The protocol is almost finished, but we still have to flush our message
111    * queue and/or expect some elements.
112    */
113   PHASE_FINISH_CLOSING,
114
115   /**
116    * In the penultimate phase,
117    * we wait until all our demands
118    * are satisfied.  Then we send a done
119    * message, and wait for another done message.
120    */
121   PHASE_FINISH_WAITING,
122
123   /**
124    * In the ultimate phase, we wait until
125    * our demands are satisfied and then
126    * quit (sending another DONE message).
127    */
128   PHASE_DONE,
129
130   /**
131    * After sending the full set, wait for responses with the elements
132    * that the local peer is missing.
133    */
134   PHASE_FULL_SENDING,
135 };
136
137
138 /**
139  * State of an evaluate operation with another peer.
140  */
141 struct OperationState
142 {
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation.
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The IBF we currently receive.
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * The IBF with the local set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps unsalted IBF-Keys to elements.
161    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162    * Colliding IBF-Keys are linked.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
165
166   /**
167    * Current state of the operation.
168    */
169   enum UnionOperationPhase phase;
170
171   /**
172    * Did we send the client that we are done?
173    */
174   int client_done_sent;
175
176   /**
177    * Number of ibf buckets already received into the @a remote_ibf.
178    */
179   unsigned int ibf_buckets_received;
180
181   /**
182    * Hashes for elements that we have demanded from the other peer.
183    */
184   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
185
186   /**
187    * Salt that we're using for sending IBFs
188    */
189   uint32_t salt_send;
190
191   /**
192    * Salt for the IBF we've received and that we're currently decoding.
193    */
194   uint32_t salt_receive;
195
196   /**
197    * Number of elements we received from the other peer
198    * that were not in the local set yet.
199    */
200   uint32_t received_fresh;
201
202   /**
203    * Total number of elements received from the other peer.
204    */
205   uint32_t received_total;
206
207   /**
208    * Initial size of our set, just before
209    * the operation started.
210    */
211   uint64_t initial_size;
212 };
213
214
215 /**
216  * The key entry is used to associate an ibf key with an element.
217  */
218 struct KeyEntry
219 {
220   /**
221    * IBF key for the entry, derived from the current salt.
222    */
223   struct IBF_Key ibf_key;
224
225   /**
226    * The actual element associated with the key.
227    *
228    * Only owned by the union operation if element->operation
229    * is #GNUNET_YES.
230    */
231   struct ElementEntry *element;
232
233   /**
234    * Did we receive this element?
235    * Even if element->is_foreign is false, we might
236    * have received the element, so this indicates that
237    * the other peer has it.
238    */
239   int received;
240 };
241
242
243 /**
244  * Used as a closure for sending elements
245  * with a specific IBF key.
246  */
247 struct SendElementClosure
248 {
249   /**
250    * The IBF key whose matching elements should be
251    * sent.
252    */
253   struct IBF_Key ibf_key;
254
255   /**
256    * Operation for which the elements
257    * should be sent.
258    */
259   struct Operation *op;
260 };
261
262
263 /**
264  * Extra state required for efficient set union.
265  */
266 struct SetState
267 {
268   /**
269    * The strata estimator is only generated once for
270    * each set.
271    * The IBF keys are derived from the element hashes with
272    * salt=0.
273    */
274   struct StrataEstimator *se;
275 };
276
277
278 /**
279  * Iterator over hash map entries, called to
280  * destroy the linked list of colliding ibf key entries.
281  *
282  * @param cls closure
283  * @param key current key code
284  * @param value value in the hash map
285  * @return #GNUNET_YES if we should continue to iterate,
286  *         #GNUNET_NO if not.
287  */
288 static int
289 destroy_key_to_element_iter (void *cls,
290                              uint32_t key,
291                              void *value)
292 {
293   struct KeyEntry *k = value;
294
295   GNUNET_assert (NULL != k);
296   if (GNUNET_YES == k->element->remote)
297   {
298     GNUNET_free (k->element);
299     k->element = NULL;
300   }
301   GNUNET_free (k);
302   return GNUNET_YES;
303 }
304
305
306 /**
307  * Destroy the union operation.  Only things specific to the union
308  * operation are destroyed.
309  *
310  * @param op union operation to destroy
311  */
312 static void
313 union_op_cancel (struct Operation *op)
314 {
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op\n");
317   /* check if the op was canceled twice */
318   GNUNET_assert (NULL != op->state);
319   if (NULL != op->state->remote_ibf)
320   {
321     ibf_destroy (op->state->remote_ibf);
322     op->state->remote_ibf = NULL;
323   }
324   if (NULL != op->state->demanded_hashes)
325   {
326     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327     op->state->demanded_hashes = NULL;
328   }
329   if (NULL != op->state->local_ibf)
330   {
331     ibf_destroy (op->state->local_ibf);
332     op->state->local_ibf = NULL;
333   }
334   if (NULL != op->state->se)
335   {
336     strata_estimator_destroy (op->state->se);
337     op->state->se = NULL;
338   }
339   if (NULL != op->state->key_to_element)
340   {
341     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342                                              &destroy_key_to_element_iter,
343                                              NULL);
344     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345     op->state->key_to_element = NULL;
346   }
347   GNUNET_free (op->state);
348   op->state = NULL;
349   LOG (GNUNET_ERROR_TYPE_DEBUG,
350        "destroying union op done\n");
351 }
352
353
354 /**
355  * Inform the client that the union operation has failed,
356  * and proceed to destroy the evaluate operation.
357  *
358  * @param op the union operation to fail
359  */
360 static void
361 fail_union_operation (struct Operation *op)
362 {
363   struct GNUNET_MQ_Envelope *ev;
364   struct GNUNET_SET_ResultMessage *msg;
365
366   LOG (GNUNET_ERROR_TYPE_ERROR,
367        "union operation failed\n");
368   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370   msg->request_id = htonl (op->spec->client_request_id);
371   msg->element_type = htons (0);
372   GNUNET_MQ_send (op->spec->set->client_mq, ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_CRYPTO_kdf (&key, sizeof (key),
391                      src, sizeof *src,
392                      &salt, sizeof (salt),
393                      NULL, 0);
394   return key;
395 }
396
397
398 /**
399  * Context for #op_get_element_iterator
400  */
401 struct GetElementContext
402 {
403   struct GNUNET_HashCode hash;
404   struct KeyEntry *k;
405 };
406
407
408 /**
409  * Iterator over the mapping from IBF keys to element entries.  Checks if we
410  * have an element with a given GNUNET_HashCode.
411  *
412  * @param cls closure
413  * @param key current key code
414  * @param value value in the hash map
415  * @return #GNUNET_YES if we should search further,
416  *         #GNUNET_NO if we've found the element.
417  */
418 static int
419 op_get_element_iterator (void *cls,
420                          uint32_t key,
421                          void *value)
422 {
423   struct GetElementContext *ctx = cls;
424   struct KeyEntry *k = value;
425
426   GNUNET_assert (NULL != k);
427   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
428                                    &ctx->hash))
429   {
430     ctx->k = k;
431     return GNUNET_NO;
432   }
433   return GNUNET_YES;
434 }
435
436
437 /**
438  * Determine whether the given element is already in the operation's element
439  * set.
440  *
441  * @param op operation that should be tested for 'element_hash'
442  * @param element_hash hash of the element to look for
443  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
444  */
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447                 const struct GNUNET_HashCode *element_hash)
448 {
449   int ret;
450   struct IBF_Key ibf_key;
451   struct GetElementContext ctx = {{{ 0 }} , 0};
452
453   ctx.hash = *element_hash;
454
455   ibf_key = get_ibf_key (element_hash);
456   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457                                                       (uint32_t) ibf_key.key_val,
458                                                       op_get_element_iterator,
459                                                       &ctx);
460
461   /* was the iteration aborted because we found the element? */
462   if (GNUNET_SYSERR == ret)
463   {
464     GNUNET_assert (NULL != ctx.k);
465     return ctx.k;
466   }
467   return NULL;
468 }
469
470
471 /**
472  * Insert an element into the union operation's
473  * key-to-element mapping. Takes ownership of 'ee'.
474  * Note that this does not insert the element in the set,
475  * only in the operation's key-element mapping.
476  * This is done to speed up re-tried operations, if some elements
477  * were transmitted, and then the IBF fails to decode.
478  *
479  * XXX: clarify ownership, doesn't sound right.
480  *
481  * @param op the union operation
482  * @param ee the element entry
483  * @parem received was this element received from the remote peer?
484  */
485 static void
486 op_register_element (struct Operation *op,
487                      struct ElementEntry *ee,
488                      int received)
489 {
490   struct IBF_Key ibf_key;
491   struct KeyEntry *k;
492
493   ibf_key = get_ibf_key (&ee->element_hash);
494   k = GNUNET_new (struct KeyEntry);
495   k->element = ee;
496   k->ibf_key = ibf_key;
497   k->received = received;
498   GNUNET_assert (GNUNET_OK ==
499                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500                                                       (uint32_t) ibf_key.key_val,
501                                                       k,
502                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
503 }
504
505
506 static void
507 salt_key (const struct IBF_Key *k_in,
508           uint32_t salt,
509           struct IBF_Key *k_out)
510 {
511   int s = salt % 64;
512   uint64_t x = k_in->key_val;
513   /* rotate ibf key */
514   x = (x >> s) | (x << (64 - s));
515   k_out->key_val = x;
516 }
517
518
519 static void
520 unsalt_key (const struct IBF_Key *k_in,
521             uint32_t salt,
522             struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526   x = (x << s) | (x >> (64 - s));
527   k_out->key_val = x;
528 }
529
530
531 /**
532  * Insert a key into an ibf.
533  *
534  * @param cls the ibf
535  * @param key unused
536  * @param value the key entry to get the key from
537  */
538 static int
539 prepare_ibf_iterator (void *cls,
540                       uint32_t key,
541                       void *value)
542 {
543   struct Operation *op = cls;
544   struct KeyEntry *ke = value;
545   struct IBF_Key salted_key;
546
547   LOG (GNUNET_ERROR_TYPE_DEBUG,
548        "[OP %x] inserting %lx (hash %s) into ibf\n",
549        (void *) op,
550        (unsigned long) ke->ibf_key.key_val,
551        GNUNET_h2s (&ke->element->element_hash));
552   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553   ibf_insert (op->state->local_ibf, salted_key);
554   return GNUNET_YES;
555 }
556
557
558 /**
559  * Iterator for initializing the
560  * key-to-element mapping of a union operation
561  *
562  * @param cls the union operation `struct Operation *`
563  * @param key unused
564  * @param value the `struct ElementEntry *` to insert
565  *        into the key-to-element mapping
566  * @return #GNUNET_YES (to continue iterating)
567  */
568 static int
569 init_key_to_element_iterator (void *cls,
570                               const struct GNUNET_HashCode *key,
571                               void *value)
572 {
573   struct Operation *op = cls;
574   struct ElementEntry *ee = value;
575
576   /* make sure that the element belongs to the set at the time
577    * of creating the operation */
578   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
579     return GNUNET_YES;
580
581   GNUNET_assert (GNUNET_NO == ee->remote);
582
583   op_register_element (op, ee, GNUNET_NO);
584   return GNUNET_YES;
585 }
586
587
588 /**
589  * Initialize the IBF key to element mapping local to this set
590  * operation.
591  *
592  * @param op the set union operation
593  */
594 static void
595 initialize_key_to_element (struct Operation *op)
596 {
597   unsigned int len;
598
599   GNUNET_assert (NULL == op->state->key_to_element);
600   len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603 }
604
605
606 /**
607  * Create an ibf with the operation's elements
608  * of the specified size
609  *
610  * @param op the union operation
611  * @param size size of the ibf to create
612  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
613  */
614 static int
615 prepare_ibf (struct Operation *op,
616              uint32_t size)
617 {
618   GNUNET_assert (NULL != op->state->key_to_element);
619
620   if (NULL != op->state->local_ibf)
621     ibf_destroy (op->state->local_ibf);
622   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623   if (NULL == op->state->local_ibf)
624   {
625     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626                 "Failed to allocate local IBF\n");
627     return GNUNET_SYSERR;
628   }
629   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630                                            &prepare_ibf_iterator,
631                                            op);
632   return GNUNET_OK;
633 }
634
635
636 /**
637  * Send an ibf of appropriate size.
638  *
639  * Fragments the IBF into multiple messages if necessary.
640  *
641  * @param op the union operation
642  * @param ibf_order order of the ibf to send, size=2^order
643  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
644  */
645 static int
646 send_ibf (struct Operation *op,
647           uint16_t ibf_order)
648 {
649   unsigned int buckets_sent = 0;
650   struct InvertibleBloomFilter *ibf;
651
652   if (GNUNET_OK !=
653       prepare_ibf (op, 1<<ibf_order))
654   {
655     /* allocation failed */
656     return GNUNET_SYSERR;
657   }
658
659   LOG (GNUNET_ERROR_TYPE_DEBUG,
660        "sending ibf of size %u\n",
661        1<<ibf_order);
662
663   {
664     char name[64] = { 0 };
665     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
667   }
668
669   ibf = op->state->local_ibf;
670
671   while (buckets_sent < (1 << ibf_order))
672   {
673     unsigned int buckets_in_message;
674     struct GNUNET_MQ_Envelope *ev;
675     struct IBFMessage *msg;
676
677     buckets_in_message = (1 << ibf_order) - buckets_sent;
678     /* limit to maximum */
679     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
681
682     ev = GNUNET_MQ_msg_extra (msg,
683                               buckets_in_message * IBF_BUCKET_SIZE,
684                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
685     msg->reserved1 = 0;
686     msg->reserved2 = 0;
687     msg->order = ibf_order;
688     msg->offset = htonl (buckets_sent);
689     msg->salt = htonl (op->state->salt_send);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     LOG (GNUNET_ERROR_TYPE_DEBUG,
694          "ibf chunk size %u, %u/%u sent\n",
695          buckets_in_message,
696          buckets_sent,
697          1<<ibf_order);
698     GNUNET_MQ_send (op->mq, ev);
699   }
700
701   /* The other peer must decode the IBF, so
702    * we're passive. */
703   op->state->phase = PHASE_INVENTORY_PASSIVE;
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Send a strata estimator to the remote peer.
710  *
711  * @param op the union operation with the remote peer
712  */
713 static void
714 send_strata_estimator (struct Operation *op)
715 {
716   const struct StrataEstimator *se = op->state->se;
717   struct GNUNET_MQ_Envelope *ev;
718   struct StrataEstimatorMessage *strata_msg;
719   char *buf;
720   size_t len;
721   uint16_t type;
722
723   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724   len = strata_estimator_write (op->state->se,
725                                 buf);
726   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
728   else
729     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730   ev = GNUNET_MQ_msg_extra (strata_msg,
731                             len,
732                             type);
733   GNUNET_memcpy (&strata_msg[1],
734           buf,
735           len);
736   GNUNET_free (buf);
737   strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738   GNUNET_MQ_send (op->mq,
739                   ev);
740   op->state->phase = PHASE_EXPECT_IBF;
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "sent SE, expecting IBF\n");
743 }
744
745
746 /**
747  * Compute the necessary order of an ibf
748  * from the size of the symmetric set difference.
749  *
750  * @param diff the difference
751  * @return the required size of the ibf
752  */
753 static unsigned int
754 get_order_from_difference (unsigned int diff)
755 {
756   unsigned int ibf_order;
757
758   ibf_order = 2;
759   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
761     ibf_order++;
762   if (ibf_order > MAX_IBF_ORDER)
763     ibf_order = MAX_IBF_ORDER;
764   // add one for correction
765   return ibf_order + 1;
766 }
767
768
769 /**
770  * Send a set element.
771  *
772  * @param cls the union operation `struct Operation *`
773  * @param key unused
774  * @param value the `struct ElementEntry *` to insert
775  *        into the key-to-element mapping
776  * @return #GNUNET_YES (to continue iterating)
777  */
778 static int
779 send_element_iterator (void *cls,
780                        const struct GNUNET_HashCode *key,
781                        void *value)
782 {
783   struct Operation *op = cls;
784   struct GNUNET_SET_ElementMessage *emsg;
785   struct ElementEntry *ee = value;
786   struct GNUNET_SET_Element *el = &ee->element;
787   struct GNUNET_MQ_Envelope *ev;
788
789
790   ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791   emsg->element_type = htons (el->element_type);
792   GNUNET_memcpy (&emsg[1], el->data, el->size);
793   GNUNET_MQ_send (op->mq, ev);
794   return GNUNET_YES;
795 }
796
797
798 static void
799 send_full_set (struct Operation *op)
800 {
801   struct GNUNET_MQ_Envelope *ev;
802
803   op->state->phase = PHASE_FULL_SENDING;
804
805   (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806                                                 &send_element_iterator, op);
807   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808   GNUNET_MQ_send (op->mq, ev);
809 }
810
811
812 /**
813  * Handle a strata estimator from a remote peer
814  *
815  * @param cls the union operation
816  * @param mh the message
817  * @param is_compressed #GNUNET_YES if the estimator is compressed
818  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819  *         #GNUNET_OK otherwise
820  */
821 static int
822 handle_p2p_strata_estimator (void *cls,
823                              const struct GNUNET_MessageHeader *mh,
824                              int is_compressed)
825 {
826   struct Operation *op = cls;
827   struct StrataEstimator *remote_se;
828   struct StrataEstimatorMessage *msg = (void *) mh;
829   unsigned int diff;
830   uint64_t other_size;
831   size_t len;
832
833   GNUNET_STATISTICS_update (_GSS_statistics,
834                             "# bytes of SE received",
835                             ntohs (mh->size),
836                             GNUNET_NO);
837
838   if (op->state->phase != PHASE_EXPECT_SE)
839   {
840     GNUNET_break (0);
841     fail_union_operation (op);
842     return GNUNET_SYSERR;
843   }
844   len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
845   if ( (GNUNET_NO == is_compressed) &&
846        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
847   {
848     fail_union_operation (op);
849     GNUNET_break (0);
850     return GNUNET_SYSERR;
851   }
852   other_size = GNUNET_ntohll (msg->set_size);
853   remote_se = strata_estimator_create (SE_STRATA_COUNT,
854                                        SE_IBF_SIZE,
855                                        SE_IBF_HASH_NUM);
856   if (NULL == remote_se)
857   {
858     /* insufficient resources, fail */
859     fail_union_operation (op);
860     return GNUNET_SYSERR;
861   }
862   if (GNUNET_OK !=
863       strata_estimator_read (&msg[1],
864                              len,
865                              is_compressed,
866                              remote_se))
867   {
868     /* decompression failed */
869     fail_union_operation (op);
870     strata_estimator_destroy (remote_se);
871     return GNUNET_SYSERR;
872   }
873   GNUNET_assert (NULL != op->state->se);
874   diff = strata_estimator_difference (remote_se,
875                                       op->state->se);
876   strata_estimator_destroy (remote_se);
877   strata_estimator_destroy (op->state->se);
878   op->state->se = NULL;
879   LOG (GNUNET_ERROR_TYPE_DEBUG,
880        "got se diff=%d, using ibf size %d\n",
881        diff,
882        1<<get_order_from_difference (diff));
883
884   if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
885   {
886     GNUNET_break (0);
887     fail_union_operation (op);
888     return GNUNET_SYSERR;
889   }
890
891
892   if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
893   {
894     LOG (GNUNET_ERROR_TYPE_INFO,
895          "Sending full set (diff=%d, own set=%u)\n",
896          diff,
897          op->state->initial_size);
898     if (op->state->initial_size <= other_size)
899     {
900       send_full_set (op);
901     }
902     else
903     {
904       struct GNUNET_MQ_Envelope *ev;
905       op->state->phase = PHASE_EXPECT_IBF;
906       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
907       GNUNET_MQ_send (op->mq, ev);
908     }
909   }
910   else
911   {
912     if (GNUNET_OK !=
913         send_ibf (op,
914                   get_order_from_difference (diff)))
915     {
916       /* Internal error, best we can do is shut the connection */
917       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
918                   "Failed to send IBF, closing connection\n");
919       fail_union_operation (op);
920       return GNUNET_SYSERR;
921     }
922   }
923
924   return GNUNET_OK;
925 }
926
927
928 /**
929  * Iterator to send elements to a remote peer
930  *
931  * @param cls closure with the element key and the union operation
932  * @param key ignored
933  * @param value the key entry
934  */
935 static int
936 send_offers_iterator (void *cls,
937                       uint32_t key,
938                       void *value)
939 {
940   struct SendElementClosure *sec = cls;
941   struct Operation *op = sec->op;
942   struct KeyEntry *ke = value;
943   struct GNUNET_MQ_Envelope *ev;
944   struct GNUNET_MessageHeader *mh;
945
946   /* Detect 32-bit key collision for the 64-bit IBF keys. */
947   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
948     return GNUNET_YES;
949
950   ev = GNUNET_MQ_msg_header_extra (mh,
951                                    sizeof (struct GNUNET_HashCode),
952                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
953
954   GNUNET_assert (NULL != ev);
955   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
956   LOG (GNUNET_ERROR_TYPE_DEBUG,
957        "[OP %x] sending element offer (%s) to peer\n",
958        (void *) op,
959        GNUNET_h2s (&ke->element->element_hash));
960   GNUNET_MQ_send (op->mq, ev);
961   return GNUNET_YES;
962 }
963
964
965 /**
966  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
967  *
968  * @param op union operation
969  * @param ibf_key IBF key of interest
970  */
971 static void
972 send_offers_for_key (struct Operation *op,
973                      struct IBF_Key ibf_key)
974 {
975   struct SendElementClosure send_cls;
976
977   send_cls.ibf_key = ibf_key;
978   send_cls.op = op;
979   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
980                                                        (uint32_t) ibf_key.key_val,
981                                                        &send_offers_iterator,
982                                                        &send_cls);
983 }
984
985
986 /**
987  * Decode which elements are missing on each side, and
988  * send the appropriate offers and inquiries.
989  *
990  * @param op union operation
991  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
992  */
993 static int
994 decode_and_send (struct Operation *op)
995 {
996   struct IBF_Key key;
997   struct IBF_Key last_key;
998   int side;
999   unsigned int num_decoded;
1000   struct InvertibleBloomFilter *diff_ibf;
1001
1002   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1003
1004   if (GNUNET_OK !=
1005       prepare_ibf (op, op->state->remote_ibf->size))
1006   {
1007     GNUNET_break (0);
1008     /* allocation failed */
1009     return GNUNET_SYSERR;
1010   }
1011   diff_ibf = ibf_dup (op->state->local_ibf);
1012   ibf_subtract (diff_ibf, op->state->remote_ibf);
1013
1014   ibf_destroy (op->state->remote_ibf);
1015   op->state->remote_ibf = NULL;
1016
1017   LOG (GNUNET_ERROR_TYPE_DEBUG,
1018        "decoding IBF (size=%u)\n",
1019        diff_ibf->size);
1020
1021   num_decoded = 0;
1022   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1023
1024   while (1)
1025   {
1026     int res;
1027     int cycle_detected = GNUNET_NO;
1028
1029     last_key = key;
1030
1031     res = ibf_decode (diff_ibf, &side, &key);
1032     if (res == GNUNET_OK)
1033     {
1034       LOG (GNUNET_ERROR_TYPE_DEBUG,
1035            "decoded ibf key %lx\n",
1036            (unsigned long) key.key_val);
1037       num_decoded += 1;
1038       if ( (num_decoded > diff_ibf->size) ||
1039            ( (num_decoded > 1) &&
1040              (last_key.key_val == key.key_val) ) )
1041       {
1042         LOG (GNUNET_ERROR_TYPE_DEBUG,
1043              "detected cyclic ibf (decoded %u/%u)\n",
1044              num_decoded,
1045              diff_ibf->size);
1046         cycle_detected = GNUNET_YES;
1047       }
1048     }
1049     if ( (GNUNET_SYSERR == res) ||
1050          (GNUNET_YES == cycle_detected) )
1051     {
1052       int next_order;
1053       next_order = 0;
1054       while (1<<next_order < diff_ibf->size)
1055         next_order++;
1056       next_order++;
1057       if (next_order <= MAX_IBF_ORDER)
1058       {
1059         LOG (GNUNET_ERROR_TYPE_DEBUG,
1060              "decoding failed, sending larger ibf (size %u)\n",
1061              1<<next_order);
1062         GNUNET_STATISTICS_update (_GSS_statistics,
1063                                   "# of IBF retries",
1064                                   1,
1065                                   GNUNET_NO);
1066         op->state->salt_send++;
1067         if (GNUNET_OK !=
1068             send_ibf (op, next_order))
1069         {
1070           /* Internal error, best we can do is shut the connection */
1071           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1072                       "Failed to send IBF, closing connection\n");
1073           fail_union_operation (op);
1074           ibf_destroy (diff_ibf);
1075           return GNUNET_SYSERR;
1076         }
1077       }
1078       else
1079       {
1080         GNUNET_STATISTICS_update (_GSS_statistics,
1081                                   "# of failed union operations (too large)",
1082                                   1,
1083                                   GNUNET_NO);
1084         // XXX: Send the whole set, element-by-element
1085         LOG (GNUNET_ERROR_TYPE_ERROR,
1086              "set union failed: reached ibf limit\n");
1087         fail_union_operation (op);
1088         ibf_destroy (diff_ibf);
1089         return GNUNET_SYSERR;
1090       }
1091       break;
1092     }
1093     if (GNUNET_NO == res)
1094     {
1095       struct GNUNET_MQ_Envelope *ev;
1096
1097       LOG (GNUNET_ERROR_TYPE_DEBUG,
1098            "transmitted all values, sending DONE\n");
1099       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1100       GNUNET_MQ_send (op->mq, ev);
1101       /* We now wait until we get a DONE message back
1102        * and then wait for our MQ to be flushed and all our
1103        * demands be delivered. */
1104       break;
1105     }
1106     if (1 == side)
1107     {
1108       struct IBF_Key unsalted_key;
1109       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1110       send_offers_for_key (op, unsalted_key);
1111     }
1112     else if (-1 == side)
1113     {
1114       struct GNUNET_MQ_Envelope *ev;
1115       struct InquiryMessage *msg;
1116
1117       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1118        * the effort additional complexity. */
1119       ev = GNUNET_MQ_msg_extra (msg,
1120                                 sizeof (struct IBF_Key),
1121                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1122       msg->salt = htonl (op->state->salt_receive);
1123       GNUNET_memcpy (&msg[1],
1124               &key,
1125               sizeof (struct IBF_Key));
1126       LOG (GNUNET_ERROR_TYPE_DEBUG,
1127            "sending element inquiry for IBF key %lx\n",
1128            (unsigned long) key.key_val);
1129       GNUNET_MQ_send (op->mq, ev);
1130     }
1131     else
1132     {
1133       GNUNET_assert (0);
1134     }
1135   }
1136   ibf_destroy (diff_ibf);
1137   return GNUNET_OK;
1138 }
1139
1140
1141 /**
1142  * Handle an IBF message from a remote peer.
1143  *
1144  * Reassemble the IBF from multiple pieces, and
1145  * process the whole IBF once possible.
1146  *
1147  * @param cls the union operation
1148  * @param mh the header of the message
1149  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1150  *         #GNUNET_OK otherwise
1151  */
1152 static int
1153 handle_p2p_ibf (void *cls,
1154                 const struct GNUNET_MessageHeader *mh)
1155 {
1156   struct Operation *op = cls;
1157   const struct IBFMessage *msg;
1158   unsigned int buckets_in_message;
1159
1160   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1161   {
1162     GNUNET_break_op (0);
1163     fail_union_operation (op);
1164     return GNUNET_SYSERR;
1165   }
1166   msg = (const struct IBFMessage *) mh;
1167   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1168        (op->state->phase == PHASE_EXPECT_IBF) )
1169   {
1170     op->state->phase = PHASE_EXPECT_IBF_CONT;
1171     GNUNET_assert (NULL == op->state->remote_ibf);
1172     LOG (GNUNET_ERROR_TYPE_DEBUG,
1173          "Creating new ibf of size %u\n",
1174          1 << msg->order);
1175     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1176     op->state->salt_receive = ntohl (msg->salt);
1177     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1178     if (NULL == op->state->remote_ibf)
1179     {
1180       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1181                   "Failed to parse remote IBF, closing connection\n");
1182       fail_union_operation (op);
1183       return GNUNET_SYSERR;
1184     }
1185     op->state->ibf_buckets_received = 0;
1186     if (0 != ntohl (msg->offset))
1187     {
1188       GNUNET_break_op (0);
1189       fail_union_operation (op);
1190       return GNUNET_SYSERR;
1191     }
1192   }
1193   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1194   {
1195     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1196     {
1197       GNUNET_break_op (0);
1198       fail_union_operation (op);
1199       return GNUNET_SYSERR;
1200     }
1201     if (1<<msg->order != op->state->remote_ibf->size)
1202     {
1203       GNUNET_break_op (0);
1204       fail_union_operation (op);
1205       return GNUNET_SYSERR;
1206     }
1207     if (ntohl (msg->salt) != op->state->salt_receive)
1208     {
1209       GNUNET_break_op (0);
1210       fail_union_operation (op);
1211       return GNUNET_SYSERR;
1212     }
1213   }
1214   else
1215   {
1216     GNUNET_assert (0);
1217   }
1218
1219   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1220
1221   if (0 == buckets_in_message)
1222   {
1223     GNUNET_break_op (0);
1224     fail_union_operation (op);
1225     return GNUNET_SYSERR;
1226   }
1227
1228   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1229   {
1230     GNUNET_break_op (0);
1231     fail_union_operation (op);
1232     return GNUNET_SYSERR;
1233   }
1234
1235   GNUNET_assert (NULL != op->state->remote_ibf);
1236
1237   ibf_read_slice (&msg[1],
1238                   op->state->ibf_buckets_received,
1239                   buckets_in_message,
1240                   op->state->remote_ibf);
1241   op->state->ibf_buckets_received += buckets_in_message;
1242
1243   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1244   {
1245     LOG (GNUNET_ERROR_TYPE_DEBUG,
1246          "received full ibf\n");
1247     op->state->phase = PHASE_INVENTORY_ACTIVE;
1248     if (GNUNET_OK !=
1249         decode_and_send (op))
1250     {
1251       /* Internal error, best we can do is shut down */
1252       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1253                   "Failed to decode IBF, closing connection\n");
1254       return GNUNET_SYSERR;
1255     }
1256   }
1257   return GNUNET_OK;
1258 }
1259
1260
1261 /**
1262  * Send a result message to the client indicating
1263  * that there is a new element.
1264  *
1265  * @param op union operation
1266  * @param element element to send
1267  * @param status status to send with the new element
1268  */
1269 static void
1270 send_client_element (struct Operation *op,
1271                      struct GNUNET_SET_Element *element,
1272                      int status)
1273 {
1274   struct GNUNET_MQ_Envelope *ev;
1275   struct GNUNET_SET_ResultMessage *rm;
1276
1277   LOG (GNUNET_ERROR_TYPE_DEBUG,
1278        "sending element (size %u) to client\n",
1279        element->size);
1280   GNUNET_assert (0 != op->spec->client_request_id);
1281   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1282   if (NULL == ev)
1283   {
1284     GNUNET_MQ_discard (ev);
1285     GNUNET_break (0);
1286     return;
1287   }
1288   rm->result_status = htons (status);
1289   rm->request_id = htonl (op->spec->client_request_id);
1290   rm->element_type = htons (element->element_type);
1291   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1292   GNUNET_memcpy (&rm[1], element->data, element->size);
1293   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1294 }
1295
1296
1297 /**
1298  * Signal to the client that the operation has finished and
1299  * destroy the operation.
1300  *
1301  * @param cls operation to destroy
1302  */
1303 static void
1304 send_done_and_destroy (void *cls)
1305 {
1306   struct Operation *op = cls;
1307   struct GNUNET_MQ_Envelope *ev;
1308   struct GNUNET_SET_ResultMessage *rm;
1309
1310   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1311   rm->request_id = htonl (op->spec->client_request_id);
1312   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1313   rm->element_type = htons (0);
1314   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1315   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1316   /* Will also call the union-specific cancel function. */
1317   _GSS_operation_destroy (op, GNUNET_YES);
1318 }
1319
1320
1321 static void
1322 maybe_finish (struct Operation *op)
1323 {
1324   unsigned int num_demanded;
1325
1326   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1327
1328   if (PHASE_FINISH_WAITING == op->state->phase)
1329   {
1330     LOG (GNUNET_ERROR_TYPE_DEBUG,
1331          "In PHASE_FINISH_WAITING, pending %u demands\n",
1332          num_demanded);
1333     if (0 == num_demanded)
1334     {
1335       struct GNUNET_MQ_Envelope *ev;
1336
1337       op->state->phase = PHASE_DONE;
1338       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1339       GNUNET_MQ_send (op->mq, ev);
1340
1341       /* We now wait until the other peer closes the channel
1342        * after it got all elements from us. */
1343     }
1344   }
1345   if (PHASE_FINISH_CLOSING == op->state->phase)
1346   {
1347     LOG (GNUNET_ERROR_TYPE_DEBUG,
1348          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1349          num_demanded);
1350     if (0 == num_demanded)
1351     {
1352       op->state->phase = PHASE_DONE;
1353       send_done_and_destroy (op);
1354     }
1355   }
1356 }
1357
1358
1359 /**
1360  * Handle an element message from a remote peer.
1361  * Sent by the other peer either because we decoded an IBF and placed a demand,
1362  * or because the other peer switched to full set transmission.
1363  *
1364  * @param cls the union operation
1365  * @param mh the message
1366  */
1367 static void
1368 handle_p2p_elements (void *cls,
1369                      const struct GNUNET_MessageHeader *mh)
1370 {
1371   struct Operation *op = cls;
1372   struct ElementEntry *ee;
1373   const struct GNUNET_SET_ElementMessage *emsg;
1374   uint16_t element_size;
1375
1376   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1377   {
1378     GNUNET_break_op (0);
1379     fail_union_operation (op);
1380     return;
1381   }
1382   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1383   {
1384     GNUNET_break_op (0);
1385     fail_union_operation (op);
1386     return;
1387   }
1388
1389   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1390
1391   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1392   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1393   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1394   ee->element.size = element_size;
1395   ee->element.data = &ee[1];
1396   ee->element.element_type = ntohs (emsg->element_type);
1397   ee->remote = GNUNET_YES;
1398   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1399
1400   if (GNUNET_NO ==
1401       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1402                                             &ee->element_hash,
1403                                             NULL))
1404   {
1405     /* We got something we didn't demand, since it's not in our map. */
1406     GNUNET_break_op (0);
1407     GNUNET_free (ee);
1408     fail_union_operation (op);
1409     return;
1410   }
1411
1412   LOG (GNUNET_ERROR_TYPE_DEBUG,
1413        "Got element (size %u, hash %s) from peer\n",
1414        (unsigned int) element_size,
1415        GNUNET_h2s (&ee->element_hash));
1416
1417   GNUNET_STATISTICS_update (_GSS_statistics,
1418                             "# received elements",
1419                             1,
1420                             GNUNET_NO);
1421   GNUNET_STATISTICS_update (_GSS_statistics,
1422                             "# exchanged elements",
1423                             1,
1424                             GNUNET_NO);
1425
1426   op->state->received_total += 1;
1427
1428   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1429
1430   if (NULL != ke)
1431   {
1432     /* Got repeated element.  Should not happen since
1433      * we track demands. */
1434     GNUNET_STATISTICS_update (_GSS_statistics,
1435                               "# repeated elements",
1436                               1,
1437                               GNUNET_NO);
1438     ke->received = GNUNET_YES;
1439     GNUNET_free (ee);
1440   }
1441   else
1442   {
1443     LOG (GNUNET_ERROR_TYPE_DEBUG,
1444          "Registering new element from remote peer\n");
1445     op->state->received_fresh += 1;
1446     op_register_element (op, ee, GNUNET_YES);
1447     /* only send results immediately if the client wants it */
1448     switch (op->spec->result_mode)
1449     {
1450       case GNUNET_SET_RESULT_ADDED:
1451         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1452         break;
1453       case GNUNET_SET_RESULT_SYMMETRIC:
1454         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1455         break;
1456       default:
1457         /* Result mode not supported, should have been caught earlier. */
1458         GNUNET_break (0);
1459         break;
1460     }
1461   }
1462
1463   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1464   {
1465     /* The other peer gave us lots of old elements, there's something wrong. */
1466     GNUNET_break_op (0);
1467     fail_union_operation (op);
1468     return;
1469   }
1470
1471   maybe_finish (op);
1472 }
1473
1474
1475 /**
1476  * Handle an element message from a remote peer.
1477  *
1478  * @param cls the union operation
1479  * @param mh the message
1480  */
1481 static void
1482 handle_p2p_full_element (void *cls,
1483                          const struct GNUNET_MessageHeader *mh)
1484 {
1485   struct Operation *op = cls;
1486   struct ElementEntry *ee;
1487   const struct GNUNET_SET_ElementMessage *emsg;
1488   uint16_t element_size;
1489
1490   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1491   {
1492     GNUNET_break_op (0);
1493     fail_union_operation (op);
1494     return;
1495   }
1496
1497   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1498
1499   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1500   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1501   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1502   ee->element.size = element_size;
1503   ee->element.data = &ee[1];
1504   ee->element.element_type = ntohs (emsg->element_type);
1505   ee->remote = GNUNET_YES;
1506   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1507
1508   LOG (GNUNET_ERROR_TYPE_DEBUG,
1509        "Got element (full diff, size %u, hash %s) from peer\n",
1510        (unsigned int) element_size,
1511        GNUNET_h2s (&ee->element_hash));
1512
1513   GNUNET_STATISTICS_update (_GSS_statistics,
1514                             "# received elements",
1515                             1,
1516                             GNUNET_NO);
1517   GNUNET_STATISTICS_update (_GSS_statistics,
1518                             "# exchanged elements",
1519                             1,
1520                             GNUNET_NO);
1521
1522   op->state->received_total += 1;
1523
1524   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1525
1526   if (NULL != ke)
1527   {
1528     /* Got repeated element.  Should not happen since
1529      * we track demands. */
1530     GNUNET_STATISTICS_update (_GSS_statistics,
1531                               "# repeated elements",
1532                               1,
1533                               GNUNET_NO);
1534     ke->received = GNUNET_YES;
1535     GNUNET_free (ee);
1536   }
1537   else
1538   {
1539     LOG (GNUNET_ERROR_TYPE_DEBUG,
1540          "Registering new element from remote peer\n");
1541     op->state->received_fresh += 1;
1542     op_register_element (op, ee, GNUNET_YES);
1543     /* only send results immediately if the client wants it */
1544     switch (op->spec->result_mode)
1545     {
1546       case GNUNET_SET_RESULT_ADDED:
1547         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1548         break;
1549       case GNUNET_SET_RESULT_SYMMETRIC:
1550         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1551         break;
1552       default:
1553         /* Result mode not supported, should have been caught earlier. */
1554         GNUNET_break (0);
1555         break;
1556     }
1557   }
1558
1559   if ( (GNUNET_YES == op->spec->byzantine) && 
1560        (op->state->received_total > 128) && 
1561        (op->state->received_fresh < op->state->received_total / 3) )
1562   {
1563     /* The other peer gave us lots of old elements, there's something wrong. */
1564     LOG (GNUNET_ERROR_TYPE_ERROR,
1565          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1566          (unsigned long long) op->state->received_fresh,
1567          (unsigned long long) op->state->received_total);
1568     GNUNET_break_op (0);
1569     fail_union_operation (op);
1570     return;
1571   }
1572 }
1573
1574 /**
1575  * Send offers (for GNUNET_Hash-es) in response
1576  * to inquiries (for IBF_Key-s).
1577  *
1578  * @param cls the union operation
1579  * @param mh the message
1580  */
1581 static void
1582 handle_p2p_inquiry (void *cls,
1583                     const struct GNUNET_MessageHeader *mh)
1584 {
1585   struct Operation *op = cls;
1586   const struct IBF_Key *ibf_key;
1587   unsigned int num_keys;
1588   struct InquiryMessage *msg;
1589
1590   /* look up elements and send them */
1591   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1592   {
1593     GNUNET_break_op (0);
1594     fail_union_operation (op);
1595     return;
1596   }
1597   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1598       / sizeof (struct IBF_Key);
1599   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1600       != num_keys * sizeof (struct IBF_Key))
1601   {
1602     GNUNET_break_op (0);
1603     fail_union_operation (op);
1604     return;
1605   }
1606
1607   msg = (struct InquiryMessage *) mh;
1608
1609   ibf_key = (const struct IBF_Key *) &msg[1];
1610   while (0 != num_keys--)
1611   {
1612     struct IBF_Key unsalted_key;
1613     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1614     send_offers_for_key (op, unsalted_key);
1615     ibf_key++;
1616   }
1617 }
1618
1619
1620 /**
1621  * Iterator over hash map entries, called to
1622  * destroy the linked list of colliding ibf key entries.
1623  *
1624  * @param cls closure
1625  * @param key current key code
1626  * @param value value in the hash map
1627  * @return #GNUNET_YES if we should continue to iterate,
1628  *         #GNUNET_NO if not.
1629  */
1630 static int
1631 send_missing_elements_iter (void *cls,
1632                             uint32_t key,
1633                             void *value)
1634 {
1635   struct Operation *op = cls;
1636   struct KeyEntry *ke = value;
1637   struct GNUNET_MQ_Envelope *ev;
1638   struct GNUNET_SET_ElementMessage *emsg;
1639   struct ElementEntry *ee = ke->element;
1640
1641   if (GNUNET_YES == ke->received)
1642     return GNUNET_YES;
1643
1644   ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1645   GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1646   emsg->reserved = htons (0);
1647   emsg->element_type = htons (ee->element.element_type);
1648   GNUNET_MQ_send (op->mq, ev);
1649
1650   return GNUNET_YES;
1651 }
1652
1653
1654 /**
1655  * Handle a 
1656  *
1657  * @parem cls closure, a set union operation
1658  * @param mh the demand message
1659  */
1660 static void
1661 handle_p2p_request_full (void *cls,
1662                          const struct GNUNET_MessageHeader *mh)
1663 {
1664   struct Operation *op = cls;
1665
1666   if (PHASE_EXPECT_IBF != op->state->phase)
1667   {
1668     fail_union_operation (op);
1669     GNUNET_break_op (0);
1670     return;
1671   }
1672
1673   // FIXME: we need to check that our set is larger than the
1674   // byzantine_lower_bound by some threshold
1675   send_full_set (op);
1676 }
1677
1678
1679 /**
1680  * Handle a "full done" message.
1681  *
1682  * @parem cls closure, a set union operation
1683  * @param mh the demand message
1684  */
1685 static void
1686 handle_p2p_full_done (void *cls,
1687                       const struct GNUNET_MessageHeader *mh)
1688 {
1689   struct Operation *op = cls;
1690
1691   if (PHASE_EXPECT_IBF == op->state->phase)
1692   {
1693     struct GNUNET_MQ_Envelope *ev;
1694
1695     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1696
1697     /* send all the elements that did not come from the remote peer */
1698     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1699                                              &send_missing_elements_iter,
1700                                              op);
1701
1702     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1703     GNUNET_MQ_send (op->mq, ev);
1704     op->state->phase = PHASE_DONE;
1705
1706     /* we now wait until the other peer shuts the tunnel down*/
1707   }
1708   else if (PHASE_FULL_SENDING == op->state->phase)
1709   {
1710     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1711     /* We sent the full set, and got the response for that.  We're done. */
1712     op->state->phase = PHASE_DONE;
1713     send_done_and_destroy (op);
1714   }
1715   else
1716   {
1717     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1718     GNUNET_break_op (0);
1719     fail_union_operation (op);
1720     return;
1721   }
1722 }
1723
1724
1725 /**
1726  * Handle a demand by the other peer for elements based on a list
1727  * of GNUNET_HashCode-s.
1728  *
1729  * @parem cls closure, a set union operation
1730  * @param mh the demand message
1731  */
1732 static void
1733 handle_p2p_demand (void *cls,
1734                    const struct GNUNET_MessageHeader *mh)
1735 {
1736   struct Operation *op = cls;
1737   struct ElementEntry *ee;
1738   struct GNUNET_SET_ElementMessage *emsg;
1739   const struct GNUNET_HashCode *hash;
1740   unsigned int num_hashes;
1741   struct GNUNET_MQ_Envelope *ev;
1742
1743   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1744     / sizeof (struct GNUNET_HashCode);
1745   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1746       != num_hashes * sizeof (struct GNUNET_HashCode))
1747   {
1748     GNUNET_break_op (0);
1749     fail_union_operation (op);
1750     return;
1751   }
1752
1753   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1754        num_hashes > 0;
1755        hash++, num_hashes--)
1756   {
1757     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1758     if (NULL == ee)
1759     {
1760       /* Demand for non-existing element. */
1761       GNUNET_break_op (0);
1762       fail_union_operation (op);
1763       return;
1764     }
1765     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1766     {
1767       /* Probably confused lazily copied sets. */
1768       GNUNET_break_op (0);
1769       fail_union_operation (op);
1770       return;
1771     }
1772     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1773     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1774     emsg->reserved = htons (0);
1775     emsg->element_type = htons (ee->element.element_type);
1776     LOG (GNUNET_ERROR_TYPE_DEBUG,
1777          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1778          (void *) op,
1779          (unsigned int) ee->element.size,
1780          GNUNET_h2s (&ee->element_hash));
1781     GNUNET_MQ_send (op->mq, ev);
1782     GNUNET_STATISTICS_update (_GSS_statistics,
1783                               "# exchanged elements",
1784                               1,
1785                               GNUNET_NO);
1786
1787     switch (op->spec->result_mode)
1788     {
1789       case GNUNET_SET_RESULT_ADDED:
1790         /* Nothing to do. */
1791         break;
1792       case GNUNET_SET_RESULT_SYMMETRIC:
1793         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1794         break;
1795       default:
1796         /* Result mode not supported, should have been caught earlier. */
1797         GNUNET_break (0);
1798         break;
1799     }
1800   }
1801 }
1802
1803
1804 /**
1805  * Handle offers (of GNUNET_HashCode-s) and
1806  * respond with demands (of GNUNET_HashCode-s).
1807  *
1808  * @param cls the union operation
1809  * @param mh the message
1810  */
1811 static void
1812 handle_p2p_offer (void *cls,
1813                     const struct GNUNET_MessageHeader *mh)
1814 {
1815   struct Operation *op = cls;
1816   const struct GNUNET_HashCode *hash;
1817   unsigned int num_hashes;
1818
1819   /* look up elements and send them */
1820   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1821        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1822   {
1823     GNUNET_break_op (0);
1824     fail_union_operation (op);
1825     return;
1826   }
1827   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1828     / sizeof (struct GNUNET_HashCode);
1829   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1830       != num_hashes * sizeof (struct GNUNET_HashCode))
1831   {
1832     GNUNET_break_op (0);
1833     fail_union_operation (op);
1834     return;
1835   }
1836
1837   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1838        num_hashes > 0;
1839        hash++, num_hashes--)
1840   {
1841     struct ElementEntry *ee;
1842     struct GNUNET_MessageHeader *demands;
1843     struct GNUNET_MQ_Envelope *ev;
1844
1845     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1846                                             hash);
1847     if (NULL != ee)
1848       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1849         continue;
1850
1851     if (GNUNET_YES ==
1852         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1853                                                 hash))
1854     {
1855       LOG (GNUNET_ERROR_TYPE_DEBUG,
1856            "Skipped sending duplicate demand\n");
1857       continue;
1858     }
1859
1860     GNUNET_assert (GNUNET_OK ==
1861                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1862                                                       hash,
1863                                                       NULL,
1864                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1865
1866     LOG (GNUNET_ERROR_TYPE_DEBUG,
1867          "[OP %x] Requesting element (hash %s)\n",
1868          (void *) op, GNUNET_h2s (hash));
1869     ev = GNUNET_MQ_msg_header_extra (demands,
1870                                      sizeof (struct GNUNET_HashCode),
1871                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1872     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1873     GNUNET_MQ_send (op->mq, ev);
1874   }
1875 }
1876
1877
1878 /**
1879  * Handle a done message from a remote peer
1880  *
1881  * @param cls the union operation
1882  * @param mh the message
1883  */
1884 static void
1885 handle_p2p_done (void *cls,
1886                  const struct GNUNET_MessageHeader *mh)
1887 {
1888   struct Operation *op = cls;
1889
1890   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1891   {
1892     /* We got all requests, but still have to send our elements in response. */
1893
1894     op->state->phase = PHASE_FINISH_WAITING;
1895
1896     LOG (GNUNET_ERROR_TYPE_DEBUG,
1897          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1898     /* The active peer is done sending offers
1899      * and inquiries.  This means that all
1900      * our responses to that (demands and offers)
1901      * must be in flight (queued or in mesh).
1902      *
1903      * We should notify the active peer once
1904      * all our demands are satisfied, so that the active
1905      * peer can quit if we gave him everything.
1906      */
1907     maybe_finish (op);
1908     return;
1909   }
1910   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1911   {
1912     LOG (GNUNET_ERROR_TYPE_DEBUG,
1913          "got DONE (as active partner), waiting to finish\n");
1914     /* All demands of the other peer are satisfied,
1915      * and we processed all offers, thus we know
1916      * exactly what our demands must be.
1917      *
1918      * We'll close the channel
1919      * to the other peer once our demands are met.
1920      */
1921     op->state->phase = PHASE_FINISH_CLOSING;
1922     maybe_finish (op);
1923     return;
1924   }
1925   GNUNET_break_op (0);
1926   fail_union_operation (op);
1927 }
1928
1929
1930 /**
1931  * Initiate operation to evaluate a set union with a remote peer.
1932  *
1933  * @param op operation to perform (to be initialized)
1934  * @param opaque_context message to be transmitted to the listener
1935  *        to convince him to accept, may be NULL
1936  */
1937 static void
1938 union_evaluate (struct Operation *op,
1939                 const struct GNUNET_MessageHeader *opaque_context)
1940 {
1941   struct GNUNET_MQ_Envelope *ev;
1942   struct OperationRequestMessage *msg;
1943
1944   GNUNET_assert (NULL == op->state);
1945   op->state = GNUNET_new (struct OperationState);
1946   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1947   /* copy the current generation's strata estimator for this operation */
1948   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1949   /* we started the operation, thus we have to send the operation request */
1950   op->state->phase = PHASE_EXPECT_SE;
1951   op->state->salt_receive = op->state->salt_send = 42;
1952   LOG (GNUNET_ERROR_TYPE_DEBUG,
1953        "Initiating union operation evaluation\n");
1954   GNUNET_STATISTICS_update (_GSS_statistics,
1955                             "# of total union operations",
1956                             1,
1957                             GNUNET_NO);
1958   GNUNET_STATISTICS_update (_GSS_statistics,
1959                             "# of initiated union operations",
1960                             1,
1961                             GNUNET_NO);
1962   ev = GNUNET_MQ_msg_nested_mh (msg,
1963                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1964                                 opaque_context);
1965   if (NULL == ev)
1966   {
1967     /* the context message is too large */
1968     GNUNET_break (0);
1969     GNUNET_SERVICE_client_drop (op->spec->set->client);
1970     return;
1971   }
1972   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1973   GNUNET_MQ_send (op->mq,
1974                   ev);
1975
1976   if (NULL != opaque_context)
1977     LOG (GNUNET_ERROR_TYPE_DEBUG,
1978          "sent op request with context message\n");
1979   else
1980     LOG (GNUNET_ERROR_TYPE_DEBUG,
1981          "sent op request without context message\n");
1982
1983   initialize_key_to_element (op);
1984   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1985 }
1986
1987
1988 /**
1989  * Accept an union operation request from a remote peer.
1990  * Only initializes the private operation state.
1991  *
1992  * @param op operation that will be accepted as a union operation
1993  */
1994 static void
1995 union_accept (struct Operation *op)
1996 {
1997   LOG (GNUNET_ERROR_TYPE_DEBUG,
1998        "accepting set union operation\n");
1999   GNUNET_assert (NULL == op->state);
2000
2001   GNUNET_STATISTICS_update (_GSS_statistics,
2002                             "# of accepted union operations",
2003                             1,
2004                             GNUNET_NO);
2005   GNUNET_STATISTICS_update (_GSS_statistics,
2006                             "# of total union operations",
2007                             1,
2008                             GNUNET_NO);
2009
2010   op->state = GNUNET_new (struct OperationState);
2011   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2012   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2013   op->state->salt_receive = op->state->salt_send = 42;
2014   initialize_key_to_element (op);
2015   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2016   /* kick off the operation */
2017   send_strata_estimator (op);
2018 }
2019
2020
2021 /**
2022  * Create a new set supporting the union operation
2023  *
2024  * We maintain one strata estimator per set and then manipulate it over the
2025  * lifetime of the set, as recreating a strata estimator would be expensive.
2026  *
2027  * @return the newly created set, NULL on error
2028  */
2029 static struct SetState *
2030 union_set_create (void)
2031 {
2032   struct SetState *set_state;
2033
2034   LOG (GNUNET_ERROR_TYPE_DEBUG,
2035        "union set created\n");
2036   set_state = GNUNET_new (struct SetState);
2037   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2038                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2039   if (NULL == set_state->se)
2040   {
2041     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2042                 "Failed to allocate strata estimator\n");
2043     GNUNET_free (set_state);
2044     return NULL;
2045   }
2046   return set_state;
2047 }
2048
2049
2050 /**
2051  * Add the element from the given element message to the set.
2052  *
2053  * @param set_state state of the set want to add to
2054  * @param ee the element to add to the set
2055  */
2056 static void
2057 union_add (struct SetState *set_state, struct ElementEntry *ee)
2058 {
2059   strata_estimator_insert (set_state->se,
2060                            get_ibf_key (&ee->element_hash));
2061 }
2062
2063
2064 /**
2065  * Remove the element given in the element message from the set.
2066  * Only marks the element as removed, so that older set operations can still exchange it.
2067  *
2068  * @param set_state state of the set to remove from
2069  * @param ee set element to remove
2070  */
2071 static void
2072 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2073 {
2074   strata_estimator_remove (set_state->se,
2075                            get_ibf_key (&ee->element_hash));
2076 }
2077
2078
2079 /**
2080  * Destroy a set that supports the union operation.
2081  *
2082  * @param set_state the set to destroy
2083  */
2084 static void
2085 union_set_destroy (struct SetState *set_state)
2086 {
2087   if (NULL != set_state->se)
2088   {
2089     strata_estimator_destroy (set_state->se);
2090     set_state->se = NULL;
2091   }
2092   GNUNET_free (set_state);
2093 }
2094
2095
2096 /**
2097  * Dispatch messages for a union operation.
2098  *
2099  * @param op the state of the union evaluate operation
2100  * @param mh the received message
2101  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2102  *         #GNUNET_OK otherwise
2103  */
2104 int
2105 union_handle_p2p_message (struct Operation *op,
2106                           const struct GNUNET_MessageHeader *mh)
2107 {
2108   //LOG (GNUNET_ERROR_TYPE_DEBUG,
2109   //            "received p2p message (t: %u, s: %u)\n",
2110   //            ntohs (mh->type),
2111   //            ntohs (mh->size));
2112   switch (ntohs (mh->type))
2113   {
2114     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2115       return handle_p2p_ibf (op, mh);
2116     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2117       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2118     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2119       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2120     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2121       handle_p2p_elements (op, mh);
2122       break;
2123     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2124       handle_p2p_full_element (op, mh);
2125       break;
2126     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2127       handle_p2p_inquiry (op, mh);
2128       break;
2129     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2130       handle_p2p_done (op, mh);
2131       break;
2132     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2133       handle_p2p_offer (op, mh);
2134       break;
2135     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2136       handle_p2p_demand (op, mh);
2137       break;
2138     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2139       handle_p2p_full_done (op, mh);
2140       break;
2141     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2142       handle_p2p_request_full (op, mh);
2143       break;
2144     default:
2145       /* Something wrong with cadet's message handlers? */
2146       GNUNET_assert (0);
2147   }
2148   return GNUNET_OK;
2149 }
2150
2151
2152 /**
2153  * Handler for peer-disconnects, notifies the client
2154  * about the aborted operation in case the op was not concluded.
2155  *
2156  * @param op the destroyed operation
2157  */
2158 static void
2159 union_peer_disconnect (struct Operation *op)
2160 {
2161   if (PHASE_DONE != op->state->phase)
2162   {
2163     struct GNUNET_MQ_Envelope *ev;
2164     struct GNUNET_SET_ResultMessage *msg;
2165
2166     ev = GNUNET_MQ_msg (msg,
2167                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2168     msg->request_id = htonl (op->spec->client_request_id);
2169     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2170     msg->element_type = htons (0);
2171     GNUNET_MQ_send (op->spec->set->client_mq,
2172                     ev);
2173     LOG (GNUNET_ERROR_TYPE_WARNING,
2174          "other peer disconnected prematurely, phase %u\n",
2175          op->state->phase);
2176     _GSS_operation_destroy (op,
2177                             GNUNET_YES);
2178     return;
2179   }
2180   // else: the session has already been concluded
2181   LOG (GNUNET_ERROR_TYPE_DEBUG,
2182        "other peer disconnected (finished)\n");
2183   if (GNUNET_NO == op->state->client_done_sent)
2184     send_done_and_destroy (op);
2185 }
2186
2187
2188 /**
2189  * Copy union-specific set state.
2190  *
2191  * @param set source set for copying the union state
2192  * @return a copy of the union-specific set state
2193  */
2194 static struct SetState *
2195 union_copy_state (struct Set *set)
2196 {
2197   struct SetState *new_state;
2198
2199   new_state = GNUNET_new (struct SetState);
2200   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2201   new_state->se = strata_estimator_dup (set->state->se);
2202
2203   return new_state;
2204 }
2205
2206
2207 /**
2208  * Get the table with implementing functions for
2209  * set union.
2210  *
2211  * @return the operation specific VTable
2212  */
2213 const struct SetVT *
2214 _GSS_union_vt ()
2215 {
2216   static const struct SetVT union_vt = {
2217     .create = &union_set_create,
2218     .msg_handler = &union_handle_p2p_message,
2219     .add = &union_add,
2220     .remove = &union_remove,
2221     .destroy_set = &union_set_destroy,
2222     .evaluate = &union_evaluate,
2223     .accept = &union_accept,
2224     .peer_disconnect = &union_peer_disconnect,
2225     .cancel = &union_op_cancel,
2226     .copy_state = &union_copy_state,
2227   };
2228
2229   return &union_vt;
2230 }