set: more statistics
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    * XXX: repurposed to also expect a "request full set" message, should be renamed
89    *
90    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91    */
92   PHASE_EXPECT_IBF,
93
94   /**
95    * Continuation for multi part IBFs.
96    */
97   PHASE_EXPECT_IBF_CONT,
98
99   /**
100    * We are decoding an IBF.
101    */
102   PHASE_INVENTORY_ACTIVE,
103
104   /**
105    * The other peer is decoding the IBF we just sent.
106    */
107   PHASE_INVENTORY_PASSIVE,
108
109   /**
110    * The protocol is almost finished, but we still have to flush our message
111    * queue and/or expect some elements.
112    */
113   PHASE_FINISH_CLOSING,
114
115   /**
116    * In the penultimate phase,
117    * we wait until all our demands
118    * are satisfied.  Then we send a done
119    * message, and wait for another done message.
120    */
121   PHASE_FINISH_WAITING,
122
123   /**
124    * In the ultimate phase, we wait until
125    * our demands are satisfied and then
126    * quit (sending another DONE message).
127    */
128   PHASE_DONE,
129
130   /**
131    * After sending the full set, wait for responses with the elements
132    * that the local peer is missing.
133    */
134   PHASE_FULL_SENDING,
135 };
136
137
138 /**
139  * State of an evaluate operation with another peer.
140  */
141 struct OperationState
142 {
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation.
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The IBF we currently receive.
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * The IBF with the local set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps unsalted IBF-Keys to elements.
161    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162    * Colliding IBF-Keys are linked.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
165
166   /**
167    * Current state of the operation.
168    */
169   enum UnionOperationPhase phase;
170
171   /**
172    * Did we send the client that we are done?
173    */
174   int client_done_sent;
175
176   /**
177    * Number of ibf buckets already received into the @a remote_ibf.
178    */
179   unsigned int ibf_buckets_received;
180
181   /**
182    * Hashes for elements that we have demanded from the other peer.
183    */
184   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
185
186   /**
187    * Salt that we're using for sending IBFs
188    */
189   uint32_t salt_send;
190
191   /**
192    * Salt for the IBF we've received and that we're currently decoding.
193    */
194   uint32_t salt_receive;
195
196   /**
197    * Number of elements we received from the other peer
198    * that were not in the local set yet.
199    */
200   uint32_t received_fresh;
201
202   /**
203    * Total number of elements received from the other peer.
204    */
205   uint32_t received_total;
206
207   /**
208    * Initial size of our set, just before
209    * the operation started.
210    */
211   uint64_t initial_size;
212 };
213
214
215 /**
216  * The key entry is used to associate an ibf key with an element.
217  */
218 struct KeyEntry
219 {
220   /**
221    * IBF key for the entry, derived from the current salt.
222    */
223   struct IBF_Key ibf_key;
224
225   /**
226    * The actual element associated with the key.
227    *
228    * Only owned by the union operation if element->operation
229    * is #GNUNET_YES.
230    */
231   struct ElementEntry *element;
232
233   /**
234    * Did we receive this element?
235    * Even if element->is_foreign is false, we might
236    * have received the element, so this indicates that
237    * the other peer has it.
238    */
239   int received;
240 };
241
242
243 /**
244  * Used as a closure for sending elements
245  * with a specific IBF key.
246  */
247 struct SendElementClosure
248 {
249   /**
250    * The IBF key whose matching elements should be
251    * sent.
252    */
253   struct IBF_Key ibf_key;
254
255   /**
256    * Operation for which the elements
257    * should be sent.
258    */
259   struct Operation *op;
260 };
261
262
263 /**
264  * Extra state required for efficient set union.
265  */
266 struct SetState
267 {
268   /**
269    * The strata estimator is only generated once for
270    * each set.
271    * The IBF keys are derived from the element hashes with
272    * salt=0.
273    */
274   struct StrataEstimator *se;
275 };
276
277
278 /**
279  * Iterator over hash map entries, called to
280  * destroy the linked list of colliding ibf key entries.
281  *
282  * @param cls closure
283  * @param key current key code
284  * @param value value in the hash map
285  * @return #GNUNET_YES if we should continue to iterate,
286  *         #GNUNET_NO if not.
287  */
288 static int
289 destroy_key_to_element_iter (void *cls,
290                              uint32_t key,
291                              void *value)
292 {
293   struct KeyEntry *k = value;
294
295   GNUNET_assert (NULL != k);
296   if (GNUNET_YES == k->element->remote)
297   {
298     GNUNET_free (k->element);
299     k->element = NULL;
300   }
301   GNUNET_free (k);
302   return GNUNET_YES;
303 }
304
305
306 /**
307  * Destroy the union operation.  Only things specific to the union
308  * operation are destroyed.
309  *
310  * @param op union operation to destroy
311  */
312 static void
313 union_op_cancel (struct Operation *op)
314 {
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op\n");
317   /* check if the op was canceled twice */
318   GNUNET_assert (NULL != op->state);
319   if (NULL != op->state->remote_ibf)
320   {
321     ibf_destroy (op->state->remote_ibf);
322     op->state->remote_ibf = NULL;
323   }
324   if (NULL != op->state->demanded_hashes)
325   {
326     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327     op->state->demanded_hashes = NULL;
328   }
329   if (NULL != op->state->local_ibf)
330   {
331     ibf_destroy (op->state->local_ibf);
332     op->state->local_ibf = NULL;
333   }
334   if (NULL != op->state->se)
335   {
336     strata_estimator_destroy (op->state->se);
337     op->state->se = NULL;
338   }
339   if (NULL != op->state->key_to_element)
340   {
341     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342                                              &destroy_key_to_element_iter,
343                                              NULL);
344     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345     op->state->key_to_element = NULL;
346   }
347   GNUNET_free (op->state);
348   op->state = NULL;
349   LOG (GNUNET_ERROR_TYPE_DEBUG,
350        "destroying union op done\n");
351 }
352
353
354 /**
355  * Inform the client that the union operation has failed,
356  * and proceed to destroy the evaluate operation.
357  *
358  * @param op the union operation to fail
359  */
360 static void
361 fail_union_operation (struct Operation *op)
362 {
363   struct GNUNET_MQ_Envelope *ev;
364   struct GNUNET_SET_ResultMessage *msg;
365
366   LOG (GNUNET_ERROR_TYPE_ERROR,
367        "union operation failed\n");
368   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370   msg->request_id = htonl (op->spec->client_request_id);
371   msg->element_type = htons (0);
372   GNUNET_MQ_send (op->spec->set->client_mq, ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_CRYPTO_kdf (&key, sizeof (key),
391                      src, sizeof *src,
392                      &salt, sizeof (salt),
393                      NULL, 0);
394   return key;
395 }
396
397
398 /**
399  * Context for #op_get_element_iterator
400  */
401 struct GetElementContext
402 {
403   struct GNUNET_HashCode hash;
404   struct KeyEntry *k;
405 };
406
407
408 /**
409  * Iterator over the mapping from IBF keys to element entries.  Checks if we
410  * have an element with a given GNUNET_HashCode.
411  *
412  * @param cls closure
413  * @param key current key code
414  * @param value value in the hash map
415  * @return #GNUNET_YES if we should search further,
416  *         #GNUNET_NO if we've found the element.
417  */
418 static int
419 op_get_element_iterator (void *cls,
420                          uint32_t key,
421                          void *value)
422 {
423   struct GetElementContext *ctx = cls;
424   struct KeyEntry *k = value;
425
426   GNUNET_assert (NULL != k);
427   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
428                                    &ctx->hash))
429   {
430     ctx->k = k;
431     return GNUNET_NO;
432   }
433   return GNUNET_YES;
434 }
435
436
437 /**
438  * Determine whether the given element is already in the operation's element
439  * set.
440  *
441  * @param op operation that should be tested for 'element_hash'
442  * @param element_hash hash of the element to look for
443  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
444  */
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447                 const struct GNUNET_HashCode *element_hash)
448 {
449   int ret;
450   struct IBF_Key ibf_key;
451   struct GetElementContext ctx = {{{ 0 }} , 0};
452
453   ctx.hash = *element_hash;
454
455   ibf_key = get_ibf_key (element_hash);
456   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457                                                       (uint32_t) ibf_key.key_val,
458                                                       op_get_element_iterator,
459                                                       &ctx);
460
461   /* was the iteration aborted because we found the element? */
462   if (GNUNET_SYSERR == ret)
463   {
464     GNUNET_assert (NULL != ctx.k);
465     return ctx.k;
466   }
467   return NULL;
468 }
469
470
471 /**
472  * Insert an element into the union operation's
473  * key-to-element mapping. Takes ownership of 'ee'.
474  * Note that this does not insert the element in the set,
475  * only in the operation's key-element mapping.
476  * This is done to speed up re-tried operations, if some elements
477  * were transmitted, and then the IBF fails to decode.
478  *
479  * XXX: clarify ownership, doesn't sound right.
480  *
481  * @param op the union operation
482  * @param ee the element entry
483  * @parem received was this element received from the remote peer?
484  */
485 static void
486 op_register_element (struct Operation *op,
487                      struct ElementEntry *ee,
488                      int received)
489 {
490   struct IBF_Key ibf_key;
491   struct KeyEntry *k;
492
493   ibf_key = get_ibf_key (&ee->element_hash);
494   k = GNUNET_new (struct KeyEntry);
495   k->element = ee;
496   k->ibf_key = ibf_key;
497   k->received = received;
498   GNUNET_assert (GNUNET_OK ==
499                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500                                                       (uint32_t) ibf_key.key_val,
501                                                       k,
502                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
503 }
504
505
506 static void
507 salt_key (const struct IBF_Key *k_in,
508           uint32_t salt,
509           struct IBF_Key *k_out)
510 {
511   int s = salt % 64;
512   uint64_t x = k_in->key_val;
513   /* rotate ibf key */
514   x = (x >> s) | (x << (64 - s));
515   k_out->key_val = x;
516 }
517
518
519 static void
520 unsalt_key (const struct IBF_Key *k_in,
521             uint32_t salt,
522             struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526   x = (x << s) | (x >> (64 - s));
527   k_out->key_val = x;
528 }
529
530
531 /**
532  * Insert a key into an ibf.
533  *
534  * @param cls the ibf
535  * @param key unused
536  * @param value the key entry to get the key from
537  */
538 static int
539 prepare_ibf_iterator (void *cls,
540                       uint32_t key,
541                       void *value)
542 {
543   struct Operation *op = cls;
544   struct KeyEntry *ke = value;
545   struct IBF_Key salted_key;
546
547   LOG (GNUNET_ERROR_TYPE_DEBUG,
548        "[OP %x] inserting %lx (hash %s) into ibf\n",
549        (void *) op,
550        (unsigned long) ke->ibf_key.key_val,
551        GNUNET_h2s (&ke->element->element_hash));
552   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553   ibf_insert (op->state->local_ibf, salted_key);
554   return GNUNET_YES;
555 }
556
557
558 /**
559  * Iterator for initializing the
560  * key-to-element mapping of a union operation
561  *
562  * @param cls the union operation `struct Operation *`
563  * @param key unused
564  * @param value the `struct ElementEntry *` to insert
565  *        into the key-to-element mapping
566  * @return #GNUNET_YES (to continue iterating)
567  */
568 static int
569 init_key_to_element_iterator (void *cls,
570                               const struct GNUNET_HashCode *key,
571                               void *value)
572 {
573   struct Operation *op = cls;
574   struct ElementEntry *ee = value;
575
576   /* make sure that the element belongs to the set at the time
577    * of creating the operation */
578   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
579     return GNUNET_YES;
580
581   GNUNET_assert (GNUNET_NO == ee->remote);
582
583   op_register_element (op, ee, GNUNET_NO);
584   return GNUNET_YES;
585 }
586
587
588 /**
589  * Initialize the IBF key to element mapping local to this set
590  * operation.
591  *
592  * @param op the set union operation
593  */
594 static void
595 initialize_key_to_element (struct Operation *op)
596 {
597   unsigned int len;
598
599   GNUNET_assert (NULL == op->state->key_to_element);
600   len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603 }
604
605
606 /**
607  * Create an ibf with the operation's elements
608  * of the specified size
609  *
610  * @param op the union operation
611  * @param size size of the ibf to create
612  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
613  */
614 static int
615 prepare_ibf (struct Operation *op,
616              uint32_t size)
617 {
618   GNUNET_assert (NULL != op->state->key_to_element);
619
620   if (NULL != op->state->local_ibf)
621     ibf_destroy (op->state->local_ibf);
622   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623   if (NULL == op->state->local_ibf)
624   {
625     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626                 "Failed to allocate local IBF\n");
627     return GNUNET_SYSERR;
628   }
629   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630                                            &prepare_ibf_iterator,
631                                            op);
632   return GNUNET_OK;
633 }
634
635
636 /**
637  * Send an ibf of appropriate size.
638  *
639  * Fragments the IBF into multiple messages if necessary.
640  *
641  * @param op the union operation
642  * @param ibf_order order of the ibf to send, size=2^order
643  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
644  */
645 static int
646 send_ibf (struct Operation *op,
647           uint16_t ibf_order)
648 {
649   unsigned int buckets_sent = 0;
650   struct InvertibleBloomFilter *ibf;
651
652   if (GNUNET_OK !=
653       prepare_ibf (op, 1<<ibf_order))
654   {
655     /* allocation failed */
656     return GNUNET_SYSERR;
657   }
658
659   LOG (GNUNET_ERROR_TYPE_DEBUG,
660        "sending ibf of size %u\n",
661        1<<ibf_order);
662
663   {
664     char name[64] = { 0 };
665     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
667   }
668
669   ibf = op->state->local_ibf;
670
671   while (buckets_sent < (1 << ibf_order))
672   {
673     unsigned int buckets_in_message;
674     struct GNUNET_MQ_Envelope *ev;
675     struct IBFMessage *msg;
676
677     buckets_in_message = (1 << ibf_order) - buckets_sent;
678     /* limit to maximum */
679     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
681
682     ev = GNUNET_MQ_msg_extra (msg,
683                               buckets_in_message * IBF_BUCKET_SIZE,
684                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
685     msg->reserved1 = 0;
686     msg->reserved2 = 0;
687     msg->order = ibf_order;
688     msg->offset = htonl (buckets_sent);
689     msg->salt = htonl (op->state->salt_send);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     LOG (GNUNET_ERROR_TYPE_DEBUG,
694          "ibf chunk size %u, %u/%u sent\n",
695          buckets_in_message,
696          buckets_sent,
697          1<<ibf_order);
698     GNUNET_MQ_send (op->mq, ev);
699   }
700
701   /* The other peer must decode the IBF, so
702    * we're passive. */
703   op->state->phase = PHASE_INVENTORY_PASSIVE;
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Send a strata estimator to the remote peer.
710  *
711  * @param op the union operation with the remote peer
712  */
713 static void
714 send_strata_estimator (struct Operation *op)
715 {
716   const struct StrataEstimator *se = op->state->se;
717   struct GNUNET_MQ_Envelope *ev;
718   struct StrataEstimatorMessage *strata_msg;
719   char *buf;
720   size_t len;
721   uint16_t type;
722
723   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724   len = strata_estimator_write (op->state->se,
725                                 buf);
726   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
728   else
729     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730   ev = GNUNET_MQ_msg_extra (strata_msg,
731                             len,
732                             type);
733   GNUNET_memcpy (&strata_msg[1],
734           buf,
735           len);
736   GNUNET_free (buf);
737   strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738   GNUNET_MQ_send (op->mq,
739                   ev);
740   op->state->phase = PHASE_EXPECT_IBF;
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "sent SE, expecting IBF\n");
743 }
744
745
746 /**
747  * Compute the necessary order of an ibf
748  * from the size of the symmetric set difference.
749  *
750  * @param diff the difference
751  * @return the required size of the ibf
752  */
753 static unsigned int
754 get_order_from_difference (unsigned int diff)
755 {
756   unsigned int ibf_order;
757
758   ibf_order = 2;
759   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
761     ibf_order++;
762   if (ibf_order > MAX_IBF_ORDER)
763     ibf_order = MAX_IBF_ORDER;
764   // add one for correction
765   return ibf_order + 1;
766 }
767
768
769 /**
770  * Send a set element.
771  *
772  * @param cls the union operation `struct Operation *`
773  * @param key unused
774  * @param value the `struct ElementEntry *` to insert
775  *        into the key-to-element mapping
776  * @return #GNUNET_YES (to continue iterating)
777  */
778 static int
779 send_element_iterator (void *cls,
780                        const struct GNUNET_HashCode *key,
781                        void *value)
782 {
783   struct Operation *op = cls;
784   struct GNUNET_SET_ElementMessage *emsg;
785   struct ElementEntry *ee = value;
786   struct GNUNET_SET_Element *el = &ee->element;
787   struct GNUNET_MQ_Envelope *ev;
788
789
790   ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791   emsg->element_type = htons (el->element_type);
792   GNUNET_memcpy (&emsg[1], el->data, el->size);
793   GNUNET_MQ_send (op->mq, ev);
794   return GNUNET_YES;
795 }
796
797
798 static void
799 send_full_set (struct Operation *op)
800 {
801   struct GNUNET_MQ_Envelope *ev;
802
803   op->state->phase = PHASE_FULL_SENDING;
804
805   (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806                                                 &send_element_iterator, op);
807   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808   GNUNET_MQ_send (op->mq, ev);
809 }
810
811
812 /**
813  * Handle a strata estimator from a remote peer
814  *
815  * @param cls the union operation
816  * @param mh the message
817  * @param is_compressed #GNUNET_YES if the estimator is compressed
818  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819  *         #GNUNET_OK otherwise
820  */
821 static int
822 handle_p2p_strata_estimator (void *cls,
823                              const struct GNUNET_MessageHeader *mh,
824                              int is_compressed)
825 {
826   struct Operation *op = cls;
827   struct StrataEstimator *remote_se;
828   struct StrataEstimatorMessage *msg = (void *) mh;
829   unsigned int diff;
830   uint64_t other_size;
831   size_t len;
832
833   GNUNET_STATISTICS_update (_GSS_statistics,
834                             "# bytes of SE received",
835                             ntohs (mh->size),
836                             GNUNET_NO);
837
838   if (op->state->phase != PHASE_EXPECT_SE)
839   {
840     GNUNET_break (0);
841     fail_union_operation (op);
842     return GNUNET_SYSERR;
843   }
844   len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
845   if ( (GNUNET_NO == is_compressed) &&
846        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
847   {
848     fail_union_operation (op);
849     GNUNET_break (0);
850     return GNUNET_SYSERR;
851   }
852   other_size = GNUNET_ntohll (msg->set_size);
853   remote_se = strata_estimator_create (SE_STRATA_COUNT,
854                                        SE_IBF_SIZE,
855                                        SE_IBF_HASH_NUM);
856   if (NULL == remote_se)
857   {
858     /* insufficient resources, fail */
859     fail_union_operation (op);
860     return GNUNET_SYSERR;
861   }
862   if (GNUNET_OK !=
863       strata_estimator_read (&msg[1],
864                              len,
865                              is_compressed,
866                              remote_se))
867   {
868     /* decompression failed */
869     fail_union_operation (op);
870     strata_estimator_destroy (remote_se);
871     return GNUNET_SYSERR;
872   }
873   GNUNET_assert (NULL != op->state->se);
874   diff = strata_estimator_difference (remote_se,
875                                       op->state->se);
876   strata_estimator_destroy (remote_se);
877   strata_estimator_destroy (op->state->se);
878   op->state->se = NULL;
879   LOG (GNUNET_ERROR_TYPE_DEBUG,
880        "got se diff=%d, using ibf size %d\n",
881        diff,
882        1<<get_order_from_difference (diff));
883
884   if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
885   {
886     GNUNET_break (0);
887     fail_union_operation (op);
888     return GNUNET_SYSERR;
889   }
890
891
892   if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
893   {
894     LOG (GNUNET_ERROR_TYPE_INFO,
895          "Sending full set (diff=%d, own set=%u)\n",
896          diff,
897          op->state->initial_size);
898     GNUNET_STATISTICS_update (_GSS_statistics,
899                               "# of full sends",
900                               1,
901                               GNUNET_NO);
902     if (op->state->initial_size <= other_size)
903     {
904       send_full_set (op);
905     }
906     else
907     {
908       struct GNUNET_MQ_Envelope *ev;
909       op->state->phase = PHASE_EXPECT_IBF;
910       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
911       GNUNET_MQ_send (op->mq, ev);
912     }
913   }
914   else
915   {
916     GNUNET_STATISTICS_update (_GSS_statistics,
917                               "# of ibf sends",
918                               1,
919                               GNUNET_NO);
920     if (GNUNET_OK !=
921         send_ibf (op,
922                   get_order_from_difference (diff)))
923     {
924       /* Internal error, best we can do is shut the connection */
925       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
926                   "Failed to send IBF, closing connection\n");
927       fail_union_operation (op);
928       return GNUNET_SYSERR;
929     }
930   }
931
932   return GNUNET_OK;
933 }
934
935
936 /**
937  * Iterator to send elements to a remote peer
938  *
939  * @param cls closure with the element key and the union operation
940  * @param key ignored
941  * @param value the key entry
942  */
943 static int
944 send_offers_iterator (void *cls,
945                       uint32_t key,
946                       void *value)
947 {
948   struct SendElementClosure *sec = cls;
949   struct Operation *op = sec->op;
950   struct KeyEntry *ke = value;
951   struct GNUNET_MQ_Envelope *ev;
952   struct GNUNET_MessageHeader *mh;
953
954   /* Detect 32-bit key collision for the 64-bit IBF keys. */
955   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
956     return GNUNET_YES;
957
958   ev = GNUNET_MQ_msg_header_extra (mh,
959                                    sizeof (struct GNUNET_HashCode),
960                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
961
962   GNUNET_assert (NULL != ev);
963   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
964   LOG (GNUNET_ERROR_TYPE_DEBUG,
965        "[OP %x] sending element offer (%s) to peer\n",
966        (void *) op,
967        GNUNET_h2s (&ke->element->element_hash));
968   GNUNET_MQ_send (op->mq, ev);
969   return GNUNET_YES;
970 }
971
972
973 /**
974  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
975  *
976  * @param op union operation
977  * @param ibf_key IBF key of interest
978  */
979 static void
980 send_offers_for_key (struct Operation *op,
981                      struct IBF_Key ibf_key)
982 {
983   struct SendElementClosure send_cls;
984
985   send_cls.ibf_key = ibf_key;
986   send_cls.op = op;
987   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
988                                                        (uint32_t) ibf_key.key_val,
989                                                        &send_offers_iterator,
990                                                        &send_cls);
991 }
992
993
994 /**
995  * Decode which elements are missing on each side, and
996  * send the appropriate offers and inquiries.
997  *
998  * @param op union operation
999  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1000  */
1001 static int
1002 decode_and_send (struct Operation *op)
1003 {
1004   struct IBF_Key key;
1005   struct IBF_Key last_key;
1006   int side;
1007   unsigned int num_decoded;
1008   struct InvertibleBloomFilter *diff_ibf;
1009
1010   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1011
1012   if (GNUNET_OK !=
1013       prepare_ibf (op, op->state->remote_ibf->size))
1014   {
1015     GNUNET_break (0);
1016     /* allocation failed */
1017     return GNUNET_SYSERR;
1018   }
1019   diff_ibf = ibf_dup (op->state->local_ibf);
1020   ibf_subtract (diff_ibf, op->state->remote_ibf);
1021
1022   ibf_destroy (op->state->remote_ibf);
1023   op->state->remote_ibf = NULL;
1024
1025   LOG (GNUNET_ERROR_TYPE_DEBUG,
1026        "decoding IBF (size=%u)\n",
1027        diff_ibf->size);
1028
1029   num_decoded = 0;
1030   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1031
1032   while (1)
1033   {
1034     int res;
1035     int cycle_detected = GNUNET_NO;
1036
1037     last_key = key;
1038
1039     res = ibf_decode (diff_ibf, &side, &key);
1040     if (res == GNUNET_OK)
1041     {
1042       LOG (GNUNET_ERROR_TYPE_DEBUG,
1043            "decoded ibf key %lx\n",
1044            (unsigned long) key.key_val);
1045       num_decoded += 1;
1046       if ( (num_decoded > diff_ibf->size) ||
1047            ( (num_decoded > 1) &&
1048              (last_key.key_val == key.key_val) ) )
1049       {
1050         LOG (GNUNET_ERROR_TYPE_DEBUG,
1051              "detected cyclic ibf (decoded %u/%u)\n",
1052              num_decoded,
1053              diff_ibf->size);
1054         cycle_detected = GNUNET_YES;
1055       }
1056     }
1057     if ( (GNUNET_SYSERR == res) ||
1058          (GNUNET_YES == cycle_detected) )
1059     {
1060       int next_order;
1061       next_order = 0;
1062       while (1<<next_order < diff_ibf->size)
1063         next_order++;
1064       next_order++;
1065       if (next_order <= MAX_IBF_ORDER)
1066       {
1067         LOG (GNUNET_ERROR_TYPE_DEBUG,
1068              "decoding failed, sending larger ibf (size %u)\n",
1069              1<<next_order);
1070         GNUNET_STATISTICS_update (_GSS_statistics,
1071                                   "# of IBF retries",
1072                                   1,
1073                                   GNUNET_NO);
1074         op->state->salt_send++;
1075         if (GNUNET_OK !=
1076             send_ibf (op, next_order))
1077         {
1078           /* Internal error, best we can do is shut the connection */
1079           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1080                       "Failed to send IBF, closing connection\n");
1081           fail_union_operation (op);
1082           ibf_destroy (diff_ibf);
1083           return GNUNET_SYSERR;
1084         }
1085       }
1086       else
1087       {
1088         GNUNET_STATISTICS_update (_GSS_statistics,
1089                                   "# of failed union operations (too large)",
1090                                   1,
1091                                   GNUNET_NO);
1092         // XXX: Send the whole set, element-by-element
1093         LOG (GNUNET_ERROR_TYPE_ERROR,
1094              "set union failed: reached ibf limit\n");
1095         fail_union_operation (op);
1096         ibf_destroy (diff_ibf);
1097         return GNUNET_SYSERR;
1098       }
1099       break;
1100     }
1101     if (GNUNET_NO == res)
1102     {
1103       struct GNUNET_MQ_Envelope *ev;
1104
1105       LOG (GNUNET_ERROR_TYPE_DEBUG,
1106            "transmitted all values, sending DONE\n");
1107       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1108       GNUNET_MQ_send (op->mq, ev);
1109       /* We now wait until we get a DONE message back
1110        * and then wait for our MQ to be flushed and all our
1111        * demands be delivered. */
1112       break;
1113     }
1114     if (1 == side)
1115     {
1116       struct IBF_Key unsalted_key;
1117       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1118       send_offers_for_key (op, unsalted_key);
1119     }
1120     else if (-1 == side)
1121     {
1122       struct GNUNET_MQ_Envelope *ev;
1123       struct InquiryMessage *msg;
1124
1125       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1126        * the effort additional complexity. */
1127       ev = GNUNET_MQ_msg_extra (msg,
1128                                 sizeof (struct IBF_Key),
1129                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1130       msg->salt = htonl (op->state->salt_receive);
1131       GNUNET_memcpy (&msg[1],
1132               &key,
1133               sizeof (struct IBF_Key));
1134       LOG (GNUNET_ERROR_TYPE_DEBUG,
1135            "sending element inquiry for IBF key %lx\n",
1136            (unsigned long) key.key_val);
1137       GNUNET_MQ_send (op->mq, ev);
1138     }
1139     else
1140     {
1141       GNUNET_assert (0);
1142     }
1143   }
1144   ibf_destroy (diff_ibf);
1145   return GNUNET_OK;
1146 }
1147
1148
1149 /**
1150  * Handle an IBF message from a remote peer.
1151  *
1152  * Reassemble the IBF from multiple pieces, and
1153  * process the whole IBF once possible.
1154  *
1155  * @param cls the union operation
1156  * @param mh the header of the message
1157  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1158  *         #GNUNET_OK otherwise
1159  */
1160 static int
1161 handle_p2p_ibf (void *cls,
1162                 const struct GNUNET_MessageHeader *mh)
1163 {
1164   struct Operation *op = cls;
1165   const struct IBFMessage *msg;
1166   unsigned int buckets_in_message;
1167
1168   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1169   {
1170     GNUNET_break_op (0);
1171     fail_union_operation (op);
1172     return GNUNET_SYSERR;
1173   }
1174   msg = (const struct IBFMessage *) mh;
1175   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1176        (op->state->phase == PHASE_EXPECT_IBF) )
1177   {
1178     op->state->phase = PHASE_EXPECT_IBF_CONT;
1179     GNUNET_assert (NULL == op->state->remote_ibf);
1180     LOG (GNUNET_ERROR_TYPE_DEBUG,
1181          "Creating new ibf of size %u\n",
1182          1 << msg->order);
1183     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1184     op->state->salt_receive = ntohl (msg->salt);
1185     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1186     if (NULL == op->state->remote_ibf)
1187     {
1188       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1189                   "Failed to parse remote IBF, closing connection\n");
1190       fail_union_operation (op);
1191       return GNUNET_SYSERR;
1192     }
1193     op->state->ibf_buckets_received = 0;
1194     if (0 != ntohl (msg->offset))
1195     {
1196       GNUNET_break_op (0);
1197       fail_union_operation (op);
1198       return GNUNET_SYSERR;
1199     }
1200   }
1201   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1202   {
1203     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1204     {
1205       GNUNET_break_op (0);
1206       fail_union_operation (op);
1207       return GNUNET_SYSERR;
1208     }
1209     if (1<<msg->order != op->state->remote_ibf->size)
1210     {
1211       GNUNET_break_op (0);
1212       fail_union_operation (op);
1213       return GNUNET_SYSERR;
1214     }
1215     if (ntohl (msg->salt) != op->state->salt_receive)
1216     {
1217       GNUNET_break_op (0);
1218       fail_union_operation (op);
1219       return GNUNET_SYSERR;
1220     }
1221   }
1222   else
1223   {
1224     GNUNET_assert (0);
1225   }
1226
1227   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1228
1229   if (0 == buckets_in_message)
1230   {
1231     GNUNET_break_op (0);
1232     fail_union_operation (op);
1233     return GNUNET_SYSERR;
1234   }
1235
1236   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1237   {
1238     GNUNET_break_op (0);
1239     fail_union_operation (op);
1240     return GNUNET_SYSERR;
1241   }
1242
1243   GNUNET_assert (NULL != op->state->remote_ibf);
1244
1245   ibf_read_slice (&msg[1],
1246                   op->state->ibf_buckets_received,
1247                   buckets_in_message,
1248                   op->state->remote_ibf);
1249   op->state->ibf_buckets_received += buckets_in_message;
1250
1251   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1252   {
1253     LOG (GNUNET_ERROR_TYPE_DEBUG,
1254          "received full ibf\n");
1255     op->state->phase = PHASE_INVENTORY_ACTIVE;
1256     if (GNUNET_OK !=
1257         decode_and_send (op))
1258     {
1259       /* Internal error, best we can do is shut down */
1260       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1261                   "Failed to decode IBF, closing connection\n");
1262       return GNUNET_SYSERR;
1263     }
1264   }
1265   return GNUNET_OK;
1266 }
1267
1268
1269 /**
1270  * Send a result message to the client indicating
1271  * that there is a new element.
1272  *
1273  * @param op union operation
1274  * @param element element to send
1275  * @param status status to send with the new element
1276  */
1277 static void
1278 send_client_element (struct Operation *op,
1279                      struct GNUNET_SET_Element *element,
1280                      int status)
1281 {
1282   struct GNUNET_MQ_Envelope *ev;
1283   struct GNUNET_SET_ResultMessage *rm;
1284
1285   LOG (GNUNET_ERROR_TYPE_DEBUG,
1286        "sending element (size %u) to client\n",
1287        element->size);
1288   GNUNET_assert (0 != op->spec->client_request_id);
1289   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1290   if (NULL == ev)
1291   {
1292     GNUNET_MQ_discard (ev);
1293     GNUNET_break (0);
1294     return;
1295   }
1296   rm->result_status = htons (status);
1297   rm->request_id = htonl (op->spec->client_request_id);
1298   rm->element_type = htons (element->element_type);
1299   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1300   GNUNET_memcpy (&rm[1], element->data, element->size);
1301   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1302 }
1303
1304
1305 /**
1306  * Signal to the client that the operation has finished and
1307  * destroy the operation.
1308  *
1309  * @param cls operation to destroy
1310  */
1311 static void
1312 send_done_and_destroy (void *cls)
1313 {
1314   struct Operation *op = cls;
1315   struct GNUNET_MQ_Envelope *ev;
1316   struct GNUNET_SET_ResultMessage *rm;
1317
1318   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1319   rm->request_id = htonl (op->spec->client_request_id);
1320   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1321   rm->element_type = htons (0);
1322   rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1323   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1324   /* Will also call the union-specific cancel function. */
1325   _GSS_operation_destroy (op, GNUNET_YES);
1326 }
1327
1328
1329 static void
1330 maybe_finish (struct Operation *op)
1331 {
1332   unsigned int num_demanded;
1333
1334   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1335
1336   if (PHASE_FINISH_WAITING == op->state->phase)
1337   {
1338     LOG (GNUNET_ERROR_TYPE_DEBUG,
1339          "In PHASE_FINISH_WAITING, pending %u demands\n",
1340          num_demanded);
1341     if (0 == num_demanded)
1342     {
1343       struct GNUNET_MQ_Envelope *ev;
1344
1345       op->state->phase = PHASE_DONE;
1346       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1347       GNUNET_MQ_send (op->mq, ev);
1348
1349       /* We now wait until the other peer closes the channel
1350        * after it got all elements from us. */
1351     }
1352   }
1353   if (PHASE_FINISH_CLOSING == op->state->phase)
1354   {
1355     LOG (GNUNET_ERROR_TYPE_DEBUG,
1356          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1357          num_demanded);
1358     if (0 == num_demanded)
1359     {
1360       op->state->phase = PHASE_DONE;
1361       send_done_and_destroy (op);
1362     }
1363   }
1364 }
1365
1366
1367 /**
1368  * Handle an element message from a remote peer.
1369  * Sent by the other peer either because we decoded an IBF and placed a demand,
1370  * or because the other peer switched to full set transmission.
1371  *
1372  * @param cls the union operation
1373  * @param mh the message
1374  */
1375 static void
1376 handle_p2p_elements (void *cls,
1377                      const struct GNUNET_MessageHeader *mh)
1378 {
1379   struct Operation *op = cls;
1380   struct ElementEntry *ee;
1381   const struct GNUNET_SET_ElementMessage *emsg;
1382   uint16_t element_size;
1383
1384   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1385   {
1386     GNUNET_break_op (0);
1387     fail_union_operation (op);
1388     return;
1389   }
1390   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1391   {
1392     GNUNET_break_op (0);
1393     fail_union_operation (op);
1394     return;
1395   }
1396
1397   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1398
1399   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1400   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1401   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1402   ee->element.size = element_size;
1403   ee->element.data = &ee[1];
1404   ee->element.element_type = ntohs (emsg->element_type);
1405   ee->remote = GNUNET_YES;
1406   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1407
1408   if (GNUNET_NO ==
1409       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1410                                             &ee->element_hash,
1411                                             NULL))
1412   {
1413     /* We got something we didn't demand, since it's not in our map. */
1414     GNUNET_break_op (0);
1415     GNUNET_free (ee);
1416     fail_union_operation (op);
1417     return;
1418   }
1419
1420   LOG (GNUNET_ERROR_TYPE_DEBUG,
1421        "Got element (size %u, hash %s) from peer\n",
1422        (unsigned int) element_size,
1423        GNUNET_h2s (&ee->element_hash));
1424
1425   GNUNET_STATISTICS_update (_GSS_statistics,
1426                             "# received elements",
1427                             1,
1428                             GNUNET_NO);
1429   GNUNET_STATISTICS_update (_GSS_statistics,
1430                             "# exchanged elements",
1431                             1,
1432                             GNUNET_NO);
1433
1434   op->state->received_total += 1;
1435
1436   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1437
1438   if (NULL != ke)
1439   {
1440     /* Got repeated element.  Should not happen since
1441      * we track demands. */
1442     GNUNET_STATISTICS_update (_GSS_statistics,
1443                               "# repeated elements",
1444                               1,
1445                               GNUNET_NO);
1446     ke->received = GNUNET_YES;
1447     GNUNET_free (ee);
1448   }
1449   else
1450   {
1451     LOG (GNUNET_ERROR_TYPE_DEBUG,
1452          "Registering new element from remote peer\n");
1453     op->state->received_fresh += 1;
1454     op_register_element (op, ee, GNUNET_YES);
1455     /* only send results immediately if the client wants it */
1456     switch (op->spec->result_mode)
1457     {
1458       case GNUNET_SET_RESULT_ADDED:
1459         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1460         break;
1461       case GNUNET_SET_RESULT_SYMMETRIC:
1462         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1463         break;
1464       default:
1465         /* Result mode not supported, should have been caught earlier. */
1466         GNUNET_break (0);
1467         break;
1468     }
1469   }
1470
1471   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1472   {
1473     /* The other peer gave us lots of old elements, there's something wrong. */
1474     GNUNET_break_op (0);
1475     fail_union_operation (op);
1476     return;
1477   }
1478
1479   maybe_finish (op);
1480 }
1481
1482
1483 /**
1484  * Handle an element message from a remote peer.
1485  *
1486  * @param cls the union operation
1487  * @param mh the message
1488  */
1489 static void
1490 handle_p2p_full_element (void *cls,
1491                          const struct GNUNET_MessageHeader *mh)
1492 {
1493   struct Operation *op = cls;
1494   struct ElementEntry *ee;
1495   const struct GNUNET_SET_ElementMessage *emsg;
1496   uint16_t element_size;
1497
1498   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1499   {
1500     GNUNET_break_op (0);
1501     fail_union_operation (op);
1502     return;
1503   }
1504
1505   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1506
1507   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1508   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1509   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1510   ee->element.size = element_size;
1511   ee->element.data = &ee[1];
1512   ee->element.element_type = ntohs (emsg->element_type);
1513   ee->remote = GNUNET_YES;
1514   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1515
1516   LOG (GNUNET_ERROR_TYPE_DEBUG,
1517        "Got element (full diff, size %u, hash %s) from peer\n",
1518        (unsigned int) element_size,
1519        GNUNET_h2s (&ee->element_hash));
1520
1521   GNUNET_STATISTICS_update (_GSS_statistics,
1522                             "# received elements",
1523                             1,
1524                             GNUNET_NO);
1525   GNUNET_STATISTICS_update (_GSS_statistics,
1526                             "# exchanged elements",
1527                             1,
1528                             GNUNET_NO);
1529
1530   op->state->received_total += 1;
1531
1532   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1533
1534   if (NULL != ke)
1535   {
1536     /* Got repeated element.  Should not happen since
1537      * we track demands. */
1538     GNUNET_STATISTICS_update (_GSS_statistics,
1539                               "# repeated elements",
1540                               1,
1541                               GNUNET_NO);
1542     ke->received = GNUNET_YES;
1543     GNUNET_free (ee);
1544   }
1545   else
1546   {
1547     LOG (GNUNET_ERROR_TYPE_DEBUG,
1548          "Registering new element from remote peer\n");
1549     op->state->received_fresh += 1;
1550     op_register_element (op, ee, GNUNET_YES);
1551     /* only send results immediately if the client wants it */
1552     switch (op->spec->result_mode)
1553     {
1554       case GNUNET_SET_RESULT_ADDED:
1555         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1556         break;
1557       case GNUNET_SET_RESULT_SYMMETRIC:
1558         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1559         break;
1560       default:
1561         /* Result mode not supported, should have been caught earlier. */
1562         GNUNET_break (0);
1563         break;
1564     }
1565   }
1566
1567   if ( (GNUNET_YES == op->spec->byzantine) && 
1568        (op->state->received_total > 128) && 
1569        (op->state->received_fresh < op->state->received_total / 3) )
1570   {
1571     /* The other peer gave us lots of old elements, there's something wrong. */
1572     LOG (GNUNET_ERROR_TYPE_ERROR,
1573          "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1574          (unsigned long long) op->state->received_fresh,
1575          (unsigned long long) op->state->received_total);
1576     GNUNET_break_op (0);
1577     fail_union_operation (op);
1578     return;
1579   }
1580 }
1581
1582 /**
1583  * Send offers (for GNUNET_Hash-es) in response
1584  * to inquiries (for IBF_Key-s).
1585  *
1586  * @param cls the union operation
1587  * @param mh the message
1588  */
1589 static void
1590 handle_p2p_inquiry (void *cls,
1591                     const struct GNUNET_MessageHeader *mh)
1592 {
1593   struct Operation *op = cls;
1594   const struct IBF_Key *ibf_key;
1595   unsigned int num_keys;
1596   struct InquiryMessage *msg;
1597
1598   /* look up elements and send them */
1599   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1600   {
1601     GNUNET_break_op (0);
1602     fail_union_operation (op);
1603     return;
1604   }
1605   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1606       / sizeof (struct IBF_Key);
1607   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1608       != num_keys * sizeof (struct IBF_Key))
1609   {
1610     GNUNET_break_op (0);
1611     fail_union_operation (op);
1612     return;
1613   }
1614
1615   msg = (struct InquiryMessage *) mh;
1616
1617   ibf_key = (const struct IBF_Key *) &msg[1];
1618   while (0 != num_keys--)
1619   {
1620     struct IBF_Key unsalted_key;
1621     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1622     send_offers_for_key (op, unsalted_key);
1623     ibf_key++;
1624   }
1625 }
1626
1627
1628 /**
1629  * Iterator over hash map entries, called to
1630  * destroy the linked list of colliding ibf key entries.
1631  *
1632  * @param cls closure
1633  * @param key current key code
1634  * @param value value in the hash map
1635  * @return #GNUNET_YES if we should continue to iterate,
1636  *         #GNUNET_NO if not.
1637  */
1638 static int
1639 send_missing_elements_iter (void *cls,
1640                             uint32_t key,
1641                             void *value)
1642 {
1643   struct Operation *op = cls;
1644   struct KeyEntry *ke = value;
1645   struct GNUNET_MQ_Envelope *ev;
1646   struct GNUNET_SET_ElementMessage *emsg;
1647   struct ElementEntry *ee = ke->element;
1648
1649   if (GNUNET_YES == ke->received)
1650     return GNUNET_YES;
1651
1652   ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1653   GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1654   emsg->reserved = htons (0);
1655   emsg->element_type = htons (ee->element.element_type);
1656   GNUNET_MQ_send (op->mq, ev);
1657
1658   return GNUNET_YES;
1659 }
1660
1661
1662 /**
1663  * Handle a 
1664  *
1665  * @parem cls closure, a set union operation
1666  * @param mh the demand message
1667  */
1668 static void
1669 handle_p2p_request_full (void *cls,
1670                          const struct GNUNET_MessageHeader *mh)
1671 {
1672   struct Operation *op = cls;
1673
1674   if (PHASE_EXPECT_IBF != op->state->phase)
1675   {
1676     fail_union_operation (op);
1677     GNUNET_break_op (0);
1678     return;
1679   }
1680
1681   // FIXME: we need to check that our set is larger than the
1682   // byzantine_lower_bound by some threshold
1683   send_full_set (op);
1684 }
1685
1686
1687 /**
1688  * Handle a "full done" message.
1689  *
1690  * @parem cls closure, a set union operation
1691  * @param mh the demand message
1692  */
1693 static void
1694 handle_p2p_full_done (void *cls,
1695                       const struct GNUNET_MessageHeader *mh)
1696 {
1697   struct Operation *op = cls;
1698
1699   if (PHASE_EXPECT_IBF == op->state->phase)
1700   {
1701     struct GNUNET_MQ_Envelope *ev;
1702
1703     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1704
1705     /* send all the elements that did not come from the remote peer */
1706     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1707                                              &send_missing_elements_iter,
1708                                              op);
1709
1710     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1711     GNUNET_MQ_send (op->mq, ev);
1712     op->state->phase = PHASE_DONE;
1713
1714     /* we now wait until the other peer shuts the tunnel down*/
1715   }
1716   else if (PHASE_FULL_SENDING == op->state->phase)
1717   {
1718     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1719     /* We sent the full set, and got the response for that.  We're done. */
1720     op->state->phase = PHASE_DONE;
1721     send_done_and_destroy (op);
1722   }
1723   else
1724   {
1725     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1726     GNUNET_break_op (0);
1727     fail_union_operation (op);
1728     return;
1729   }
1730 }
1731
1732
1733 /**
1734  * Handle a demand by the other peer for elements based on a list
1735  * of GNUNET_HashCode-s.
1736  *
1737  * @parem cls closure, a set union operation
1738  * @param mh the demand message
1739  */
1740 static void
1741 handle_p2p_demand (void *cls,
1742                    const struct GNUNET_MessageHeader *mh)
1743 {
1744   struct Operation *op = cls;
1745   struct ElementEntry *ee;
1746   struct GNUNET_SET_ElementMessage *emsg;
1747   const struct GNUNET_HashCode *hash;
1748   unsigned int num_hashes;
1749   struct GNUNET_MQ_Envelope *ev;
1750
1751   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1752     / sizeof (struct GNUNET_HashCode);
1753   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1754       != num_hashes * sizeof (struct GNUNET_HashCode))
1755   {
1756     GNUNET_break_op (0);
1757     fail_union_operation (op);
1758     return;
1759   }
1760
1761   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1762        num_hashes > 0;
1763        hash++, num_hashes--)
1764   {
1765     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1766     if (NULL == ee)
1767     {
1768       /* Demand for non-existing element. */
1769       GNUNET_break_op (0);
1770       fail_union_operation (op);
1771       return;
1772     }
1773     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1774     {
1775       /* Probably confused lazily copied sets. */
1776       GNUNET_break_op (0);
1777       fail_union_operation (op);
1778       return;
1779     }
1780     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1781     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1782     emsg->reserved = htons (0);
1783     emsg->element_type = htons (ee->element.element_type);
1784     LOG (GNUNET_ERROR_TYPE_DEBUG,
1785          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1786          (void *) op,
1787          (unsigned int) ee->element.size,
1788          GNUNET_h2s (&ee->element_hash));
1789     GNUNET_MQ_send (op->mq, ev);
1790     GNUNET_STATISTICS_update (_GSS_statistics,
1791                               "# exchanged elements",
1792                               1,
1793                               GNUNET_NO);
1794
1795     switch (op->spec->result_mode)
1796     {
1797       case GNUNET_SET_RESULT_ADDED:
1798         /* Nothing to do. */
1799         break;
1800       case GNUNET_SET_RESULT_SYMMETRIC:
1801         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1802         break;
1803       default:
1804         /* Result mode not supported, should have been caught earlier. */
1805         GNUNET_break (0);
1806         break;
1807     }
1808   }
1809 }
1810
1811
1812 /**
1813  * Handle offers (of GNUNET_HashCode-s) and
1814  * respond with demands (of GNUNET_HashCode-s).
1815  *
1816  * @param cls the union operation
1817  * @param mh the message
1818  */
1819 static void
1820 handle_p2p_offer (void *cls,
1821                     const struct GNUNET_MessageHeader *mh)
1822 {
1823   struct Operation *op = cls;
1824   const struct GNUNET_HashCode *hash;
1825   unsigned int num_hashes;
1826
1827   /* look up elements and send them */
1828   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1829        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1830   {
1831     GNUNET_break_op (0);
1832     fail_union_operation (op);
1833     return;
1834   }
1835   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1836     / sizeof (struct GNUNET_HashCode);
1837   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1838       != num_hashes * sizeof (struct GNUNET_HashCode))
1839   {
1840     GNUNET_break_op (0);
1841     fail_union_operation (op);
1842     return;
1843   }
1844
1845   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1846        num_hashes > 0;
1847        hash++, num_hashes--)
1848   {
1849     struct ElementEntry *ee;
1850     struct GNUNET_MessageHeader *demands;
1851     struct GNUNET_MQ_Envelope *ev;
1852
1853     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1854                                             hash);
1855     if (NULL != ee)
1856       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1857         continue;
1858
1859     if (GNUNET_YES ==
1860         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1861                                                 hash))
1862     {
1863       LOG (GNUNET_ERROR_TYPE_DEBUG,
1864            "Skipped sending duplicate demand\n");
1865       continue;
1866     }
1867
1868     GNUNET_assert (GNUNET_OK ==
1869                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1870                                                       hash,
1871                                                       NULL,
1872                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1873
1874     LOG (GNUNET_ERROR_TYPE_DEBUG,
1875          "[OP %x] Requesting element (hash %s)\n",
1876          (void *) op, GNUNET_h2s (hash));
1877     ev = GNUNET_MQ_msg_header_extra (demands,
1878                                      sizeof (struct GNUNET_HashCode),
1879                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1880     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1881     GNUNET_MQ_send (op->mq, ev);
1882   }
1883 }
1884
1885
1886 /**
1887  * Handle a done message from a remote peer
1888  *
1889  * @param cls the union operation
1890  * @param mh the message
1891  */
1892 static void
1893 handle_p2p_done (void *cls,
1894                  const struct GNUNET_MessageHeader *mh)
1895 {
1896   struct Operation *op = cls;
1897
1898   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1899   {
1900     /* We got all requests, but still have to send our elements in response. */
1901
1902     op->state->phase = PHASE_FINISH_WAITING;
1903
1904     LOG (GNUNET_ERROR_TYPE_DEBUG,
1905          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1906     /* The active peer is done sending offers
1907      * and inquiries.  This means that all
1908      * our responses to that (demands and offers)
1909      * must be in flight (queued or in mesh).
1910      *
1911      * We should notify the active peer once
1912      * all our demands are satisfied, so that the active
1913      * peer can quit if we gave him everything.
1914      */
1915     maybe_finish (op);
1916     return;
1917   }
1918   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1919   {
1920     LOG (GNUNET_ERROR_TYPE_DEBUG,
1921          "got DONE (as active partner), waiting to finish\n");
1922     /* All demands of the other peer are satisfied,
1923      * and we processed all offers, thus we know
1924      * exactly what our demands must be.
1925      *
1926      * We'll close the channel
1927      * to the other peer once our demands are met.
1928      */
1929     op->state->phase = PHASE_FINISH_CLOSING;
1930     maybe_finish (op);
1931     return;
1932   }
1933   GNUNET_break_op (0);
1934   fail_union_operation (op);
1935 }
1936
1937
1938 /**
1939  * Initiate operation to evaluate a set union with a remote peer.
1940  *
1941  * @param op operation to perform (to be initialized)
1942  * @param opaque_context message to be transmitted to the listener
1943  *        to convince him to accept, may be NULL
1944  */
1945 static void
1946 union_evaluate (struct Operation *op,
1947                 const struct GNUNET_MessageHeader *opaque_context)
1948 {
1949   struct GNUNET_MQ_Envelope *ev;
1950   struct OperationRequestMessage *msg;
1951
1952   GNUNET_assert (NULL == op->state);
1953   op->state = GNUNET_new (struct OperationState);
1954   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1955   /* copy the current generation's strata estimator for this operation */
1956   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1957   /* we started the operation, thus we have to send the operation request */
1958   op->state->phase = PHASE_EXPECT_SE;
1959   op->state->salt_receive = op->state->salt_send = 42;
1960   LOG (GNUNET_ERROR_TYPE_DEBUG,
1961        "Initiating union operation evaluation\n");
1962   GNUNET_STATISTICS_update (_GSS_statistics,
1963                             "# of total union operations",
1964                             1,
1965                             GNUNET_NO);
1966   GNUNET_STATISTICS_update (_GSS_statistics,
1967                             "# of initiated union operations",
1968                             1,
1969                             GNUNET_NO);
1970   ev = GNUNET_MQ_msg_nested_mh (msg,
1971                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1972                                 opaque_context);
1973   if (NULL == ev)
1974   {
1975     /* the context message is too large */
1976     GNUNET_break (0);
1977     GNUNET_SERVICE_client_drop (op->spec->set->client);
1978     return;
1979   }
1980   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1981   GNUNET_MQ_send (op->mq,
1982                   ev);
1983
1984   if (NULL != opaque_context)
1985     LOG (GNUNET_ERROR_TYPE_DEBUG,
1986          "sent op request with context message\n");
1987   else
1988     LOG (GNUNET_ERROR_TYPE_DEBUG,
1989          "sent op request without context message\n");
1990
1991   initialize_key_to_element (op);
1992   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1993 }
1994
1995
1996 /**
1997  * Accept an union operation request from a remote peer.
1998  * Only initializes the private operation state.
1999  *
2000  * @param op operation that will be accepted as a union operation
2001  */
2002 static void
2003 union_accept (struct Operation *op)
2004 {
2005   LOG (GNUNET_ERROR_TYPE_DEBUG,
2006        "accepting set union operation\n");
2007   GNUNET_assert (NULL == op->state);
2008
2009   GNUNET_STATISTICS_update (_GSS_statistics,
2010                             "# of accepted union operations",
2011                             1,
2012                             GNUNET_NO);
2013   GNUNET_STATISTICS_update (_GSS_statistics,
2014                             "# of total union operations",
2015                             1,
2016                             GNUNET_NO);
2017
2018   op->state = GNUNET_new (struct OperationState);
2019   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2020   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2021   op->state->salt_receive = op->state->salt_send = 42;
2022   initialize_key_to_element (op);
2023   op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2024   /* kick off the operation */
2025   send_strata_estimator (op);
2026 }
2027
2028
2029 /**
2030  * Create a new set supporting the union operation
2031  *
2032  * We maintain one strata estimator per set and then manipulate it over the
2033  * lifetime of the set, as recreating a strata estimator would be expensive.
2034  *
2035  * @return the newly created set, NULL on error
2036  */
2037 static struct SetState *
2038 union_set_create (void)
2039 {
2040   struct SetState *set_state;
2041
2042   LOG (GNUNET_ERROR_TYPE_DEBUG,
2043        "union set created\n");
2044   set_state = GNUNET_new (struct SetState);
2045   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2046                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2047   if (NULL == set_state->se)
2048   {
2049     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2050                 "Failed to allocate strata estimator\n");
2051     GNUNET_free (set_state);
2052     return NULL;
2053   }
2054   return set_state;
2055 }
2056
2057
2058 /**
2059  * Add the element from the given element message to the set.
2060  *
2061  * @param set_state state of the set want to add to
2062  * @param ee the element to add to the set
2063  */
2064 static void
2065 union_add (struct SetState *set_state, struct ElementEntry *ee)
2066 {
2067   strata_estimator_insert (set_state->se,
2068                            get_ibf_key (&ee->element_hash));
2069 }
2070
2071
2072 /**
2073  * Remove the element given in the element message from the set.
2074  * Only marks the element as removed, so that older set operations can still exchange it.
2075  *
2076  * @param set_state state of the set to remove from
2077  * @param ee set element to remove
2078  */
2079 static void
2080 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2081 {
2082   strata_estimator_remove (set_state->se,
2083                            get_ibf_key (&ee->element_hash));
2084 }
2085
2086
2087 /**
2088  * Destroy a set that supports the union operation.
2089  *
2090  * @param set_state the set to destroy
2091  */
2092 static void
2093 union_set_destroy (struct SetState *set_state)
2094 {
2095   if (NULL != set_state->se)
2096   {
2097     strata_estimator_destroy (set_state->se);
2098     set_state->se = NULL;
2099   }
2100   GNUNET_free (set_state);
2101 }
2102
2103
2104 /**
2105  * Dispatch messages for a union operation.
2106  *
2107  * @param op the state of the union evaluate operation
2108  * @param mh the received message
2109  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2110  *         #GNUNET_OK otherwise
2111  */
2112 int
2113 union_handle_p2p_message (struct Operation *op,
2114                           const struct GNUNET_MessageHeader *mh)
2115 {
2116   //LOG (GNUNET_ERROR_TYPE_DEBUG,
2117   //            "received p2p message (t: %u, s: %u)\n",
2118   //            ntohs (mh->type),
2119   //            ntohs (mh->size));
2120   switch (ntohs (mh->type))
2121   {
2122     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2123       return handle_p2p_ibf (op, mh);
2124     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2125       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2126     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2127       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2128     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2129       handle_p2p_elements (op, mh);
2130       break;
2131     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2132       handle_p2p_full_element (op, mh);
2133       break;
2134     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2135       handle_p2p_inquiry (op, mh);
2136       break;
2137     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2138       handle_p2p_done (op, mh);
2139       break;
2140     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2141       handle_p2p_offer (op, mh);
2142       break;
2143     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2144       handle_p2p_demand (op, mh);
2145       break;
2146     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2147       handle_p2p_full_done (op, mh);
2148       break;
2149     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2150       handle_p2p_request_full (op, mh);
2151       break;
2152     default:
2153       /* Something wrong with cadet's message handlers? */
2154       GNUNET_assert (0);
2155   }
2156   return GNUNET_OK;
2157 }
2158
2159
2160 /**
2161  * Handler for peer-disconnects, notifies the client
2162  * about the aborted operation in case the op was not concluded.
2163  *
2164  * @param op the destroyed operation
2165  */
2166 static void
2167 union_peer_disconnect (struct Operation *op)
2168 {
2169   if (PHASE_DONE != op->state->phase)
2170   {
2171     struct GNUNET_MQ_Envelope *ev;
2172     struct GNUNET_SET_ResultMessage *msg;
2173
2174     ev = GNUNET_MQ_msg (msg,
2175                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2176     msg->request_id = htonl (op->spec->client_request_id);
2177     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2178     msg->element_type = htons (0);
2179     GNUNET_MQ_send (op->spec->set->client_mq,
2180                     ev);
2181     LOG (GNUNET_ERROR_TYPE_WARNING,
2182          "other peer disconnected prematurely, phase %u\n",
2183          op->state->phase);
2184     _GSS_operation_destroy (op,
2185                             GNUNET_YES);
2186     return;
2187   }
2188   // else: the session has already been concluded
2189   LOG (GNUNET_ERROR_TYPE_DEBUG,
2190        "other peer disconnected (finished)\n");
2191   if (GNUNET_NO == op->state->client_done_sent)
2192     send_done_and_destroy (op);
2193 }
2194
2195
2196 /**
2197  * Copy union-specific set state.
2198  *
2199  * @param set source set for copying the union state
2200  * @return a copy of the union-specific set state
2201  */
2202 static struct SetState *
2203 union_copy_state (struct Set *set)
2204 {
2205   struct SetState *new_state;
2206
2207   new_state = GNUNET_new (struct SetState);
2208   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2209   new_state->se = strata_estimator_dup (set->state->se);
2210
2211   return new_state;
2212 }
2213
2214
2215 /**
2216  * Get the table with implementing functions for
2217  * set union.
2218  *
2219  * @return the operation specific VTable
2220  */
2221 const struct SetVT *
2222 _GSS_union_vt ()
2223 {
2224   static const struct SetVT union_vt = {
2225     .create = &union_set_create,
2226     .msg_handler = &union_handle_p2p_message,
2227     .add = &union_add,
2228     .remove = &union_remove,
2229     .destroy_set = &union_set_destroy,
2230     .evaluate = &union_evaluate,
2231     .accept = &union_accept,
2232     .peer_disconnect = &union_peer_disconnect,
2233     .cancel = &union_op_cancel,
2234     .copy_state = &union_copy_state,
2235   };
2236
2237   return &union_vt;
2238 }