2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file set/gnunet-service-set_union.c
20 * @brief two-peer set operations
21 * @author Florian Dold
22 * @author Christian Grothoff
25 #include "gnunet_util_lib.h"
26 #include "gnunet_statistics_service.h"
27 #include "gnunet-service-set.h"
29 #include "gnunet-service-set_union.h"
30 #include "gnunet-service-set_union_strata_estimator.h"
31 #include "gnunet-service-set_protocol.h"
35 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
39 * Number of IBFs in a strata estimator.
41 #define SE_STRATA_COUNT 32
44 * Size of the IBFs in the strata estimator.
46 #define SE_IBF_SIZE 80
49 * The hash num parameter for the difference digests and strata estimators.
51 #define SE_IBF_HASH_NUM 4
54 * Number of buckets that can be transmitted in one message.
56 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values.
63 #define MAX_IBF_ORDER (20)
66 * Number of buckets used in the ibf per estimated
73 * Current phase we are in for a union operation.
/* Phase of the union protocol that an operation is currently in; stored in
 * the `phase` member of struct OperationState.
 * NOTE(review): PHASE_EXPECT_SE, PHASE_EXPECT_IBF, PHASE_DONE and
 * PHASE_FULL_SENDING are referenced elsewhere in this file, but their
 * enumerator lines are not visible in this listing. */
75 enum UnionOperationPhase
78 * We sent the request message, and expect a strata estimator.
83 * We sent the strata estimator, and expect an IBF. This phase is entered once
84 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86 * XXX: could use better wording.
87 * XXX: repurposed to also expect a "request full set" message, should be renamed
89 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
94 * Continuation for multi part IBFs.
96 PHASE_EXPECT_IBF_CONT,
99 * We are decoding an IBF.
101 PHASE_INVENTORY_ACTIVE,
104 * The other peer is decoding the IBF we just sent.
106 PHASE_INVENTORY_PASSIVE,
109 * The protocol is almost finished, but we still have to flush our message
110 * queue and/or expect some elements.
112 PHASE_FINISH_CLOSING,
115 * In the penultimate phase,
116 * we wait until all our demands
117 * are satisfied. Then we send a done
118 * message, and wait for another done message.
120 PHASE_FINISH_WAITING,
123 * In the ultimate phase, we wait until
124 * our demands are satisfied and then
125 * quit (sending another DONE message).
130 * After sending the full set, wait for responses with the elements
131 * that the local peer is missing.
138 * State of an evaluate operation with another peer.
/* Union-specific state of one operation, reached as op->state throughout
 * this file.  union_op_cancel() destroys each non-NULL member and then frees
 * the structure itself.
 * NOTE(review): a `salt_send` member is used elsewhere in this file
 * (op->state->salt_send), but its declaration line is not visible here. */
140 struct OperationState
143 * Copy of the set's strata estimator at the time of
144 * creation of this operation.
146 struct StrataEstimator *se;
149 * The IBF we currently receive.
151 struct InvertibleBloomFilter *remote_ibf;
154 * The IBF with the local set's element.
156 struct InvertibleBloomFilter *local_ibf;
159 * Maps unsalted IBF-Keys to elements.
160 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
161 * Colliding IBF-Keys are linked.
163 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
166 * Current state of the operation.
168 enum UnionOperationPhase phase;
171 * Did we send the client that we are done?
173 int client_done_sent;
176 * Number of ibf buckets already received into the @a remote_ibf.
178 unsigned int ibf_buckets_received;
181 * Hashes for elements that we have demanded from the other peer.
183 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
186 * Salt that we're using for sending IBFs
191 * Salt for the IBF we've received and that we're currently decoding.
193 uint32_t salt_receive;
196 * Number of elements we received from the other peer
197 * that were not in the local set yet.
199 uint32_t received_fresh;
202 * Total number of elements received from the other peer.
204 uint32_t received_total;
207 * Initial size of our set, just before
208 * the operation started.
/* 64-bit: any printf-style output of this member needs a matching 64-bit
 * conversion specifier. */
210 uint64_t initial_size;
215 * The key entry is used to associate an ibf key with an element.
220 * IBF key for the entry, derived from the current salt.
222 struct IBF_Key ibf_key;
225 * The actual element associated with the key.
227 * Only owned by the union operation if element->operation
230 struct ElementEntry *element;
233 * Did we receive this element?
234 * Even if element->is_foreign is false, we might
235 * have received the element, so this indicates that
236 * the other peer has it.
243 * Used as a closure for sending elements
244 * with a specific IBF key.
/* Closure handed to send_offers_iterator(): send_offers_for_key() stores the
 * IBF key of interest in it, and the iterator reads both the key and the
 * operation back out. */
246 struct SendElementClosure
249 * The IBF key whose matching elements should be
252 struct IBF_Key ibf_key;
255 * Operation for which the elements
258 struct Operation *op;
263 * Extra state required for efficient set union.
268 * The strata estimator is only generated once for
270 * The IBF keys are derived from the element hashes with
273 struct StrataEstimator *se;
278 * Iterator over hash map entries, called to
279 * destroy the linked list of colliding ibf key entries.
282 * @param key current key code
283 * @param value value in the hash map
284 * @return #GNUNET_YES if we should continue to iterate,
/* Map-iteration callback that union_op_cancel() runs over key_to_element
 * before destroying the map.  `value` is interpreted as a struct KeyEntry. */
