2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
89 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
94 * Continuation for multi part IBFs.
96 PHASE_EXPECT_IBF_CONT,
99 * We are decoding an IBF.
101 PHASE_INVENTORY_ACTIVE,
104 * The other peer is decoding the IBF we just sent.
106 PHASE_INVENTORY_PASSIVE,
109 * The protocol is almost finished, but we still have to flush our message
110 * queue and/or expect some elements.
112 PHASE_FINISH_CLOSING,
115 * In the penultimate phase,
116 * we wait until all our demands
117 * are satisfied. Then we send a done
118 * message, and wait for another done message.*/
119 PHASE_FINISH_WAITING,
122 * In the ultimate phase, we wait until
123 * our demands are satisfied and then
124 * quit (sending another DONE message). */
130 * State of an evaluate operation with another peer.
132 struct OperationState
135 * Copy of the set's strata estimator at the time of
136 * creation of this operation.
138 struct StrataEstimator *se;
141 * The IBF we currently receive.
143 struct InvertibleBloomFilter *remote_ibf;
146 * The IBF with the local set's element.
148 struct InvertibleBloomFilter *local_ibf;
151 * Maps IBF-Keys (specific to the current salt) to elements.
152 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153 * Colliding IBF-Keys are linked.
155 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
158 * Current state of the operation.
160 enum UnionOperationPhase phase;
163 * Did we send the client that we are done?
165 int client_done_sent;
168 * Number of ibf buckets already received into the @a remote_ibf.
170 unsigned int ibf_buckets_received;
173 * Hashes for elements that we have demanded from the other peer.
175 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
178 * Salt that we're using for sending IBFs
183 * Salt for the IBF we've received and that we're currently decoding.
185 uint32_t salt_receive;
190 * The key entry is used to associate an ibf key with an element.
195 * IBF key for the entry, derived from the current salt.
197 struct IBF_Key ibf_key;
200 * The actual element associated with the key.
202 * Only owned by the union operation if element->operation
205 struct ElementEntry *element;
210 * Used as a closure for sending elements
211 * with a specific IBF key.
213 struct SendElementClosure
216 * The IBF key whose matching elements should be
219 struct IBF_Key ibf_key;
222 * Operation for which the elements
225 struct Operation *op;
230 * Extra state required for efficient set union.
235 * The strata estimator is only generated once for
237 * The IBF keys are derived from the element hashes with
240 struct StrataEstimator *se;
245 * Iterator over hash map entries, called to
246 * destroy the linked list of colliding ibf key entries.
249 * @param key current key code
250 * @param value value in the hash map
251 * @return #GNUNET_YES if we should continue to iterate,
255 destroy_key_to_element_iter (void *cls,
259 struct KeyEntry *k = value;
261 GNUNET_assert (NULL != k);
262 if (GNUNET_YES == k->element->remote)
264 GNUNET_free (k->element);
273 * Destroy the union operation. Only things specific to the union
274 * operation are destroyed.
276 * @param op union operation to destroy
279 union_op_cancel (struct Operation *op)
281 LOG (GNUNET_ERROR_TYPE_DEBUG,
282 "destroying union op\n");
283 /* check if the op was canceled twice */
284 GNUNET_assert (NULL != op->state);
285 if (NULL != op->state->remote_ibf)
287 ibf_destroy (op->state->remote_ibf);
288 op->state->remote_ibf = NULL;
290 if (NULL != op->state->demanded_hashes)
292 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
293 op->state->demanded_hashes = NULL;
295 if (NULL != op->state->local_ibf)
297 ibf_destroy (op->state->local_ibf);
298 op->state->local_ibf = NULL;
300 if (NULL != op->state->se)
302 strata_estimator_destroy (op->state->se);
303 op->state->se = NULL;
305 if (NULL != op->state->key_to_element)
307 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
308 &destroy_key_to_element_iter,
310 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
311 op->state->key_to_element = NULL;
313 GNUNET_free (op->state);
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op done\n");
321 * Inform the client that the union operation has failed,
322 * and proceed to destroy the evaluate operation.
324 * @param op the union operation to fail
327 fail_union_operation (struct Operation *op)
329 struct GNUNET_MQ_Envelope *ev;
330 struct GNUNET_SET_ResultMessage *msg;
332 LOG (GNUNET_ERROR_TYPE_ERROR,
333 "union operation failed\n");
334 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
335 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
336 msg->request_id = htonl (op->spec->client_request_id);
337 msg->element_type = htons (0);
338 GNUNET_MQ_send (op->spec->set->client_mq, ev);
339 _GSS_operation_destroy (op, GNUNET_YES);
344 * Derive the IBF key from a hash code and
347 * @param src the hash code
348 * @return the derived IBF key
350 static struct IBF_Key
351 get_ibf_key (const struct GNUNET_HashCode *src)
356 GNUNET_CRYPTO_kdf (&key, sizeof (key),
358 &salt, sizeof (salt),
365 * Iterator over the mapping from IBF keys to element entries. Checks if we
366 * have an element with a given GNUNET_HashCode.
369 * @param key current key code
370 * @param value value in the hash map
371 * @return #GNUNET_YES if we should search further,
372 * #GNUNET_NO if we've found the element.
375 op_has_element_iterator (void *cls,
379 struct GNUNET_HashCode *element_hash = cls;
380 struct KeyEntry *k = value;
382 GNUNET_assert (NULL != k);
383 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
391 * Determine whether the given element is already in the operation's element
394 * @param op operation that should be tested for 'element_hash'
395 * @param element_hash hash of the element to look for
396 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
399 op_has_element (struct Operation *op,
400 const struct GNUNET_HashCode *element_hash)
403 struct IBF_Key ibf_key;
405 ibf_key = get_ibf_key (element_hash);
406 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
407 (uint32_t) ibf_key.key_val,
408 op_has_element_iterator,
409 (void *) element_hash);
411 /* was the iteration aborted because we found the element? */
412 if (GNUNET_SYSERR == ret)
419 * Insert an element into the union operation's
420 * key-to-element mapping. Takes ownership of 'ee'.
421 * Note that this does not insert the element in the set,
422 * only in the operation's key-element mapping.
423 * This is done to speed up re-tried operations, if some elements
424 * were transmitted, and then the IBF fails to decode.
426 * XXX: clarify ownership, doesn't sound right.
428 * @param op the union operation
429 * @param ee the element entry
432 op_register_element (struct Operation *op,
433 struct ElementEntry *ee)
435 struct IBF_Key ibf_key;
438 ibf_key = get_ibf_key (&ee->element_hash);
439 k = GNUNET_new (struct KeyEntry);
441 k->ibf_key = ibf_key;
442 GNUNET_assert (GNUNET_OK ==
443 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
444 (uint32_t) ibf_key.key_val,
446 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
451 salt_key (const struct IBF_Key *k_in,
453 struct IBF_Key *k_out)
456 uint64_t x = k_in->key_val;
458 x = (x >> s) | (x << (64 - s));
464 unsalt_key (const struct IBF_Key *k_in,
466 struct IBF_Key *k_out)
469 uint64_t x = k_in->key_val;
470 x = (x << s) | (x >> (64 - s));
476 * Insert a key into an ibf.
480 * @param value the key entry to get the key from
483 prepare_ibf_iterator (void *cls,
487 struct Operation *op = cls;
488 struct KeyEntry *ke = value;
489 struct IBF_Key salted_key;
491 LOG (GNUNET_ERROR_TYPE_DEBUG,
492 "[OP %x] inserting %lx (hash %s) into ibf\n",
494 (unsigned long) ke->ibf_key.key_val,
495 GNUNET_h2s (&ke->element->element_hash));
496 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
497 ibf_insert (op->state->local_ibf, salted_key);
503 * Iterator for initializing the
504 * key-to-element mapping of a union operation
506 * @param cls the union operation `struct Operation *`
508 * @param value the `struct ElementEntry *` to insert
509 * into the key-to-element mapping
510 * @return #GNUNET_YES (to continue iterating)
513 init_key_to_element_iterator (void *cls,
514 const struct GNUNET_HashCode *key,
517 struct Operation *op = cls;
518 struct ElementEntry *ee = value;
520 /* make sure that the element belongs to the set at the time
521 * of creating the operation */
522 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
525 GNUNET_assert (GNUNET_NO == ee->remote);
527 op_register_element (op, ee);
533 * Create an ibf with the operation's elements
534 * of the specified size
536 * @param op the union operation
537 * @param size size of the ibf to create
538 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
541 prepare_ibf (struct Operation *op,
544 if (NULL == op->state->key_to_element)
548 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
549 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
551 init_key_to_element_iterator, op);
553 if (NULL != op->state->local_ibf)
554 ibf_destroy (op->state->local_ibf);
555 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
556 if (NULL == op->state->local_ibf)
558 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
559 "Failed to allocate local IBF\n");
560 return GNUNET_SYSERR;
562 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
563 &prepare_ibf_iterator,
570 * Send an ibf of appropriate size.
572 * Fragments the IBF into multiple messages if necessary.
574 * @param op the union operation
575 * @param ibf_order order of the ibf to send, size=2^order
576 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
579 send_ibf (struct Operation *op,
582 unsigned int buckets_sent = 0;
583 struct InvertibleBloomFilter *ibf;
586 prepare_ibf (op, 1<<ibf_order))
588 /* allocation failed */
589 return GNUNET_SYSERR;
592 LOG (GNUNET_ERROR_TYPE_DEBUG,
593 "sending ibf of size %u\n",
597 char name[64] = { 0 };
598 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
599 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
602 ibf = op->state->local_ibf;
604 while (buckets_sent < (1 << ibf_order))
606 unsigned int buckets_in_message;
607 struct GNUNET_MQ_Envelope *ev;
608 struct IBFMessage *msg;
610 buckets_in_message = (1 << ibf_order) - buckets_sent;
611 /* limit to maximum */
612 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
613 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
615 ev = GNUNET_MQ_msg_extra (msg,
616 buckets_in_message * IBF_BUCKET_SIZE,
617 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
620 msg->order = ibf_order;
621 msg->offset = htonl (buckets_sent);
622 msg->salt = htonl (op->state->salt_send);
623 ibf_write_slice (ibf, buckets_sent,
624 buckets_in_message, &msg[1]);
625 buckets_sent += buckets_in_message;
626 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 "ibf chunk size %u, %u/%u sent\n",
631 GNUNET_MQ_send (op->mq, ev);
634 /* The other peer must decode the IBF, so
636 op->state->phase = PHASE_INVENTORY_PASSIVE;
642 * Send a strata estimator to the remote peer.
644 * @param op the union operation with the remote peer
647 send_strata_estimator (struct Operation *op)
649 const struct StrataEstimator *se = op->state->se;
650 struct GNUNET_MQ_Envelope *ev;
651 struct GNUNET_MessageHeader *strata_msg;
656 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
657 len = strata_estimator_write (op->state->se,
659 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
660 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
662 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
663 ev = GNUNET_MQ_msg_header_extra (strata_msg,
666 GNUNET_memcpy (&strata_msg[1],
670 GNUNET_MQ_send (op->mq,
672 op->state->phase = PHASE_EXPECT_IBF;
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "sent SE, expecting IBF\n");
679 * Compute the necessary order of an ibf
680 * from the size of the symmetric set difference.
682 * @param diff the difference
683 * @return the required size of the ibf
686 get_order_from_difference (unsigned int diff)
688 unsigned int ibf_order;
691 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
692 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
694 if (ibf_order > MAX_IBF_ORDER)
695 ibf_order = MAX_IBF_ORDER;
701 * Handle a strata estimator from a remote peer
703 * @param cls the union operation
704 * @param mh the message
705 * @param is_compressed #GNUNET_YES if the estimator is compressed
706 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
707 * #GNUNET_OK otherwise
710 handle_p2p_strata_estimator (void *cls,
711 const struct GNUNET_MessageHeader *mh,
714 struct Operation *op = cls;
715 struct StrataEstimator *remote_se;
719 GNUNET_STATISTICS_update (_GSS_statistics,
720 "# bytes of SE received",
724 if (op->state->phase != PHASE_EXPECT_SE)
726 fail_union_operation (op);
728 return GNUNET_SYSERR;
730 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
731 if ( (GNUNET_NO == is_compressed) &&
732 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
734 fail_union_operation (op);
736 return GNUNET_SYSERR;
738 remote_se = strata_estimator_create (SE_STRATA_COUNT,
741 if (NULL == remote_se)
743 /* insufficient resources, fail */
744 fail_union_operation (op);
745 return GNUNET_SYSERR;
748 strata_estimator_read (&mh[1],
753 /* decompression failed */
754 fail_union_operation (op);
755 strata_estimator_destroy (remote_se);
756 return GNUNET_SYSERR;
758 GNUNET_assert (NULL != op->state->se);
759 diff = strata_estimator_difference (remote_se,
761 strata_estimator_destroy (remote_se);
762 strata_estimator_destroy (op->state->se);
763 op->state->se = NULL;
764 LOG (GNUNET_ERROR_TYPE_DEBUG,
765 "got se diff=%d, using ibf size %d\n",
767 1<<get_order_from_difference (diff));
770 get_order_from_difference (diff)))
772 /* Internal error, best we can do is shut the connection */
773 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
774 "Failed to send IBF, closing connection\n");
775 fail_union_operation (op);
776 return GNUNET_SYSERR;
783 * Iterator to send elements to a remote peer
785 * @param cls closure with the element key and the union operation
787 * @param value the key entry
790 send_offers_iterator (void *cls,
794 struct SendElementClosure *sec = cls;
795 struct Operation *op = sec->op;
796 struct KeyEntry *ke = value;
797 struct GNUNET_MQ_Envelope *ev;
798 struct GNUNET_MessageHeader *mh;
800 /* Detect 32-bit key collision for the 64-bit IBF keys. */
801 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
804 ev = GNUNET_MQ_msg_header_extra (mh,
805 sizeof (struct GNUNET_HashCode),
806 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
808 GNUNET_assert (NULL != ev);
809 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
810 LOG (GNUNET_ERROR_TYPE_DEBUG,
811 "[OP %x] sending element offer (%s) to peer\n",
813 GNUNET_h2s (&ke->element->element_hash));
814 GNUNET_MQ_send (op->mq, ev);
820 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
822 * @param op union operation
823 * @param ibf_key IBF key of interest
826 send_offers_for_key (struct Operation *op,
827 struct IBF_Key ibf_key)
829 struct SendElementClosure send_cls;
831 send_cls.ibf_key = ibf_key;
833 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
834 (uint32_t) ibf_key.key_val,
835 &send_offers_iterator,
841 * Decode which elements are missing on each side, and
842 * send the appropriate offers and inquiries.
844 * @param op union operation
845 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
848 decode_and_send (struct Operation *op)
851 struct IBF_Key last_key;
853 unsigned int num_decoded;
854 struct InvertibleBloomFilter *diff_ibf;
856 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
859 prepare_ibf (op, op->state->remote_ibf->size))
862 /* allocation failed */
863 return GNUNET_SYSERR;
865 diff_ibf = ibf_dup (op->state->local_ibf);
866 ibf_subtract (diff_ibf, op->state->remote_ibf);
868 ibf_destroy (op->state->remote_ibf);
869 op->state->remote_ibf = NULL;
871 LOG (GNUNET_ERROR_TYPE_DEBUG,
872 "decoding IBF (size=%u)\n",
876 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
881 int cycle_detected = GNUNET_NO;
885 res = ibf_decode (diff_ibf, &side, &key);
886 if (res == GNUNET_OK)
888 LOG (GNUNET_ERROR_TYPE_DEBUG,
889 "decoded ibf key %lx\n",
890 (unsigned long) key.key_val);
892 if ( (num_decoded > diff_ibf->size) ||
893 ( (num_decoded > 1) &&
894 (last_key.key_val == key.key_val) ) )
896 LOG (GNUNET_ERROR_TYPE_DEBUG,
897 "detected cyclic ibf (decoded %u/%u)\n",
900 cycle_detected = GNUNET_YES;
903 if ( (GNUNET_SYSERR == res) ||
904 (GNUNET_YES == cycle_detected) )
908 while (1<<next_order < diff_ibf->size)
911 if (next_order <= MAX_IBF_ORDER)
913 LOG (GNUNET_ERROR_TYPE_DEBUG,
914 "decoding failed, sending larger ibf (size %u)\n",
916 GNUNET_STATISTICS_update (_GSS_statistics,
920 op->state->salt_send++;
922 send_ibf (op, next_order))
924 /* Internal error, best we can do is shut the connection */
925 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
926 "Failed to send IBF, closing connection\n");
927 fail_union_operation (op);
928 ibf_destroy (diff_ibf);
929 return GNUNET_SYSERR;
934 GNUNET_STATISTICS_update (_GSS_statistics,
935 "# of failed union operations (too large)",
938 // XXX: Send the whole set, element-by-element
939 LOG (GNUNET_ERROR_TYPE_ERROR,
940 "set union failed: reached ibf limit\n");
941 fail_union_operation (op);
942 ibf_destroy (diff_ibf);
943 return GNUNET_SYSERR;
947 if (GNUNET_NO == res)
949 struct GNUNET_MQ_Envelope *ev;
951 LOG (GNUNET_ERROR_TYPE_DEBUG,
952 "transmitted all values, sending DONE\n");
953 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
954 GNUNET_MQ_send (op->mq, ev);
955 /* We now wait until we get a DONE message back
956 * and then wait for our MQ to be flushed and all our
957 * demands be delivered. */
962 struct IBF_Key unsalted_key;
963 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
964 send_offers_for_key (op, unsalted_key);
968 struct GNUNET_MQ_Envelope *ev;
969 struct InquiryMessage *msg;
971 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
972 * the effort additional complexity. */
973 ev = GNUNET_MQ_msg_extra (msg,
974 sizeof (struct IBF_Key),
975 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
976 msg->salt = htonl (op->state->salt_receive);
977 GNUNET_memcpy (&msg[1],
979 sizeof (struct IBF_Key));
980 LOG (GNUNET_ERROR_TYPE_DEBUG,
981 "sending element inquiry for IBF key %lx\n",
982 (unsigned long) key.key_val);
983 GNUNET_MQ_send (op->mq, ev);
990 ibf_destroy (diff_ibf);
996 * Handle an IBF message from a remote peer.
998 * Reassemble the IBF from multiple pieces, and
999 * process the whole IBF once possible.
1001 * @param cls the union operation
1002 * @param mh the header of the message
1003 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1004 * #GNUNET_OK otherwise
1007 handle_p2p_ibf (void *cls,
1008 const struct GNUNET_MessageHeader *mh)
1010 struct Operation *op = cls;
1011 const struct IBFMessage *msg;
1012 unsigned int buckets_in_message;
1014 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1016 GNUNET_break_op (0);
1017 fail_union_operation (op);
1018 return GNUNET_SYSERR;
1020 msg = (const struct IBFMessage *) mh;
1021 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1022 (op->state->phase == PHASE_EXPECT_IBF) )
1024 op->state->phase = PHASE_EXPECT_IBF_CONT;
1025 GNUNET_assert (NULL == op->state->remote_ibf);
1026 LOG (GNUNET_ERROR_TYPE_DEBUG,
1027 "Creating new ibf of size %u\n",
1029 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1030 op->state->salt_receive = ntohl (msg->salt);
1031 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1032 if (NULL == op->state->remote_ibf)
1034 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1035 "Failed to parse remote IBF, closing connection\n");
1036 fail_union_operation (op);
1037 return GNUNET_SYSERR;
1039 op->state->ibf_buckets_received = 0;
1040 if (0 != ntohl (msg->offset))
1042 GNUNET_break_op (0);
1043 fail_union_operation (op);
1044 return GNUNET_SYSERR;
1047 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1049 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1051 GNUNET_break_op (0);
1052 fail_union_operation (op);
1053 return GNUNET_SYSERR;
1055 if (1<<msg->order != op->state->remote_ibf->size)
1057 GNUNET_break_op (0);
1058 fail_union_operation (op);
1059 return GNUNET_SYSERR;
1061 if (ntohl (msg->salt) != op->state->salt_receive)
1063 GNUNET_break_op (0);
1064 fail_union_operation (op);
1065 return GNUNET_SYSERR;
1073 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1075 if (0 == buckets_in_message)
1077 GNUNET_break_op (0);
1078 fail_union_operation (op);
1079 return GNUNET_SYSERR;
1082 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1084 GNUNET_break_op (0);
1085 fail_union_operation (op);
1086 return GNUNET_SYSERR;
1089 GNUNET_assert (NULL != op->state->remote_ibf);
1091 ibf_read_slice (&msg[1],
1092 op->state->ibf_buckets_received,
1094 op->state->remote_ibf);
1095 op->state->ibf_buckets_received += buckets_in_message;
1097 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1099 LOG (GNUNET_ERROR_TYPE_DEBUG,
1100 "received full ibf\n");
1101 op->state->phase = PHASE_INVENTORY_ACTIVE;
1103 decode_and_send (op))
1105 /* Internal error, best we can do is shut down */
1106 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1107 "Failed to decode IBF, closing connection\n");
1108 return GNUNET_SYSERR;
1116 * Send a result message to the client indicating
1117 * that there is a new element.
1119 * @param op union operation
1120 * @param element element to send
1121 * @param status status to send with the new element
1124 send_client_element (struct Operation *op,
1125 struct GNUNET_SET_Element *element,
1128 struct GNUNET_MQ_Envelope *ev;
1129 struct GNUNET_SET_ResultMessage *rm;
1131 LOG (GNUNET_ERROR_TYPE_DEBUG,
1132 "sending element (size %u) to client\n",
1134 GNUNET_assert (0 != op->spec->client_request_id);
1135 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1138 GNUNET_MQ_discard (ev);
1142 rm->result_status = htons (status);
1143 rm->request_id = htonl (op->spec->client_request_id);
1144 rm->element_type = element->element_type;
1145 GNUNET_memcpy (&rm[1], element->data, element->size);
1146 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1151 * Signal to the client that the operation has finished and
1152 * destroy the operation.
1154 * @param cls operation to destroy
1157 send_done_and_destroy (void *cls)
1159 struct Operation *op = cls;
1160 struct GNUNET_MQ_Envelope *ev;
1161 struct GNUNET_SET_ResultMessage *rm;
1163 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1164 rm->request_id = htonl (op->spec->client_request_id);
1165 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1166 rm->element_type = htons (0);
1167 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1168 /* Will also call the union-specific cancel function. */
1169 _GSS_operation_destroy (op, GNUNET_YES);
1174 maybe_finish (struct Operation *op)
1176 unsigned int num_demanded;
1178 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1180 if (PHASE_FINISH_WAITING == op->state->phase)
1182 LOG (GNUNET_ERROR_TYPE_DEBUG,
1183 "In PHASE_FINISH_WAITING, pending %u demands\n",
1185 if (0 == num_demanded)
1187 struct GNUNET_MQ_Envelope *ev;
1189 op->state->phase = PHASE_DONE;
1190 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1191 GNUNET_MQ_send (op->mq, ev);
1193 /* We now wait until the other peer closes the channel
1194 * after it got all elements from us. */
1197 if (PHASE_FINISH_CLOSING == op->state->phase)
1199 LOG (GNUNET_ERROR_TYPE_DEBUG,
1200 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1202 if (0 == num_demanded)
1204 op->state->phase = PHASE_DONE;
1205 send_done_and_destroy (op);
1212 * Handle an element message from a remote peer.
1214 * @param cls the union operation
1215 * @param mh the message
1218 handle_p2p_elements (void *cls,
1219 const struct GNUNET_MessageHeader *mh)
1221 struct Operation *op = cls;
1222 struct ElementEntry *ee;
1223 const struct GNUNET_SET_ElementMessage *emsg;
1224 uint16_t element_size;
1226 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1228 GNUNET_break_op (0);
1229 fail_union_operation (op);
1232 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1234 GNUNET_break_op (0);
1235 fail_union_operation (op);
1239 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1241 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1242 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1243 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1244 ee->element.size = element_size;
1245 ee->element.data = &ee[1];
1246 ee->element.element_type = ntohs (emsg->element_type);
1247 ee->remote = GNUNET_YES;
1248 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1251 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1255 /* We got something we didn't demand, since it's not in our map. */
1256 GNUNET_break_op (0);
1258 fail_union_operation (op);
1262 LOG (GNUNET_ERROR_TYPE_DEBUG,
1263 "Got element (size %u, hash %s) from peer\n",
1264 (unsigned int) element_size,
1265 GNUNET_h2s (&ee->element_hash));
1267 GNUNET_STATISTICS_update (_GSS_statistics,
1268 "# received elements",
1271 GNUNET_STATISTICS_update (_GSS_statistics,
1272 "# exchanged elements",
1276 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1278 /* Got repeated element. Should not happen since
1279 * we track demands. */
1280 GNUNET_STATISTICS_update (_GSS_statistics,
1281 "# repeated elements",
1288 LOG (GNUNET_ERROR_TYPE_DEBUG,
1289 "Registering new element from remote peer\n");
1290 op_register_element (op, ee);
1291 /* only send results immediately if the client wants it */
1292 switch (op->spec->result_mode)
1294 case GNUNET_SET_RESULT_ADDED:
1295 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1297 case GNUNET_SET_RESULT_SYMMETRIC:
1298 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1301 /* Result mode not supported, should have been caught earlier. */
1312 * Send offers (for GNUNET_Hash-es) in response
1313 * to inquiries (for IBF_Key-s).
1315 * @param cls the union operation
1316 * @param mh the message
1319 handle_p2p_inquiry (void *cls,
1320 const struct GNUNET_MessageHeader *mh)
1322 struct Operation *op = cls;
1323 const struct IBF_Key *ibf_key;
1324 unsigned int num_keys;
1325 struct InquiryMessage *msg;
1327 /* look up elements and send them */
1328 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1330 GNUNET_break_op (0);
1331 fail_union_operation (op);
1334 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1335 / sizeof (struct IBF_Key);
1336 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1337 != num_keys * sizeof (struct IBF_Key))
1339 GNUNET_break_op (0);
1340 fail_union_operation (op);
1344 msg = (struct InquiryMessage *) mh;
1346 ibf_key = (const struct IBF_Key *) &msg[1];
1347 while (0 != num_keys--)
1349 struct IBF_Key unsalted_key;
1350 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1351 send_offers_for_key (op, unsalted_key);
1361 handle_p2p_demand (void *cls,
1362 const struct GNUNET_MessageHeader *mh)
1364 struct Operation *op = cls;
1365 struct ElementEntry *ee;
1366 struct GNUNET_SET_ElementMessage *emsg;
1367 const struct GNUNET_HashCode *hash;
1368 unsigned int num_hashes;
1369 struct GNUNET_MQ_Envelope *ev;
1371 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1372 / sizeof (struct GNUNET_HashCode);
1373 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1374 != num_hashes * sizeof (struct GNUNET_HashCode))
1376 GNUNET_break_op (0);
1377 fail_union_operation (op);
1381 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1383 hash++, num_hashes--)
1385 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1388 /* Demand for non-existing element. */
1389 GNUNET_break_op (0);
1390 fail_union_operation (op);
1393 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1395 /* Probably confused lazily copied sets. */
1396 GNUNET_break_op (0);
1397 fail_union_operation (op);
1400 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1401 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1402 emsg->reserved = htons (0);
1403 emsg->element_type = htons (ee->element.element_type);
1404 LOG (GNUNET_ERROR_TYPE_DEBUG,
1405 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1407 (unsigned int) ee->element.size,
1408 GNUNET_h2s (&ee->element_hash));
1409 GNUNET_MQ_send (op->mq, ev);
1410 GNUNET_STATISTICS_update (_GSS_statistics,
1411 "# exchanged elements",
1415 switch (op->spec->result_mode)
1417 case GNUNET_SET_RESULT_ADDED:
1418 /* Nothing to do. */
1420 case GNUNET_SET_RESULT_SYMMETRIC:
1421 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1424 /* Result mode not supported, should have been caught earlier. */
1433 * Handle offers (of GNUNET_HashCode-s) and
1434 * respond with demands (of GNUNET_HashCode-s).
1436 * @param cls the union operation
1437 * @param mh the message
1440 handle_p2p_offer (void *cls,
1441 const struct GNUNET_MessageHeader *mh)
1443 struct Operation *op = cls;
1444 const struct GNUNET_HashCode *hash;
1445 unsigned int num_hashes;
1447 /* look up elements and send them */
1448 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1449 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1451 GNUNET_break_op (0);
1452 fail_union_operation (op);
1455 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1456 / sizeof (struct GNUNET_HashCode);
1457 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1458 != num_hashes * sizeof (struct GNUNET_HashCode))
1460 GNUNET_break_op (0);
1461 fail_union_operation (op);
1465 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1467 hash++, num_hashes--)
1469 struct ElementEntry *ee;
1470 struct GNUNET_MessageHeader *demands;
1471 struct GNUNET_MQ_Envelope *ev;
1473 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1476 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1480 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1483 LOG (GNUNET_ERROR_TYPE_DEBUG,
1484 "Skipped sending duplicate demand\n");
1488 GNUNET_assert (GNUNET_OK ==
1489 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1492 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1494 LOG (GNUNET_ERROR_TYPE_DEBUG,
1495 "[OP %x] Requesting element (hash %s)\n",
1496 (void *) op, GNUNET_h2s (hash));
1497 ev = GNUNET_MQ_msg_header_extra (demands,
1498 sizeof (struct GNUNET_HashCode),
1499 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1500 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1501 GNUNET_MQ_send (op->mq, ev);
1507 * Handle a done message from a remote peer
1509 * @param cls the union operation
1510 * @param mh the message
1513 handle_p2p_done (void *cls,
1514 const struct GNUNET_MessageHeader *mh)
1516 struct Operation *op = cls;
1518 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1520 /* We got all requests, but still have to send our elements in response. */
1522 op->state->phase = PHASE_FINISH_WAITING;
1524 LOG (GNUNET_ERROR_TYPE_DEBUG,
1525 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1526 /* The active peer is done sending offers
1527 * and inquiries. This means that all
1528 * our responses to that (demands and offers)
1529 * must be in flight (queued or in mesh).
1531 * We should notify the active peer once
1532 * all our demands are satisfied, so that the active
1533 * peer can quit if we gave him everything.
1538 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1540 LOG (GNUNET_ERROR_TYPE_DEBUG,
1541 "got DONE (as active partner), waiting to finish\n");
1542 /* All demands of the other peer are satisfied,
1543 * and we processed all offers, thus we know
1544 * exactly what our demands must be.
1546 * We'll close the channel
1547 * to the other peer once our demands are met.
1549 op->state->phase = PHASE_FINISH_CLOSING;
1553 GNUNET_break_op (0);
1554 fail_union_operation (op);
1559 * Initiate operation to evaluate a set union with a remote peer.
1561 * @param op operation to perform (to be initialized)
1562 * @param opaque_context message to be transmitted to the listener
1563 * to convince him to accept, may be NULL
1566 union_evaluate (struct Operation *op,
1567 const struct GNUNET_MessageHeader *opaque_context)
1569 struct GNUNET_MQ_Envelope *ev;
1570 struct OperationRequestMessage *msg;
1572 GNUNET_assert (NULL == op->state);
1573 op->state = GNUNET_new (struct OperationState);
1574 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1575 /* copy the current generation's strata estimator for this operation */
1576 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1577 /* we started the operation, thus we have to send the operation request */
1578 op->state->phase = PHASE_EXPECT_SE;
1579 op->state->salt_receive = op->state->salt_send = 42;
1580 LOG (GNUNET_ERROR_TYPE_DEBUG,
1581 "Initiating union operation evaluation\n");
1582 GNUNET_STATISTICS_update (_GSS_statistics,
1583 "# of total union operations",
1586 GNUNET_STATISTICS_update (_GSS_statistics,
1587 "# of initiated union operations",
1590 ev = GNUNET_MQ_msg_nested_mh (msg,
1591 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1595 /* the context message is too large */
1597 GNUNET_SERVER_client_disconnect (op->spec->set->client);
1600 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1601 GNUNET_MQ_send (op->mq,
1604 if (NULL != opaque_context)
1605 LOG (GNUNET_ERROR_TYPE_DEBUG,
1606 "sent op request with context message\n");
1608 LOG (GNUNET_ERROR_TYPE_DEBUG,
1609 "sent op request without context message\n");
1614 * Accept an union operation request from a remote peer.
1615 * Only initializes the private operation state.
1617 * @param op operation that will be accepted as a union operation
1620 union_accept (struct Operation *op)
1622 LOG (GNUNET_ERROR_TYPE_DEBUG,
1623 "accepting set union operation\n");
1624 GNUNET_assert (NULL == op->state);
1626 GNUNET_STATISTICS_update (_GSS_statistics,
1627 "# of accepted union operations",
1630 GNUNET_STATISTICS_update (_GSS_statistics,
1631 "# of total union operations",
1635 op->state = GNUNET_new (struct OperationState);
1636 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1637 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1638 op->state->salt_receive = op->state->salt_send = 42;
1639 /* kick off the operation */
1640 send_strata_estimator (op);
1645 * Create a new set supporting the union operation
1647 * We maintain one strata estimator per set and then manipulate it over the
1648 * lifetime of the set, as recreating a strata estimator would be expensive.
1650 * @return the newly created set, NULL on error
1652 static struct SetState *
1653 union_set_create (void)
1655 struct SetState *set_state;
1657 LOG (GNUNET_ERROR_TYPE_DEBUG,
1658 "union set created\n");
1659 set_state = GNUNET_new (struct SetState);
1660 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1661 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1662 if (NULL == set_state->se)
1664 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1665 "Failed to allocate strata estimator\n");
1666 GNUNET_free (set_state);
1674 * Add the element from the given element message to the set.
1676 * @param set_state state of the set want to add to
1677 * @param ee the element to add to the set
1680 union_add (struct SetState *set_state, struct ElementEntry *ee)
1682 strata_estimator_insert (set_state->se,
1683 get_ibf_key (&ee->element_hash));
1688 * Remove the element given in the element message from the set.
1689 * Only marks the element as removed, so that older set operations can still exchange it.
1691 * @param set_state state of the set to remove from
1692 * @param ee set element to remove
1695 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1697 strata_estimator_remove (set_state->se,
1698 get_ibf_key (&ee->element_hash));
1703 * Destroy a set that supports the union operation.
1705 * @param set_state the set to destroy
1708 union_set_destroy (struct SetState *set_state)
1710 if (NULL != set_state->se)
1712 strata_estimator_destroy (set_state->se);
1713 set_state->se = NULL;
1715 GNUNET_free (set_state);
1720 * Dispatch messages for a union operation.
1722 * @param op the state of the union evaluate operation
1723 * @param mh the received message
1724 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1725 * #GNUNET_OK otherwise
1728 union_handle_p2p_message (struct Operation *op,
1729 const struct GNUNET_MessageHeader *mh)
1731 //LOG (GNUNET_ERROR_TYPE_DEBUG,
1732 // "received p2p message (t: %u, s: %u)\n",
1733 // ntohs (mh->type),
1734 // ntohs (mh->size));
1735 switch (ntohs (mh->type))
1737 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1738 return handle_p2p_ibf (op, mh);
1739 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1740 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1741 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1742 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1743 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1744 handle_p2p_elements (op, mh);
1746 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1747 handle_p2p_inquiry (op, mh);
1749 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1750 handle_p2p_done (op, mh);
1752 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1753 handle_p2p_offer (op, mh);
1755 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1756 handle_p2p_demand (op, mh);
1759 /* Something wrong with cadet's message handlers? */
1767 * Handler for peer-disconnects, notifies the client
1768 * about the aborted operation in case the op was not concluded.
1770 * @param op the destroyed operation
1773 union_peer_disconnect (struct Operation *op)
1775 if (PHASE_DONE != op->state->phase)
1777 struct GNUNET_MQ_Envelope *ev;
1778 struct GNUNET_SET_ResultMessage *msg;
1780 ev = GNUNET_MQ_msg (msg,
1781 GNUNET_MESSAGE_TYPE_SET_RESULT);
1782 msg->request_id = htonl (op->spec->client_request_id);
1783 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1784 msg->element_type = htons (0);
1785 GNUNET_MQ_send (op->spec->set->client_mq,
1787 LOG (GNUNET_ERROR_TYPE_WARNING,
1788 "other peer disconnected prematurely, phase %u\n",
1790 _GSS_operation_destroy (op,
1794 // else: the session has already been concluded
1795 LOG (GNUNET_ERROR_TYPE_DEBUG,
1796 "other peer disconnected (finished)\n");
1797 if (GNUNET_NO == op->state->client_done_sent)
1798 send_done_and_destroy (op);
1803 * Copy union-specific set state.
1805 * @param set source set for copying the union state
1806 * @return a copy of the union-specific set state
1808 static struct SetState *
1809 union_copy_state (struct Set *set)
1811 struct SetState *new_state;
1813 new_state = GNUNET_new (struct SetState);
1814 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1815 new_state->se = strata_estimator_dup (set->state->se);
1822 * Get the table with implementing functions for
1825 * @return the operation specific VTable
1827 const struct SetVT *
1830 static const struct SetVT union_vt = {
1831 .create = &union_set_create,
1832 .msg_handler = &union_handle_p2p_message,
1834 .remove = &union_remove,
1835 .destroy_set = &union_set_destroy,
1836 .evaluate = &union_evaluate,
1837 .accept = &union_accept,
1838 .peer_disconnect = &union_peer_disconnect,
1839 .cancel = &union_op_cancel,
1840 .copy_state = &union_copy_state,