fix type error and test cases
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    * XXX: repurposed to also expect a "request full set" message, should be renamed
89    *
90    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91    */
92   PHASE_EXPECT_IBF,
93
94   /**
95    * Continuation for multi part IBFs.
96    */
97   PHASE_EXPECT_IBF_CONT,
98
99   /**
100    * We are decoding an IBF.
101    */
102   PHASE_INVENTORY_ACTIVE,
103
104   /**
105    * The other peer is decoding the IBF we just sent.
106    */
107   PHASE_INVENTORY_PASSIVE,
108
109   /**
110    * The protocol is almost finished, but we still have to flush our message
111    * queue and/or expect some elements.
112    */
113   PHASE_FINISH_CLOSING,
114
115   /**
116    * In the penultimate phase,
117    * we wait until all our demands
118    * are satisfied.  Then we send a done
119    * message, and wait for another done message.
120    */
121   PHASE_FINISH_WAITING,
122
123   /**
124    * In the ultimate phase, we wait until
125    * our demands are satisfied and then
126    * quit (sending another DONE message).
127    */
128   PHASE_DONE,
129
130   /**
131    * After sending the full set, wait for responses with the elements
132    * that the local peer is missing.
133    */
134   PHASE_FULL_SENDING,
135 };
136
137
138 /**
139  * State of an evaluate operation with another peer.
140  */
141 struct OperationState
142 {
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation.
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The IBF we currently receive.
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * The IBF with the local set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps unsalted IBF-Keys to elements.
161    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162    * Colliding IBF-Keys are linked.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
165
166   /**
167    * Current state of the operation.
168    */
169   enum UnionOperationPhase phase;
170
171   /**
172    * Did we send the client that we are done?
173    */
174   int client_done_sent;
175
176   /**
177    * Number of ibf buckets already received into the @a remote_ibf.
178    */
179   unsigned int ibf_buckets_received;
180
181   /**
182    * Hashes for elements that we have demanded from the other peer.
183    */
184   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
185
186   /**
187    * Salt that we're using for sending IBFs
188    */
189   uint32_t salt_send;
190
191   /**
192    * Salt for the IBF we've received and that we're currently decoding.
193    */
194   uint32_t salt_receive;
195
196   /**
197    * Number of elements we received from the other peer
198    * that were not in the local set yet.
199    */
200   uint32_t received_fresh;
201
202   /**
203    * Total number of elements received from the other peer.
204    */
205   uint32_t received_total;
206
207   /**
208    * Initial size of our set, just before
209    * the operation started.
210    */
211   uint64_t initial_size;
212 };
213
214
215 /**
216  * The key entry is used to associate an ibf key with an element.
217  */
218 struct KeyEntry
219 {
220   /**
221    * IBF key for the entry, derived from the current salt.
222    */
223   struct IBF_Key ibf_key;
224
225   /**
226    * The actual element associated with the key.
227    *
228    * Only owned by the union operation if element->operation
229    * is #GNUNET_YES.
230    */
231   struct ElementEntry *element;
232
233   /**
234    * Did we receive this element?
235    * Even if element->is_foreign is false, we might
236    * have received the element, so this indicates that
237    * the other peer has it.
238    */
239   int received;
240 };
241
242
243 /**
244  * Used as a closure for sending elements
245  * with a specific IBF key.
246  */
247 struct SendElementClosure
248 {
249   /**
250    * The IBF key whose matching elements should be
251    * sent.
252    */
253   struct IBF_Key ibf_key;
254
255   /**
256    * Operation for which the elements
257    * should be sent.
258    */
259   struct Operation *op;
260 };
261
262
263 /**
264  * Extra state required for efficient set union.
265  */
266 struct SetState
267 {
268   /**
269    * The strata estimator is only generated once for
270    * each set.
271    * The IBF keys are derived from the element hashes with
272    * salt=0.
273    */
274   struct StrataEstimator *se;
275 };
276
277
278 /**
279  * Iterator over hash map entries, called to
280  * destroy the linked list of colliding ibf key entries.
281  *
282  * @param cls closure
283  * @param key current key code
284  * @param value value in the hash map
285  * @return #GNUNET_YES if we should continue to iterate,
286  *         #GNUNET_NO if not.
287  */
288 static int
289 destroy_key_to_element_iter (void *cls,
290                              uint32_t key,
291                              void *value)
292 {
293   struct KeyEntry *k = value;
294
295   GNUNET_assert (NULL != k);
296   if (GNUNET_YES == k->element->remote)
297   {
298     GNUNET_free (k->element);
299     k->element = NULL;
300   }
301   GNUNET_free (k);
302   return GNUNET_YES;
303 }
304
305
306 /**
307  * Destroy the union operation.  Only things specific to the union
308  * operation are destroyed.
309  *
310  * @param op union operation to destroy
311  */
312 static void
313 union_op_cancel (struct Operation *op)
314 {
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op\n");
317   /* check if the op was canceled twice */
318   GNUNET_assert (NULL != op->state);
319   if (NULL != op->state->remote_ibf)
320   {
321     ibf_destroy (op->state->remote_ibf);
322     op->state->remote_ibf = NULL;
323   }
324   if (NULL != op->state->demanded_hashes)
325   {
326     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327     op->state->demanded_hashes = NULL;
328   }
329   if (NULL != op->state->local_ibf)
330   {
331     ibf_destroy (op->state->local_ibf);
332     op->state->local_ibf = NULL;
333   }
334   if (NULL != op->state->se)
335   {
336     strata_estimator_destroy (op->state->se);
337     op->state->se = NULL;
338   }
339   if (NULL != op->state->key_to_element)
340   {
341     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342                                              &destroy_key_to_element_iter,
343                                              NULL);
344     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345     op->state->key_to_element = NULL;
346   }
347   GNUNET_free (op->state);
348   op->state = NULL;
349   LOG (GNUNET_ERROR_TYPE_DEBUG,
350        "destroying union op done\n");
351 }
352
353
354 /**
355  * Inform the client that the union operation has failed,
356  * and proceed to destroy the evaluate operation.
357  *
358  * @param op the union operation to fail
359  */
360 static void
361 fail_union_operation (struct Operation *op)
362 {
363   struct GNUNET_MQ_Envelope *ev;
364   struct GNUNET_SET_ResultMessage *msg;
365
366   LOG (GNUNET_ERROR_TYPE_ERROR,
367        "union operation failed\n");
368   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370   msg->request_id = htonl (op->spec->client_request_id);
371   msg->element_type = htons (0);
372   GNUNET_MQ_send (op->spec->set->client_mq, ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_CRYPTO_kdf (&key, sizeof (key),
391                      src, sizeof *src,
392                      &salt, sizeof (salt),
393                      NULL, 0);
394   return key;
395 }
396
397
398 /**
399  * Context for #op_get_element_iterator
400  */
401 struct GetElementContext
402 {
403   struct GNUNET_HashCode hash;
404   struct KeyEntry *k;
405 };
406
407
408 /**
409  * Iterator over the mapping from IBF keys to element entries.  Checks if we
410  * have an element with a given GNUNET_HashCode.
411  *
412  * @param cls closure
413  * @param key current key code
414  * @param value value in the hash map
415  * @return #GNUNET_YES if we should search further,
416  *         #GNUNET_NO if we've found the element.
417  */
418 static int
419 op_get_element_iterator (void *cls,
420                          uint32_t key,
421                          void *value)
422 {
423   struct GetElementContext *ctx = cls;
424   struct KeyEntry *k = value;
425
426   GNUNET_assert (NULL != k);
427   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
428                                    &ctx->hash))
429   {
430     ctx->k = k;
431     return GNUNET_NO;
432   }
433   return GNUNET_YES;
434 }
435
436
437 /**
438  * Determine whether the given element is already in the operation's element
439  * set.
440  *
441  * @param op operation that should be tested for 'element_hash'
442  * @param element_hash hash of the element to look for
443  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
444  */
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447                 const struct GNUNET_HashCode *element_hash)
448 {
449   int ret;
450   struct IBF_Key ibf_key;
451   struct GetElementContext ctx = {{{ 0 }} , 0};
452
453   ctx.hash = *element_hash;
454
455   ibf_key = get_ibf_key (element_hash);
456   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457                                                       (uint32_t) ibf_key.key_val,
458                                                       op_get_element_iterator,
459                                                       &ctx);
460
461   /* was the iteration aborted because we found the element? */
462   if (GNUNET_SYSERR == ret)
463   {
464     GNUNET_assert (NULL != ctx.k);
465     return ctx.k;
466   }
467   return NULL;
468 }
469
470
471 /**
472  * Insert an element into the union operation's
473  * key-to-element mapping. Takes ownership of 'ee'.
474  * Note that this does not insert the element in the set,
475  * only in the operation's key-element mapping.
476  * This is done to speed up re-tried operations, if some elements
477  * were transmitted, and then the IBF fails to decode.
478  *
479  * XXX: clarify ownership, doesn't sound right.
480  *
481  * @param op the union operation
482  * @param ee the element entry
483  * @parem received was this element received from the remote peer?
484  */
485 static void
486 op_register_element (struct Operation *op,
487                      struct ElementEntry *ee,
488                      int received)
489 {
490   struct IBF_Key ibf_key;
491   struct KeyEntry *k;
492
493   ibf_key = get_ibf_key (&ee->element_hash);
494   k = GNUNET_new (struct KeyEntry);
495   k->element = ee;
496   k->ibf_key = ibf_key;
497   k->received = received;
498   GNUNET_assert (GNUNET_OK ==
499                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500                                                       (uint32_t) ibf_key.key_val,
501                                                       k,
502                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
503 }
504
505
506 static void
507 salt_key (const struct IBF_Key *k_in,
508           uint32_t salt,
509           struct IBF_Key *k_out)
510 {
511   int s = salt % 64;
512   uint64_t x = k_in->key_val;
513   /* rotate ibf key */
514   x = (x >> s) | (x << (64 - s));
515   k_out->key_val = x;
516 }
517
518
519 static void
520 unsalt_key (const struct IBF_Key *k_in,
521             uint32_t salt,
522             struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526   x = (x << s) | (x >> (64 - s));
527   k_out->key_val = x;
528 }
529
530
531 /**
532  * Insert a key into an ibf.
533  *
534  * @param cls the ibf
535  * @param key unused
536  * @param value the key entry to get the key from
537  */
538 static int
539 prepare_ibf_iterator (void *cls,
540                       uint32_t key,
541                       void *value)
542 {
543   struct Operation *op = cls;
544   struct KeyEntry *ke = value;
545   struct IBF_Key salted_key;
546
547   LOG (GNUNET_ERROR_TYPE_DEBUG,
548        "[OP %x] inserting %lx (hash %s) into ibf\n",
549        (void *) op,
550        (unsigned long) ke->ibf_key.key_val,
551        GNUNET_h2s (&ke->element->element_hash));
552   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553   ibf_insert (op->state->local_ibf, salted_key);
554   return GNUNET_YES;
555 }
556
557
558 /**
559  * Iterator for initializing the
560  * key-to-element mapping of a union operation
561  *
562  * @param cls the union operation `struct Operation *`
563  * @param key unused
564  * @param value the `struct ElementEntry *` to insert
565  *        into the key-to-element mapping
566  * @return #GNUNET_YES (to continue iterating)
567  */
568 static int
569 init_key_to_element_iterator (void *cls,
570                               const struct GNUNET_HashCode *key,
571                               void *value)
572 {
573   struct Operation *op = cls;
574   struct ElementEntry *ee = value;
575
576   /* make sure that the element belongs to the set at the time
577    * of creating the operation */
578   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
579     return GNUNET_YES;
580
581   GNUNET_assert (GNUNET_NO == ee->remote);
582
583   op_register_element (op, ee, GNUNET_NO);
584   return GNUNET_YES;
585 }
586
587
588 /**
589  * Initialize the IBF key to element mapping local to this set
590  * operation.
591  *
592  * @param op the set union operation
593  */
594 static void
595 initialize_key_to_element (struct Operation *op)
596 {
597   unsigned int len;
598
599   GNUNET_assert (NULL == op->state->key_to_element);
600   len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603 }
604
605
606 /**
607  * Create an ibf with the operation's elements
608  * of the specified size
609  *
610  * @param op the union operation
611  * @param size size of the ibf to create
612  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
613  */
614 static int
615 prepare_ibf (struct Operation *op,
616              uint32_t size)
617 {
618   GNUNET_assert (NULL != op->state->key_to_element);
619
620   if (NULL != op->state->local_ibf)
621     ibf_destroy (op->state->local_ibf);
622   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623   if (NULL == op->state->local_ibf)
624   {
625     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626                 "Failed to allocate local IBF\n");
627     return GNUNET_SYSERR;
628   }
629   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630                                            &prepare_ibf_iterator,
631                                            op);
632   return GNUNET_OK;
633 }
634
635
636 /**
637  * Send an ibf of appropriate size.
638  *
639  * Fragments the IBF into multiple messages if necessary.
640  *
641  * @param op the union operation
642  * @param ibf_order order of the ibf to send, size=2^order
643  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
644  */
645 static int
646 send_ibf (struct Operation *op,
647           uint16_t ibf_order)
648 {
649   unsigned int buckets_sent = 0;
650   struct InvertibleBloomFilter *ibf;
651
652   if (GNUNET_OK !=
653       prepare_ibf (op, 1<<ibf_order))
654   {
655     /* allocation failed */
656     return GNUNET_SYSERR;
657   }
658
659   LOG (GNUNET_ERROR_TYPE_DEBUG,
660        "sending ibf of size %u\n",
661        1<<ibf_order);
662
663   {
664     char name[64] = { 0 };
665     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
667   }
668
669   ibf = op->state->local_ibf;
670
671   while (buckets_sent < (1 << ibf_order))
672   {
673     unsigned int buckets_in_message;
674     struct GNUNET_MQ_Envelope *ev;
675     struct IBFMessage *msg;
676
677     buckets_in_message = (1 << ibf_order) - buckets_sent;
678     /* limit to maximum */
679     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
681
682     ev = GNUNET_MQ_msg_extra (msg,
683                               buckets_in_message * IBF_BUCKET_SIZE,
684                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
685     msg->reserved1 = 0;
686     msg->reserved2 = 0;
687     msg->order = ibf_order;
688     msg->offset = htonl (buckets_sent);
689     msg->salt = htonl (op->state->salt_send);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     LOG (GNUNET_ERROR_TYPE_DEBUG,
694          "ibf chunk size %u, %u/%u sent\n",
695          buckets_in_message,
696          buckets_sent,
697          1<<ibf_order);
698     GNUNET_MQ_send (op->mq, ev);
699   }
700
701   /* The other peer must decode the IBF, so
702    * we're passive. */
703   op->state->phase = PHASE_INVENTORY_PASSIVE;
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Send a strata estimator to the remote peer.
710  *
711  * @param op the union operation with the remote peer
712  */
713 static void
714 send_strata_estimator (struct Operation *op)
715 {
716   const struct StrataEstimator *se = op->state->se;
717   struct GNUNET_MQ_Envelope *ev;
718   struct StrataEstimatorMessage *strata_msg;
719   char *buf;
720   size_t len;
721   uint16_t type;
722
723   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724   len = strata_estimator_write (op->state->se,
725                                 buf);
726   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
728   else
729     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730   ev = GNUNET_MQ_msg_extra (strata_msg,
731                             len,
732                             type);
733   GNUNET_memcpy (&strata_msg[1],
734           buf,
735           len);
736   GNUNET_free (buf);
737   strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738   GNUNET_MQ_send (op->mq,
739                   ev);
740   op->state->phase = PHASE_EXPECT_IBF;
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "sent SE, expecting IBF\n");
743 }
744
745
746 /**
747  * Compute the necessary order of an ibf
748  * from the size of the symmetric set difference.
749  *
750  * @param diff the difference
751  * @return the required size of the ibf
752  */
753 static unsigned int
754 get_order_from_difference (unsigned int diff)
755 {
756   unsigned int ibf_order;
757
758   ibf_order = 2;
759   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
761     ibf_order++;
762   if (ibf_order > MAX_IBF_ORDER)
763     ibf_order = MAX_IBF_ORDER;
764   return ibf_order;
765 }
766
767
768 /**
769  * Send a set element.
770  *
771  * @param cls the union operation `struct Operation *`
772  * @param key unused
773  * @param value the `struct ElementEntry *` to insert
774  *        into the key-to-element mapping
775  * @return #GNUNET_YES (to continue iterating)
776  */
777 static int
778 send_element_iterator (void *cls,
779                        const struct GNUNET_HashCode *key,
780                        void *value)
781 {
782   struct Operation *op = cls;
783   struct GNUNET_SET_ElementMessage *emsg;
784   struct ElementEntry *ee = value;
785   struct GNUNET_SET_Element *el = &ee->element;
786   struct GNUNET_MQ_Envelope *ev;
787
788   ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
789   emsg->element_type = htonl (el->element_type);
790   GNUNET_memcpy (&emsg[1], el->data, el->size);
791   GNUNET_MQ_send (op->mq, ev);
792   return GNUNET_YES;
793 }
794
795
796 static void
797 send_full_set (struct Operation *op)
798 {
799   struct GNUNET_MQ_Envelope *ev;
800
801   op->state->phase = PHASE_FULL_SENDING;
802
803   (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
804                                                 &send_element_iterator, op);
805   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
806   GNUNET_MQ_send (op->mq, ev);
807 }
808
809
810 /**
811  * Handle a strata estimator from a remote peer
812  *
813  * @param cls the union operation
814  * @param mh the message
815  * @param is_compressed #GNUNET_YES if the estimator is compressed
816  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
817  *         #GNUNET_OK otherwise
818  */
819 static int
820 handle_p2p_strata_estimator (void *cls,
821                              const struct GNUNET_MessageHeader *mh,
822                              int is_compressed)
823 {
824   struct Operation *op = cls;
825   struct StrataEstimator *remote_se;
826   struct StrataEstimatorMessage *msg = (void *) mh;
827   unsigned int diff;
828   uint64_t other_size;
829   size_t len;
830
831   GNUNET_STATISTICS_update (_GSS_statistics,
832                             "# bytes of SE received",
833                             ntohs (mh->size),
834                             GNUNET_NO);
835
836   if (op->state->phase != PHASE_EXPECT_SE)
837   {
838     GNUNET_break (0);
839     fail_union_operation (op);
840     return GNUNET_SYSERR;
841   }
842   len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
843   if ( (GNUNET_NO == is_compressed) &&
844        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
845   {
846     fail_union_operation (op);
847     GNUNET_break (0);
848     return GNUNET_SYSERR;
849   }
850   other_size = GNUNET_ntohll (msg->set_size);
851   remote_se = strata_estimator_create (SE_STRATA_COUNT,
852                                        SE_IBF_SIZE,
853                                        SE_IBF_HASH_NUM);
854   if (NULL == remote_se)
855   {
856     /* insufficient resources, fail */
857     fail_union_operation (op);
858     return GNUNET_SYSERR;
859   }
860   if (GNUNET_OK !=
861       strata_estimator_read (&msg[1],
862                              len,
863                              is_compressed,
864                              remote_se))
865   {
866     /* decompression failed */
867     fail_union_operation (op);
868     strata_estimator_destroy (remote_se);
869     return GNUNET_SYSERR;
870   }
871   GNUNET_assert (NULL != op->state->se);
872   diff = strata_estimator_difference (remote_se,
873                                       op->state->se);
874   strata_estimator_destroy (remote_se);
875   strata_estimator_destroy (op->state->se);
876   op->state->se = NULL;
877   LOG (GNUNET_ERROR_TYPE_DEBUG,
878        "got se diff=%d, using ibf size %d\n",
879        diff,
880        1<<get_order_from_difference (diff));
881
882   if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
883   {
884     GNUNET_break (0);
885     fail_union_operation (op);
886     return GNUNET_SYSERR;
887   }
888
889
890   if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
891   {
892     LOG (GNUNET_ERROR_TYPE_INFO,
893          "Sending full set (diff=%d, own set=%u)\n",
894          diff,
895          op->state->initial_size);
896     if (op->state->initial_size <= other_size)
897     {
898       send_full_set (op);
899     }
900     else
901     {
902       struct GNUNET_MQ_Envelope *ev;
903       op->state->phase = PHASE_EXPECT_IBF;
904       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
905       GNUNET_MQ_send (op->mq, ev);
906     }
907   }
908   else
909   {
910     if (GNUNET_OK !=
911         send_ibf (op,
912                   get_order_from_difference (diff)))
913     {
914       /* Internal error, best we can do is shut the connection */
915       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
916                   "Failed to send IBF, closing connection\n");
917       fail_union_operation (op);
918       return GNUNET_SYSERR;
919     }
920   }
921
922   return GNUNET_OK;
923 }
924
925
926 /**
927  * Iterator to send elements to a remote peer
928  *
929  * @param cls closure with the element key and the union operation
930  * @param key ignored
931  * @param value the key entry
932  */
933 static int
934 send_offers_iterator (void *cls,
935                       uint32_t key,
936                       void *value)
937 {
938   struct SendElementClosure *sec = cls;
939   struct Operation *op = sec->op;
940   struct KeyEntry *ke = value;
941   struct GNUNET_MQ_Envelope *ev;
942   struct GNUNET_MessageHeader *mh;
943
944   /* Detect 32-bit key collision for the 64-bit IBF keys. */
945   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
946     return GNUNET_YES;
947
948   ev = GNUNET_MQ_msg_header_extra (mh,
949                                    sizeof (struct GNUNET_HashCode),
950                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
951
952   GNUNET_assert (NULL != ev);
953   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
954   LOG (GNUNET_ERROR_TYPE_DEBUG,
955        "[OP %x] sending element offer (%s) to peer\n",
956        (void *) op,
957        GNUNET_h2s (&ke->element->element_hash));
958   GNUNET_MQ_send (op->mq, ev);
959   return GNUNET_YES;
960 }
961
962
963 /**
964  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
965  *
966  * @param op union operation
967  * @param ibf_key IBF key of interest
968  */
969 static void
970 send_offers_for_key (struct Operation *op,
971                      struct IBF_Key ibf_key)
972 {
973   struct SendElementClosure send_cls;
974
975   send_cls.ibf_key = ibf_key;
976   send_cls.op = op;
977   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
978                                                        (uint32_t) ibf_key.key_val,
979                                                        &send_offers_iterator,
980                                                        &send_cls);
981 }
982
983
984 /**
985  * Decode which elements are missing on each side, and
986  * send the appropriate offers and inquiries.
987  *
988  * @param op union operation
989  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
990  */
991 static int
992 decode_and_send (struct Operation *op)
993 {
994   struct IBF_Key key;
995   struct IBF_Key last_key;
996   int side;
997   unsigned int num_decoded;
998   struct InvertibleBloomFilter *diff_ibf;
999
1000   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1001
1002   if (GNUNET_OK !=
1003       prepare_ibf (op, op->state->remote_ibf->size))
1004   {
1005     GNUNET_break (0);
1006     /* allocation failed */
1007     return GNUNET_SYSERR;
1008   }
1009   diff_ibf = ibf_dup (op->state->local_ibf);
1010   ibf_subtract (diff_ibf, op->state->remote_ibf);
1011
1012   ibf_destroy (op->state->remote_ibf);
1013   op->state->remote_ibf = NULL;
1014
1015   LOG (GNUNET_ERROR_TYPE_DEBUG,
1016        "decoding IBF (size=%u)\n",
1017        diff_ibf->size);
1018
1019   num_decoded = 0;
1020   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1021
1022   while (1)
1023   {
1024     int res;
1025     int cycle_detected = GNUNET_NO;
1026
1027     last_key = key;
1028
1029     res = ibf_decode (diff_ibf, &side, &key);
1030     if (res == GNUNET_OK)
1031     {
1032       LOG (GNUNET_ERROR_TYPE_DEBUG,
1033            "decoded ibf key %lx\n",
1034            (unsigned long) key.key_val);
1035       num_decoded += 1;
1036       if ( (num_decoded > diff_ibf->size) ||
1037            ( (num_decoded > 1) &&
1038              (last_key.key_val == key.key_val) ) )
1039       {
1040         LOG (GNUNET_ERROR_TYPE_DEBUG,
1041              "detected cyclic ibf (decoded %u/%u)\n",
1042              num_decoded,
1043              diff_ibf->size);
1044         cycle_detected = GNUNET_YES;
1045       }
1046     }
1047     if ( (GNUNET_SYSERR == res) ||
1048          (GNUNET_YES == cycle_detected) )
1049     {
1050       int next_order;
1051       next_order = 0;
1052       while (1<<next_order < diff_ibf->size)
1053         next_order++;
1054       next_order++;
1055       if (next_order <= MAX_IBF_ORDER)
1056       {
1057         LOG (GNUNET_ERROR_TYPE_DEBUG,
1058              "decoding failed, sending larger ibf (size %u)\n",
1059              1<<next_order);
1060         GNUNET_STATISTICS_update (_GSS_statistics,
1061                                   "# of IBF retries",
1062                                   1,
1063                                   GNUNET_NO);
1064         op->state->salt_send++;
1065         if (GNUNET_OK !=
1066             send_ibf (op, next_order))
1067         {
1068           /* Internal error, best we can do is shut the connection */
1069           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1070                       "Failed to send IBF, closing connection\n");
1071           fail_union_operation (op);
1072           ibf_destroy (diff_ibf);
1073           return GNUNET_SYSERR;
1074         }
1075       }
1076       else
1077       {
1078         GNUNET_STATISTICS_update (_GSS_statistics,
1079                                   "# of failed union operations (too large)",
1080                                   1,
1081                                   GNUNET_NO);
1082         // XXX: Send the whole set, element-by-element
1083         LOG (GNUNET_ERROR_TYPE_ERROR,
1084              "set union failed: reached ibf limit\n");
1085         fail_union_operation (op);
1086         ibf_destroy (diff_ibf);
1087         return GNUNET_SYSERR;
1088       }
1089       break;
1090     }
1091     if (GNUNET_NO == res)
1092     {
1093       struct GNUNET_MQ_Envelope *ev;
1094
1095       LOG (GNUNET_ERROR_TYPE_DEBUG,
1096            "transmitted all values, sending DONE\n");
1097       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1098       GNUNET_MQ_send (op->mq, ev);
1099       /* We now wait until we get a DONE message back
1100        * and then wait for our MQ to be flushed and all our
1101        * demands be delivered. */
1102       break;
1103     }
1104     if (1 == side)
1105     {
1106       struct IBF_Key unsalted_key;
1107       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1108       send_offers_for_key (op, unsalted_key);
1109     }
1110     else if (-1 == side)
1111     {
1112       struct GNUNET_MQ_Envelope *ev;
1113       struct InquiryMessage *msg;
1114
1115       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1116        * the effort additional complexity. */
1117       ev = GNUNET_MQ_msg_extra (msg,
1118                                 sizeof (struct IBF_Key),
1119                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1120       msg->salt = htonl (op->state->salt_receive);
1121       GNUNET_memcpy (&msg[1],
1122               &key,
1123               sizeof (struct IBF_Key));
1124       LOG (GNUNET_ERROR_TYPE_DEBUG,
1125            "sending element inquiry for IBF key %lx\n",
1126            (unsigned long) key.key_val);
1127       GNUNET_MQ_send (op->mq, ev);
1128     }
1129     else
1130     {
1131       GNUNET_assert (0);
1132     }
1133   }
1134   ibf_destroy (diff_ibf);
1135   return GNUNET_OK;
1136 }
1137
1138
1139 /**
1140  * Handle an IBF message from a remote peer.
1141  *
1142  * Reassemble the IBF from multiple pieces, and
1143  * process the whole IBF once possible.
1144  *
1145  * @param cls the union operation
1146  * @param mh the header of the message
1147  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1148  *         #GNUNET_OK otherwise
1149  */
1150 static int
1151 handle_p2p_ibf (void *cls,
1152                 const struct GNUNET_MessageHeader *mh)
1153 {
1154   struct Operation *op = cls;
1155   const struct IBFMessage *msg;
1156   unsigned int buckets_in_message;
1157
1158   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1159   {
1160     GNUNET_break_op (0);
1161     fail_union_operation (op);
1162     return GNUNET_SYSERR;
1163   }
1164   msg = (const struct IBFMessage *) mh;
1165   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1166        (op->state->phase == PHASE_EXPECT_IBF) )
1167   {
1168     op->state->phase = PHASE_EXPECT_IBF_CONT;
1169     GNUNET_assert (NULL == op->state->remote_ibf);
1170     LOG (GNUNET_ERROR_TYPE_DEBUG,
1171          "Creating new ibf of size %u\n",
1172          1 << msg->order);
1173     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1174     op->state->salt_receive = ntohl (msg->salt);
1175     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1176     if (NULL == op->state->remote_ibf)
1177     {
1178       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1179                   "Failed to parse remote IBF, closing connection\n");
1180       fail_union_operation (op);
1181       return GNUNET_SYSERR;
1182     }
1183     op->state->ibf_buckets_received = 0;
1184     if (0 != ntohl (msg->offset))
1185     {
1186       GNUNET_break_op (0);
1187       fail_union_operation (op);
1188       return GNUNET_SYSERR;
1189     }
1190   }
1191   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1192   {
1193     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1194     {
1195       GNUNET_break_op (0);
1196       fail_union_operation (op);
1197       return GNUNET_SYSERR;
1198     }
1199     if (1<<msg->order != op->state->remote_ibf->size)
1200     {
1201       GNUNET_break_op (0);
1202       fail_union_operation (op);
1203       return GNUNET_SYSERR;
1204     }
1205     if (ntohl (msg->salt) != op->state->salt_receive)
1206     {
1207       GNUNET_break_op (0);
1208       fail_union_operation (op);
1209       return GNUNET_SYSERR;
1210     }
1211   }
1212   else
1213   {
1214     GNUNET_assert (0);
1215   }
1216
1217   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1218
1219   if (0 == buckets_in_message)
1220   {
1221     GNUNET_break_op (0);
1222     fail_union_operation (op);
1223     return GNUNET_SYSERR;
1224   }
1225
1226   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1227   {
1228     GNUNET_break_op (0);
1229     fail_union_operation (op);
1230     return GNUNET_SYSERR;
1231   }
1232
1233   GNUNET_assert (NULL != op->state->remote_ibf);
1234
1235   ibf_read_slice (&msg[1],
1236                   op->state->ibf_buckets_received,
1237                   buckets_in_message,
1238                   op->state->remote_ibf);
1239   op->state->ibf_buckets_received += buckets_in_message;
1240
1241   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1242   {
1243     LOG (GNUNET_ERROR_TYPE_DEBUG,
1244          "received full ibf\n");
1245     op->state->phase = PHASE_INVENTORY_ACTIVE;
1246     if (GNUNET_OK !=
1247         decode_and_send (op))
1248     {
1249       /* Internal error, best we can do is shut down */
1250       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1251                   "Failed to decode IBF, closing connection\n");
1252       return GNUNET_SYSERR;
1253     }
1254   }
1255   return GNUNET_OK;
1256 }
1257
1258
1259 /**
1260  * Send a result message to the client indicating
1261  * that there is a new element.
1262  *
1263  * @param op union operation
1264  * @param element element to send
1265  * @param status status to send with the new element
1266  */
1267 static void
1268 send_client_element (struct Operation *op,
1269                      struct GNUNET_SET_Element *element,
1270                      int status)
1271 {
1272   struct GNUNET_MQ_Envelope *ev;
1273   struct GNUNET_SET_ResultMessage *rm;
1274
1275   LOG (GNUNET_ERROR_TYPE_DEBUG,
1276        "sending element (size %u) to client\n",
1277        element->size);
1278   GNUNET_assert (0 != op->spec->client_request_id);
1279   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1280   if (NULL == ev)
1281   {
1282     GNUNET_MQ_discard (ev);
1283     GNUNET_break (0);
1284     return;
1285   }
1286   rm->result_status = htons (status);
1287   rm->request_id = htonl (op->spec->client_request_id);
1288   rm->element_type = htons (element->element_type);
1289   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1290   GNUNET_memcpy (&rm[1], element->data, element->size);
1291   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1292 }
1293
1294
1295 /**
1296  * Signal to the client that the operation has finished and
1297  * destroy the operation.
1298  *
1299  * @param cls operation to destroy
1300  */
1301 static void
1302 send_done_and_destroy (void *cls)
1303 {
1304   struct Operation *op = cls;
1305   struct GNUNET_MQ_Envelope *ev;
1306   struct GNUNET_SET_ResultMessage *rm;
1307
1308   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1309   rm->request_id = htonl (op->spec->client_request_id);
1310   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1311   rm->element_type = htons (0);
1312   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1313   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1314   /* Will also call the union-specific cancel function. */
1315   _GSS_operation_destroy (op, GNUNET_YES);
1316 }
1317
1318
1319 static void
1320 maybe_finish (struct Operation *op)
1321 {
1322   unsigned int num_demanded;
1323
1324   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1325
1326   if (PHASE_FINISH_WAITING == op->state->phase)
1327   {
1328     LOG (GNUNET_ERROR_TYPE_DEBUG,
1329          "In PHASE_FINISH_WAITING, pending %u demands\n",
1330          num_demanded);
1331     if (0 == num_demanded)
1332     {
1333       struct GNUNET_MQ_Envelope *ev;
1334
1335       op->state->phase = PHASE_DONE;
1336       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1337       GNUNET_MQ_send (op->mq, ev);
1338
1339       /* We now wait until the other peer closes the channel
1340        * after it got all elements from us. */
1341     }
1342   }
1343   if (PHASE_FINISH_CLOSING == op->state->phase)
1344   {
1345     LOG (GNUNET_ERROR_TYPE_DEBUG,
1346          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1347          num_demanded);
1348     if (0 == num_demanded)
1349     {
1350       op->state->phase = PHASE_DONE;
1351       send_done_and_destroy (op);
1352     }
1353   }
1354 }
1355
1356
1357 /**
1358  * Handle an element message from a remote peer.
1359  * Sent by the other peer either because we decoded an IBF and placed a demand,
1360  * or because the other peer switched to full set transmission.
1361  *
1362  * @param cls the union operation
1363  * @param mh the message
1364  */
1365 static void
1366 handle_p2p_elements (void *cls,
1367                      const struct GNUNET_MessageHeader *mh)
1368 {
1369   struct Operation *op = cls;
1370   struct ElementEntry *ee;
1371   const struct GNUNET_SET_ElementMessage *emsg;
1372   uint16_t element_size;
1373
1374   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1375   {
1376     GNUNET_break_op (0);
1377     fail_union_operation (op);
1378     return;
1379   }
1380   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1381   {
1382     GNUNET_break_op (0);
1383     fail_union_operation (op);
1384     return;
1385   }
1386
1387   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1388
1389   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1390   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1391   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1392   ee->element.size = element_size;
1393   ee->element.data = &ee[1];
1394   ee->element.element_type = ntohs (emsg->element_type);
1395   ee->remote = GNUNET_YES;
1396   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1397
1398   if (GNUNET_NO ==
1399       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1400                                             &ee->element_hash,
1401                                             NULL))
1402   {
1403     /* We got something we didn't demand, since it's not in our map. */
1404     GNUNET_break_op (0);
1405     GNUNET_free (ee);
1406     fail_union_operation (op);
1407     return;
1408   }
1409
1410   LOG (GNUNET_ERROR_TYPE_DEBUG,
1411        "Got element (size %u, hash %s) from peer\n",
1412        (unsigned int) element_size,
1413        GNUNET_h2s (&ee->element_hash));
1414
1415   GNUNET_STATISTICS_update (_GSS_statistics,
1416                             "# received elements",
1417                             1,
1418                             GNUNET_NO);
1419   GNUNET_STATISTICS_update (_GSS_statistics,
1420                             "# exchanged elements",
1421                             1,
1422                             GNUNET_NO);
1423
1424   op->state->received_total += 1;
1425
1426   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1427
1428   if (NULL != ke)
1429   {
1430     /* Got repeated element.  Should not happen since
1431      * we track demands. */
1432     GNUNET_STATISTICS_update (_GSS_statistics,
1433                               "# repeated elements",
1434                               1,
1435                               GNUNET_NO);
1436     ke->received = GNUNET_YES;
1437     GNUNET_free (ee);
1438   }
1439   else
1440   {
1441     LOG (GNUNET_ERROR_TYPE_DEBUG,
1442          "Registering new element from remote peer\n");
1443     op->state->received_fresh += 1;
1444     op_register_element (op, ee, GNUNET_YES);
1445     /* only send results immediately if the client wants it */
1446     switch (op->spec->result_mode)
1447     {
1448       case GNUNET_SET_RESULT_ADDED:
1449         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1450         break;
1451       case GNUNET_SET_RESULT_SYMMETRIC:
1452         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1453         break;
1454       default:
1455         /* Result mode not supported, should have been caught earlier. */
1456         GNUNET_break (0);
1457         break;
1458     }
1459   }
1460
1461   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1462   {
1463     /* The other peer gave us lots of old elements, there's something wrong. */
1464     GNUNET_break_op (0);
1465     fail_union_operation (op);
1466     return;
1467   }
1468
1469   maybe_finish (op);
1470 }
1471
1472
1473 /**
1474  * Handle an element message from a remote peer.
1475  *
1476  * @param cls the union operation
1477  * @param mh the message
1478  */
1479 static void
1480 handle_p2p_full_element (void *cls,
1481                          const struct GNUNET_MessageHeader *mh)
1482 {
1483   struct Operation *op = cls;
1484   struct ElementEntry *ee;
1485   const struct GNUNET_SET_ElementMessage *emsg;
1486   uint16_t element_size;
1487
1488   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1489   {
1490     GNUNET_break_op (0);
1491     fail_union_operation (op);
1492     return;
1493   }
1494
1495   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1496
1497   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1498   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1499   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1500   ee->element.size = element_size;
1501   ee->element.data = &ee[1];
1502   ee->element.element_type = ntohs (emsg->element_type);
1503   ee->remote = GNUNET_YES;
1504   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1505
1506   LOG (GNUNET_ERROR_TYPE_DEBUG,
1507        "Got element (full diff, size %u, hash %s) from peer\n",
1508        (unsigned int) element_size,
1509        GNUNET_h2s (&ee->element_hash));
1510
1511   GNUNET_STATISTICS_update (_GSS_statistics,
1512                             "# received elements",
1513                             1,
1514                             GNUNET_NO);
1515   GNUNET_STATISTICS_update (_GSS_statistics,
1516                             "# exchanged elements",
1517                             1,
1518                             GNUNET_NO);
1519
1520   op->state->received_total += 1;
1521
1522   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1523
1524   if (NULL != ke)
1525   {
1526     /* Got repeated element.  Should not happen since
1527      * we track demands. */
1528     GNUNET_STATISTICS_update (_GSS_statistics,
1529                               "# repeated elements",
1530                               1,
1531                               GNUNET_NO);
1532     ke->received = GNUNET_YES;
1533     GNUNET_free (ee);
1534   }
1535   else
1536   {
1537     LOG (GNUNET_ERROR_TYPE_DEBUG,
1538          "Registering new element from remote peer\n");
1539     op->state->received_fresh += 1;
1540     op_register_element (op, ee, GNUNET_YES);
1541     /* only send results immediately if the client wants it */
1542     switch (op->spec->result_mode)
1543     {
1544       case GNUNET_SET_RESULT_ADDED:
1545         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1546         break;
1547       case GNUNET_SET_RESULT_SYMMETRIC:
1548         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1549         break;
1550       default:
1551         /* Result mode not supported, should have been caught earlier. */
1552         GNUNET_break (0);
1553         break;
1554     }
1555   }
1556
1557   if ( (GNUNET_YES == op->spec->byzantine) && 
1558        (op->state->received_total > 8) && 
1559        (op->state->received_fresh < op->state->received_total / 3) )
1560   {
1561     /* The other peer gave us lots of old elements, there's something wrong. */
1562     GNUNET_break_op (0);
1563     fail_union_operation (op);
1564     return;
1565   }
1566 }
1567
1568 /**
1569  * Send offers (for GNUNET_Hash-es) in response
1570  * to inquiries (for IBF_Key-s).
1571  *
1572  * @param cls the union operation
1573  * @param mh the message
1574  */
1575 static void
1576 handle_p2p_inquiry (void *cls,
1577                     const struct GNUNET_MessageHeader *mh)
1578 {
1579   struct Operation *op = cls;
1580   const struct IBF_Key *ibf_key;
1581   unsigned int num_keys;
1582   struct InquiryMessage *msg;
1583
1584   /* look up elements and send them */
1585   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1586   {
1587     GNUNET_break_op (0);
1588     fail_union_operation (op);
1589     return;
1590   }
1591   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1592       / sizeof (struct IBF_Key);
1593   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1594       != num_keys * sizeof (struct IBF_Key))
1595   {
1596     GNUNET_break_op (0);
1597     fail_union_operation (op);
1598     return;
1599   }
1600
1601   msg = (struct InquiryMessage *) mh;
1602
1603   ibf_key = (const struct IBF_Key *) &msg[1];
1604   while (0 != num_keys--)
1605   {
1606     struct IBF_Key unsalted_key;
1607     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1608     send_offers_for_key (op, unsalted_key);
1609     ibf_key++;
1610   }
1611 }
1612
1613
1614 /**
1615  * Iterator over hash map entries, called to
1616  * destroy the linked list of colliding ibf key entries.
1617  *
1618  * @param cls closure
1619  * @param key current key code
1620  * @param value value in the hash map
1621  * @return #GNUNET_YES if we should continue to iterate,
1622  *         #GNUNET_NO if not.
1623  */
1624 static int
1625 send_missing_elements_iter (void *cls,
1626                             uint32_t key,
1627                             void *value)
1628 {
1629   struct Operation *op = cls;
1630   struct KeyEntry *ke = value;
1631   struct GNUNET_MQ_Envelope *ev;
1632   struct GNUNET_SET_ElementMessage *emsg;
1633   struct ElementEntry *ee = ke->element;
1634
1635   if (GNUNET_YES == ke->received)
1636     return GNUNET_YES;
1637
1638   ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1639   GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1640   emsg->reserved = htons (0);
1641   emsg->element_type = htons (ee->element.element_type);
1642   GNUNET_MQ_send (op->mq, ev);
1643
1644   return GNUNET_YES;
1645 }
1646
1647
1648 /**
1649  * Handle a 
1650  *
1651  * @parem cls closure, a set union operation
1652  * @param mh the demand message
1653  */
1654 static void
1655 handle_p2p_request_full (void *cls,
1656                          const struct GNUNET_MessageHeader *mh)
1657 {
1658   struct Operation *op = cls;
1659
1660   if (PHASE_EXPECT_IBF != op->state->phase)
1661   {
1662     fail_union_operation (op);
1663     GNUNET_break_op (0);
1664     return;
1665   }
1666
1667   // FIXME: we need to check that our set is larger than the
1668   // byzantine_lower_bound by some threshold
1669   send_full_set (op);
1670 }
1671
1672
1673 /**
1674  * Handle a "full done" message.
1675  *
1676  * @parem cls closure, a set union operation
1677  * @param mh the demand message
1678  */
1679 static void
1680 handle_p2p_full_done (void *cls,
1681                       const struct GNUNET_MessageHeader *mh)
1682 {
1683   struct Operation *op = cls;
1684
1685   if (PHASE_EXPECT_IBF == op->state->phase)
1686   {
1687     struct GNUNET_MQ_Envelope *ev;
1688
1689     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1690
1691     /* send all the elements that did not come from the remote peer */
1692     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1693                                              &send_missing_elements_iter,
1694                                              op);
1695
1696     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1697     GNUNET_MQ_send (op->mq, ev);
1698     op->state->phase = PHASE_DONE;
1699
1700     /* we now wait until the other peer shuts the tunnel down*/
1701   }
1702   else if (PHASE_FULL_SENDING == op->state->phase)
1703   {
1704     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1705     /* We sent the full set, and got the response for that.  We're done. */
1706     op->state->phase = PHASE_DONE;
1707     send_done_and_destroy (op);
1708   }
1709   else
1710   {
1711     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1712     GNUNET_break_op (0);
1713     fail_union_operation (op);
1714     return;
1715   }
1716 }
1717
1718
1719 /**
1720  * Handle a demand by the other peer for elements based on a list
1721  * of GNUNET_HashCode-s.
1722  *
1723  * @parem cls closure, a set union operation
1724  * @param mh the demand message
1725  */
1726 static void
1727 handle_p2p_demand (void *cls,
1728                    const struct GNUNET_MessageHeader *mh)
1729 {
1730   struct Operation *op = cls;
1731   struct ElementEntry *ee;
1732   struct GNUNET_SET_ElementMessage *emsg;
1733   const struct GNUNET_HashCode *hash;
1734   unsigned int num_hashes;
1735   struct GNUNET_MQ_Envelope *ev;
1736
1737   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1738     / sizeof (struct GNUNET_HashCode);
1739   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1740       != num_hashes * sizeof (struct GNUNET_HashCode))
1741   {
1742     GNUNET_break_op (0);
1743     fail_union_operation (op);
1744     return;
1745   }
1746
1747   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1748        num_hashes > 0;
1749        hash++, num_hashes--)
1750   {
1751     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1752     if (NULL == ee)
1753     {
1754       /* Demand for non-existing element. */
1755       GNUNET_break_op (0);
1756       fail_union_operation (op);
1757       return;
1758     }
1759     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1760     {
1761       /* Probably confused lazily copied sets. */
1762       GNUNET_break_op (0);
1763       fail_union_operation (op);
1764       return;
1765     }
1766     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1767     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1768     emsg->reserved = htons (0);
1769     emsg->element_type = htons (ee->element.element_type);
1770     LOG (GNUNET_ERROR_TYPE_DEBUG,
1771          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1772          (void *) op,
1773          (unsigned int) ee->element.size,
1774          GNUNET_h2s (&ee->element_hash));
1775     GNUNET_MQ_send (op->mq, ev);
1776     GNUNET_STATISTICS_update (_GSS_statistics,
1777                               "# exchanged elements",
1778                               1,
1779                               GNUNET_NO);
1780
1781     switch (op->spec->result_mode)
1782     {
1783       case GNUNET_SET_RESULT_ADDED:
1784         /* Nothing to do. */
1785         break;
1786       case GNUNET_SET_RESULT_SYMMETRIC:
1787         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1788         break;
1789       default:
1790         /* Result mode not supported, should have been caught earlier. */
1791         GNUNET_break (0);
1792         break;
1793     }
1794   }
1795 }
1796
1797
1798 /**
1799  * Handle offers (of GNUNET_HashCode-s) and
1800  * respond with demands (of GNUNET_HashCode-s).
1801  *
1802  * @param cls the union operation
1803  * @param mh the message
1804  */
1805 static void
1806 handle_p2p_offer (void *cls,
1807                     const struct GNUNET_MessageHeader *mh)
1808 {
1809   struct Operation *op = cls;
1810   const struct GNUNET_HashCode *hash;
1811   unsigned int num_hashes;
1812
1813   /* look up elements and send them */
1814   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1815        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1816   {
1817     GNUNET_break_op (0);
1818     fail_union_operation (op);
1819     return;
1820   }
1821   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1822     / sizeof (struct GNUNET_HashCode);
1823   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1824       != num_hashes * sizeof (struct GNUNET_HashCode))
1825   {
1826     GNUNET_break_op (0);
1827     fail_union_operation (op);
1828     return;
1829   }
1830
1831   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1832        num_hashes > 0;
1833        hash++, num_hashes--)
1834   {
1835     struct ElementEntry *ee;
1836     struct GNUNET_MessageHeader *demands;
1837     struct GNUNET_MQ_Envelope *ev;
1838
1839     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1840                                             hash);
1841     if (NULL != ee)
1842       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1843         continue;
1844
1845     if (GNUNET_YES ==
1846         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1847                                                 hash))
1848     {
1849       LOG (GNUNET_ERROR_TYPE_DEBUG,
1850            "Skipped sending duplicate demand\n");
1851       continue;
1852     }
1853
1854     GNUNET_assert (GNUNET_OK ==
1855                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1856                                                       hash,
1857                                                       NULL,
1858                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1859
1860     LOG (GNUNET_ERROR_TYPE_DEBUG,
1861          "[OP %x] Requesting element (hash %s)\n",
1862          (void *) op, GNUNET_h2s (hash));
1863     ev = GNUNET_MQ_msg_header_extra (demands,
1864                                      sizeof (struct GNUNET_HashCode),
1865                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1866     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1867     GNUNET_MQ_send (op->mq, ev);
1868   }
1869 }
1870
1871
1872 /**
1873  * Handle a done message from a remote peer
1874  *
1875  * @param cls the union operation
1876  * @param mh the message
1877  */
1878 static void
1879 handle_p2p_done (void *cls,
1880                  const struct GNUNET_MessageHeader *mh)
1881 {
1882   struct Operation *op = cls;
1883
1884   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1885   {
1886     /* We got all requests, but still have to send our elements in response. */
1887
1888     op->state->phase = PHASE_FINISH_WAITING;
1889
1890     LOG (GNUNET_ERROR_TYPE_DEBUG,
1891          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1892     /* The active peer is done sending offers
1893      * and inquiries.  This means that all
1894      * our responses to that (demands and offers)
1895      * must be in flight (queued or in mesh).
1896      *
1897      * We should notify the active peer once
1898      * all our demands are satisfied, so that the active
1899      * peer can quit if we gave him everything.
1900      */
1901     maybe_finish (op);
1902     return;
1903   }
1904   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1905   {
1906     LOG (GNUNET_ERROR_TYPE_DEBUG,
1907          "got DONE (as active partner), waiting to finish\n");
1908     /* All demands of the other peer are satisfied,
1909      * and we processed all offers, thus we know
1910      * exactly what our demands must be.
1911      *
1912      * We'll close the channel
1913      * to the other peer once our demands are met.
1914      */
1915     op->state->phase = PHASE_FINISH_CLOSING;
1916     maybe_finish (op);
1917     return;
1918   }
1919   GNUNET_break_op (0);
1920   fail_union_operation (op);
1921 }
1922
1923
1924 /**
1925  * Initiate operation to evaluate a set union with a remote peer.
1926  *
1927  * @param op operation to perform (to be initialized)
1928  * @param opaque_context message to be transmitted to the listener
1929  *        to convince him to accept, may be NULL
1930  */
1931 static void
1932 union_evaluate (struct Operation *op,
1933                 const struct GNUNET_MessageHeader *opaque_context)
1934 {
1935   struct GNUNET_MQ_Envelope *ev;
1936   struct OperationRequestMessage *msg;
1937
1938   GNUNET_assert (NULL == op->state);
1939   op->state = GNUNET_new (struct OperationState);
1940   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1941   /* copy the current generation's strata estimator for this operation */
1942   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1943   /* we started the operation, thus we have to send the operation request */
1944   op->state->phase = PHASE_EXPECT_SE;
1945   op->state->salt_receive = op->state->salt_send = 42;
1946   LOG (GNUNET_ERROR_TYPE_DEBUG,
1947        "Initiating union operation evaluation\n");
1948   GNUNET_STATISTICS_update (_GSS_statistics,
1949                             "# of total union operations",
1950                             1,
1951                             GNUNET_NO);
1952   GNUNET_STATISTICS_update (_GSS_statistics,
1953                             "# of initiated union operations",
1954                             1,
1955                             GNUNET_NO);
1956   ev = GNUNET_MQ_msg_nested_mh (msg,
1957                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1958                                 opaque_context);
1959   if (NULL == ev)
1960   {
1961     /* the context message is too large */
1962     GNUNET_break (0);
1963     GNUNET_SERVICE_client_drop (op->spec->set->client);
1964     return;
1965   }
1966   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1967   GNUNET_MQ_send (op->mq,
1968                   ev);
1969
1970   if (NULL != opaque_context)
1971     LOG (GNUNET_ERROR_TYPE_DEBUG,
1972          "sent op request with context message\n");
1973   else
1974     LOG (GNUNET_ERROR_TYPE_DEBUG,
1975          "sent op request without context message\n");
1976
1977   initialize_key_to_element (op);
1978   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1979 }
1980
1981
1982 /**
1983  * Accept an union operation request from a remote peer.
1984  * Only initializes the private operation state.
1985  *
1986  * @param op operation that will be accepted as a union operation
1987  */
1988 static void
1989 union_accept (struct Operation *op)
1990 {
1991   LOG (GNUNET_ERROR_TYPE_DEBUG,
1992        "accepting set union operation\n");
1993   GNUNET_assert (NULL == op->state);
1994
1995   GNUNET_STATISTICS_update (_GSS_statistics,
1996                             "# of accepted union operations",
1997                             1,
1998                             GNUNET_NO);
1999   GNUNET_STATISTICS_update (_GSS_statistics,
2000                             "# of total union operations",
2001                             1,
2002                             GNUNET_NO);
2003
2004   op->state = GNUNET_new (struct OperationState);
2005   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2006   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2007   op->state->salt_receive = op->state->salt_send = 42;
2008   initialize_key_to_element (op);
2009   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2010   /* kick off the operation */
2011   send_strata_estimator (op);
2012 }
2013
2014
2015 /**
2016  * Create a new set supporting the union operation
2017  *
2018  * We maintain one strata estimator per set and then manipulate it over the
2019  * lifetime of the set, as recreating a strata estimator would be expensive.
2020  *
2021  * @return the newly created set, NULL on error
2022  */
2023 static struct SetState *
2024 union_set_create (void)
2025 {
2026   struct SetState *set_state;
2027
2028   LOG (GNUNET_ERROR_TYPE_DEBUG,
2029        "union set created\n");
2030   set_state = GNUNET_new (struct SetState);
2031   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2032                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2033   if (NULL == set_state->se)
2034   {
2035     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2036                 "Failed to allocate strata estimator\n");
2037     GNUNET_free (set_state);
2038     return NULL;
2039   }
2040   return set_state;
2041 }
2042
2043
2044 /**
2045  * Add the element from the given element message to the set.
2046  *
2047  * @param set_state state of the set want to add to
2048  * @param ee the element to add to the set
2049  */
2050 static void
2051 union_add (struct SetState *set_state, struct ElementEntry *ee)
2052 {
2053   strata_estimator_insert (set_state->se,
2054                            get_ibf_key (&ee->element_hash));
2055 }
2056
2057
2058 /**
2059  * Remove the element given in the element message from the set.
2060  * Only marks the element as removed, so that older set operations can still exchange it.
2061  *
2062  * @param set_state state of the set to remove from
2063  * @param ee set element to remove
2064  */
2065 static void
2066 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2067 {
2068   strata_estimator_remove (set_state->se,
2069                            get_ibf_key (&ee->element_hash));
2070 }
2071
2072
2073 /**
2074  * Destroy a set that supports the union operation.
2075  *
2076  * @param set_state the set to destroy
2077  */
2078 static void
2079 union_set_destroy (struct SetState *set_state)
2080 {
2081   if (NULL != set_state->se)
2082   {
2083     strata_estimator_destroy (set_state->se);
2084     set_state->se = NULL;
2085   }
2086   GNUNET_free (set_state);
2087 }
2088
2089
2090 /**
2091  * Dispatch messages for a union operation.
2092  *
2093  * @param op the state of the union evaluate operation
2094  * @param mh the received message
2095  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2096  *         #GNUNET_OK otherwise
2097  */
2098 int
2099 union_handle_p2p_message (struct Operation *op,
2100                           const struct GNUNET_MessageHeader *mh)
2101 {
2102   //LOG (GNUNET_ERROR_TYPE_DEBUG,
2103   //            "received p2p message (t: %u, s: %u)\n",
2104   //            ntohs (mh->type),
2105   //            ntohs (mh->size));
2106   switch (ntohs (mh->type))
2107   {
2108     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2109       return handle_p2p_ibf (op, mh);
2110     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2111       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2112     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2113       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2114     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2115       handle_p2p_elements (op, mh);
2116       break;
2117     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2118       handle_p2p_full_element (op, mh);
2119       break;
2120     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2121       handle_p2p_inquiry (op, mh);
2122       break;
2123     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2124       handle_p2p_done (op, mh);
2125       break;
2126     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2127       handle_p2p_offer (op, mh);
2128       break;
2129     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2130       handle_p2p_demand (op, mh);
2131       break;
2132     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2133       handle_p2p_full_done (op, mh);
2134       break;
2135     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2136       handle_p2p_request_full (op, mh);
2137       break;
2138     default:
2139       /* Something wrong with cadet's message handlers? */
2140       GNUNET_assert (0);
2141   }
2142   return GNUNET_OK;
2143 }
2144
2145
2146 /**
2147  * Handler for peer-disconnects, notifies the client
2148  * about the aborted operation in case the op was not concluded.
2149  *
2150  * @param op the destroyed operation
2151  */
2152 static void
2153 union_peer_disconnect (struct Operation *op)
2154 {
2155   if (PHASE_DONE != op->state->phase)
2156   {
2157     struct GNUNET_MQ_Envelope *ev;
2158     struct GNUNET_SET_ResultMessage *msg;
2159
2160     ev = GNUNET_MQ_msg (msg,
2161                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2162     msg->request_id = htonl (op->spec->client_request_id);
2163     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2164     msg->element_type = htons (0);
2165     GNUNET_MQ_send (op->spec->set->client_mq,
2166                     ev);
2167     LOG (GNUNET_ERROR_TYPE_WARNING,
2168          "other peer disconnected prematurely, phase %u\n",
2169          op->state->phase);
2170     _GSS_operation_destroy (op,
2171                             GNUNET_YES);
2172     return;
2173   }
2174   // else: the session has already been concluded
2175   LOG (GNUNET_ERROR_TYPE_DEBUG,
2176        "other peer disconnected (finished)\n");
2177   if (GNUNET_NO == op->state->client_done_sent)
2178     send_done_and_destroy (op);
2179 }
2180
2181
2182 /**
2183  * Copy union-specific set state.
2184  *
2185  * @param set source set for copying the union state
2186  * @return a copy of the union-specific set state
2187  */
2188 static struct SetState *
2189 union_copy_state (struct Set *set)
2190 {
2191   struct SetState *new_state;
2192
2193   new_state = GNUNET_new (struct SetState);
2194   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2195   new_state->se = strata_estimator_dup (set->state->se);
2196
2197   return new_state;
2198 }
2199
2200
2201 /**
2202  * Get the table with implementing functions for
2203  * set union.
2204  *
2205  * @return the operation specific VTable
2206  */
2207 const struct SetVT *
2208 _GSS_union_vt ()
2209 {
2210   static const struct SetVT union_vt = {
2211     .create = &union_set_create,
2212     .msg_handler = &union_handle_p2p_message,
2213     .add = &union_add,
2214     .remove = &union_remove,
2215     .destroy_set = &union_set_destroy,
2216     .evaluate = &union_evaluate,
2217     .accept = &union_accept,
2218     .peer_disconnect = &union_peer_disconnect,
2219     .cancel = &union_op_cancel,
2220     .copy_state = &union_copy_state,
2221   };
2222
2223   return &union_vt;
2224 }