288 destroy_key_to_element_iter (void *cls,
292 struct KeyEntry *k = value;
294 GNUNET_assert (NULL != k);
/* Only elements flagged as remote are freed here; such entries are
 * heap-allocated in handle_union_p2p_elements() with remote = GNUNET_YES. */
295 if (GNUNET_YES == k->element->remote)
297 GNUNET_free (k->element);
306 * Destroy the union operation. Only things specific to the union
307 * operation are destroyed.
309 * @param op union operation to destroy
/* Releases the union-specific state of `op`: each non-NULL member of
 * op->state (remote IBF, demanded-hash map, local IBF, strata estimator and
 * the key-to-element map with its entries) is destroyed and reset to NULL,
 * then op->state itself is freed. */
312 union_op_cancel (struct Operation *op)
314 LOG (GNUNET_ERROR_TYPE_DEBUG,
315 "destroying union op\n");
316 /* check if the op was canceled twice */
317 GNUNET_assert (NULL != op->state);
318 if (NULL != op->state->remote_ibf)
320 ibf_destroy (op->state->remote_ibf);
321 op->state->remote_ibf = NULL;
323 if (NULL != op->state->demanded_hashes)
325 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
326 op->state->demanded_hashes = NULL;
328 if (NULL != op->state->local_ibf)
330 ibf_destroy (op->state->local_ibf);
331 op->state->local_ibf = NULL;
333 if (NULL != op->state->se)
335 strata_estimator_destroy (op->state->se);
336 op->state->se = NULL;
338 if (NULL != op->state->key_to_element)
/* Entries must be released by the iterator first; destroying the map alone
 * is not what frees them in this file. */
340 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
341 &destroy_key_to_element_iter,
343 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
344 op->state->key_to_element = NULL;
/* NOTE(review): the assertion at the top relies on op->state being NULL after
 * a cancel; the line following this free is not visible in this listing --
 * confirm that op->state is reset there. */
346 GNUNET_free (op->state);
348 LOG (GNUNET_ERROR_TYPE_DEBUG,
349 "destroying union op done\n");
354 * Inform the client that the union operation has failed,
355 * and proceed to destroy the evaluate operation.
357 * @param op the union operation to fail
/* Reports failure to the client and tears the operation down: builds a
 * SET_RESULT message with status GNUNET_SET_STATUS_FAILURE for the client's
 * request id, sends it on the client's message queue, then calls
 * _GSS_operation_destroy() on `op`. */
360 fail_union_operation (struct Operation *op)
362 struct GNUNET_MQ_Envelope *ev;
363 struct GNUNET_SET_ResultMessage *msg;
365 LOG (GNUNET_ERROR_TYPE_WARNING,
366 "union operation failed\n");
367 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
/* All multi-byte header fields are converted to network byte order. */
368 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
369 msg->request_id = htonl (op->client_request_id);
370 msg->element_type = htons (0);
371 GNUNET_MQ_send (op->set->cs->mq,
/* NOTE(review): callers should not touch `op` after this call if
 * _GSS_operation_destroy() releases it -- confirm against its definition. */
373 _GSS_operation_destroy (op, GNUNET_YES);
378 * Derive the IBF key from a hash code and
381 * @param src the hash code
382 * @return the derived IBF key
/* Derives the IBF key for a hash code by running GNUNET_CRYPTO_kdf() into a
 * local `key`, with a local `salt` as salt input; the derivation is asserted
 * to succeed.  The declarations of `key` and `salt` are not visible in this
 * listing. */
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
390 GNUNET_assert (GNUNET_OK ==
391 GNUNET_CRYPTO_kdf (&key, sizeof (key),
393 &salt, sizeof (salt),
400 * Context for #op_get_element_iterator
/* Search context shared between op_get_element() and
 * op_get_element_iterator().
 * NOTE(review): op_get_element() also reads a `k` member (ctx.k) whose
 * declaration line is not visible in this listing. */
402 struct GetElementContext
/* Hash of the element being looked up; set by op_get_element(). */
407 struct GNUNET_HashCode hash;
417 * Iterator over the mapping from IBF keys to element entries. Checks if we
418 * have an element with a given GNUNET_HashCode.
421 * @param key current key code
422 * @param value value in the hash map
423 * @return #GNUNET_YES if we should search further,
424 * #GNUNET_NO if we've found the element.
/* Callback for the 32-bit key map lookup in op_get_element(): `cls` is the
 * struct GetElementContext, `value` the candidate struct KeyEntry.  The
 * candidate's element hash is compared against the hash being searched for. */
427 op_get_element_iterator (void *cls,
431 struct GetElementContext *ctx = cls;
432 struct KeyEntry *k = value;
434 GNUNET_assert (NULL != k);
/* hash_cmp() == 0 means the hashes are equal; the second argument and the
 * branch body are not visible in this listing. */
435 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
446 * Determine whether the given element is already in the operation's element
449 * @param op operation that should be tested for 'element_hash'
450 * @param element_hash hash of the element to look for
451 * @return the key entry of the element if it has been found, NULL otherwise
/* Looks up the key entry for `element_hash` in the operation's
 * key-to-element map.  The map is indexed by the lower 32 bits of the derived
 * IBF key, so op_get_element_iterator() is used to pick the entry whose full
 * element hash matches. */
453 static struct KeyEntry *
454 op_get_element (struct Operation *op,
455 const struct GNUNET_HashCode *element_hash)
458 struct IBF_Key ibf_key;
459 struct GetElementContext ctx = {{{ 0 }} , 0};
461 ctx.hash = *element_hash;
463 ibf_key = get_ibf_key (element_hash);
464 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
465 (uint32_t) ibf_key.key_val,
466 op_get_element_iterator,
469 /* was the iteration aborted because we found the element? */
/* An aborted iteration is reported as GNUNET_SYSERR here, so SYSERR is the
 * "found" case and not an error. */
470 if (GNUNET_SYSERR == ret)
472 GNUNET_assert (NULL != ctx.k);
480 * Insert an element into the union operation's
481 * key-to-element mapping. Takes ownership of 'ee'.
482 * Note that this does not insert the element in the set,
483 * only in the operation's key-element mapping.
484 * This is done to speed up re-tried operations, if some elements
485 * were transmitted, and then the IBF fails to decode.
487 * XXX: clarify ownership, doesn't sound right.
489 * @param op the union operation
490 * @param ee the element entry
491 * @param received was this element received from the remote peer?
/* Creates a struct KeyEntry for `ee` and stores it in the operation's
 * key-to-element map under the lower 32 bits of the element's IBF key. */
494 op_register_element (struct Operation *op,
495 struct ElementEntry *ee,
498 struct IBF_Key ibf_key;
501 ibf_key = get_ibf_key (&ee->element_hash);
502 k = GNUNET_new (struct KeyEntry);
504 k->ibf_key = ibf_key;
505 k->received = received;
/* MULTIPLE lets several entries share the same 32-bit map key, which is
 * needed because the 64-bit IBF key is truncated for the map. */
506 GNUNET_assert (GNUNET_OK ==
507 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
508 (uint32_t) ibf_key.key_val,
510 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
/* Salts an IBF key by rotating its 64-bit value right by `s` bits
 * (unsalt_key() applies the opposite rotation).  The result is presumably
 * stored through k_out; that line is not visible in this listing.
 * NOTE(review): the line deriving `s` is not visible either.  If `s` can be
 * 0, then `64 - s` is 64, and shifting a uint64_t by its full width is
 * undefined behaviour in C -- confirm and guard that case. */
518 salt_key (const struct IBF_Key *k_in,
520 struct IBF_Key *k_out)
523 uint64_t x = k_in->key_val;
525 x = (x >> s) | (x << (64 - s));
/* Reverses salt_key(): rotates the 64-bit key value left by `s` bits.  The
 * result is presumably stored through k_out; that line is not visible in
 * this listing.
 * NOTE(review): the line deriving `s` is not visible either.  If `s` can be
 * 0, the `x >> (64 - s)` term shifts by 64, which is undefined behaviour for
 * a 64-bit operand in C -- confirm and guard that case. */
534 unsalt_key (const struct IBF_Key *k_in,
536 struct IBF_Key *k_out)
539 uint64_t x = k_in->key_val;
540 x = (x << s) | (x >> (64 - s));
546 * Insert a key into an ibf.
550 * @param value the key entry to get the key from
/* Map-iteration callback used by prepare_ibf(): salts the entry's IBF key
 * with the operation's current send salt and inserts the salted key into the
 * local IBF.  `cls` is the operation, `value` the struct KeyEntry. */
553 prepare_ibf_iterator (void *cls,
557 struct Operation *op = cls;
558 struct KeyEntry *ke = value;
559 struct IBF_Key salted_key;
561 LOG (GNUNET_ERROR_TYPE_DEBUG,
562 "[OP %x] inserting %lx (hash %s) into ibf\n",
/* The 64-bit key is cast to unsigned long only for logging. */
564 (unsigned long) ke->ibf_key.key_val,
565 GNUNET_h2s (&ke->element->element_hash));
566 salt_key (&ke->ibf_key,
567 op->state->salt_send,
569 ibf_insert (op->state->local_ibf, salted_key);
575 * Iterator for initializing the
576 * key-to-element mapping of a union operation
578 * @param cls the union operation `struct Operation *`
580 * @param value the `struct ElementEntry *` to insert
581 * into the key-to-element mapping
582 * @return #GNUNET_YES (to continue iterating)
/* Callback run by initialize_key_to_element() over the set's elements:
 * registers an element entry in the operation's key-to-element map.  `cls` is
 * the operation, `value` the struct ElementEntry. */
585 init_key_to_element_iterator (void *cls,
586 const struct GNUNET_HashCode *key,
589 struct Operation *op = cls;
590 struct ElementEntry *ee = value;
592 /* make sure that the element belongs to the set at the time
593 * of creating the operation */
595 _GSS_is_element_of_operation (ee,
/* Entries coming from the local set must never carry the remote flag. */
598 GNUNET_assert (GNUNET_NO == ee->remote);
599 op_register_element (op,
607 * Initialize the IBF key to element mapping local to this set
610 * @param op the set union operation
/* Creates the operation's key-to-element map and fills it by iterating over
 * the elements of the underlying set.  Must only be called while the map does
 * not exist yet (asserted). */
613 initialize_key_to_element (struct Operation *op)
617 GNUNET_assert (NULL == op->state->key_to_element);
618 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
/* len + 1: presumably keeps the requested size non-zero for an empty set --
 * TODO confirm against the multihashmap32 API. */
619 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
620 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
621 &init_key_to_element_iterator,
627 * Create an ibf with the operation's elements
628 * of the specified size
630 * @param op the union operation
631 * @param size size of the ibf to create
632 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* (Re)builds op->state->local_ibf with `size` buckets: any previous local IBF
 * is destroyed, a new one is created with SE_IBF_HASH_NUM hash functions, and
 * every entry of the key-to-element map is inserted via
 * prepare_ibf_iterator().  Returns GNUNET_SYSERR if the IBF cannot be
 * allocated. */
635 prepare_ibf (struct Operation *op,
638 GNUNET_assert (NULL != op->state->key_to_element);
640 if (NULL != op->state->local_ibf)
641 ibf_destroy (op->state->local_ibf);
642 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
643 if (NULL == op->state->local_ibf)
645 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
646 "Failed to allocate local IBF\n");
647 return GNUNET_SYSERR;
649 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
650 &prepare_ibf_iterator,
657 * Send an ibf of appropriate size.
659 * Fragments the IBF into multiple messages if necessary.
661 * @param op the union operation
662 * @param ibf_order order of the ibf to send, size=2^order
663 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* Builds a local IBF of 2^ibf_order buckets and transmits it to the peer in
 * slices of at most MAX_BUCKETS_PER_MESSAGE buckets per P2P_IBF message, then
 * switches the operation to PHASE_INVENTORY_PASSIVE.  Returns GNUNET_SYSERR
 * if the IBF could not be prepared.
 * NOTE(review): `1 << ibf_order` is a signed int expression that is compared
 * with and subtracted from the unsigned `buckets_sent`; this is only safe
 * while ibf_order stays well below the width of int -- confirm the callers'
 * bounds. */
666 send_ibf (struct Operation *op,
669 unsigned int buckets_sent = 0;
670 struct InvertibleBloomFilter *ibf;
673 prepare_ibf (op, 1<<ibf_order))
675 /* allocation failed */
676 return GNUNET_SYSERR;
679 LOG (GNUNET_ERROR_TYPE_DEBUG,
680 "sending ibf of size %u\n",
/* Per-order statistics counter; the name is formatted into a fixed buffer
 * with snprintf(), which bounds the write to sizeof (name). */
684 char name[64] = { 0 };
685 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
686 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
689 ibf = op->state->local_ibf;
691 while (buckets_sent < (1 << ibf_order))
693 unsigned int buckets_in_message;
694 struct GNUNET_MQ_Envelope *ev;
695 struct IBFMessage *msg;
697 buckets_in_message = (1 << ibf_order) - buckets_sent;
698 /* limit to maximum */
699 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
700 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
702 ev = GNUNET_MQ_msg_extra (msg,
703 buckets_in_message * IBF_BUCKET_SIZE,
704 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
/* `order` is stored without byte-order conversion, unlike offset and salt;
 * presumably it is a single-byte field -- TODO confirm in the protocol
 * header. */
707 msg->order = ibf_order;
708 msg->offset = htonl (buckets_sent);
709 msg->salt = htonl (op->state->salt_send);
710 ibf_write_slice (ibf, buckets_sent,
711 buckets_in_message, &msg[1]);
712 buckets_sent += buckets_in_message;
713 LOG (GNUNET_ERROR_TYPE_DEBUG,
714 "ibf chunk size %u, %u/%u sent\n",
718 GNUNET_MQ_send (op->mq, ev);
721 /* The other peer must decode the IBF, so
723 op->state->phase = PHASE_INVENTORY_PASSIVE;
729 * Compute the necessary order of an ibf
730 * from the size of the symmetric set difference.
732 * @param diff the difference
733 * @return the required order of the ibf (its size is 2^order)
/* Chooses an IBF order for an estimated set difference `diff`: the order is
 * grown while 2^order is below IBF_ALPHA * diff or below SE_IBF_HASH_NUM, up
 * to MAX_IBF_ORDER, and one is added to the result.  The initial value of
 * ibf_order and the loop body are not visible in this listing.
 * NOTE(review): because of the final "+ 1", the returned order can be
 * MAX_IBF_ORDER + 1 when the loop stops at the cap -- confirm that callers
 * tolerate this. */
736 get_order_from_difference (unsigned int diff)
738 unsigned int ibf_order;
741 while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
742 ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
743 (ibf_order < MAX_IBF_ORDER) )
745 // add one for correction
746 return ibf_order + 1;
751 * Send a set element.
753 * @param cls the union operation `struct Operation *`
755 * @param value the `struct ElementEntry *` to insert
756 * into the key-to-element mapping
757 * @return #GNUNET_YES (to continue iterating)
/* Callback used by send_full_set(): wraps one set element in a
 * P2P_FULL_ELEMENT message and queues it on the operation's message queue.
 * `cls` is the operation, `value` the struct ElementEntry to send. */
760 send_full_element_iterator (void *cls,
761 const struct GNUNET_HashCode *key,
764 struct Operation *op = cls;
765 struct GNUNET_SET_ElementMessage *emsg;
766 struct ElementEntry *ee = value;
767 struct GNUNET_SET_Element *el = &ee->element;
768 struct GNUNET_MQ_Envelope *ev;
770 LOG (GNUNET_ERROR_TYPE_DEBUG,
771 "Sending element %s\n",
773 ev = GNUNET_MQ_msg_extra (emsg,
775 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
776 emsg->element_type = htons (el->element_type);
/* The payload is copied directly behind the fixed-size message header. */
777 GNUNET_memcpy (&emsg[1],
780 GNUNET_MQ_send (op->mq,
787 * Switch to full set transmission for @a op.
789 * @param op operation to switch to full set transmission.
/* Switches the operation to PHASE_FULL_SENDING, queues every element of the
 * underlying set via send_full_element_iterator(), and finally queues a
 * P2P_FULL_DONE message. */
792 send_full_set (struct Operation *op)
794 struct GNUNET_MQ_Envelope *ev;
796 op->state->phase = PHASE_FULL_SENDING;
/* NOTE(review): the log text below misspells "Deciding"; it is left as is
 * here because it is a runtime string. */
797 LOG (GNUNET_ERROR_TYPE_DEBUG,
798 "Dedicing to transmit the full set\n");
799 /* FIXME: use a more memory-friendly way of doing this with an
800 iterator, just as we do in the non-full case! */
801 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
802 &send_full_element_iterator,
804 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
805 GNUNET_MQ_send (op->mq,
811 * Handle a strata estimator from a remote peer
813 * @param cls the union operation
814 * @param msg the message
/* Validates a strata estimator message before it is handled: it is rejected
 * (GNUNET_SYSERR) unless the operation is in PHASE_EXPECT_SE, and an
 * uncompressed estimator must have exactly
 * SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE payload bytes. */
817 check_union_p2p_strata_estimator (void *cls,
818 const struct StrataEstimatorMessage *msg)
820 struct Operation *op = cls;
824 if (op->state->phase != PHASE_EXPECT_SE)
827 return GNUNET_SYSERR;
/* NOTE(review): htons() is applied to a field read from the wire; ntohs()
 * would express the intent, although both perform the same byte swap. */
829 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
/* Payload length = total message size minus the fixed header struct. */
830 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
831 if ( (GNUNET_NO == is_compressed) &&
832 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
835 return GNUNET_SYSERR;
842 * Handle a strata estimator from a remote peer
844 * @param cls the union operation
845 * @param msg the message
/* Processes the peer's strata estimator: parses it into a temporary
 * estimator, computes the estimated difference against our own estimator,
 * releases both estimators, and then decides between full-set transmission
 * (sending our set or requesting the peer's) and sending an IBF sized from
 * the estimated difference.  Failures go through fail_union_operation(). */
848 handle_union_p2p_strata_estimator (void *cls,
849 const struct StrataEstimatorMessage *msg)
851 struct Operation *op = cls;
852 struct StrataEstimator *remote_se;
858 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
859 GNUNET_STATISTICS_update (_GSS_statistics,
860 "# bytes of SE received",
861 ntohs (msg->header.size),
863 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
/* Size of the peer's set as announced in the message (64-bit, network
 * order on the wire). */
864 other_size = GNUNET_ntohll (msg->set_size);
865 remote_se = strata_estimator_create (SE_STRATA_COUNT,
868 if (NULL == remote_se)
870 /* insufficient resources, fail */
871 fail_union_operation (op);
875 strata_estimator_read (&msg[1],
880 /* decompression failed */
881 strata_estimator_destroy (remote_se);
882 fail_union_operation (op);
885 GNUNET_assert (NULL != op->state->se);
886 diff = strata_estimator_difference (remote_se,
/* Both estimators are only needed for the difference computation. */
892 strata_estimator_destroy (remote_se);
893 strata_estimator_destroy (op->state->se);
894 op->state->se = NULL;
895 LOG (GNUNET_ERROR_TYPE_DEBUG,
896 "got se diff=%d, using ibf size %d\n",
898 1U << get_order_from_difference (diff));
/* Optional benchmark output, enabled by GNUNET_SET_BENCHMARK=1 in the
 * environment. */
903 set_debug = getenv ("GNUNET_SET_BENCHMARK");
904 if ( (NULL != set_debug) &&
905 (0 == strcmp (set_debug, "1")) )
/* NOTE(review): the result of fopen() is passed to fprintf() on the very
 * next line without a NULL check; if "set.log" cannot be opened this
 * dereferences NULL. */
907 FILE *f = fopen ("set.log", "a");
908 fprintf (f, "%llu\n", (unsigned long long) diff);
/* In byzantine mode a peer announcing fewer elements than the configured
 * lower bound is rejected. */
913 if ( (GNUNET_YES == op->byzantine) &&
914 (other_size < op->byzantine_lower_bound) )
917 fail_union_operation (op);
921 if ( (GNUNET_YES == op->force_full) ||
922 (diff > op->state->initial_size / 4) ||
/* NOTE(review): op->state->initial_size is declared uint64_t but is
 * formatted with %u below, which is a format/argument type mismatch. */
925 LOG (GNUNET_ERROR_TYPE_DEBUG,
926 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
928 op->state->initial_size);
929 GNUNET_STATISTICS_update (_GSS_statistics,
933 if ( (op->state->initial_size <= other_size) ||
940 struct GNUNET_MQ_Envelope *ev;
942 LOG (GNUNET_ERROR_TYPE_DEBUG,
943 "Telling other peer that we expect its full set\n");
944 op->state->phase = PHASE_EXPECT_IBF;
945 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
946 GNUNET_MQ_send (op->mq,
952 GNUNET_STATISTICS_update (_GSS_statistics,
958 get_order_from_difference (diff)))
960 /* Internal error, best we can do is shut the connection */
961 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
962 "Failed to send IBF, closing connection\n");
963 fail_union_operation (op);
/* Tells CADET that the message has been processed. */
967 GNUNET_CADET_receive_done (op->channel);
972 * Iterator to send elements to a remote peer
974 * @param cls closure with the element key and the union operation
976 * @param value the key entry
/* Callback used by send_offers_for_key(): for a key entry whose full 64-bit
 * IBF key matches the requested one, sends a P2P_OFFER message carrying the
 * element's hash.  `cls` is a struct SendElementClosure, `value` the struct
 * KeyEntry. */
979 send_offers_iterator (void *cls,
983 struct SendElementClosure *sec = cls;
984 struct Operation *op = sec->op;
985 struct KeyEntry *ke = value;
986 struct GNUNET_MQ_Envelope *ev;
987 struct GNUNET_MessageHeader *mh;
989 /* Detect 32-bit key collision for the 64-bit IBF keys. */
990 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
993 ev = GNUNET_MQ_msg_header_extra (mh,
994 sizeof (struct GNUNET_HashCode),
995 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
997 GNUNET_assert (NULL != ev);
/* The hash is the entire payload, placed directly behind the header. */
998 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
999 LOG (GNUNET_ERROR_TYPE_DEBUG,
1000 "[OP %x] sending element offer (%s) to peer\n",
1002 GNUNET_h2s (&ke->element->element_hash));
1003 GNUNET_MQ_send (op->mq, ev);
1009 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1011 * @param op union operation
1012 * @param ibf_key IBF key of interest
/* Sends an offer for every element registered under `ibf_key`: looks up the
 * lower 32 bits of the key in the key-to-element map and lets
 * send_offers_iterator() filter out entries whose full key differs. */
1015 send_offers_for_key (struct Operation *op,
1016 struct IBF_Key ibf_key)
1018 struct SendElementClosure send_cls;
1020 send_cls.ibf_key = ibf_key;
/* The number of visited entries is deliberately ignored. */
1022 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1023 (uint32_t) ibf_key.key_val,
1024 &send_offers_iterator,
1030 * Decode which elements are missing on each side, and
1031 * send the appropriate offers and inquiries.
1033 * @param op union operation
1034 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* Decodes the difference between our local IBF and the received remote IBF.
 * For each decoded key it either sends offers for our matching elements or
 * sends an inquiry for the peer's element, depending on `side`.  If decoding
 * fails or a cycle is detected, a larger IBF is sent with a new salt, or the
 * operation is failed once MAX_IBF_ORDER is exceeded.  When the IBF is
 * exhausted, a P2P_DONE message is sent.
 * NOTE(review): on its failure paths this function already calls
 * fail_union_operation (op) before returning GNUNET_SYSERR, and
 * handle_union_p2p_ibf() visibly calls fail_union_operation (op) again after
 * a failed decode_and_send() -- confirm that the operation cannot be
 * destroyed twice. */
1037 decode_and_send (struct Operation *op)
1040 struct IBF_Key last_key;
1042 unsigned int num_decoded;
1043 struct InvertibleBloomFilter *diff_ibf;
1045 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1049 op->state->remote_ibf->size))
1052 /* allocation failed */
1053 return GNUNET_SYSERR;
/* Work on a copy so the local IBF itself is not modified by subtraction
 * and decoding. */
1055 diff_ibf = ibf_dup (op->state->local_ibf);
1056 ibf_subtract (diff_ibf,
1057 op->state->remote_ibf);
/* The remote IBF is no longer needed once it has been subtracted. */
1059 ibf_destroy (op->state->remote_ibf);
1060 op->state->remote_ibf = NULL;
1062 LOG (GNUNET_ERROR_TYPE_DEBUG,
1063 "decoding IBF (size=%u)\n",
1067 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1072 int cycle_detected = GNUNET_NO;
1076 res = ibf_decode (diff_ibf, &side, &key);
1077 if (res == GNUNET_OK)
1079 LOG (GNUNET_ERROR_TYPE_DEBUG,
1080 "decoded ibf key %lx\n",
1081 (unsigned long) key.key_val);
/* Decoding more keys than the IBF has buckets, or getting the same key
 * twice in a row, is treated as a decoding cycle. */
1083 if ( (num_decoded > diff_ibf->size) ||
1084 ( (num_decoded > 1) &&
1085 (last_key.key_val == key.key_val) ) )
1087 LOG (GNUNET_ERROR_TYPE_DEBUG,
1088 "detected cyclic ibf (decoded %u/%u)\n",
1091 cycle_detected = GNUNET_YES;
1094 if ( (GNUNET_SYSERR == res) ||
1095 (GNUNET_YES == cycle_detected) )
/* Find the order of the IBF that just failed; the loop body and the
 * subsequent adjustment of next_order are not visible in this listing. */
1099 while (1<<next_order < diff_ibf->size)
1102 if (next_order <= MAX_IBF_ORDER)
1104 LOG (GNUNET_ERROR_TYPE_DEBUG,
1105 "decoding failed, sending larger ibf (size %u)\n",
1107 GNUNET_STATISTICS_update (_GSS_statistics,
/* A fresh salt is used for the retry so that keys map to different
 * buckets than in the failed attempt. */
1111 op->state->salt_send++;
1113 send_ibf (op, next_order))
1115 /* Internal error, best we can do is shut the connection */
1116 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1117 "Failed to send IBF, closing connection\n");
1118 fail_union_operation (op);
1119 ibf_destroy (diff_ibf);
1120 return GNUNET_SYSERR;
1125 GNUNET_STATISTICS_update (_GSS_statistics,
1126 "# of failed union operations (too large)",
1129 // XXX: Send the whole set, element-by-element
1130 LOG (GNUNET_ERROR_TYPE_ERROR,
1131 "set union failed: reached ibf limit\n");
1132 fail_union_operation (op);
1133 ibf_destroy (diff_ibf);
1134 return GNUNET_SYSERR;
/* GNUNET_NO from ibf_decode() is handled as "nothing left to decode". */
1138 if (GNUNET_NO == res)
1140 struct GNUNET_MQ_Envelope *ev;
1142 LOG (GNUNET_ERROR_TYPE_DEBUG,
1143 "transmitted all values, sending DONE\n");
1144 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1145 GNUNET_MQ_send (op->mq, ev);
1146 /* We now wait until we get a DONE message back
1147 * and then wait for our MQ to be flushed and all our
1148 * demands be delivered. */
/* Decoded keys are salted with the receive salt; the map is indexed by
 * unsalted keys, so the key is unsalted before looking up offers. */
1153 struct IBF_Key unsalted_key;
1156 op->state->salt_receive,
1158 send_offers_for_key (op,
1161 else if (-1 == side)
1163 struct GNUNET_MQ_Envelope *ev;
1164 struct InquiryMessage *msg;
1166 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1167 * the additional effort and complexity. */
1168 ev = GNUNET_MQ_msg_extra (msg,
1169 sizeof (struct IBF_Key),
1170 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1171 msg->salt = htonl (op->state->salt_receive);
1172 GNUNET_memcpy (&msg[1],
1174 sizeof (struct IBF_Key));
1175 LOG (GNUNET_ERROR_TYPE_DEBUG,
1176 "sending element inquiry for IBF key %lx\n",
1177 (unsigned long) key.key_val);
1178 GNUNET_MQ_send (op->mq, ev);
1185 ibf_destroy (diff_ibf);
1191 * Check an IBF message from a remote peer.
1193 * Reassemble the IBF from multiple pieces, and
1194 * process the whole IBF once possible.
1196 * @param cls the union operation
1197 * @param msg the header of the message
1198 * @return #GNUNET_OK if @a msg is well-formed
/* Validates an IBF message from the peer.  It is rejected (GNUNET_SYSERR)
 * if the set is not a union set, if the payload is empty or not a whole
 * number of buckets, if a continuation does not match the expected offset,
 * size and salt, or if the operation is in a phase that does not expect an
 * IBF.
 * NOTE(review): no upper bound on msg->order (e.g. MAX_IBF_ORDER) is visible
 * before it is used as a shift count; a shift by the width of int or more is
 * undefined in C, and the value comes from the remote peer.
 * NOTE(review): no check is visible that ibf_buckets_received plus
 * buckets_in_message stays within remote_ibf->size before the handler reads
 * the slice -- confirm. */
1201 check_union_p2p_ibf (void *cls,
1202 const struct IBFMessage *msg)
1204 struct Operation *op = cls;
1205 unsigned int buckets_in_message;
1207 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1209 GNUNET_break_op (0);
1210 return GNUNET_SYSERR;
1212 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1213 if (0 == buckets_in_message)
1215 GNUNET_break_op (0);
1216 return GNUNET_SYSERR;
/* The payload must be an exact multiple of the bucket size. */
1218 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1220 GNUNET_break_op (0);
1221 return GNUNET_SYSERR;
/* A continuation must pick up exactly where the previous slice ended and
 * agree with the size and salt announced by the first slice. */
1223 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1225 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1227 GNUNET_break_op (0);
1228 return GNUNET_SYSERR;
1230 if (1<<msg->order != op->state->remote_ibf->size)
1232 GNUNET_break_op (0);
1233 return GNUNET_SYSERR;
1235 if (ntohl (msg->salt) != op->state->salt_receive)
1237 GNUNET_break_op (0);
1238 return GNUNET_SYSERR;
1241 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1242 (op->state->phase != PHASE_EXPECT_IBF) )
1244 GNUNET_break_op (0);
1245 return GNUNET_SYSERR;
1253 * Handle an IBF message from a remote peer.
1255 * Reassemble the IBF from multiple pieces, and
1256 * process the whole IBF once possible.
1258 * @param cls the union operation
1259 * @param msg the header of the message
/* Handles one slice of an IBF from the peer.  The first slice creates the
 * remote IBF and records the salt; each slice is then read into the remote
 * IBF at the current bucket offset.  Once all buckets have arrived the
 * operation moves to PHASE_INVENTORY_ACTIVE and decode_and_send() runs.
 * NOTE(review): `1<<msg->order` is used as the allocation size with no
 * visible upper bound on the peer-supplied order -- confirm that it is
 * limited (e.g. to MAX_IBF_ORDER) before this point.
 * NOTE(review): decode_and_send() already calls fail_union_operation (op) on
 * its own failure paths, and this handler calls it again when
 * decode_and_send() fails -- confirm the operation cannot be destroyed
 * twice. */
1262 handle_union_p2p_ibf (void *cls,
1263 const struct IBFMessage *msg)
1265 struct Operation *op = cls;
1266 unsigned int buckets_in_message;
1268 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
/* First slice of a new IBF. */
1269 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1270 (op->state->phase == PHASE_EXPECT_IBF) )
1272 op->state->phase = PHASE_EXPECT_IBF_CONT;
1273 GNUNET_assert (NULL == op->state->remote_ibf);
1274 LOG (GNUNET_ERROR_TYPE_DEBUG,
1275 "Creating new ibf of size %u\n",
1277 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1278 op->state->salt_receive = ntohl (msg->salt);
1279 LOG (GNUNET_ERROR_TYPE_DEBUG,
1280 "Receiving new IBF with salt %u\n",
1281 op->state->salt_receive);
1282 if (NULL == op->state->remote_ibf)
1284 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1285 "Failed to parse remote IBF, closing connection\n");
1286 fail_union_operation (op);
1289 op->state->ibf_buckets_received = 0;
/* The first slice of an IBF must start at bucket 0. */
1290 if (0 != ntohl (msg->offset))
1292 GNUNET_break_op (0);
1293 fail_union_operation (op);
1299 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1300 LOG (GNUNET_ERROR_TYPE_DEBUG,
1301 "Received more of IBF\n");
1303 GNUNET_assert (NULL != op->state->remote_ibf);
1305 ibf_read_slice (&msg[1],
1306 op->state->ibf_buckets_received,
1308 op->state->remote_ibf);
1309 op->state->ibf_buckets_received += buckets_in_message;
1311 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1313 LOG (GNUNET_ERROR_TYPE_DEBUG,
1314 "received full ibf\n");
1315 op->state->phase = PHASE_INVENTORY_ACTIVE;
1317 decode_and_send (op))
1319 /* Internal error, best we can do is shut down */
1320 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1321 "Failed to decode IBF, closing connection\n");
1322 fail_union_operation (op);
1326 GNUNET_CADET_receive_done (op->channel);
1331 * Send a result message to the client indicating
1332 * that there is a new element.
1334 * @param op union operation
1335 * @param element element to send
1336 * @param status status to send with the new element
/* Sends one element to the local client in a SET_RESULT message with the
 * given status.  The message carries the client's request id, the element
 * type, the current number of entries in the key-to-element map, and the
 * element data as payload.  A non-zero client request id is asserted. */
1339 send_client_element (struct Operation *op,
1340 struct GNUNET_SET_Element *element,
1343 struct GNUNET_MQ_Envelope *ev;
1344 struct GNUNET_SET_ResultMessage *rm;
1346 LOG (GNUNET_ERROR_TYPE_DEBUG,
1347 "sending element (size %u) to client\n",
1349 GNUNET_assert (0 != op->client_request_id);
1350 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
/* Error path: the condition guarding this discard is not visible in this
 * listing. */
1353 GNUNET_MQ_discard (ev);
1357 rm->result_status = htons (status);
1358 rm->request_id = htonl (op->client_request_id);
1359 rm->element_type = htons (element->element_type);
1360 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1361 GNUNET_memcpy (&rm[1],
1364 GNUNET_MQ_send (op->set->cs->mq,
1370 * Destroy remote channel.
1372 * @param op operation
/* Destroys the operation's CADET channel if it still has one.
 * NOTE(review): the line between the NULL check and the destroy call is not
 * visible in this listing; presumably op->channel is cleared there so the
 * disconnect handler does not destroy it again -- confirm. */
1375 destroy_channel (struct Operation *op)
1377 struct GNUNET_CADET_Channel *channel;
1379 if (NULL != (channel = op->channel))
1381 /* This will free op; called conditionally as this helper function
1382 is also called from within the channel disconnect handler. */
1384 GNUNET_CADET_channel_destroy (channel);
1390 * Signal to the client that the operation has finished and
1391 * destroy the operation.
1393 * @param cls operation to destroy
/* Tells the client how the operation ended.  If the operation is not in
 * PHASE_DONE, a SET_RESULT with GNUNET_SET_STATUS_FAILURE is sent;
 * otherwise client_done_sent is set and a SET_RESULT with
 * GNUNET_SET_STATUS_DONE is sent, carrying the current number of entries in
 * the key-to-element map.  `cls` is the operation.  The bodies of the two
 * early branches are only partly visible in this listing. */
1396 send_client_done (void *cls)
1398 struct Operation *op = cls;
1399 struct GNUNET_MQ_Envelope *ev;
1400 struct GNUNET_SET_ResultMessage *rm;
/* Guard against notifying the client twice. */
1402 if (GNUNET_YES == op->state->client_done_sent) {
1406 if (PHASE_DONE != op->state->phase) {
1407 LOG (GNUNET_ERROR_TYPE_WARNING,
1408 "Union operation failed\n");
1409 GNUNET_STATISTICS_update (_GSS_statistics,
1410 "# Union operations failed",
1413 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1414 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1415 rm->request_id = htonl (op->client_request_id);
1416 rm->element_type = htons (0);
1417 GNUNET_MQ_send (op->set->cs->mq,
1422 op->state->client_done_sent = GNUNET_YES;
1424 GNUNET_STATISTICS_update (_GSS_statistics,
1425 "# Union operations succeeded",
1428 LOG (GNUNET_ERROR_TYPE_INFO,
1429 "Signalling client that union operation is done\n");
1430 ev = GNUNET_MQ_msg (rm,
1431 GNUNET_MESSAGE_TYPE_SET_RESULT);
1432 rm->request_id = htonl (op->client_request_id);
1433 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1434 rm->element_type = htons (0);
1435 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1436 GNUNET_MQ_send (op->set->cs->mq,
1442 * Tests if the operation is finished, and if so notify.
1444 * @param op operation to check
/* Completes the operation once no demanded elements are outstanding.  In
 * PHASE_FINISH_WAITING it moves to PHASE_DONE and sends P2P_DONE to the
 * peer; in PHASE_FINISH_CLOSING it moves to PHASE_DONE, notifies the client
 * and destroys the channel.  With demands still pending, nothing changes. */
1447 maybe_finish (struct Operation *op)
1449 unsigned int num_demanded;
1451 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1453 if (PHASE_FINISH_WAITING == op->state->phase)
1455 LOG (GNUNET_ERROR_TYPE_DEBUG,
1456 "In PHASE_FINISH_WAITING, pending %u demands\n",
1458 if (0 == num_demanded)
1460 struct GNUNET_MQ_Envelope *ev;
1462 op->state->phase = PHASE_DONE;
1463 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1464 GNUNET_MQ_send (op->mq,
1466 /* We now wait until the other peer sends P2P_OVER
1467 * after it got all elements from us. */
1470 if (PHASE_FINISH_CLOSING == op->state->phase)
1472 LOG (GNUNET_ERROR_TYPE_DEBUG,
1473 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1475 if (0 == num_demanded)
1477 op->state->phase = PHASE_DONE;
/* Order matters: the client is notified first, because destroying the
 * channel frees the operation (see destroy_channel()). */
1478 send_client_done (op);
1479 destroy_channel (op);
1486 * Check an element message from a remote peer.
1488 * @param cls the union operation
1489 * @param emsg the message
/* Validates an element message from the peer: it is rejected
 * (GNUNET_SYSERR) if the set is not a union set or if we currently have no
 * outstanding demands, since an element can then not be a reply to one. */
1492 check_union_p2p_elements (void *cls,
1493 const struct GNUNET_SET_ElementMessage *emsg)
1495 struct Operation *op = cls;
1497 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1499 GNUNET_break_op (0);
1500 return GNUNET_SYSERR;
1502 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1504 GNUNET_break_op (0);
1505 return GNUNET_SYSERR;
1512 * Handle an element message from a remote peer.
1513 * Sent by the other peer either because we decoded an IBF and placed a demand,
1514 * or because the other peer switched to full set transmission.
1516 * @param cls the union operation
1517 * @param emsg the message
1520 handle_union_p2p_elements (void *cls,
1521 const struct GNUNET_SET_ElementMessage *emsg)
1523 struct Operation *op = cls;
1524 struct ElementEntry *ee;
1525 struct KeyEntry *ke;
1526 uint16_t element_size;
1528 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1529 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1530 GNUNET_memcpy (&ee[1],
1533 ee->element.size = element_size;
1534 ee->element.data = &ee[1];
1535 ee->element.element_type = ntohs (emsg->element_type);
1536 ee->remote = GNUNET_YES;
1537 GNUNET_SET_element_hash (&ee->element,
1540 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1544 /* We got something we didn't demand, since it's not in our map. */
1545 GNUNET_break_op (0);
1546 fail_union_operation (op);
1550 LOG (GNUNET_ERROR_TYPE_DEBUG,
1551 "Got element (size %u, hash %s) from peer\n",
1552 (unsigned int) element_size,
1553 GNUNET_h2s (&ee->element_hash));
1555 GNUNET_STATISTICS_update (_GSS_statistics,
1556 "# received elements",
1559 GNUNET_STATISTICS_update (_GSS_statistics,
1560 "# exchanged elements",
1564 op->state->received_total++;
1566 ke = op_get_element (op, &ee->element_hash);
1569 /* Got repeated element. Should not happen since
1570 * we track demands. */
1571 GNUNET_STATISTICS_update (_GSS_statistics,
1572 "# repeated elements",
1575 ke->received = GNUNET_YES;
1580 LOG (GNUNET_ERROR_TYPE_DEBUG,
1581 "Registering new element from remote peer\n");
1582 op->state->received_fresh++;
1583 op_register_element (op, ee, GNUNET_YES);
1584 /* only send results immediately if the client wants it */
1585 switch (op->result_mode)
1587 case GNUNET_SET_RESULT_ADDED:
1588 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1590 case GNUNET_SET_RESULT_SYMMETRIC:
1591 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1594 /* Result mode not supported, should have been caught earlier. */
1600 if ( (op->state->received_total > 8) &&
1601 (op->state->received_fresh < op->state->received_total / 3) )
1603 /* The other peer gave us lots of old elements, there's something wrong. */
1604 GNUNET_break_op (0);
1605 fail_union_operation (op);
1608 GNUNET_CADET_receive_done (op->channel);
1614 * Check a full element message from a remote peer.
1616 * @param cls the union operation
1617 * @param emsg the message
1620 check_union_p2p_full_element (void *cls,
1621 const struct GNUNET_SET_ElementMessage *emsg)
1623 struct Operation *op = cls;
1625 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1627 GNUNET_break_op (0);
1628 return GNUNET_SYSERR;
1630 // FIXME: check that we expect full elements here?
1636 * Handle an element message from a remote peer.
1638 * @param cls the union operation
1639 * @param emsg the message
1642 handle_union_p2p_full_element (void *cls,
1643 const struct GNUNET_SET_ElementMessage *emsg)
1645 struct Operation *op = cls;
1646 struct ElementEntry *ee;
1647 struct KeyEntry *ke;
1648 uint16_t element_size;
1650 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1651 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1652 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1653 ee->element.size = element_size;
1654 ee->element.data = &ee[1];
1655 ee->element.element_type = ntohs (emsg->element_type);
1656 ee->remote = GNUNET_YES;
1657 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1659 LOG (GNUNET_ERROR_TYPE_DEBUG,
1660 "Got element (full diff, size %u, hash %s) from peer\n",
1661 (unsigned int) element_size,
1662 GNUNET_h2s (&ee->element_hash));
1664 GNUNET_STATISTICS_update (_GSS_statistics,
1665 "# received elements",
1668 GNUNET_STATISTICS_update (_GSS_statistics,
1669 "# exchanged elements",
1673 op->state->received_total++;
1675 ke = op_get_element (op, &ee->element_hash);
1678 /* Got repeated element. Should not happen since
1679 * we track demands. */
1680 GNUNET_STATISTICS_update (_GSS_statistics,
1681 "# repeated elements",
1684 ke->received = GNUNET_YES;
1689 LOG (GNUNET_ERROR_TYPE_DEBUG,
1690 "Registering new element from remote peer\n");
1691 op->state->received_fresh++;
1692 op_register_element (op, ee, GNUNET_YES);
1693 /* only send results immediately if the client wants it */
1694 switch (op->result_mode)
1696 case GNUNET_SET_RESULT_ADDED:
1697 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1699 case GNUNET_SET_RESULT_SYMMETRIC:
1700 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1703 /* Result mode not supported, should have been caught earlier. */
1709 if ( (GNUNET_YES == op->byzantine) &&
1710 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1711 (op->state->received_fresh < op->state->received_total / 6) )
1713 /* The other peer gave us lots of old elements, there's something wrong. */
1714 LOG (GNUNET_ERROR_TYPE_ERROR,
1715 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1716 (unsigned long long) op->state->received_fresh,
1717 (unsigned long long) op->state->received_total);
1718 GNUNET_break_op (0);
1719 fail_union_operation (op);
1722 GNUNET_CADET_receive_done (op->channel);
1727 * Send offers (for GNUNET_Hash-es) in response
1728 * to inquiries (for IBF_Key-s).
1730 * @param cls the union operation
1731 * @param msg the message
1734 check_union_p2p_inquiry (void *cls,
1735 const struct InquiryMessage *msg)
1737 struct Operation *op = cls;
1738 unsigned int num_keys;
1740 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1742 GNUNET_break_op (0);
1743 return GNUNET_SYSERR;
1745 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1747 GNUNET_break_op (0);
1748 return GNUNET_SYSERR;
1750 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1751 / sizeof (struct IBF_Key);
1752 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1753 != num_keys * sizeof (struct IBF_Key))
1755 GNUNET_break_op (0);
1756 return GNUNET_SYSERR;
1763 * Send offers (for GNUNET_Hash-es) in response
1764 * to inquiries (for IBF_Key-s).
1766 * @param cls the union operation
1767 * @param msg the message
1770 handle_union_p2p_inquiry (void *cls,
1771 const struct InquiryMessage *msg)
1773 struct Operation *op = cls;
1774 const struct IBF_Key *ibf_key;
1775 unsigned int num_keys;
1777 LOG (GNUNET_ERROR_TYPE_DEBUG,
1778 "Received union inquiry\n");
1779 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1780 / sizeof (struct IBF_Key);
1781 ibf_key = (const struct IBF_Key *) &msg[1];
1782 while (0 != num_keys--)
1784 struct IBF_Key unsalted_key;
1786 unsalt_key (ibf_key,
1789 send_offers_for_key (op,
1793 GNUNET_CADET_receive_done (op->channel);
1798 * Iterator over hash map entries, called to
1799 * destroy the linked list of colliding ibf key entries.
1801 * @param cls closure
1802 * @param key current key code
1803 * @param value value in the hash map
1804 * @return #GNUNET_YES if we should continue to iterate,
1805 * #GNUNET_NO if not.
1808 send_missing_full_elements_iter (void *cls,
1812 struct Operation *op = cls;
1813 struct KeyEntry *ke = value;
1814 struct GNUNET_MQ_Envelope *ev;
1815 struct GNUNET_SET_ElementMessage *emsg;
1816 struct ElementEntry *ee = ke->element;
1818 if (GNUNET_YES == ke->received)
1820 ev = GNUNET_MQ_msg_extra (emsg,
1822 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1823 GNUNET_memcpy (&emsg[1],
1826 emsg->element_type = htons (ee->element.element_type);
1827 GNUNET_MQ_send (op->mq,
1834 * Handle a request for full set transmission.
1836 * @parem cls closure, a set union operation
1837 * @param mh the demand message
1840 handle_union_p2p_request_full (void *cls,
1841 const struct GNUNET_MessageHeader *mh)
1843 struct Operation *op = cls;
1845 LOG (GNUNET_ERROR_TYPE_DEBUG,
1846 "Received request for full set transmission\n");
1847 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1849 GNUNET_break_op (0);
1850 fail_union_operation (op);
1853 if (PHASE_EXPECT_IBF != op->state->phase)
1855 GNUNET_break_op (0);
1856 fail_union_operation (op);
1860 // FIXME: we need to check that our set is larger than the
1861 // byzantine_lower_bound by some threshold
1863 GNUNET_CADET_receive_done (op->channel);
1868 * Handle a "full done" message.
1870 * @parem cls closure, a set union operation
1871 * @param mh the demand message
1874 handle_union_p2p_full_done (void *cls,
1875 const struct GNUNET_MessageHeader *mh)
1877 struct Operation *op = cls;
1879 switch (op->state->phase)
1881 case PHASE_EXPECT_IBF:
1883 struct GNUNET_MQ_Envelope *ev;
1885 LOG (GNUNET_ERROR_TYPE_DEBUG,
1886 "got FULL DONE, sending elements that other peer is missing\n");
1888 /* send all the elements that did not come from the remote peer */
1889 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1890 &send_missing_full_elements_iter,
1893 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1894 GNUNET_MQ_send (op->mq,
1896 op->state->phase = PHASE_DONE;
1897 /* we now wait until the other peer sends us the OVER message*/
1900 case PHASE_FULL_SENDING:
1902 LOG (GNUNET_ERROR_TYPE_DEBUG,
1903 "got FULL DONE, finishing\n");
1904 /* We sent the full set, and got the response for that. We're done. */
1905 op->state->phase = PHASE_DONE;
1906 GNUNET_CADET_receive_done (op->channel);
1907 send_client_done (op);
1908 destroy_channel (op);
1913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914 "Handle full done phase is %u\n",
1915 (unsigned) op->state->phase);
1916 GNUNET_break_op (0);
1917 fail_union_operation (op);
1920 GNUNET_CADET_receive_done (op->channel);
1925 * Check a demand by the other peer for elements based on a list
1926 * of `struct GNUNET_HashCode`s.
1928 * @parem cls closure, a set union operation
1929 * @param mh the demand message
1930 * @return #GNUNET_OK if @a mh is well-formed
1933 check_union_p2p_demand (void *cls,
1934 const struct GNUNET_MessageHeader *mh)
1936 struct Operation *op = cls;
1937 unsigned int num_hashes;
1939 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1941 GNUNET_break_op (0);
1942 return GNUNET_SYSERR;
1944 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1945 / sizeof (struct GNUNET_HashCode);
1946 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1947 != num_hashes * sizeof (struct GNUNET_HashCode))
1949 GNUNET_break_op (0);
1950 return GNUNET_SYSERR;
1957 * Handle a demand by the other peer for elements based on a list
1958 * of `struct GNUNET_HashCode`s.
1960 * @parem cls closure, a set union operation
1961 * @param mh the demand message
1964 handle_union_p2p_demand (void *cls,
1965 const struct GNUNET_MessageHeader *mh)
1967 struct Operation *op = cls;
1968 struct ElementEntry *ee;
1969 struct GNUNET_SET_ElementMessage *emsg;
1970 const struct GNUNET_HashCode *hash;
1971 unsigned int num_hashes;
1972 struct GNUNET_MQ_Envelope *ev;
1974 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1975 / sizeof (struct GNUNET_HashCode);
1976 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1978 hash++, num_hashes--)
1980 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1984 /* Demand for non-existing element. */
1985 GNUNET_break_op (0);
1986 fail_union_operation (op);
1989 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1991 /* Probably confused lazily copied sets. */
1992 GNUNET_break_op (0);
1993 fail_union_operation (op);
1996 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1997 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1998 emsg->reserved = htons (0);
1999 emsg->element_type = htons (ee->element.element_type);
2000 LOG (GNUNET_ERROR_TYPE_DEBUG,
2001 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2003 (unsigned int) ee->element.size,
2004 GNUNET_h2s (&ee->element_hash));
2005 GNUNET_MQ_send (op->mq, ev);
2006 GNUNET_STATISTICS_update (_GSS_statistics,
2007 "# exchanged elements",
2011 switch (op->result_mode)
2013 case GNUNET_SET_RESULT_ADDED:
2014 /* Nothing to do. */
2016 case GNUNET_SET_RESULT_SYMMETRIC:
2017 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2020 /* Result mode not supported, should have been caught earlier. */
2025 GNUNET_CADET_receive_done (op->channel);
2030 * Check offer (of `struct GNUNET_HashCode`s).
2032 * @param cls the union operation
2033 * @param mh the message
2034 * @return #GNUNET_OK if @a mh is well-formed
2037 check_union_p2p_offer (void *cls,
2038 const struct GNUNET_MessageHeader *mh)
2040 struct Operation *op = cls;
2041 unsigned int num_hashes;
2043 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2045 GNUNET_break_op (0);
2046 return GNUNET_SYSERR;
2048 /* look up elements and send them */
2049 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2050 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2052 GNUNET_break_op (0);
2053 return GNUNET_SYSERR;
2055 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2056 / sizeof (struct GNUNET_HashCode);
2057 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2058 num_hashes * sizeof (struct GNUNET_HashCode))
2060 GNUNET_break_op (0);
2061 return GNUNET_SYSERR;
2068 * Handle offers (of `struct GNUNET_HashCode`s) and
2069 * respond with demands (of `struct GNUNET_HashCode`s).
2071 * @param cls the union operation
2072 * @param mh the message
2075 handle_union_p2p_offer (void *cls,
2076 const struct GNUNET_MessageHeader *mh)
2078 struct Operation *op = cls;
2079 const struct GNUNET_HashCode *hash;
2080 unsigned int num_hashes;
2082 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2083 / sizeof (struct GNUNET_HashCode);
2084 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2086 hash++, num_hashes--)
2088 struct ElementEntry *ee;
2089 struct GNUNET_MessageHeader *demands;
2090 struct GNUNET_MQ_Envelope *ev;
2092 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2095 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2099 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2102 LOG (GNUNET_ERROR_TYPE_DEBUG,
2103 "Skipped sending duplicate demand\n");
2107 GNUNET_assert (GNUNET_OK ==
2108 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2111 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2113 LOG (GNUNET_ERROR_TYPE_DEBUG,
2114 "[OP %x] Requesting element (hash %s)\n",
2115 (void *) op, GNUNET_h2s (hash));
2116 ev = GNUNET_MQ_msg_header_extra (demands,
2117 sizeof (struct GNUNET_HashCode),
2118 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2119 GNUNET_memcpy (&demands[1],
2121 sizeof (struct GNUNET_HashCode));
2122 GNUNET_MQ_send (op->mq, ev);
2124 GNUNET_CADET_receive_done (op->channel);
2129 * Handle a done message from a remote peer
2131 * @param cls the union operation
2132 * @param mh the message
2135 handle_union_p2p_done (void *cls,
2136 const struct GNUNET_MessageHeader *mh)
2138 struct Operation *op = cls;
2140 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2142 GNUNET_break_op (0);
2143 fail_union_operation (op);
2146 switch (op->state->phase)
2148 case PHASE_INVENTORY_PASSIVE:
2149 /* We got all requests, but still have to send our elements in response. */
2150 op->state->phase = PHASE_FINISH_WAITING;
2152 LOG (GNUNET_ERROR_TYPE_DEBUG,
2153 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2154 /* The active peer is done sending offers
2155 * and inquiries. This means that all
2156 * our responses to that (demands and offers)
2157 * must be in flight (queued or in mesh).
2159 * We should notify the active peer once
2160 * all our demands are satisfied, so that the active
2161 * peer can quit if we gave it everything.
2163 GNUNET_CADET_receive_done (op->channel);
2166 case PHASE_INVENTORY_ACTIVE:
2167 LOG (GNUNET_ERROR_TYPE_DEBUG,
2168 "got DONE (as active partner), waiting to finish\n");
2169 /* All demands of the other peer are satisfied,
2170 * and we processed all offers, thus we know
2171 * exactly what our demands must be.
2173 * We'll close the channel
2174 * to the other peer once our demands are met.
2176 op->state->phase = PHASE_FINISH_CLOSING;
2177 GNUNET_CADET_receive_done (op->channel);
2181 GNUNET_break_op (0);
2182 fail_union_operation (op);
/**
 * Handle an over message from a remote peer by signalling
 * completion to the client.
 *
 * @param cls the union operation
 * @param mh the message
 */
static void
handle_union_p2p_over (void *cls,
                       const struct GNUNET_MessageHeader *mh)
{
  struct Operation *op = cls;

  send_client_done (op);
}
2202 * Initiate operation to evaluate a set union with a remote peer.
2204 * @param op operation to perform (to be initialized)
2205 * @param opaque_context message to be transmitted to the listener
2206 * to convince it to accept, may be NULL
2208 static struct OperationState *
2209 union_evaluate (struct Operation *op,
2210 const struct GNUNET_MessageHeader *opaque_context)
2212 struct OperationState *state;
2213 struct GNUNET_MQ_Envelope *ev;
2214 struct OperationRequestMessage *msg;
2216 ev = GNUNET_MQ_msg_nested_mh (msg,
2217 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2221 /* the context message is too large */
2225 state = GNUNET_new (struct OperationState);
2226 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2228 /* copy the current generation's strata estimator for this operation */
2229 state->se = strata_estimator_dup (op->set->state->se);
2230 /* we started the operation, thus we have to send the operation request */
2231 state->phase = PHASE_EXPECT_SE;
2232 state->salt_receive = state->salt_send = 42; // FIXME?????
2233 LOG (GNUNET_ERROR_TYPE_DEBUG,
2234 "Initiating union operation evaluation\n");
2235 GNUNET_STATISTICS_update (_GSS_statistics,
2236 "# of total union operations",
2239 GNUNET_STATISTICS_update (_GSS_statistics,
2240 "# of initiated union operations",
2243 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2244 GNUNET_MQ_send (op->mq,
2247 if (NULL != opaque_context)
2248 LOG (GNUNET_ERROR_TYPE_DEBUG,
2249 "sent op request with context message\n");
2251 LOG (GNUNET_ERROR_TYPE_DEBUG,
2252 "sent op request without context message\n");
2255 initialize_key_to_element (op);
2256 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2262 * Accept an union operation request from a remote peer.
2263 * Only initializes the private operation state.
2265 * @param op operation that will be accepted as a union operation
2267 static struct OperationState *
2268 union_accept (struct Operation *op)
2270 struct OperationState *state;
2271 const struct StrataEstimator *se;
2272 struct GNUNET_MQ_Envelope *ev;
2273 struct StrataEstimatorMessage *strata_msg;
2278 LOG (GNUNET_ERROR_TYPE_DEBUG,
2279 "accepting set union operation\n");
2280 GNUNET_STATISTICS_update (_GSS_statistics,
2281 "# of accepted union operations",
2284 GNUNET_STATISTICS_update (_GSS_statistics,
2285 "# of total union operations",
2289 state = GNUNET_new (struct OperationState);
2290 state->se = strata_estimator_dup (op->set->state->se);
2291 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2293 state->salt_receive = state->salt_send = 42; // FIXME?????
2295 initialize_key_to_element (op);
2296 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2298 /* kick off the operation */
2300 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2301 len = strata_estimator_write (se,
2303 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2304 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2306 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2307 ev = GNUNET_MQ_msg_extra (strata_msg,
2310 GNUNET_memcpy (&strata_msg[1],
2314 strata_msg->set_size
2315 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2316 GNUNET_MQ_send (op->mq,
2318 state->phase = PHASE_EXPECT_IBF;
2324 * Create a new set supporting the union operation
2326 * We maintain one strata estimator per set and then manipulate it over the
2327 * lifetime of the set, as recreating a strata estimator would be expensive.
2329 * @return the newly created set, NULL on error
2331 static struct SetState *
2332 union_set_create (void)
2334 struct SetState *set_state;
2336 LOG (GNUNET_ERROR_TYPE_DEBUG,
2337 "union set created\n");
2338 set_state = GNUNET_new (struct SetState);
2339 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2340 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2341 if (NULL == set_state->se)
2343 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2344 "Failed to allocate strata estimator\n");
2345 GNUNET_free (set_state);
2353 * Add the element from the given element message to the set.
2355 * @param set_state state of the set want to add to
2356 * @param ee the element to add to the set
2359 union_add (struct SetState *set_state,
2360 struct ElementEntry *ee)
2362 strata_estimator_insert (set_state->se,
2363 get_ibf_key (&ee->element_hash));
2368 * Remove the element given in the element message from the set.
2369 * Only marks the element as removed, so that older set operations can still exchange it.
2371 * @param set_state state of the set to remove from
2372 * @param ee set element to remove
2375 union_remove (struct SetState *set_state,
2376 struct ElementEntry *ee)
2378 strata_estimator_remove (set_state->se,
2379 get_ibf_key (&ee->element_hash));
2384 * Destroy a set that supports the union operation.
2386 * @param set_state the set to destroy
2389 union_set_destroy (struct SetState *set_state)
2391 if (NULL != set_state->se)
2393 strata_estimator_destroy (set_state->se);
2394 set_state->se = NULL;
2396 GNUNET_free (set_state);
2401 * Copy union-specific set state.
2403 * @param state source state for copying the union state
2404 * @return a copy of the union-specific set state
2406 static struct SetState *
2407 union_copy_state (struct SetState *state)
2409 struct SetState *new_state;
2411 GNUNET_assert ( (NULL != state) &&
2412 (NULL != state->se) );
2413 new_state = GNUNET_new (struct SetState);
2414 new_state->se = strata_estimator_dup (state->se);
2421 * Handle case where channel went down for an operation.
2423 * @param op operation that lost the channel
2426 union_channel_death (struct Operation *op)
2428 send_client_done (op);
2429 _GSS_operation_destroy (op,
2435 * Get the table with implementing functions for
2438 * @return the operation specific VTable
2440 const struct SetVT *
2443 static const struct SetVT union_vt = {
2444 .create = &union_set_create,
2446 .remove = &union_remove,
2447 .destroy_set = &union_set_destroy,
2448 .evaluate = &union_evaluate,
2449 .accept = &union_accept,
2450 .cancel = &union_op_cancel,
2451 .copy_state = &union_copy_state,
2452 .channel_death = &union_channel_death