2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union.h"
32 #include "gnunet-service-set_union_strata_estimator.h"
33 #include "gnunet-service-set_protocol.h"
37 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
41 * Number of IBFs in a strata estimator.
43 #define SE_STRATA_COUNT 32
46 * Size of the IBFs in the strata estimator.
48 #define SE_IBF_SIZE 80
51 * The hash num parameter for the difference digests and strata estimators.
53 #define SE_IBF_HASH_NUM 4
56 * Number of buckets that can be transmitted in one message.
58 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
61 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
62 * Choose this value so that computing the IBF is still cheaper
63 * than transmitting all values.
65 #define MAX_IBF_ORDER (20)
68 * Number of buckets used in the ibf per estimated
75 * Current phase we are in for a union operation.
77 enum UnionOperationPhase
80 * We sent the request message, and expect a strata estimator.
85 * We sent the strata estimator, and expect an IBF. This phase is entered once
86 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
88 * XXX: could use better wording.
89 * XXX: repurposed to also expect a "request full set" message, should be renamed
91 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
96 * Continuation for multi part IBFs.
98 PHASE_EXPECT_IBF_CONT,
101 * We are decoding an IBF.
103 PHASE_INVENTORY_ACTIVE,
106 * The other peer is decoding the IBF we just sent.
108 PHASE_INVENTORY_PASSIVE,
111 * The protocol is almost finished, but we still have to flush our message
112 * queue and/or expect some elements.
114 PHASE_FINISH_CLOSING,
117 * In the penultimate phase,
118 * we wait until all our demands
119 * are satisfied. Then we send a done
120 * message, and wait for another done message.
122 PHASE_FINISH_WAITING,
125 * In the ultimate phase, we wait until
126 * our demands are satisfied and then
127 * quit (sending another DONE message).
132 * After sending the full set, wait for responses with the elements
133 * that the local peer is missing.
140 * State of an evaluate operation with another peer.
142 struct OperationState
145 * Copy of the set's strata estimator at the time of
146 * creation of this operation.
148 struct StrataEstimator *se;
151 * The IBF we currently receive.
153 struct InvertibleBloomFilter *remote_ibf;
156 * The IBF with the local set's element.
158 struct InvertibleBloomFilter *local_ibf;
161 * Maps unsalted IBF-Keys to elements.
162 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
163 * Colliding IBF-Keys are linked.
165 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
168 * Current state of the operation.
170 enum UnionOperationPhase phase;
173 * Did we send the client that we are done?
175 int client_done_sent;
178 * Number of ibf buckets already received into the @a remote_ibf.
180 unsigned int ibf_buckets_received;
183 * Hashes for elements that we have demanded from the other peer.
185 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
188 * Salt that we're using for sending IBFs
193 * Salt for the IBF we've received and that we're currently decoding.
195 uint32_t salt_receive;
198 * Number of elements we received from the other peer
199 * that were not in the local set yet.
201 uint32_t received_fresh;
204 * Total number of elements received from the other peer.
206 uint32_t received_total;
209 * Initial size of our set, just before
210 * the operation started.
212 uint64_t initial_size;
217 * The key entry is used to associate an ibf key with an element.
222 * IBF key for the entry, derived from the current salt.
224 struct IBF_Key ibf_key;
227 * The actual element associated with the key.
229 * Only owned by the union operation if element->operation
232 struct ElementEntry *element;
235 * Did we receive this element?
236 * Even if element->is_foreign is false, we might
237 * have received the element, so this indicates that
238 * the other peer has it.
245 * Used as a closure for sending elements
246 * with a specific IBF key.
248 struct SendElementClosure
251 * The IBF key whose matching elements should be
254 struct IBF_Key ibf_key;
257 * Operation for which the elements
260 struct Operation *op;
265 * Extra state required for efficient set union.
270 * The strata estimator is only generated once for
272 * The IBF keys are derived from the element hashes with
275 struct StrataEstimator *se;
280 * Iterator over hash map entries, called to
281 * destroy the linked list of colliding ibf key entries.
284 * @param key current key code
285 * @param value value in the hash map
286 * @return #GNUNET_YES if we should continue to iterate,
/* Iterator callback handed to GNUNET_CONTAINER_multihashmap32_iterate by
 * union_op_cancel.  Every map value is treated as a struct KeyEntry. */
