2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file set/gnunet-service-set_union.c
17 * @brief two-peer set operations
18 * @author Florian Dold
19 * @author Christian Grothoff
22 #include "gnunet_util_lib.h"
23 #include "gnunet_statistics_service.h"
24 #include "gnunet-service-set.h"
26 #include "gnunet-service-set_union.h"
27 #include "gnunet-service-set_union_strata_estimator.h"
28 #include "gnunet-service-set_protocol.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
36 * Number of IBFs in a strata estimator.
38 #define SE_STRATA_COUNT 32
41 * Size of the IBFs in the strata estimator.
43 #define SE_IBF_SIZE 80
46 * The hash num parameter for the difference digests and strata estimators.
48 #define SE_IBF_HASH_NUM 4
51 * Number of buckets that can be transmitted in one message.
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
56 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57 * Choose this value so that computing the IBF is still cheaper
58 * than transmitting all values.
60 #define MAX_IBF_ORDER (20)
63 * Number of buckets used in the ibf per estimated
70 * Current phase we are in for a union operation.
72 enum UnionOperationPhase
75 * We sent the request message, and expect a strata estimator.
80 * We sent the strata estimator, and expect an IBF. This phase is entered once
81 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
83 * XXX: could use better wording.
84 * XXX: repurposed to also expect a "request full set" message, should be renamed
86 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
91 * Continuation for multi part IBFs.
93 PHASE_EXPECT_IBF_CONT,
96 * We are decoding an IBF.
98 PHASE_INVENTORY_ACTIVE,
101 * The other peer is decoding the IBF we just sent.
103 PHASE_INVENTORY_PASSIVE,
106 * The protocol is almost finished, but we still have to flush our message
107 * queue and/or expect some elements.
109 PHASE_FINISH_CLOSING,
112 * In the penultimate phase,
113 * we wait until all our demands
114 * are satisfied. Then we send a done
115 * message, and wait for another done message.
117 PHASE_FINISH_WAITING,
120 * In the ultimate phase, we wait until
121 * our demands are satisfied and then
122 * quit (sending another DONE message).
127 * After sending the full set, wait for responses with the elements
128 * that the local peer is missing.
135 * State of an evaluate operation with another peer.
137 struct OperationState
140 * Copy of the set's strata estimator at the time of
141 * creation of this operation.
143 struct StrataEstimator *se;
146 * The IBF we currently receive.
148 struct InvertibleBloomFilter *remote_ibf;
151 * The IBF with the local set's element.
153 struct InvertibleBloomFilter *local_ibf;
156 * Maps unsalted IBF-Keys to elements.
157 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
158 * Colliding IBF-Keys are linked.
160 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
163 * Current state of the operation.
165 enum UnionOperationPhase phase;
168 * Did we send the client that we are done?
170 int client_done_sent;
173 * Number of ibf buckets already received into the @a remote_ibf.
175 unsigned int ibf_buckets_received;
178 * Hashes for elements that we have demanded from the other peer.
180 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
183 * Salt that we're using for sending IBFs
188 * Salt for the IBF we've received and that we're currently decoding.
190 uint32_t salt_receive;
193 * Number of elements we received from the other peer
194 * that were not in the local set yet.
196 uint32_t received_fresh;
199 * Total number of elements received from the other peer.
201 uint32_t received_total;
204 * Initial size of our set, just before
205 * the operation started.
207 uint64_t initial_size;
212 * The key entry is used to associate an ibf key with an element.
217 * IBF key for the entry, derived from the current salt.
219 struct IBF_Key ibf_key;
222 * The actual element associated with the key.
224 * Only owned by the union operation if element->operation
227 struct ElementEntry *element;
230 * Did we receive this element?
231 * Even if element->is_foreign is false, we might
232 * have received the element, so this indicates that
233 * the other peer has it.
240 * Used as a closure for sending elements
241 * with a specific IBF key.
243 struct SendElementClosure
246 * The IBF key whose matching elements should be
249 struct IBF_Key ibf_key;
252 * Operation for which the elements
255 struct Operation *op;
260 * Extra state required for efficient set union.
265 * The strata estimator is only generated once for
267 * The IBF keys are derived from the element hashes with
270 struct StrataEstimator *se;
275 * Iterator over hash map entries, called to
276 * destroy the linked list of colliding ibf key entries.
279 * @param key current key code
280 * @param value value in the hash map
281 * @return #GNUNET_YES if we should continue to iterate,
285 destroy_key_to_element_iter (void *cls,
289 struct KeyEntry *k = value;
291 GNUNET_assert (NULL != k);
292 if (GNUNET_YES == k->element->remote)
294 GNUNET_free (k->element);
303 * Destroy the union operation. Only things specific to the union
304 * operation are destroyed.
306 * @param op union operation to destroy
309 union_op_cancel (struct Operation *op)
311 LOG (GNUNET_ERROR_TYPE_DEBUG,
312 "destroying union op\n");
313 /* check if the op was canceled twice */
314 GNUNET_assert (NULL != op->state);
315 if (NULL != op->state->remote_ibf)
317 ibf_destroy (op->state->remote_ibf);
318 op->state->remote_ibf = NULL;
320 if (NULL != op->state->demanded_hashes)
322 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
323 op->state->demanded_hashes = NULL;
325 if (NULL != op->state->local_ibf)
327 ibf_destroy (op->state->local_ibf);
328 op->state->local_ibf = NULL;
330 if (NULL != op->state->se)
332 strata_estimator_destroy (op->state->se);
333 op->state->se = NULL;
335 if (NULL != op->state->key_to_element)
337 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
338 &destroy_key_to_element_iter,
340 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
341 op->state->key_to_element = NULL;
343 GNUNET_free (op->state);
345 LOG (GNUNET_ERROR_TYPE_DEBUG,
346 "destroying union op done\n");
351 * Inform the client that the union operation has failed,
352 * and proceed to destroy the evaluate operation.
354 * @param op the union operation to fail
357 fail_union_operation (struct Operation *op)
359 struct GNUNET_MQ_Envelope *ev;
360 struct GNUNET_SET_ResultMessage *msg;
362 LOG (GNUNET_ERROR_TYPE_WARNING,
363 "union operation failed\n");
364 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366 msg->request_id = htonl (op->client_request_id);
367 msg->element_type = htons (0);
368 GNUNET_MQ_send (op->set->cs->mq,
370 _GSS_operation_destroy (op, GNUNET_YES);
375 * Derive the IBF key from a hash code and
378 * @param src the hash code
379 * @return the derived IBF key
381 static struct IBF_Key
382 get_ibf_key (const struct GNUNET_HashCode *src)
387 GNUNET_assert (GNUNET_OK ==
388 GNUNET_CRYPTO_kdf (&key, sizeof (key),
390 &salt, sizeof (salt),
397 * Context for #op_get_element_iterator
399 struct GetElementContext
404 struct GNUNET_HashCode hash;
414 * Iterator over the mapping from IBF keys to element entries. Checks if we
415 * have an element with a given GNUNET_HashCode.
418 * @param key current key code
419 * @param value value in the hash map
420 * @return #GNUNET_YES if we should search further,
421 * #GNUNET_NO if we've found the element.
424 op_get_element_iterator (void *cls,
428 struct GetElementContext *ctx = cls;
429 struct KeyEntry *k = value;
431 GNUNET_assert (NULL != k);
432 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
443 * Determine whether the given element is already in the operation's element
446 * @param op operation that should be tested for 'element_hash'
447 * @param element_hash hash of the element to look for
448 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
450 static struct KeyEntry *
451 op_get_element (struct Operation *op,
452 const struct GNUNET_HashCode *element_hash)
455 struct IBF_Key ibf_key;
456 struct GetElementContext ctx = {{{ 0 }} , 0};
458 ctx.hash = *element_hash;
460 ibf_key = get_ibf_key (element_hash);
461 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
462 (uint32_t) ibf_key.key_val,
463 op_get_element_iterator,
466 /* was the iteration aborted because we found the element? */
467 if (GNUNET_SYSERR == ret)
469 GNUNET_assert (NULL != ctx.k);
477 * Insert an element into the union operation's
478 * key-to-element mapping. Takes ownership of 'ee'.
479 * Note that this does not insert the element in the set,
480 * only in the operation's key-element mapping.
481 * This is done to speed up re-tried operations, if some elements
482 * were transmitted, and then the IBF fails to decode.
484 * XXX: clarify ownership, doesn't sound right.
486 * @param op the union operation
487 * @param ee the element entry
488 * @parem received was this element received from the remote peer?
491 op_register_element (struct Operation *op,
492 struct ElementEntry *ee,
495 struct IBF_Key ibf_key;
498 ibf_key = get_ibf_key (&ee->element_hash);
499 k = GNUNET_new (struct KeyEntry);
501 k->ibf_key = ibf_key;
502 k->received = received;
503 GNUNET_assert (GNUNET_OK ==
504 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
505 (uint32_t) ibf_key.key_val,
507 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
515 salt_key (const struct IBF_Key *k_in,
517 struct IBF_Key *k_out)
520 uint64_t x = k_in->key_val;
522 x = (x >> s) | (x << (64 - s));
531 unsalt_key (const struct IBF_Key *k_in,
533 struct IBF_Key *k_out)
536 uint64_t x = k_in->key_val;
537 x = (x << s) | (x >> (64 - s));
543 * Insert a key into an ibf.
547 * @param value the key entry to get the key from
550 prepare_ibf_iterator (void *cls,
554 struct Operation *op = cls;
555 struct KeyEntry *ke = value;
556 struct IBF_Key salted_key;
558 LOG (GNUNET_ERROR_TYPE_DEBUG,
559 "[OP %x] inserting %lx (hash %s) into ibf\n",
561 (unsigned long) ke->ibf_key.key_val,
562 GNUNET_h2s (&ke->element->element_hash));
563 salt_key (&ke->ibf_key,
564 op->state->salt_send,
566 ibf_insert (op->state->local_ibf, salted_key);
572 * Iterator for initializing the
573 * key-to-element mapping of a union operation
575 * @param cls the union operation `struct Operation *`
577 * @param value the `struct ElementEntry *` to insert
578 * into the key-to-element mapping
579 * @return #GNUNET_YES (to continue iterating)
582 init_key_to_element_iterator (void *cls,
583 const struct GNUNET_HashCode *key,
586 struct Operation *op = cls;
587 struct ElementEntry *ee = value;
589 /* make sure that the element belongs to the set at the time
590 * of creating the operation */
592 _GSS_is_element_of_operation (ee,
595 GNUNET_assert (GNUNET_NO == ee->remote);
596 op_register_element (op,
604 * Initialize the IBF key to element mapping local to this set
607 * @param op the set union operation
610 initialize_key_to_element (struct Operation *op)
614 GNUNET_assert (NULL == op->state->key_to_element);
615 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
616 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
617 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
618 &init_key_to_element_iterator,
624 * Create an ibf with the operation's elements
625 * of the specified size
627 * @param op the union operation
628 * @param size size of the ibf to create
629 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
632 prepare_ibf (struct Operation *op,
635 GNUNET_assert (NULL != op->state->key_to_element);
637 if (NULL != op->state->local_ibf)
638 ibf_destroy (op->state->local_ibf);
639 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
640 if (NULL == op->state->local_ibf)
642 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
643 "Failed to allocate local IBF\n");
644 return GNUNET_SYSERR;
646 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
647 &prepare_ibf_iterator,
654 * Send an ibf of appropriate size.
656 * Fragments the IBF into multiple messages if necessary.
658 * @param op the union operation
659 * @param ibf_order order of the ibf to send, size=2^order
660 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
663 send_ibf (struct Operation *op,
666 unsigned int buckets_sent = 0;
667 struct InvertibleBloomFilter *ibf;
670 prepare_ibf (op, 1<<ibf_order))
672 /* allocation failed */
673 return GNUNET_SYSERR;
676 LOG (GNUNET_ERROR_TYPE_DEBUG,
677 "sending ibf of size %u\n",
681 char name[64] = { 0 };
682 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
683 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
686 ibf = op->state->local_ibf;
688 while (buckets_sent < (1 << ibf_order))
690 unsigned int buckets_in_message;
691 struct GNUNET_MQ_Envelope *ev;
692 struct IBFMessage *msg;
694 buckets_in_message = (1 << ibf_order) - buckets_sent;
695 /* limit to maximum */
696 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
697 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
699 ev = GNUNET_MQ_msg_extra (msg,
700 buckets_in_message * IBF_BUCKET_SIZE,
701 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
704 msg->order = ibf_order;
705 msg->offset = htonl (buckets_sent);
706 msg->salt = htonl (op->state->salt_send);
707 ibf_write_slice (ibf, buckets_sent,
708 buckets_in_message, &msg[1]);
709 buckets_sent += buckets_in_message;
710 LOG (GNUNET_ERROR_TYPE_DEBUG,
711 "ibf chunk size %u, %u/%u sent\n",
715 GNUNET_MQ_send (op->mq, ev);
718 /* The other peer must decode the IBF, so
720 op->state->phase = PHASE_INVENTORY_PASSIVE;
726 * Compute the necessary order of an ibf
727 * from the size of the symmetric set difference.
729 * @param diff the difference
730 * @return the required size of the ibf
733 get_order_from_difference (unsigned int diff)
735 unsigned int ibf_order;
738 while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
739 ((1<<ibf_order) < SE_IBF_HASH_NUM) ) &&
740 (ibf_order < MAX_IBF_ORDER) )
742 // add one for correction
743 return ibf_order + 1;
748 * Send a set element.
750 * @param cls the union operation `struct Operation *`
752 * @param value the `struct ElementEntry *` to insert
753 * into the key-to-element mapping
754 * @return #GNUNET_YES (to continue iterating)
757 send_full_element_iterator (void *cls,
758 const struct GNUNET_HashCode *key,
761 struct Operation *op = cls;
762 struct GNUNET_SET_ElementMessage *emsg;
763 struct ElementEntry *ee = value;
764 struct GNUNET_SET_Element *el = &ee->element;
765 struct GNUNET_MQ_Envelope *ev;
767 LOG (GNUNET_ERROR_TYPE_DEBUG,
768 "Sending element %s\n",
770 ev = GNUNET_MQ_msg_extra (emsg,
772 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
773 emsg->element_type = htons (el->element_type);
774 GNUNET_memcpy (&emsg[1],
777 GNUNET_MQ_send (op->mq,
784 * Switch to full set transmission for @a op.
786 * @param op operation to switch to full set transmission.
789 send_full_set (struct Operation *op)
791 struct GNUNET_MQ_Envelope *ev;
793 op->state->phase = PHASE_FULL_SENDING;
794 LOG (GNUNET_ERROR_TYPE_DEBUG,
795 "Dedicing to transmit the full set\n");
796 /* FIXME: use a more memory-friendly way of doing this with an
797 iterator, just as we do in the non-full case! */
798 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
799 &send_full_element_iterator,
801 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
802 GNUNET_MQ_send (op->mq,
808 * Handle a strata estimator from a remote peer
810 * @param cls the union operation
811 * @param msg the message
814 check_union_p2p_strata_estimator (void *cls,
815 const struct StrataEstimatorMessage *msg)
817 struct Operation *op = cls;
821 if (op->state->phase != PHASE_EXPECT_SE)
824 return GNUNET_SYSERR;
826 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
827 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
828 if ( (GNUNET_NO == is_compressed) &&
829 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
832 return GNUNET_SYSERR;
839 * Handle a strata estimator from a remote peer
841 * @param cls the union operation
842 * @param msg the message
845 handle_union_p2p_strata_estimator (void *cls,
846 const struct StrataEstimatorMessage *msg)
848 struct Operation *op = cls;
849 struct StrataEstimator *remote_se;
855 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
856 GNUNET_STATISTICS_update (_GSS_statistics,
857 "# bytes of SE received",
858 ntohs (msg->header.size),
860 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
861 other_size = GNUNET_ntohll (msg->set_size);
862 remote_se = strata_estimator_create (SE_STRATA_COUNT,
865 if (NULL == remote_se)
867 /* insufficient resources, fail */
868 fail_union_operation (op);
872 strata_estimator_read (&msg[1],
877 /* decompression failed */
878 strata_estimator_destroy (remote_se);
879 fail_union_operation (op);
882 GNUNET_assert (NULL != op->state->se);
883 diff = strata_estimator_difference (remote_se,
889 strata_estimator_destroy (remote_se);
890 strata_estimator_destroy (op->state->se);
891 op->state->se = NULL;
892 LOG (GNUNET_ERROR_TYPE_DEBUG,
893 "got se diff=%d, using ibf size %d\n",
895 1U << get_order_from_difference (diff));
900 set_debug = getenv ("GNUNET_SET_BENCHMARK");
901 if ( (NULL != set_debug) &&
902 (0 == strcmp (set_debug, "1")) )
904 FILE *f = fopen ("set.log", "a");
905 fprintf (f, "%llu\n", (unsigned long long) diff);
910 if ( (GNUNET_YES == op->byzantine) &&
911 (other_size < op->byzantine_lower_bound) )
914 fail_union_operation (op);
918 if ( (GNUNET_YES == op->force_full) ||
919 (diff > op->state->initial_size / 4) ||
922 LOG (GNUNET_ERROR_TYPE_DEBUG,
923 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
925 op->state->initial_size);
926 GNUNET_STATISTICS_update (_GSS_statistics,
930 if ( (op->state->initial_size <= other_size) ||
937 struct GNUNET_MQ_Envelope *ev;
939 LOG (GNUNET_ERROR_TYPE_DEBUG,
940 "Telling other peer that we expect its full set\n");
941 op->state->phase = PHASE_EXPECT_IBF;
942 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
943 GNUNET_MQ_send (op->mq,
949 GNUNET_STATISTICS_update (_GSS_statistics,
955 get_order_from_difference (diff)))
957 /* Internal error, best we can do is shut the connection */
958 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
959 "Failed to send IBF, closing connection\n");
960 fail_union_operation (op);
964 GNUNET_CADET_receive_done (op->channel);
969 * Iterator to send elements to a remote peer
971 * @param cls closure with the element key and the union operation
973 * @param value the key entry
976 send_offers_iterator (void *cls,
980 struct SendElementClosure *sec = cls;
981 struct Operation *op = sec->op;
982 struct KeyEntry *ke = value;
983 struct GNUNET_MQ_Envelope *ev;
984 struct GNUNET_MessageHeader *mh;
986 /* Detect 32-bit key collision for the 64-bit IBF keys. */
987 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
990 ev = GNUNET_MQ_msg_header_extra (mh,
991 sizeof (struct GNUNET_HashCode),
992 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
994 GNUNET_assert (NULL != ev);
995 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
996 LOG (GNUNET_ERROR_TYPE_DEBUG,
997 "[OP %x] sending element offer (%s) to peer\n",
999 GNUNET_h2s (&ke->element->element_hash));
1000 GNUNET_MQ_send (op->mq, ev);
1006 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1008 * @param op union operation
1009 * @param ibf_key IBF key of interest
1012 send_offers_for_key (struct Operation *op,
1013 struct IBF_Key ibf_key)
1015 struct SendElementClosure send_cls;
1017 send_cls.ibf_key = ibf_key;
1019 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
1020 (uint32_t) ibf_key.key_val,
1021 &send_offers_iterator,
1027 * Decode which elements are missing on each side, and
1028 * send the appropriate offers and inquiries.
1030 * @param op union operation
1031 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1034 decode_and_send (struct Operation *op)
1037 struct IBF_Key last_key;
1039 unsigned int num_decoded;
1040 struct InvertibleBloomFilter *diff_ibf;
1042 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1046 op->state->remote_ibf->size))
1049 /* allocation failed */
1050 return GNUNET_SYSERR;
1052 diff_ibf = ibf_dup (op->state->local_ibf);
1053 ibf_subtract (diff_ibf,
1054 op->state->remote_ibf);
1056 ibf_destroy (op->state->remote_ibf);
1057 op->state->remote_ibf = NULL;
1059 LOG (GNUNET_ERROR_TYPE_DEBUG,
1060 "decoding IBF (size=%u)\n",
1064 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1069 int cycle_detected = GNUNET_NO;
1073 res = ibf_decode (diff_ibf, &side, &key);
1074 if (res == GNUNET_OK)
1076 LOG (GNUNET_ERROR_TYPE_DEBUG,
1077 "decoded ibf key %lx\n",
1078 (unsigned long) key.key_val);
1080 if ( (num_decoded > diff_ibf->size) ||
1081 ( (num_decoded > 1) &&
1082 (last_key.key_val == key.key_val) ) )
1084 LOG (GNUNET_ERROR_TYPE_DEBUG,
1085 "detected cyclic ibf (decoded %u/%u)\n",
1088 cycle_detected = GNUNET_YES;
1091 if ( (GNUNET_SYSERR == res) ||
1092 (GNUNET_YES == cycle_detected) )
1096 while (1<<next_order < diff_ibf->size)
1099 if (next_order <= MAX_IBF_ORDER)
1101 LOG (GNUNET_ERROR_TYPE_DEBUG,
1102 "decoding failed, sending larger ibf (size %u)\n",
1104 GNUNET_STATISTICS_update (_GSS_statistics,
1108 op->state->salt_send++;
1110 send_ibf (op, next_order))
1112 /* Internal error, best we can do is shut the connection */
1113 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1114 "Failed to send IBF, closing connection\n");
1115 fail_union_operation (op);
1116 ibf_destroy (diff_ibf);
1117 return GNUNET_SYSERR;
1122 GNUNET_STATISTICS_update (_GSS_statistics,
1123 "# of failed union operations (too large)",
1126 // XXX: Send the whole set, element-by-element
1127 LOG (GNUNET_ERROR_TYPE_ERROR,
1128 "set union failed: reached ibf limit\n");
1129 fail_union_operation (op);
1130 ibf_destroy (diff_ibf);
1131 return GNUNET_SYSERR;
1135 if (GNUNET_NO == res)
1137 struct GNUNET_MQ_Envelope *ev;
1139 LOG (GNUNET_ERROR_TYPE_DEBUG,
1140 "transmitted all values, sending DONE\n");
1141 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1142 GNUNET_MQ_send (op->mq, ev);
1143 /* We now wait until we get a DONE message back
1144 * and then wait for our MQ to be flushed and all our
1145 * demands be delivered. */
1150 struct IBF_Key unsalted_key;
1153 op->state->salt_receive,
1155 send_offers_for_key (op,
1158 else if (-1 == side)
1160 struct GNUNET_MQ_Envelope *ev;
1161 struct InquiryMessage *msg;
1163 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1164 * the effort additional complexity. */
1165 ev = GNUNET_MQ_msg_extra (msg,
1166 sizeof (struct IBF_Key),
1167 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1168 msg->salt = htonl (op->state->salt_receive);
1169 GNUNET_memcpy (&msg[1],
1171 sizeof (struct IBF_Key));
1172 LOG (GNUNET_ERROR_TYPE_DEBUG,
1173 "sending element inquiry for IBF key %lx\n",
1174 (unsigned long) key.key_val);
1175 GNUNET_MQ_send (op->mq, ev);
1182 ibf_destroy (diff_ibf);
1188 * Check an IBF message from a remote peer.
1190 * Reassemble the IBF from multiple pieces, and
1191 * process the whole IBF once possible.
1193 * @param cls the union operation
1194 * @param msg the header of the message
1195 * @return #GNUNET_OK if @a msg is well-formed
1198 check_union_p2p_ibf (void *cls,
1199 const struct IBFMessage *msg)
1201 struct Operation *op = cls;
1202 unsigned int buckets_in_message;
1204 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1206 GNUNET_break_op (0);
1207 return GNUNET_SYSERR;
1209 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1210 if (0 == buckets_in_message)
1212 GNUNET_break_op (0);
1213 return GNUNET_SYSERR;
1215 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1217 GNUNET_break_op (0);
1218 return GNUNET_SYSERR;
1220 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1222 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1224 GNUNET_break_op (0);
1225 return GNUNET_SYSERR;
1227 if (1<<msg->order != op->state->remote_ibf->size)
1229 GNUNET_break_op (0);
1230 return GNUNET_SYSERR;
1232 if (ntohl (msg->salt) != op->state->salt_receive)
1234 GNUNET_break_op (0);
1235 return GNUNET_SYSERR;
1238 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1239 (op->state->phase != PHASE_EXPECT_IBF) )
1241 GNUNET_break_op (0);
1242 return GNUNET_SYSERR;
1250 * Handle an IBF message from a remote peer.
1252 * Reassemble the IBF from multiple pieces, and
1253 * process the whole IBF once possible.
1255 * @param cls the union operation
1256 * @param msg the header of the message
1259 handle_union_p2p_ibf (void *cls,
1260 const struct IBFMessage *msg)
1262 struct Operation *op = cls;
1263 unsigned int buckets_in_message;
1265 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1266 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1267 (op->state->phase == PHASE_EXPECT_IBF) )
1269 op->state->phase = PHASE_EXPECT_IBF_CONT;
1270 GNUNET_assert (NULL == op->state->remote_ibf);
1271 LOG (GNUNET_ERROR_TYPE_DEBUG,
1272 "Creating new ibf of size %u\n",
1274 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1275 op->state->salt_receive = ntohl (msg->salt);
1276 LOG (GNUNET_ERROR_TYPE_DEBUG,
1277 "Receiving new IBF with salt %u\n",
1278 op->state->salt_receive);
1279 if (NULL == op->state->remote_ibf)
1281 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1282 "Failed to parse remote IBF, closing connection\n");
1283 fail_union_operation (op);
1286 op->state->ibf_buckets_received = 0;
1287 if (0 != ntohl (msg->offset))
1289 GNUNET_break_op (0);
1290 fail_union_operation (op);
1296 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1297 LOG (GNUNET_ERROR_TYPE_DEBUG,
1298 "Received more of IBF\n");
1300 GNUNET_assert (NULL != op->state->remote_ibf);
1302 ibf_read_slice (&msg[1],
1303 op->state->ibf_buckets_received,
1305 op->state->remote_ibf);
1306 op->state->ibf_buckets_received += buckets_in_message;
1308 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1310 LOG (GNUNET_ERROR_TYPE_DEBUG,
1311 "received full ibf\n");
1312 op->state->phase = PHASE_INVENTORY_ACTIVE;
1314 decode_and_send (op))
1316 /* Internal error, best we can do is shut down */
1317 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1318 "Failed to decode IBF, closing connection\n");
1319 fail_union_operation (op);
1323 GNUNET_CADET_receive_done (op->channel);
1328 * Send a result message to the client indicating
1329 * that there is a new element.
1331 * @param op union operation
1332 * @param element element to send
1333 * @param status status to send with the new element
1336 send_client_element (struct Operation *op,
1337 struct GNUNET_SET_Element *element,
1340 struct GNUNET_MQ_Envelope *ev;
1341 struct GNUNET_SET_ResultMessage *rm;
1343 LOG (GNUNET_ERROR_TYPE_DEBUG,
1344 "sending element (size %u) to client\n",
1346 GNUNET_assert (0 != op->client_request_id);
1347 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1350 GNUNET_MQ_discard (ev);
1354 rm->result_status = htons (status);
1355 rm->request_id = htonl (op->client_request_id);
1356 rm->element_type = htons (element->element_type);
1357 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1358 GNUNET_memcpy (&rm[1],
1361 GNUNET_MQ_send (op->set->cs->mq,
1367 * Destroy remote channel.
1369 * @param op operation
1371 void destroy_channel (struct Operation *op)
1373 struct GNUNET_CADET_Channel *channel;
1375 if (NULL != (channel = op->channel))
1377 /* This will free op; called conditionally as this helper function
1378 is also called from within the channel disconnect handler. */
1380 GNUNET_CADET_channel_destroy (channel);
1386 * Signal to the client that the operation has finished and
1387 * destroy the operation.
1389 * @param cls operation to destroy
1392 send_client_done (void *cls)
1394 struct Operation *op = cls;
1395 struct GNUNET_MQ_Envelope *ev;
1396 struct GNUNET_SET_ResultMessage *rm;
1398 if (GNUNET_YES == op->state->client_done_sent) {
1402 if (PHASE_DONE != op->state->phase) {
1403 LOG (GNUNET_ERROR_TYPE_WARNING,
1404 "union operation failed\n");
1405 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1406 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1407 rm->request_id = htonl (op->client_request_id);
1408 rm->element_type = htons (0);
1409 GNUNET_MQ_send (op->set->cs->mq,
1414 op->state->client_done_sent = GNUNET_YES;
1416 LOG (GNUNET_ERROR_TYPE_INFO,
1417 "Signalling client that union operation is done\n");
1418 ev = GNUNET_MQ_msg (rm,
1419 GNUNET_MESSAGE_TYPE_SET_RESULT);
1420 rm->request_id = htonl (op->client_request_id);
1421 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1422 rm->element_type = htons (0);
1423 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1424 GNUNET_MQ_send (op->set->cs->mq,
1430 * Tests if the operation is finished, and if so notify.
1432 * @param op operation to check
1435 maybe_finish (struct Operation *op)
1437 unsigned int num_demanded;
1439 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1441 if (PHASE_FINISH_WAITING == op->state->phase)
1443 LOG (GNUNET_ERROR_TYPE_DEBUG,
1444 "In PHASE_FINISH_WAITING, pending %u demands\n",
1446 if (0 == num_demanded)
1448 struct GNUNET_MQ_Envelope *ev;
1450 op->state->phase = PHASE_DONE;
1451 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1452 GNUNET_MQ_send (op->mq,
1454 /* We now wait until the other peer sends P2P_OVER
1455 * after it got all elements from us. */
1458 if (PHASE_FINISH_CLOSING == op->state->phase)
1460 LOG (GNUNET_ERROR_TYPE_DEBUG,
1461 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1463 if (0 == num_demanded)
1465 op->state->phase = PHASE_DONE;
1466 send_client_done (op);
1467 destroy_channel (op);
1474 * Check an element message from a remote peer.
1476 * @param cls the union operation
1477 * @param emsg the message
1480 check_union_p2p_elements (void *cls,
1481 const struct GNUNET_SET_ElementMessage *emsg)
1483 struct Operation *op = cls;
1485 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1487 GNUNET_break_op (0);
1488 return GNUNET_SYSERR;
1490 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1492 GNUNET_break_op (0);
1493 return GNUNET_SYSERR;
1500 * Handle an element message from a remote peer.
1501 * Sent by the other peer either because we decoded an IBF and placed a demand,
1502 * or because the other peer switched to full set transmission.
1504 * @param cls the union operation
1505 * @param emsg the message
1508 handle_union_p2p_elements (void *cls,
1509 const struct GNUNET_SET_ElementMessage *emsg)
1511 struct Operation *op = cls;
1512 struct ElementEntry *ee;
1513 struct KeyEntry *ke;
1514 uint16_t element_size;
1516 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1517 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1518 GNUNET_memcpy (&ee[1],
1521 ee->element.size = element_size;
1522 ee->element.data = &ee[1];
1523 ee->element.element_type = ntohs (emsg->element_type);
1524 ee->remote = GNUNET_YES;
1525 GNUNET_SET_element_hash (&ee->element,
1528 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1532 /* We got something we didn't demand, since it's not in our map. */
1533 GNUNET_break_op (0);
1534 fail_union_operation (op);
1538 LOG (GNUNET_ERROR_TYPE_DEBUG,
1539 "Got element (size %u, hash %s) from peer\n",
1540 (unsigned int) element_size,
1541 GNUNET_h2s (&ee->element_hash));
1543 GNUNET_STATISTICS_update (_GSS_statistics,
1544 "# received elements",
1547 GNUNET_STATISTICS_update (_GSS_statistics,
1548 "# exchanged elements",
1552 op->state->received_total++;
1554 ke = op_get_element (op, &ee->element_hash);
1557 /* Got repeated element. Should not happen since
1558 * we track demands. */
1559 GNUNET_STATISTICS_update (_GSS_statistics,
1560 "# repeated elements",
1563 ke->received = GNUNET_YES;
1568 LOG (GNUNET_ERROR_TYPE_DEBUG,
1569 "Registering new element from remote peer\n");
1570 op->state->received_fresh++;
1571 op_register_element (op, ee, GNUNET_YES);
1572 /* only send results immediately if the client wants it */
1573 switch (op->result_mode)
1575 case GNUNET_SET_RESULT_ADDED:
1576 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1578 case GNUNET_SET_RESULT_SYMMETRIC:
1579 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1582 /* Result mode not supported, should have been caught earlier. */
1588 if ( (op->state->received_total > 8) &&
1589 (op->state->received_fresh < op->state->received_total / 3) )
1591 /* The other peer gave us lots of old elements, there's something wrong. */
1592 GNUNET_break_op (0);
1593 fail_union_operation (op);
1596 GNUNET_CADET_receive_done (op->channel);
1602 * Check a full element message from a remote peer.
1604 * @param cls the union operation
1605 * @param emsg the message
1608 check_union_p2p_full_element (void *cls,
1609 const struct GNUNET_SET_ElementMessage *emsg)
1611 struct Operation *op = cls;
1613 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1615 GNUNET_break_op (0);
1616 return GNUNET_SYSERR;
1618 // FIXME: check that we expect full elements here?
1624 * Handle an element message from a remote peer.
1626 * @param cls the union operation
1627 * @param emsg the message
1630 handle_union_p2p_full_element (void *cls,
1631 const struct GNUNET_SET_ElementMessage *emsg)
1633 struct Operation *op = cls;
1634 struct ElementEntry *ee;
1635 struct KeyEntry *ke;
1636 uint16_t element_size;
1638 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1639 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1640 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1641 ee->element.size = element_size;
1642 ee->element.data = &ee[1];
1643 ee->element.element_type = ntohs (emsg->element_type);
1644 ee->remote = GNUNET_YES;
1645 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1647 LOG (GNUNET_ERROR_TYPE_DEBUG,
1648 "Got element (full diff, size %u, hash %s) from peer\n",
1649 (unsigned int) element_size,
1650 GNUNET_h2s (&ee->element_hash));
1652 GNUNET_STATISTICS_update (_GSS_statistics,
1653 "# received elements",
1656 GNUNET_STATISTICS_update (_GSS_statistics,
1657 "# exchanged elements",
1661 op->state->received_total++;
1663 ke = op_get_element (op, &ee->element_hash);
1666 /* Got repeated element. Should not happen since
1667 * we track demands. */
1668 GNUNET_STATISTICS_update (_GSS_statistics,
1669 "# repeated elements",
1672 ke->received = GNUNET_YES;
1677 LOG (GNUNET_ERROR_TYPE_DEBUG,
1678 "Registering new element from remote peer\n");
1679 op->state->received_fresh++;
1680 op_register_element (op, ee, GNUNET_YES);
1681 /* only send results immediately if the client wants it */
1682 switch (op->result_mode)
1684 case GNUNET_SET_RESULT_ADDED:
1685 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1687 case GNUNET_SET_RESULT_SYMMETRIC:
1688 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1691 /* Result mode not supported, should have been caught earlier. */
1697 if ( (GNUNET_YES == op->byzantine) &&
1698 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1699 (op->state->received_fresh < op->state->received_total / 6) )
1701 /* The other peer gave us lots of old elements, there's something wrong. */
1702 LOG (GNUNET_ERROR_TYPE_ERROR,
1703 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1704 (unsigned long long) op->state->received_fresh,
1705 (unsigned long long) op->state->received_total);
1706 GNUNET_break_op (0);
1707 fail_union_operation (op);
1710 GNUNET_CADET_receive_done (op->channel);
1715 * Send offers (for GNUNET_Hash-es) in response
1716 * to inquiries (for IBF_Key-s).
1718 * @param cls the union operation
1719 * @param msg the message
1722 check_union_p2p_inquiry (void *cls,
1723 const struct InquiryMessage *msg)
1725 struct Operation *op = cls;
1726 unsigned int num_keys;
1728 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1730 GNUNET_break_op (0);
1731 return GNUNET_SYSERR;
1733 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1735 GNUNET_break_op (0);
1736 return GNUNET_SYSERR;
1738 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1739 / sizeof (struct IBF_Key);
1740 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1741 != num_keys * sizeof (struct IBF_Key))
1743 GNUNET_break_op (0);
1744 return GNUNET_SYSERR;
1751 * Send offers (for GNUNET_Hash-es) in response
1752 * to inquiries (for IBF_Key-s).
1754 * @param cls the union operation
1755 * @param msg the message
1758 handle_union_p2p_inquiry (void *cls,
1759 const struct InquiryMessage *msg)
1761 struct Operation *op = cls;
1762 const struct IBF_Key *ibf_key;
1763 unsigned int num_keys;
1765 LOG (GNUNET_ERROR_TYPE_DEBUG,
1766 "Received union inquiry\n");
1767 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1768 / sizeof (struct IBF_Key);
1769 ibf_key = (const struct IBF_Key *) &msg[1];
1770 while (0 != num_keys--)
1772 struct IBF_Key unsalted_key;
1774 unsalt_key (ibf_key,
1777 send_offers_for_key (op,
1781 GNUNET_CADET_receive_done (op->channel);
1786 * Iterator over hash map entries, called to
1787 * destroy the linked list of colliding ibf key entries.
1789 * @param cls closure
1790 * @param key current key code
1791 * @param value value in the hash map
1792 * @return #GNUNET_YES if we should continue to iterate,
1793 * #GNUNET_NO if not.
1796 send_missing_full_elements_iter (void *cls,
1800 struct Operation *op = cls;
1801 struct KeyEntry *ke = value;
1802 struct GNUNET_MQ_Envelope *ev;
1803 struct GNUNET_SET_ElementMessage *emsg;
1804 struct ElementEntry *ee = ke->element;
1806 if (GNUNET_YES == ke->received)
1808 ev = GNUNET_MQ_msg_extra (emsg,
1810 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1811 GNUNET_memcpy (&emsg[1],
1814 emsg->element_type = htons (ee->element.element_type);
1815 GNUNET_MQ_send (op->mq,
1822 * Handle a request for full set transmission.
1824 * @parem cls closure, a set union operation
1825 * @param mh the demand message
1828 handle_union_p2p_request_full (void *cls,
1829 const struct GNUNET_MessageHeader *mh)
1831 struct Operation *op = cls;
1833 LOG (GNUNET_ERROR_TYPE_DEBUG,
1834 "Received request for full set transmission\n");
1835 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1837 GNUNET_break_op (0);
1838 fail_union_operation (op);
1841 if (PHASE_EXPECT_IBF != op->state->phase)
1843 GNUNET_break_op (0);
1844 fail_union_operation (op);
1848 // FIXME: we need to check that our set is larger than the
1849 // byzantine_lower_bound by some threshold
1851 GNUNET_CADET_receive_done (op->channel);
1856 * Handle a "full done" message.
1858 * @parem cls closure, a set union operation
1859 * @param mh the demand message
1862 handle_union_p2p_full_done (void *cls,
1863 const struct GNUNET_MessageHeader *mh)
1865 struct Operation *op = cls;
1867 switch (op->state->phase)
1869 case PHASE_EXPECT_IBF:
1871 struct GNUNET_MQ_Envelope *ev;
1873 LOG (GNUNET_ERROR_TYPE_DEBUG,
1874 "got FULL DONE, sending elements that other peer is missing\n");
1876 /* send all the elements that did not come from the remote peer */
1877 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1878 &send_missing_full_elements_iter,
1881 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1882 GNUNET_MQ_send (op->mq,
1884 op->state->phase = PHASE_DONE;
1885 /* we now wait until the other peer sends us the OVER message*/
1888 case PHASE_FULL_SENDING:
1890 LOG (GNUNET_ERROR_TYPE_DEBUG,
1891 "got FULL DONE, finishing\n");
1892 /* We sent the full set, and got the response for that. We're done. */
1893 op->state->phase = PHASE_DONE;
1894 GNUNET_CADET_receive_done (op->channel);
1895 send_client_done (op);
1896 destroy_channel (op);
1901 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902 "Handle full done phase is %u\n",
1903 (unsigned) op->state->phase);
1904 GNUNET_break_op (0);
1905 fail_union_operation (op);
1908 GNUNET_CADET_receive_done (op->channel);
1913 * Check a demand by the other peer for elements based on a list
1914 * of `struct GNUNET_HashCode`s.
1916 * @parem cls closure, a set union operation
1917 * @param mh the demand message
1918 * @return #GNUNET_OK if @a mh is well-formed
1921 check_union_p2p_demand (void *cls,
1922 const struct GNUNET_MessageHeader *mh)
1924 struct Operation *op = cls;
1925 unsigned int num_hashes;
1927 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1929 GNUNET_break_op (0);
1930 return GNUNET_SYSERR;
1932 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1933 / sizeof (struct GNUNET_HashCode);
1934 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1935 != num_hashes * sizeof (struct GNUNET_HashCode))
1937 GNUNET_break_op (0);
1938 return GNUNET_SYSERR;
1945 * Handle a demand by the other peer for elements based on a list
1946 * of `struct GNUNET_HashCode`s.
1948 * @parem cls closure, a set union operation
1949 * @param mh the demand message
1952 handle_union_p2p_demand (void *cls,
1953 const struct GNUNET_MessageHeader *mh)
1955 struct Operation *op = cls;
1956 struct ElementEntry *ee;
1957 struct GNUNET_SET_ElementMessage *emsg;
1958 const struct GNUNET_HashCode *hash;
1959 unsigned int num_hashes;
1960 struct GNUNET_MQ_Envelope *ev;
1962 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1963 / sizeof (struct GNUNET_HashCode);
1964 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1966 hash++, num_hashes--)
1968 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1972 /* Demand for non-existing element. */
1973 GNUNET_break_op (0);
1974 fail_union_operation (op);
1977 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1979 /* Probably confused lazily copied sets. */
1980 GNUNET_break_op (0);
1981 fail_union_operation (op);
1984 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1985 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1986 emsg->reserved = htons (0);
1987 emsg->element_type = htons (ee->element.element_type);
1988 LOG (GNUNET_ERROR_TYPE_DEBUG,
1989 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1991 (unsigned int) ee->element.size,
1992 GNUNET_h2s (&ee->element_hash));
1993 GNUNET_MQ_send (op->mq, ev);
1994 GNUNET_STATISTICS_update (_GSS_statistics,
1995 "# exchanged elements",
1999 switch (op->result_mode)
2001 case GNUNET_SET_RESULT_ADDED:
2002 /* Nothing to do. */
2004 case GNUNET_SET_RESULT_SYMMETRIC:
2005 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2008 /* Result mode not supported, should have been caught earlier. */
2013 GNUNET_CADET_receive_done (op->channel);
2018 * Check offer (of `struct GNUNET_HashCode`s).
2020 * @param cls the union operation
2021 * @param mh the message
2022 * @return #GNUNET_OK if @a mh is well-formed
2025 check_union_p2p_offer (void *cls,
2026 const struct GNUNET_MessageHeader *mh)
2028 struct Operation *op = cls;
2029 unsigned int num_hashes;
2031 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2033 GNUNET_break_op (0);
2034 return GNUNET_SYSERR;
2036 /* look up elements and send them */
2037 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2038 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2040 GNUNET_break_op (0);
2041 return GNUNET_SYSERR;
2043 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2044 / sizeof (struct GNUNET_HashCode);
2045 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2046 num_hashes * sizeof (struct GNUNET_HashCode))
2048 GNUNET_break_op (0);
2049 return GNUNET_SYSERR;
2056 * Handle offers (of `struct GNUNET_HashCode`s) and
2057 * respond with demands (of `struct GNUNET_HashCode`s).
2059 * @param cls the union operation
2060 * @param mh the message
2063 handle_union_p2p_offer (void *cls,
2064 const struct GNUNET_MessageHeader *mh)
2066 struct Operation *op = cls;
2067 const struct GNUNET_HashCode *hash;
2068 unsigned int num_hashes;
2070 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2071 / sizeof (struct GNUNET_HashCode);
2072 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2074 hash++, num_hashes--)
2076 struct ElementEntry *ee;
2077 struct GNUNET_MessageHeader *demands;
2078 struct GNUNET_MQ_Envelope *ev;
2080 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2083 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2087 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2090 LOG (GNUNET_ERROR_TYPE_DEBUG,
2091 "Skipped sending duplicate demand\n");
2095 GNUNET_assert (GNUNET_OK ==
2096 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
2099 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2101 LOG (GNUNET_ERROR_TYPE_DEBUG,
2102 "[OP %x] Requesting element (hash %s)\n",
2103 (void *) op, GNUNET_h2s (hash));
2104 ev = GNUNET_MQ_msg_header_extra (demands,
2105 sizeof (struct GNUNET_HashCode),
2106 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2107 GNUNET_memcpy (&demands[1],
2109 sizeof (struct GNUNET_HashCode));
2110 GNUNET_MQ_send (op->mq, ev);
2112 GNUNET_CADET_receive_done (op->channel);
2117 * Handle a done message from a remote peer
2119 * @param cls the union operation
2120 * @param mh the message
2123 handle_union_p2p_done (void *cls,
2124 const struct GNUNET_MessageHeader *mh)
2126 struct Operation *op = cls;
2128 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2130 GNUNET_break_op (0);
2131 fail_union_operation (op);
2134 switch (op->state->phase)
2136 case PHASE_INVENTORY_PASSIVE:
2137 /* We got all requests, but still have to send our elements in response. */
2138 op->state->phase = PHASE_FINISH_WAITING;
2140 LOG (GNUNET_ERROR_TYPE_DEBUG,
2141 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2142 /* The active peer is done sending offers
2143 * and inquiries. This means that all
2144 * our responses to that (demands and offers)
2145 * must be in flight (queued or in mesh).
2147 * We should notify the active peer once
2148 * all our demands are satisfied, so that the active
2149 * peer can quit if we gave him everything.
2151 GNUNET_CADET_receive_done (op->channel);
2154 case PHASE_INVENTORY_ACTIVE:
2155 LOG (GNUNET_ERROR_TYPE_DEBUG,
2156 "got DONE (as active partner), waiting to finish\n");
2157 /* All demands of the other peer are satisfied,
2158 * and we processed all offers, thus we know
2159 * exactly what our demands must be.
2161 * We'll close the channel
2162 * to the other peer once our demands are met.
2164 op->state->phase = PHASE_FINISH_CLOSING;
2165 GNUNET_CADET_receive_done (op->channel);
2169 GNUNET_break_op (0);
2170 fail_union_operation (op);
2176 * Handle a over message from a remote peer
2178 * @param cls the union operation
2179 * @param mh the message
2182 handle_union_p2p_over (void *cls,
2183 const struct GNUNET_MessageHeader *mh)
2185 send_client_done (cls);
2190 * Initiate operation to evaluate a set union with a remote peer.
2192 * @param op operation to perform (to be initialized)
2193 * @param opaque_context message to be transmitted to the listener
2194 * to convince him to accept, may be NULL
2196 static struct OperationState *
2197 union_evaluate (struct Operation *op,
2198 const struct GNUNET_MessageHeader *opaque_context)
2200 struct OperationState *state;
2201 struct GNUNET_MQ_Envelope *ev;
2202 struct OperationRequestMessage *msg;
2204 ev = GNUNET_MQ_msg_nested_mh (msg,
2205 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2209 /* the context message is too large */
2213 state = GNUNET_new (struct OperationState);
2214 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2216 /* copy the current generation's strata estimator for this operation */
2217 state->se = strata_estimator_dup (op->set->state->se);
2218 /* we started the operation, thus we have to send the operation request */
2219 state->phase = PHASE_EXPECT_SE;
2220 state->salt_receive = state->salt_send = 42; // FIXME?????
2221 LOG (GNUNET_ERROR_TYPE_DEBUG,
2222 "Initiating union operation evaluation\n");
2223 GNUNET_STATISTICS_update (_GSS_statistics,
2224 "# of total union operations",
2227 GNUNET_STATISTICS_update (_GSS_statistics,
2228 "# of initiated union operations",
2231 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2232 GNUNET_MQ_send (op->mq,
2235 if (NULL != opaque_context)
2236 LOG (GNUNET_ERROR_TYPE_DEBUG,
2237 "sent op request with context message\n");
2239 LOG (GNUNET_ERROR_TYPE_DEBUG,
2240 "sent op request without context message\n");
2243 initialize_key_to_element (op);
2244 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2250 * Accept an union operation request from a remote peer.
2251 * Only initializes the private operation state.
2253 * @param op operation that will be accepted as a union operation
2255 static struct OperationState *
2256 union_accept (struct Operation *op)
2258 struct OperationState *state;
2259 const struct StrataEstimator *se;
2260 struct GNUNET_MQ_Envelope *ev;
2261 struct StrataEstimatorMessage *strata_msg;
2266 LOG (GNUNET_ERROR_TYPE_DEBUG,
2267 "accepting set union operation\n");
2268 GNUNET_STATISTICS_update (_GSS_statistics,
2269 "# of accepted union operations",
2272 GNUNET_STATISTICS_update (_GSS_statistics,
2273 "# of total union operations",
2277 state = GNUNET_new (struct OperationState);
2278 state->se = strata_estimator_dup (op->set->state->se);
2279 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2281 state->salt_receive = state->salt_send = 42; // FIXME?????
2283 initialize_key_to_element (op);
2284 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2286 /* kick off the operation */
2288 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2289 len = strata_estimator_write (se,
2291 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2292 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2294 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2295 ev = GNUNET_MQ_msg_extra (strata_msg,
2298 GNUNET_memcpy (&strata_msg[1],
2302 strata_msg->set_size
2303 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2304 GNUNET_MQ_send (op->mq,
2306 state->phase = PHASE_EXPECT_IBF;
2312 * Create a new set supporting the union operation
2314 * We maintain one strata estimator per set and then manipulate it over the
2315 * lifetime of the set, as recreating a strata estimator would be expensive.
2317 * @return the newly created set, NULL on error
2319 static struct SetState *
2320 union_set_create (void)
2322 struct SetState *set_state;
2324 LOG (GNUNET_ERROR_TYPE_DEBUG,
2325 "union set created\n");
2326 set_state = GNUNET_new (struct SetState);
2327 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2328 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2329 if (NULL == set_state->se)
2331 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2332 "Failed to allocate strata estimator\n");
2333 GNUNET_free (set_state);
2341 * Add the element from the given element message to the set.
2343 * @param set_state state of the set want to add to
2344 * @param ee the element to add to the set
2347 union_add (struct SetState *set_state,
2348 struct ElementEntry *ee)
2350 strata_estimator_insert (set_state->se,
2351 get_ibf_key (&ee->element_hash));
2356 * Remove the element given in the element message from the set.
2357 * Only marks the element as removed, so that older set operations can still exchange it.
2359 * @param set_state state of the set to remove from
2360 * @param ee set element to remove
2363 union_remove (struct SetState *set_state,
2364 struct ElementEntry *ee)
2366 strata_estimator_remove (set_state->se,
2367 get_ibf_key (&ee->element_hash));
2372 * Destroy a set that supports the union operation.
2374 * @param set_state the set to destroy
2377 union_set_destroy (struct SetState *set_state)
2379 if (NULL != set_state->se)
2381 strata_estimator_destroy (set_state->se);
2382 set_state->se = NULL;
2384 GNUNET_free (set_state);
2389 * Copy union-specific set state.
2391 * @param state source state for copying the union state
2392 * @return a copy of the union-specific set state
2394 static struct SetState *
2395 union_copy_state (struct SetState *state)
2397 struct SetState *new_state;
2399 GNUNET_assert ( (NULL != state) &&
2400 (NULL != state->se) );
2401 new_state = GNUNET_new (struct SetState);
2402 new_state->se = strata_estimator_dup (state->se);
2409 * Handle case where channel went down for an operation.
2411 * @param op operation that lost the channel
2414 union_channel_death (struct Operation *op)
2416 send_client_done (op);
2417 _GSS_operation_destroy (op,
2423 * Get the table with implementing functions for
2426 * @return the operation specific VTable
2428 const struct SetVT *
2431 static const struct SetVT union_vt = {
2432 .create = &union_set_create,
2434 .remove = &union_remove,
2435 .destroy_set = &union_set_destroy,
2436 .evaluate = &union_evaluate,
2437 .accept = &union_accept,
2438 .cancel = &union_op_cancel,
2439 .copy_state = &union_copy_state,
2440 .channel_death = &union_channel_death