fix state when requesting full IBF
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2013-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file set/gnunet-service-set_union.c
22
23  * @brief two-peer set operations
24  * @author Florian Dold
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
33 #include <gcrypt.h>
34
35
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
37
38
39 /**
40  * Number of IBFs in a strata estimator.
41  */
42 #define SE_STRATA_COUNT 32
43
44 /**
45  * Size of the IBFs in the strata estimator.
46  */
47 #define SE_IBF_SIZE 80
48
49 /**
50  * The hash num parameter for the difference digests and strata estimators.
51  */
52 #define SE_IBF_HASH_NUM 4
53
54 /**
55  * Number of buckets that can be transmitted in one message.
56  */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
58
59 /**
60  * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61  * Choose this value so that computing the IBF is still cheaper
62  * than transmitting all values.
63  */
64 #define MAX_IBF_ORDER (20)
65
66 /**
67  * Number of buckets used in the ibf per estimated
68  * difference.
69  */
70 #define IBF_ALPHA 4
71
72
73 /**
74  * Current phase we are in for a union operation.
75  */
76 enum UnionOperationPhase
77 {
78   /**
79    * We sent the request message, and expect a strata estimator.
80    */
81   PHASE_EXPECT_SE,
82
83   /**
84    * We sent the strata estimator, and expect an IBF. This phase is entered once
85    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86    *
87    * XXX: could use better wording.
88    * XXX: repurposed to also expect a "request full set" message, should be renamed
89    *
90    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91    */
92   PHASE_EXPECT_IBF,
93
94   /**
95    * Continuation for multi part IBFs.
96    */
97   PHASE_EXPECT_IBF_CONT,
98
99   /**
100    * We are decoding an IBF.
101    */
102   PHASE_INVENTORY_ACTIVE,
103
104   /**
105    * The other peer is decoding the IBF we just sent.
106    */
107   PHASE_INVENTORY_PASSIVE,
108
109   /**
110    * The protocol is almost finished, but we still have to flush our message
111    * queue and/or expect some elements.
112    */
113   PHASE_FINISH_CLOSING,
114
115   /**
116    * In the penultimate phase,
117    * we wait until all our demands
118    * are satisfied.  Then we send a done
119    * message, and wait for another done message.
120    */
121   PHASE_FINISH_WAITING,
122
123   /**
124    * In the ultimate phase, we wait until
125    * our demands are satisfied and then
126    * quit (sending another DONE message).
127    */
128   PHASE_DONE,
129
130   /**
131    * After sending the full set, wait for responses with the elements
132    * that the local peer is missing.
133    */
134   PHASE_FULL_SENDING,
135 };
136
137
138 /**
139  * State of an evaluate operation with another peer.
140  */
141 struct OperationState
142 {
143   /**
144    * Copy of the set's strata estimator at the time of
145    * creation of this operation.
146    */
147   struct StrataEstimator *se;
148
149   /**
150    * The IBF we currently receive.
151    */
152   struct InvertibleBloomFilter *remote_ibf;
153
154   /**
155    * The IBF with the local set's element.
156    */
157   struct InvertibleBloomFilter *local_ibf;
158
159   /**
160    * Maps unsalted IBF-Keys to elements.
161    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162    * Colliding IBF-Keys are linked.
163    */
164   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
165
166   /**
167    * Current state of the operation.
168    */
169   enum UnionOperationPhase phase;
170
171   /**
172    * Did we send the client that we are done?
173    */
174   int client_done_sent;
175
176   /**
177    * Number of ibf buckets already received into the @a remote_ibf.
178    */
179   unsigned int ibf_buckets_received;
180
181   /**
182    * Hashes for elements that we have demanded from the other peer.
183    */
184   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
185
186   /**
187    * Salt that we're using for sending IBFs
188    */
189   uint32_t salt_send;
190
191   /**
192    * Salt for the IBF we've received and that we're currently decoding.
193    */
194   uint32_t salt_receive;
195
196   /**
197    * Number of elements we received from the other peer
198    * that were not in the local set yet.
199    */
200   uint32_t received_fresh;
201
202   /**
203    * Total number of elements received from the other peer.
204    */
205   uint32_t received_total;
206
207   /**
208    * Initial size of our set, just before
209    * the operation started.
210    */
211   uint64_t initial_size;
212 };
213
214
215 /**
216  * The key entry is used to associate an ibf key with an element.
217  */
218 struct KeyEntry
219 {
220   /**
221    * IBF key for the entry, derived from the current salt.
222    */
223   struct IBF_Key ibf_key;
224
225   /**
226    * The actual element associated with the key.
227    *
228    * Only owned by the union operation if element->operation
229    * is #GNUNET_YES.
230    */
231   struct ElementEntry *element;
232
233   /**
234    * Did we receive this element?
235    * Even if element->is_foreign is false, we might
236    * have received the element, so this indicates that
237    * the other peer has it.
238    */
239   int received;
240 };
241
242
243 /**
244  * Used as a closure for sending elements
245  * with a specific IBF key.
246  */
247 struct SendElementClosure
248 {
249   /**
250    * The IBF key whose matching elements should be
251    * sent.
252    */
253   struct IBF_Key ibf_key;
254
255   /**
256    * Operation for which the elements
257    * should be sent.
258    */
259   struct Operation *op;
260 };
261
262
263 /**
264  * Extra state required for efficient set union.
265  */
266 struct SetState
267 {
268   /**
269    * The strata estimator is only generated once for
270    * each set.
271    * The IBF keys are derived from the element hashes with
272    * salt=0.
273    */
274   struct StrataEstimator *se;
275 };
276
277
278 /**
279  * Iterator over hash map entries, called to
280  * destroy the linked list of colliding ibf key entries.
281  *
282  * @param cls closure
283  * @param key current key code
284  * @param value value in the hash map
285  * @return #GNUNET_YES if we should continue to iterate,
286  *         #GNUNET_NO if not.
287  */
288 static int
289 destroy_key_to_element_iter (void *cls,
290                              uint32_t key,
291                              void *value)
292 {
293   struct KeyEntry *k = value;
294
295   GNUNET_assert (NULL != k);
296   if (GNUNET_YES == k->element->remote)
297   {
298     GNUNET_free (k->element);
299     k->element = NULL;
300   }
301   GNUNET_free (k);
302   return GNUNET_YES;
303 }
304
305
306 /**
307  * Destroy the union operation.  Only things specific to the union
308  * operation are destroyed.
309  *
310  * @param op union operation to destroy
311  */
312 static void
313 union_op_cancel (struct Operation *op)
314 {
315   LOG (GNUNET_ERROR_TYPE_DEBUG,
316        "destroying union op\n");
317   /* check if the op was canceled twice */
318   GNUNET_assert (NULL != op->state);
319   if (NULL != op->state->remote_ibf)
320   {
321     ibf_destroy (op->state->remote_ibf);
322     op->state->remote_ibf = NULL;
323   }
324   if (NULL != op->state->demanded_hashes)
325   {
326     GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327     op->state->demanded_hashes = NULL;
328   }
329   if (NULL != op->state->local_ibf)
330   {
331     ibf_destroy (op->state->local_ibf);
332     op->state->local_ibf = NULL;
333   }
334   if (NULL != op->state->se)
335   {
336     strata_estimator_destroy (op->state->se);
337     op->state->se = NULL;
338   }
339   if (NULL != op->state->key_to_element)
340   {
341     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342                                              &destroy_key_to_element_iter,
343                                              NULL);
344     GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345     op->state->key_to_element = NULL;
346   }
347   GNUNET_free (op->state);
348   op->state = NULL;
349   LOG (GNUNET_ERROR_TYPE_DEBUG,
350        "destroying union op done\n");
351 }
352
353
354 /**
355  * Inform the client that the union operation has failed,
356  * and proceed to destroy the evaluate operation.
357  *
358  * @param op the union operation to fail
359  */
360 static void
361 fail_union_operation (struct Operation *op)
362 {
363   struct GNUNET_MQ_Envelope *ev;
364   struct GNUNET_SET_ResultMessage *msg;
365
366   LOG (GNUNET_ERROR_TYPE_ERROR,
367        "union operation failed\n");
368   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370   msg->request_id = htonl (op->spec->client_request_id);
371   msg->element_type = htons (0);
372   GNUNET_MQ_send (op->spec->set->client_mq, ev);
373   _GSS_operation_destroy (op, GNUNET_YES);
374 }
375
376
377 /**
378  * Derive the IBF key from a hash code and
379  * a salt.
380  *
381  * @param src the hash code
382  * @return the derived IBF key
383  */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
386 {
387   struct IBF_Key key;
388   uint16_t salt = 0;
389
390   GNUNET_CRYPTO_kdf (&key, sizeof (key),
391                      src, sizeof *src,
392                      &salt, sizeof (salt),
393                      NULL, 0);
394   return key;
395 }
396
397
398 /**
399  * Context for #op_get_element_iterator
400  */
401 struct GetElementContext
402 {
403   struct GNUNET_HashCode hash;
404   struct KeyEntry *k;
405 };
406
407
408 /**
409  * Iterator over the mapping from IBF keys to element entries.  Checks if we
410  * have an element with a given GNUNET_HashCode.
411  *
412  * @param cls closure
413  * @param key current key code
414  * @param value value in the hash map
415  * @return #GNUNET_YES if we should search further,
416  *         #GNUNET_NO if we've found the element.
417  */
418 static int
419 op_get_element_iterator (void *cls,
420                          uint32_t key,
421                          void *value)
422 {
423   struct GetElementContext *ctx = cls;
424   struct KeyEntry *k = value;
425
426   GNUNET_assert (NULL != k);
427   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
428                                    &ctx->hash))
429   {
430     ctx->k = k;
431     return GNUNET_NO;
432   }
433   return GNUNET_YES;
434 }
435
436
437 /**
438  * Determine whether the given element is already in the operation's element
439  * set.
440  *
441  * @param op operation that should be tested for 'element_hash'
442  * @param element_hash hash of the element to look for
443  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
444  */
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447                 const struct GNUNET_HashCode *element_hash)
448 {
449   int ret;
450   struct IBF_Key ibf_key;
451   struct GetElementContext ctx = {{{ 0 }} , 0};
452
453   ctx.hash = *element_hash;
454
455   ibf_key = get_ibf_key (element_hash);
456   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457                                                       (uint32_t) ibf_key.key_val,
458                                                       op_get_element_iterator,
459                                                       &ctx);
460
461   /* was the iteration aborted because we found the element? */
462   if (GNUNET_SYSERR == ret)
463   {
464     GNUNET_assert (NULL != ctx.k);
465     return ctx.k;
466   }
467   return NULL;
468 }
469
470
471 /**
472  * Insert an element into the union operation's
473  * key-to-element mapping. Takes ownership of 'ee'.
474  * Note that this does not insert the element in the set,
475  * only in the operation's key-element mapping.
476  * This is done to speed up re-tried operations, if some elements
477  * were transmitted, and then the IBF fails to decode.
478  *
479  * XXX: clarify ownership, doesn't sound right.
480  *
481  * @param op the union operation
482  * @param ee the element entry
483  * @parem received was this element received from the remote peer?
484  */
485 static void
486 op_register_element (struct Operation *op,
487                      struct ElementEntry *ee,
488                      int received)
489 {
490   struct IBF_Key ibf_key;
491   struct KeyEntry *k;
492
493   ibf_key = get_ibf_key (&ee->element_hash);
494   k = GNUNET_new (struct KeyEntry);
495   k->element = ee;
496   k->ibf_key = ibf_key;
497   k->received = received;
498   GNUNET_assert (GNUNET_OK ==
499                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500                                                       (uint32_t) ibf_key.key_val,
501                                                       k,
502                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
503 }
504
505
506 static void
507 salt_key (const struct IBF_Key *k_in,
508           uint32_t salt,
509           struct IBF_Key *k_out)
510 {
511   int s = salt % 64;
512   uint64_t x = k_in->key_val;
513   /* rotate ibf key */
514   x = (x >> s) | (x << (64 - s));
515   k_out->key_val = x;
516 }
517
518
519 static void
520 unsalt_key (const struct IBF_Key *k_in,
521             uint32_t salt,
522             struct IBF_Key *k_out)
523 {
524   int s = salt % 64;
525   uint64_t x = k_in->key_val;
526   x = (x << s) | (x >> (64 - s));
527   k_out->key_val = x;
528 }
529
530
531 /**
532  * Insert a key into an ibf.
533  *
534  * @param cls the ibf
535  * @param key unused
536  * @param value the key entry to get the key from
537  */
538 static int
539 prepare_ibf_iterator (void *cls,
540                       uint32_t key,
541                       void *value)
542 {
543   struct Operation *op = cls;
544   struct KeyEntry *ke = value;
545   struct IBF_Key salted_key;
546
547   LOG (GNUNET_ERROR_TYPE_DEBUG,
548        "[OP %x] inserting %lx (hash %s) into ibf\n",
549        (void *) op,
550        (unsigned long) ke->ibf_key.key_val,
551        GNUNET_h2s (&ke->element->element_hash));
552   salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553   ibf_insert (op->state->local_ibf, salted_key);
554   return GNUNET_YES;
555 }
556
557
558 /**
559  * Iterator for initializing the
560  * key-to-element mapping of a union operation
561  *
562  * @param cls the union operation `struct Operation *`
563  * @param key unused
564  * @param value the `struct ElementEntry *` to insert
565  *        into the key-to-element mapping
566  * @return #GNUNET_YES (to continue iterating)
567  */
568 static int
569 init_key_to_element_iterator (void *cls,
570                               const struct GNUNET_HashCode *key,
571                               void *value)
572 {
573   struct Operation *op = cls;
574   struct ElementEntry *ee = value;
575
576   /* make sure that the element belongs to the set at the time
577    * of creating the operation */
578   if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
579     return GNUNET_YES;
580
581   GNUNET_assert (GNUNET_NO == ee->remote);
582
583   op_register_element (op, ee, GNUNET_NO);
584   return GNUNET_YES;
585 }
586
587
588 /**
589  * Initialize the IBF key to element mapping local to this set
590  * operation.
591  *
592  * @param op the set union operation
593  */
594 static void
595 initialize_key_to_element (struct Operation *op)
596 {
597   unsigned int len;
598
599   GNUNET_assert (NULL == op->state->key_to_element);
600   len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601   op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602   GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603 }
604
605
606 /**
607  * Create an ibf with the operation's elements
608  * of the specified size
609  *
610  * @param op the union operation
611  * @param size size of the ibf to create
612  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
613  */
614 static int
615 prepare_ibf (struct Operation *op,
616              uint32_t size)
617 {
618   GNUNET_assert (NULL != op->state->key_to_element);
619
620   if (NULL != op->state->local_ibf)
621     ibf_destroy (op->state->local_ibf);
622   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623   if (NULL == op->state->local_ibf)
624   {
625     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626                 "Failed to allocate local IBF\n");
627     return GNUNET_SYSERR;
628   }
629   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630                                            &prepare_ibf_iterator,
631                                            op);
632   return GNUNET_OK;
633 }
634
635
636 /**
637  * Send an ibf of appropriate size.
638  *
639  * Fragments the IBF into multiple messages if necessary.
640  *
641  * @param op the union operation
642  * @param ibf_order order of the ibf to send, size=2^order
643  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
644  */
645 static int
646 send_ibf (struct Operation *op,
647           uint16_t ibf_order)
648 {
649   unsigned int buckets_sent = 0;
650   struct InvertibleBloomFilter *ibf;
651
652   if (GNUNET_OK !=
653       prepare_ibf (op, 1<<ibf_order))
654   {
655     /* allocation failed */
656     return GNUNET_SYSERR;
657   }
658
659   LOG (GNUNET_ERROR_TYPE_DEBUG,
660        "sending ibf of size %u\n",
661        1<<ibf_order);
662
663   {
664     char name[64] = { 0 };
665     snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666     GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
667   }
668
669   ibf = op->state->local_ibf;
670
671   while (buckets_sent < (1 << ibf_order))
672   {
673     unsigned int buckets_in_message;
674     struct GNUNET_MQ_Envelope *ev;
675     struct IBFMessage *msg;
676
677     buckets_in_message = (1 << ibf_order) - buckets_sent;
678     /* limit to maximum */
679     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
681
682     ev = GNUNET_MQ_msg_extra (msg,
683                               buckets_in_message * IBF_BUCKET_SIZE,
684                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
685     msg->reserved1 = 0;
686     msg->reserved2 = 0;
687     msg->order = ibf_order;
688     msg->offset = htonl (buckets_sent);
689     msg->salt = htonl (op->state->salt_send);
690     ibf_write_slice (ibf, buckets_sent,
691                      buckets_in_message, &msg[1]);
692     buckets_sent += buckets_in_message;
693     LOG (GNUNET_ERROR_TYPE_DEBUG,
694          "ibf chunk size %u, %u/%u sent\n",
695          buckets_in_message,
696          buckets_sent,
697          1<<ibf_order);
698     GNUNET_MQ_send (op->mq, ev);
699   }
700
701   /* The other peer must decode the IBF, so
702    * we're passive. */
703   op->state->phase = PHASE_INVENTORY_PASSIVE;
704   return GNUNET_OK;
705 }
706
707
708 /**
709  * Send a strata estimator to the remote peer.
710  *
711  * @param op the union operation with the remote peer
712  */
713 static void
714 send_strata_estimator (struct Operation *op)
715 {
716   const struct StrataEstimator *se = op->state->se;
717   struct GNUNET_MQ_Envelope *ev;
718   struct StrataEstimatorMessage *strata_msg;
719   char *buf;
720   size_t len;
721   uint16_t type;
722
723   buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724   len = strata_estimator_write (op->state->se,
725                                 buf);
726   if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
728   else
729     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730   ev = GNUNET_MQ_msg_extra (strata_msg,
731                             len,
732                             type);
733   GNUNET_memcpy (&strata_msg[1],
734           buf,
735           len);
736   GNUNET_free (buf);
737   strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738   GNUNET_MQ_send (op->mq,
739                   ev);
740   op->state->phase = PHASE_EXPECT_IBF;
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "sent SE, expecting IBF\n");
743 }
744
745
746 /**
747  * Compute the necessary order of an ibf
748  * from the size of the symmetric set difference.
749  *
750  * @param diff the difference
751  * @return the required size of the ibf
752  */
753 static unsigned int
754 get_order_from_difference (unsigned int diff)
755 {
756   unsigned int ibf_order;
757
758   ibf_order = 2;
759   while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760           ((1<<ibf_order) < SE_IBF_HASH_NUM) )
761     ibf_order++;
762   if (ibf_order > MAX_IBF_ORDER)
763     ibf_order = MAX_IBF_ORDER;
764   return ibf_order;
765 }
766
767
768 /**
769  * Send a set element.
770  *
771  * @param cls the union operation `struct Operation *`
772  * @param key unused
773  * @param value the `struct ElementEntry *` to insert
774  *        into the key-to-element mapping
775  * @return #GNUNET_YES (to continue iterating)
776  */
777 static int
778 send_element_iterator (void *cls,
779                        const struct GNUNET_HashCode *key,
780                        void *value)
781 {
782   struct Operation *op = cls;
783   struct GNUNET_SET_ElementMessage *emsg;
784   struct GNUNET_SET_Element *el = value;
785   struct GNUNET_MQ_Envelope *ev;
786
787   ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
788   emsg->element_type = htonl (el->element_type);
789   GNUNET_memcpy (&emsg[1], el->data, el->size);
790   GNUNET_MQ_send (op->mq, ev);
791   return GNUNET_YES;
792 }
793
794
795 static void
796 send_full_set (struct Operation *op)
797 {
798   struct GNUNET_MQ_Envelope *ev;
799
800   op->state->phase = PHASE_FULL_SENDING;
801
802   (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
803                                                 &send_element_iterator, op);
804   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
805   GNUNET_MQ_send (op->mq, ev);
806 }
807
808
809 /**
810  * Handle a strata estimator from a remote peer
811  *
812  * @param cls the union operation
813  * @param mh the message
814  * @param is_compressed #GNUNET_YES if the estimator is compressed
815  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
816  *         #GNUNET_OK otherwise
817  */
818 static int
819 handle_p2p_strata_estimator (void *cls,
820                              const struct GNUNET_MessageHeader *mh,
821                              int is_compressed)
822 {
823   struct Operation *op = cls;
824   struct StrataEstimator *remote_se;
825   struct StrataEstimatorMessage *msg = (void *) mh;
826   unsigned int diff;
827   uint64_t other_size;
828   size_t len;
829
830   GNUNET_STATISTICS_update (_GSS_statistics,
831                             "# bytes of SE received",
832                             ntohs (mh->size),
833                             GNUNET_NO);
834
835   if (op->state->phase != PHASE_EXPECT_SE)
836   {
837     GNUNET_break (0);
838     fail_union_operation (op);
839     return GNUNET_SYSERR;
840   }
841   len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
842   if ( (GNUNET_NO == is_compressed) &&
843        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
844   {
845     fail_union_operation (op);
846     GNUNET_break (0);
847     return GNUNET_SYSERR;
848   }
849   other_size = GNUNET_ntohll (msg->set_size);
850   remote_se = strata_estimator_create (SE_STRATA_COUNT,
851                                        SE_IBF_SIZE,
852                                        SE_IBF_HASH_NUM);
853   if (NULL == remote_se)
854   {
855     /* insufficient resources, fail */
856     fail_union_operation (op);
857     return GNUNET_SYSERR;
858   }
859   if (GNUNET_OK !=
860       strata_estimator_read (&msg[1],
861                              len,
862                              is_compressed,
863                              remote_se))
864   {
865     /* decompression failed */
866     fail_union_operation (op);
867     strata_estimator_destroy (remote_se);
868     return GNUNET_SYSERR;
869   }
870   GNUNET_assert (NULL != op->state->se);
871   diff = strata_estimator_difference (remote_se,
872                                       op->state->se);
873   strata_estimator_destroy (remote_se);
874   strata_estimator_destroy (op->state->se);
875   op->state->se = NULL;
876   LOG (GNUNET_ERROR_TYPE_DEBUG,
877        "got se diff=%d, using ibf size %d\n",
878        diff,
879        1<<get_order_from_difference (diff));
880
881   if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
882   {
883     GNUNET_break (0);
884     fail_union_operation (op);
885     return GNUNET_SYSERR;
886   }
887
888
889   if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
890   {
891     LOG (GNUNET_ERROR_TYPE_INFO,
892          "Sending full set (diff=%d, own set=%u)\n",
893          diff,
894          op->state->initial_size);
895     if (op->state->initial_size <= other_size)
896     {
897       send_full_set (op);
898     }
899     else
900     {
901       struct GNUNET_MQ_Envelope *ev;
902       op->state->phase = PHASE_EXPECT_IBF;
903       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
904       GNUNET_MQ_send (op->mq, ev);
905     }
906   }
907   else
908   {
909     if (GNUNET_OK !=
910         send_ibf (op,
911                   get_order_from_difference (diff)))
912     {
913       /* Internal error, best we can do is shut the connection */
914       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
915                   "Failed to send IBF, closing connection\n");
916       fail_union_operation (op);
917       return GNUNET_SYSERR;
918     }
919   }
920
921   return GNUNET_OK;
922 }
923
924
925 /**
926  * Iterator to send elements to a remote peer
927  *
928  * @param cls closure with the element key and the union operation
929  * @param key ignored
930  * @param value the key entry
931  */
932 static int
933 send_offers_iterator (void *cls,
934                       uint32_t key,
935                       void *value)
936 {
937   struct SendElementClosure *sec = cls;
938   struct Operation *op = sec->op;
939   struct KeyEntry *ke = value;
940   struct GNUNET_MQ_Envelope *ev;
941   struct GNUNET_MessageHeader *mh;
942
943   /* Detect 32-bit key collision for the 64-bit IBF keys. */
944   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
945     return GNUNET_YES;
946
947   ev = GNUNET_MQ_msg_header_extra (mh,
948                                    sizeof (struct GNUNET_HashCode),
949                                    GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
950
951   GNUNET_assert (NULL != ev);
952   *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
953   LOG (GNUNET_ERROR_TYPE_DEBUG,
954        "[OP %x] sending element offer (%s) to peer\n",
955        (void *) op,
956        GNUNET_h2s (&ke->element->element_hash));
957   GNUNET_MQ_send (op->mq, ev);
958   return GNUNET_YES;
959 }
960
961
962 /**
963  * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
964  *
965  * @param op union operation
966  * @param ibf_key IBF key of interest
967  */
968 static void
969 send_offers_for_key (struct Operation *op,
970                      struct IBF_Key ibf_key)
971 {
972   struct SendElementClosure send_cls;
973
974   send_cls.ibf_key = ibf_key;
975   send_cls.op = op;
976   (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
977                                                        (uint32_t) ibf_key.key_val,
978                                                        &send_offers_iterator,
979                                                        &send_cls);
980 }
981
982
983 /**
984  * Decode which elements are missing on each side, and
985  * send the appropriate offers and inquiries.
986  *
987  * @param op union operation
988  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
989  */
990 static int
991 decode_and_send (struct Operation *op)
992 {
993   struct IBF_Key key;
994   struct IBF_Key last_key;
995   int side;
996   unsigned int num_decoded;
997   struct InvertibleBloomFilter *diff_ibf;
998
999   GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1000
1001   if (GNUNET_OK !=
1002       prepare_ibf (op, op->state->remote_ibf->size))
1003   {
1004     GNUNET_break (0);
1005     /* allocation failed */
1006     return GNUNET_SYSERR;
1007   }
1008   diff_ibf = ibf_dup (op->state->local_ibf);
1009   ibf_subtract (diff_ibf, op->state->remote_ibf);
1010
1011   ibf_destroy (op->state->remote_ibf);
1012   op->state->remote_ibf = NULL;
1013
1014   LOG (GNUNET_ERROR_TYPE_DEBUG,
1015        "decoding IBF (size=%u)\n",
1016        diff_ibf->size);
1017
1018   num_decoded = 0;
1019   key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1020
1021   while (1)
1022   {
1023     int res;
1024     int cycle_detected = GNUNET_NO;
1025
1026     last_key = key;
1027
1028     res = ibf_decode (diff_ibf, &side, &key);
1029     if (res == GNUNET_OK)
1030     {
1031       LOG (GNUNET_ERROR_TYPE_DEBUG,
1032            "decoded ibf key %lx\n",
1033            (unsigned long) key.key_val);
1034       num_decoded += 1;
1035       if ( (num_decoded > diff_ibf->size) ||
1036            ( (num_decoded > 1) &&
1037              (last_key.key_val == key.key_val) ) )
1038       {
1039         LOG (GNUNET_ERROR_TYPE_DEBUG,
1040              "detected cyclic ibf (decoded %u/%u)\n",
1041              num_decoded,
1042              diff_ibf->size);
1043         cycle_detected = GNUNET_YES;
1044       }
1045     }
1046     if ( (GNUNET_SYSERR == res) ||
1047          (GNUNET_YES == cycle_detected) )
1048     {
1049       int next_order;
1050       next_order = 0;
1051       while (1<<next_order < diff_ibf->size)
1052         next_order++;
1053       next_order++;
1054       if (next_order <= MAX_IBF_ORDER)
1055       {
1056         LOG (GNUNET_ERROR_TYPE_DEBUG,
1057              "decoding failed, sending larger ibf (size %u)\n",
1058              1<<next_order);
1059         GNUNET_STATISTICS_update (_GSS_statistics,
1060                                   "# of IBF retries",
1061                                   1,
1062                                   GNUNET_NO);
1063         op->state->salt_send++;
1064         if (GNUNET_OK !=
1065             send_ibf (op, next_order))
1066         {
1067           /* Internal error, best we can do is shut the connection */
1068           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1069                       "Failed to send IBF, closing connection\n");
1070           fail_union_operation (op);
1071           ibf_destroy (diff_ibf);
1072           return GNUNET_SYSERR;
1073         }
1074       }
1075       else
1076       {
1077         GNUNET_STATISTICS_update (_GSS_statistics,
1078                                   "# of failed union operations (too large)",
1079                                   1,
1080                                   GNUNET_NO);
1081         // XXX: Send the whole set, element-by-element
1082         LOG (GNUNET_ERROR_TYPE_ERROR,
1083              "set union failed: reached ibf limit\n");
1084         fail_union_operation (op);
1085         ibf_destroy (diff_ibf);
1086         return GNUNET_SYSERR;
1087       }
1088       break;
1089     }
1090     if (GNUNET_NO == res)
1091     {
1092       struct GNUNET_MQ_Envelope *ev;
1093
1094       LOG (GNUNET_ERROR_TYPE_DEBUG,
1095            "transmitted all values, sending DONE\n");
1096       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1097       GNUNET_MQ_send (op->mq, ev);
1098       /* We now wait until we get a DONE message back
1099        * and then wait for our MQ to be flushed and all our
1100        * demands be delivered. */
1101       break;
1102     }
1103     if (1 == side)
1104     {
1105       struct IBF_Key unsalted_key;
1106       unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1107       send_offers_for_key (op, unsalted_key);
1108     }
1109     else if (-1 == side)
1110     {
1111       struct GNUNET_MQ_Envelope *ev;
1112       struct InquiryMessage *msg;
1113
1114       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1115        * the effort additional complexity. */
1116       ev = GNUNET_MQ_msg_extra (msg,
1117                                 sizeof (struct IBF_Key),
1118                                 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1119       msg->salt = htonl (op->state->salt_receive);
1120       GNUNET_memcpy (&msg[1],
1121               &key,
1122               sizeof (struct IBF_Key));
1123       LOG (GNUNET_ERROR_TYPE_DEBUG,
1124            "sending element inquiry for IBF key %lx\n",
1125            (unsigned long) key.key_val);
1126       GNUNET_MQ_send (op->mq, ev);
1127     }
1128     else
1129     {
1130       GNUNET_assert (0);
1131     }
1132   }
1133   ibf_destroy (diff_ibf);
1134   return GNUNET_OK;
1135 }
1136
1137
1138 /**
1139  * Handle an IBF message from a remote peer.
1140  *
1141  * Reassemble the IBF from multiple pieces, and
1142  * process the whole IBF once possible.
1143  *
1144  * @param cls the union operation
1145  * @param mh the header of the message
1146  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1147  *         #GNUNET_OK otherwise
1148  */
1149 static int
1150 handle_p2p_ibf (void *cls,
1151                 const struct GNUNET_MessageHeader *mh)
1152 {
1153   struct Operation *op = cls;
1154   const struct IBFMessage *msg;
1155   unsigned int buckets_in_message;
1156
1157   if (ntohs (mh->size) < sizeof (struct IBFMessage))
1158   {
1159     GNUNET_break_op (0);
1160     fail_union_operation (op);
1161     return GNUNET_SYSERR;
1162   }
1163   msg = (const struct IBFMessage *) mh;
1164   if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1165        (op->state->phase == PHASE_EXPECT_IBF) )
1166   {
1167     op->state->phase = PHASE_EXPECT_IBF_CONT;
1168     GNUNET_assert (NULL == op->state->remote_ibf);
1169     LOG (GNUNET_ERROR_TYPE_DEBUG,
1170          "Creating new ibf of size %u\n",
1171          1 << msg->order);
1172     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1173     op->state->salt_receive = ntohl (msg->salt);
1174     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1175     if (NULL == op->state->remote_ibf)
1176     {
1177       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1178                   "Failed to parse remote IBF, closing connection\n");
1179       fail_union_operation (op);
1180       return GNUNET_SYSERR;
1181     }
1182     op->state->ibf_buckets_received = 0;
1183     if (0 != ntohl (msg->offset))
1184     {
1185       GNUNET_break_op (0);
1186       fail_union_operation (op);
1187       return GNUNET_SYSERR;
1188     }
1189   }
1190   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1191   {
1192     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1193     {
1194       GNUNET_break_op (0);
1195       fail_union_operation (op);
1196       return GNUNET_SYSERR;
1197     }
1198     if (1<<msg->order != op->state->remote_ibf->size)
1199     {
1200       GNUNET_break_op (0);
1201       fail_union_operation (op);
1202       return GNUNET_SYSERR;
1203     }
1204     if (ntohl (msg->salt) != op->state->salt_receive)
1205     {
1206       GNUNET_break_op (0);
1207       fail_union_operation (op);
1208       return GNUNET_SYSERR;
1209     }
1210   }
1211   else
1212   {
1213     GNUNET_assert (0);
1214   }
1215
1216   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1217
1218   if (0 == buckets_in_message)
1219   {
1220     GNUNET_break_op (0);
1221     fail_union_operation (op);
1222     return GNUNET_SYSERR;
1223   }
1224
1225   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1226   {
1227     GNUNET_break_op (0);
1228     fail_union_operation (op);
1229     return GNUNET_SYSERR;
1230   }
1231
1232   GNUNET_assert (NULL != op->state->remote_ibf);
1233
1234   ibf_read_slice (&msg[1],
1235                   op->state->ibf_buckets_received,
1236                   buckets_in_message,
1237                   op->state->remote_ibf);
1238   op->state->ibf_buckets_received += buckets_in_message;
1239
1240   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1241   {
1242     LOG (GNUNET_ERROR_TYPE_DEBUG,
1243          "received full ibf\n");
1244     op->state->phase = PHASE_INVENTORY_ACTIVE;
1245     if (GNUNET_OK !=
1246         decode_and_send (op))
1247     {
1248       /* Internal error, best we can do is shut down */
1249       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1250                   "Failed to decode IBF, closing connection\n");
1251       return GNUNET_SYSERR;
1252     }
1253   }
1254   return GNUNET_OK;
1255 }
1256
1257
1258 /**
1259  * Send a result message to the client indicating
1260  * that there is a new element.
1261  *
1262  * @param op union operation
1263  * @param element element to send
1264  * @param status status to send with the new element
1265  */
1266 static void
1267 send_client_element (struct Operation *op,
1268                      struct GNUNET_SET_Element *element,
1269                      int status)
1270 {
1271   struct GNUNET_MQ_Envelope *ev;
1272   struct GNUNET_SET_ResultMessage *rm;
1273
1274   LOG (GNUNET_ERROR_TYPE_DEBUG,
1275        "sending element (size %u) to client\n",
1276        element->size);
1277   GNUNET_assert (0 != op->spec->client_request_id);
1278   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1279   if (NULL == ev)
1280   {
1281     GNUNET_MQ_discard (ev);
1282     GNUNET_break (0);
1283     return;
1284   }
1285   rm->result_status = htons (status);
1286   rm->request_id = htonl (op->spec->client_request_id);
1287   rm->element_type = element->element_type;
1288   GNUNET_memcpy (&rm[1], element->data, element->size);
1289   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1290 }
1291
1292
1293 /**
1294  * Signal to the client that the operation has finished and
1295  * destroy the operation.
1296  *
1297  * @param cls operation to destroy
1298  */
1299 static void
1300 send_done_and_destroy (void *cls)
1301 {
1302   struct Operation *op = cls;
1303   struct GNUNET_MQ_Envelope *ev;
1304   struct GNUNET_SET_ResultMessage *rm;
1305
1306   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1307   rm->request_id = htonl (op->spec->client_request_id);
1308   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1309   rm->element_type = htons (0);
1310   GNUNET_MQ_send (op->spec->set->client_mq, ev);
1311   /* Will also call the union-specific cancel function. */
1312   _GSS_operation_destroy (op, GNUNET_YES);
1313 }
1314
1315
1316 static void
1317 maybe_finish (struct Operation *op)
1318 {
1319   unsigned int num_demanded;
1320
1321   num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1322
1323   if (PHASE_FINISH_WAITING == op->state->phase)
1324   {
1325     LOG (GNUNET_ERROR_TYPE_DEBUG,
1326          "In PHASE_FINISH_WAITING, pending %u demands\n",
1327          num_demanded);
1328     if (0 == num_demanded)
1329     {
1330       struct GNUNET_MQ_Envelope *ev;
1331
1332       op->state->phase = PHASE_DONE;
1333       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1334       GNUNET_MQ_send (op->mq, ev);
1335
1336       /* We now wait until the other peer closes the channel
1337        * after it got all elements from us. */
1338     }
1339   }
1340   if (PHASE_FINISH_CLOSING == op->state->phase)
1341   {
1342     LOG (GNUNET_ERROR_TYPE_DEBUG,
1343          "In PHASE_FINISH_CLOSING, pending %u demands\n",
1344          num_demanded);
1345     if (0 == num_demanded)
1346     {
1347       op->state->phase = PHASE_DONE;
1348       send_done_and_destroy (op);
1349     }
1350   }
1351 }
1352
1353
1354 /**
1355  * Handle an element message from a remote peer.
1356  * Sent by the other peer either because we decoded an IBF and placed a demand,
1357  * or because the other peer switched to full set transmission.
1358  *
1359  * @param cls the union operation
1360  * @param mh the message
1361  */
1362 static void
1363 handle_p2p_elements (void *cls,
1364                      const struct GNUNET_MessageHeader *mh)
1365 {
1366   struct Operation *op = cls;
1367   struct ElementEntry *ee;
1368   const struct GNUNET_SET_ElementMessage *emsg;
1369   uint16_t element_size;
1370
1371   if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1372   {
1373     GNUNET_break_op (0);
1374     fail_union_operation (op);
1375     return;
1376   }
1377   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1378   {
1379     GNUNET_break_op (0);
1380     fail_union_operation (op);
1381     return;
1382   }
1383
1384   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1385
1386   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1387   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1388   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1389   ee->element.size = element_size;
1390   ee->element.data = &ee[1];
1391   ee->element.element_type = ntohs (emsg->element_type);
1392   ee->remote = GNUNET_YES;
1393   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1394
1395   if (GNUNET_NO ==
1396       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1397                                             &ee->element_hash,
1398                                             NULL))
1399   {
1400     /* We got something we didn't demand, since it's not in our map. */
1401     GNUNET_break_op (0);
1402     GNUNET_free (ee);
1403     fail_union_operation (op);
1404     return;
1405   }
1406
1407   LOG (GNUNET_ERROR_TYPE_DEBUG,
1408        "Got element (size %u, hash %s) from peer\n",
1409        (unsigned int) element_size,
1410        GNUNET_h2s (&ee->element_hash));
1411
1412   GNUNET_STATISTICS_update (_GSS_statistics,
1413                             "# received elements",
1414                             1,
1415                             GNUNET_NO);
1416   GNUNET_STATISTICS_update (_GSS_statistics,
1417                             "# exchanged elements",
1418                             1,
1419                             GNUNET_NO);
1420
1421   op->state->received_total += 1;
1422
1423   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1424
1425   if (NULL != ke)
1426   {
1427     /* Got repeated element.  Should not happen since
1428      * we track demands. */
1429     GNUNET_STATISTICS_update (_GSS_statistics,
1430                               "# repeated elements",
1431                               1,
1432                               GNUNET_NO);
1433     ke->received = GNUNET_YES;
1434     GNUNET_free (ee);
1435   }
1436   else
1437   {
1438     LOG (GNUNET_ERROR_TYPE_DEBUG,
1439          "Registering new element from remote peer\n");
1440     op->state->received_fresh += 1;
1441     op_register_element (op, ee, GNUNET_YES);
1442     /* only send results immediately if the client wants it */
1443     switch (op->spec->result_mode)
1444     {
1445       case GNUNET_SET_RESULT_ADDED:
1446         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1447         break;
1448       case GNUNET_SET_RESULT_SYMMETRIC:
1449         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1450         break;
1451       default:
1452         /* Result mode not supported, should have been caught earlier. */
1453         GNUNET_break (0);
1454         break;
1455     }
1456   }
1457
1458   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1459   {
1460     /* The other peer gave us lots of old elements, there's something wrong. */
1461     GNUNET_break_op (0);
1462     fail_union_operation (op);
1463     return;
1464   }
1465
1466   maybe_finish (op);
1467 }
1468
1469
1470 /**
1471  * Handle an element message from a remote peer.
1472  *
1473  * @param cls the union operation
1474  * @param mh the message
1475  */
1476 static void
1477 handle_p2p_full_element (void *cls,
1478                          const struct GNUNET_MessageHeader *mh)
1479 {
1480   struct Operation *op = cls;
1481   struct ElementEntry *ee;
1482   const struct GNUNET_SET_ElementMessage *emsg;
1483   uint16_t element_size;
1484
1485   if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1486   {
1487     GNUNET_break_op (0);
1488     fail_union_operation (op);
1489     return;
1490   }
1491
1492   emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1493
1494   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1495   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1496   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1497   ee->element.size = element_size;
1498   ee->element.data = &ee[1];
1499   ee->element.element_type = ntohs (emsg->element_type);
1500   ee->remote = GNUNET_YES;
1501   GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1502
1503   LOG (GNUNET_ERROR_TYPE_DEBUG,
1504        "Got element (full diff, size %u, hash %s) from peer\n",
1505        (unsigned int) element_size,
1506        GNUNET_h2s (&ee->element_hash));
1507
1508   GNUNET_STATISTICS_update (_GSS_statistics,
1509                             "# received elements",
1510                             1,
1511                             GNUNET_NO);
1512   GNUNET_STATISTICS_update (_GSS_statistics,
1513                             "# exchanged elements",
1514                             1,
1515                             GNUNET_NO);
1516
1517   op->state->received_total += 1;
1518
1519   struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1520
1521   if (NULL != ke)
1522   {
1523     /* Got repeated element.  Should not happen since
1524      * we track demands. */
1525     GNUNET_STATISTICS_update (_GSS_statistics,
1526                               "# repeated elements",
1527                               1,
1528                               GNUNET_NO);
1529     ke->received = GNUNET_YES;
1530     GNUNET_free (ee);
1531   }
1532   else
1533   {
1534     LOG (GNUNET_ERROR_TYPE_DEBUG,
1535          "Registering new element from remote peer\n");
1536     op->state->received_fresh += 1;
1537     op_register_element (op, ee, GNUNET_YES);
1538     /* only send results immediately if the client wants it */
1539     switch (op->spec->result_mode)
1540     {
1541       case GNUNET_SET_RESULT_ADDED:
1542         send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1543         break;
1544       case GNUNET_SET_RESULT_SYMMETRIC:
1545         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1546         break;
1547       default:
1548         /* Result mode not supported, should have been caught earlier. */
1549         GNUNET_break (0);
1550         break;
1551     }
1552   }
1553
1554   if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1555   {
1556     /* The other peer gave us lots of old elements, there's something wrong. */
1557     GNUNET_break_op (0);
1558     fail_union_operation (op);
1559     return;
1560   }
1561 }
1562
1563 /**
1564  * Send offers (for GNUNET_Hash-es) in response
1565  * to inquiries (for IBF_Key-s).
1566  *
1567  * @param cls the union operation
1568  * @param mh the message
1569  */
1570 static void
1571 handle_p2p_inquiry (void *cls,
1572                     const struct GNUNET_MessageHeader *mh)
1573 {
1574   struct Operation *op = cls;
1575   const struct IBF_Key *ibf_key;
1576   unsigned int num_keys;
1577   struct InquiryMessage *msg;
1578
1579   /* look up elements and send them */
1580   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1581   {
1582     GNUNET_break_op (0);
1583     fail_union_operation (op);
1584     return;
1585   }
1586   num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1587       / sizeof (struct IBF_Key);
1588   if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1589       != num_keys * sizeof (struct IBF_Key))
1590   {
1591     GNUNET_break_op (0);
1592     fail_union_operation (op);
1593     return;
1594   }
1595
1596   msg = (struct InquiryMessage *) mh;
1597
1598   ibf_key = (const struct IBF_Key *) &msg[1];
1599   while (0 != num_keys--)
1600   {
1601     struct IBF_Key unsalted_key;
1602     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1603     send_offers_for_key (op, unsalted_key);
1604     ibf_key++;
1605   }
1606 }
1607
1608
1609 /**
1610  * Iterator over hash map entries, called to
1611  * destroy the linked list of colliding ibf key entries.
1612  *
1613  * @param cls closure
1614  * @param key current key code
1615  * @param value value in the hash map
1616  * @return #GNUNET_YES if we should continue to iterate,
1617  *         #GNUNET_NO if not.
1618  */
1619 static int
1620 send_missing_elements_iter (void *cls,
1621                             uint32_t key,
1622                             void *value)
1623 {
1624   struct Operation *op = cls;
1625   struct KeyEntry *ke = value;
1626   struct GNUNET_MQ_Envelope *ev;
1627   struct GNUNET_SET_ElementMessage *emsg;
1628   struct ElementEntry *ee = ke->element;
1629
1630   if (GNUNET_YES == ke->received)
1631     return GNUNET_YES;
1632
1633   ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1634   GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1635   emsg->reserved = htons (0);
1636   emsg->element_type = htons (ee->element.element_type);
1637   GNUNET_MQ_send (op->mq, ev);
1638
1639   return GNUNET_YES;
1640 }
1641
1642
1643 /**
1644  * Handle a 
1645  *
1646  * @parem cls closure, a set union operation
1647  * @param mh the demand message
1648  */
1649 static void
1650 handle_p2p_request_full (void *cls,
1651                          const struct GNUNET_MessageHeader *mh)
1652 {
1653   struct Operation *op = cls;
1654
1655   if (PHASE_EXPECT_IBF != op->state->phase)
1656   {
1657     fail_union_operation (op);
1658     GNUNET_break_op (0);
1659     return;
1660   }
1661
1662   // FIXME: we need to check that our set is larger than the
1663   // byzantine_lower_bound by some threshold
1664   send_full_set (op);
1665 }
1666
1667
1668 /**
1669  * Handle a "full done" message.
1670  *
1671  * @parem cls closure, a set union operation
1672  * @param mh the demand message
1673  */
1674 static void
1675 handle_p2p_full_done (void *cls,
1676                       const struct GNUNET_MessageHeader *mh)
1677 {
1678   struct Operation *op = cls;
1679
1680   if (PHASE_EXPECT_IBF == op->state->phase)
1681   {
1682     struct GNUNET_MQ_Envelope *ev;
1683
1684     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1685
1686     /* send all the elements that did not come from the remote peer */
1687     GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1688                                              &send_missing_elements_iter,
1689                                              op);
1690
1691     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1692     GNUNET_MQ_send (op->mq, ev);
1693     op->state->phase = PHASE_DONE;
1694
1695     /* we now wait until the other peer shuts the tunnel down*/
1696   }
1697   else if (PHASE_FULL_SENDING == op->state->phase)
1698   {
1699     LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1700     /* We sent the full set, and got the response for that.  We're done. */
1701     op->state->phase = PHASE_DONE;
1702     send_done_and_destroy (op);
1703   }
1704   else
1705   {
1706     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1707     GNUNET_break_op (0);
1708     fail_union_operation (op);
1709     return;
1710   }
1711 }
1712
1713
1714 /**
1715  * Handle a demand by the other peer for elements based on a list
1716  * of GNUNET_HashCode-s.
1717  *
1718  * @parem cls closure, a set union operation
1719  * @param mh the demand message
1720  */
1721 static void
1722 handle_p2p_demand (void *cls,
1723                    const struct GNUNET_MessageHeader *mh)
1724 {
1725   struct Operation *op = cls;
1726   struct ElementEntry *ee;
1727   struct GNUNET_SET_ElementMessage *emsg;
1728   const struct GNUNET_HashCode *hash;
1729   unsigned int num_hashes;
1730   struct GNUNET_MQ_Envelope *ev;
1731
1732   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1733     / sizeof (struct GNUNET_HashCode);
1734   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1735       != num_hashes * sizeof (struct GNUNET_HashCode))
1736   {
1737     GNUNET_break_op (0);
1738     fail_union_operation (op);
1739     return;
1740   }
1741
1742   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1743        num_hashes > 0;
1744        hash++, num_hashes--)
1745   {
1746     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1747     if (NULL == ee)
1748     {
1749       /* Demand for non-existing element. */
1750       GNUNET_break_op (0);
1751       fail_union_operation (op);
1752       return;
1753     }
1754     if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1755     {
1756       /* Probably confused lazily copied sets. */
1757       GNUNET_break_op (0);
1758       fail_union_operation (op);
1759       return;
1760     }
1761     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1762     GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1763     emsg->reserved = htons (0);
1764     emsg->element_type = htons (ee->element.element_type);
1765     LOG (GNUNET_ERROR_TYPE_DEBUG,
1766          "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1767          (void *) op,
1768          (unsigned int) ee->element.size,
1769          GNUNET_h2s (&ee->element_hash));
1770     GNUNET_MQ_send (op->mq, ev);
1771     GNUNET_STATISTICS_update (_GSS_statistics,
1772                               "# exchanged elements",
1773                               1,
1774                               GNUNET_NO);
1775
1776     switch (op->spec->result_mode)
1777     {
1778       case GNUNET_SET_RESULT_ADDED:
1779         /* Nothing to do. */
1780         break;
1781       case GNUNET_SET_RESULT_SYMMETRIC:
1782         send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1783         break;
1784       default:
1785         /* Result mode not supported, should have been caught earlier. */
1786         GNUNET_break (0);
1787         break;
1788     }
1789   }
1790 }
1791
1792
1793 /**
1794  * Handle offers (of GNUNET_HashCode-s) and
1795  * respond with demands (of GNUNET_HashCode-s).
1796  *
1797  * @param cls the union operation
1798  * @param mh the message
1799  */
1800 static void
1801 handle_p2p_offer (void *cls,
1802                     const struct GNUNET_MessageHeader *mh)
1803 {
1804   struct Operation *op = cls;
1805   const struct GNUNET_HashCode *hash;
1806   unsigned int num_hashes;
1807
1808   /* look up elements and send them */
1809   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1810        (op->state->phase != PHASE_INVENTORY_ACTIVE))
1811   {
1812     GNUNET_break_op (0);
1813     fail_union_operation (op);
1814     return;
1815   }
1816   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1817     / sizeof (struct GNUNET_HashCode);
1818   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1819       != num_hashes * sizeof (struct GNUNET_HashCode))
1820   {
1821     GNUNET_break_op (0);
1822     fail_union_operation (op);
1823     return;
1824   }
1825
1826   for (hash = (const struct GNUNET_HashCode *) &mh[1];
1827        num_hashes > 0;
1828        hash++, num_hashes--)
1829   {
1830     struct ElementEntry *ee;
1831     struct GNUNET_MessageHeader *demands;
1832     struct GNUNET_MQ_Envelope *ev;
1833
1834     ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1835                                             hash);
1836     if (NULL != ee)
1837       if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1838         continue;
1839
1840     if (GNUNET_YES ==
1841         GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1842                                                 hash))
1843     {
1844       LOG (GNUNET_ERROR_TYPE_DEBUG,
1845            "Skipped sending duplicate demand\n");
1846       continue;
1847     }
1848
1849     GNUNET_assert (GNUNET_OK ==
1850                    GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1851                                                       hash,
1852                                                       NULL,
1853                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1854
1855     LOG (GNUNET_ERROR_TYPE_DEBUG,
1856          "[OP %x] Requesting element (hash %s)\n",
1857          (void *) op, GNUNET_h2s (hash));
1858     ev = GNUNET_MQ_msg_header_extra (demands,
1859                                      sizeof (struct GNUNET_HashCode),
1860                                      GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1861     *(struct GNUNET_HashCode *) &demands[1] = *hash;
1862     GNUNET_MQ_send (op->mq, ev);
1863   }
1864 }
1865
1866
1867 /**
1868  * Handle a done message from a remote peer
1869  *
1870  * @param cls the union operation
1871  * @param mh the message
1872  */
1873 static void
1874 handle_p2p_done (void *cls,
1875                  const struct GNUNET_MessageHeader *mh)
1876 {
1877   struct Operation *op = cls;
1878
1879   if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1880   {
1881     /* We got all requests, but still have to send our elements in response. */
1882
1883     op->state->phase = PHASE_FINISH_WAITING;
1884
1885     LOG (GNUNET_ERROR_TYPE_DEBUG,
1886          "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1887     /* The active peer is done sending offers
1888      * and inquiries.  This means that all
1889      * our responses to that (demands and offers)
1890      * must be in flight (queued or in mesh).
1891      *
1892      * We should notify the active peer once
1893      * all our demands are satisfied, so that the active
1894      * peer can quit if we gave him everything.
1895      */
1896     maybe_finish (op);
1897     return;
1898   }
1899   if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1900   {
1901     LOG (GNUNET_ERROR_TYPE_DEBUG,
1902          "got DONE (as active partner), waiting to finish\n");
1903     /* All demands of the other peer are satisfied,
1904      * and we processed all offers, thus we know
1905      * exactly what our demands must be.
1906      *
1907      * We'll close the channel
1908      * to the other peer once our demands are met.
1909      */
1910     op->state->phase = PHASE_FINISH_CLOSING;
1911     maybe_finish (op);
1912     return;
1913   }
1914   GNUNET_break_op (0);
1915   fail_union_operation (op);
1916 }
1917
1918
1919 /**
1920  * Initiate operation to evaluate a set union with a remote peer.
1921  *
1922  * @param op operation to perform (to be initialized)
1923  * @param opaque_context message to be transmitted to the listener
1924  *        to convince him to accept, may be NULL
1925  */
1926 static void
1927 union_evaluate (struct Operation *op,
1928                 const struct GNUNET_MessageHeader *opaque_context)
1929 {
1930   struct GNUNET_MQ_Envelope *ev;
1931   struct OperationRequestMessage *msg;
1932
1933   GNUNET_assert (NULL == op->state);
1934   op->state = GNUNET_new (struct OperationState);
1935   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1936   /* copy the current generation's strata estimator for this operation */
1937   op->state->se = strata_estimator_dup (op->spec->set->state->se);
1938   /* we started the operation, thus we have to send the operation request */
1939   op->state->phase = PHASE_EXPECT_SE;
1940   op->state->salt_receive = op->state->salt_send = 42;
1941   LOG (GNUNET_ERROR_TYPE_DEBUG,
1942        "Initiating union operation evaluation\n");
1943   GNUNET_STATISTICS_update (_GSS_statistics,
1944                             "# of total union operations",
1945                             1,
1946                             GNUNET_NO);
1947   GNUNET_STATISTICS_update (_GSS_statistics,
1948                             "# of initiated union operations",
1949                             1,
1950                             GNUNET_NO);
1951   ev = GNUNET_MQ_msg_nested_mh (msg,
1952                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1953                                 opaque_context);
1954   if (NULL == ev)
1955   {
1956     /* the context message is too large */
1957     GNUNET_break (0);
1958     GNUNET_SERVICE_client_drop (op->spec->set->client);
1959     return;
1960   }
1961   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1962   GNUNET_MQ_send (op->mq,
1963                   ev);
1964
1965   if (NULL != opaque_context)
1966     LOG (GNUNET_ERROR_TYPE_DEBUG,
1967          "sent op request with context message\n");
1968   else
1969     LOG (GNUNET_ERROR_TYPE_DEBUG,
1970          "sent op request without context message\n");
1971
1972   op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
1973   initialize_key_to_element (op);
1974 }
1975
1976
1977 /**
1978  * Accept an union operation request from a remote peer.
1979  * Only initializes the private operation state.
1980  *
1981  * @param op operation that will be accepted as a union operation
1982  */
1983 static void
1984 union_accept (struct Operation *op)
1985 {
1986   LOG (GNUNET_ERROR_TYPE_DEBUG,
1987        "accepting set union operation\n");
1988   GNUNET_assert (NULL == op->state);
1989
1990   GNUNET_STATISTICS_update (_GSS_statistics,
1991                             "# of accepted union operations",
1992                             1,
1993                             GNUNET_NO);
1994   GNUNET_STATISTICS_update (_GSS_statistics,
1995                             "# of total union operations",
1996                             1,
1997                             GNUNET_NO);
1998
1999   op->state = GNUNET_new (struct OperationState);
2000   op->state->se = strata_estimator_dup (op->spec->set->state->se);
2001   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2002   op->state->salt_receive = op->state->salt_send = 42;
2003   op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
2004   initialize_key_to_element (op);
2005   /* kick off the operation */
2006   send_strata_estimator (op);
2007 }
2008
2009
2010 /**
2011  * Create a new set supporting the union operation
2012  *
2013  * We maintain one strata estimator per set and then manipulate it over the
2014  * lifetime of the set, as recreating a strata estimator would be expensive.
2015  *
2016  * @return the newly created set, NULL on error
2017  */
2018 static struct SetState *
2019 union_set_create (void)
2020 {
2021   struct SetState *set_state;
2022
2023   LOG (GNUNET_ERROR_TYPE_DEBUG,
2024        "union set created\n");
2025   set_state = GNUNET_new (struct SetState);
2026   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2027                                            SE_IBF_SIZE, SE_IBF_HASH_NUM);
2028   if (NULL == set_state->se)
2029   {
2030     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2031                 "Failed to allocate strata estimator\n");
2032     GNUNET_free (set_state);
2033     return NULL;
2034   }
2035   return set_state;
2036 }
2037
2038
2039 /**
2040  * Add the element from the given element message to the set.
2041  *
2042  * @param set_state state of the set want to add to
2043  * @param ee the element to add to the set
2044  */
2045 static void
2046 union_add (struct SetState *set_state, struct ElementEntry *ee)
2047 {
2048   strata_estimator_insert (set_state->se,
2049                            get_ibf_key (&ee->element_hash));
2050 }
2051
2052
2053 /**
2054  * Remove the element given in the element message from the set.
2055  * Only marks the element as removed, so that older set operations can still exchange it.
2056  *
2057  * @param set_state state of the set to remove from
2058  * @param ee set element to remove
2059  */
2060 static void
2061 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2062 {
2063   strata_estimator_remove (set_state->se,
2064                            get_ibf_key (&ee->element_hash));
2065 }
2066
2067
2068 /**
2069  * Destroy a set that supports the union operation.
2070  *
2071  * @param set_state the set to destroy
2072  */
2073 static void
2074 union_set_destroy (struct SetState *set_state)
2075 {
2076   if (NULL != set_state->se)
2077   {
2078     strata_estimator_destroy (set_state->se);
2079     set_state->se = NULL;
2080   }
2081   GNUNET_free (set_state);
2082 }
2083
2084
2085 /**
2086  * Dispatch messages for a union operation.
2087  *
2088  * @param op the state of the union evaluate operation
2089  * @param mh the received message
2090  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2091  *         #GNUNET_OK otherwise
2092  */
2093 int
2094 union_handle_p2p_message (struct Operation *op,
2095                           const struct GNUNET_MessageHeader *mh)
2096 {
2097   //LOG (GNUNET_ERROR_TYPE_DEBUG,
2098   //            "received p2p message (t: %u, s: %u)\n",
2099   //            ntohs (mh->type),
2100   //            ntohs (mh->size));
2101   switch (ntohs (mh->type))
2102   {
2103     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2104       return handle_p2p_ibf (op, mh);
2105     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2106       return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2107     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2108       return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2109     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2110       handle_p2p_elements (op, mh);
2111       break;
2112     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2113       handle_p2p_full_element (op, mh);
2114       break;
2115     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2116       handle_p2p_inquiry (op, mh);
2117       break;
2118     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2119       handle_p2p_done (op, mh);
2120       break;
2121     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2122       handle_p2p_offer (op, mh);
2123       break;
2124     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2125       handle_p2p_demand (op, mh);
2126       break;
2127     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2128       handle_p2p_full_done (op, mh);
2129       break;
2130     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2131       handle_p2p_request_full (op, mh);
2132       break;
2133     default:
2134       /* Something wrong with cadet's message handlers? */
2135       GNUNET_assert (0);
2136   }
2137   return GNUNET_OK;
2138 }
2139
2140
2141 /**
2142  * Handler for peer-disconnects, notifies the client
2143  * about the aborted operation in case the op was not concluded.
2144  *
2145  * @param op the destroyed operation
2146  */
2147 static void
2148 union_peer_disconnect (struct Operation *op)
2149 {
2150   if (PHASE_DONE != op->state->phase)
2151   {
2152     struct GNUNET_MQ_Envelope *ev;
2153     struct GNUNET_SET_ResultMessage *msg;
2154
2155     ev = GNUNET_MQ_msg (msg,
2156                         GNUNET_MESSAGE_TYPE_SET_RESULT);
2157     msg->request_id = htonl (op->spec->client_request_id);
2158     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2159     msg->element_type = htons (0);
2160     GNUNET_MQ_send (op->spec->set->client_mq,
2161                     ev);
2162     LOG (GNUNET_ERROR_TYPE_WARNING,
2163          "other peer disconnected prematurely, phase %u\n",
2164          op->state->phase);
2165     _GSS_operation_destroy (op,
2166                             GNUNET_YES);
2167     return;
2168   }
2169   // else: the session has already been concluded
2170   LOG (GNUNET_ERROR_TYPE_DEBUG,
2171        "other peer disconnected (finished)\n");
2172   if (GNUNET_NO == op->state->client_done_sent)
2173     send_done_and_destroy (op);
2174 }
2175
2176
2177 /**
2178  * Copy union-specific set state.
2179  *
2180  * @param set source set for copying the union state
2181  * @return a copy of the union-specific set state
2182  */
2183 static struct SetState *
2184 union_copy_state (struct Set *set)
2185 {
2186   struct SetState *new_state;
2187
2188   new_state = GNUNET_new (struct SetState);
2189   GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2190   new_state->se = strata_estimator_dup (set->state->se);
2191
2192   return new_state;
2193 }
2194
2195
2196 /**
2197  * Get the table with implementing functions for
2198  * set union.
2199  *
2200  * @return the operation specific VTable
2201  */
2202 const struct SetVT *
2203 _GSS_union_vt ()
2204 {
2205   static const struct SetVT union_vt = {
2206     .create = &union_set_create,
2207     .msg_handler = &union_handle_p2p_message,
2208     .add = &union_add,
2209     .remove = &union_remove,
2210     .destroy_set = &union_set_destroy,
2211     .evaluate = &union_evaluate,
2212     .accept = &union_accept,
2213     .peer_disconnect = &union_peer_disconnect,
2214     .cancel = &union_op_cancel,
2215     .copy_state = &union_copy_state,
2216   };
2217
2218   return &union_vt;
2219 }