2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
37 #define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__)
41 * Number of IBFs in a strata estimator.
43 #define SE_STRATA_COUNT 32
46 * Size of the IBFs in the strata estimator.
48 #define SE_IBF_SIZE 80
51 * The hash num parameter for the difference digests and strata estimators.
53 #define SE_IBF_HASH_NUM 4
56 * Number of buckets that can be transmitted in one message.
58 #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
61 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62 * Choose this value so that computing the IBF is still cheaper
63 * than transmitting all values.
65 #define MAX_IBF_ORDER (20)
68 * Number of buckets used in the ibf per estimated
75 * Current phase we are in for a union operation.
77 enum UnionOperationPhase {
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
95 * Continuation for multi part IBFs.
97 PHASE_EXPECT_IBF_CONT,
100 * We are decoding an IBF.
102 PHASE_INVENTORY_ACTIVE,
105 * The other peer is decoding the IBF we just sent.
107 PHASE_INVENTORY_PASSIVE,
110 * The protocol is almost finished, but we still have to flush our message
111 * queue and/or expect some elements.
113 PHASE_FINISH_CLOSING,
116 * In the penultimate phase,
117 * we wait until all our demands
118 * are satisfied. Then we send a done
119 * message, and wait for another done message.
121 PHASE_FINISH_WAITING,
124 * In the ultimate phase, we wait until
125 * our demands are satisfied and then
126 * quit (sending another DONE message).
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
139 * State of an evaluate operation with another peer.
141 struct OperationState {
143 * Copy of the set's strata estimator at the time of
144 * creation of this operation.
146 struct StrataEstimator *se;
149 * The IBF we currently receive.
151 struct InvertibleBloomFilter *remote_ibf;
154 * The IBF with the local set's element.
156 struct InvertibleBloomFilter *local_ibf;
159 * Maps unsalted IBF-Keys to elements.
160 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
161 * Colliding IBF-Keys are linked.
163 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
166 * Current state of the operation.
168 enum UnionOperationPhase phase;
171 * Did we send the client that we are done?
173 int client_done_sent;
176 * Number of ibf buckets already received into the @a remote_ibf.
178 unsigned int ibf_buckets_received;
181 * Hashes for elements that we have demanded from the other peer.
183 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
186 * Salt that we're using for sending IBFs
191 * Salt for the IBF we've received and that we're currently decoding.
193 uint32_t salt_receive;
196 * Number of elements we received from the other peer
197 * that were not in the local set yet.
199 uint32_t received_fresh;
202 * Total number of elements received from the other peer.
204 uint32_t received_total;
207 * Initial size of our set, just before
208 * the operation started.
210 uint64_t initial_size;
215 * The key entry is used to associate an ibf key with an element.
219 * IBF key for the entry, derived from the current salt.
221 struct IBF_Key ibf_key;
224 * The actual element associated with the key.
226 * Only owned by the union operation if element->operation
229 struct ElementEntry *element;
232 * Did we receive this element?
233 * Even if element->is_foreign is false, we might
234 * have received the element, so this indicates that
235 * the other peer has it.
242 * Used as a closure for sending elements
243 * with a specific IBF key.
245 struct SendElementClosure {
247 * The IBF key whose matching elements should be
250 struct IBF_Key ibf_key;
253 * Operation for which the elements
256 struct Operation *op;
261 * Extra state required for efficient set union.
265 * The strata estimator is only generated once for
267 * The IBF keys are derived from the element hashes with
270 struct StrataEstimator *se;
275 * Iterator over hash map entries, called to
276 * destroy the linked list of colliding ibf key entries.
279 * @param key current key code
280 * @param value value in the hash map
281 * @return #GNUNET_YES if we should continue to iterate,
285 destroy_key_to_element_iter(void *cls,
289 struct KeyEntry *k = value;
291 GNUNET_assert(NULL != k);
292 if (GNUNET_YES == k->element->remote)
294 GNUNET_free(k->element);
303 * Destroy the union operation. Only things specific to the union
304 * operation are destroyed.
306 * @param op union operation to destroy
309 union_op_cancel(struct Operation *op)
311 LOG(GNUNET_ERROR_TYPE_DEBUG,
312 "destroying union op\n");
313 /* check if the op was canceled twice */
314 GNUNET_assert(NULL != op->state);
315 if (NULL != op->state->remote_ibf)
317 ibf_destroy(op->state->remote_ibf);
318 op->state->remote_ibf = NULL;
320 if (NULL != op->state->demanded_hashes)
322 GNUNET_CONTAINER_multihashmap_destroy(op->state->demanded_hashes);
323 op->state->demanded_hashes = NULL;
325 if (NULL != op->state->local_ibf)
327 ibf_destroy(op->state->local_ibf);
328 op->state->local_ibf = NULL;
330 if (NULL != op->state->se)
332 strata_estimator_destroy(op->state->se);
333 op->state->se = NULL;
335 if (NULL != op->state->key_to_element)
337 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
338 &destroy_key_to_element_iter,
340 GNUNET_CONTAINER_multihashmap32_destroy(op->state->key_to_element);
341 op->state->key_to_element = NULL;
343 GNUNET_free(op->state);
345 LOG(GNUNET_ERROR_TYPE_DEBUG,
346 "destroying union op done\n");
351 * Inform the client that the union operation has failed,
352 * and proceed to destroy the evaluate operation.
354 * @param op the union operation to fail
357 fail_union_operation(struct Operation *op)
359 struct GNUNET_MQ_Envelope *ev;
360 struct GNUNET_SET_ResultMessage *msg;
362 LOG(GNUNET_ERROR_TYPE_WARNING,
363 "union operation failed\n");
364 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365 msg->result_status = htons(GNUNET_SET_STATUS_FAILURE);
366 msg->request_id = htonl(op->client_request_id);
367 msg->element_type = htons(0);
368 GNUNET_MQ_send(op->set->cs->mq,
370 _GSS_operation_destroy(op, GNUNET_YES);
375 * Derive the IBF key from a hash code and
378 * @param src the hash code
379 * @return the derived IBF key
381 static struct IBF_Key
382 get_ibf_key(const struct GNUNET_HashCode *src)
387 GNUNET_assert(GNUNET_OK ==
388 GNUNET_CRYPTO_kdf(&key, sizeof(key),
397 * Context for #op_get_element_iterator
399 struct GetElementContext {
403 struct GNUNET_HashCode hash;
413 * Iterator over the mapping from IBF keys to element entries. Checks if we
414 * have an element with a given GNUNET_HashCode.
417 * @param key current key code
418 * @param value value in the hash map
419 * @return #GNUNET_YES if we should search further,
420 * #GNUNET_NO if we've found the element.
423 op_get_element_iterator(void *cls,
427 struct GetElementContext *ctx = cls;
428 struct KeyEntry *k = value;
430 GNUNET_assert(NULL != k);
431 if (0 == GNUNET_CRYPTO_hash_cmp(&k->element->element_hash,
442 * Determine whether the given element is already in the operation's element
445 * @param op operation that should be tested for 'element_hash'
446 * @param element_hash hash of the element to look for
447 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
449 static struct KeyEntry *
450 op_get_element(struct Operation *op,
451 const struct GNUNET_HashCode *element_hash)
454 struct IBF_Key ibf_key;
455 struct GetElementContext ctx = { { { 0 } }, 0 };
457 ctx.hash = *element_hash;
459 ibf_key = get_ibf_key(element_hash);
460 ret = GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element,
461 (uint32_t)ibf_key.key_val,
462 op_get_element_iterator,
465 /* was the iteration aborted because we found the element? */
466 if (GNUNET_SYSERR == ret)
468 GNUNET_assert(NULL != ctx.k);
476 * Insert an element into the union operation's
477 * key-to-element mapping. Takes ownership of 'ee'.
478 * Note that this does not insert the element in the set,
479 * only in the operation's key-element mapping.
480 * This is done to speed up re-tried operations, if some elements
481 * were transmitted, and then the IBF fails to decode.
483 * XXX: clarify ownership, doesn't sound right.
485 * @param op the union operation
486 * @param ee the element entry
487 * @parem received was this element received from the remote peer?
490 op_register_element(struct Operation *op,
491 struct ElementEntry *ee,
494 struct IBF_Key ibf_key;
497 ibf_key = get_ibf_key(&ee->element_hash);
498 k = GNUNET_new(struct KeyEntry);
500 k->ibf_key = ibf_key;
501 k->received = received;
502 GNUNET_assert(GNUNET_OK ==
503 GNUNET_CONTAINER_multihashmap32_put(op->state->key_to_element,
504 (uint32_t)ibf_key.key_val,
506 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
514 salt_key(const struct IBF_Key *k_in,
516 struct IBF_Key *k_out)
519 uint64_t x = k_in->key_val;
522 x = (x >> s) | (x << (64 - s));
531 unsalt_key(const struct IBF_Key *k_in,
533 struct IBF_Key *k_out)
536 uint64_t x = k_in->key_val;
538 x = (x << s) | (x >> (64 - s));
544 * Insert a key into an ibf.
548 * @param value the key entry to get the key from
551 prepare_ibf_iterator(void *cls,
555 struct Operation *op = cls;
556 struct KeyEntry *ke = value;
557 struct IBF_Key salted_key;
559 LOG(GNUNET_ERROR_TYPE_DEBUG,
560 "[OP %x] inserting %lx (hash %s) into ibf\n",
562 (unsigned long)ke->ibf_key.key_val,
563 GNUNET_h2s(&ke->element->element_hash));
564 salt_key(&ke->ibf_key,
565 op->state->salt_send,
567 ibf_insert(op->state->local_ibf, salted_key);
573 * Iterator for initializing the
574 * key-to-element mapping of a union operation
576 * @param cls the union operation `struct Operation *`
578 * @param value the `struct ElementEntry *` to insert
579 * into the key-to-element mapping
580 * @return #GNUNET_YES (to continue iterating)
583 init_key_to_element_iterator(void *cls,
584 const struct GNUNET_HashCode *key,
587 struct Operation *op = cls;
588 struct ElementEntry *ee = value;
590 /* make sure that the element belongs to the set at the time
591 * of creating the operation */
593 _GSS_is_element_of_operation(ee,
596 GNUNET_assert(GNUNET_NO == ee->remote);
597 op_register_element(op,
605 * Initialize the IBF key to element mapping local to this set
608 * @param op the set union operation
611 initialize_key_to_element(struct Operation *op)
615 GNUNET_assert(NULL == op->state->key_to_element);
616 len = GNUNET_CONTAINER_multihashmap_size(op->set->content->elements);
617 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create(len + 1);
618 GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements,
619 &init_key_to_element_iterator,
625 * Create an ibf with the operation's elements
626 * of the specified size
628 * @param op the union operation
629 * @param size size of the ibf to create
630 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
633 prepare_ibf(struct Operation *op,
636 GNUNET_assert(NULL != op->state->key_to_element);
638 if (NULL != op->state->local_ibf)
639 ibf_destroy(op->state->local_ibf);
640 op->state->local_ibf = ibf_create(size, SE_IBF_HASH_NUM);
641 if (NULL == op->state->local_ibf)
643 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
644 "Failed to allocate local IBF\n");
645 return GNUNET_SYSERR;
647 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
648 &prepare_ibf_iterator,
655 * Send an ibf of appropriate size.
657 * Fragments the IBF into multiple messages if necessary.
659 * @param op the union operation
660 * @param ibf_order order of the ibf to send, size=2^order
661 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
664 send_ibf(struct Operation *op,
667 unsigned int buckets_sent = 0;
668 struct InvertibleBloomFilter *ibf;
671 prepare_ibf(op, 1 << ibf_order))
673 /* allocation failed */
674 return GNUNET_SYSERR;
677 LOG(GNUNET_ERROR_TYPE_DEBUG,
678 "sending ibf of size %u\n",
682 char name[64] = { 0 };
683 snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order);
684 GNUNET_STATISTICS_update(_GSS_statistics, name, 1, GNUNET_NO);
687 ibf = op->state->local_ibf;
689 while (buckets_sent < (1 << ibf_order))
691 unsigned int buckets_in_message;
692 struct GNUNET_MQ_Envelope *ev;
693 struct IBFMessage *msg;
695 buckets_in_message = (1 << ibf_order) - buckets_sent;
696 /* limit to maximum */
697 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
698 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
700 ev = GNUNET_MQ_msg_extra(msg,
701 buckets_in_message * IBF_BUCKET_SIZE,
702 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
705 msg->order = ibf_order;
706 msg->offset = htonl(buckets_sent);
707 msg->salt = htonl(op->state->salt_send);
708 ibf_write_slice(ibf, buckets_sent,
709 buckets_in_message, &msg[1]);
710 buckets_sent += buckets_in_message;
711 LOG(GNUNET_ERROR_TYPE_DEBUG,
712 "ibf chunk size %u, %u/%u sent\n",
716 GNUNET_MQ_send(op->mq, ev);
719 /* The other peer must decode the IBF, so
721 op->state->phase = PHASE_INVENTORY_PASSIVE;
727 * Compute the necessary order of an ibf
728 * from the size of the symmetric set difference.
730 * @param diff the difference
731 * @return the required size of the ibf
734 get_order_from_difference(unsigned int diff)
736 unsigned int ibf_order;
739 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
740 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
741 (ibf_order < MAX_IBF_ORDER))
743 // add one for correction
744 return ibf_order + 1;
749 * Send a set element.
751 * @param cls the union operation `struct Operation *`
753 * @param value the `struct ElementEntry *` to insert
754 * into the key-to-element mapping
755 * @return #GNUNET_YES (to continue iterating)
758 send_full_element_iterator(void *cls,
759 const struct GNUNET_HashCode *key,
762 struct Operation *op = cls;
763 struct GNUNET_SET_ElementMessage *emsg;
764 struct ElementEntry *ee = value;
765 struct GNUNET_SET_Element *el = &ee->element;
766 struct GNUNET_MQ_Envelope *ev;
768 LOG(GNUNET_ERROR_TYPE_DEBUG,
769 "Sending element %s\n",
771 ev = GNUNET_MQ_msg_extra(emsg,
773 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
774 emsg->element_type = htons(el->element_type);
775 GNUNET_memcpy(&emsg[1],
778 GNUNET_MQ_send(op->mq,
785 * Switch to full set transmission for @a op.
787 * @param op operation to switch to full set transmission.
790 send_full_set(struct Operation *op)
792 struct GNUNET_MQ_Envelope *ev;
794 op->state->phase = PHASE_FULL_SENDING;
795 LOG(GNUNET_ERROR_TYPE_DEBUG,
796 "Dedicing to transmit the full set\n");
797 /* FIXME: use a more memory-friendly way of doing this with an
798 iterator, just as we do in the non-full case! */
799 (void)GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements,
800 &send_full_element_iterator,
802 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
803 GNUNET_MQ_send(op->mq,
809 * Handle a strata estimator from a remote peer
811 * @param cls the union operation
812 * @param msg the message
815 check_union_p2p_strata_estimator(void *cls,
816 const struct StrataEstimatorMessage *msg)
818 struct Operation *op = cls;
822 if (op->state->phase != PHASE_EXPECT_SE)
825 return GNUNET_SYSERR;
827 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
828 len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
829 if ((GNUNET_NO == is_compressed) &&
830 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
833 return GNUNET_SYSERR;
840 * Handle a strata estimator from a remote peer
842 * @param cls the union operation
843 * @param msg the message
846 handle_union_p2p_strata_estimator(void *cls,
847 const struct StrataEstimatorMessage *msg)
849 struct Operation *op = cls;
850 struct StrataEstimator *remote_se;
856 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
857 GNUNET_STATISTICS_update(_GSS_statistics,
858 "# bytes of SE received",
859 ntohs(msg->header.size),
861 len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
862 other_size = GNUNET_ntohll(msg->set_size);
863 remote_se = strata_estimator_create(SE_STRATA_COUNT,
866 if (NULL == remote_se)
868 /* insufficient resources, fail */
869 fail_union_operation(op);
873 strata_estimator_read(&msg[1],
878 /* decompression failed */
879 strata_estimator_destroy(remote_se);
880 fail_union_operation(op);
883 GNUNET_assert(NULL != op->state->se);
884 diff = strata_estimator_difference(remote_se,
890 strata_estimator_destroy(remote_se);
891 strata_estimator_destroy(op->state->se);
892 op->state->se = NULL;
893 LOG(GNUNET_ERROR_TYPE_DEBUG,
894 "got se diff=%d, using ibf size %d\n",
896 1U << get_order_from_difference(diff));
901 set_debug = getenv("GNUNET_SET_BENCHMARK");
902 if ((NULL != set_debug) &&
903 (0 == strcmp(set_debug, "1")))
905 FILE *f = fopen("set.log", "a");
906 fprintf(f, "%llu\n", (unsigned long long)diff);
911 if ((GNUNET_YES == op->byzantine) &&
912 (other_size < op->byzantine_lower_bound))
915 fail_union_operation(op);
919 if ((GNUNET_YES == op->force_full) ||
920 (diff > op->state->initial_size / 4) ||
923 LOG(GNUNET_ERROR_TYPE_DEBUG,
924 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
926 op->state->initial_size);
927 GNUNET_STATISTICS_update(_GSS_statistics,
931 if ((op->state->initial_size <= other_size) ||
938 struct GNUNET_MQ_Envelope *ev;
940 LOG(GNUNET_ERROR_TYPE_DEBUG,
941 "Telling other peer that we expect its full set\n");
942 op->state->phase = PHASE_EXPECT_IBF;
943 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
944 GNUNET_MQ_send(op->mq,
950 GNUNET_STATISTICS_update(_GSS_statistics,
956 get_order_from_difference(diff)))
958 /* Internal error, best we can do is shut the connection */
959 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
960 "Failed to send IBF, closing connection\n");
961 fail_union_operation(op);
965 GNUNET_CADET_receive_done(op->channel);
970 * Iterator to send elements to a remote peer
972 * @param cls closure with the element key and the union operation
974 * @param value the key entry
977 send_offers_iterator(void *cls,
981 struct SendElementClosure *sec = cls;
982 struct Operation *op = sec->op;
983 struct KeyEntry *ke = value;
984 struct GNUNET_MQ_Envelope *ev;
985 struct GNUNET_MessageHeader *mh;
987 /* Detect 32-bit key collision for the 64-bit IBF keys. */
988 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
991 ev = GNUNET_MQ_msg_header_extra(mh,
992 sizeof(struct GNUNET_HashCode),
993 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
995 GNUNET_assert(NULL != ev);
996 *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash;
997 LOG(GNUNET_ERROR_TYPE_DEBUG,
998 "[OP %x] sending element offer (%s) to peer\n",
1000 GNUNET_h2s(&ke->element->element_hash));
1001 GNUNET_MQ_send(op->mq, ev);
1007 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1009 * @param op union operation
1010 * @param ibf_key IBF key of interest
1013 send_offers_for_key(struct Operation *op,
1014 struct IBF_Key ibf_key)
1016 struct SendElementClosure send_cls;
1018 send_cls.ibf_key = ibf_key;
1020 (void)GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element,
1021 (uint32_t)ibf_key.key_val,
1022 &send_offers_iterator,
1028 * Decode which elements are missing on each side, and
1029 * send the appropriate offers and inquiries.
1031 * @param op union operation
1032 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1035 decode_and_send(struct Operation *op)
1038 struct IBF_Key last_key;
1040 unsigned int num_decoded;
1041 struct InvertibleBloomFilter *diff_ibf;
1043 GNUNET_assert(PHASE_INVENTORY_ACTIVE == op->state->phase);
1047 op->state->remote_ibf->size))
1050 /* allocation failed */
1051 return GNUNET_SYSERR;
1053 diff_ibf = ibf_dup(op->state->local_ibf);
1054 ibf_subtract(diff_ibf,
1055 op->state->remote_ibf);
1057 ibf_destroy(op->state->remote_ibf);
1058 op->state->remote_ibf = NULL;
1060 LOG(GNUNET_ERROR_TYPE_DEBUG,
1061 "decoding IBF (size=%u)\n",
1065 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1070 int cycle_detected = GNUNET_NO;
1074 res = ibf_decode(diff_ibf, &side, &key);
1075 if (res == GNUNET_OK)
1077 LOG(GNUNET_ERROR_TYPE_DEBUG,
1078 "decoded ibf key %lx\n",
1079 (unsigned long)key.key_val);
1081 if ((num_decoded > diff_ibf->size) ||
1082 ((num_decoded > 1) &&
1083 (last_key.key_val == key.key_val)))
1085 LOG(GNUNET_ERROR_TYPE_DEBUG,
1086 "detected cyclic ibf (decoded %u/%u)\n",
1089 cycle_detected = GNUNET_YES;
1092 if ((GNUNET_SYSERR == res) ||
1093 (GNUNET_YES == cycle_detected))
1097 while (1 << next_order < diff_ibf->size)
1100 if (next_order <= MAX_IBF_ORDER)
1102 LOG(GNUNET_ERROR_TYPE_DEBUG,
1103 "decoding failed, sending larger ibf (size %u)\n",
1105 GNUNET_STATISTICS_update(_GSS_statistics,
1109 op->state->salt_send++;
1111 send_ibf(op, next_order))
1113 /* Internal error, best we can do is shut the connection */
1114 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1115 "Failed to send IBF, closing connection\n");
1116 fail_union_operation(op);
1117 ibf_destroy(diff_ibf);
1118 return GNUNET_SYSERR;
1123 GNUNET_STATISTICS_update(_GSS_statistics,
1124 "# of failed union operations (too large)",
1127 // XXX: Send the whole set, element-by-element
1128 LOG(GNUNET_ERROR_TYPE_ERROR,
1129 "set union failed: reached ibf limit\n");
1130 fail_union_operation(op);
1131 ibf_destroy(diff_ibf);
1132 return GNUNET_SYSERR;
1136 if (GNUNET_NO == res)
1138 struct GNUNET_MQ_Envelope *ev;
1140 LOG(GNUNET_ERROR_TYPE_DEBUG,
1141 "transmitted all values, sending DONE\n");
1142 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1143 GNUNET_MQ_send(op->mq, ev);
1144 /* We now wait until we get a DONE message back
1145 * and then wait for our MQ to be flushed and all our
1146 * demands be delivered. */
1151 struct IBF_Key unsalted_key;
1154 op->state->salt_receive,
1156 send_offers_for_key(op,
1159 else if (-1 == side)
1161 struct GNUNET_MQ_Envelope *ev;
1162 struct InquiryMessage *msg;
1164 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1165 * the effort additional complexity. */
1166 ev = GNUNET_MQ_msg_extra(msg,
1167 sizeof(struct IBF_Key),
1168 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1169 msg->salt = htonl(op->state->salt_receive);
1170 GNUNET_memcpy(&msg[1],
1172 sizeof(struct IBF_Key));
1173 LOG(GNUNET_ERROR_TYPE_DEBUG,
1174 "sending element inquiry for IBF key %lx\n",
1175 (unsigned long)key.key_val);
1176 GNUNET_MQ_send(op->mq, ev);
1183 ibf_destroy(diff_ibf);
1189 * Check an IBF message from a remote peer.
1191 * Reassemble the IBF from multiple pieces, and
1192 * process the whole IBF once possible.
1194 * @param cls the union operation
1195 * @param msg the header of the message
1196 * @return #GNUNET_OK if @a msg is well-formed
1199 check_union_p2p_ibf(void *cls,
1200 const struct IBFMessage *msg)
1202 struct Operation *op = cls;
1203 unsigned int buckets_in_message;
1205 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1208 return GNUNET_SYSERR;
1210 buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1211 if (0 == buckets_in_message)
1214 return GNUNET_SYSERR;
1216 if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1219 return GNUNET_SYSERR;
1221 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1223 if (ntohl(msg->offset) != op->state->ibf_buckets_received)
1226 return GNUNET_SYSERR;
1228 if (1 << msg->order != op->state->remote_ibf->size)
1231 return GNUNET_SYSERR;
1233 if (ntohl(msg->salt) != op->state->salt_receive)
1236 return GNUNET_SYSERR;
1239 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1240 (op->state->phase != PHASE_EXPECT_IBF))
1243 return GNUNET_SYSERR;
1251 * Handle an IBF message from a remote peer.
1253 * Reassemble the IBF from multiple pieces, and
1254 * process the whole IBF once possible.
1256 * @param cls the union operation
1257 * @param msg the header of the message
1260 handle_union_p2p_ibf(void *cls,
1261 const struct IBFMessage *msg)
1263 struct Operation *op = cls;
1264 unsigned int buckets_in_message;
1266 buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1267 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1268 (op->state->phase == PHASE_EXPECT_IBF))
1270 op->state->phase = PHASE_EXPECT_IBF_CONT;
1271 GNUNET_assert(NULL == op->state->remote_ibf);
1272 LOG(GNUNET_ERROR_TYPE_DEBUG,
1273 "Creating new ibf of size %u\n",
1275 op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM);
1276 op->state->salt_receive = ntohl(msg->salt);
1277 LOG(GNUNET_ERROR_TYPE_DEBUG,
1278 "Receiving new IBF with salt %u\n",
1279 op->state->salt_receive);
1280 if (NULL == op->state->remote_ibf)
1282 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1283 "Failed to parse remote IBF, closing connection\n");
1284 fail_union_operation(op);
1287 op->state->ibf_buckets_received = 0;
1288 if (0 != ntohl(msg->offset))
1291 fail_union_operation(op);
1297 GNUNET_assert(op->state->phase == PHASE_EXPECT_IBF_CONT);
1298 LOG(GNUNET_ERROR_TYPE_DEBUG,
1299 "Received more of IBF\n");
1301 GNUNET_assert(NULL != op->state->remote_ibf);
1303 ibf_read_slice(&msg[1],
1304 op->state->ibf_buckets_received,
1306 op->state->remote_ibf);
1307 op->state->ibf_buckets_received += buckets_in_message;
1309 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1311 LOG(GNUNET_ERROR_TYPE_DEBUG,
1312 "received full ibf\n");
1313 op->state->phase = PHASE_INVENTORY_ACTIVE;
1315 decode_and_send(op))
1317 /* Internal error, best we can do is shut down */
1318 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1319 "Failed to decode IBF, closing connection\n");
1320 fail_union_operation(op);
1324 GNUNET_CADET_receive_done(op->channel);
1329 * Send a result message to the client indicating
1330 * that there is a new element.
1332 * @param op union operation
1333 * @param element element to send
1334 * @param status status to send with the new element
1337 send_client_element(struct Operation *op,
1338 struct GNUNET_SET_Element *element,
1341 struct GNUNET_MQ_Envelope *ev;
1342 struct GNUNET_SET_ResultMessage *rm;
1344 LOG(GNUNET_ERROR_TYPE_DEBUG,
1345 "sending element (size %u) to client\n",
1347 GNUNET_assert(0 != op->client_request_id);
1348 ev = GNUNET_MQ_msg_extra(rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1351 GNUNET_MQ_discard(ev);
1355 rm->result_status = htons(status);
1356 rm->request_id = htonl(op->client_request_id);
1357 rm->element_type = htons(element->element_type);
1358 rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element));
1359 GNUNET_memcpy(&rm[1],
1362 GNUNET_MQ_send(op->set->cs->mq,
1368 * Signal to the client that the operation has finished and
1369 * destroy the operation.
1371 * @param cls operation to destroy
1374 send_client_done(void *cls)
1376 struct Operation *op = cls;
1377 struct GNUNET_MQ_Envelope *ev;
1378 struct GNUNET_SET_ResultMessage *rm;
1380 if (GNUNET_YES == op->state->client_done_sent)
1385 if (PHASE_DONE != op->state->phase)
1387 LOG(GNUNET_ERROR_TYPE_WARNING,
1388 "Union operation failed\n");
1389 GNUNET_STATISTICS_update(_GSS_statistics,
1390 "# Union operations failed",
1393 ev = GNUNET_MQ_msg(rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1394 rm->result_status = htons(GNUNET_SET_STATUS_FAILURE);
1395 rm->request_id = htonl(op->client_request_id);
1396 rm->element_type = htons(0);
1397 GNUNET_MQ_send(op->set->cs->mq,
1402 op->state->client_done_sent = GNUNET_YES;
1404 GNUNET_STATISTICS_update(_GSS_statistics,
1405 "# Union operations succeeded",
1408 LOG(GNUNET_ERROR_TYPE_INFO,
1409 "Signalling client that union operation is done\n");
1410 ev = GNUNET_MQ_msg(rm,
1411 GNUNET_MESSAGE_TYPE_SET_RESULT);
1412 rm->request_id = htonl(op->client_request_id);
1413 rm->result_status = htons(GNUNET_SET_STATUS_DONE);
1414 rm->element_type = htons(0);
1415 rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element));
1416 GNUNET_MQ_send(op->set->cs->mq,
1422 * Tests if the operation is finished, and if so notify.
1424 * @param op operation to check
1427 maybe_finish(struct Operation *op)
1429 unsigned int num_demanded;
1431 num_demanded = GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes);
1433 if (PHASE_FINISH_WAITING == op->state->phase)
1435 LOG(GNUNET_ERROR_TYPE_DEBUG,
1436 "In PHASE_FINISH_WAITING, pending %u demands\n",
1438 if (0 == num_demanded)
1440 struct GNUNET_MQ_Envelope *ev;
1442 op->state->phase = PHASE_DONE;
1443 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1444 GNUNET_MQ_send(op->mq,
1446 /* We now wait until the other peer sends P2P_OVER
1447 * after it got all elements from us. */
1450 if (PHASE_FINISH_CLOSING == op->state->phase)
1452 LOG(GNUNET_ERROR_TYPE_DEBUG,
1453 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1455 if (0 == num_demanded)
1457 op->state->phase = PHASE_DONE;
1458 send_client_done(op);
1459 _GSS_operation_destroy2(op);
1466 * Check an element message from a remote peer.
1468 * @param cls the union operation
1469 * @param emsg the message
1472 check_union_p2p_elements(void *cls,
1473 const struct GNUNET_SET_ElementMessage *emsg)
1475 struct Operation *op = cls;
1477 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1480 return GNUNET_SYSERR;
1482 if (0 == GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes))
1485 return GNUNET_SYSERR;
1492 * Handle an element message from a remote peer.
1493 * Sent by the other peer either because we decoded an IBF and placed a demand,
1494 * or because the other peer switched to full set transmission.
1496 * @param cls the union operation
1497 * @param emsg the message
1500 handle_union_p2p_elements(void *cls,
1501 const struct GNUNET_SET_ElementMessage *emsg)
1503 struct Operation *op = cls;
1504 struct ElementEntry *ee;
1505 struct KeyEntry *ke;
1506 uint16_t element_size;
1508 element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1509 ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1510 GNUNET_memcpy(&ee[1],
1513 ee->element.size = element_size;
1514 ee->element.data = &ee[1];
1515 ee->element.element_type = ntohs(emsg->element_type);
1516 ee->remote = GNUNET_YES;
1517 GNUNET_SET_element_hash(&ee->element,
1520 GNUNET_CONTAINER_multihashmap_remove(op->state->demanded_hashes,
1524 /* We got something we didn't demand, since it's not in our map. */
1526 fail_union_operation(op);
1530 LOG(GNUNET_ERROR_TYPE_DEBUG,
1531 "Got element (size %u, hash %s) from peer\n",
1532 (unsigned int)element_size,
1533 GNUNET_h2s(&ee->element_hash));
1535 GNUNET_STATISTICS_update(_GSS_statistics,
1536 "# received elements",
1539 GNUNET_STATISTICS_update(_GSS_statistics,
1540 "# exchanged elements",
1544 op->state->received_total++;
1546 ke = op_get_element(op, &ee->element_hash);
1549 /* Got repeated element. Should not happen since
1550 * we track demands. */
1551 GNUNET_STATISTICS_update(_GSS_statistics,
1552 "# repeated elements",
1555 ke->received = GNUNET_YES;
1560 LOG(GNUNET_ERROR_TYPE_DEBUG,
1561 "Registering new element from remote peer\n");
1562 op->state->received_fresh++;
1563 op_register_element(op, ee, GNUNET_YES);
1564 /* only send results immediately if the client wants it */
1565 switch (op->result_mode)
1567 case GNUNET_SET_RESULT_ADDED:
1568 send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK);
1571 case GNUNET_SET_RESULT_SYMMETRIC:
1572 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1576 /* Result mode not supported, should have been caught earlier. */
1582 if ((op->state->received_total > 8) &&
1583 (op->state->received_fresh < op->state->received_total / 3))
1585 /* The other peer gave us lots of old elements, there's something wrong. */
1587 fail_union_operation(op);
1590 GNUNET_CADET_receive_done(op->channel);
1596 * Check a full element message from a remote peer.
1598 * @param cls the union operation
1599 * @param emsg the message
1602 check_union_p2p_full_element(void *cls,
1603 const struct GNUNET_SET_ElementMessage *emsg)
1605 struct Operation *op = cls;
1607 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1610 return GNUNET_SYSERR;
1612 // FIXME: check that we expect full elements here?
1618 * Handle an element message from a remote peer.
1620 * @param cls the union operation
1621 * @param emsg the message
1624 handle_union_p2p_full_element(void *cls,
1625 const struct GNUNET_SET_ElementMessage *emsg)
1627 struct Operation *op = cls;
1628 struct ElementEntry *ee;
1629 struct KeyEntry *ke;
1630 uint16_t element_size;
1632 element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1633 ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1634 GNUNET_memcpy(&ee[1], &emsg[1], element_size);
1635 ee->element.size = element_size;
1636 ee->element.data = &ee[1];
1637 ee->element.element_type = ntohs(emsg->element_type);
1638 ee->remote = GNUNET_YES;
1639 GNUNET_SET_element_hash(&ee->element, &ee->element_hash);
1641 LOG(GNUNET_ERROR_TYPE_DEBUG,
1642 "Got element (full diff, size %u, hash %s) from peer\n",
1643 (unsigned int)element_size,
1644 GNUNET_h2s(&ee->element_hash));
1646 GNUNET_STATISTICS_update(_GSS_statistics,
1647 "# received elements",
1650 GNUNET_STATISTICS_update(_GSS_statistics,
1651 "# exchanged elements",
1655 op->state->received_total++;
1657 ke = op_get_element(op, &ee->element_hash);
1660 /* Got repeated element. Should not happen since
1661 * we track demands. */
1662 GNUNET_STATISTICS_update(_GSS_statistics,
1663 "# repeated elements",
1666 ke->received = GNUNET_YES;
1671 LOG(GNUNET_ERROR_TYPE_DEBUG,
1672 "Registering new element from remote peer\n");
1673 op->state->received_fresh++;
1674 op_register_element(op, ee, GNUNET_YES);
1675 /* only send results immediately if the client wants it */
1676 switch (op->result_mode)
1678 case GNUNET_SET_RESULT_ADDED:
1679 send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK);
1682 case GNUNET_SET_RESULT_SYMMETRIC:
1683 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1687 /* Result mode not supported, should have been caught earlier. */
1693 if ((GNUNET_YES == op->byzantine) &&
1694 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1695 (op->state->received_fresh < op->state->received_total / 6))
1697 /* The other peer gave us lots of old elements, there's something wrong. */
1698 LOG(GNUNET_ERROR_TYPE_ERROR,
1699 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1700 (unsigned long long)op->state->received_fresh,
1701 (unsigned long long)op->state->received_total);
1703 fail_union_operation(op);
1706 GNUNET_CADET_receive_done(op->channel);
1711 * Send offers (for GNUNET_Hash-es) in response
1712 * to inquiries (for IBF_Key-s).
1714 * @param cls the union operation
1715 * @param msg the message
1718 check_union_p2p_inquiry(void *cls,
1719 const struct InquiryMessage *msg)
1721 struct Operation *op = cls;
1722 unsigned int num_keys;
1724 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1727 return GNUNET_SYSERR;
1729 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1732 return GNUNET_SYSERR;
1734 num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1735 / sizeof(struct IBF_Key);
1736 if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1737 != num_keys * sizeof(struct IBF_Key))
1740 return GNUNET_SYSERR;
1747 * Send offers (for GNUNET_Hash-es) in response
1748 * to inquiries (for IBF_Key-s).
1750 * @param cls the union operation
1751 * @param msg the message
1754 handle_union_p2p_inquiry(void *cls,
1755 const struct InquiryMessage *msg)
1757 struct Operation *op = cls;
1758 const struct IBF_Key *ibf_key;
1759 unsigned int num_keys;
1761 LOG(GNUNET_ERROR_TYPE_DEBUG,
1762 "Received union inquiry\n");
1763 num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1764 / sizeof(struct IBF_Key);
1765 ibf_key = (const struct IBF_Key *)&msg[1];
1766 while (0 != num_keys--)
1768 struct IBF_Key unsalted_key;
1773 send_offers_for_key(op,
1777 GNUNET_CADET_receive_done(op->channel);
1782 * Iterator over hash map entries, called to
1783 * destroy the linked list of colliding ibf key entries.
1785 * @param cls closure
1786 * @param key current key code
1787 * @param value value in the hash map
1788 * @return #GNUNET_YES if we should continue to iterate,
1789 * #GNUNET_NO if not.
1792 send_missing_full_elements_iter(void *cls,
1796 struct Operation *op = cls;
1797 struct KeyEntry *ke = value;
1798 struct GNUNET_MQ_Envelope *ev;
1799 struct GNUNET_SET_ElementMessage *emsg;
1800 struct ElementEntry *ee = ke->element;
1802 if (GNUNET_YES == ke->received)
1804 ev = GNUNET_MQ_msg_extra(emsg,
1806 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1807 GNUNET_memcpy(&emsg[1],
1810 emsg->element_type = htons(ee->element.element_type);
1811 GNUNET_MQ_send(op->mq,
1818 * Handle a request for full set transmission.
1820 * @parem cls closure, a set union operation
1821 * @param mh the demand message
1824 handle_union_p2p_request_full(void *cls,
1825 const struct GNUNET_MessageHeader *mh)
1827 struct Operation *op = cls;
1829 LOG(GNUNET_ERROR_TYPE_DEBUG,
1830 "Received request for full set transmission\n");
1831 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1834 fail_union_operation(op);
1837 if (PHASE_EXPECT_IBF != op->state->phase)
1840 fail_union_operation(op);
1844 // FIXME: we need to check that our set is larger than the
1845 // byzantine_lower_bound by some threshold
1847 GNUNET_CADET_receive_done(op->channel);
1852 * Handle a "full done" message.
1854 * @parem cls closure, a set union operation
1855 * @param mh the demand message
1858 handle_union_p2p_full_done(void *cls,
1859 const struct GNUNET_MessageHeader *mh)
1861 struct Operation *op = cls;
1863 switch (op->state->phase)
1865 case PHASE_EXPECT_IBF:
1867 struct GNUNET_MQ_Envelope *ev;
1869 LOG(GNUNET_ERROR_TYPE_DEBUG,
1870 "got FULL DONE, sending elements that other peer is missing\n");
1872 /* send all the elements that did not come from the remote peer */
1873 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
1874 &send_missing_full_elements_iter,
1877 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1878 GNUNET_MQ_send(op->mq,
1880 op->state->phase = PHASE_DONE;
1881 /* we now wait until the other peer sends us the OVER message*/
1885 case PHASE_FULL_SENDING:
1887 LOG(GNUNET_ERROR_TYPE_DEBUG,
1888 "got FULL DONE, finishing\n");
1889 /* We sent the full set, and got the response for that. We're done. */
1890 op->state->phase = PHASE_DONE;
1891 GNUNET_CADET_receive_done(op->channel);
1892 send_client_done(op);
1893 _GSS_operation_destroy2(op);
1899 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1900 "Handle full done phase is %u\n",
1901 (unsigned)op->state->phase);
1903 fail_union_operation(op);
1906 GNUNET_CADET_receive_done(op->channel);
1911 * Check a demand by the other peer for elements based on a list
1912 * of `struct GNUNET_HashCode`s.
1914 * @parem cls closure, a set union operation
1915 * @param mh the demand message
1916 * @return #GNUNET_OK if @a mh is well-formed
1919 check_union_p2p_demand(void *cls,
1920 const struct GNUNET_MessageHeader *mh)
1922 struct Operation *op = cls;
1923 unsigned int num_hashes;
1925 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1928 return GNUNET_SYSERR;
1930 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1931 / sizeof(struct GNUNET_HashCode);
1932 if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1933 != num_hashes * sizeof(struct GNUNET_HashCode))
1936 return GNUNET_SYSERR;
1943 * Handle a demand by the other peer for elements based on a list
1944 * of `struct GNUNET_HashCode`s.
1946 * @parem cls closure, a set union operation
1947 * @param mh the demand message
1950 handle_union_p2p_demand(void *cls,
1951 const struct GNUNET_MessageHeader *mh)
1953 struct Operation *op = cls;
1954 struct ElementEntry *ee;
1955 struct GNUNET_SET_ElementMessage *emsg;
1956 const struct GNUNET_HashCode *hash;
1957 unsigned int num_hashes;
1958 struct GNUNET_MQ_Envelope *ev;
1960 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1961 / sizeof(struct GNUNET_HashCode);
1962 for (hash = (const struct GNUNET_HashCode *)&mh[1];
1964 hash++, num_hashes--)
1966 ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements,
1970 /* Demand for non-existing element. */
1972 fail_union_operation(op);
1975 if (GNUNET_NO == _GSS_is_element_of_operation(ee, op))
1977 /* Probably confused lazily copied sets. */
1979 fail_union_operation(op);
1982 ev = GNUNET_MQ_msg_extra(emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1983 GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size);
1984 emsg->reserved = htons(0);
1985 emsg->element_type = htons(ee->element.element_type);
1986 LOG(GNUNET_ERROR_TYPE_DEBUG,
1987 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1989 (unsigned int)ee->element.size,
1990 GNUNET_h2s(&ee->element_hash));
1991 GNUNET_MQ_send(op->mq, ev);
1992 GNUNET_STATISTICS_update(_GSS_statistics,
1993 "# exchanged elements",
1997 switch (op->result_mode)
1999 case GNUNET_SET_RESULT_ADDED:
2000 /* Nothing to do. */
2003 case GNUNET_SET_RESULT_SYMMETRIC:
2004 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2008 /* Result mode not supported, should have been caught earlier. */
2013 GNUNET_CADET_receive_done(op->channel);
2018 * Check offer (of `struct GNUNET_HashCode`s).
2020 * @param cls the union operation
2021 * @param mh the message
2022 * @return #GNUNET_OK if @a mh is well-formed
2025 check_union_p2p_offer(void *cls,
2026 const struct GNUNET_MessageHeader *mh)
2028 struct Operation *op = cls;
2029 unsigned int num_hashes;
2031 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2034 return GNUNET_SYSERR;
2036 /* look up elements and send them */
2037 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2038 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2041 return GNUNET_SYSERR;
2043 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2044 / sizeof(struct GNUNET_HashCode);
2045 if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2046 num_hashes * sizeof(struct GNUNET_HashCode))
2049 return GNUNET_SYSERR;
2056 * Handle offers (of `struct GNUNET_HashCode`s) and
2057 * respond with demands (of `struct GNUNET_HashCode`s).
2059 * @param cls the union operation
2060 * @param mh the message
2063 handle_union_p2p_offer(void *cls,
2064 const struct GNUNET_MessageHeader *mh)
2066 struct Operation *op = cls;
2067 const struct GNUNET_HashCode *hash;
2068 unsigned int num_hashes;
2070 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2071 / sizeof(struct GNUNET_HashCode);
2072 for (hash = (const struct GNUNET_HashCode *)&mh[1];
2074 hash++, num_hashes--)
2076 struct ElementEntry *ee;
2077 struct GNUNET_MessageHeader *demands;
2078 struct GNUNET_MQ_Envelope *ev;
2080 ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements,
2083 if (GNUNET_YES == _GSS_is_element_of_operation(ee, op))
2087 GNUNET_CONTAINER_multihashmap_contains(op->state->demanded_hashes,
2090 LOG(GNUNET_ERROR_TYPE_DEBUG,
2091 "Skipped sending duplicate demand\n");
2095 GNUNET_assert(GNUNET_OK ==
2096 GNUNET_CONTAINER_multihashmap_put(op->state->demanded_hashes,
2099 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2101 LOG(GNUNET_ERROR_TYPE_DEBUG,
2102 "[OP %x] Requesting element (hash %s)\n",
2103 (void *)op, GNUNET_h2s(hash));
2104 ev = GNUNET_MQ_msg_header_extra(demands,
2105 sizeof(struct GNUNET_HashCode),
2106 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2107 GNUNET_memcpy(&demands[1],
2109 sizeof(struct GNUNET_HashCode));
2110 GNUNET_MQ_send(op->mq, ev);
2112 GNUNET_CADET_receive_done(op->channel);
2117 * Handle a done message from a remote peer
2119 * @param cls the union operation
2120 * @param mh the message
2123 handle_union_p2p_done(void *cls,
2124 const struct GNUNET_MessageHeader *mh)
2126 struct Operation *op = cls;
2128 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2131 fail_union_operation(op);
2134 switch (op->state->phase)
2136 case PHASE_INVENTORY_PASSIVE:
2137 /* We got all requests, but still have to send our elements in response. */
2138 op->state->phase = PHASE_FINISH_WAITING;
2140 LOG(GNUNET_ERROR_TYPE_DEBUG,
2141 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2142 /* The active peer is done sending offers
2143 * and inquiries. This means that all
2144 * our responses to that (demands and offers)
2145 * must be in flight (queued or in mesh).
2147 * We should notify the active peer once
2148 * all our demands are satisfied, so that the active
2149 * peer can quit if we gave it everything.
2151 GNUNET_CADET_receive_done(op->channel);
2155 case PHASE_INVENTORY_ACTIVE:
2156 LOG(GNUNET_ERROR_TYPE_DEBUG,
2157 "got DONE (as active partner), waiting to finish\n");
2158 /* All demands of the other peer are satisfied,
2159 * and we processed all offers, thus we know
2160 * exactly what our demands must be.
2162 * We'll close the channel
2163 * to the other peer once our demands are met.
2165 op->state->phase = PHASE_FINISH_CLOSING;
2166 GNUNET_CADET_receive_done(op->channel);
2172 fail_union_operation(op);
2178 * Handle a over message from a remote peer
2180 * @param cls the union operation
2181 * @param mh the message
2184 handle_union_p2p_over(void *cls,
2185 const struct GNUNET_MessageHeader *mh)
2187 send_client_done(cls);
2192 * Initiate operation to evaluate a set union with a remote peer.
2194 * @param op operation to perform (to be initialized)
2195 * @param opaque_context message to be transmitted to the listener
2196 * to convince it to accept, may be NULL
2198 static struct OperationState *
2199 union_evaluate(struct Operation *op,
2200 const struct GNUNET_MessageHeader *opaque_context)
2202 struct OperationState *state;
2203 struct GNUNET_MQ_Envelope *ev;
2204 struct OperationRequestMessage *msg;
2206 ev = GNUNET_MQ_msg_nested_mh(msg,
2207 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2211 /* the context message is too large */
2215 state = GNUNET_new(struct OperationState);
2216 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32,
2218 /* copy the current generation's strata estimator for this operation */
2219 state->se = strata_estimator_dup(op->set->state->se);
2220 /* we started the operation, thus we have to send the operation request */
2221 state->phase = PHASE_EXPECT_SE;
2222 state->salt_receive = state->salt_send = 42; // FIXME?????
2223 LOG(GNUNET_ERROR_TYPE_DEBUG,
2224 "Initiating union operation evaluation\n");
2225 GNUNET_STATISTICS_update(_GSS_statistics,
2226 "# of total union operations",
2229 GNUNET_STATISTICS_update(_GSS_statistics,
2230 "# of initiated union operations",
2233 msg->operation = htonl(GNUNET_SET_OPERATION_UNION);
2234 GNUNET_MQ_send(op->mq,
2237 if (NULL != opaque_context)
2238 LOG(GNUNET_ERROR_TYPE_DEBUG,
2239 "sent op request with context message\n");
2241 LOG(GNUNET_ERROR_TYPE_DEBUG,
2242 "sent op request without context message\n");
2245 initialize_key_to_element(op);
2246 state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element);
2252 * Accept an union operation request from a remote peer.
2253 * Only initializes the private operation state.
2255 * @param op operation that will be accepted as a union operation
2257 static struct OperationState *
2258 union_accept(struct Operation *op)
2260 struct OperationState *state;
2261 const struct StrataEstimator *se;
2262 struct GNUNET_MQ_Envelope *ev;
2263 struct StrataEstimatorMessage *strata_msg;
2268 LOG(GNUNET_ERROR_TYPE_DEBUG,
2269 "accepting set union operation\n");
2270 GNUNET_STATISTICS_update(_GSS_statistics,
2271 "# of accepted union operations",
2274 GNUNET_STATISTICS_update(_GSS_statistics,
2275 "# of total union operations",
2279 state = GNUNET_new(struct OperationState);
2280 state->se = strata_estimator_dup(op->set->state->se);
2281 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32,
2283 state->salt_receive = state->salt_send = 42; // FIXME?????
2285 initialize_key_to_element(op);
2286 state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element);
2288 /* kick off the operation */
2290 buf = GNUNET_malloc(se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2291 len = strata_estimator_write(se,
2293 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2294 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2296 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2297 ev = GNUNET_MQ_msg_extra(strata_msg,
2300 GNUNET_memcpy(&strata_msg[1],
2304 strata_msg->set_size
2305 = GNUNET_htonll(GNUNET_CONTAINER_multihashmap_size(op->set->content->elements));
2306 GNUNET_MQ_send(op->mq,
2308 state->phase = PHASE_EXPECT_IBF;
2314 * Create a new set supporting the union operation
2316 * We maintain one strata estimator per set and then manipulate it over the
2317 * lifetime of the set, as recreating a strata estimator would be expensive.
2319 * @return the newly created set, NULL on error
2321 static struct SetState *
2322 union_set_create(void)
2324 struct SetState *set_state;
2326 LOG(GNUNET_ERROR_TYPE_DEBUG,
2327 "union set created\n");
2328 set_state = GNUNET_new(struct SetState);
2329 set_state->se = strata_estimator_create(SE_STRATA_COUNT,
2330 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2331 if (NULL == set_state->se)
2333 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
2334 "Failed to allocate strata estimator\n");
2335 GNUNET_free(set_state);
2343 * Add the element from the given element message to the set.
2345 * @param set_state state of the set want to add to
2346 * @param ee the element to add to the set
2349 union_add(struct SetState *set_state,
2350 struct ElementEntry *ee)
2352 strata_estimator_insert(set_state->se,
2353 get_ibf_key(&ee->element_hash));
2358 * Remove the element given in the element message from the set.
2359 * Only marks the element as removed, so that older set operations can still exchange it.
2361 * @param set_state state of the set to remove from
2362 * @param ee set element to remove
2365 union_remove(struct SetState *set_state,
2366 struct ElementEntry *ee)
2368 strata_estimator_remove(set_state->se,
2369 get_ibf_key(&ee->element_hash));
2374 * Destroy a set that supports the union operation.
2376 * @param set_state the set to destroy
2379 union_set_destroy(struct SetState *set_state)
2381 if (NULL != set_state->se)
2383 strata_estimator_destroy(set_state->se);
2384 set_state->se = NULL;
2386 GNUNET_free(set_state);
2391 * Copy union-specific set state.
2393 * @param state source state for copying the union state
2394 * @return a copy of the union-specific set state
2396 static struct SetState *
2397 union_copy_state(struct SetState *state)
2399 struct SetState *new_state;
2401 GNUNET_assert((NULL != state) &&
2402 (NULL != state->se));
2403 new_state = GNUNET_new(struct SetState);
2404 new_state->se = strata_estimator_dup(state->se);
2411 * Handle case where channel went down for an operation.
2413 * @param op operation that lost the channel
2416 union_channel_death(struct Operation *op)
2418 send_client_done(op);
2419 _GSS_operation_destroy(op,
2425 * Get the table with implementing functions for
2428 * @return the operation specific VTable
2430 const struct SetVT *
2433 static const struct SetVT union_vt = {
2434 .create = &union_set_create,
2436 .remove = &union_remove,
2437 .destroy_set = &union_set_destroy,
2438 .evaluate = &union_evaluate,
2439 .accept = &union_accept,
2440 .cancel = &union_op_cancel,
2441 .copy_state = &union_copy_state,
2442 .channel_death = &union_channel_death