2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
37 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
41 * Number of IBFs in a strata estimator.
43 #define SE_STRATA_COUNT 32
46 * Size of the IBFs in the strata estimator.
48 #define SE_IBF_SIZE 80
51 * The hash num parameter for the difference digests and strata estimators.
53 #define SE_IBF_HASH_NUM 4
56 * Number of buckets that can be transmitted in one message.
58 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
61 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62 * Choose this value so that computing the IBF is still cheaper
63 * than transmitting all values.
65 #define MAX_IBF_ORDER (20)
68 * Number of buckets used in the ibf per estimated
75 * Current phase we are in for a union operation.
77 enum UnionOperationPhase
80 * We sent the request message, and expect a strata estimator.
85 * We sent the strata estimator, and expect an IBF. This phase is entered once
86 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
88 * XXX: could use better wording.
89 * XXX: repurposed to also expect a "request full set" message, should be renamed
91 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
96 * Continuation for multi part IBFs.
98 PHASE_EXPECT_IBF_CONT,
101 * We are decoding an IBF.
103 PHASE_INVENTORY_ACTIVE,
106 * The other peer is decoding the IBF we just sent.
108 PHASE_INVENTORY_PASSIVE,
111 * The protocol is almost finished, but we still have to flush our message
112 * queue and/or expect some elements.
114 PHASE_FINISH_CLOSING,
117 * In the penultimate phase,
118 * we wait until all our demands
119 * are satisfied. Then we send a done
120 * message, and wait for another done message.
122 PHASE_FINISH_WAITING,
125 * In the ultimate phase, we wait until
126 * our demands are satisfied and then
127 * quit (sending another DONE message).
132 * After sending the full set, wait for responses with the elements
133 * that the local peer is missing.
140 * State of an evaluate operation with another peer.
142 struct OperationState
145 * Copy of the set's strata estimator at the time of
146 * creation of this operation.
148 struct StrataEstimator *se;
151 * The IBF we currently receive.
153 struct InvertibleBloomFilter *remote_ibf;
156 * The IBF with the local set's element.
158 struct InvertibleBloomFilter *local_ibf;
161 * Maps unsalted IBF-Keys to elements.
162 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163 * Colliding IBF-Keys are linked.
165 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
168 * Current state of the operation.
170 enum UnionOperationPhase phase;
173 * Did we send the client that we are done?
175 int client_done_sent;
178 * Number of ibf buckets already received into the @a remote_ibf.
180 unsigned int ibf_buckets_received;
183 * Hashes for elements that we have demanded from the other peer.
185 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
188 * Salt that we're using for sending IBFs
193 * Salt for the IBF we've received and that we're currently decoding.
195 uint32_t salt_receive;
198 * Number of elements we received from the other peer
199 * that were not in the local set yet.
201 uint32_t received_fresh;
204 * Total number of elements received from the other peer.
206 uint32_t received_total;
209 * Initial size of our set, just before
210 * the operation started.
212 uint64_t initial_size;
217 * The key entry is used to associate an ibf key with an element.
222 * IBF key for the entry, derived from the current salt.
224 struct IBF_Key ibf_key;
227 * The actual element associated with the key.
229 * Only owned by the union operation if element->operation
232 struct ElementEntry *element;
235 * Did we receive this element?
236 * Even if element->is_foreign is false, we might
237 * have received the element, so this indicates that
238 * the other peer has it.
245 * Used as a closure for sending elements
246 * with a specific IBF key.
248 struct SendElementClosure
251 * The IBF key whose matching elements should be
254 struct IBF_Key ibf_key;
257 * Operation for which the elements
260 struct Operation *op;
265 * Extra state required for efficient set union.
270 * The strata estimator is only generated once for
272 * The IBF keys are derived from the element hashes with
275 struct StrataEstimator *se;
280 * Iterator over hash map entries, called to
281 * destroy the linked list of colliding ibf key entries.
284 * @param key current key code
285 * @param value value in the hash map
286 * @return #GNUNET_YES if we should continue to iterate,
290 destroy_key_to_element_iter (void *cls,
294 struct KeyEntry *k = value;
296 GNUNET_assert (NULL != k);
297 if (GNUNET_YES == k->element->remote)
299 GNUNET_free (k->element);
308 * Destroy the union operation. Only things specific to the union
309 * operation are destroyed.
311 * @param op union operation to destroy
314 union_op_cancel (struct Operation *op)
316 LOG (GNUNET_ERROR_TYPE_DEBUG,
317 "destroying union op\n");
318 /* check if the op was canceled twice */
319 GNUNET_assert (NULL != op->state);
320 if (NULL != op->state->remote_ibf)
322 ibf_destroy (op->state->remote_ibf);
323 op->state->remote_ibf = NULL;
325 if (NULL != op->state->demanded_hashes)
327 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328 op->state->demanded_hashes = NULL;
330 if (NULL != op->state->local_ibf)
332 ibf_destroy (op->state->local_ibf);
333 op->state->local_ibf = NULL;
335 if (NULL != op->state->se)
337 strata_estimator_destroy (op->state->se);
338 op->state->se = NULL;
340 if (NULL != op->state->key_to_element)
342 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343 &destroy_key_to_element_iter,
345 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346 op->state->key_to_element = NULL;
348 GNUNET_free (op->state);
350 LOG (GNUNET_ERROR_TYPE_DEBUG,
351 "destroying union op done\n");
356 * Inform the client that the union operation has failed,
357 * and proceed to destroy the evaluate operation.
359 * @param op the union operation to fail
362 fail_union_operation (struct Operation *op)
364 struct GNUNET_MQ_Envelope *ev;
365 struct GNUNET_SET_ResultMessage *msg;
367 LOG (GNUNET_ERROR_TYPE_ERROR,
368 "union operation failed\n");
369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->spec->client_request_id);
372 msg->element_type = htons (0);
373 GNUNET_MQ_send (op->spec->set->client_mq, ev);
374 _GSS_operation_destroy (op, GNUNET_YES);
379 * Derive the IBF key from a hash code and
382 * @param src the hash code
383 * @return the derived IBF key
385 static struct IBF_Key
386 get_ibf_key (const struct GNUNET_HashCode *src)
391 GNUNET_CRYPTO_kdf (&key, sizeof (key),
393 &salt, sizeof (salt),
400 * Context for #op_get_element_iterator
402 struct GetElementContext
404 struct GNUNET_HashCode hash;
410 * Iterator over the mapping from IBF keys to element entries. Checks if we
411 * have an element with a given GNUNET_HashCode.
414 * @param key current key code
415 * @param value value in the hash map
416 * @return #GNUNET_YES if we should search further,
417 * #GNUNET_NO if we've found the element.
420 op_get_element_iterator (void *cls,
424 struct GetElementContext *ctx = cls;
425 struct KeyEntry *k = value;
427 GNUNET_assert (NULL != k);
428 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
439 * Determine whether the given element is already in the operation's element
442 * @param op operation that should be tested for 'element_hash'
443 * @param element_hash hash of the element to look for
444 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
446 static struct KeyEntry *
447 op_get_element (struct Operation *op,
448 const struct GNUNET_HashCode *element_hash)
451 struct IBF_Key ibf_key;
452 struct GetElementContext ctx = {{{ 0 }} , 0};
454 ctx.hash = *element_hash;
456 ibf_key = get_ibf_key (element_hash);
457 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
458 (uint32_t) ibf_key.key_val,
459 op_get_element_iterator,
462 /* was the iteration aborted because we found the element? */
463 if (GNUNET_SYSERR == ret)
465 GNUNET_assert (NULL != ctx.k);
473 * Insert an element into the union operation's
474 * key-to-element mapping. Takes ownership of 'ee'.
475 * Note that this does not insert the element in the set,
476 * only in the operation's key-element mapping.
477 * This is done to speed up re-tried operations, if some elements
478 * were transmitted, and then the IBF fails to decode.
480 * XXX: clarify ownership, doesn't sound right.
482 * @param op the union operation
483 * @param ee the element entry
484 * @parem received was this element received from the remote peer?
487 op_register_element (struct Operation *op,
488 struct ElementEntry *ee,
491 struct IBF_Key ibf_key;
494 ibf_key = get_ibf_key (&ee->element_hash);
495 k = GNUNET_new (struct KeyEntry);
497 k->ibf_key = ibf_key;
498 k->received = received;
499 GNUNET_assert (GNUNET_OK ==
500 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
501 (uint32_t) ibf_key.key_val,
503 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
508 salt_key (const struct IBF_Key *k_in,
510 struct IBF_Key *k_out)
513 uint64_t x = k_in->key_val;
515 x = (x >> s) | (x << (64 - s));
521 unsalt_key (const struct IBF_Key *k_in,
523 struct IBF_Key *k_out)
526 uint64_t x = k_in->key_val;
527 x = (x << s) | (x >> (64 - s));
533 * Insert a key into an ibf.
537 * @param value the key entry to get the key from
540 prepare_ibf_iterator (void *cls,
544 struct Operation *op = cls;
545 struct KeyEntry *ke = value;
546 struct IBF_Key salted_key;
548 LOG (GNUNET_ERROR_TYPE_DEBUG,
549 "[OP %x] inserting %lx (hash %s) into ibf\n",
551 (unsigned long) ke->ibf_key.key_val,
552 GNUNET_h2s (&ke->element->element_hash));
553 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
554 ibf_insert (op->state->local_ibf, salted_key);
560 * Iterator for initializing the
561 * key-to-element mapping of a union operation
563 * @param cls the union operation `struct Operation *`
565 * @param value the `struct ElementEntry *` to insert
566 * into the key-to-element mapping
567 * @return #GNUNET_YES (to continue iterating)
570 init_key_to_element_iterator (void *cls,
571 const struct GNUNET_HashCode *key,
574 struct Operation *op = cls;
575 struct ElementEntry *ee = value;
577 /* make sure that the element belongs to the set at the time
578 * of creating the operation */
579 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
582 GNUNET_assert (GNUNET_NO == ee->remote);
584 op_register_element (op, ee, GNUNET_NO);
590 * Initialize the IBF key to element mapping local to this set
593 * @param op the set union operation
596 initialize_key_to_element (struct Operation *op)
600 GNUNET_assert (NULL == op->state->key_to_element);
601 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
602 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
603 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
608 * Create an ibf with the operation's elements
609 * of the specified size
611 * @param op the union operation
612 * @param size size of the ibf to create
613 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
616 prepare_ibf (struct Operation *op,
619 GNUNET_assert (NULL != op->state->key_to_element);
621 if (NULL != op->state->local_ibf)
622 ibf_destroy (op->state->local_ibf);
623 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
624 if (NULL == op->state->local_ibf)
626 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
627 "Failed to allocate local IBF\n");
628 return GNUNET_SYSERR;
630 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
631 &prepare_ibf_iterator,
638 * Send an ibf of appropriate size.
640 * Fragments the IBF into multiple messages if necessary.
642 * @param op the union operation
643 * @param ibf_order order of the ibf to send, size=2^order
644 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
647 send_ibf (struct Operation *op,
650 unsigned int buckets_sent = 0;
651 struct InvertibleBloomFilter *ibf;
654 prepare_ibf (op, 1<<ibf_order))
656 /* allocation failed */
657 return GNUNET_SYSERR;
660 LOG (GNUNET_ERROR_TYPE_DEBUG,
661 "sending ibf of size %u\n",
665 char name[64] = { 0 };
666 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
667 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
670 ibf = op->state->local_ibf;
672 while (buckets_sent < (1 << ibf_order))
674 unsigned int buckets_in_message;
675 struct GNUNET_MQ_Envelope *ev;
676 struct IBFMessage *msg;
678 buckets_in_message = (1 << ibf_order) - buckets_sent;
679 /* limit to maximum */
680 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
681 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
683 ev = GNUNET_MQ_msg_extra (msg,
684 buckets_in_message * IBF_BUCKET_SIZE,
685 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
688 msg->order = ibf_order;
689 msg->offset = htonl (buckets_sent);
690 msg->salt = htonl (op->state->salt_send);
691 ibf_write_slice (ibf, buckets_sent,
692 buckets_in_message, &msg[1]);
693 buckets_sent += buckets_in_message;
694 LOG (GNUNET_ERROR_TYPE_DEBUG,
695 "ibf chunk size %u, %u/%u sent\n",
699 GNUNET_MQ_send (op->mq, ev);
702 /* The other peer must decode the IBF, so
704 op->state->phase = PHASE_INVENTORY_PASSIVE;
710 * Send a strata estimator to the remote peer.
712 * @param op the union operation with the remote peer
715 send_strata_estimator (struct Operation *op)
717 const struct StrataEstimator *se = op->state->se;
718 struct GNUNET_MQ_Envelope *ev;
719 struct StrataEstimatorMessage *strata_msg;
724 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
725 len = strata_estimator_write (op->state->se,
727 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
728 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
730 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
731 ev = GNUNET_MQ_msg_extra (strata_msg,
734 GNUNET_memcpy (&strata_msg[1],
738 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
739 GNUNET_MQ_send (op->mq,
741 op->state->phase = PHASE_EXPECT_IBF;
742 LOG (GNUNET_ERROR_TYPE_DEBUG,
743 "sent SE, expecting IBF\n");
748 * Compute the necessary order of an ibf
749 * from the size of the symmetric set difference.
751 * @param diff the difference
752 * @return the required size of the ibf
755 get_order_from_difference (unsigned int diff)
757 unsigned int ibf_order;
760 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
761 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
763 if (ibf_order > MAX_IBF_ORDER)
764 ibf_order = MAX_IBF_ORDER;
765 // add one for correction
766 return ibf_order + 1;
771 * Send a set element.
773 * @param cls the union operation `struct Operation *`
775 * @param value the `struct ElementEntry *` to insert
776 * into the key-to-element mapping
777 * @return #GNUNET_YES (to continue iterating)
780 send_element_iterator (void *cls,
781 const struct GNUNET_HashCode *key,
784 struct Operation *op = cls;
785 struct GNUNET_SET_ElementMessage *emsg;
786 struct ElementEntry *ee = value;
787 struct GNUNET_SET_Element *el = &ee->element;
788 struct GNUNET_MQ_Envelope *ev;
790 LOG (GNUNET_ERROR_TYPE_INFO,
791 "Sending element %s\n",
793 ev = GNUNET_MQ_msg_extra (emsg,
795 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
796 emsg->element_type = htons (el->element_type);
797 GNUNET_memcpy (&emsg[1],
800 GNUNET_MQ_send (op->mq,
807 send_full_set (struct Operation *op)
809 struct GNUNET_MQ_Envelope *ev;
811 op->state->phase = PHASE_FULL_SENDING;
812 /* FIXME: use a more memory-friendly way of doing this with an
813 iterator, just as we do in the non-full case! */
814 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
815 &send_element_iterator,
817 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
818 GNUNET_MQ_send (op->mq,
824 * Handle a strata estimator from a remote peer
826 * @param cls the union operation
827 * @param msg the message
830 check_union_p2p_strata_estimator (void *cls,
831 const struct StrataEstimatorMessage *msg)
833 struct Operation *op = cls;
837 if (op->state->phase != PHASE_EXPECT_SE)
840 return GNUNET_SYSERR;
842 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
843 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
844 if ( (GNUNET_NO == is_compressed) &&
845 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
848 return GNUNET_SYSERR;
855 * Handle a strata estimator from a remote peer
857 * @param cls the union operation
858 * @param msg the message
861 handle_union_p2p_strata_estimator (void *cls,
862 const struct StrataEstimatorMessage *msg)
864 struct Operation *op = cls;
865 struct StrataEstimator *remote_se;
871 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
872 GNUNET_STATISTICS_update (_GSS_statistics,
873 "# bytes of SE received",
874 ntohs (msg->header.size),
876 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
877 other_size = GNUNET_ntohll (msg->set_size);
878 remote_se = strata_estimator_create (SE_STRATA_COUNT,
881 if (NULL == remote_se)
883 /* insufficient resources, fail */
884 fail_union_operation (op);
888 strata_estimator_read (&msg[1],
893 /* decompression failed */
894 strata_estimator_destroy (remote_se);
895 fail_union_operation (op);
898 GNUNET_assert (NULL != op->state->se);
899 diff = strata_estimator_difference (remote_se,
905 strata_estimator_destroy (remote_se);
906 strata_estimator_destroy (op->state->se);
907 op->state->se = NULL;
908 LOG (GNUNET_ERROR_TYPE_DEBUG,
909 "got se diff=%d, using ibf size %d\n",
911 1U << get_order_from_difference (diff));
916 set_debug = getenv ("GNUNET_SET_BENCHMARK");
917 if ( (NULL != set_debug) &&
918 (0 == strcmp (set_debug, "1")) )
920 FILE *f = fopen ("set.log", "a");
921 fprintf (f, "%llu\n", (unsigned long long) diff);
926 if ( (GNUNET_YES == op->spec->byzantine) &&
927 (other_size < op->spec->byzantine_lower_bound) )
930 fail_union_operation (op);
934 if ( (GNUNET_YES == op->spec->force_full) ||
935 (diff > op->state->initial_size / 4) ||
938 LOG (GNUNET_ERROR_TYPE_INFO,
939 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
941 op->state->initial_size);
942 GNUNET_STATISTICS_update (_GSS_statistics,
946 if ( (op->state->initial_size <= other_size) ||
953 struct GNUNET_MQ_Envelope *ev;
955 LOG (GNUNET_ERROR_TYPE_INFO,
956 "Telling other peer that we expect its full set\n");
957 op->state->phase = PHASE_EXPECT_IBF;
958 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
959 GNUNET_MQ_send (op->mq,
965 GNUNET_STATISTICS_update (_GSS_statistics,
971 get_order_from_difference (diff)))
973 /* Internal error, best we can do is shut the connection */
974 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
975 "Failed to send IBF, closing connection\n");
976 fail_union_operation (op);
980 GNUNET_CADET_receive_done (op->channel);
985 * Iterator to send elements to a remote peer
987 * @param cls closure with the element key and the union operation
989 * @param value the key entry
992 send_offers_iterator (void *cls,
996 struct SendElementClosure *sec = cls;
997 struct Operation *op = sec->op;
998 struct KeyEntry *ke = value;
999 struct GNUNET_MQ_Envelope *ev;
1000 struct GNUNET_MessageHeader *mh;
1002 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1003 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1006 ev = GNUNET_MQ_msg_header_extra (mh,
1007 sizeof (struct GNUNET_HashCode),
1008 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
1010 GNUNET_assert (NULL != ev);
1011 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1012 LOG (GNUNET_ERROR_TYPE_DEBUG,
1013 "[OP %x] sending element offer (%s) to peer\n",
1015 GNUNET_h2s (&ke->element->element_hash));
1016 GNUNET_MQ_send (op->mq, ev);
1022 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1024 * @param op union operation
1025 * @param ibf_key IBF key of interest
1028 send_offers_for_key (struct Operation *op,
1029 struct IBF_Key ibf_key)
1031 struct SendElementClosure send_cls;
1033 send_cls.ibf_key = ibf_key;
1035 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1036 (uint32_t) ibf_key.key_val,
1037 &send_offers_iterator,
1043 * Decode which elements are missing on each side, and
1044 * send the appropriate offers and inquiries.
1046 * @param op union operation
1047 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1050 decode_and_send (struct Operation *op)
1053 struct IBF_Key last_key;
1055 unsigned int num_decoded;
1056 struct InvertibleBloomFilter *diff_ibf;
1058 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1061 prepare_ibf (op, op->state->remote_ibf->size))
1064 /* allocation failed */
1065 return GNUNET_SYSERR;
1067 diff_ibf = ibf_dup (op->state->local_ibf);
1068 ibf_subtract (diff_ibf, op->state->remote_ibf);
1070 ibf_destroy (op->state->remote_ibf);
1071 op->state->remote_ibf = NULL;
1073 LOG (GNUNET_ERROR_TYPE_DEBUG,
1074 "decoding IBF (size=%u)\n",
1078 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1083 int cycle_detected = GNUNET_NO;
1087 res = ibf_decode (diff_ibf, &side, &key);
1088 if (res == GNUNET_OK)
1090 LOG (GNUNET_ERROR_TYPE_DEBUG,
1091 "decoded ibf key %lx\n",
1092 (unsigned long) key.key_val);
1094 if ( (num_decoded > diff_ibf->size) ||
1095 ( (num_decoded > 1) &&
1096 (last_key.key_val == key.key_val) ) )
1098 LOG (GNUNET_ERROR_TYPE_DEBUG,
1099 "detected cyclic ibf (decoded %u/%u)\n",
1102 cycle_detected = GNUNET_YES;
1105 if ( (GNUNET_SYSERR == res) ||
1106 (GNUNET_YES == cycle_detected) )
1110 while (1<<next_order < diff_ibf->size)
1113 if (next_order <= MAX_IBF_ORDER)
1115 LOG (GNUNET_ERROR_TYPE_DEBUG,
1116 "decoding failed, sending larger ibf (size %u)\n",
1118 GNUNET_STATISTICS_update (_GSS_statistics,
1122 op->state->salt_send++;
1124 send_ibf (op, next_order))
1126 /* Internal error, best we can do is shut the connection */
1127 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1128 "Failed to send IBF, closing connection\n");
1129 fail_union_operation (op);
1130 ibf_destroy (diff_ibf);
1131 return GNUNET_SYSERR;
1136 GNUNET_STATISTICS_update (_GSS_statistics,
1137 "# of failed union operations (too large)",
1140 // XXX: Send the whole set, element-by-element
1141 LOG (GNUNET_ERROR_TYPE_ERROR,
1142 "set union failed: reached ibf limit\n");
1143 fail_union_operation (op);
1144 ibf_destroy (diff_ibf);
1145 return GNUNET_SYSERR;
1149 if (GNUNET_NO == res)
1151 struct GNUNET_MQ_Envelope *ev;
1153 LOG (GNUNET_ERROR_TYPE_DEBUG,
1154 "transmitted all values, sending DONE\n");
1155 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1156 GNUNET_MQ_send (op->mq, ev);
1157 /* We now wait until we get a DONE message back
1158 * and then wait for our MQ to be flushed and all our
1159 * demands be delivered. */
1164 struct IBF_Key unsalted_key;
1165 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1166 send_offers_for_key (op, unsalted_key);
1168 else if (-1 == side)
1170 struct GNUNET_MQ_Envelope *ev;
1171 struct InquiryMessage *msg;
1173 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1174 * the effort additional complexity. */
1175 ev = GNUNET_MQ_msg_extra (msg,
1176 sizeof (struct IBF_Key),
1177 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1178 msg->salt = htonl (op->state->salt_receive);
1179 GNUNET_memcpy (&msg[1],
1181 sizeof (struct IBF_Key));
1182 LOG (GNUNET_ERROR_TYPE_DEBUG,
1183 "sending element inquiry for IBF key %lx\n",
1184 (unsigned long) key.key_val);
1185 GNUNET_MQ_send (op->mq, ev);
1192 ibf_destroy (diff_ibf);
1198 * Check an IBF message from a remote peer.
1200 * Reassemble the IBF from multiple pieces, and
1201 * process the whole IBF once possible.
1203 * @param cls the union operation
1204 * @param msg the header of the message
1205 * @return #GNUNET_OK if @a msg is well-formed
1208 check_union_p2p_ibf (void *cls,
1209 const struct IBFMessage *msg)
1211 struct Operation *op = cls;
1212 unsigned int buckets_in_message;
1214 if (GNUNET_SET_OPERATION_UNION != op->operation)
1216 GNUNET_break_op (0);
1217 return GNUNET_SYSERR;
1219 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1220 if (0 == buckets_in_message)
1222 GNUNET_break_op (0);
1223 return GNUNET_SYSERR;
1225 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1227 GNUNET_break_op (0);
1228 return GNUNET_SYSERR;
1230 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1232 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1234 GNUNET_break_op (0);
1235 return GNUNET_SYSERR;
1237 if (1<<msg->order != op->state->remote_ibf->size)
1239 GNUNET_break_op (0);
1240 return GNUNET_SYSERR;
1242 if (ntohl (msg->salt) != op->state->salt_receive)
1244 GNUNET_break_op (0);
1245 return GNUNET_SYSERR;
1248 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1249 (op->state->phase != PHASE_EXPECT_IBF) )
1251 GNUNET_break_op (0);
1252 return GNUNET_SYSERR;
1260 * Handle an IBF message from a remote peer.
1262 * Reassemble the IBF from multiple pieces, and
1263 * process the whole IBF once possible.
1265 * @param cls the union operation
1266 * @param msg the header of the message
1269 handle_union_p2p_ibf (void *cls,
1270 const struct IBFMessage *msg)
1272 struct Operation *op = cls;
1273 unsigned int buckets_in_message;
1275 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1276 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1277 (op->state->phase == PHASE_EXPECT_IBF) )
1279 op->state->phase = PHASE_EXPECT_IBF_CONT;
1280 GNUNET_assert (NULL == op->state->remote_ibf);
1281 LOG (GNUNET_ERROR_TYPE_DEBUG,
1282 "Creating new ibf of size %u\n",
1284 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1285 op->state->salt_receive = ntohl (msg->salt);
1286 LOG (GNUNET_ERROR_TYPE_DEBUG,
1287 "Receiving new IBF with salt %u\n",
1288 op->state->salt_receive);
1289 if (NULL == op->state->remote_ibf)
1291 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1292 "Failed to parse remote IBF, closing connection\n");
1293 fail_union_operation (op);
1296 op->state->ibf_buckets_received = 0;
1297 if (0 != ntohl (msg->offset))
1299 GNUNET_break_op (0);
1300 fail_union_operation (op);
1306 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1308 GNUNET_assert (NULL != op->state->remote_ibf);
1310 ibf_read_slice (&msg[1],
1311 op->state->ibf_buckets_received,
1313 op->state->remote_ibf);
1314 op->state->ibf_buckets_received += buckets_in_message;
1316 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1318 LOG (GNUNET_ERROR_TYPE_DEBUG,
1319 "received full ibf\n");
1320 op->state->phase = PHASE_INVENTORY_ACTIVE;
1322 decode_and_send (op))
1324 /* Internal error, best we can do is shut down */
1325 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1326 "Failed to decode IBF, closing connection\n");
1327 fail_union_operation (op);
1331 GNUNET_CADET_receive_done (op->channel);
1336 * Send a result message to the client indicating
1337 * that there is a new element.
1339 * @param op union operation
1340 * @param element element to send
1341 * @param status status to send with the new element
1344 send_client_element (struct Operation *op,
1345 struct GNUNET_SET_Element *element,
1348 struct GNUNET_MQ_Envelope *ev;
1349 struct GNUNET_SET_ResultMessage *rm;
1351 LOG (GNUNET_ERROR_TYPE_DEBUG,
1352 "sending element (size %u) to client\n",
1354 GNUNET_assert (0 != op->spec->client_request_id);
1355 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1358 GNUNET_MQ_discard (ev);
1362 rm->result_status = htons (status);
1363 rm->request_id = htonl (op->spec->client_request_id);
1364 rm->element_type = htons (element->element_type);
1365 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1366 GNUNET_memcpy (&rm[1], element->data, element->size);
1367 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1372 * Signal to the client that the operation has finished and
1373 * destroy the operation.
1375 * @param cls operation to destroy
1378 send_done_and_destroy (void *cls)
1380 struct Operation *op = cls;
1381 struct GNUNET_MQ_Envelope *ev;
1382 struct GNUNET_SET_ResultMessage *rm;
1384 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1385 rm->request_id = htonl (op->spec->client_request_id);
1386 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1387 rm->element_type = htons (0);
1388 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1389 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1390 /* Will also call the union-specific cancel function. */
1391 _GSS_operation_destroy (op, GNUNET_YES);
1396 * Tests if the operation is finished, and if so notify.
1398 * @param op operation to check
1401 maybe_finish (struct Operation *op)
1403 unsigned int num_demanded;
1405 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1407 if (PHASE_FINISH_WAITING == op->state->phase)
1409 LOG (GNUNET_ERROR_TYPE_DEBUG,
1410 "In PHASE_FINISH_WAITING, pending %u demands\n",
1412 if (0 == num_demanded)
1414 struct GNUNET_MQ_Envelope *ev;
1416 op->state->phase = PHASE_DONE;
1417 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1418 GNUNET_MQ_send (op->mq, ev);
1420 /* We now wait until the other peer closes the channel
1421 * after it got all elements from us. */
1424 if (PHASE_FINISH_CLOSING == op->state->phase)
1426 LOG (GNUNET_ERROR_TYPE_DEBUG,
1427 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1429 if (0 == num_demanded)
1431 op->state->phase = PHASE_DONE;
1432 send_done_and_destroy (op);
1439 * Check an element message from a remote peer.
1441 * @param cls the union operation
1442 * @param emsg the message
1445 check_union_p2p_elements (void *cls,
1446 const struct GNUNET_SET_ElementMessage *emsg)
1448 struct Operation *op = cls;
1450 if (GNUNET_SET_OPERATION_UNION != op->operation)
1452 GNUNET_break_op (0);
1453 return GNUNET_SYSERR;
1455 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1457 GNUNET_break_op (0);
1458 return GNUNET_SYSERR;
1465 * Handle an element message from a remote peer.
1466 * Sent by the other peer either because we decoded an IBF and placed a demand,
1467 * or because the other peer switched to full set transmission.
1469 * @param cls the union operation
1470 * @param emsg the message
1473 handle_union_p2p_elements (void *cls,
1474 const struct GNUNET_SET_ElementMessage *emsg)
1476 struct Operation *op = cls;
1477 struct ElementEntry *ee;
1478 struct KeyEntry *ke;
1479 uint16_t element_size;
1481 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1482 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1483 GNUNET_memcpy (&ee[1],
1486 ee->element.size = element_size;
1487 ee->element.data = &ee[1];
1488 ee->element.element_type = ntohs (emsg->element_type);
1489 ee->remote = GNUNET_YES;
1490 GNUNET_SET_element_hash (&ee->element,
1493 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1497 /* We got something we didn't demand, since it's not in our map. */
1498 GNUNET_break_op (0);
1499 fail_union_operation (op);
1503 LOG (GNUNET_ERROR_TYPE_DEBUG,
1504 "Got element (size %u, hash %s) from peer\n",
1505 (unsigned int) element_size,
1506 GNUNET_h2s (&ee->element_hash));
1508 GNUNET_STATISTICS_update (_GSS_statistics,
1509 "# received elements",
1512 GNUNET_STATISTICS_update (_GSS_statistics,
1513 "# exchanged elements",
1517 op->state->received_total++;
1519 ke = op_get_element (op, &ee->element_hash);
1522 /* Got repeated element. Should not happen since
1523 * we track demands. */
1524 GNUNET_STATISTICS_update (_GSS_statistics,
1525 "# repeated elements",
1528 ke->received = GNUNET_YES;
1533 LOG (GNUNET_ERROR_TYPE_DEBUG,
1534 "Registering new element from remote peer\n");
1535 op->state->received_fresh++;
1536 op_register_element (op, ee, GNUNET_YES);
1537 /* only send results immediately if the client wants it */
1538 switch (op->spec->result_mode)
1540 case GNUNET_SET_RESULT_ADDED:
1541 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1543 case GNUNET_SET_RESULT_SYMMETRIC:
1544 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1547 /* Result mode not supported, should have been caught earlier. */
1553 if ( (op->state->received_total > 8) &&
1554 (op->state->received_fresh < op->state->received_total / 3) )
1556 /* The other peer gave us lots of old elements, there's something wrong. */
1557 GNUNET_break_op (0);
1558 fail_union_operation (op);
1561 GNUNET_CADET_receive_done (op->channel);
1567 * Check a full element message from a remote peer.
1569 * @param cls the union operation
1570 * @param emsg the message
1573 check_union_p2p_full_element (void *cls,
1574 const struct GNUNET_SET_ElementMessage *emsg)
1576 struct Operation *op = cls;
1578 if (GNUNET_SET_OPERATION_UNION != op->operation)
1580 GNUNET_break_op (0);
1581 return GNUNET_SYSERR;
1583 // FIXME: check that we expect full elements here?
1589 * Handle an element message from a remote peer.
1591 * @param cls the union operation
1592 * @param emsg the message
1595 handle_union_p2p_full_element (void *cls,
1596 const struct GNUNET_SET_ElementMessage *emsg)
1598 struct Operation *op = cls;
1599 struct ElementEntry *ee;
1600 struct KeyEntry *ke;
1601 uint16_t element_size;
1603 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1604 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1605 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1606 ee->element.size = element_size;
1607 ee->element.data = &ee[1];
1608 ee->element.element_type = ntohs (emsg->element_type);
1609 ee->remote = GNUNET_YES;
1610 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1612 LOG (GNUNET_ERROR_TYPE_DEBUG,
1613 "Got element (full diff, size %u, hash %s) from peer\n",
1614 (unsigned int) element_size,
1615 GNUNET_h2s (&ee->element_hash));
1617 GNUNET_STATISTICS_update (_GSS_statistics,
1618 "# received elements",
1621 GNUNET_STATISTICS_update (_GSS_statistics,
1622 "# exchanged elements",
1626 op->state->received_total++;
1628 ke = op_get_element (op, &ee->element_hash);
1631 /* Got repeated element. Should not happen since
1632 * we track demands. */
1633 GNUNET_STATISTICS_update (_GSS_statistics,
1634 "# repeated elements",
1637 ke->received = GNUNET_YES;
1642 LOG (GNUNET_ERROR_TYPE_DEBUG,
1643 "Registering new element from remote peer\n");
1644 op->state->received_fresh++;
1645 op_register_element (op, ee, GNUNET_YES);
1646 /* only send results immediately if the client wants it */
1647 switch (op->spec->result_mode)
1649 case GNUNET_SET_RESULT_ADDED:
1650 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1652 case GNUNET_SET_RESULT_SYMMETRIC:
1653 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1656 /* Result mode not supported, should have been caught earlier. */
1662 if ( (GNUNET_YES == op->spec->byzantine) &&
1663 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1664 (op->state->received_fresh < op->state->received_total / 6) )
1666 /* The other peer gave us lots of old elements, there's something wrong. */
1667 LOG (GNUNET_ERROR_TYPE_ERROR,
1668 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1669 (unsigned long long) op->state->received_fresh,
1670 (unsigned long long) op->state->received_total);
1671 GNUNET_break_op (0);
1672 fail_union_operation (op);
1675 GNUNET_CADET_receive_done (op->channel);
1680 * Send offers (for GNUNET_Hash-es) in response
1681 * to inquiries (for IBF_Key-s).
1683 * @param cls the union operation
1684 * @param msg the message
1687 check_union_p2p_inquiry (void *cls,
1688 const struct InquiryMessage *msg)
1690 struct Operation *op = cls;
1691 unsigned int num_keys;
1693 if (GNUNET_SET_OPERATION_UNION != op->operation)
1695 GNUNET_break_op (0);
1696 return GNUNET_SYSERR;
1698 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1700 GNUNET_break_op (0);
1701 return GNUNET_SYSERR;
1703 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1704 / sizeof (struct IBF_Key);
1705 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1706 != num_keys * sizeof (struct IBF_Key))
1708 GNUNET_break_op (0);
1709 return GNUNET_SYSERR;
1716 * Send offers (for GNUNET_Hash-es) in response
1717 * to inquiries (for IBF_Key-s).
1719 * @param cls the union operation
1720 * @param msg the message
1723 handle_union_p2p_inquiry (void *cls,
1724 const struct InquiryMessage *msg)
1726 struct Operation *op = cls;
1727 const struct IBF_Key *ibf_key;
1728 unsigned int num_keys;
1730 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1731 / sizeof (struct IBF_Key);
1732 ibf_key = (const struct IBF_Key *) &msg[1];
1733 while (0 != num_keys--)
1735 struct IBF_Key unsalted_key;
1737 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1738 send_offers_for_key (op, unsalted_key);
1741 GNUNET_CADET_receive_done (op->channel);
1746 * Iterator over hash map entries, called to
1747 * destroy the linked list of colliding ibf key entries.
1749 * @param cls closure
1750 * @param key current key code
1751 * @param value value in the hash map
1752 * @return #GNUNET_YES if we should continue to iterate,
1753 * #GNUNET_NO if not.
1756 send_missing_elements_iter (void *cls,
1760 struct Operation *op = cls;
1761 struct KeyEntry *ke = value;
1762 struct GNUNET_MQ_Envelope *ev;
1763 struct GNUNET_SET_ElementMessage *emsg;
1764 struct ElementEntry *ee = ke->element;
1766 if (GNUNET_YES == ke->received)
1769 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1770 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1771 emsg->reserved = htons (0);
1772 emsg->element_type = htons (ee->element.element_type);
1773 GNUNET_MQ_send (op->mq, ev);
1780 * Handle a request for full set transmission.
1782 * @parem cls closure, a set union operation
1783 * @param mh the demand message
1786 handle_union_p2p_request_full (void *cls,
1787 const struct GNUNET_MessageHeader *mh)
1789 struct Operation *op = cls;
1791 LOG (GNUNET_ERROR_TYPE_INFO,
1792 "Received request for full set transmission\n");
1793 if (GNUNET_SET_OPERATION_UNION != op->operation)
1795 GNUNET_break_op (0);
1796 fail_union_operation (op);
1799 if (PHASE_EXPECT_IBF != op->state->phase)
1801 GNUNET_break_op (0);
1802 fail_union_operation (op);
1806 // FIXME: we need to check that our set is larger than the
1807 // byzantine_lower_bound by some threshold
1809 GNUNET_CADET_receive_done (op->channel);
1814 * Handle a "full done" message.
1816 * @parem cls closure, a set union operation
1817 * @param mh the demand message
1820 handle_union_p2p_full_done (void *cls,
1821 const struct GNUNET_MessageHeader *mh)
1823 struct Operation *op = cls;
1825 switch (op->state->phase)
1827 case PHASE_EXPECT_IBF:
1829 struct GNUNET_MQ_Envelope *ev;
1831 LOG (GNUNET_ERROR_TYPE_DEBUG,
1832 "got FULL DONE, sending elements that other peer is missing\n");
1834 /* send all the elements that did not come from the remote peer */
1835 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1836 &send_missing_elements_iter,
1839 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1840 GNUNET_MQ_send (op->mq, ev);
1841 op->state->phase = PHASE_DONE;
1842 /* we now wait until the other peer shuts the tunnel down*/
1845 case PHASE_FULL_SENDING:
1847 LOG (GNUNET_ERROR_TYPE_DEBUG,
1848 "got FULL DONE, finishing\n");
1849 /* We sent the full set, and got the response for that. We're done. */
1850 op->state->phase = PHASE_DONE;
1851 GNUNET_CADET_receive_done (op->channel);
1852 send_done_and_destroy (op);
1857 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1858 "Handle full done phase is %u\n",
1859 (unsigned) op->state->phase);
1860 GNUNET_break_op (0);
1861 fail_union_operation (op);
1864 GNUNET_CADET_receive_done (op->channel);
1869 * Check a demand by the other peer for elements based on a list
1870 * of `struct GNUNET_HashCode`s.
1872 * @parem cls closure, a set union operation
1873 * @param mh the demand message
1874 * @return #GNUNET_OK if @a mh is well-formed
1877 check_union_p2p_demand (void *cls,
1878 const struct GNUNET_MessageHeader *mh)
1880 struct Operation *op = cls;
1881 unsigned int num_hashes;
1883 if (GNUNET_SET_OPERATION_UNION != op->operation)
1885 GNUNET_break_op (0);
1886 return GNUNET_SYSERR;
1888 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1889 / sizeof (struct GNUNET_HashCode);
1890 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1891 != num_hashes * sizeof (struct GNUNET_HashCode))
1893 GNUNET_break_op (0);
1894 return GNUNET_SYSERR;
1901 * Handle a demand by the other peer for elements based on a list
1902 * of `struct GNUNET_HashCode`s.
1904 * @parem cls closure, a set union operation
1905 * @param mh the demand message
1908 handle_union_p2p_demand (void *cls,
1909 const struct GNUNET_MessageHeader *mh)
1911 struct Operation *op = cls;
1912 struct ElementEntry *ee;
1913 struct GNUNET_SET_ElementMessage *emsg;
1914 const struct GNUNET_HashCode *hash;
1915 unsigned int num_hashes;
1916 struct GNUNET_MQ_Envelope *ev;
1918 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1919 / sizeof (struct GNUNET_HashCode);
1920 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1922 hash++, num_hashes--)
1924 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1928 /* Demand for non-existing element. */
1929 GNUNET_break_op (0);
1930 fail_union_operation (op);
1933 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1935 /* Probably confused lazily copied sets. */
1936 GNUNET_break_op (0);
1937 fail_union_operation (op);
1940 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1941 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1942 emsg->reserved = htons (0);
1943 emsg->element_type = htons (ee->element.element_type);
1944 LOG (GNUNET_ERROR_TYPE_DEBUG,
1945 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1947 (unsigned int) ee->element.size,
1948 GNUNET_h2s (&ee->element_hash));
1949 GNUNET_MQ_send (op->mq, ev);
1950 GNUNET_STATISTICS_update (_GSS_statistics,
1951 "# exchanged elements",
1955 switch (op->spec->result_mode)
1957 case GNUNET_SET_RESULT_ADDED:
1958 /* Nothing to do. */
1960 case GNUNET_SET_RESULT_SYMMETRIC:
1961 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1964 /* Result mode not supported, should have been caught earlier. */
1969 GNUNET_CADET_receive_done (op->channel);
1974 * Check offer (of `struct GNUNET_HashCode`s).
1976 * @param cls the union operation
1977 * @param mh the message
1978 * @return #GNUNET_OK if @a mh is well-formed
1981 check_union_p2p_offer (void *cls,
1982 const struct GNUNET_MessageHeader *mh)
1984 struct Operation *op = cls;
1985 unsigned int num_hashes;
1987 if (GNUNET_SET_OPERATION_UNION != op->operation)
1989 GNUNET_break_op (0);
1990 return GNUNET_SYSERR;
1992 /* look up elements and send them */
1993 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1994 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1996 GNUNET_break_op (0);
1997 return GNUNET_SYSERR;
1999 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2000 / sizeof (struct GNUNET_HashCode);
2001 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2002 != num_hashes * sizeof (struct GNUNET_HashCode))
2004 GNUNET_break_op (0);
2005 return GNUNET_SYSERR;
2012 * Handle offers (of `struct GNUNET_HashCode`s) and
2013 * respond with demands (of `struct GNUNET_HashCode`s).
2015 * @param cls the union operation
2016 * @param mh the message
2019 handle_union_p2p_offer (void *cls,
2020 const struct GNUNET_MessageHeader *mh)
2022 struct Operation *op = cls;
2023 const struct GNUNET_HashCode *hash;
2024 unsigned int num_hashes;
2026 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2027 / sizeof (struct GNUNET_HashCode);
2028 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2030 hash++, num_hashes--)
2032 struct ElementEntry *ee;
2033 struct GNUNET_MessageHeader *demands;
2034 struct GNUNET_MQ_Envelope *ev;
2036 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
2039 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2043 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2046 LOG (GNUNET_ERROR_TYPE_DEBUG,
2047 "Skipped sending duplicate demand\n");
2051 GNUNET_assert (GNUNET_OK ==
2052 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2055 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2057 LOG (GNUNET_ERROR_TYPE_DEBUG,
2058 "[OP %x] Requesting element (hash %s)\n",
2059 (void *) op, GNUNET_h2s (hash));
2060 ev = GNUNET_MQ_msg_header_extra (demands,
2061 sizeof (struct GNUNET_HashCode),
2062 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2063 *(struct GNUNET_HashCode *) &demands[1] = *hash;
2064 GNUNET_MQ_send (op->mq, ev);
2066 GNUNET_CADET_receive_done (op->channel);
2071 * Handle a done message from a remote peer
2073 * @param cls the union operation
2074 * @param mh the message
2077 handle_union_p2p_done (void *cls,
2078 const struct GNUNET_MessageHeader *mh)
2080 struct Operation *op = cls;
2082 if (GNUNET_SET_OPERATION_UNION != op->operation)
2084 GNUNET_break_op (0);
2085 fail_union_operation (op);
2088 switch (op->state->phase)
2090 case PHASE_INVENTORY_PASSIVE:
2091 /* We got all requests, but still have to send our elements in response. */
2092 op->state->phase = PHASE_FINISH_WAITING;
2094 LOG (GNUNET_ERROR_TYPE_DEBUG,
2095 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2096 /* The active peer is done sending offers
2097 * and inquiries. This means that all
2098 * our responses to that (demands and offers)
2099 * must be in flight (queued or in mesh).
2101 * We should notify the active peer once
2102 * all our demands are satisfied, so that the active
2103 * peer can quit if we gave him everything.
2105 GNUNET_CADET_receive_done (op->channel);
2108 case PHASE_INVENTORY_ACTIVE:
2109 LOG (GNUNET_ERROR_TYPE_DEBUG,
2110 "got DONE (as active partner), waiting to finish\n");
2111 /* All demands of the other peer are satisfied,
2112 * and we processed all offers, thus we know
2113 * exactly what our demands must be.
2115 * We'll close the channel
2116 * to the other peer once our demands are met.
2118 op->state->phase = PHASE_FINISH_CLOSING;
2119 GNUNET_CADET_receive_done (op->channel);
2123 GNUNET_break_op (0);
2124 fail_union_operation (op);
2131 * Initiate operation to evaluate a set union with a remote peer.
2133 * @param op operation to perform (to be initialized)
2134 * @param opaque_context message to be transmitted to the listener
2135 * to convince him to accept, may be NULL
2138 union_evaluate (struct Operation *op,
2139 const struct GNUNET_MessageHeader *opaque_context)
2141 struct GNUNET_MQ_Envelope *ev;
2142 struct OperationRequestMessage *msg;
2144 GNUNET_assert (NULL == op->state);
2145 op->state = GNUNET_new (struct OperationState);
2146 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2147 /* copy the current generation's strata estimator for this operation */
2148 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2149 /* we started the operation, thus we have to send the operation request */
2150 op->state->phase = PHASE_EXPECT_SE;
2151 op->state->salt_receive = op->state->salt_send = 42;
2152 LOG (GNUNET_ERROR_TYPE_DEBUG,
2153 "Initiating union operation evaluation\n");
2154 GNUNET_STATISTICS_update (_GSS_statistics,
2155 "# of total union operations",
2158 GNUNET_STATISTICS_update (_GSS_statistics,
2159 "# of initiated union operations",
2162 ev = GNUNET_MQ_msg_nested_mh (msg,
2163 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2167 /* the context message is too large */
2169 GNUNET_SERVICE_client_drop (op->spec->set->client);
2172 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2173 GNUNET_MQ_send (op->mq,
2176 if (NULL != opaque_context)
2177 LOG (GNUNET_ERROR_TYPE_DEBUG,
2178 "sent op request with context message\n");
2180 LOG (GNUNET_ERROR_TYPE_DEBUG,
2181 "sent op request without context message\n");
2183 initialize_key_to_element (op);
2184 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2189 * Accept an union operation request from a remote peer.
2190 * Only initializes the private operation state.
2192 * @param op operation that will be accepted as a union operation
2195 union_accept (struct Operation *op)
2197 LOG (GNUNET_ERROR_TYPE_DEBUG,
2198 "accepting set union operation\n");
2199 GNUNET_assert (NULL == op->state);
2201 GNUNET_STATISTICS_update (_GSS_statistics,
2202 "# of accepted union operations",
2205 GNUNET_STATISTICS_update (_GSS_statistics,
2206 "# of total union operations",
2210 op->state = GNUNET_new (struct OperationState);
2211 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2212 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2213 op->state->salt_receive = op->state->salt_send = 42;
2214 initialize_key_to_element (op);
2215 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2216 /* kick off the operation */
2217 send_strata_estimator (op);
2222 * Create a new set supporting the union operation
2224 * We maintain one strata estimator per set and then manipulate it over the
2225 * lifetime of the set, as recreating a strata estimator would be expensive.
2227 * @return the newly created set, NULL on error
2229 static struct SetState *
2230 union_set_create (void)
2232 struct SetState *set_state;
2234 LOG (GNUNET_ERROR_TYPE_DEBUG,
2235 "union set created\n");
2236 set_state = GNUNET_new (struct SetState);
2237 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2238 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2239 if (NULL == set_state->se)
2241 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2242 "Failed to allocate strata estimator\n");
2243 GNUNET_free (set_state);
2251 * Add the element from the given element message to the set.
2253 * @param set_state state of the set want to add to
2254 * @param ee the element to add to the set
2257 union_add (struct SetState *set_state, struct ElementEntry *ee)
2259 strata_estimator_insert (set_state->se,
2260 get_ibf_key (&ee->element_hash));
2265 * Remove the element given in the element message from the set.
2266 * Only marks the element as removed, so that older set operations can still exchange it.
2268 * @param set_state state of the set to remove from
2269 * @param ee set element to remove
2272 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2274 strata_estimator_remove (set_state->se,
2275 get_ibf_key (&ee->element_hash));
2280 * Destroy a set that supports the union operation.
2282 * @param set_state the set to destroy
2285 union_set_destroy (struct SetState *set_state)
2287 if (NULL != set_state->se)
2289 strata_estimator_destroy (set_state->se);
2290 set_state->se = NULL;
2292 GNUNET_free (set_state);
2297 * Handler for peer-disconnects, notifies the client
2298 * about the aborted operation in case the op was not concluded.
2300 * @param op the destroyed operation
2303 union_peer_disconnect (struct Operation *op)
2305 if (PHASE_DONE != op->state->phase)
2307 struct GNUNET_MQ_Envelope *ev;
2308 struct GNUNET_SET_ResultMessage *msg;
2310 ev = GNUNET_MQ_msg (msg,
2311 GNUNET_MESSAGE_TYPE_SET_RESULT);
2312 msg->request_id = htonl (op->spec->client_request_id);
2313 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2314 msg->element_type = htons (0);
2315 GNUNET_MQ_send (op->spec->set->client_mq,
2317 LOG (GNUNET_ERROR_TYPE_WARNING,
2318 "other peer disconnected prematurely, phase %u\n",
2320 _GSS_operation_destroy (op,
2324 // else: the session has already been concluded
2325 LOG (GNUNET_ERROR_TYPE_DEBUG,
2326 "other peer disconnected (finished)\n");
2327 if (GNUNET_NO == op->state->client_done_sent)
2328 send_done_and_destroy (op);
2333 * Copy union-specific set state.
2335 * @param set source set for copying the union state
2336 * @return a copy of the union-specific set state
2338 static struct SetState *
2339 union_copy_state (struct Set *set)
2341 struct SetState *new_state;
2343 new_state = GNUNET_new (struct SetState);
2344 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2345 new_state->se = strata_estimator_dup (set->state->se);
2352 * Get the table with implementing functions for
2355 * @return the operation specific VTable
2357 const struct SetVT *
2360 static const struct SetVT union_vt = {
2361 .create = &union_set_create,
2363 .remove = &union_remove,
2364 .destroy_set = &union_set_destroy,
2365 .evaluate = &union_evaluate,
2366 .accept = &union_accept,
2367 .peer_disconnect = &union_peer_disconnect,
2368 .cancel = &union_op_cancel,
2369 .copy_state = &union_copy_state,