290 destroy_key_to_element_iter (void *cls,
294 struct KeyEntry *k = value;
296 GNUNET_assert (NULL != k);
/* Only element entries flagged as remote are freed here; presumably the
 * other entries are still owned by the set itself -- TODO confirm. */
297 if (GNUNET_YES == k->element->remote)
299 GNUNET_free (k->element);
308 * Destroy the union operation. Only things specific to the union
309 * operation are destroyed.
311 * @param op union operation to destroy
/* Tears down the union-specific state reachable through op->state: the
 * remote and local IBFs, the demanded-hashes map, the strata estimator
 * copy and the key-to-element map, and finally op->state itself.  Each
 * member pointer is reset to NULL right after it has been destroyed. */
314 union_op_cancel (struct Operation *op)
316 LOG (GNUNET_ERROR_TYPE_DEBUG,
317 "destroying union op\n");
318 /* check if the op was canceled twice */
319 GNUNET_assert (NULL != op->state);
320 if (NULL != op->state->remote_ibf)
322 ibf_destroy (op->state->remote_ibf);
323 op->state->remote_ibf = NULL;
325 if (NULL != op->state->demanded_hashes)
327 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328 op->state->demanded_hashes = NULL;
330 if (NULL != op->state->local_ibf)
332 ibf_destroy (op->state->local_ibf);
333 op->state->local_ibf = NULL;
335 if (NULL != op->state->se)
337 strata_estimator_destroy (op->state->se);
338 op->state->se = NULL;
/* The map values need per-entry cleanup, so destroy_key_to_element_iter
 * visits every entry before the map container itself is destroyed. */
340 if (NULL != op->state->key_to_element)
342 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
343 &destroy_key_to_element_iter,
345 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346 op->state->key_to_element = NULL;
348 GNUNET_free (op->state);
350 LOG (GNUNET_ERROR_TYPE_DEBUG,
351 "destroying union op done\n");
356 * Inform the client that the union operation has failed,
357 * and proceed to destroy the evaluate operation.
359 * @param op the union operation to fail
/* Reports a failed union operation to the client and destroys the
 * operation: logs an error, builds a SET_RESULT message with status
 * GNUNET_SET_STATUS_FAILURE, the request id of the operation and element
 * type 0, queues it on the message queue of the client owning the set and
 * then calls _GSS_operation_destroy. */
362 fail_union_operation (struct Operation *op)
364 struct GNUNET_MQ_Envelope *ev;
365 struct GNUNET_SET_ResultMessage *msg;
367 LOG (GNUNET_ERROR_TYPE_ERROR,
368 "union operation failed\n");
369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->client_request_id);
372 msg->element_type = htons (0);
373 GNUNET_MQ_send (op->set->cs->mq,
/* NOTE(review): op is handed to _GSS_operation_destroy here, so callers
 * presumably must not touch op after this function returns -- confirm. */
375 _GSS_operation_destroy (op, GNUNET_YES);
380 * Derive the IBF key from a hash code and
383 * @param src the hash code
384 * @return the derived IBF key
/* Maps an element hash to a struct IBF_Key.  The key bytes are produced by
 * GNUNET_CRYPTO_kdf, which writes sizeof (key) bytes into key and receives
 * salt as one of its inputs. */
386 static struct IBF_Key
387 get_ibf_key (const struct GNUNET_HashCode *src)
392 GNUNET_CRYPTO_kdf (&key, sizeof (key),
394 &salt, sizeof (salt),
401 * Context for #op_get_element_iterator
/* Closure for op_get_element_iterator.  op_get_element copies the element
 * hash it is looking for into the hash member before starting the lookup. */
403 struct GetElementContext
408 struct GNUNET_HashCode hash;
418 * Iterator over the mapping from IBF keys to element entries. Checks if we
419 * have an element with a given GNUNET_HashCode.
422 * @param key current key code
423 * @param value value in the hash map
424 * @return #GNUNET_YES if we should search further,
425 * #GNUNET_NO if we've found the element.
/* Callback used by op_get_element with
 * GNUNET_CONTAINER_multihashmap32_get_multiple.  cls is the
 * struct GetElementContext of the lookup and value the visited
 * struct KeyEntry; the match test is a zero result from
 * GNUNET_CRYPTO_hash_cmp on the element hash of that entry. */
428 op_get_element_iterator (void *cls,
432 struct GetElementContext *ctx = cls;
433 struct KeyEntry *k = value;
435 GNUNET_assert (NULL != k);
436 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
447 * Determine whether the given element is already in the operation's element
450 * @param op operation that should be tested for 'element_hash'
451 * @param element_hash hash of the element to look for
452 * @return the `struct KeyEntry` holding the element if it has been found (a pointer, not #GNUNET_YES/#GNUNET_NO); expected to be NULL otherwise
/* Looks up the key entry that belongs to element_hash.  The IBF key derived
 * from the hash is truncated to 32 bits and used as the map key, so several
 * entries can share a slot; op_get_element_iterator then checks the
 * candidates against the hash stored in ctx. */
454 static struct KeyEntry *
455 op_get_element (struct Operation *op,
456 const struct GNUNET_HashCode *element_hash)
459 struct IBF_Key ibf_key;
460 struct GetElementContext ctx = {{{ 0 }} , 0};
462 ctx.hash = *element_hash;
464 ibf_key = get_ibf_key (element_hash);
465 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
466 (uint32_t) ibf_key.key_val,
467 op_get_element_iterator,
470 /* was the iteration aborted because we found the element? */
/* GNUNET_SYSERR is read as: the iterator stopped early on a match, in which
 * case it must have stored the matching entry in ctx.k. */
471 if (GNUNET_SYSERR == ret)
473 GNUNET_assert (NULL != ctx.k);
481 * Insert an element into the union operation's
482 * key-to-element mapping. Takes ownership of 'ee'.
483 * Note that this does not insert the element in the set,
484 * only in the operation's key-element mapping.
485 * This is done to speed up re-tried operations, if some elements
486 * were transmitted, and then the IBF fails to decode.
488 * XXX: clarify ownership, doesn't sound right.
490 * @param op the union operation
491 * @param ee the element entry
492 * @param received was this element received from the remote peer?
/* Allocates a new struct KeyEntry for ee, records the IBF key derived from
 * ee->element_hash and the received flag in it, and stores it in
 * op->state->key_to_element under the low 32 bits of that IBF key. */
495 op_register_element (struct Operation *op,
496 struct ElementEntry *ee,
499 struct IBF_Key ibf_key;
502 ibf_key = get_ibf_key (&ee->element_hash);
503 k = GNUNET_new (struct KeyEntry);
505 k->ibf_key = ibf_key;
506 k->received = received;
/* The MULTIPLE option lets several entries live under the same 32-bit map
 * key; the insertion is asserted to succeed. */
507 GNUNET_assert (GNUNET_OK ==
508 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
509 (uint32_t) ibf_key.key_val,
511 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
519 salt_key (const struct IBF_Key *k_in,
521 struct IBF_Key *k_out)
524 uint64_t x = k_in->key_val;
526 x = (x >> s) | (x << (64 - s));
535 unsalt_key (const struct IBF_Key *k_in,
537 struct IBF_Key *k_out)
540 uint64_t x = k_in->key_val;
541 x = (x << s) | (x >> (64 - s));
547 * Insert a key into an ibf.
551 * @param value the key entry to get the key from
/* Map iterator used by prepare_ibf.  cls is the operation and value the
 * visited struct KeyEntry.  The IBF key of the entry is salted with
 * op->state->salt_send and the salted key is inserted into
 * op->state->local_ibf. */
554 prepare_ibf_iterator (void *cls,
558 struct Operation *op = cls;
559 struct KeyEntry *ke = value;
560 struct IBF_Key salted_key;
562 LOG (GNUNET_ERROR_TYPE_DEBUG,
563 "[OP %x] inserting %lx (hash %s) into ibf\n",
565 (unsigned long) ke->ibf_key.key_val,
566 GNUNET_h2s (&ke->element->element_hash));
567 salt_key (&ke->ibf_key,
568 op->state->salt_send,
570 ibf_insert (op->state->local_ibf, salted_key);
576 * Iterator for initializing the
577 * key-to-element mapping of a union operation
579 * @param cls the union operation `struct Operation *`
581 * @param value the `struct ElementEntry *` to insert
582 * into the key-to-element mapping
583 * @return #GNUNET_YES (to continue iterating)
/* Map iterator used by initialize_key_to_element.  cls is the operation and
 * value a struct ElementEntry of the set.  The element is checked with
 * _GSS_is_element_of_operation, asserted not to be a remote element and
 * then passed to op_register_element. */
586 init_key_to_element_iterator (void *cls,
587 const struct GNUNET_HashCode *key,
590 struct Operation *op = cls;
591 struct ElementEntry *ee = value;
593 /* make sure that the element belongs to the set at the time
594 * of creating the operation */
596 _GSS_is_element_of_operation (ee,
599 GNUNET_assert (GNUNET_NO == ee->remote);
600 op_register_element (op,
608 * Initialize the IBF key to element mapping local to this set
611 * @param op the set union operation
/* Builds op->state->key_to_element from the elements of the set.  The map
 * must not exist yet (asserted); it is created with room for the current
 * number of set elements plus one and filled by iterating the set content
 * with init_key_to_element_iterator. */
614 initialize_key_to_element (struct Operation *op)
618 GNUNET_assert (NULL == op->state->key_to_element);
619 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
/* The + 1 presumably keeps the requested map size non-zero for an empty
 * set -- TODO confirm. */
620 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
621 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
622 &init_key_to_element_iterator,
628 * Create an ibf with the operation's elements
629 * of the specified size
631 * @param op the union operation
632 * @param size size of the ibf to create
633 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* (Re)creates op->state->local_ibf with the requested size and
 * SE_IBF_HASH_NUM hash functions and fills it by running
 * prepare_ibf_iterator over every entry of the key-to-element map, which
 * must already exist (asserted).  A previously built local IBF is destroyed
 * first.  Returns GNUNET_SYSERR when ibf_create fails. */
636 prepare_ibf (struct Operation *op,
639 GNUNET_assert (NULL != op->state->key_to_element);
641 if (NULL != op->state->local_ibf)
642 ibf_destroy (op->state->local_ibf);
643 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
644 if (NULL == op->state->local_ibf)
646 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
647 "Failed to allocate local IBF\n");
648 return GNUNET_SYSERR;
650 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
651 &prepare_ibf_iterator,
658 * Send an ibf of appropriate size.
660 * Fragments the IBF into multiple messages if necessary.
662 * @param op the union operation
663 * @param ibf_order order of the ibf to send, size=2^order
664 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* Builds the local IBF with 2^ibf_order buckets via prepare_ibf and sends it
 * to the peer, split into messages of at most MAX_BUCKETS_PER_MESSAGE
 * buckets.  Every message carries the order, the offset of its first bucket
 * and the current send salt.  After queueing all chunks the operation phase
 * becomes PHASE_INVENTORY_PASSIVE.  Returns GNUNET_SYSERR when prepare_ibf
 * fails. */
667 send_ibf (struct Operation *op,
670 unsigned int buckets_sent = 0;
671 struct InvertibleBloomFilter *ibf;
674 prepare_ibf (op, 1<<ibf_order))
676 /* allocation failed */
677 return GNUNET_SYSERR;
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "sending ibf of size %u\n",
/* One statistics counter per IBF order; the counter name is formatted into
 * a zero-initialised 64-byte stack buffer. */
685 char name[64] = { 0 };
686 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
687 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
690 ibf = op->state->local_ibf;
/* Chunking loop: each pass sends the remaining buckets, capped at
 * MAX_BUCKETS_PER_MESSAGE, and advances buckets_sent by that amount. */
692 while (buckets_sent < (1 << ibf_order))
694 unsigned int buckets_in_message;
695 struct GNUNET_MQ_Envelope *ev;
696 struct IBFMessage *msg;
698 buckets_in_message = (1 << ibf_order) - buckets_sent;
699 /* limit to maximum */
700 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
701 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
703 ev = GNUNET_MQ_msg_extra (msg,
704 buckets_in_message * IBF_BUCKET_SIZE,
705 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
/* offset and salt are converted with htonl; order is stored as is. */
708 msg->order = ibf_order;
709 msg->offset = htonl (buckets_sent);
710 msg->salt = htonl (op->state->salt_send);
711 ibf_write_slice (ibf, buckets_sent,
712 buckets_in_message, &msg[1]);
713 buckets_sent += buckets_in_message;
714 LOG (GNUNET_ERROR_TYPE_DEBUG,
715 "ibf chunk size %u, %u/%u sent\n",
719 GNUNET_MQ_send (op->mq, ev);
722 /* The other peer must decode the IBF, so
724 op->state->phase = PHASE_INVENTORY_PASSIVE;
730 * Compute the necessary order of an ibf
731 * from the size of the symmetric set difference.
733 * @param diff the difference
734 * @return the required order of the ibf (the ibf size is 2^order)
/* Chooses an IBF order (exponent, not bucket count) for an estimated set
 * difference.  The loop condition keeps going while 2^ibf_order is below
 * IBF_ALPHA * diff or below SE_IBF_HASH_NUM, and stops at the latest once
 * ibf_order reaches MAX_IBF_ORDER.  Callers use the result as a shift
 * count or pass it to send_ibf as the order. */
737 get_order_from_difference (unsigned int diff)
739 unsigned int ibf_order;
742 while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
743 ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
744 (ibf_order < MAX_IBF_ORDER) )
746 // add one for correction
/* Because of the + 1 the returned order can be MAX_IBF_ORDER + 1. */
747 return ibf_order + 1;
752 * Send a set element.
754 * @param cls the union operation `struct Operation *`
756 * @param value the `struct ElementEntry *` to insert
757 * into the key-to-element mapping
758 * @return #GNUNET_YES (to continue iterating)
/* Map iterator used by send_full_set.  cls is the operation and value a
 * struct ElementEntry of the set.  The element is wrapped in a
 * P2P_FULL_ELEMENT message, with its element type converted by htons, and
 * queued on op->mq. */
761 send_full_element_iterator (void *cls,
762 const struct GNUNET_HashCode *key,
765 struct Operation *op = cls;
766 struct GNUNET_SET_ElementMessage *emsg;
767 struct ElementEntry *ee = value;
768 struct GNUNET_SET_Element *el = &ee->element;
769 struct GNUNET_MQ_Envelope *ev;
771 LOG (GNUNET_ERROR_TYPE_DEBUG,
772 "Sending element %s\n",
774 ev = GNUNET_MQ_msg_extra (emsg,
776 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
777 emsg->element_type = htons (el->element_type);
778 GNUNET_memcpy (&emsg[1],
781 GNUNET_MQ_send (op->mq,
788 * Switch to full set transmission for @a op.
790 * @param op operation to switch to full set transmission.
/* Switches the operation to PHASE_FULL_SENDING, queues every element of the
 * set through send_full_element_iterator and then queues a P2P_FULL_DONE
 * message on op->mq. */
793 send_full_set (struct Operation *op)
795 struct GNUNET_MQ_Envelope *ev;
797 op->state->phase = PHASE_FULL_SENDING;
/* NOTE(review): the debug message below misspells Deciding; it is runtime
 * text and therefore left untouched here. */
798 LOG (GNUNET_ERROR_TYPE_DEBUG,
799 "Dedicing to transmit the full set\n");
800 /* FIXME: use a more memory-friendly way of doing this with an
801 iterator, just as we do in the non-full case! */
802 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
803 &send_full_element_iterator,
805 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
806 GNUNET_MQ_send (op->mq,
812 * Handle a strata estimator from a remote peer
814 * @param cls the union operation
815 * @param msg the message
/* Validates an incoming strata estimator message.  It is rejected with
 * GNUNET_SYSERR unless the operation is in PHASE_EXPECT_SE.  For messages
 * that are not of the compressed type (P2P_SEC) the payload length must be
 * exactly SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE bytes. */
818 check_union_p2p_strata_estimator (void *cls,
819 const struct StrataEstimatorMessage *msg)
821 struct Operation *op = cls;
825 if (op->state->phase != PHASE_EXPECT_SE)
828 return GNUNET_SYSERR;
/* NOTE(review): htons is applied to a field read from a received message;
 * ntohs would state the intent (the two are typically the same operation). */
830 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
831 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
832 if ( (GNUNET_NO == is_compressed) &&
833 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
836 return GNUNET_SYSERR;
843 * Handle a strata estimator from a remote peer
845 * @param cls the union operation
846 * @param msg the message
/* Processes the strata estimator of the remote peer.  It is parsed into a
 * fresh estimator, the difference to the local estimator copy is computed,
 * and both estimators are destroyed.  Depending on the difference, the set
 * sizes and op->force_full, the code then either goes for full set
 * transmission (including asking the peer for its full set) or sends an IBF
 * whose order comes from get_order_from_difference.  Parse, allocation and
 * send failures end in fail_union_operation. */
849 handle_union_p2p_strata_estimator (void *cls,
850 const struct StrataEstimatorMessage *msg)
852 struct Operation *op = cls;
853 struct StrataEstimator *remote_se;
/* NOTE(review): htons on a received field; ntohs would state the intent. */
859 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
860 GNUNET_STATISTICS_update (_GSS_statistics,
861 "# bytes of SE received",
862 ntohs (msg->header.size),
864 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
865 other_size = GNUNET_ntohll (msg->set_size);
866 remote_se = strata_estimator_create (SE_STRATA_COUNT,
869 if (NULL == remote_se)
871 /* insufficient resources, fail */
872 fail_union_operation (op);
876 strata_estimator_read (&msg[1],
881 /* decompression failed */
882 strata_estimator_destroy (remote_se);
883 fail_union_operation (op);
886 GNUNET_assert (NULL != op->state->se);
887 diff = strata_estimator_difference (remote_se,
/* Both estimators are only needed for the difference; the local copy is
 * released and its pointer cleared. */
893 strata_estimator_destroy (remote_se);
894 strata_estimator_destroy (op->state->se);
895 op->state->se = NULL;
896 LOG (GNUNET_ERROR_TYPE_DEBUG,
897 "got se diff=%d, using ibf size %d\n",
899 1U << get_order_from_difference (diff));
/* Benchmark hook: when the environment variable GNUNET_SET_BENCHMARK is
 * exactly 1, the difference is appended to the file set.log.
 * NOTE(review): the result of fopen is passed to fprintf without a NULL
 * check. */
904 set_debug = getenv ("GNUNET_SET_BENCHMARK");
905 if ( (NULL != set_debug) &&
906 (0 == strcmp (set_debug, "1")) )
908 FILE *f = fopen ("set.log", "a");
909 fprintf (f, "%llu\n", (unsigned long long) diff);
/* In byzantine mode a peer reporting a set smaller than
 * op->byzantine_lower_bound makes the operation fail. */
914 if ( (GNUNET_YES == op->byzantine) &&
915 (other_size < op->byzantine_lower_bound) )
918 fail_union_operation (op);
922 if ( (GNUNET_YES == op->force_full) ||
923 (diff > op->state->initial_size / 4) ||
/* NOTE(review): initial_size is declared uint64_t in struct OperationState
 * but is printed with %u below. */
926 LOG (GNUNET_ERROR_TYPE_DEBUG,
927 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
929 op->state->initial_size);
930 GNUNET_STATISTICS_update (_GSS_statistics,
934 if ( (op->state->initial_size <= other_size) ||
941 struct GNUNET_MQ_Envelope *ev;
943 LOG (GNUNET_ERROR_TYPE_DEBUG,
944 "Telling other peer that we expect its full set\n");
945 op->state->phase = PHASE_EXPECT_IBF;
946 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
947 GNUNET_MQ_send (op->mq,
953 GNUNET_STATISTICS_update (_GSS_statistics,
959 get_order_from_difference (diff)))
961 /* Internal error, best we can do is shut the connection */
962 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
963 "Failed to send IBF, closing connection\n");
964 fail_union_operation (op);
968 GNUNET_CADET_receive_done (op->channel);
973 * Iterator to send elements to a remote peer
975 * @param cls closure with the element key and the union operation
977 * @param value the key entry
/* Callback used by send_offers_for_key.  cls is a struct SendElementClosure
 * and value the visited struct KeyEntry.  For an entry whose full 64-bit IBF
 * key matches the requested one, a P2P_OFFER message carrying the element
 * hash is queued on op->mq. */
980 send_offers_iterator (void *cls,
984 struct SendElementClosure *sec = cls;
985 struct Operation *op = sec->op;
986 struct KeyEntry *ke = value;
987 struct GNUNET_MQ_Envelope *ev;
988 struct GNUNET_MessageHeader *mh;
990 /* Detect 32-bit key collision for the 64-bit IBF keys. */
991 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
994 ev = GNUNET_MQ_msg_header_extra (mh,
995 sizeof (struct GNUNET_HashCode),
996 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
998 GNUNET_assert (NULL != ev);
/* The payload right after the header is the element hash. */
999 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1000 LOG (GNUNET_ERROR_TYPE_DEBUG,
1001 "[OP %x] sending element offer (%s) to peer\n",
1003 GNUNET_h2s (&ke->element->element_hash));
1004 GNUNET_MQ_send (op->mq, ev);
1010 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1012 * @param op union operation
1013 * @param ibf_key IBF key of interest
/* Sends offers for all elements registered under ibf_key.  The lookup uses
 * the low 32 bits of the key as map key; send_offers_iterator receives the
 * closure with the full key and filters the candidates.  The return value of
 * the map lookup is deliberately ignored. */
1016 send_offers_for_key (struct Operation *op,
1017 struct IBF_Key ibf_key)
1019 struct SendElementClosure send_cls;
1021 send_cls.ibf_key = ibf_key;
1023 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1024 (uint32_t) ibf_key.key_val,
1025 &send_offers_iterator,
1031 * Decode which elements are missing on each side, and
1032 * send the appropriate offers and inquiries.
1034 * @param op union operation
1035 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
/* Decodes the difference between the local IBF and the IBF received from
 * the peer.  Must be called in PHASE_INVENTORY_ACTIVE (asserted).  The local
 * IBF is duplicated, the remote IBF is subtracted from the copy and then
 * destroyed, and keys are pulled out of the difference with ibf_decode.
 * Decoded keys lead either to offers (send_offers_for_key on the unsalted
 * key) or to an inquiry message carrying the key.  When decoding fails or a
 * cycle is detected, a larger IBF is sent with an incremented send salt, up
 * to MAX_IBF_ORDER; beyond that the operation fails.  When ibf_decode
 * reports GNUNET_NO, a P2P_DONE message is sent. */
1038 decode_and_send (struct Operation *op)
1041 struct IBF_Key last_key;
1043 unsigned int num_decoded;
1044 struct InvertibleBloomFilter *diff_ibf;
1046 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1050 op->state->remote_ibf->size))
1053 /* allocation failed */
1054 return GNUNET_SYSERR;
1056 diff_ibf = ibf_dup (op->state->local_ibf);
1057 ibf_subtract (diff_ibf,
1058 op->state->remote_ibf);
1060 ibf_destroy (op->state->remote_ibf);
1061 op->state->remote_ibf = NULL;
1063 LOG (GNUNET_ERROR_TYPE_DEBUG,
1064 "decoding IBF (size=%u)\n",
1068 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1073 int cycle_detected = GNUNET_NO;
1077 res = ibf_decode (diff_ibf, &side, &key);
1078 if (res == GNUNET_OK)
1080 LOG (GNUNET_ERROR_TYPE_DEBUG,
1081 "decoded ibf key %lx\n",
1082 (unsigned long) key.key_val);
/* Cycle heuristic: more keys decoded than the IBF has buckets, or the same
 * key decoded twice in a row. */
1084 if ( (num_decoded > diff_ibf->size) ||
1085 ( (num_decoded > 1) &&
1086 (last_key.key_val == key.key_val) ) )
1088 LOG (GNUNET_ERROR_TYPE_DEBUG,
1089 "detected cyclic ibf (decoded %u/%u)\n",
1092 cycle_detected = GNUNET_YES;
1095 if ( (GNUNET_SYSERR == res) ||
1096 (GNUNET_YES == cycle_detected) )
1100 while (1<<next_order < diff_ibf->size)
1103 if (next_order <= MAX_IBF_ORDER)
1105 LOG (GNUNET_ERROR_TYPE_DEBUG,
1106 "decoding failed, sending larger ibf (size %u)\n",
1108 GNUNET_STATISTICS_update (_GSS_statistics,
/* The retry uses a new salt, so the keys land in different buckets. */
1112 op->state->salt_send++;
1114 send_ibf (op, next_order))
1116 /* Internal error, best we can do is shut the connection */
1117 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1118 "Failed to send IBF, closing connection\n");
/* NOTE(review): fail_union_operation destroys op and this function then
 * returns GNUNET_SYSERR; handle_union_p2p_ibf appears to call
 * fail_union_operation again on a failed decode_and_send -- confirm this
 * cannot destroy the operation twice.  The same applies to the second
 * failure path further down. */
1119 fail_union_operation (op);
1120 ibf_destroy (diff_ibf);
1121 return GNUNET_SYSERR;
1126 GNUNET_STATISTICS_update (_GSS_statistics,
1127 "# of failed union operations (too large)",
1130 // XXX: Send the whole set, element-by-element
1131 LOG (GNUNET_ERROR_TYPE_ERROR,
1132 "set union failed: reached ibf limit\n");
1133 fail_union_operation (op);
1134 ibf_destroy (diff_ibf);
1135 return GNUNET_SYSERR;
1139 if (GNUNET_NO == res)
1141 struct GNUNET_MQ_Envelope *ev;
1143 LOG (GNUNET_ERROR_TYPE_DEBUG,
1144 "transmitted all values, sending DONE\n");
1145 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1146 GNUNET_MQ_send (op->mq, ev);
1147 /* We now wait until we get a DONE message back
1148 * and then wait for our MQ to be flushed and all our
1149 * demands be delivered. */
1154 struct IBF_Key unsalted_key;
1157 op->state->salt_receive,
1159 send_offers_for_key (op,
1162 else if (-1 == side)
1164 struct GNUNET_MQ_Envelope *ev;
1165 struct InquiryMessage *msg;
1167 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1168 * the additional complexity. */
1169 ev = GNUNET_MQ_msg_extra (msg,
1170 sizeof (struct IBF_Key),
1171 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1172 msg->salt = htonl (op->state->salt_receive);
1173 GNUNET_memcpy (&msg[1],
1175 sizeof (struct IBF_Key));
1176 LOG (GNUNET_ERROR_TYPE_DEBUG,
1177 "sending element inquiry for IBF key %lx\n",
1178 (unsigned long) key.key_val);
1179 GNUNET_MQ_send (op->mq, ev);
1186 ibf_destroy (diff_ibf);
1192 * Check an IBF message from a remote peer.
1194 * Reassemble the IBF from multiple pieces, and
1195 * process the whole IBF once possible.
1197 * @param cls the union operation
1198 * @param msg the header of the message
1199 * @return #GNUNET_OK if @a msg is well-formed
/* Validates an incoming IBF message; every rejection returns GNUNET_SYSERR
 * after GNUNET_break_op.  Checks: the set must be a union set, the payload
 * must hold a non-zero whole number of buckets, and the phase must fit.  In
 * PHASE_EXPECT_IBF_CONT the offset, order and salt must match the IBF being
 * received; otherwise the phase must be PHASE_INVENTORY_PASSIVE or
 * PHASE_EXPECT_IBF. */
1202 check_union_p2p_ibf (void *cls,
1203 const struct IBFMessage *msg)
1205 struct Operation *op = cls;
1206 unsigned int buckets_in_message;
1208 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1210 GNUNET_break_op (0);
1211 return GNUNET_SYSERR;
1213 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1214 if (0 == buckets_in_message)
1216 GNUNET_break_op (0);
1217 return GNUNET_SYSERR;
/* The payload must be an exact multiple of the bucket size. */
1219 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1221 GNUNET_break_op (0);
1222 return GNUNET_SYSERR;
1224 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1226 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1228 GNUNET_break_op (0);
1229 return GNUNET_SYSERR;
/* NOTE(review): order comes from the peer and no upper bound on it is
 * checked in this function before it is used as a shift count -- confirm. */
1231 if (1<<msg->order != op->state->remote_ibf->size)
1233 GNUNET_break_op (0);
1234 return GNUNET_SYSERR;
1236 if (ntohl (msg->salt) != op->state->salt_receive)
1238 GNUNET_break_op (0);
1239 return GNUNET_SYSERR;
1242 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1243 (op->state->phase != PHASE_EXPECT_IBF) )
1245 GNUNET_break_op (0);
1246 return GNUNET_SYSERR;
1254 * Handle an IBF message from a remote peer.
1256 * Reassemble the IBF from multiple pieces, and
1257 * process the whole IBF once possible.
1259 * @param cls the union operation
1260 * @param msg the header of the message
/* Handles one IBF message.  In PHASE_INVENTORY_PASSIVE or PHASE_EXPECT_IBF a
 * new remote IBF of 2^order buckets is created, the receive salt is taken
 * from the message and the phase becomes PHASE_EXPECT_IBF_CONT; the first
 * chunk must have offset 0.  The buckets of the message are then read into
 * the remote IBF.  Once the number of received buckets equals the IBF size,
 * the phase becomes PHASE_INVENTORY_ACTIVE and decode_and_send runs. */
1263 handle_union_p2p_ibf (void *cls,
1264 const struct IBFMessage *msg)
1266 struct Operation *op = cls;
1267 unsigned int buckets_in_message;
1269 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1270 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1271 (op->state->phase == PHASE_EXPECT_IBF) )
1273 op->state->phase = PHASE_EXPECT_IBF_CONT;
1274 GNUNET_assert (NULL == op->state->remote_ibf);
1275 LOG (GNUNET_ERROR_TYPE_DEBUG,
1276 "Creating new ibf of size %u\n",
/* NOTE(review): order comes from the peer; no upper bound on it is visible
 * before it is used as a shift count here -- confirm. */
1278 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1279 op->state->salt_receive = ntohl (msg->salt);
1280 LOG (GNUNET_ERROR_TYPE_DEBUG,
1281 "Receiving new IBF with salt %u\n",
1282 op->state->salt_receive);
1283 if (NULL == op->state->remote_ibf)
1285 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1286 "Failed to parse remote IBF, closing connection\n");
1287 fail_union_operation (op);
1290 op->state->ibf_buckets_received = 0;
1291 if (0 != ntohl (msg->offset))
1293 GNUNET_break_op (0);
1294 fail_union_operation (op);
1300 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1301 LOG (GNUNET_ERROR_TYPE_DEBUG,
1302 "Received more of IBF\n");
1304 GNUNET_assert (NULL != op->state->remote_ibf);
/* NOTE(review): no check is visible here that ibf_buckets_received plus
 * buckets_in_message stays within remote_ibf->size before the slice is
 * read -- confirm ibf_read_slice or an earlier check bounds this. */
1306 ibf_read_slice (&msg[1],
1307 op->state->ibf_buckets_received,
1309 op->state->remote_ibf);
1310 op->state->ibf_buckets_received += buckets_in_message;
1312 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1314 LOG (GNUNET_ERROR_TYPE_DEBUG,
1315 "received full ibf\n");
1316 op->state->phase = PHASE_INVENTORY_ACTIVE;
1318 decode_and_send (op))
1320 /* Internal error, best we can do is shut down */
1321 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1322 "Failed to decode IBF, closing connection\n");
1323 fail_union_operation (op);
1327 GNUNET_CADET_receive_done (op->channel);
1332 * Send a result message to the client indicating
1333 * that there is a new element.
1335 * @param op union operation
1336 * @param element element to send
1337 * @param status status to send with the new element
/* Sends one element to the client as a SET_RESULT message.  The operation
 * must have a non-zero client request id (asserted).  The message carries
 * the given status, the request id, the element type and, as current_size,
 * the number of entries in op->state->key_to_element; it is queued on the
 * message queue of the client owning the set. */
1340 send_client_element (struct Operation *op,
1341 struct GNUNET_SET_Element *element,
1344 struct GNUNET_MQ_Envelope *ev;
1345 struct GNUNET_SET_ResultMessage *rm;
1347 LOG (GNUNET_ERROR_TYPE_DEBUG,
1348 "sending element (size %u) to client\n",
1350 GNUNET_assert (0 != op->client_request_id);
1351 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1354 GNUNET_MQ_discard (ev);
/* Header fields are converted to network byte order. */
1358 rm->result_status = htons (status);
1359 rm->request_id = htonl (op->client_request_id);
1360 rm->element_type = htons (element->element_type);
1361 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1362 GNUNET_memcpy (&rm[1],
1365 GNUNET_MQ_send (op->set->cs->mq,
1371 * Destroy remote channel.
1373 * @param op operation
/* Destroys the CADET channel of the operation, if it still has one.  The
 * channel pointer is copied into a local variable before
 * GNUNET_CADET_channel_destroy is called on it. */
1375 void destroy_channel (struct Operation *op)
1377 struct GNUNET_CADET_Channel *channel;
1379 if (NULL != (channel = op->channel))
1381 /* This will free op; called conditionally as this helper function
1382 is also called from within the channel disconnect handler. */
1384 GNUNET_CADET_channel_destroy (channel);
1390 * Signal to the client that the operation has finished and
1391 * destroy the operation.
1393 * @param cls operation to destroy
/* Tells the client the final outcome of the operation.  cls is the
 * operation.  op->state->client_done_sent is checked first and set to
 * GNUNET_YES further down, presumably so the outcome is only signalled
 * once -- TODO confirm.  When the phase is not PHASE_DONE, a result with
 * GNUNET_SET_STATUS_FAILURE is queued for the client.  The DONE result
 * carries the number of entries in key_to_element as current_size. */
1396 send_client_done (void *cls)
1398 struct Operation *op = cls;
1399 struct GNUNET_MQ_Envelope *ev;
1400 struct GNUNET_SET_ResultMessage *rm;
1402 if (GNUNET_YES == op->state->client_done_sent) {
1406 if (PHASE_DONE != op->state->phase) {
1407 LOG (GNUNET_ERROR_TYPE_ERROR,
1408 "union operation failed\n");
1409 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1410 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1411 rm->request_id = htonl (op->client_request_id);
1412 rm->element_type = htons (0);
1413 GNUNET_MQ_send (op->set->cs->mq,
1418 op->state->client_done_sent = GNUNET_YES;
1420 LOG (GNUNET_ERROR_TYPE_INFO,
1421 "Signalling client that union operation is done\n");
1422 ev = GNUNET_MQ_msg (rm,
1423 GNUNET_MESSAGE_TYPE_SET_RESULT);
1424 rm->request_id = htonl (op->client_request_id);
1425 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1426 rm->element_type = htons (0);
1427 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1428 GNUNET_MQ_send (op->set->cs->mq,
1434 * Tests if the operation is finished, and if so notify.
1436 * @param op operation to check
/* Checks whether the operation can move to PHASE_DONE.  This needs the
 * demanded-hashes map to be empty.  In PHASE_FINISH_WAITING a P2P_DONE
 * message is then sent to the peer; in PHASE_FINISH_CLOSING the client is
 * notified through send_client_done and the channel is destroyed. */
1439 maybe_finish (struct Operation *op)
1441 unsigned int num_demanded;
1443 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1445 if (PHASE_FINISH_WAITING == op->state->phase)
1447 LOG (GNUNET_ERROR_TYPE_DEBUG,
1448 "In PHASE_FINISH_WAITING, pending %u demands\n",
1450 if (0 == num_demanded)
1452 struct GNUNET_MQ_Envelope *ev;
1454 op->state->phase = PHASE_DONE;
1455 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1456 GNUNET_MQ_send (op->mq,
1458 /* We now wait until the other peer sends P2P_OVER
1459 * after it got all elements from us. */
1462 if (PHASE_FINISH_CLOSING == op->state->phase)
1464 LOG (GNUNET_ERROR_TYPE_DEBUG,
1465 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1467 if (0 == num_demanded)
1469 op->state->phase = PHASE_DONE;
1470 send_client_done (op);
1471 destroy_channel (op);
1478 * Check an element message from a remote peer.
1480 * @param cls the union operation
1481 * @param emsg the message
/* Validates an incoming element message.  It is rejected with GNUNET_SYSERR
 * (after GNUNET_break_op) when the set is not a union set or when the
 * demanded-hashes map is empty, i.e. no element is currently demanded. */
1484 check_union_p2p_elements (void *cls,
1485 const struct GNUNET_SET_ElementMessage *emsg)
1487 struct Operation *op = cls;
1489 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1491 GNUNET_break_op (0);
1492 return GNUNET_SYSERR;
1494 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1496 GNUNET_break_op (0);
1497 return GNUNET_SYSERR;
1504 * Handle an element message from a remote peer.
1505 * Sent by the other peer either because we decoded an IBF and placed a demand,
1506 * or because the other peer switched to full set transmission.
1508 * @param cls the union operation
1509 * @param emsg the message
/* Process one demanded element sent by the remote peer: copy it into a new
 * ElementEntry, match it against the outstanding demands, update statistics
 * and counters, and register it with the operation if it is new. */
1512 handle_union_p2p_elements (void *cls,
1513 const struct GNUNET_SET_ElementMessage *emsg)
1515 struct Operation *op = cls;
1516 struct ElementEntry *ee;
1517 struct KeyEntry *ke;
1518 uint16_t element_size;
/* Payload length is the total message size minus the fixed message struct. */
1520 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
/* The entry is allocated with room for the payload directly behind it;
 * element.data is pointed at that trailing area (&ee[1]). */
1521 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1522 GNUNET_memcpy (&ee[1],
1525 ee->element.size = element_size;
1526 ee->element.data = &ee[1];
1527 ee->element.element_type = ntohs (emsg->element_type);
1528 ee->remote = GNUNET_YES;
1529 GNUNET_SET_element_hash (&ee->element,
/* The hash of the element is removed from demanded_hashes; the branch below
 * treats an element that was not in the map as a protocol violation.
 * NOTE(review): ee was allocated above and no release of it is visible on
 * this failure path in this view - confirm it is not leaked. */
1532 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1536 /* We got something we didn't demand, since it's not in our map. */
1537 GNUNET_break_op (0);
1538 fail_union_operation (op);
1542 LOG (GNUNET_ERROR_TYPE_DEBUG,
1543 "Got element (size %u, hash %s) from peer\n",
1544 (unsigned int) element_size,
1545 GNUNET_h2s (&ee->element_hash));
1547 GNUNET_STATISTICS_update (_GSS_statistics,
1548 "# received elements",
1551 GNUNET_STATISTICS_update (_GSS_statistics,
1552 "# exchanged elements",
/* received_total counts every element message, fresh or repeated. */
1556 op->state->received_total++;
/* Look for an existing key entry for this element hash. */
1558 ke = op_get_element (op, &ee->element_hash);
1561 /* Got repeated element. Should not happen since
1562 * we track demands. */
1563 GNUNET_STATISTICS_update (_GSS_statistics,
1564 "# repeated elements",
1567 ke->received = GNUNET_YES;
/* New element: count it as fresh and register it with the operation. */
1572 LOG (GNUNET_ERROR_TYPE_DEBUG,
1573 "Registering new element from remote peer\n");
1574 op->state->received_fresh++;
1575 op_register_element (op, ee, GNUNET_YES);
1576 /* only send results immediately if the client wants it */
1577 switch (op->result_mode)
1579 case GNUNET_SET_RESULT_ADDED:
1580 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1582 case GNUNET_SET_RESULT_SYMMETRIC:
1583 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1586 /* Result mode not supported, should have been caught earlier. */
/* Sanity check: after more than 8 elements, fewer than one third of them
 * being fresh makes the operation fail. */
1592 if ( (op->state->received_total > 8) &&
1593 (op->state->received_fresh < op->state->received_total / 3) )
1595 /* The other peer gave us lots of old elements, there's something wrong. */
1596 GNUNET_break_op (0);
1597 fail_union_operation (op);
/* Tell CADET that the message has been processed. */
1600 GNUNET_CADET_receive_done (op->channel);
1606 * Check a full element message from a remote peer.
1608 * @param cls the union operation
1609 * @param emsg the message
/* Validation step for a full element message from the remote peer.
 * The only visible check is that the set is a union set. */
1612 check_union_p2p_full_element (void *cls,
1613 const struct GNUNET_SET_ElementMessage *emsg)
1615 struct Operation *op = cls;
/* Reject the message if the set was not created for the union operation. */
1617 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1619 GNUNET_break_op (0);
1620 return GNUNET_SYSERR;
/* The operation phase is not validated here; see the FIXME below. */
1622 // FIXME: check that we expect full elements here?
1628 * Handle a full element message from a remote peer.
1630 * @param cls the union operation
1631 * @param emsg the message
/* Process one element received during full set transmission: copy it into a
 * new ElementEntry, update statistics and counters, register it if it is
 * new, and apply the byzantine freshness check. */
1634 handle_union_p2p_full_element (void *cls,
1635 const struct GNUNET_SET_ElementMessage *emsg)
1637 struct Operation *op = cls;
1638 struct ElementEntry *ee;
1639 struct KeyEntry *ke;
1640 uint16_t element_size;
/* Payload length is the total message size minus the fixed message struct. */
1642 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
/* The payload is copied directly behind the entry; element.data points
 * at that copy. */
1643 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1644 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1645 ee->element.size = element_size;
1646 ee->element.data = &ee[1];
1647 ee->element.element_type = ntohs (emsg->element_type);
1648 ee->remote = GNUNET_YES;
1649 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1651 LOG (GNUNET_ERROR_TYPE_DEBUG,
1652 "Got element (full diff, size %u, hash %s) from peer\n",
1653 (unsigned int) element_size,
1654 GNUNET_h2s (&ee->element_hash));
1656 GNUNET_STATISTICS_update (_GSS_statistics,
1657 "# received elements",
1660 GNUNET_STATISTICS_update (_GSS_statistics,
1661 "# exchanged elements",
/* received_total counts every element message, fresh or repeated. */
1665 op->state->received_total++;
/* Look for an existing key entry for this element hash. */
1667 ke = op_get_element (op, &ee->element_hash);
1670 /* Got repeated element. Should not happen since
1671 * we track demands. */
1672 GNUNET_STATISTICS_update (_GSS_statistics,
1673 "# repeated elements",
1676 ke->received = GNUNET_YES;
/* New element: count it as fresh and register it with the operation. */
1681 LOG (GNUNET_ERROR_TYPE_DEBUG,
1682 "Registering new element from remote peer\n");
1683 op->state->received_fresh++;
1684 op_register_element (op, ee, GNUNET_YES);
1685 /* only send results immediately if the client wants it */
1686 switch (op->result_mode)
1688 case GNUNET_SET_RESULT_ADDED:
1689 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1691 case GNUNET_SET_RESULT_SYMMETRIC:
1692 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1695 /* Result mode not supported, should have been caught earlier. */
/* Only when op->byzantine is set: fail once the total exceeds
 * 384 + 4 * fresh and fewer than one sixth of the elements were fresh. */
1701 if ( (GNUNET_YES == op->byzantine) &&
1702 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1703 (op->state->received_fresh < op->state->received_total / 6) )
1705 /* The other peer gave us lots of old elements, there's something wrong. */
1706 LOG (GNUNET_ERROR_TYPE_ERROR,
1707 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1708 (unsigned long long) op->state->received_fresh,
1709 (unsigned long long) op->state->received_total);
1710 GNUNET_break_op (0);
1711 fail_union_operation (op);
/* Tell CADET that the message has been processed. */
1714 GNUNET_CADET_receive_done (op->channel);
1719 * Check an inquiry message (a list of IBF_Key-s)
1720 * from a remote peer.
1722 * @param cls the union operation
1723 * @param msg the message
/* Validation step for an inquiry message. Returns GNUNET_SYSERR when the set
 * is not a union set, when the operation is not in PHASE_INVENTORY_PASSIVE,
 * or when the payload is not a whole number of IBF keys. */
1726 check_union_p2p_inquiry (void *cls,
1727 const struct InquiryMessage *msg)
1729 struct Operation *op = cls;
1730 unsigned int num_keys;
1732 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1734 GNUNET_break_op (0);
1735 return GNUNET_SYSERR;
/* Inquiries are only accepted in PHASE_INVENTORY_PASSIVE. */
1737 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1739 GNUNET_break_op (0);
1740 return GNUNET_SYSERR;
/* Integer division followed by multiplication detects a payload whose
 * length is not an exact multiple of sizeof (struct IBF_Key). */
1742 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1743 / sizeof (struct IBF_Key);
1744 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1745 != num_keys * sizeof (struct IBF_Key))
1747 GNUNET_break_op (0);
1748 return GNUNET_SYSERR;
1755 * Send offers (for GNUNET_Hash-es) in response
1756 * to inquiries (for IBF_Key-s).
1758 * @param cls the union operation
1759 * @param msg the message
/* Walk over the IBF keys carried in an inquiry message; for each key the
 * loop body calls unsalt_key and then send_offers_for_key. */
1762 handle_union_p2p_inquiry (void *cls,
1763 const struct InquiryMessage *msg)
1765 struct Operation *op = cls;
1766 const struct IBF_Key *ibf_key;
1767 unsigned int num_keys;
1769 LOG (GNUNET_ERROR_TYPE_DEBUG,
1770 "Received union inquiry\n");
/* Number of keys is the payload length divided by the key size. */
1771 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1772 / sizeof (struct IBF_Key);
/* The keys start directly behind the fixed message struct. */
1773 ibf_key = (const struct IBF_Key *) &msg[1];
1774 while (0 != num_keys--)
1776 struct IBF_Key unsalted_key;
1778 unsalt_key (ibf_key,
1781 send_offers_for_key (op,
/* Tell CADET that the message has been processed. */
1785 GNUNET_CADET_receive_done (op->channel);
1790 * Iterator over hash map entries, called to
1791 * send each element that was not received from the remote peer.
1793 * @param cls closure
1794 * @param key current key code
1795 * @param value value in the hash map
1796 * @return #GNUNET_YES if we should continue to iterate,
1797 * #GNUNET_NO if not.
/* Map iterator: cls is the operation and value is a KeyEntry. Builds a
 * FULL_ELEMENT message from the element of the entry and calls
 * GNUNET_MQ_send on op->mq. */
1800 send_missing_full_elements_iter (void *cls,
1804 struct Operation *op = cls;
1805 struct KeyEntry *ke = value;
1806 struct GNUNET_MQ_Envelope *ev;
1807 struct GNUNET_SET_ElementMessage *emsg;
1808 struct ElementEntry *ee = ke->element;
/* Entries flagged as received are tested for here; the statement taken in
 * that case is not visible in this view (presumably the entry is skipped
 * - TODO confirm). */
1810 if (GNUNET_YES == ke->received)
1812 ev = GNUNET_MQ_msg_extra (emsg,
1814 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1815 GNUNET_memcpy (&emsg[1],
/* Element type goes on the wire in network byte order. */
1818 emsg->element_type = htons (ee->element.element_type);
1819 GNUNET_MQ_send (op->mq,
1826 * Handle a request for full set transmission.
1828 * @param cls closure, a set union operation
1829 * @param mh the request message
/* Handle the request of the remote peer for a full set transmission.
 * Fails the operation when the set is not a union set or when the phase
 * is not PHASE_EXPECT_IBF. */
1832 handle_union_p2p_request_full (void *cls,
1833 const struct GNUNET_MessageHeader *mh)
1835 struct Operation *op = cls;
1837 LOG (GNUNET_ERROR_TYPE_DEBUG,
1838 "Received request for full set transmission\n");
1839 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1841 GNUNET_break_op (0);
1842 fail_union_operation (op);
/* The request is only valid while we are waiting for an IBF. */
1845 if (PHASE_EXPECT_IBF != op->state->phase)
1847 GNUNET_break_op (0);
1848 fail_union_operation (op);
1852 // FIXME: we need to check that our set is larger than the
1853 // byzantine_lower_bound by some threshold
/* Tell CADET that the message has been processed. */
1855 GNUNET_CADET_receive_done (op->channel);
1860 * Handle a "full done" message.
1862 * @param cls closure, a set union operation
1863 * @param mh the full done message
/* Handle a FULL_DONE message from the remote peer. The reaction depends on
 * the current phase; any phase other than the two cases below is treated as
 * a protocol violation. */
1866 handle_union_p2p_full_done (void *cls,
1867 const struct GNUNET_MessageHeader *mh)
1869 struct Operation *op = cls;
1871 switch (op->state->phase)
/* PHASE_EXPECT_IBF: iterate over key_to_element with
 * send_missing_full_elements_iter, then send our own FULL_DONE and
 * switch to PHASE_DONE. */
1873 case PHASE_EXPECT_IBF:
1875 struct GNUNET_MQ_Envelope *ev;
1877 LOG (GNUNET_ERROR_TYPE_DEBUG,
1878 "got FULL DONE, sending elements that other peer is missing\n");
1880 /* send all the elements that did not come from the remote peer */
1881 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1882 &send_missing_full_elements_iter,
1885 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1886 GNUNET_MQ_send (op->mq,
1888 op->state->phase = PHASE_DONE;
1889 /* we now wait until the other peer sends us the OVER message*/
/* PHASE_FULL_SENDING: switch to PHASE_DONE, acknowledge the message,
 * notify the client and destroy the channel. */
1892 case PHASE_FULL_SENDING:
1894 LOG (GNUNET_ERROR_TYPE_DEBUG,
1895 "got FULL DONE, finishing\n");
1896 /* We sent the full set, and got the response for that. We're done. */
1897 op->state->phase = PHASE_DONE;
1898 GNUNET_CADET_receive_done (op->channel);
1899 send_client_done (op);
1900 destroy_channel (op);
/* Unexpected phase: log it and fail the operation. */
1905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906 "Handle full done phase is %u\n",
1907 (unsigned) op->state->phase);
1908 GNUNET_break_op (0);
1909 fail_union_operation (op);
/* Tell CADET that the message has been processed. */
1912 GNUNET_CADET_receive_done (op->channel);
1917 * Check a demand by the other peer for elements based on a list
1918 * of `struct GNUNET_HashCode`s.
1920 * @param cls closure, a set union operation
1921 * @param mh the demand message
1922 * @return #GNUNET_OK if @a mh is well-formed
/* Validation step for a demand message. Returns GNUNET_SYSERR when the set
 * is not a union set or when the payload is not a whole number of
 * struct GNUNET_HashCode entries. */
1925 check_union_p2p_demand (void *cls,
1926 const struct GNUNET_MessageHeader *mh)
1928 struct Operation *op = cls;
1929 unsigned int num_hashes;
1931 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1933 GNUNET_break_op (0);
1934 return GNUNET_SYSERR;
/* Integer division followed by multiplication detects a payload whose
 * length is not an exact multiple of the hash size. */
1936 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1937 / sizeof (struct GNUNET_HashCode);
1938 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1939 != num_hashes * sizeof (struct GNUNET_HashCode))
1941 GNUNET_break_op (0);
1942 return GNUNET_SYSERR;
1949 * Handle a demand by the other peer for elements based on a list
1950 * of `struct GNUNET_HashCode`s.
1952 * @param cls closure, a set union operation
1953 * @param mh the demand message
/* Answer a demand message: for every hash in the payload, look up the
 * element in the set content, verify it belongs to this operation, and send
 * it to the remote peer as an element message. */
1956 handle_union_p2p_demand (void *cls,
1957 const struct GNUNET_MessageHeader *mh)
1959 struct Operation *op = cls;
1960 struct ElementEntry *ee;
1961 struct GNUNET_SET_ElementMessage *emsg;
1962 const struct GNUNET_HashCode *hash;
1963 unsigned int num_hashes;
1964 struct GNUNET_MQ_Envelope *ev;
/* Number of hashes is the payload length divided by the hash size. */
1966 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1967 / sizeof (struct GNUNET_HashCode);
/* The hashes start directly behind the message header. */
1968 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1970 hash++, num_hashes--)
1972 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1976 /* Demand for non-existing element. */
1977 GNUNET_break_op (0);
1978 fail_union_operation (op);
1981 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1983 /* Probably confused lazily copied sets. */
1984 GNUNET_break_op (0);
1985 fail_union_operation (op);
/* Build the reply with the element data as payload; reserved is zeroed and
 * element_type is converted to network byte order. */
1988 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1989 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1990 emsg->reserved = htons (0);
1991 emsg->element_type = htons (ee->element.element_type);
1992 LOG (GNUNET_ERROR_TYPE_DEBUG,
1993 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1995 (unsigned int) ee->element.size,
1996 GNUNET_h2s (&ee->element_hash));
1997 GNUNET_MQ_send (op->mq, ev);
1998 GNUNET_STATISTICS_update (_GSS_statistics,
1999 "# exchanged elements",
/* Only the symmetric result mode reports the sent element to the client. */
2003 switch (op->result_mode)
2005 case GNUNET_SET_RESULT_ADDED:
2006 /* Nothing to do. */
2008 case GNUNET_SET_RESULT_SYMMETRIC:
2009 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2012 /* Result mode not supported, should have been caught earlier. */
/* Tell CADET that the message has been processed. */
2017 GNUNET_CADET_receive_done (op->channel);
2022 * Check offer (of `struct GNUNET_HashCode`s).
2024 * @param cls the union operation
2025 * @param mh the message
2026 * @return #GNUNET_OK if @a mh is well-formed
/* Validation step for an offer message. Returns GNUNET_SYSERR when the set
 * is not a union set, when the operation is in neither inventory phase, or
 * when the payload is not a whole number of hashes. */
2029 check_union_p2p_offer (void *cls,
2030 const struct GNUNET_MessageHeader *mh)
2032 struct Operation *op = cls;
2033 unsigned int num_hashes;
2035 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2037 GNUNET_break_op (0);
2038 return GNUNET_SYSERR;
2040 /* look up elements and send them */
/* Offers are accepted in PHASE_INVENTORY_PASSIVE and
 * PHASE_INVENTORY_ACTIVE only. */
2041 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2042 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2044 GNUNET_break_op (0);
2045 return GNUNET_SYSERR;
/* Integer division followed by multiplication detects a payload whose
 * length is not an exact multiple of the hash size. */
2047 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2048 / sizeof (struct GNUNET_HashCode);
2049 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2050 num_hashes * sizeof (struct GNUNET_HashCode))
2052 GNUNET_break_op (0);
2053 return GNUNET_SYSERR;
2060 * Handle offers (of `struct GNUNET_HashCode`s) and
2061 * respond with demands (of `struct GNUNET_HashCode`s).
2063 * @param cls the union operation
2064 * @param mh the message
/* Process an offer message: for each offered hash, record it in
 * demanded_hashes and send a DEMAND message carrying that single hash. */
2067 handle_union_p2p_offer (void *cls,
2068 const struct GNUNET_MessageHeader *mh)
2070 struct Operation *op = cls;
2071 const struct GNUNET_HashCode *hash;
2072 unsigned int num_hashes;
/* Number of hashes is the payload length divided by the hash size. */
2074 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2075 / sizeof (struct GNUNET_HashCode);
/* The hashes start directly behind the message header. */
2076 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2078 hash++, num_hashes--)
2080 struct ElementEntry *ee;
2081 struct GNUNET_MessageHeader *demands;
2082 struct GNUNET_MQ_Envelope *ev;
/* Check whether the offered element is already in the set content and
 * part of this operation. */
2084 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2087 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
/* A hash that is already in demanded_hashes is a duplicate demand. */
2091 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2094 LOG (GNUNET_ERROR_TYPE_DEBUG,
2095 "Skipped sending duplicate demand\n");
/* Remember the demand; the put is asserted to succeed. */
2099 GNUNET_assert (GNUNET_OK ==
2100 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2103 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
/* NOTE(review): the format below uses %x while the matching argument is
 * cast to (void *); %p is the conversion for pointers - confirm and fix. */
2105 LOG (GNUNET_ERROR_TYPE_DEBUG,
2106 "[OP %x] Requesting element (hash %s)\n",
2107 (void *) op, GNUNET_h2s (hash));
/* The demand message has room for exactly one hash behind the header. */
2108 ev = GNUNET_MQ_msg_header_extra (demands,
2109 sizeof (struct GNUNET_HashCode),
2110 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2111 GNUNET_memcpy (&demands[1],
2113 sizeof (struct GNUNET_HashCode));
2114 GNUNET_MQ_send (op->mq, ev);
/* Tell CADET that the message has been processed. */
2116 GNUNET_CADET_receive_done (op->channel);
2121 * Handle a done message from a remote peer
2123 * @param cls the union operation
2124 * @param mh the message
/* Handle a DONE message from the remote peer. Moves the operation into one
 * of the two finishing phases depending on whether we are the passive or
 * the active partner; any other phase fails the operation. */
2127 handle_union_p2p_done (void *cls,
2128 const struct GNUNET_MessageHeader *mh)
2130 struct Operation *op = cls;
2132 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2134 GNUNET_break_op (0);
2135 fail_union_operation (op);
2138 switch (op->state->phase)
/* Passive partner: continue in PHASE_FINISH_WAITING. */
2140 case PHASE_INVENTORY_PASSIVE:
2141 /* We got all requests, but still have to send our elements in response. */
2142 op->state->phase = PHASE_FINISH_WAITING;
2144 LOG (GNUNET_ERROR_TYPE_DEBUG,
2145 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2146 /* The active peer is done sending offers
2147 * and inquiries. This means that all
2148 * our responses to that (demands and offers)
2149 * must be in flight (queued or in mesh).
2151 * We should notify the active peer once
2152 * all our demands are satisfied, so that the active
2153 * peer can quit if we gave him everything.
2155 GNUNET_CADET_receive_done (op->channel);
/* Active partner: continue in PHASE_FINISH_CLOSING. */
2158 case PHASE_INVENTORY_ACTIVE:
2159 LOG (GNUNET_ERROR_TYPE_DEBUG,
2160 "got DONE (as active partner), waiting to finish\n");
2161 /* All demands of the other peer are satisfied,
2162 * and we processed all offers, thus we know
2163 * exactly what our demands must be.
2165 * We'll close the channel
2166 * to the other peer once our demands are met.
2168 op->state->phase = PHASE_FINISH_CLOSING;
2169 GNUNET_CADET_receive_done (op->channel);
/* DONE in any other phase is a protocol violation. */
2173 GNUNET_break_op (0);
2174 fail_union_operation (op);
2180 * Handle an over message from a remote peer
2182 * @param cls the union operation
2183 * @param mh the message
/* Handle an OVER message: the closure (the operation) is passed straight
 * to send_client_done. The message content itself is not inspected. */
2186 handle_union_p2p_over (void *cls,
2187 const struct GNUNET_MessageHeader *mh)
2189 send_client_done (cls);
2194 * Initiate operation to evaluate a set union with a remote peer.
2196 * @param op operation to perform (to be initialized)
2197 * @param opaque_context message to be transmitted to the listener
2198 * to convince him to accept, may be NULL
/* Start a union operation as the initiating side: build the operation
 * request (with the optional context message nested inside), create the
 * per-operation state, send the request and initialize key_to_element. */
2200 static struct OperationState *
2201 union_evaluate (struct Operation *op,
2202 const struct GNUNET_MessageHeader *opaque_context)
2204 struct OperationState *state;
2205 struct GNUNET_MQ_Envelope *ev;
2206 struct OperationRequestMessage *msg;
/* The request envelope is created first; per the comment below, creation
 * can be refused when the nested context message is too large. */
2208 ev = GNUNET_MQ_msg_nested_mh (msg,
2209 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2213 /* the context message is too large */
2217 state = GNUNET_new (struct OperationState);
2218 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2220 /* copy the current generation's strata estimator for this operation */
2221 state->se = strata_estimator_dup (op->set->state->se);
2222 /* we started the operation, thus we have to send the operation request */
2223 state->phase = PHASE_EXPECT_SE;
/* Both salts are set to the fixed value 42; the FIXME marks this as
 * unresolved. */
2224 state->salt_receive = state->salt_send = 42; // FIXME?????
2225 LOG (GNUNET_ERROR_TYPE_DEBUG,
2226 "Initiating union operation evaluation\n");
2227 GNUNET_STATISTICS_update (_GSS_statistics,
2228 "# of total union operations",
2231 GNUNET_STATISTICS_update (_GSS_statistics,
2232 "# of initiated union operations",
/* Operation type goes on the wire in network byte order. */
2235 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2236 GNUNET_MQ_send (op->mq,
2239 if (NULL != opaque_context)
2240 LOG (GNUNET_ERROR_TYPE_DEBUG,
2241 "sent op request with context message\n");
2243 LOG (GNUNET_ERROR_TYPE_DEBUG,
2244 "sent op request without context message\n");
/* initial_size records how many entries key_to_element holds right
 * after initialization. */
2247 initialize_key_to_element (op);
2248 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2254 * Accept a union operation request from a remote peer.
2255 * Only initializes the private operation state.
2257 * @param op operation that will be accepted as a union operation
/* Accept a union operation as the listening side: create the per-operation
 * state, initialize key_to_element, serialize the strata estimator and send
 * it to the remote peer, then wait for an IBF. */
2259 static struct OperationState *
2260 union_accept (struct Operation *op)
2262 struct OperationState *state;
2263 const struct StrataEstimator *se;
2264 struct GNUNET_MQ_Envelope *ev;
2265 struct StrataEstimatorMessage *strata_msg;
2270 LOG (GNUNET_ERROR_TYPE_DEBUG,
2271 "accepting set union operation\n");
2272 GNUNET_STATISTICS_update (_GSS_statistics,
2273 "# of accepted union operations",
2276 GNUNET_STATISTICS_update (_GSS_statistics,
2277 "# of total union operations",
2281 state = GNUNET_new (struct OperationState);
/* The operation works on its own copy of the strata estimator of the set. */
2282 state->se = strata_estimator_dup (op->set->state->se);
2283 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
/* Both salts are set to the fixed value 42; the FIXME marks this as
 * unresolved. */
2285 state->salt_receive = state->salt_send = 42; // FIXME?????
2287 initialize_key_to_element (op);
2288 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2290 /* kick off the operation */
/* The buffer is sized as strata_count * IBF_BUCKET_SIZE * ibf_size. */
2292 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2293 len = strata_estimator_write (se,
/* A written length below the buffer size selects the ..._P2P_SEC message
 * type; ..._P2P_SE is the other value assigned to type. */
2295 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2296 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2298 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2299 ev = GNUNET_MQ_msg_extra (strata_msg,
2302 GNUNET_memcpy (&strata_msg[1],
/* set_size carries the number of elements in the set content, in network
 * byte order. */
2306 strata_msg->set_size
2307 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2308 GNUNET_MQ_send (op->mq,
2310 state->phase = PHASE_EXPECT_IBF;
2316 * Create a new set supporting the union operation
2318 * We maintain one strata estimator per set and then manipulate it over the
2319 * lifetime of the set, as recreating a strata estimator would be expensive.
2321 * @return the newly created set, NULL on error
/* Allocate the union-specific state of a set together with its strata
 * estimator, created with SE_STRATA_COUNT, SE_IBF_SIZE and
 * SE_IBF_HASH_NUM. */
2323 static struct SetState *
2324 union_set_create (void)
2326 struct SetState *set_state;
2328 LOG (GNUNET_ERROR_TYPE_DEBUG,
2329 "union set created\n");
2330 set_state = GNUNET_new (struct SetState);
2331 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2332 SE_IBF_SIZE, SE_IBF_HASH_NUM);
/* If the estimator could not be created, log the error and release the
 * state allocated above. */
2333 if (NULL == set_state->se)
2335 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2336 "Failed to allocate strata estimator\n");
2337 GNUNET_free (set_state);
2345 * Add the element from the given element message to the set.
2347 * @param set_state state of the set we want to add to
2348 * @param ee the element to add to the set
/* Insert the IBF key derived from the hash of the element into the strata
 * estimator of the set. */
2351 union_add (struct SetState *set_state,
2352 struct ElementEntry *ee)
2354 strata_estimator_insert (set_state->se,
2355 get_ibf_key (&ee->element_hash));
2360 * Remove the element given in the element message from the set.
2361 * Only marks the element as removed, so that older set operations can still exchange it.
2363 * @param set_state state of the set to remove from
2364 * @param ee set element to remove
/* Remove the IBF key derived from the hash of the element from the strata
 * estimator of the set. */
2367 union_remove (struct SetState *set_state,
2368 struct ElementEntry *ee)
2370 strata_estimator_remove (set_state->se,
2371 get_ibf_key (&ee->element_hash));
2376 * Destroy a set that supports the union operation.
2378 * @param set_state the set to destroy
/* Release the union-specific state of a set, including its strata
 * estimator when one is present. */
2381 union_set_destroy (struct SetState *set_state)
2383 if (NULL != set_state->se)
2385 strata_estimator_destroy (set_state->se);
/* Clear the pointer after destroying the estimator. */
2386 set_state->se = NULL;
2388 GNUNET_free (set_state);
2393 * Copy union-specific set state.
2395 * @param state source state for copying the union state
2396 * @return a copy of the union-specific set state
/* Create a new SetState whose strata estimator is a duplicate of the one
 * in the given state. */
2398 static struct SetState *
2399 union_copy_state (struct SetState *state)
2401 struct SetState *new_state;
/* Both the source state and its estimator are required to be non-NULL. */
2403 GNUNET_assert ( (NULL != state) &&
2404 (NULL != state->se) );
2405 new_state = GNUNET_new (struct SetState);
2406 new_state->se = strata_estimator_dup (state->se);
2413 * Handle case where channel went down for an operation.
2415 * @param op operation that lost the channel
/* Called when the channel of an operation went down: notify the client via
 * send_client_done, then hand the operation to _GSS_operation_destroy. */
2418 union_channel_death (struct Operation *op)
2420 send_client_done (op);
2421 _GSS_operation_destroy (op,
2427 * Get the table with implementing functions for the set union.
2430 * @return the operation specific VTable
2432 const struct SetVT *
2435 static const struct SetVT union_vt = {
2436 .create = &union_set_create,
2438 .remove = &union_remove,
2439 .destroy_set = &union_set_destroy,
2440 .evaluate = &union_evaluate,
2441 .accept = &union_accept,
2442 .cancel = &union_op_cancel,
2443 .copy_state = &union_copy_state,
2444 .channel_death = &union_channel_death