2 This file is part of GNUnet
3 Copyright (C) 2013-2015 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
/* Build-time parameters of the union protocol.  LOG wraps GNUNET_log_from
 * with the fixed component name set-union; the remaining macros size the
 * strata estimator and bound IBF transfers.
 * NOTE(review): IBF_BUCKET_SIZE is used by MAX_BUCKETS_PER_MESSAGE but is
 * not defined in the visible text -- presumably it comes from a header;
 * confirm. */
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
/* (1<<15) is a budget of 32768, divided by the size of a single bucket. */
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
/* Phases of a union operation as tracked in OperationState.phase.
 * NOTE(review): only five enumerators are visible in this listing.  The code
 * further down also uses PHASE_EXPECT_SE, PHASE_EXPECT_IBF and PHASE_DONE,
 * whose declaration lines are missing here.  The residual documentation
 * mentions PHASE_EXPECT_ELEMENTS and PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
 * which are not used anywhere in the visible code -- possibly stale names;
 * confirm against the full file. */
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
89 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
94 * Continuation for multi part IBFs.
96 PHASE_EXPECT_IBF_CONT,
99 * We are decoding an IBF.
101 PHASE_INVENTORY_ACTIVE,
104 * The other peer is decoding the IBF we just sent.
106 PHASE_INVENTORY_PASSIVE,
109 * The protocol is almost finished, but we still have to flush our message
110 * queue and/or expect some elements.
112 PHASE_FINISH_CLOSING,
115 * In the penultimate phase,
116 * we wait until all our demands
117 * are satisfied. Then we send a done
118 * message, and wait for another done message.*/
119 PHASE_FINISH_WAITING,
122 * In the ultimate phase, we wait until
123 * our demands are satisfied and then
124 * quit (sending another DONE message). */
130 * State of an evaluate operation with another peer.
/* Per-operation state of a union operation, reached through op->state.
 * union_op_cancel releases every pointer member that is non-NULL and then
 * frees the structure itself.
 * NOTE(review): a salt_send member is read and written by the code below
 * (prepare_ibf_iterator, send_ibf, decode_and_send) but its declaration line
 * is not visible in this listing. */
132 struct OperationState
135 * Copy of the set's strata estimator at the time of
136 * creation of this operation.
138 struct StrataEstimator *se;
141 * The IBF we currently receive.
143 struct InvertibleBloomFilter *remote_ibf;
146 * The IBF with the local set's element.
148 struct InvertibleBloomFilter *local_ibf;
151 * Maps IBF-Keys (specific to the current salt) to elements.
152 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153 * Colliding IBF-Keys are linked.
155 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
158 * Current state of the operation.
160 enum UnionOperationPhase phase;
163 * Did we send the client that we are done?
165 int client_done_sent;
168 * Number of ibf buckets already received into the @a remote_ibf.
170 unsigned int ibf_buckets_received;
173 * Hashes for elements that we have demanded from the other peer.
175 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
178 * Salt that we're using for sending IBFs
183 * Salt for the IBF we've received and that we're currently decoding.
185 uint32_t salt_receive;
190 * The key entry is used to associate an ibf key with an element.
195 * IBF key for the entry, derived from the current salt.
/* Members of the key entry stored as values in key_to_element.
 * NOTE(review): the line carrying the struct tag is missing from this
 * listing; the name struct KeyEntry is taken from its uses further down
 * (for example in op_register_element and prepare_ibf_iterator). */
197 struct IBF_Key ibf_key;
200 * The actual element associated with the key.
202 * Only owned by the union operation if element->operation
205 struct ElementEntry *element;
210 * Used as a closure for sending elements
211 * with a specific IBF key.
/* Closure passed by send_offers_for_key to send_offers_iterator: the full
 * 64-bit IBF key to match (the map itself is indexed by a 32-bit truncation)
 * and the operation whose message queue receives the offers. */
213 struct SendElementClosure
216 * The IBF key whose matching elements should be
219 struct IBF_Key ibf_key;
222 * Operation for which the elements
225 struct Operation *op;
230 * Extra state required for efficient set union.
235 * The strata estimator is only generated once for
237 * The IBF keys are derived from the element hashes with
/* Strata estimator kept with the per-set union state described by the
 * preceding documentation.
 * NOTE(review): the tag of the enclosing struct is on a line that is not
 * visible in this listing. */
240 struct StrataEstimator *se;
245 * Iterator over hash map entries, called to
246 * destroy the linked list of colliding ibf key entries.
249 * @param key current key code
250 * @param value value in the hash map
251 * @return #GNUNET_YES if we should continue to iterate,
/* Iterator callback handed to GNUNET_CONTAINER_multihashmap32_iterate by
 * union_op_cancel while the key_to_element map is torn down.  The map value
 * is treated as a struct KeyEntry; the ElementEntry it points to is freed
 * only when that entry is flagged as remote.
 * NOTE(review): the embedded numbering jumps (255 -> 259, 264 -> 273), so the
 * remaining parameters, the return statement and any release of the KeyEntry
 * itself are not visible here. */
