2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL-3.0-or-later
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
/**
 * Compile-time tunables used throughout this file: the logging component
 * name, the geometry of the strata estimator (number of IBFs, buckets per
 * IBF, hash functions per key), the cap on buckets per network message and
 * the cap on the IBF order.
 */
37 #define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
41 * Number of IBFs in a strata estimator.
43 #define SE_STRATA_COUNT 32
46 * Size of the IBFs in the strata estimator.
48 #define SE_IBF_SIZE 80
51 * The hash num parameter for the difference digests and strata estimators.
53 #define SE_IBF_HASH_NUM 4
56 * Number of buckets that can be transmitted in one message.
58 #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
/* send_ibf() multiplies a bucket count by IBF_BUCKET_SIZE to get the extra
 * message payload, so this caps the bucket payload at 1 << 15 bytes.
 * IBF_BUCKET_SIZE itself is defined outside this excerpt. */
61 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62 * Choose this value so that computing the IBF is still cheaper
63 * than transmitting all values.
65 #define MAX_IBF_ORDER (20)
68 * Number of buckets used in the ibf per estimated
75 * Current phase we are in for a union operation.
/**
 * Protocol phases of a union operation, stored in OperationState.phase.
 * Enumerators visible in this excerpt: PHASE_EXPECT_IBF_CONT,
 * PHASE_INVENTORY_ACTIVE, PHASE_INVENTORY_PASSIVE, PHASE_FINISH_CLOSING and
 * PHASE_FINISH_WAITING.  The code further down also references
 * PHASE_EXPECT_SE, PHASE_EXPECT_IBF, PHASE_FULL_SENDING and PHASE_DONE, whose
 * declaration lines are not part of this excerpt.
 */
77 enum UnionOperationPhase
80 * We sent the request message, and expect a strata estimator.
85 * We sent the strata estimator, and expect an IBF. This phase is entered once
86 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
88 * XXX: could use better wording.
89 * XXX: repurposed to also expect a "request full set" message, should be renamed
91 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
96 * Continuation for multi part IBFs.
98 PHASE_EXPECT_IBF_CONT,
101 * We are decoding an IBF.
103 PHASE_INVENTORY_ACTIVE,
106 * The other peer is decoding the IBF we just sent.
108 PHASE_INVENTORY_PASSIVE,
111 * The protocol is almost finished, but we still have to flush our message
112 * queue and/or expect some elements.
114 PHASE_FINISH_CLOSING,
117 * In the penultimate phase,
118 * we wait until all our demands
119 * are satisfied. Then we send a done
120 * message, and wait for another done message.
122 PHASE_FINISH_WAITING,
125 * In the ultimate phase, we wait until
126 * our demands are satisfied and then
127 * quit (sending another DONE message).
132 * After sending the full set, wait for responses with the elements
133 * that the local peer is missing.
140 * State of an evaluate operation with another peer.
/**
 * Per-operation state of a union operation, reached through op->state.
 * It is released member by member in union_op_cancel() and read or updated
 * by the P2P message handlers in this file.
 *
 * NOTE(review): prepare_ibf_iterator(), send_ibf() and decode_and_send() use
 * a salt_send member; only its comment line is visible in this excerpt, the
 * member declaration itself is not.
 * NOTE(review): initial_size is a uint64_t, while received_fresh and
 * received_total are uint32_t; keep format specifiers and comparisons
 * against these fields consistent with their widths.
 */
142 struct OperationState
145 * Copy of the set's strata estimator at the time of
146 * creation of this operation.
148 struct StrataEstimator *se;
151 * The IBF we currently receive.
153 struct InvertibleBloomFilter *remote_ibf;
156 * The IBF with the local set's element.
158 struct InvertibleBloomFilter *local_ibf;
161 * Maps unsalted IBF-Keys to elements.
162 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163 * Colliding IBF-Keys are linked.
165 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
168 * Current state of the operation.
170 enum UnionOperationPhase phase;
173 * Did we send the client that we are done?
175 int client_done_sent;
178 * Number of ibf buckets already received into the @a remote_ibf.
180 unsigned int ibf_buckets_received;
183 * Hashes for elements that we have demanded from the other peer.
185 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
188 * Salt that we're using for sending IBFs
193 * Salt for the IBF we've received and that we're currently decoding.
195 uint32_t salt_receive;
198 * Number of elements we received from the other peer
199 * that were not in the local set yet.
201 uint32_t received_fresh;
204 * Total number of elements received from the other peer.
206 uint32_t received_total;
209 * Initial size of our set, just before
210 * the operation started.
212 uint64_t initial_size;
217 * The key entry is used to associate an ibf key with an element.
222 * IBF key for the entry, derived from the current salt.
224 struct IBF_Key ibf_key;
227 * The actual element associated with the key.
229 * Only owned by the union operation if element->operation
232 struct ElementEntry *element;
235 * Did we receive this element?
236 * Even if element->is_foreign is false, we might
237 * have received the element, so this indicates that
238 * the other peer has it.
245 * Used as a closure for sending elements
246 * with a specific IBF key.
/**
 * Closure passed to send_offers_iterator().  The iterator compares the full
 * key_val of ibf_key against each candidate entry (the map itself is only
 * indexed by the lower 32 bits) and queues offers on the message queue of op.
 */
248 struct SendElementClosure
251 * The IBF key whose matching elements should be
254 struct IBF_Key ibf_key;
257 * Operation for which the elements
260 struct Operation *op;
265 * Extra state required for efficient set union.
270 * The strata estimator is only generated once for
272 * The IBF keys are derived from the element hashes with
275 struct StrataEstimator *se;
280 * Iterator over hash map entries, called to
281 * destroy the linked list of colliding ibf key entries.
284 * @param key current key code
285 * @param value value in the hash map
286 * @return #GNUNET_YES if we should continue to iterate,
/**
 * Map iterator run by union_op_cancel() over key_to_element just before the
 * map is destroyed.  For each key entry it frees the attached element entry
 * only when that element is flagged remote; handle_union_p2p_elements() sets
 * that flag on the entries it allocates itself.  Elements that are not
 * flagged remote are not freed by the visible code.
 * What happens to the key entry itself is not visible in this excerpt.
 */