255 destroy_key_to_element_iter (void *cls,
259 struct KeyEntry *k = value;
261 GNUNET_assert (NULL != k);
262 if (GNUNET_YES == k->element->remote)
264 GNUNET_free (k->element);
273 * Destroy the union operation. Only things specific to the union
274 * operation are destroyed.
276 * @param op union operation to destroy
/* Release everything that hangs off op->state and then op->state itself.
 * Each pointer member is destroyed only if non-NULL and is reset to NULL
 * afterwards.  The key_to_element map is first walked with
 * destroy_key_to_element_iter and then destroyed.
 * The assertion guards against a second cancel of the same operation:
 * op->state must still be set on entry.
 * NOTE(review): whether op->state is reset to NULL after the final free is
 * on a line not visible here (313 -> 315). */
279 union_op_cancel (struct Operation *op)
281 LOG (GNUNET_ERROR_TYPE_DEBUG,
282 "destroying union op\n");
283 /* check if the op was canceled twice */
284 GNUNET_assert (NULL != op->state);
285 if (NULL != op->state->remote_ibf)
287 ibf_destroy (op->state->remote_ibf);
288 op->state->remote_ibf = NULL;
290 if (NULL != op->state->demanded_hashes)
292 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
293 op->state->demanded_hashes = NULL;
295 if (NULL != op->state->local_ibf)
297 ibf_destroy (op->state->local_ibf);
298 op->state->local_ibf = NULL;
300 if (NULL != op->state->se)
302 strata_estimator_destroy (op->state->se);
303 op->state->se = NULL;
305 if (NULL != op->state->key_to_element)
307 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
308 &destroy_key_to_element_iter,
310 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
311 op->state->key_to_element = NULL;
313 GNUNET_free (op->state);
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op done\n");
321 * Inform the client that the union operation has failed,
322 * and proceed to destroy the evaluate operation.
324 * @param op the union operation to fail
/* Report failure to the client and tear the operation down.
 * A GNUNET_SET_ResultMessage with status GNUNET_SET_STATUS_FAILURE, the
 * client request id and element type 0 is queued on the client message
 * queue of the owning set; then _GSS_operation_destroy is called with
 * GNUNET_YES.
 * NOTE(review): callers should treat op (and op->state) as gone once this
 * returns -- the comment in send_done_and_destroy says the destroy call also
 * runs the union-specific cancel function; confirm in _GSS_operation_destroy. */
327 fail_union_operation (struct Operation *op)
329 struct GNUNET_MQ_Envelope *ev;
330 struct GNUNET_SET_ResultMessage *msg;
332 LOG (GNUNET_ERROR_TYPE_ERROR,
333 "union operation failed\n");
334 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
/* All multi-byte fields are converted to network byte order. */
335 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
336 msg->request_id = htonl (op->spec->client_request_id);
337 msg->element_type = htons (0);
338 GNUNET_MQ_send (op->spec->set->client_mq, ev);
339 _GSS_operation_destroy (op, GNUNET_YES);
344 * Derive the IBF key from a hash code and
347 * @param src the hash code
348 * @return the derived IBF key
/* Derive a struct IBF_Key from an element hash by running
 * GNUNET_CRYPTO_kdf; the output buffer is the local variable key and a
 * local salt value is one of the KDF inputs.
 * NOTE(review): the declarations of key and salt, the remaining KDF
 * arguments and the return statement are on lines missing from this listing
 * (351 -> 356 -> 358 -> 365). */
350 static struct IBF_Key
351 get_ibf_key (const struct GNUNET_HashCode *src)
356 GNUNET_CRYPTO_kdf (&key, sizeof (key),
358 &salt, sizeof (salt),
365 * Iterator over the mapping from IBF keys to element entries. Checks if we
366 * have an element with a given GNUNET_HashCode.
369 * @param key current key code
370 * @param value value in the hash map
371 * @return #GNUNET_YES if we should search further,
372 * #GNUNET_NO if we've found the element.
/* Iterator used by op_has_element.  The closure is the element hash being
 * searched for; each visited map value is a struct KeyEntry whose element
 * hash is compared against it with GNUNET_CRYPTO_hash_cmp.
 * NOTE(review): the second argument of the comparison and both return
 * statements are on lines not visible here; per the residual documentation
 * the callback returns GNUNET_NO on a match and GNUNET_YES otherwise. */
375 op_has_element_iterator (void *cls,
379 struct GNUNET_HashCode *element_hash = cls;
380 struct KeyEntry *k = value;
382 GNUNET_assert (NULL != k);
383 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
391 * Determine whether the given element is already in the operation's element
394 * @param op operation that should be tested for 'element_hash'
395 * @param element_hash hash of the element to look for
396 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
/* Test whether an element with the given hash is already registered in the
 * key_to_element map of the operation.
 * The hash is turned into an IBF key, the map is probed with the 32-bit
 * truncation of that key, and op_has_element_iterator compares full hashes
 * for every entry stored under that slot.
 * NOTE(review): the declaration of ret and the two return statements are on
 * lines missing from this listing. */
399 op_has_element (struct Operation *op,
400 const struct GNUNET_HashCode *element_hash)
403 struct IBF_Key ibf_key;
405 ibf_key = get_ibf_key (element_hash);
/* The const qualifier is cast away only to fit the void * closure type. */
406 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
407 (uint32_t) ibf_key.key_val,
408 op_has_element_iterator,
409 (void *) element_hash);
411 /* was the iteration aborted because we found the element? */
412 if (GNUNET_SYSERR == ret)
419 * Insert an element into the union operation's
420 * key-to-element mapping. Takes ownership of 'ee'.
421 * Note that this does not insert the element in the set,
422 * only in the operation's key-element mapping.
423 * This is done to speed up re-tried operations, if some elements
424 * were transmitted, and then the IBF fails to decode.
426 * XXX: clarify ownership, doesn't sound right.
428 * @param op the union operation
429 * @param ee the element entry
/* Add an element entry to the key_to_element map of the operation.
 * A fresh struct KeyEntry is allocated, tagged with the IBF key derived from
 * the element hash and inserted under the 32-bit truncation of that key.
 * The MULTIPLE option lets several entries share one 32-bit slot; the put
 * is asserted to succeed.
 * NOTE(review): the declaration of k, the assignment of its element member
 * and the value argument of the put call are on lines not visible here. */
432 op_register_element (struct Operation *op,
433 struct ElementEntry *ee)
435 struct IBF_Key ibf_key;
438 ibf_key = get_ibf_key (&ee->element_hash);
439 k = GNUNET_new (struct KeyEntry);
441 k->ibf_key = ibf_key;
442 GNUNET_assert (GNUNET_OK ==
443 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
444 (uint32_t) ibf_key.key_val,
446 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
/* Produce the salted form of an IBF key by rotating its 64-bit value to the
 * right by s bits.  unsalt_key applies the opposite rotation.
 * NOTE(review): the middle parameter and the definition of s are on lines
 * missing from this listing; callers pass op->state->salt_send in that
 * position.
 * NOTE(review): if s can be 0, the sub-expression x << (64 - s) shifts a
 * 64-bit value by 64, which is undefined behaviour in C.  Confirm that s is
 * never 0 or mask the count, e.g. (64 - s) & 63. */
451 salt_key (const struct IBF_Key *k_in,
453 struct IBF_Key *k_out)
456 uint64_t x = k_in->key_val;
458 x = (x >> s) | (x << (64 - s));
/* Undo salt_key: rotate the 64-bit key value to the left by s bits.
 * NOTE(review): the middle parameter and the definition of s are on lines
 * missing from this listing; callers pass a salt value in that position
 * (op->state->salt_receive or the salt field of an inquiry message).
 * NOTE(review): if s can be 0, x >> (64 - s) shifts a 64-bit value by 64,
 * which is undefined behaviour in C.  Confirm that s is never 0 or mask the
 * count, e.g. (64 - s) & 63. */
464 unsalt_key (const struct IBF_Key *k_in,
466 struct IBF_Key *k_out)
469 uint64_t x = k_in->key_val;
470 x = (x << s) | (x >> (64 - s));
476 * Insert a key into an ibf.
480 * @param value the key entry to get the key from
/* Iterator used by prepare_ibf over key_to_element.  The closure is the
 * operation and each value is a struct KeyEntry.  The stored key is salted
 * with the current salt_send and the salted key is inserted into local_ibf.
 * NOTE(review): the remaining parameters, one LOG argument and the return
 * statement are on lines not visible here. */
483 prepare_ibf_iterator (void *cls,
487 struct Operation *op = cls;
488 struct KeyEntry *ke = value;
489 struct IBF_Key salted_key;
/* NOTE(review): key_val is handled as uint64_t elsewhere in this file, so
 * the cast to unsigned long for the lx conversion drops the upper half on
 * platforms with a 32-bit long.  This affects the log output only. */
491 LOG (GNUNET_ERROR_TYPE_DEBUG,
492 "[OP %x] inserting %lx (hash %s) into ibf\n",
494 (unsigned long) ke->ibf_key.key_val,
495 GNUNET_h2s (&ke->element->element_hash));
496 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
497 ibf_insert (op->state->local_ibf, salted_key);
503 * Iterator for initializing the
504 * key-to-element mapping of a union operation
506 * @param cls the union operation `struct Operation *`
508 * @param value the `struct ElementEntry *` to insert
509 * into the key-to-element mapping
510 * @return #GNUNET_YES (to continue iterating)
/* Iterator used by prepare_ibf over the element map of the set.  The
 * closure is the operation and each value is a struct ElementEntry.
 * Entries for which _GSS_is_element_of_operation reports GNUNET_NO are not
 * registered; for the others the entry is asserted to be non-remote and is
 * then added with op_register_element.
 * NOTE(review): the statement executed for skipped entries and the final
 * return are on lines not visible here; the residual documentation says the
 * callback returns GNUNET_YES to continue iterating. */
513 init_key_to_element_iterator (void *cls,
514 const struct GNUNET_HashCode *key,
517 struct Operation *op = cls;
518 struct ElementEntry *ee = value;
520 /* make sure that the element belongs to the set at the time
521 * of creating the operation */
522 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
525 GNUNET_assert (GNUNET_NO == ee->remote);
527 op_register_element (op, ee);
533 * Create an ibf with the operation's elements
534 * of the specified size
536 * @param op the union operation
537 * @param size size of the ibf to create
538 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* (Re)build op->state->local_ibf with the requested number of buckets.
 * On first use the key_to_element map is created, sized from the number of
 * elements in the set plus one, and filled by iterating the element map of
 * the set with init_key_to_element_iterator.  Any previous local IBF is
 * destroyed, a new one with SE_IBF_HASH_NUM hash functions is created, and
 * every registered key is inserted through prepare_ibf_iterator.
 * Returns GNUNET_SYSERR when ibf_create yields NULL.
 * NOTE(review): the size parameter, the declaration of len and the success
 * return are on lines not visible here. */
541 prepare_ibf (struct Operation *op,
544 if (NULL == op->state->key_to_element)
548 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
549 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
551 init_key_to_element_iterator, op);
553 if (NULL != op->state->local_ibf)
554 ibf_destroy (op->state->local_ibf);
555 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
556 if (NULL == op->state->local_ibf)
558 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
559 "Failed to allocate local IBF\n");
560 return GNUNET_SYSERR;
562 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
563 &prepare_ibf_iterator,
570 * Send an ibf of appropriate size.
572 * Fragments the IBF into multiple messages if necessary.
574 * @param op the union operation
575 * @param ibf_order order of the ibf to send, size=2^order
576 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* Build the local IBF with 2^ibf_order buckets and transmit it to the peer,
 * split into messages of at most MAX_BUCKETS_PER_MESSAGE buckets each.
 * Every fragment carries the order, the bucket offset and the current
 * salt_send.  A per-order statistics counter is bumped once per call.
 * Afterwards the phase becomes PHASE_INVENTORY_PASSIVE.
 * Returns GNUNET_SYSERR when prepare_ibf fails.
 * NOTE(review): the ibf_order parameter line, the comparison that wraps the
 * prepare_ibf call and the success return are not visible in this listing.
 * NOTE(review): 1 << ibf_order is a signed int shift that is compared with
 * the unsigned counter buckets_sent; callers appear to keep the order at or
 * below MAX_IBF_ORDER, but nothing in this function enforces it. */
579 send_ibf (struct Operation *op,
582 unsigned int buckets_sent = 0;
583 struct InvertibleBloomFilter *ibf;
586 prepare_ibf (op, 1<<ibf_order))
588 /* allocation failed */
589 return GNUNET_SYSERR;
592 LOG (GNUNET_ERROR_TYPE_DEBUG,
593 "sending ibf of size %u\n",
597 char name[64] = { 0 };
598 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
599 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
602 ibf = op->state->local_ibf;
604 while (buckets_sent < (1 << ibf_order))
606 unsigned int buckets_in_message;
607 struct GNUNET_MQ_Envelope *ev;
608 struct IBFMessage *msg;
610 buckets_in_message = (1 << ibf_order) - buckets_sent;
611 /* limit to maximum */
612 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
613 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
615 ev = GNUNET_MQ_msg_extra (msg,
616 buckets_in_message * IBF_BUCKET_SIZE,
617 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
/* NOTE(review): order is stored without byte-order conversion while offset
 * and salt go through htonl -- presumably order is a single-byte field;
 * confirm against struct IBFMessage. */
620 msg->order = ibf_order;
621 msg->offset = htonl (buckets_sent);
622 msg->salt = htonl (op->state->salt_send);
623 ibf_write_slice (ibf, buckets_sent,
624 buckets_in_message, &msg[1]);
625 buckets_sent += buckets_in_message;
626 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 "ibf chunk size %u, %u/%u sent\n",
631 GNUNET_MQ_send (op->mq, ev);
634 /* The other peer must decode the IBF, so
636 op->state->phase = PHASE_INVENTORY_PASSIVE;
642 * Send a strata estimator to the remote peer.
644 * @param op the union operation with the remote peer
/* Serialise the strata estimator of the operation and send it to the peer.
 * A buffer large enough for the full estimator (strata_count times ibf_size
 * buckets) is allocated and filled by strata_estimator_write.  When the
 * written length is smaller than that full size the message type
 * GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC is used, otherwise
 * GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE -- presumably the compressed and the
 * plain variant; confirm against strata_estimator_write.
 * Afterwards the phase becomes PHASE_EXPECT_IBF.
 * NOTE(review): the declarations of buf, len and type, the remaining call
 * arguments and any release of buf are on lines not visible in this
 * listing; verify that buf is freed after the copy. */
647 send_strata_estimator (struct Operation *op)
649 const struct StrataEstimator *se = op->state->se;
650 struct GNUNET_MQ_Envelope *ev;
651 struct GNUNET_MessageHeader *strata_msg;
656 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
657 len = strata_estimator_write (op->state->se,
659 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
660 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
662 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
663 ev = GNUNET_MQ_msg_header_extra (strata_msg,
666 GNUNET_memcpy (&strata_msg[1],
670 GNUNET_MQ_send (op->mq,
672 op->state->phase = PHASE_EXPECT_IBF;
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "sent SE, expecting IBF\n");
679 * Compute the necessary order of an ibf
680 * from the size of the symmetric set difference.
682 * @param diff the difference
683 * @return the required size of the ibf
/* Pick an IBF order for an estimated symmetric difference of diff elements.
 * The loop condition holds while 2^ibf_order is below IBF_ALPHA * diff or
 * below SE_IBF_HASH_NUM; the result is then capped at MAX_IBF_ORDER.
 * NOTE(review): the initial value of ibf_order, the loop body and the return
 * statement are on lines missing from this listing.
 * NOTE(review): the cap is applied only after the loop.  If the loop
 * increments ibf_order, a very large diff could drive 1 << ibf_order past
 * the range of int before the cap is reached; consider bounding the loop by
 * MAX_IBF_ORDER as well. */
686 get_order_from_difference (unsigned int diff)
688 unsigned int ibf_order;
691 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
692 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
694 if (ibf_order > MAX_IBF_ORDER)
695 ibf_order = MAX_IBF_ORDER;
701 * Handle a strata estimator from a remote peer
703 * @param cls the union operation
704 * @param mh the message
705 * @param is_compressed #GNUNET_YES if the estimator is compressed
706 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
707 * #GNUNET_OK otherwise
/* Handle a strata estimator message from the peer.
 * The operation is failed (and GNUNET_SYSERR returned) when the phase is not
 * PHASE_EXPECT_SE, when an uncompressed payload does not have exactly the
 * size SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE, when the remote
 * estimator cannot be allocated, or when reading it fails.
 * Otherwise the difference against the local estimator is computed, both
 * estimators are destroyed, and an IBF of the order chosen by
 * get_order_from_difference is sent with send_ibf.
 * NOTE(review): the is_compressed parameter line, the declarations of diff
 * and len, several call arguments and the success return are not visible in
 * this listing. */
710 handle_p2p_strata_estimator (void *cls,
711 const struct GNUNET_MessageHeader *mh,
714 struct Operation *op = cls;
715 struct StrataEstimator *remote_se;
/* The statistics counter is updated before any validation of the message. */
719 GNUNET_STATISTICS_update (_GSS_statistics,
720 "# bytes of SE received",
724 if (op->state->phase != PHASE_EXPECT_SE)
726 fail_union_operation (op);
728 return GNUNET_SYSERR;
730 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
731 if ( (GNUNET_NO == is_compressed) &&
732 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
734 fail_union_operation (op);
736 return GNUNET_SYSERR;
738 remote_se = strata_estimator_create (SE_STRATA_COUNT,
741 if (NULL == remote_se)
743 /* insufficient resources, fail */
744 fail_union_operation (op);
745 return GNUNET_SYSERR;
748 strata_estimator_read (&mh[1],
753 /* decompression failed */
754 fail_union_operation (op);
755 strata_estimator_destroy (remote_se);
756 return GNUNET_SYSERR;
/* The local estimator is consumed here: once the difference is known it is
 * destroyed and the pointer cleared, so union_op_cancel will skip it. */
758 GNUNET_assert (NULL != op->state->se);
759 diff = strata_estimator_difference (remote_se,
761 strata_estimator_destroy (remote_se);
762 strata_estimator_destroy (op->state->se);
763 op->state->se = NULL;
/* get_order_from_difference is evaluated twice with the same argument, once
 * for the log line and once for the actual send. */
764 LOG (GNUNET_ERROR_TYPE_DEBUG,
765 "got se diff=%d, using ibf size %d\n",
767 1<<get_order_from_difference (diff));
770 get_order_from_difference (diff)))
772 /* Internal error, best we can do is shut the connection */
773 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
774 "Failed to send IBF, closing connection\n");
775 fail_union_operation (op);
776 return GNUNET_SYSERR;
783 * Iterator to send elements to a remote peer
785 * @param cls closure with the element key and the union operation
787 * @param value the key entry
/* Iterator used by send_offers_for_key.  The closure is a
 * struct SendElementClosure and each value is a struct KeyEntry stored under
 * the same 32-bit slot.  Entries whose full 64-bit key differs from the
 * requested one are not offered; for a match an OFFER message carrying the
 * element hash is queued on the peer message queue.
 * NOTE(review): the remaining parameters, the statement taken on a key
 * mismatch, one LOG argument and the final return are on lines not visible
 * in this listing. */
790 send_offers_iterator (void *cls,
794 struct SendElementClosure *sec = cls;
795 struct Operation *op = sec->op;
796 struct KeyEntry *ke = value;
797 struct GNUNET_MQ_Envelope *ev;
798 struct GNUNET_MessageHeader *mh;
800 /* Detect 32-bit key collision for the 64-bit IBF keys. */
801 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
804 ev = GNUNET_MQ_msg_header_extra (mh,
805 sizeof (struct GNUNET_HashCode),
806 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
808 GNUNET_assert (NULL != ev);
/* The payload directly behind the header is exactly one hash code. */
809 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
810 LOG (GNUNET_ERROR_TYPE_DEBUG,
811 "[OP %x] sending element offer (%s) to peer\n",
813 GNUNET_h2s (&ke->element->element_hash));
814 GNUNET_MQ_send (op->mq, ev);
820 * Send offers (in the form of GNUNET_HashCode-s) to the remote peer for the given IBF key.
822 * @param op union operation
823 * @param ibf_key IBF key of interest
/* Offer to the peer every locally known element whose IBF key equals
 * ibf_key.  The key_to_element map is probed with the 32-bit truncation of
 * the key and send_offers_iterator filters on the full 64-bit value.  The
 * result of the lookup is deliberately ignored.
 * NOTE(review): the assignment of the op member of send_cls and the closure
 * argument of the lookup are on lines not visible in this listing. */
826 send_offers_for_key (struct Operation *op,
827 struct IBF_Key ibf_key)
829 struct SendElementClosure send_cls;
831 send_cls.ibf_key = ibf_key;
833 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
834 (uint32_t) ibf_key.key_val,
835 &send_offers_iterator,
841 * Decode which elements are missing on each side, and
842 * send the appropriate offers and inquiries.
844 * @param op union operation
845 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* Decode the difference between the local and the remote IBF and act on
 * every decoded key.  Requires PHASE_INVENTORY_ACTIVE.
 * A local IBF of the same size as the remote one is built, copied, and the
 * remote IBF is subtracted from the copy; the remote IBF is then destroyed.
 * Keys are pulled out with ibf_decode:
 *  - on a decode error or a detected cycle, a next_order is derived from the
 *    size of the difference IBF; if it does not exceed MAX_IBF_ORDER,
 *    salt_send is incremented and a new IBF is sent, otherwise the operation
 *    is failed;
 *  - when ibf_decode returns GNUNET_NO, a DONE message is sent to the peer;
 *  - otherwise either offers are sent for the unsalted key or an inquiry
 *    carrying the key and salt_receive is sent.
 * Returns GNUNET_SYSERR on failure; the difference IBF is destroyed on the
 * failure paths shown and at the end.
 * NOTE(review): loop headers, the branch conditions on side, several
 * declarations and the success return are on lines missing from this
 * listing, so the exact control flow cannot be confirmed from here. */
848 decode_and_send (struct Operation *op)
851 struct IBF_Key last_key;
853 unsigned int num_decoded;
854 struct InvertibleBloomFilter *diff_ibf;
856 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
859 prepare_ibf (op, op->state->remote_ibf->size))
862 /* allocation failed */
863 return GNUNET_SYSERR;
865 diff_ibf = ibf_dup (op->state->local_ibf);
866 ibf_subtract (diff_ibf, op->state->remote_ibf);
868 ibf_destroy (op->state->remote_ibf);
869 op->state->remote_ibf = NULL;
871 LOG (GNUNET_ERROR_TYPE_DEBUG,
872 "decoding IBF (size=%u)\n",
876 last_key.key_val = 0;
881 int cycle_detected = GNUNET_NO;
885 res = ibf_decode (diff_ibf, &side, &key);
886 if (res == GNUNET_OK)
888 LOG (GNUNET_ERROR_TYPE_DEBUG,
889 "decoded ibf key %lx\n",
890 (unsigned long) key.key_val);
/* A cycle is assumed when more keys were decoded than the IBF has buckets,
 * or when the same key comes out twice in a row. */
892 if ( (num_decoded > diff_ibf->size) ||
893 (num_decoded > 1 && last_key.key_val == key.key_val) )
895 LOG (GNUNET_ERROR_TYPE_DEBUG,
896 "detected cyclic ibf (decoded %u/%u)\n",
899 cycle_detected = GNUNET_YES;
902 if ( (GNUNET_SYSERR == res) ||
903 (GNUNET_YES == cycle_detected) )
907 while (1<<next_order < diff_ibf->size)
910 if (next_order <= MAX_IBF_ORDER)
912 LOG (GNUNET_ERROR_TYPE_DEBUG,
913 "decoding failed, sending larger ibf (size %u)\n",
915 GNUNET_STATISTICS_update (_GSS_statistics,
/* A fresh salt changes how keys map into the retried IBF. */
919 op->state->salt_send++;
921 send_ibf (op, next_order))
923 /* Internal error, best we can do is shut the connection */
924 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
925 "Failed to send IBF, closing connection\n");
926 fail_union_operation (op);
927 ibf_destroy (diff_ibf);
928 return GNUNET_SYSERR;
933 GNUNET_STATISTICS_update (_GSS_statistics,
934 "# of failed union operations (too large)",
937 // XXX: Send the whole set, element-by-element
938 LOG (GNUNET_ERROR_TYPE_ERROR,
939 "set union failed: reached ibf limit\n");
940 fail_union_operation (op);
941 ibf_destroy (diff_ibf);
942 return GNUNET_SYSERR;
946 if (GNUNET_NO == res)
948 struct GNUNET_MQ_Envelope *ev;
950 LOG (GNUNET_ERROR_TYPE_DEBUG,
951 "transmitted all values, sending DONE\n");
952 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
953 GNUNET_MQ_send (op->mq, ev);
954 /* We now wait until we get a DONE message back
955 * and then wait for our MQ to be flushed and all our
956 * demands be delivered. */
961 struct IBF_Key unsalted_key;
962 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
963 send_offers_for_key (op, unsalted_key);
967 struct GNUNET_MQ_Envelope *ev;
968 struct InquiryMessage *msg;
970 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
971 * the additional complexity. */
972 ev = GNUNET_MQ_msg_extra (msg,
973 sizeof (struct IBF_Key),
974 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
975 msg->salt = htonl (op->state->salt_receive);
976 GNUNET_memcpy (&msg[1],
978 sizeof (struct IBF_Key));
979 LOG (GNUNET_ERROR_TYPE_DEBUG,
980 "sending element inquiry for IBF key %lx\n",
981 (unsigned long) key.key_val);
982 GNUNET_MQ_send (op->mq, ev);
989 ibf_destroy (diff_ibf);
995 * Handle an IBF message from a remote peer.
997 * Reassemble the IBF from multiple pieces, and
998 * process the whole IBF once possible.
1000 * @param cls the union operation
1001 * @param mh the header of the message
1002 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1003 * #GNUNET_OK otherwise
/* Handle one IBF fragment from the peer and start decoding once the IBF is
 * complete.
 * Messages shorter than struct IBFMessage fail the operation.
 * In PHASE_INVENTORY_PASSIVE or PHASE_EXPECT_IBF a new remote IBF with
 * 2^order buckets is created, salt_receive is taken from the message, and
 * the fragment must start at offset 0.  In PHASE_EXPECT_IBF_CONT the offset,
 * order and salt must match what was received so far.
 * The payload must be a non-zero whole number of buckets.  The buckets are
 * read into the remote IBF; when the received count equals its size the
 * phase becomes PHASE_INVENTORY_ACTIVE and decode_and_send runs.
 * Protocol violations call fail_union_operation and return GNUNET_SYSERR.
 * NOTE(review): the handling of any other phase, the comparison that wraps
 * decode_and_send and the success return are on lines missing from this
 * listing. */
1006 handle_p2p_ibf (void *cls,
1007 const struct GNUNET_MessageHeader *mh)
1009 struct Operation *op = cls;
1010 const struct IBFMessage *msg;
1011 unsigned int buckets_in_message;
1013 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1015 GNUNET_break_op (0);
1016 fail_union_operation (op);
1017 return GNUNET_SYSERR;
1019 msg = (const struct IBFMessage *) mh;
1020 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1021 (op->state->phase == PHASE_EXPECT_IBF) )
/* The phase is switched before the first fragment has been validated. */
1023 op->state->phase = PHASE_EXPECT_IBF_CONT;
1024 GNUNET_assert (NULL == op->state->remote_ibf);
1025 LOG (GNUNET_ERROR_TYPE_DEBUG,
1026 "Creating new ibf of size %u\n",
/* NOTE(review): msg->order comes from the peer and is used as a shift count
 * and allocation size with no range check visible in this listing.  An
 * order at or above the width of int makes the shift undefined, and a large
 * order requests a very large IBF.  Consider rejecting values above
 * MAX_IBF_ORDER before this point. */
1028 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1029 op->state->salt_receive = ntohl (msg->salt);
1030 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1031 if (NULL == op->state->remote_ibf)
1033 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1034 "Failed to parse remote IBF, closing connection\n");
1035 fail_union_operation (op);
1036 return GNUNET_SYSERR;
1038 op->state->ibf_buckets_received = 0;
1039 if (0 != ntohl (msg->offset))
1041 GNUNET_break_op (0);
1042 fail_union_operation (op);
1043 return GNUNET_SYSERR;
1046 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1048 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1050 GNUNET_break_op (0);
1051 fail_union_operation (op);
1052 return GNUNET_SYSERR;
1054 if (1<<msg->order != op->state->remote_ibf->size)
1056 GNUNET_break_op (0);
1057 fail_union_operation (op);
1058 return GNUNET_SYSERR;
1060 if (ntohl (msg->salt) != op->state->salt_receive)
1062 GNUNET_break_op (0);
1063 fail_union_operation (op);
1064 return GNUNET_SYSERR;
/* Integer division; the exact-multiple check below rejects any remainder. */
1072 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1074 if (0 == buckets_in_message)
1076 GNUNET_break_op (0);
1077 fail_union_operation (op);
1078 return GNUNET_SYSERR;
1081 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1083 GNUNET_break_op (0);
1084 fail_union_operation (op);
1085 return GNUNET_SYSERR;
1088 GNUNET_assert (NULL != op->state->remote_ibf);
/* NOTE(review): no check is visible here that ibf_buckets_received plus
 * buckets_in_message stays within remote_ibf->size; confirm that
 * ibf_read_slice guards against a peer sending too many buckets. */
1090 ibf_read_slice (&msg[1],
1091 op->state->ibf_buckets_received,
1093 op->state->remote_ibf);
1094 op->state->ibf_buckets_received += buckets_in_message;
1096 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1098 LOG (GNUNET_ERROR_TYPE_DEBUG,
1099 "received full ibf\n");
1100 op->state->phase = PHASE_INVENTORY_ACTIVE;
1102 decode_and_send (op))
1104 /* Internal error, best we can do is shut down */
1105 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1106 "Failed to decode IBF, closing connection\n");
1107 return GNUNET_SYSERR;
1115 * Send a result message to the client indicating
1116 * that there is a new element.
1118 * @param op union operation
1119 * @param element element to send
1120 * @param status status to send with the new element
/* Send one element to the client as a GNUNET_SET_ResultMessage with the
 * given status.  The element data is copied directly behind the fixed
 * message part.  A client request id of 0 trips the assertion.
 * NOTE(review): the status parameter line, one LOG argument and the
 * condition under which the envelope is discarded are on lines missing from
 * this listing. */
1123 send_client_element (struct Operation *op,
1124 struct GNUNET_SET_Element *element,
1127 struct GNUNET_MQ_Envelope *ev;
1128 struct GNUNET_SET_ResultMessage *rm;
1130 LOG (GNUNET_ERROR_TYPE_DEBUG,
1131 "sending element (size %u) to client\n",
1133 GNUNET_assert (0 != op->spec->client_request_id);
1134 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1137 GNUNET_MQ_discard (ev);
1141 rm->result_status = htons (status);
1142 rm->request_id = htonl (op->spec->client_request_id);
/* NOTE(review): element_type is copied without htons here, whereas
 * fail_union_operation, send_done_and_destroy and handle_p2p_demand convert
 * the same kind of field with htons.  handle_p2p_elements stores the type in
 * host order (via ntohs), so this looks like a missing conversion; confirm
 * against the client side. */
1143 rm->element_type = element->element_type;
1144 GNUNET_memcpy (&rm[1], element->data, element->size);
1145 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1150 * Signal to the client that the operation has finished and
1151 * destroy the operation.
1153 * @param cls operation to destroy
/* Tell the client that the operation completed and destroy the operation.
 * The closure is the operation.  A result message with status
 * GNUNET_SET_STATUS_DONE, the client request id and element type 0 is
 * queued on the client message queue; then _GSS_operation_destroy is called
 * with GNUNET_YES.  The operation must not be touched afterwards. */
1156 send_done_and_destroy (void *cls)
1158 struct Operation *op = cls;
1159 struct GNUNET_MQ_Envelope *ev;
1160 struct GNUNET_SET_ResultMessage *rm;
1162 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1163 rm->request_id = htonl (op->spec->client_request_id);
1164 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1165 rm->element_type = htons (0);
1166 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1167 /* Will also call the union-specific cancel function. */
1168 _GSS_operation_destroy (op, GNUNET_YES);
/* Advance the operation towards completion once no demanded elements are
 * outstanding.
 * In PHASE_FINISH_WAITING with zero pending demands the phase becomes
 * PHASE_DONE and a DONE message is sent to the peer.
 * In PHASE_FINISH_CLOSING with zero pending demands the phase becomes
 * PHASE_DONE and send_done_and_destroy is called, which destroys the
 * operation.
 * In any other situation shown here nothing happens.
 * NOTE(review): the LOG arguments are on lines not visible in this listing,
 * as is whether the second phase test is reached after the first branch ran
 * (the connecting line between 1193 and 1196 is missing). */
1173 maybe_finish (struct Operation *op)
1175 unsigned int num_demanded;
1177 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1179 if (PHASE_FINISH_WAITING == op->state->phase)
1181 LOG (GNUNET_ERROR_TYPE_DEBUG,
1182 "In PHASE_FINISH_WAITING, pending %u demands\n",
1184 if (0 == num_demanded)
1186 struct GNUNET_MQ_Envelope *ev;
1188 op->state->phase = PHASE_DONE;
1189 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1190 GNUNET_MQ_send (op->mq, ev);
1192 /* We now wait until the other peer closes the channel
1193 * after it got all elements from us. */
1196 if (PHASE_FINISH_CLOSING == op->state->phase)
1198 LOG (GNUNET_ERROR_TYPE_DEBUG,
1199 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1201 if (0 == num_demanded)
1203 op->state->phase = PHASE_DONE;
1204 send_done_and_destroy (op);
1211 * Handle an element message from a remote peer.
1213 * @param cls the union operation
1214 * @param mh the message
/* Handle an element sent by the peer in response to one of our demands.
 * The operation is failed when no demands are outstanding, when the message
 * is shorter than struct GNUNET_SET_ElementMessage, or when the hash of the
 * received element cannot be removed from demanded_hashes.
 * Otherwise a struct ElementEntry is allocated with the element data stored
 * directly behind it, flagged as remote, counted in the statistics, and --
 * unless op_has_element already knows the hash -- registered with
 * op_register_element and reported to the client according to the result
 * mode (GNUNET_SET_STATUS_OK for RESULT_ADDED, GNUNET_SET_STATUS_ADD_LOCAL
 * for RESULT_SYMMETRIC).
 * NOTE(review): the return statements, the else and break lines, and any
 * release of ee on the failure and duplicate paths are on lines missing
 * from this listing (for example 1255 -> 1257); verify that ee is freed
 * whenever it is not registered. */
1217 handle_p2p_elements (void *cls,
1218 const struct GNUNET_MessageHeader *mh)
1220 struct Operation *op = cls;
1221 struct ElementEntry *ee;
1222 const struct GNUNET_SET_ElementMessage *emsg;
1223 uint16_t element_size;
1225 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1227 GNUNET_break_op (0);
1228 fail_union_operation (op);
1231 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1233 GNUNET_break_op (0);
1234 fail_union_operation (op);
1238 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
/* The size check above keeps this subtraction from going negative. */
1240 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1241 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1242 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1243 ee->element.size = element_size;
/* The data pointer refers to the copy stored right behind the entry. */
1244 ee->element.data = &ee[1];
1245 ee->element.element_type = ntohs (emsg->element_type);
1246 ee->remote = GNUNET_YES;
1247 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1250 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1254 /* We got something we didn't demand, since it's not in our map. */
1255 GNUNET_break_op (0);
1257 fail_union_operation (op);
1261 LOG (GNUNET_ERROR_TYPE_DEBUG,
1262 "Got element (size %u, hash %s) from peer\n",
1263 (unsigned int) element_size,
1264 GNUNET_h2s (&ee->element_hash));
1266 GNUNET_STATISTICS_update (_GSS_statistics,
1267 "# received elements",
1270 GNUNET_STATISTICS_update (_GSS_statistics,
1271 "# exchanged elements",
1275 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1277 /* Got repeated element. Should not happen since
1278 * we track demands. */
1279 GNUNET_STATISTICS_update (_GSS_statistics,
1280 "# repeated elements",
1287 LOG (GNUNET_ERROR_TYPE_DEBUG,
1288 "Registering new element from remote peer\n");
1289 op_register_element (op, ee);
1290 /* only send results immediately if the client wants it */
1291 switch (op->spec->result_mode)
1293 case GNUNET_SET_RESULT_ADDED:
1294 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1296 case GNUNET_SET_RESULT_SYMMETRIC:
1297 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1300 /* Result mode not supported, should have been caught earlier. */
1311 * Send offers (for GNUNET_HashCode-s) in response
1312 * to inquiries (for IBF_Key-s).
1314 * @param cls the union operation
1315 * @param mh the message
/* Answer an inquiry from the peer: for every IBF key in the message, unsalt
 * it with the salt carried in the message and send offers for all matching
 * local elements.
 * The operation is failed when the phase is not PHASE_INVENTORY_PASSIVE or
 * when the payload is not a whole number of struct IBF_Key values.
 * NOTE(review): the return statements and the step that advances ibf_key
 * inside the loop are on lines missing from this listing. */
1318 handle_p2p_inquiry (void *cls,
1319 const struct GNUNET_MessageHeader *mh)
1321 struct Operation *op = cls;
1322 const struct IBF_Key *ibf_key;
1323 unsigned int num_keys;
1324 struct InquiryMessage *msg;
1326 /* look up elements and send them */
1327 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1329 GNUNET_break_op (0);
1330 fail_union_operation (op);
/* NOTE(review): no check is visible that mh->size is at least
 * sizeof (struct InquiryMessage).  For a shorter message the unsigned
 * subtraction wraps around; confirm that the size is validated before this
 * handler is dispatched. */
1333 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1334 / sizeof (struct IBF_Key);
1335 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1336 != num_keys * sizeof (struct IBF_Key))
1338 GNUNET_break_op (0);
1339 fail_union_operation (op);
/* NOTE(review): this cast drops the const qualifier of mh although msg is
 * only read below; a const pointer would do. */
1343 msg = (struct InquiryMessage *) mh;
1345 ibf_key = (const struct IBF_Key *) &msg[1];
1346 while (0 != num_keys--)
1348 struct IBF_Key unsalted_key;
1349 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1350 send_offers_for_key (op, unsalted_key);
/* Handle a demand from the peer: for every hash in the message, look up the
 * element in the element map of the set and send it in an ELEMENTS message.
 * The operation is failed when the payload is not a whole number of hash
 * codes, when a demanded element does not exist, or when it is not part of
 * this operation according to _GSS_is_element_of_operation.
 * In result mode RESULT_SYMMETRIC each sent element is also reported to the
 * client with GNUNET_SET_STATUS_ADD_REMOTE; RESULT_ADDED needs no report.
 * NOTE(review): the loop condition, the NULL test on ee, the return
 * statements, the break lines and some call arguments are on lines missing
 * from this listing.  No phase check is visible in this handler. */
1360 handle_p2p_demand (void *cls,
1361 const struct GNUNET_MessageHeader *mh)
1363 struct Operation *op = cls;
1364 struct ElementEntry *ee;
1365 struct GNUNET_SET_ElementMessage *emsg;
1366 const struct GNUNET_HashCode *hash;
1367 unsigned int num_hashes;
1368 struct GNUNET_MQ_Envelope *ev;
1370 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1371 / sizeof (struct GNUNET_HashCode);
1372 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1373 != num_hashes * sizeof (struct GNUNET_HashCode))
1375 GNUNET_break_op (0);
1376 fail_union_operation (op);
1380 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1382 hash++, num_hashes--)
1384 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1387 /* Demand for non-existing element. */
1388 GNUNET_break_op (0);
1389 fail_union_operation (op);
1392 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1394 /* Probably confused lazily copied sets. */
1395 GNUNET_break_op (0);
1396 fail_union_operation (op);
1399 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1400 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1401 emsg->reserved = htons (0);
1402 emsg->element_type = htons (ee->element.element_type);
1403 LOG (GNUNET_ERROR_TYPE_DEBUG,
1404 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1406 (unsigned int) ee->element.size,
1407 GNUNET_h2s (&ee->element_hash));
1408 GNUNET_MQ_send (op->mq, ev);
1409 GNUNET_STATISTICS_update (_GSS_statistics,
1410 "# exchanged elements",
1414 switch (op->spec->result_mode)
1416 case GNUNET_SET_RESULT_ADDED:
1417 /* Nothing to do. */
1419 case GNUNET_SET_RESULT_SYMMETRIC:
1420 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1423 /* Result mode not supported, should have been caught earlier. */
1432 * Handle offers (of GNUNET_HashCode-s) and
1433 * respond with demands (of GNUNET_HashCode-s).
1435 * @param cls the union operation
1436 * @param mh the message
/* Handle offers from the peer: for every offered hash that is not already
 * demanded, record it in demanded_hashes and send a DEMAND message carrying
 * that hash.
 * The operation is failed when the phase is neither PHASE_INVENTORY_PASSIVE
 * nor PHASE_INVENTORY_ACTIVE, or when the payload is not a whole number of
 * hash codes.
 * NOTE(review): the loop condition, the statements taken when the element is
 * already part of the operation or already demanded, the return statements
 * and some call arguments are on lines missing from this listing.  In
 * particular it is not visible whether ee is tested for NULL before it is
 * passed to _GSS_is_element_of_operation. */
1439 handle_p2p_offer (void *cls,
1440 const struct GNUNET_MessageHeader *mh)
1442 struct Operation *op = cls;
1443 const struct GNUNET_HashCode *hash;
1444 unsigned int num_hashes;
1446 /* look up elements and send them */
1447 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1448 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1450 GNUNET_break_op (0);
1451 fail_union_operation (op);
1454 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1455 / sizeof (struct GNUNET_HashCode);
1456 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1457 != num_hashes * sizeof (struct GNUNET_HashCode))
1459 GNUNET_break_op (0);
1460 fail_union_operation (op);
1464 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1466 hash++, num_hashes--)
1468 struct ElementEntry *ee;
1469 struct GNUNET_MessageHeader *demands;
1470 struct GNUNET_MQ_Envelope *ev;
1472 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1475 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1479 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1482 LOG (GNUNET_ERROR_TYPE_DEBUG,
1483 "Skipped sending duplicate demand\n");
/* UNIQUE_FAST is used here; the contains test above is what keeps duplicate
 * hashes out of the map. */
1487 GNUNET_assert (GNUNET_OK ==
1488 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1491 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
/* NOTE(review): the x conversion expects an unsigned int but receives a
 * void pointer, which is a format and argument mismatch; the p conversion
 * would match the argument. */
1493 LOG (GNUNET_ERROR_TYPE_DEBUG,
1494 "[OP %x] Requesting element (hash %s)\n",
1495 (void *) op, GNUNET_h2s (hash));
1496 ev = GNUNET_MQ_msg_header_extra (demands,
1497 sizeof (struct GNUNET_HashCode),
1498 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1499 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1500 GNUNET_MQ_send (op->mq, ev);
1506 * Handle a done message from a remote peer
1508 * @param cls the union operation
1509 * @param mh the message
1512 handle_p2p_done (void *cls,
1513 const struct GNUNET_MessageHeader *mh)
1515 struct Operation *op = cls;
1517 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1519 /* We got all requests, but still have to send our elements in response. */
1521 op->state->phase = PHASE_FINISH_WAITING;
1523 LOG (GNUNET_ERROR_TYPE_DEBUG,
1524 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1525 /* The active peer is done sending offers
1526 * and inquiries. This means that all
1527 * our responses to that (demands and offers)
1528 * must be in flight (queued or in mesh).
1530 * We should notify the active peer once
1531 * all our demands are satisfied, so that the active
1532 * peer can quit if we gave him everything.
1537 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1539 LOG (GNUNET_ERROR_TYPE_DEBUG,
1540 "got DONE (as active partner), waiting to finish\n");
1541 /* All demands of the other peer are satisfied,
1542 * and we processed all offers, thus we know
1543 * exactly what our demands must be.
1545 * We'll close the channel
1546 * to the other peer once our demands are met.
1548 op->state->phase = PHASE_FINISH_CLOSING;
1552 GNUNET_break_op (0);
1553 fail_union_operation (op);
1558 * Initiate operation to evaluate a set union with a remote peer.
1560 * @param op operation to perform (to be initialized)
1561 * @param opaque_context message to be transmitted to the listener
1562 * to convince him to accept, may be NULL
1565 union_evaluate (struct Operation *op,
1566 const struct GNUNET_MessageHeader *opaque_context)
1568 struct GNUNET_MQ_Envelope *ev;
1569 struct OperationRequestMessage *msg;
1571 GNUNET_assert (NULL == op->state);
1572 op->state = GNUNET_new (struct OperationState);
1573 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1574 /* copy the current generation's strata estimator for this operation */
1575 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1576 /* we started the operation, thus we have to send the operation request */
1577 op->state->phase = PHASE_EXPECT_SE;
1578 op->state->salt_receive = op->state->salt_send = 42;
1579 LOG (GNUNET_ERROR_TYPE_DEBUG,
1580 "Initiating union operation evaluation\n");
1581 GNUNET_STATISTICS_update (_GSS_statistics,
1582 "# of total union operations",
1585 GNUNET_STATISTICS_update (_GSS_statistics,
1586 "# of initiated union operations",
1589 ev = GNUNET_MQ_msg_nested_mh (msg,
1590 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1594 /* the context message is too large */
1596 GNUNET_SERVER_client_disconnect (op->spec->set->client);
1599 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1600 GNUNET_MQ_send (op->mq,
1603 if (NULL != opaque_context)
1604 LOG (GNUNET_ERROR_TYPE_DEBUG,
1605 "sent op request with context message\n");
1607 LOG (GNUNET_ERROR_TYPE_DEBUG,
1608 "sent op request without context message\n");
1613 * Accept an union operation request from a remote peer.
1614 * Only initializes the private operation state.
1616 * @param op operation that will be accepted as a union operation
1619 union_accept (struct Operation *op)
1621 LOG (GNUNET_ERROR_TYPE_DEBUG,
1622 "accepting set union operation\n");
1623 GNUNET_assert (NULL == op->state);
1625 GNUNET_STATISTICS_update (_GSS_statistics,
1626 "# of accepted union operations",
1629 GNUNET_STATISTICS_update (_GSS_statistics,
1630 "# of total union operations",
1634 op->state = GNUNET_new (struct OperationState);
1635 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1636 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1637 op->state->salt_receive = op->state->salt_send = 42;
1638 /* kick off the operation */
1639 send_strata_estimator (op);
1644 * Create a new set supporting the union operation
1646 * We maintain one strata estimator per set and then manipulate it over the
1647 * lifetime of the set, as recreating a strata estimator would be expensive.
1649 * @return the newly created set, NULL on error
1651 static struct SetState *
1652 union_set_create (void)
1654 struct SetState *set_state;
1656 LOG (GNUNET_ERROR_TYPE_DEBUG,
1657 "union set created\n");
1658 set_state = GNUNET_new (struct SetState);
1659 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1660 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1661 if (NULL == set_state->se)
1663 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1664 "Failed to allocate strata estimator\n");
1665 GNUNET_free (set_state);
1673 * Add the element from the given element message to the set.
1675 * @param set_state state of the set want to add to
1676 * @param ee the element to add to the set
1679 union_add (struct SetState *set_state, struct ElementEntry *ee)
1681 strata_estimator_insert (set_state->se,
1682 get_ibf_key (&ee->element_hash));
1687 * Remove the element given in the element message from the set.
1688 * Only marks the element as removed, so that older set operations can still exchange it.
1690 * @param set_state state of the set to remove from
1691 * @param ee set element to remove
1694 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1696 strata_estimator_remove (set_state->se,
1697 get_ibf_key (&ee->element_hash));
1702 * Destroy a set that supports the union operation.
1704 * @param set_state the set to destroy
1707 union_set_destroy (struct SetState *set_state)
1709 if (NULL != set_state->se)
1711 strata_estimator_destroy (set_state->se);
1712 set_state->se = NULL;
1714 GNUNET_free (set_state);
1719 * Dispatch messages for a union operation.
1721 * @param op the state of the union evaluate operation
1722 * @param mh the received message
1723 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1724 * #GNUNET_OK otherwise
1727 union_handle_p2p_message (struct Operation *op,
1728 const struct GNUNET_MessageHeader *mh)
1730 //LOG (GNUNET_ERROR_TYPE_DEBUG,
1731 // "received p2p message (t: %u, s: %u)\n",
1732 // ntohs (mh->type),
1733 // ntohs (mh->size));
1734 switch (ntohs (mh->type))
1736 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1737 return handle_p2p_ibf (op, mh);
1738 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1739 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1740 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1741 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1742 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1743 handle_p2p_elements (op, mh);
1745 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1746 handle_p2p_inquiry (op, mh);
1748 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1749 handle_p2p_done (op, mh);
1751 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1752 handle_p2p_offer (op, mh);
1754 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1755 handle_p2p_demand (op, mh);
1758 /* Something wrong with cadet's message handlers? */
1766 * Handler for peer-disconnects, notifies the client
1767 * about the aborted operation in case the op was not concluded.
1769 * @param op the destroyed operation
1772 union_peer_disconnect (struct Operation *op)
1774 if (PHASE_DONE != op->state->phase)
1776 struct GNUNET_MQ_Envelope *ev;
1777 struct GNUNET_SET_ResultMessage *msg;
1779 ev = GNUNET_MQ_msg (msg,
1780 GNUNET_MESSAGE_TYPE_SET_RESULT);
1781 msg->request_id = htonl (op->spec->client_request_id);
1782 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1783 msg->element_type = htons (0);
1784 GNUNET_MQ_send (op->spec->set->client_mq,
1786 LOG (GNUNET_ERROR_TYPE_WARNING,
1787 "other peer disconnected prematurely, phase %u\n",
1789 _GSS_operation_destroy (op,
1793 // else: the session has already been concluded
1794 LOG (GNUNET_ERROR_TYPE_DEBUG,
1795 "other peer disconnected (finished)\n");
1796 if (GNUNET_NO == op->state->client_done_sent)
1797 send_done_and_destroy (op);
1802 * Copy union-specific set state.
1804 * @param set source set for copying the union state
1805 * @return a copy of the union-specific set state
1807 static struct SetState *
1808 union_copy_state (struct Set *set)
1810 struct SetState *new_state;
1812 new_state = GNUNET_new (struct SetState);
1813 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1814 new_state->se = strata_estimator_dup (set->state->se);
1821 * Get the table with implementing functions for
1824 * @return the operation specific VTable
1826 const struct SetVT *
1829 static const struct SetVT union_vt = {
1830 .create = &union_set_create,
1831 .msg_handler = &union_handle_p2p_message,
1833 .remove = &union_remove,
1834 .destroy_set = &union_set_destroy,
1835 .evaluate = &union_evaluate,
1836 .accept = &union_accept,
1837 .peer_disconnect = &union_peer_disconnect,
1838 .cancel = &union_op_cancel,
1839 .copy_state = &union_copy_state,