290 destroy_key_to_element_iter (void *cls,
294 struct KeyEntry *k = value;
296 GNUNET_assert (NULL != k);
/* Only elements allocated for this operation are released here. */
297 if (GNUNET_YES == k->element->remote)
299 GNUNET_free (k->element);
308 * Destroy the union operation. Only things specific to the union
309 * operation are destroyed.
311 * @param op union operation to destroy
/**
 * Releases everything owned by op->state and then the state itself.
 * In order: the remote IBF, the demanded-hashes map, the local IBF, the
 * strata estimator copy and finally the key_to_element map, whose entries
 * are first visited with destroy_key_to_element_iter().  Every member is
 * checked for NULL before release and reset to NULL afterwards.
 *
 * @param op union operation whose state is released; op->state must not be
 *        NULL (asserted)
 */
314 union_op_cancel (struct Operation *op)
316 LOG (GNUNET_ERROR_TYPE_DEBUG,
317 "destroying union op\n");
318 /* check if the op was canceled twice */
319 GNUNET_assert (NULL != op->state);
320 if (NULL != op->state->remote_ibf)
322 ibf_destroy (op->state->remote_ibf);
323 op->state->remote_ibf = NULL;
325 if (NULL != op->state->demanded_hashes)
327 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328 op->state->demanded_hashes = NULL;
330 if (NULL != op->state->local_ibf)
332 ibf_destroy (op->state->local_ibf);
333 op->state->local_ibf = NULL;
335 if (NULL != op->state->se)
337 strata_estimator_destroy (op->state->se);
338 op->state->se = NULL;
340 if (NULL != op->state->key_to_element)
/* Visit the entries first so that remote elements get freed, then drop
 * the map itself. */
342 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343 &destroy_key_to_element_iter,
345 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346 op->state->key_to_element = NULL;
348 GNUNET_free (op->state);
/* NOTE(review): whether op->state is reset to NULL after the free is not
 * visible here; the assertion at the top relies on it for detecting a
 * second cancel. */
350 LOG (GNUNET_ERROR_TYPE_DEBUG,
351 "destroying union op done\n");
356 * Inform the client that the union operation has failed,
357 * and proceed to destroy the evaluate operation.
359 * @param op the union operation to fail
/**
 * Reports failure to the client and tears the operation down.
 * Logs a warning, builds a SET_RESULT message carrying
 * GNUNET_SET_STATUS_FAILURE, the client request id and element type 0,
 * queues it on the client message queue (op->set->cs->mq) and then calls
 * _GSS_operation_destroy (op, GNUNET_YES).
 *
 * NOTE(review): presumably op must not be used after this call returns,
 * since the operation is destroyed at the end; confirm against
 * _GSS_operation_destroy and audit callers that keep using op or call this
 * function twice on one path.
 *
 * @param op the union operation to fail
 */
362 fail_union_operation (struct Operation *op)
364 struct GNUNET_MQ_Envelope *ev;
365 struct GNUNET_SET_ResultMessage *msg;
367 LOG (GNUNET_ERROR_TYPE_WARNING,
368 "union operation failed\n");
369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
/* All multi-byte fields go out in network byte order. */
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->client_request_id);
372 msg->element_type = htons (0);
373 GNUNET_MQ_send (op->set->cs->mq,
375 _GSS_operation_destroy (op, GNUNET_YES);
380 * Derive the IBF key from a hash code and
383 * @param src the hash code
384 * @return the derived IBF key
/**
 * Derives the IBF key for an element hash by running GNUNET_CRYPTO_kdf with
 * a local key object as output buffer.  The KDF is asserted to succeed.
 * The remaining KDF arguments (and the return statement) are outside this
 * excerpt.
 *
 * @param src the element hash to derive the key from
 * @return the derived IBF key
 */
386 static struct IBF_Key
387 get_ibf_key (const struct GNUNET_HashCode *src)
392 GNUNET_assert (GNUNET_OK ==
393 GNUNET_CRYPTO_kdf (&key, sizeof(key),
402 * Context for #op_get_element_iterator
/**
 * Search context shared between op_get_element() and
 * op_get_element_iterator().  hash is the element hash being looked up;
 * op_get_element() additionally reads a member named k from this struct
 * (the matching key entry), whose declaration is not visible in this excerpt.
 */
404 struct GetElementContext
409 struct GNUNET_HashCode hash;
419 * Iterator over the mapping from IBF keys to element entries. Checks if we
420 * have an element with a given GNUNET_HashCode.
423 * @param key current key code
424 * @param value value in the hash map
425 * @return #GNUNET_YES if we should search further,
426 * #GNUNET_NO if we've found the element.
/**
 * Callback for the 32-bit multihashmap lookup started by op_get_element().
 * Each candidate key entry has its element hash compared with
 * GNUNET_CRYPTO_hash_cmp; a result of 0 means equal hashes.
 * NOTE(review): the second operand of the comparison and the code run on a
 * match are outside this excerpt; presumably the comparison is against
 * ctx->hash and a match is stored in the context.
 */
429 op_get_element_iterator (void *cls,
433 struct GetElementContext *ctx = cls;
434 struct KeyEntry *k = value;
436 GNUNET_assert (NULL != k);
437 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
448 * Determine whether the given element is already in the operation's element
451 * @param op operation that should be tested for 'element_hash'
452 * @param element_hash hash of the element to look for
453 * @return the `struct KeyEntry` of the element if it has been found (the lookup aborts on the first match)
/**
 * Looks up the key entry that belongs to an element hash.
 * The hash is copied into a search context, turned into an IBF key with
 * get_ibf_key(), and the key_to_element map is queried with the key value
 * truncated to 32 bits.  Because several 64-bit keys can share those 32
 * bits, op_get_element_iterator() is run over all candidates.
 * When the map call returns GNUNET_SYSERR the iteration is treated as
 * aborted by a match, and the context must then hold a non-NULL entry.
 *
 * @param op operation whose key_to_element map is searched
 * @param element_hash hash of the element to look for
 */
455 static struct KeyEntry *
456 op_get_element (struct Operation *op,
457 const struct GNUNET_HashCode *element_hash)
460 struct IBF_Key ibf_key;
461 struct GetElementContext ctx = { { { 0 } }, 0 };
463 ctx.hash = *element_hash;
465 ibf_key = get_ibf_key (element_hash);
/* The map is keyed by the lower 32 bits of the 64-bit IBF key. */
466 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
467 (uint32_t) ibf_key.key_val,
468 op_get_element_iterator,
471 /* was the iteration aborted because we found the element? */
472 if (GNUNET_SYSERR == ret)
474 GNUNET_assert (NULL != ctx.k);
482 * Insert an element into the union operation's
483 * key-to-element mapping. Takes ownership of 'ee'.
484 * Note that this does not insert the element in the set,
485 * only in the operation's key-element mapping.
486 * This is done to speed up re-tried operations, if some elements
487 * were transmitted, and then the IBF fails to decode.
489 * XXX: clarify ownership, doesn't sound right.
491 * @param op the union operation
492 * @param ee the element entry
493 * @param received was this element received from the remote peer?
/**
 * Adds an element entry to the key_to_element map of the operation.
 * A fresh key entry is allocated, filled with the IBF key derived from the
 * element hash and with the received flag, and inserted under the lower
 * 32 bits of the key.  The MULTIPLE option allows several entries under the
 * same 32-bit key, and the insertion is asserted to succeed.
 * The assignment that links the key entry to ee is outside this excerpt.
 */
496 op_register_element (struct Operation *op,
497 struct ElementEntry *ee,
500 struct IBF_Key ibf_key;
503 ibf_key = get_ibf_key (&ee->element_hash);
504 k = GNUNET_new (struct KeyEntry);
506 k->ibf_key = ibf_key;
507 k->received = received;
508 GNUNET_assert (GNUNET_OK ==
509 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
510 (uint32_t) ibf_key.key_val,
512 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
/**
 * Salts an IBF key by rotating its 64-bit key value right by s bits.
 * unsalt_key() applies the opposite rotation.
 *
 * NOTE(review): the definition of s and the store into k_out are outside
 * this excerpt.  If s can be 0, the sub-expression x << (64 - s) shifts a
 * uint64_t by its full width, which is undefined behaviour in C; confirm
 * that s is never 0 or special-case it.
 *
 * @param k_in key to salt
 * @param k_out where the salted key is written
 */
520 salt_key (const struct IBF_Key *k_in,
522 struct IBF_Key *k_out)
525 uint64_t x = k_in->key_val;
/* Rotate right by s: low s bits wrap around to the top. */
528 x = (x >> s) | (x << (64 - s));
/**
 * Reverses salt_key() by rotating the 64-bit key value left by s bits.
 *
 * NOTE(review): the definition of s and the store into k_out are outside
 * this excerpt.  If s can be 0, the sub-expression x >> (64 - s) shifts a
 * uint64_t by its full width, which is undefined behaviour in C; confirm
 * that s is never 0 or special-case it.
 *
 * @param k_in salted key
 * @param k_out where the unsalted key is written
 */
537 unsalt_key (const struct IBF_Key *k_in,
539 struct IBF_Key *k_out)
542 uint64_t x = k_in->key_val;
/* Rotate left by s: high s bits wrap around to the bottom. */
544 x = (x << s) | (x >> (64 - s));
550 * Insert a key into an ibf.
554 * @param value the key entry to get the key from
/**
 * Map iterator used by prepare_ibf(): salts the IBF key of one key entry
 * with the current sending salt of the operation and inserts the salted key
 * into the local IBF.
 *
 * NOTE(review): the debug log prints the 64-bit key value through %lx after
 * a cast to unsigned long, so the upper half is dropped wherever unsigned
 * long is 32 bits wide.  This only affects the log text.
 *
 * @param cls the operation (struct Operation *)
 * @param value the key entry (struct KeyEntry *) whose key is inserted
 */
557 prepare_ibf_iterator (void *cls,
561 struct Operation *op = cls;
562 struct KeyEntry *ke = value;
563 struct IBF_Key salted_key;
565 LOG (GNUNET_ERROR_TYPE_DEBUG,
566 "[OP %x] inserting %lx (hash %s) into ibf\n",
568 (unsigned long) ke->ibf_key.key_val,
569 GNUNET_h2s (&ke->element->element_hash));
/* The stored key is unsalted; the salt is applied per IBF. */
570 salt_key (&ke->ibf_key,
571 op->state->salt_send,
573 ibf_insert (op->state->local_ibf, salted_key);
579 * Iterator for initializing the
580 * key-to-element mapping of a union operation
582 * @param cls the union operation `struct Operation *`
584 * @param value the `struct ElementEntry *` to insert
585 * into the key-to-element mapping
586 * @return #GNUNET_YES (to continue iterating)
/**
 * Iterator over the elements of the set, used by
 * initialize_key_to_element().  It consults _GSS_is_element_of_operation()
 * for the element, asserts that the element is not flagged remote and
 * registers it with op_register_element().
 * The condition built around the membership test and the remaining
 * arguments of the calls are outside this excerpt.
 */
589 init_key_to_element_iterator (void *cls,
590 const struct GNUNET_HashCode *key,
593 struct Operation *op = cls;
594 struct ElementEntry *ee = value;
596 /* make sure that the element belongs to the set at the time
597 * of creating the operation */
599 _GSS_is_element_of_operation (ee,
/* Elements coming out of the set content are local by construction. */
602 GNUNET_assert (GNUNET_NO == ee->remote);
603 op_register_element (op,
611 * Initialize the IBF key to element mapping local to this set
614 * @param op the set union operation
/**
 * Builds the key_to_element map of the operation from the current set
 * content.  The map must not exist yet (asserted).  It is created with the
 * number of set elements plus one as size argument, which keeps the argument
 * non-zero for an empty set, and then filled by running
 * init_key_to_element_iterator() over all elements.
 *
 * @param op the set union operation
 */
617 initialize_key_to_element (struct Operation *op)
621 GNUNET_assert (NULL == op->state->key_to_element);
622 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
623 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
624 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
625 &init_key_to_element_iterator,
631 * Create an ibf with the operation's elements
632 * of the specified size
634 * @param op the union operation
635 * @param size size of the ibf to create
636 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/**
 * (Re)creates the local IBF of the operation with the requested number of
 * buckets and SE_IBF_HASH_NUM hash functions, then inserts the salted key of
 * every entry in key_to_element via prepare_ibf_iterator().
 * An existing local IBF is destroyed first.  key_to_element must already
 * exist (asserted).
 *
 * @param op the union operation
 * @return GNUNET_SYSERR if the IBF could not be allocated (an error is
 *         logged); the success return is outside this excerpt
 */
639 prepare_ibf (struct Operation *op,
642 GNUNET_assert (NULL != op->state->key_to_element);
644 if (NULL != op->state->local_ibf)
645 ibf_destroy (op->state->local_ibf);
646 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
647 if (NULL == op->state->local_ibf)
649 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
650 "Failed to allocate local IBF\n");
651 return GNUNET_SYSERR;
653 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
654 &prepare_ibf_iterator,
661 * Send an ibf of appropriate size.
663 * Fragments the IBF into multiple messages if necessary.
665 * @param op the union operation
666 * @param ibf_order order of the ibf to send, size=2^order
667 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/**
 * Builds the local IBF with 2^ibf_order buckets and transmits it to the
 * peer, split into as many P2P_IBF messages as needed.  Each message carries
 * at most MAX_BUCKETS_PER_MESSAGE buckets together with the order, the
 * bucket offset of the slice and the sending salt (offset and salt in
 * network byte order).  A statistics counter named after the order is
 * bumped once per call.  At the end the phase is set to
 * PHASE_INVENTORY_PASSIVE.
 *
 * NOTE(review): 1 << ibf_order is evaluated as int; this is fine for the
 * orders produced in this file (bounded via MAX_IBF_ORDER) but would be
 * undefined for an order of 31 or more.
 *
 * @param op the union operation
 * @return GNUNET_SYSERR if preparing the IBF failed
 */
670 send_ibf (struct Operation *op,
673 unsigned int buckets_sent = 0;
674 struct InvertibleBloomFilter *ibf;
677 prepare_ibf (op, 1 << ibf_order))
679 /* allocation failed */
680 return GNUNET_SYSERR;
683 LOG (GNUNET_ERROR_TYPE_DEBUG,
684 "sending ibf of size %u\n",
/* One statistics counter per distinct order; the 64-byte buffer is ample
 * for the fixed text plus a decimal unsigned value. */
688 char name[64] = { 0 };
689 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
690 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
693 ibf = op->state->local_ibf;
/* Slice the IBF into messages until every bucket has been queued. */
695 while (buckets_sent < (1 << ibf_order))
697 unsigned int buckets_in_message;
698 struct GNUNET_MQ_Envelope *ev;
699 struct IBFMessage *msg;
701 buckets_in_message = (1 << ibf_order) - buckets_sent;
702 /* limit to maximum */
703 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
704 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
706 ev = GNUNET_MQ_msg_extra (msg,
707 buckets_in_message * IBF_BUCKET_SIZE,
708 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
/* order is stored without byte swapping, unlike offset and salt. */
711 msg->order = ibf_order;
712 msg->offset = htonl (buckets_sent);
713 msg->salt = htonl (op->state->salt_send);
714 ibf_write_slice (ibf, buckets_sent,
715 buckets_in_message, &msg[1]);
716 buckets_sent += buckets_in_message;
717 LOG (GNUNET_ERROR_TYPE_DEBUG,
718 "ibf chunk size %u, %u/%u sent\n",
722 GNUNET_MQ_send (op->mq, ev);
725 /* The other peer must decode the IBF, so
727 op->state->phase = PHASE_INVENTORY_PASSIVE;
733 * Compute the necessary order of an ibf
734 * from the size of the symmetric set difference.
736 * @param diff the difference
737 * @return the required order of the ibf (the ibf size is 2^order)
/**
 * Picks an IBF order for an estimated set difference.
 * The loop runs while 2^ibf_order is below IBF_ALPHA * diff or below
 * SE_IBF_HASH_NUM, and stops at the latest when ibf_order reaches
 * MAX_IBF_ORDER.  One is then added as a safety margin.
 * The initial value of ibf_order and the loop body are outside this excerpt.
 *
 * NOTE(review): because of the final increment the returned order can be
 * MAX_IBF_ORDER + 1, one above the documented maximum; confirm this is
 * intended.
 *
 * @param diff estimated size of the symmetric difference
 * @return the order to use; callers compute the size as 1 << order
 */
740 get_order_from_difference (unsigned int diff)
742 unsigned int ibf_order;
745 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
746 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
747 (ibf_order < MAX_IBF_ORDER))
749 // add one for correction
750 return ibf_order + 1;
755 * Send a set element.
757 * @param cls the union operation `struct Operation *`
759 * @param value the `struct ElementEntry *` to insert
760 * into the key-to-element mapping
761 * @return #GNUNET_YES (to continue iterating)
/**
 * Iterator used by send_full_set(): wraps one set element into a
 * P2P_FULL_ELEMENT message and queues it on the peer message queue.
 * The element type is written in network byte order and the element payload
 * is copied behind the message header.  The size arguments of the
 * allocation and of the copy are outside this excerpt.
 *
 * @param cls the operation (struct Operation *)
 * @param value the element entry (struct ElementEntry *) to send
 */
764 send_full_element_iterator (void *cls,
765 const struct GNUNET_HashCode *key,
768 struct Operation *op = cls;
769 struct GNUNET_SET_ElementMessage *emsg;
770 struct ElementEntry *ee = value;
771 struct GNUNET_SET_Element *el = &ee->element;
772 struct GNUNET_MQ_Envelope *ev;
774 LOG (GNUNET_ERROR_TYPE_DEBUG,
775 "Sending element %s\n",
777 ev = GNUNET_MQ_msg_extra (emsg,
779 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
780 emsg->element_type = htons (el->element_type);
781 GNUNET_memcpy (&emsg[1],
784 GNUNET_MQ_send (op->mq,
791 * Switch to full set transmission for @a op.
793 * @param op operation to switch to full set transmission.
/**
 * Switches the operation to PHASE_FULL_SENDING, queues every element of the
 * set for transmission through send_full_element_iterator() and finishes
 * with a P2P_FULL_DONE message on the peer message queue.
 * All element messages are queued in one pass, which is what the FIXME
 * below refers to.
 *
 * NOTE(review): the debug message below spells the word Deciding wrongly;
 * it is left untouched here because it is a runtime string.
 *
 * @param op operation to switch to full set transmission
 */
796 send_full_set (struct Operation *op)
798 struct GNUNET_MQ_Envelope *ev;
800 op->state->phase = PHASE_FULL_SENDING;
801 LOG (GNUNET_ERROR_TYPE_DEBUG,
802 "Dedicing to transmit the full set\n");
803 /* FIXME: use a more memory-friendly way of doing this with an
804 iterator, just as we do in the non-full case! */
805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
806 &send_full_element_iterator,
808 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
809 GNUNET_MQ_send (op->mq,
815 * Check a strata estimator message from a remote peer
817 * @param cls the union operation
818 * @param msg the message
/**
 * Validates a strata estimator message before it is handled.
 * The message is rejected with GNUNET_SYSERR unless the operation is in
 * PHASE_EXPECT_SE.  For the uncompressed variant the payload length must be
 * exactly SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE; no length check
 * for the compressed variant (type P2P_SEC) is visible here.
 *
 * NOTE(review): the message type is compared after htons(); the operand is
 * outside this excerpt, but if it is a field read from the wire, ntohs()
 * would state the intent (both perform the same byte swap).
 * NOTE(review): len is computed by subtracting the header struct size from
 * the wire size; this relies on the message queue having already ensured
 * the message is at least that large.
 */
821 check_union_p2p_strata_estimator (void *cls,
822 const struct StrataEstimatorMessage *msg)
824 struct Operation *op = cls;
828 if (op->state->phase != PHASE_EXPECT_SE)
831 return GNUNET_SYSERR;
833 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
835 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
836 if ((GNUNET_NO == is_compressed) &&
837 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
840 return GNUNET_SYSERR;
847 * Handle a strata estimator from a remote peer
849 * @param cls the union operation
850 * @param msg the message
/**
 * Processes the strata estimator of the remote peer.
 * Steps visible in this excerpt:
 *  - account the received bytes in the statistics,
 *  - parse the remote estimator (failing the operation when it cannot be
 *    allocated or read),
 *  - compute the estimated difference against the local estimator copy and
 *    release both estimators,
 *  - optionally append the difference to the file set.log when the
 *    environment variable GNUNET_SET_BENCHMARK equals 1,
 *  - fail the operation when byzantine mode is on and the announced remote
 *    set size is below the configured lower bound,
 *  - choose between full set transmission (forced, or difference larger
 *    than a quarter of the initial local set size, plus a further condition
 *    outside this excerpt) and sending an IBF whose order comes from
 *    get_order_from_difference(),
 *  - acknowledge the message on the CADET channel.
 *
 * NOTE(review): several branches call fail_union_operation(); the return
 * statements that presumably follow them are outside this excerpt.
 */
853 handle_union_p2p_strata_estimator (void *cls,
854 const struct StrataEstimatorMessage *msg)
856 struct Operation *op = cls;
857 struct StrataEstimator *remote_se;
863 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
865 GNUNET_STATISTICS_update (_GSS_statistics,
866 "# bytes of SE received",
867 ntohs (msg->header.size),
869 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
870 other_size = GNUNET_ntohll (msg->set_size);
871 remote_se = strata_estimator_create (SE_STRATA_COUNT,
874 if (NULL == remote_se)
876 /* insufficient resources, fail */
877 fail_union_operation (op);
881 strata_estimator_read (&msg[1],
886 /* decompression failed */
887 strata_estimator_destroy (remote_se);
888 fail_union_operation (op);
891 GNUNET_assert (NULL != op->state->se);
892 diff = strata_estimator_difference (remote_se,
/* Both estimators are single use: drop them once the difference is known. */
898 strata_estimator_destroy (remote_se);
899 strata_estimator_destroy (op->state->se);
900 op->state->se = NULL;
901 LOG (GNUNET_ERROR_TYPE_DEBUG,
902 "got se diff=%d, using ibf size %d\n",
904 1U << get_order_from_difference (diff));
909 set_debug = getenv ("GNUNET_SET_BENCHMARK");
910 if ((NULL != set_debug) &&
911 (0 == strcmp (set_debug, "1")))
/* NOTE(review): the stream returned by fopen() is passed to fprintf() on
 * the very next source line without a NULL check; if set.log cannot be
 * opened this dereferences a null stream.  Guard the write. */
913 FILE *f = fopen ("set.log", "a");
914 fprintf (f, "%llu\n", (unsigned long long) diff);
919 if ((GNUNET_YES == op->byzantine) &&
920 (other_size < op->byzantine_lower_bound))
923 fail_union_operation (op);
927 if ((GNUNET_YES == op->force_full) ||
928 (diff > op->state->initial_size / 4) ||
931 LOG (GNUNET_ERROR_TYPE_DEBUG,
932 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
/* NOTE(review): initial_size is declared uint64_t but is printed through
 * %u here; the conversion specifier does not match the argument type. */
934 op->state->initial_size);
935 GNUNET_STATISTICS_update (_GSS_statistics,
939 if ((op->state->initial_size <= other_size) ||
946 struct GNUNET_MQ_Envelope *ev;
948 LOG (GNUNET_ERROR_TYPE_DEBUG,
949 "Telling other peer that we expect its full set\n");
/* While waiting for the full set of the peer, the phase is
 * PHASE_EXPECT_IBF (see the XXX note on the enum about the naming). */
950 op->state->phase = PHASE_EXPECT_IBF;
951 ev = GNUNET_MQ_msg_header (
952 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
953 GNUNET_MQ_send (op->mq,
959 GNUNET_STATISTICS_update (_GSS_statistics,
965 get_order_from_difference (diff)))
967 /* Internal error, best we can do is shut the connection */
968 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
969 "Failed to send IBF, closing connection\n");
970 fail_union_operation (op);
974 GNUNET_CADET_receive_done (op->channel);
979 * Iterator to send elements to a remote peer
981 * @param cls closure with the element key and the union operation
983 * @param value the key entry
/**
 * Map iterator used by send_offers_for_key(): for a key entry whose full
 * 64-bit IBF key equals the key in the closure, queue a P2P_OFFER message
 * containing the element hash on the peer message queue.
 * Entries that merely share the lower 32 bits are detected by the key
 * comparison; what is returned for them is outside this excerpt.
 *
 * @param cls closure (struct SendElementClosure *) with key and operation
 * @param value the key entry (struct KeyEntry *)
 */
986 send_offers_iterator (void *cls,
990 struct SendElementClosure *sec = cls;
991 struct Operation *op = sec->op;
992 struct KeyEntry *ke = value;
993 struct GNUNET_MQ_Envelope *ev;
994 struct GNUNET_MessageHeader *mh;
996 /* Detect 32-bit key collision for the 64-bit IBF keys. */
997 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1000 ev = GNUNET_MQ_msg_header_extra (mh,
1001 sizeof(struct GNUNET_HashCode),
1002 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
1004 GNUNET_assert (NULL != ev);
/* The payload directly behind the header is the element hash. */
1005 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1006 LOG (GNUNET_ERROR_TYPE_DEBUG,
1007 "[OP %x] sending element offer (%s) to peer\n",
1009 GNUNET_h2s (&ke->element->element_hash));
1010 GNUNET_MQ_send (op->mq, ev);
1016 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1018 * @param op union operation
1019 * @param ibf_key IBF key of interest
/**
 * Sends an offer for every element registered under the given IBF key.
 * A closure with the key is filled in and send_offers_iterator() is run
 * over the matching bucket of key_to_element; the result of the map call is
 * deliberately ignored.  The assignment of the operation into the closure
 * and the lookup key argument are outside this excerpt.
 *
 * @param op union operation
 * @param ibf_key IBF key of interest
 */
1022 send_offers_for_key (struct Operation *op,
1023 struct IBF_Key ibf_key)
1025 struct SendElementClosure send_cls;
1027 send_cls.ibf_key = ibf_key;
1029 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1030 op->state->key_to_element,
1033 &send_offers_iterator,
1039 * Decode which elements are missing on each side, and
1040 * send the appropriate offers and inquiries.
1042 * @param op union operation
1043 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/**
 * Decodes the difference between the local and the remote IBF and reacts to
 * every decoded key.
 *
 * The local IBF is rebuilt with the size of the remote IBF, duplicated, and
 * the remote IBF is subtracted from the copy; the remote IBF is released at
 * that point.  Keys are then pulled out with ibf_decode():
 *  - GNUNET_OK with a key: depending on the side, either the key is
 *    unsalted with the receive salt and offers are sent for it, or (side
 *    equal to -1) an inquiry message with the key and the receive salt is
 *    queued;
 *  - GNUNET_NO: nothing left to decode, a P2P_DONE message is queued;
 *  - GNUNET_SYSERR or a detected cycle (more keys decoded than buckets, or
 *    the same key twice in a row): the sending salt is incremented and a
 *    larger IBF is sent while the next order stays within MAX_IBF_ORDER;
 *    beyond that the operation is failed.
 * The difference IBF is destroyed on every exit path shown.
 *
 * NOTE(review): on the two internal failure paths this function calls
 * fail_union_operation() and then returns GNUNET_SYSERR.  The visible lines
 * of handle_union_p2p_ibf() call fail_union_operation() once more when this
 * function reports an error, which looks like a second teardown of the same
 * operation; confirm against the elided condition there.
 *
 * @param op union operation, must be in PHASE_INVENTORY_ACTIVE (asserted)
 * @return GNUNET_SYSERR on failure
 */
1046 decode_and_send (struct Operation *op)
1049 struct IBF_Key last_key;
1051 unsigned int num_decoded;
1052 struct InvertibleBloomFilter *diff_ibf;
1054 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1058 op->state->remote_ibf->size))
1061 /* allocation failed */
1062 return GNUNET_SYSERR;
/* Work on a copy so that the local IBF itself stays intact. */
1064 diff_ibf = ibf_dup (op->state->local_ibf);
1065 ibf_subtract (diff_ibf,
1066 op->state->remote_ibf);
1068 ibf_destroy (op->state->remote_ibf);
1069 op->state->remote_ibf = NULL;
1071 LOG (GNUNET_ERROR_TYPE_DEBUG,
1072 "decoding IBF (size=%u)\n",
1076 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1081 int cycle_detected = GNUNET_NO;
1085 res = ibf_decode (diff_ibf, &side, &key);
1086 if (res == GNUNET_OK)
1088 LOG (GNUNET_ERROR_TYPE_DEBUG,
1089 "decoded ibf key %lx\n",
1090 (unsigned long) key.key_val);
/* Two independent cycle heuristics: too many keys for the number of
 * buckets, or the same key decoded twice in a row. */
1092 if ((num_decoded > diff_ibf->size) ||
1093 ((num_decoded > 1) &&
1094 (last_key.key_val == key.key_val)))
1096 LOG (GNUNET_ERROR_TYPE_DEBUG,
1097 "detected cyclic ibf (decoded %u/%u)\n",
1100 cycle_detected = GNUNET_YES;
1103 if ((GNUNET_SYSERR == res) ||
1104 (GNUNET_YES == cycle_detected))
/* Search for an order derived from the size of the failed IBF; the
 * loop body and the start value are outside this excerpt. */
1108 while (1 << next_order < diff_ibf->size)
1111 if (next_order <= MAX_IBF_ORDER)
1113 LOG (GNUNET_ERROR_TYPE_DEBUG,
1114 "decoding failed, sending larger ibf (size %u)\n",
1116 GNUNET_STATISTICS_update (_GSS_statistics,
/* A new salt changes the key layout for the retry. */
1120 op->state->salt_send++;
1122 send_ibf (op, next_order))
1124 /* Internal error, best we can do is shut the connection */
1125 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1126 "Failed to send IBF, closing connection\n");
1127 fail_union_operation (op);
1128 ibf_destroy (diff_ibf);
1129 return GNUNET_SYSERR;
1134 GNUNET_STATISTICS_update (_GSS_statistics,
1135 "# of failed union operations (too large)",
1138 // XXX: Send the whole set, element-by-element
1139 LOG (GNUNET_ERROR_TYPE_ERROR,
1140 "set union failed: reached ibf limit\n");
1141 fail_union_operation (op);
1142 ibf_destroy (diff_ibf);
1143 return GNUNET_SYSERR;
1147 if (GNUNET_NO == res)
1149 struct GNUNET_MQ_Envelope *ev;
1151 LOG (GNUNET_ERROR_TYPE_DEBUG,
1152 "transmitted all values, sending DONE\n");
1153 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1154 GNUNET_MQ_send (op->mq, ev);
1155 /* We now wait until we get a DONE message back
1156 * and then wait for our MQ to be flushed and all our
1157 * demands be delivered. */
1162 struct IBF_Key unsalted_key;
1165 op->state->salt_receive,
1167 send_offers_for_key (op,
1170 else if (-1 == side)
1172 struct GNUNET_MQ_Envelope *ev;
1173 struct InquiryMessage *msg;
1175 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1176 * the additional complexity. */
1177 ev = GNUNET_MQ_msg_extra (msg,
1178 sizeof(struct IBF_Key),
1179 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1180 msg->salt = htonl (op->state->salt_receive);
1181 GNUNET_memcpy (&msg[1],
1183 sizeof(struct IBF_Key));
1184 LOG (GNUNET_ERROR_TYPE_DEBUG,
1185 "sending element inquiry for IBF key %lx\n",
1186 (unsigned long) key.key_val);
1187 GNUNET_MQ_send (op->mq, ev);
1194 ibf_destroy (diff_ibf);
1200 * Check an IBF message from a remote peer.
1202 * Validates the bucket payload length and, for continuation messages,
1203 * the offset, order and salt; reassembly happens in the handler.
1205 * @param cls the union operation
1206 * @param msg the header of the message
1207 * @return #GNUNET_OK if @a msg is well-formed
/**
 * Validates an IBF message before it is handled.  Rejected with
 * GNUNET_SYSERR (after GNUNET_break_op) when:
 *  - the set is not a union set,
 *  - the payload holds zero buckets or is not a whole number of buckets,
 *  - in PHASE_EXPECT_IBF_CONT: the offset differs from the number of
 *    buckets received so far, the order does not match the size of the IBF
 *    being assembled, or the salt differs from the receive salt,
 *  - in any other phase except PHASE_INVENTORY_PASSIVE and
 *    PHASE_EXPECT_IBF.
 *
 * NOTE(review): no upper bound on msg->order is visible.  The value comes
 * from the peer and is used as a shift count here and as an allocation size
 * (1 << msg->order) in handle_union_p2p_ibf(); a shift count at or above
 * the width of int is undefined, and a large order requests a very large
 * IBF.  Consider rejecting orders above the maximum this peer would send.
 */
1210 check_union_p2p_ibf (void *cls,
1211 const struct IBFMessage *msg)
1213 struct Operation *op = cls;
1214 unsigned int buckets_in_message;
1216 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1218 GNUNET_break_op (0);
1219 return GNUNET_SYSERR;
1221 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1223 if (0 == buckets_in_message)
1225 GNUNET_break_op (0);
1226 return GNUNET_SYSERR;
/* Recompute the byte length from the bucket count to catch a trailing
 * partial bucket. */
1228 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1231 GNUNET_break_op (0);
1232 return GNUNET_SYSERR;
1234 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
/* Continuation: the slice must line up with what was received so far. */
1236 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1238 GNUNET_break_op (0);
1239 return GNUNET_SYSERR;
1241 if (1 << msg->order != op->state->remote_ibf->size)
1243 GNUNET_break_op (0);
1244 return GNUNET_SYSERR;
1246 if (ntohl (msg->salt) != op->state->salt_receive)
1248 GNUNET_break_op (0);
1249 return GNUNET_SYSERR;
1252 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1253 (op->state->phase != PHASE_EXPECT_IBF))
1255 GNUNET_break_op (0);
1256 return GNUNET_SYSERR;
1264 * Handle an IBF message from a remote peer.
1266 * Reassemble the IBF from multiple pieces, and
1267 * process the whole IBF once possible.
1269 * @param cls the union operation
1270 * @param msg the header of the message
/**
 * Consumes one IBF message from the peer.
 * When the operation is in PHASE_INVENTORY_PASSIVE or PHASE_EXPECT_IBF this
 * is the first slice: the phase becomes PHASE_EXPECT_IBF_CONT, a remote IBF
 * with 1 << msg->order buckets is created, the receive salt is taken from
 * the message, and the offset must be 0.  Otherwise the operation must
 * already be in PHASE_EXPECT_IBF_CONT (asserted).
 * The slice is then read into the remote IBF at the current bucket offset.
 * Once all buckets have arrived the phase becomes PHASE_INVENTORY_ACTIVE
 * and decode_and_send() runs.  Finally the message is acknowledged on the
 * CADET channel.
 *
 * NOTE(review): no check is visible that ibf_buckets_received plus
 * buckets_in_message stays within remote_ibf->size before ibf_read_slice();
 * for a first slice nothing shown limits the bucket count against the size
 * derived from msg->order.  Confirm that ibf_read_slice() or elided code
 * bounds the write.
 * NOTE(review): decode_and_send() already calls fail_union_operation() on
 * its internal error paths; the call below on its failure then looks like a
 * second teardown.  Confirm against the elided condition.
 */
1273 handle_union_p2p_ibf (void *cls,
1274 const struct IBFMessage *msg)
1276 struct Operation *op = cls;
1277 unsigned int buckets_in_message;
1279 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1281 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1282 (op->state->phase == PHASE_EXPECT_IBF))
1284 op->state->phase = PHASE_EXPECT_IBF_CONT;
1285 GNUNET_assert (NULL == op->state->remote_ibf);
1286 LOG (GNUNET_ERROR_TYPE_DEBUG,
1287 "Creating new ibf of size %u\n",
1289 op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1290 op->state->salt_receive = ntohl (msg->salt);
1291 LOG (GNUNET_ERROR_TYPE_DEBUG,
1292 "Receiving new IBF with salt %u\n",
1293 op->state->salt_receive);
1294 if (NULL == op->state->remote_ibf)
1296 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1297 "Failed to parse remote IBF, closing connection\n");
1298 fail_union_operation (op);
1301 op->state->ibf_buckets_received = 0;
/* A fresh IBF has to start at bucket 0. */
1302 if (0 != ntohl (msg->offset))
1304 GNUNET_break_op (0);
1305 fail_union_operation (op);
1311 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1312 LOG (GNUNET_ERROR_TYPE_DEBUG,
1313 "Received more of IBF\n");
1315 GNUNET_assert (NULL != op->state->remote_ibf);
1317 ibf_read_slice (&msg[1],
1318 op->state->ibf_buckets_received,
1320 op->state->remote_ibf);
1321 op->state->ibf_buckets_received += buckets_in_message;
1323 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1325 LOG (GNUNET_ERROR_TYPE_DEBUG,
1326 "received full ibf\n");
1327 op->state->phase = PHASE_INVENTORY_ACTIVE;
1329 decode_and_send (op))
1331 /* Internal error, best we can do is shut down */
1332 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1333 "Failed to decode IBF, closing connection\n");
1334 fail_union_operation (op);
1338 GNUNET_CADET_receive_done (op->channel);
1343 * Send a result message to the client indicating
1344 * that there is a new element.
1346 * @param op union operation
1347 * @param element element to send
1348 * @param status status to send with the new element
/**
 * Delivers one element to the client as a SET_RESULT message.
 * The message carries the given status, the client request id, the element
 * type and, as current_size, the number of entries in key_to_element; the
 * element data follows the header.  The operation must have a non-zero
 * client request id (asserted).
 * A path that discards the freshly built envelope exists; its condition is
 * outside this excerpt.
 *
 * @param op union operation
 * @param element element to send
 */
1351 send_client_element (struct Operation *op,
1352 struct GNUNET_SET_Element *element,
1355 struct GNUNET_MQ_Envelope *ev;
1356 struct GNUNET_SET_ResultMessage *rm;
1358 LOG (GNUNET_ERROR_TYPE_DEBUG,
1359 "sending element (size %u) to client\n",
1361 GNUNET_assert (0 != op->client_request_id);
1362 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1365 GNUNET_MQ_discard (ev);
1369 rm->result_status = htons (status);
1370 rm->request_id = htonl (op->client_request_id);
1371 rm->element_type = htons (element->element_type);
1372 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1373 op->state->key_to_element));
1374 GNUNET_memcpy (&rm[1],
1377 GNUNET_MQ_send (op->set->cs->mq,
1383 * Signal to the client that the operation has finished and
1384 * destroy the operation.
1386 * @param cls operation to destroy
/**
 * Sends the final SET_RESULT message for the operation to the client.
 * If client_done_sent is already set, the function takes an early path
 * whose body is outside this excerpt.  If the operation is not in
 * PHASE_DONE, a failure is logged, counted and reported with
 * GNUNET_SET_STATUS_FAILURE.  Otherwise client_done_sent is set, success is
 * counted, and a message with GNUNET_SET_STATUS_DONE and the current number
 * of key_to_element entries is queued on the client message queue.
 *
 * @param cls the operation (struct Operation *)
 */
1389 send_client_done (void *cls)
1391 struct Operation *op = cls;
1392 struct GNUNET_MQ_Envelope *ev;
1393 struct GNUNET_SET_ResultMessage *rm;
1395 if (GNUNET_YES == op->state->client_done_sent)
1400 if (PHASE_DONE != op->state->phase)
1402 LOG (GNUNET_ERROR_TYPE_WARNING,
1403 "Union operation failed\n");
1404 GNUNET_STATISTICS_update (_GSS_statistics,
1405 "# Union operations failed",
1408 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1409 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1410 rm->request_id = htonl (op->client_request_id);
1411 rm->element_type = htons (0);
1412 GNUNET_MQ_send (op->set->cs->mq,
/* From here on: the success path. */
1417 op->state->client_done_sent = GNUNET_YES;
1419 GNUNET_STATISTICS_update (_GSS_statistics,
1420 "# Union operations succeeded",
1423 LOG (GNUNET_ERROR_TYPE_INFO,
1424 "Signalling client that union operation is done\n");
1425 ev = GNUNET_MQ_msg (rm,
1426 GNUNET_MESSAGE_TYPE_SET_RESULT);
1427 rm->request_id = htonl (op->client_request_id);
1428 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1429 rm->element_type = htons (0);
1430 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1431 op->state->key_to_element));
1432 GNUNET_MQ_send (op->set->cs->mq,
1438 * Tests if the operation is finished, and if so notify.
1440 * @param op operation to check
/**
 * Completes the operation once no demanded elements are outstanding.
 * In PHASE_FINISH_WAITING with zero pending demands, the phase becomes
 * PHASE_DONE and a P2P_DONE message is queued for the peer.
 * In PHASE_FINISH_CLOSING with zero pending demands, the phase becomes
 * PHASE_DONE, the client is notified through send_client_done() and the
 * operation is destroyed with _GSS_operation_destroy2().
 * In every other situation only the debug output shown is produced.
 *
 * @param op operation to check
 */
1443 maybe_finish (struct Operation *op)
1445 unsigned int num_demanded;
1447 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1448 op->state->demanded_hashes);
1450 if (PHASE_FINISH_WAITING == op->state->phase)
1452 LOG (GNUNET_ERROR_TYPE_DEBUG,
1453 "In PHASE_FINISH_WAITING, pending %u demands\n",
1455 if (0 == num_demanded)
1457 struct GNUNET_MQ_Envelope *ev;
1459 op->state->phase = PHASE_DONE;
1460 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1461 GNUNET_MQ_send (op->mq,
1463 /* We now wait until the other peer sends P2P_OVER
1464 * after it got all elements from us. */
1467 if (PHASE_FINISH_CLOSING == op->state->phase)
1469 LOG (GNUNET_ERROR_TYPE_DEBUG,
1470 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1472 if (0 == num_demanded)
1474 op->state->phase = PHASE_DONE;
/* Notify the client first; the operation is gone after the destroy. */
1475 send_client_done (op);
1476 _GSS_operation_destroy2 (op);
1483 * Check an element message from a remote peer.
1485 * @param cls the union operation
1486 * @param emsg the message
/**
 * Validates an element message before it is handled.  Rejected with
 * GNUNET_SYSERR (after GNUNET_break_op) when the set is not a union set or
 * when the operation currently has no demanded hashes at all, since an
 * element can then not be an answer to any demand.
 * Whether this particular element was demanded is checked later in
 * handle_union_p2p_elements().
 */
1489 check_union_p2p_elements (void *cls,
1490 const struct GNUNET_SET_ElementMessage *emsg)
1492 struct Operation *op = cls;
1494 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1496 GNUNET_break_op (0);
1497 return GNUNET_SYSERR;
1499 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1501 GNUNET_break_op (0);
1502 return GNUNET_SYSERR;
1509 * Handle an element message from a remote peer.
1510 * Sent by the other peer either because we decoded an IBF and placed a demand,
1511 * or because the other peer switched to full set transmission.
1513 * @param cls the union operation
1514 * @param emsg the message
/**
 * Processes an element received from the peer.
 * An element entry is allocated with the payload stored directly behind it,
 * flagged as remote and hashed.  The hash is then removed from the
 * demanded-hashes map; if that fails the element was never demanded and the
 * operation is failed.  Otherwise statistics and the received_total counter
 * are updated and the element is looked up in key_to_element:
 *  - already known: counted as repeated and the key entry is marked as
 *    received;
 *  - new: received_fresh is incremented, the entry is registered, and the
 *    client is told immediately for result modes ADDED (status OK) and
 *    SYMMETRIC (status ADD_LOCAL).
 * If more than 8 elements were received and fewer than a third of them were
 * fresh, the peer is considered misbehaving and the operation is failed.
 * Finally the message is acknowledged on the CADET channel.
 *
 * NOTE(review): on the not-demanded failure path the visible lines fail the
 * operation without releasing ee, and the numbering of the elided lines
 * leaves no room for a free; ee appears to leak there.
 * NOTE(review): the same question applies to the lots-of-old-elements
 * failure path near the end when ee was a repeated element; how ee is
 * released in the repeated branch is outside this excerpt.
 */
1517 handle_union_p2p_elements (void *cls,
1518 const struct GNUNET_SET_ElementMessage *emsg)
1520 struct Operation *op = cls;
1521 struct ElementEntry *ee;
1522 struct KeyEntry *ke;
1523 uint16_t element_size;
1525 element_size = ntohs (emsg->header.size) - sizeof(struct
1526 GNUNET_SET_ElementMessage);
/* Entry and payload live in a single allocation; data points just past
 * the entry header. */
1527 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1528 GNUNET_memcpy (&ee[1],
1531 ee->element.size = element_size;
1532 ee->element.data = &ee[1];
1533 ee->element.element_type = ntohs (emsg->element_type);
1534 ee->remote = GNUNET_YES;
1535 GNUNET_SET_element_hash (&ee->element,
1538 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1542 /* We got something we didn't demand, since it's not in our map. */
1543 GNUNET_break_op (0);
1544 fail_union_operation (op);
1548 LOG (GNUNET_ERROR_TYPE_DEBUG,
1549 "Got element (size %u, hash %s) from peer\n",
1550 (unsigned int) element_size,
1551 GNUNET_h2s (&ee->element_hash));
1553 GNUNET_STATISTICS_update (_GSS_statistics,
1554 "# received elements",
1557 GNUNET_STATISTICS_update (_GSS_statistics,
1558 "# exchanged elements",
1562 op->state->received_total++;
1564 ke = op_get_element (op, &ee->element_hash);
1567 /* Got repeated element. Should not happen since
1568 * we track demands. */
1569 GNUNET_STATISTICS_update (_GSS_statistics,
1570 "# repeated elements",
1573 ke->received = GNUNET_YES;
1578 LOG (GNUNET_ERROR_TYPE_DEBUG,
1579 "Registering new element from remote peer\n");
1580 op->state->received_fresh++;
1581 op_register_element (op, ee, GNUNET_YES);
1582 /* only send results immediately if the client wants it */
1583 switch (op->result_mode)
1585 case GNUNET_SET_RESULT_ADDED:
1586 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1589 case GNUNET_SET_RESULT_SYMMETRIC:
1590 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1594 /* Result mode not supported, should have been caught earlier. */
/* Heuristic against a peer that keeps sending elements we already have:
 * only evaluated once more than 8 elements have arrived. */
1600 if ((op->state->received_total > 8) &&
1601 (op->state->received_fresh < op->state->received_total / 3))
1603 /* The other peer gave us lots of old elements, there's something wrong. */
1604 GNUNET_break_op (0);
1605 fail_union_operation (op);
1608 GNUNET_CADET_receive_done (op->channel);
1614 * Check a full element message from a remote peer.
1616 * @param cls the union operation
1617 * @param emsg the message
1620 check_union_p2p_full_element (void *cls,
1621 const struct GNUNET_SET_ElementMessage *emsg)
1623 struct Operation *op = cls;
1625 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1627 GNUNET_break_op (0);
1628 return GNUNET_SYSERR;
1630 // FIXME: check that we expect full elements here?
1636 * Handle an element message from a remote peer.
1638 * @param cls the union operation
1639 * @param emsg the message
1642 handle_union_p2p_full_element (void *cls,
1643 const struct GNUNET_SET_ElementMessage *emsg)
1645 struct Operation *op = cls;
1646 struct ElementEntry *ee;
1647 struct KeyEntry *ke;
1648 uint16_t element_size;
1650 element_size = ntohs (emsg->header.size) - sizeof(struct
1651 GNUNET_SET_ElementMessage);
1652 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1653 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1654 ee->element.size = element_size;
1655 ee->element.data = &ee[1];
1656 ee->element.element_type = ntohs (emsg->element_type);
1657 ee->remote = GNUNET_YES;
1658 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1660 LOG (GNUNET_ERROR_TYPE_DEBUG,
1661 "Got element (full diff, size %u, hash %s) from peer\n",
1662 (unsigned int) element_size,
1663 GNUNET_h2s (&ee->element_hash));
1665 GNUNET_STATISTICS_update (_GSS_statistics,
1666 "# received elements",
1669 GNUNET_STATISTICS_update (_GSS_statistics,
1670 "# exchanged elements",
1674 op->state->received_total++;
1676 ke = op_get_element (op, &ee->element_hash);
1679 /* Got repeated element. Should not happen since
1680 * we track demands. */
1681 GNUNET_STATISTICS_update (_GSS_statistics,
1682 "# repeated elements",
1685 ke->received = GNUNET_YES;
1690 LOG (GNUNET_ERROR_TYPE_DEBUG,
1691 "Registering new element from remote peer\n");
1692 op->state->received_fresh++;
1693 op_register_element (op, ee, GNUNET_YES);
1694 /* only send results immediately if the client wants it */
1695 switch (op->result_mode)
1697 case GNUNET_SET_RESULT_ADDED:
1698 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1701 case GNUNET_SET_RESULT_SYMMETRIC:
1702 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1706 /* Result mode not supported, should have been caught earlier. */
1712 if ((GNUNET_YES == op->byzantine) &&
1713 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1714 (op->state->received_fresh < op->state->received_total / 6))
1716 /* The other peer gave us lots of old elements, there's something wrong. */
1717 LOG (GNUNET_ERROR_TYPE_ERROR,
1718 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1719 (unsigned long long) op->state->received_fresh,
1720 (unsigned long long) op->state->received_total);
1721 GNUNET_break_op (0);
1722 fail_union_operation (op);
1725 GNUNET_CADET_receive_done (op->channel);
1730 * Send offers (for GNUNET_Hash-es) in response
1731 * to inquiries (for IBF_Key-s).
1733 * @param cls the union operation
1734 * @param msg the message
1737 check_union_p2p_inquiry (void *cls,
1738 const struct InquiryMessage *msg)
1740 struct Operation *op = cls;
1741 unsigned int num_keys;
1743 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1745 GNUNET_break_op (0);
1746 return GNUNET_SYSERR;
1748 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1750 GNUNET_break_op (0);
1751 return GNUNET_SYSERR;
1753 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1754 / sizeof(struct IBF_Key);
1755 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1756 != num_keys * sizeof(struct IBF_Key))
1758 GNUNET_break_op (0);
1759 return GNUNET_SYSERR;
1766 * Send offers (for GNUNET_Hash-es) in response
1767 * to inquiries (for IBF_Key-s).
1769 * @param cls the union operation
1770 * @param msg the message
1773 handle_union_p2p_inquiry (void *cls,
1774 const struct InquiryMessage *msg)
1776 struct Operation *op = cls;
1777 const struct IBF_Key *ibf_key;
1778 unsigned int num_keys;
1780 LOG (GNUNET_ERROR_TYPE_DEBUG,
1781 "Received union inquiry\n");
1782 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1783 / sizeof(struct IBF_Key);
1784 ibf_key = (const struct IBF_Key *) &msg[1];
1785 while (0 != num_keys--)
1787 struct IBF_Key unsalted_key;
1789 unsalt_key (ibf_key,
1792 send_offers_for_key (op,
1796 GNUNET_CADET_receive_done (op->channel);
1801 * Iterator over hash map entries, called to
1802 * destroy the linked list of colliding ibf key entries.
1804 * @param cls closure
1805 * @param key current key code
1806 * @param value value in the hash map
1807 * @return #GNUNET_YES if we should continue to iterate,
1808 * #GNUNET_NO if not.
1811 send_missing_full_elements_iter (void *cls,
1815 struct Operation *op = cls;
1816 struct KeyEntry *ke = value;
1817 struct GNUNET_MQ_Envelope *ev;
1818 struct GNUNET_SET_ElementMessage *emsg;
1819 struct ElementEntry *ee = ke->element;
1821 if (GNUNET_YES == ke->received)
1823 ev = GNUNET_MQ_msg_extra (emsg,
1825 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1826 GNUNET_memcpy (&emsg[1],
1829 emsg->element_type = htons (ee->element.element_type);
1830 GNUNET_MQ_send (op->mq,
1837 * Handle a request for full set transmission.
1839 * @parem cls closure, a set union operation
1840 * @param mh the demand message
1843 handle_union_p2p_request_full (void *cls,
1844 const struct GNUNET_MessageHeader *mh)
1846 struct Operation *op = cls;
1848 LOG (GNUNET_ERROR_TYPE_DEBUG,
1849 "Received request for full set transmission\n");
1850 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1852 GNUNET_break_op (0);
1853 fail_union_operation (op);
1856 if (PHASE_EXPECT_IBF != op->state->phase)
1858 GNUNET_break_op (0);
1859 fail_union_operation (op);
1863 // FIXME: we need to check that our set is larger than the
1864 // byzantine_lower_bound by some threshold
1866 GNUNET_CADET_receive_done (op->channel);
1871 * Handle a "full done" message.
1873 * @parem cls closure, a set union operation
1874 * @param mh the demand message
1877 handle_union_p2p_full_done (void *cls,
1878 const struct GNUNET_MessageHeader *mh)
1880 struct Operation *op = cls;
1882 switch (op->state->phase)
1884 case PHASE_EXPECT_IBF:
1886 struct GNUNET_MQ_Envelope *ev;
1888 LOG (GNUNET_ERROR_TYPE_DEBUG,
1889 "got FULL DONE, sending elements that other peer is missing\n");
1891 /* send all the elements that did not come from the remote peer */
1892 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1893 &send_missing_full_elements_iter,
1896 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1897 GNUNET_MQ_send (op->mq,
1899 op->state->phase = PHASE_DONE;
1900 /* we now wait until the other peer sends us the OVER message*/
1904 case PHASE_FULL_SENDING:
1906 LOG (GNUNET_ERROR_TYPE_DEBUG,
1907 "got FULL DONE, finishing\n");
1908 /* We sent the full set, and got the response for that. We're done. */
1909 op->state->phase = PHASE_DONE;
1910 GNUNET_CADET_receive_done (op->channel);
1911 send_client_done (op);
1912 _GSS_operation_destroy2 (op);
1918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919 "Handle full done phase is %u\n",
1920 (unsigned) op->state->phase);
1921 GNUNET_break_op (0);
1922 fail_union_operation (op);
1925 GNUNET_CADET_receive_done (op->channel);
1930 * Check a demand by the other peer for elements based on a list
1931 * of `struct GNUNET_HashCode`s.
1933 * @parem cls closure, a set union operation
1934 * @param mh the demand message
1935 * @return #GNUNET_OK if @a mh is well-formed
1938 check_union_p2p_demand (void *cls,
1939 const struct GNUNET_MessageHeader *mh)
1941 struct Operation *op = cls;
1942 unsigned int num_hashes;
1944 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1946 GNUNET_break_op (0);
1947 return GNUNET_SYSERR;
1949 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1950 / sizeof(struct GNUNET_HashCode);
1951 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1952 != num_hashes * sizeof(struct GNUNET_HashCode))
1954 GNUNET_break_op (0);
1955 return GNUNET_SYSERR;
1962 * Handle a demand by the other peer for elements based on a list
1963 * of `struct GNUNET_HashCode`s.
1965 * @parem cls closure, a set union operation
1966 * @param mh the demand message
1969 handle_union_p2p_demand (void *cls,
1970 const struct GNUNET_MessageHeader *mh)
1972 struct Operation *op = cls;
1973 struct ElementEntry *ee;
1974 struct GNUNET_SET_ElementMessage *emsg;
1975 const struct GNUNET_HashCode *hash;
1976 unsigned int num_hashes;
1977 struct GNUNET_MQ_Envelope *ev;
1979 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1980 / sizeof(struct GNUNET_HashCode);
1981 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1983 hash++, num_hashes--)
1985 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1989 /* Demand for non-existing element. */
1990 GNUNET_break_op (0);
1991 fail_union_operation (op);
1994 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1996 /* Probably confused lazily copied sets. */
1997 GNUNET_break_op (0);
1998 fail_union_operation (op);
2001 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
2002 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
2003 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2004 emsg->reserved = htons (0);
2005 emsg->element_type = htons (ee->element.element_type);
2006 LOG (GNUNET_ERROR_TYPE_DEBUG,
2007 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2009 (unsigned int) ee->element.size,
2010 GNUNET_h2s (&ee->element_hash));
2011 GNUNET_MQ_send (op->mq, ev);
2012 GNUNET_STATISTICS_update (_GSS_statistics,
2013 "# exchanged elements",
2017 switch (op->result_mode)
2019 case GNUNET_SET_RESULT_ADDED:
2020 /* Nothing to do. */
2023 case GNUNET_SET_RESULT_SYMMETRIC:
2024 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2028 /* Result mode not supported, should have been caught earlier. */
2033 GNUNET_CADET_receive_done (op->channel);
2038 * Check offer (of `struct GNUNET_HashCode`s).
2040 * @param cls the union operation
2041 * @param mh the message
2042 * @return #GNUNET_OK if @a mh is well-formed
2045 check_union_p2p_offer (void *cls,
2046 const struct GNUNET_MessageHeader *mh)
2048 struct Operation *op = cls;
2049 unsigned int num_hashes;
2051 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2053 GNUNET_break_op (0);
2054 return GNUNET_SYSERR;
2056 /* look up elements and send them */
2057 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2058 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2060 GNUNET_break_op (0);
2061 return GNUNET_SYSERR;
2063 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2064 / sizeof(struct GNUNET_HashCode);
2065 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2066 num_hashes * sizeof(struct GNUNET_HashCode))
2068 GNUNET_break_op (0);
2069 return GNUNET_SYSERR;
2076 * Handle offers (of `struct GNUNET_HashCode`s) and
2077 * respond with demands (of `struct GNUNET_HashCode`s).
2079 * @param cls the union operation
2080 * @param mh the message
2083 handle_union_p2p_offer (void *cls,
2084 const struct GNUNET_MessageHeader *mh)
2086 struct Operation *op = cls;
2087 const struct GNUNET_HashCode *hash;
2088 unsigned int num_hashes;
2090 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2091 / sizeof(struct GNUNET_HashCode);
2092 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2094 hash++, num_hashes--)
2096 struct ElementEntry *ee;
2097 struct GNUNET_MessageHeader *demands;
2098 struct GNUNET_MQ_Envelope *ev;
2100 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2103 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2107 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2110 LOG (GNUNET_ERROR_TYPE_DEBUG,
2111 "Skipped sending duplicate demand\n");
2115 GNUNET_assert (GNUNET_OK ==
2116 GNUNET_CONTAINER_multihashmap_put (
2117 op->state->demanded_hashes,
2120 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2122 LOG (GNUNET_ERROR_TYPE_DEBUG,
2123 "[OP %x] Requesting element (hash %s)\n",
2124 (void *) op, GNUNET_h2s (hash));
2125 ev = GNUNET_MQ_msg_header_extra (demands,
2126 sizeof(struct GNUNET_HashCode),
2127 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2128 GNUNET_memcpy (&demands[1],
2130 sizeof(struct GNUNET_HashCode));
2131 GNUNET_MQ_send (op->mq, ev);
2133 GNUNET_CADET_receive_done (op->channel);
2138 * Handle a done message from a remote peer
2140 * @param cls the union operation
2141 * @param mh the message
2144 handle_union_p2p_done (void *cls,
2145 const struct GNUNET_MessageHeader *mh)
2147 struct Operation *op = cls;
2149 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2151 GNUNET_break_op (0);
2152 fail_union_operation (op);
2155 switch (op->state->phase)
2157 case PHASE_INVENTORY_PASSIVE:
2158 /* We got all requests, but still have to send our elements in response. */
2159 op->state->phase = PHASE_FINISH_WAITING;
2161 LOG (GNUNET_ERROR_TYPE_DEBUG,
2162 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2163 /* The active peer is done sending offers
2164 * and inquiries. This means that all
2165 * our responses to that (demands and offers)
2166 * must be in flight (queued or in mesh).
2168 * We should notify the active peer once
2169 * all our demands are satisfied, so that the active
2170 * peer can quit if we gave it everything.
2171 */GNUNET_CADET_receive_done (op->channel);
2175 case PHASE_INVENTORY_ACTIVE:
2176 LOG (GNUNET_ERROR_TYPE_DEBUG,
2177 "got DONE (as active partner), waiting to finish\n");
2178 /* All demands of the other peer are satisfied,
2179 * and we processed all offers, thus we know
2180 * exactly what our demands must be.
2182 * We'll close the channel
2183 * to the other peer once our demands are met.
2184 */op->state->phase = PHASE_FINISH_CLOSING;
2185 GNUNET_CADET_receive_done (op->channel);
2190 GNUNET_break_op (0);
2191 fail_union_operation (op);
/**
 * Handle an over message from a remote peer by calling
 * send_client_done() for the operation.
 *
 * @param cls the union operation
 * @param mh the message
 */
void
handle_union_p2p_over (void *cls,
                       const struct GNUNET_MessageHeader *mh)
{
  struct Operation *op = cls;

  send_client_done (op);
}
2212 * Initiate operation to evaluate a set union with a remote peer.
2214 * @param op operation to perform (to be initialized)
2215 * @param opaque_context message to be transmitted to the listener
2216 * to convince it to accept, may be NULL
2218 static struct OperationState *
2219 union_evaluate (struct Operation *op,
2220 const struct GNUNET_MessageHeader *opaque_context)
2222 struct OperationState *state;
2223 struct GNUNET_MQ_Envelope *ev;
2224 struct OperationRequestMessage *msg;
2226 ev = GNUNET_MQ_msg_nested_mh (msg,
2227 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2231 /* the context message is too large */
2235 state = GNUNET_new (struct OperationState);
2236 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2238 /* copy the current generation's strata estimator for this operation */
2239 state->se = strata_estimator_dup (op->set->state->se);
2240 /* we started the operation, thus we have to send the operation request */
2241 state->phase = PHASE_EXPECT_SE;
2242 state->salt_receive = state->salt_send = 42; // FIXME?????
2243 LOG (GNUNET_ERROR_TYPE_DEBUG,
2244 "Initiating union operation evaluation\n");
2245 GNUNET_STATISTICS_update (_GSS_statistics,
2246 "# of total union operations",
2249 GNUNET_STATISTICS_update (_GSS_statistics,
2250 "# of initiated union operations",
2253 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2254 GNUNET_MQ_send (op->mq,
2257 if (NULL != opaque_context)
2258 LOG (GNUNET_ERROR_TYPE_DEBUG,
2259 "sent op request with context message\n");
2261 LOG (GNUNET_ERROR_TYPE_DEBUG,
2262 "sent op request without context message\n");
2265 initialize_key_to_element (op);
2266 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2267 state->key_to_element);
2273 * Accept an union operation request from a remote peer.
2274 * Only initializes the private operation state.
2276 * @param op operation that will be accepted as a union operation
2278 static struct OperationState *
2279 union_accept (struct Operation *op)
2281 struct OperationState *state;
2282 const struct StrataEstimator *se;
2283 struct GNUNET_MQ_Envelope *ev;
2284 struct StrataEstimatorMessage *strata_msg;
2289 LOG (GNUNET_ERROR_TYPE_DEBUG,
2290 "accepting set union operation\n");
2291 GNUNET_STATISTICS_update (_GSS_statistics,
2292 "# of accepted union operations",
2295 GNUNET_STATISTICS_update (_GSS_statistics,
2296 "# of total union operations",
2300 state = GNUNET_new (struct OperationState);
2301 state->se = strata_estimator_dup (op->set->state->se);
2302 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2304 state->salt_receive = state->salt_send = 42; // FIXME?????
2306 initialize_key_to_element (op);
2307 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2308 state->key_to_element);
2310 /* kick off the operation */
2312 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2313 len = strata_estimator_write (se,
2315 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2316 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2318 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2319 ev = GNUNET_MQ_msg_extra (strata_msg,
2322 GNUNET_memcpy (&strata_msg[1],
2326 strata_msg->set_size
2327 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
2328 op->set->content->elements));
2329 GNUNET_MQ_send (op->mq,
2331 state->phase = PHASE_EXPECT_IBF;
2337 * Create a new set supporting the union operation
2339 * We maintain one strata estimator per set and then manipulate it over the
2340 * lifetime of the set, as recreating a strata estimator would be expensive.
2342 * @return the newly created set, NULL on error
2344 static struct SetState *
2345 union_set_create (void)
2347 struct SetState *set_state;
2349 LOG (GNUNET_ERROR_TYPE_DEBUG,
2350 "union set created\n");
2351 set_state = GNUNET_new (struct SetState);
2352 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2353 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2354 if (NULL == set_state->se)
2356 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2357 "Failed to allocate strata estimator\n");
2358 GNUNET_free (set_state);
2366 * Add the element from the given element message to the set.
2368 * @param set_state state of the set want to add to
2369 * @param ee the element to add to the set
2372 union_add (struct SetState *set_state,
2373 struct ElementEntry *ee)
2375 strata_estimator_insert (set_state->se,
2376 get_ibf_key (&ee->element_hash));
2381 * Remove the element given in the element message from the set.
2382 * Only marks the element as removed, so that older set operations can still exchange it.
2384 * @param set_state state of the set to remove from
2385 * @param ee set element to remove
2388 union_remove (struct SetState *set_state,
2389 struct ElementEntry *ee)
2391 strata_estimator_remove (set_state->se,
2392 get_ibf_key (&ee->element_hash));
2397 * Destroy a set that supports the union operation.
2399 * @param set_state the set to destroy
2402 union_set_destroy (struct SetState *set_state)
2404 if (NULL != set_state->se)
2406 strata_estimator_destroy (set_state->se);
2407 set_state->se = NULL;
2409 GNUNET_free (set_state);
2414 * Copy union-specific set state.
2416 * @param state source state for copying the union state
2417 * @return a copy of the union-specific set state
2419 static struct SetState *
2420 union_copy_state (struct SetState *state)
2422 struct SetState *new_state;
2424 GNUNET_assert ((NULL != state) &&
2425 (NULL != state->se));
2426 new_state = GNUNET_new (struct SetState);
2427 new_state->se = strata_estimator_dup (state->se);
2434 * Handle case where channel went down for an operation.
2436 * @param op operation that lost the channel
2439 union_channel_death (struct Operation *op)
2441 send_client_done (op);
2442 _GSS_operation_destroy (op,
2448 * Get the table with implementing functions for
2451 * @return the operation specific VTable
2453 const struct SetVT *
2456 static const struct SetVT union_vt = {
2457 .create = &union_set_create,
2459 .remove = &union_remove,
2460 .destroy_set = &union_set_destroy,
2461 .evaluate = &union_evaluate,
2462 .accept = &union_accept,
2463 .cancel = &union_op_cancel,
2464 .copy_state = &union_copy_state,
2465 .channel_death = &union_channel_death