2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)


/**
 * Number of IBFs in a strata estimator.
 */
#define SE_STRATA_COUNT 32

/**
 * Size of the IBFs in the strata estimator.
 */
#define SE_IBF_SIZE 80

/**
 * The hash num parameter for the difference digests and strata estimators.
 */
#define SE_IBF_HASH_NUM 4

/**
 * Number of buckets that can be transmitted in one message.
 */
#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)

/**
 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
 * Choose this value so that computing the IBF is still cheaper
 * than transmitting all values.
 */
#define MAX_IBF_ORDER (20)

/**
 * Number of buckets used in the ibf per estimated
 * difference.
 *
 * NOTE(review): the definition line was missing from the listing; the
 * macro is required by get_order_from_difference().  Confirm the value
 * against the pristine file.
 */
#define IBF_ALPHA 4
/**
 * Current phase we are in for a union operation.
 */
enum UnionOperationPhase
{
  /**
   * We sent the request message, and expect a strata estimator.
   */
  PHASE_EXPECT_SE,

  /**
   * We sent the strata estimator, and expect an IBF.  Also entered after
   * we asked the other peer to send its full set.
   *
   * XXX: could use better wording.
   * XXX: repurposed to also expect a "request full set" message, should be renamed
   *
   * After receiving the complete IBF, we enter #PHASE_INVENTORY_ACTIVE.
   */
  PHASE_EXPECT_IBF,

  /**
   * Continuation for multi part IBFs.
   */
  PHASE_EXPECT_IBF_CONT,

  /**
   * We are decoding an IBF.
   */
  PHASE_INVENTORY_ACTIVE,

  /**
   * The other peer is decoding the IBF we just sent.
   */
  PHASE_INVENTORY_PASSIVE,

  /**
   * The protocol is almost finished, but we still have to flush our message
   * queue and/or expect some elements.
   */
  PHASE_FINISH_CLOSING,

  /**
   * In the penultimate phase,
   * we wait until all our demands
   * are satisfied.  Then we send a done
   * message, and wait for another done message.
   */
  PHASE_FINISH_WAITING,

  /**
   * In the ultimate phase, we wait until
   * our demands are satisfied and then
   * quit (sending another DONE message).
   */
  PHASE_DONE,

  /**
   * After sending the full set, wait for responses with the elements
   * that the local peer is missing.
   */
  PHASE_FULL_SENDING
};
139 * State of an evaluate operation with another peer.
141 struct OperationState
144 * Copy of the set's strata estimator at the time of
145 * creation of this operation.
147 struct StrataEstimator *se;
150 * The IBF we currently receive.
152 struct InvertibleBloomFilter *remote_ibf;
155 * The IBF with the local set's element.
157 struct InvertibleBloomFilter *local_ibf;
160 * Maps unsalted IBF-Keys to elements.
161 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162 * Colliding IBF-Keys are linked.
164 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
167 * Current state of the operation.
169 enum UnionOperationPhase phase;
172 * Did we send the client that we are done?
174 int client_done_sent;
177 * Number of ibf buckets already received into the @a remote_ibf.
179 unsigned int ibf_buckets_received;
182 * Hashes for elements that we have demanded from the other peer.
184 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
187 * Salt that we're using for sending IBFs
192 * Salt for the IBF we've received and that we're currently decoding.
194 uint32_t salt_receive;
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
200 uint32_t received_fresh;
203 * Total number of elements received from the other peer.
205 uint32_t received_total;
208 * Initial size of our set, just before
209 * the operation started.
211 uint64_t initial_size;
216 * The key entry is used to associate an ibf key with an element.
221 * IBF key for the entry, derived from the current salt.
223 struct IBF_Key ibf_key;
226 * The actual element associated with the key.
228 * Only owned by the union operation if element->operation
231 struct ElementEntry *element;
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
244 * Used as a closure for sending elements
245 * with a specific IBF key.
247 struct SendElementClosure
250 * The IBF key whose matching elements should be
253 struct IBF_Key ibf_key;
256 * Operation for which the elements
259 struct Operation *op;
/* NOTE(review): this listing has lost its short lines.  For this definition
 * the struct tag line, the braces and the comment delimiters are absent, so
 * only the single member `se` (a strata estimator pointer) is visible.
 * Restore the missing lines from the pristine file before compiling. */
264 * Extra state required for efficient set union.
269 * The strata estimator is only generated once for
271 * The IBF keys are derived from the element hashes with
274 struct StrataEstimator *se;
279 * Iterator over hash map entries, called to
280 * destroy the linked list of colliding ibf key entries.
283 * @param key current key code
284 * @param value value in the hash map
285 * @return #GNUNET_YES if we should continue to iterate,
289 destroy_key_to_element_iter (void *cls,
293 struct KeyEntry *k = value;
295 GNUNET_assert (NULL != k);
296 if (GNUNET_YES == k->element->remote)
298 GNUNET_free (k->element);
307 * Destroy the union operation. Only things specific to the union
308 * operation are destroyed.
310 * @param op union operation to destroy
313 union_op_cancel (struct Operation *op)
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op\n");
317 /* check if the op was canceled twice */
318 GNUNET_assert (NULL != op->state);
319 if (NULL != op->state->remote_ibf)
321 ibf_destroy (op->state->remote_ibf);
322 op->state->remote_ibf = NULL;
324 if (NULL != op->state->demanded_hashes)
326 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327 op->state->demanded_hashes = NULL;
329 if (NULL != op->state->local_ibf)
331 ibf_destroy (op->state->local_ibf);
332 op->state->local_ibf = NULL;
334 if (NULL != op->state->se)
336 strata_estimator_destroy (op->state->se);
337 op->state->se = NULL;
339 if (NULL != op->state->key_to_element)
341 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342 &destroy_key_to_element_iter,
344 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345 op->state->key_to_element = NULL;
347 GNUNET_free (op->state);
349 LOG (GNUNET_ERROR_TYPE_DEBUG,
350 "destroying union op done\n");
355 * Inform the client that the union operation has failed,
356 * and proceed to destroy the evaluate operation.
358 * @param op the union operation to fail
361 fail_union_operation (struct Operation *op)
363 struct GNUNET_MQ_Envelope *ev;
364 struct GNUNET_SET_ResultMessage *msg;
366 LOG (GNUNET_ERROR_TYPE_ERROR,
367 "union operation failed\n");
368 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370 msg->request_id = htonl (op->spec->client_request_id);
371 msg->element_type = htons (0);
372 GNUNET_MQ_send (op->spec->set->client_mq, ev);
373 _GSS_operation_destroy (op, GNUNET_YES);
/* NOTE(review): this listing has lost its short lines (braces, local
 * declarations, the return statement and one argument line of the call), so
 * the function is not compilable as shown.  What is visible: it takes a hash
 * code, runs GNUNET_CRYPTO_kdf() with a local `key` as the output buffer and
 * a local `salt` among the inputs, and per its header returns the derived
 * IBF key.  The declaration and value of `salt` are not visible here --
 * TODO confirm against the pristine file. */
378 * Derive the IBF key from a hash code and
381 * @param src the hash code
382 * @return the derived IBF key
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
390 GNUNET_CRYPTO_kdf (&key, sizeof (key),
392 &salt, sizeof (salt),
399 * Context for #op_get_element_iterator
401 struct GetElementContext
403 struct GNUNET_HashCode hash;
409 * Iterator over the mapping from IBF keys to element entries. Checks if we
410 * have an element with a given GNUNET_HashCode.
413 * @param key current key code
414 * @param value value in the hash map
415 * @return #GNUNET_YES if we should search further,
416 * #GNUNET_NO if we've found the element.
419 op_get_element_iterator (void *cls,
423 struct GetElementContext *ctx = cls;
424 struct KeyEntry *k = value;
426 GNUNET_assert (NULL != k);
427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438 * Determine whether the given element is already in the operation's element
441 * @param op operation that should be tested for 'element_hash'
442 * @param element_hash hash of the element to look for
443 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447 const struct GNUNET_HashCode *element_hash)
450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = {{{ 0 }} , 0};
453 ctx.hash = *element_hash;
455 ibf_key = get_ibf_key (element_hash);
456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457 (uint32_t) ibf_key.key_val,
458 op_get_element_iterator,
461 /* was the iteration aborted because we found the element? */
462 if (GNUNET_SYSERR == ret)
464 GNUNET_assert (NULL != ctx.k);
472 * Insert an element into the union operation's
473 * key-to-element mapping. Takes ownership of 'ee'.
474 * Note that this does not insert the element in the set,
475 * only in the operation's key-element mapping.
476 * This is done to speed up re-tried operations, if some elements
477 * were transmitted, and then the IBF fails to decode.
479 * XXX: clarify ownership, doesn't sound right.
481 * @param op the union operation
482 * @param ee the element entry
483 * @parem received was this element received from the remote peer?
486 op_register_element (struct Operation *op,
487 struct ElementEntry *ee,
490 struct IBF_Key ibf_key;
493 ibf_key = get_ibf_key (&ee->element_hash);
494 k = GNUNET_new (struct KeyEntry);
496 k->ibf_key = ibf_key;
497 k->received = received;
498 GNUNET_assert (GNUNET_OK ==
499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500 (uint32_t) ibf_key.key_val,
502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507 salt_key (const struct IBF_Key *k_in,
509 struct IBF_Key *k_out)
512 uint64_t x = k_in->key_val;
514 x = (x >> s) | (x << (64 - s));
520 unsalt_key (const struct IBF_Key *k_in,
522 struct IBF_Key *k_out)
525 uint64_t x = k_in->key_val;
526 x = (x << s) | (x >> (64 - s));
532 * Insert a key into an ibf.
536 * @param value the key entry to get the key from
539 prepare_ibf_iterator (void *cls,
543 struct Operation *op = cls;
544 struct KeyEntry *ke = value;
545 struct IBF_Key salted_key;
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "[OP %x] inserting %lx (hash %s) into ibf\n",
550 (unsigned long) ke->ibf_key.key_val,
551 GNUNET_h2s (&ke->element->element_hash));
552 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553 ibf_insert (op->state->local_ibf, salted_key);
559 * Iterator for initializing the
560 * key-to-element mapping of a union operation
562 * @param cls the union operation `struct Operation *`
564 * @param value the `struct ElementEntry *` to insert
565 * into the key-to-element mapping
566 * @return #GNUNET_YES (to continue iterating)
569 init_key_to_element_iterator (void *cls,
570 const struct GNUNET_HashCode *key,
573 struct Operation *op = cls;
574 struct ElementEntry *ee = value;
576 /* make sure that the element belongs to the set at the time
577 * of creating the operation */
578 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
581 GNUNET_assert (GNUNET_NO == ee->remote);
583 op_register_element (op, ee, GNUNET_NO);
589 * Initialize the IBF key to element mapping local to this set
592 * @param op the set union operation
595 initialize_key_to_element (struct Operation *op)
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
607 * Create an ibf with the operation's elements
608 * of the specified size
610 * @param op the union operation
611 * @param size size of the ibf to create
612 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
615 prepare_ibf (struct Operation *op,
618 GNUNET_assert (NULL != op->state->key_to_element);
620 if (NULL != op->state->local_ibf)
621 ibf_destroy (op->state->local_ibf);
622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623 if (NULL == op->state->local_ibf)
625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626 "Failed to allocate local IBF\n");
627 return GNUNET_SYSERR;
629 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630 &prepare_ibf_iterator,
/* NOTE(review): this listing has lost its short lines (braces, the parameter
 * line for `ibf_order`, some call arguments, the return statements and at
 * least two statements between the message allocation and `msg->order`), so
 * the function is not compilable as shown.  What is visible: it rebuilds the
 * local IBF with 2^ibf_order buckets via prepare_ibf(), bumps a per-order
 * statistics counter, then sends the buckets in chunks of at most
 * MAX_BUCKETS_PER_MESSAGE per P2P_IBF message, each carrying the order, the
 * bucket offset and the sending salt, and finally switches the operation to
 * PHASE_INVENTORY_PASSIVE.  `buckets_sent < (1 << ibf_order)` compares an
 * unsigned counter with a signed shift; with orders bounded by
 * MAX_IBF_ORDER this is harmless, but worth making unsigned when the
 * pristine code is restored. */
637 * Send an ibf of appropriate size.
639 * Fragments the IBF into multiple messages if necessary.
641 * @param op the union operation
642 * @param ibf_order order of the ibf to send, size=2^order
643 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
646 send_ibf (struct Operation *op,
649 unsigned int buckets_sent = 0;
650 struct InvertibleBloomFilter *ibf;
653 prepare_ibf (op, 1<<ibf_order))
655 /* allocation failed */
656 return GNUNET_SYSERR;
659 LOG (GNUNET_ERROR_TYPE_DEBUG,
660 "sending ibf of size %u\n",
664 char name[64] = { 0 };
665 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
669 ibf = op->state->local_ibf;
671 while (buckets_sent < (1 << ibf_order))
673 unsigned int buckets_in_message;
674 struct GNUNET_MQ_Envelope *ev;
675 struct IBFMessage *msg;
677 buckets_in_message = (1 << ibf_order) - buckets_sent;
678 /* limit to maximum */
679 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
682 ev = GNUNET_MQ_msg_extra (msg,
683 buckets_in_message * IBF_BUCKET_SIZE,
684 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
687 msg->order = ibf_order;
688 msg->offset = htonl (buckets_sent);
689 msg->salt = htonl (op->state->salt_send);
690 ibf_write_slice (ibf, buckets_sent,
691 buckets_in_message, &msg[1]);
692 buckets_sent += buckets_in_message;
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "ibf chunk size %u, %u/%u sent\n",
698 GNUNET_MQ_send (op->mq, ev);
701 /* The other peer must decode the IBF, so
703 op->state->phase = PHASE_INVENTORY_PASSIVE;
709 * Send a strata estimator to the remote peer.
711 * @param op the union operation with the remote peer
714 send_strata_estimator (struct Operation *op)
716 const struct StrataEstimator *se = op->state->se;
717 struct GNUNET_MQ_Envelope *ev;
718 struct StrataEstimatorMessage *strata_msg;
723 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724 len = strata_estimator_write (op->state->se,
726 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730 ev = GNUNET_MQ_msg_extra (strata_msg,
733 GNUNET_memcpy (&strata_msg[1],
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738 GNUNET_MQ_send (op->mq,
740 op->state->phase = PHASE_EXPECT_IBF;
741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "sent SE, expecting IBF\n");
747 * Compute the necessary order of an ibf
748 * from the size of the symmetric set difference.
750 * @param diff the difference
751 * @return the required size of the ibf
754 get_order_from_difference (unsigned int diff)
756 unsigned int ibf_order;
759 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
762 if (ibf_order > MAX_IBF_ORDER)
763 ibf_order = MAX_IBF_ORDER;
769 * Send a set element.
771 * @param cls the union operation `struct Operation *`
773 * @param value the `struct ElementEntry *` to insert
774 * into the key-to-element mapping
775 * @return #GNUNET_YES (to continue iterating)
778 send_element_iterator (void *cls,
779 const struct GNUNET_HashCode *key,
782 struct Operation *op = cls;
783 struct GNUNET_SET_ElementMessage *emsg;
784 struct GNUNET_SET_Element *el = value;
785 struct GNUNET_MQ_Envelope *ev;
787 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
788 emsg->element_type = htonl (el->element_type);
789 GNUNET_memcpy (&emsg[1], el->data, el->size);
790 GNUNET_MQ_send (op->mq, ev);
796 send_full_set (struct Operation *op)
798 struct GNUNET_MQ_Envelope *ev;
800 op->state->phase = PHASE_FULL_SENDING;
802 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
803 &send_element_iterator, op);
804 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
805 GNUNET_MQ_send (op->mq, ev);
810 * Handle a strata estimator from a remote peer
812 * @param cls the union operation
813 * @param mh the message
814 * @param is_compressed #GNUNET_YES if the estimator is compressed
815 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
816 * #GNUNET_OK otherwise
819 handle_p2p_strata_estimator (void *cls,
820 const struct GNUNET_MessageHeader *mh,
823 struct Operation *op = cls;
824 struct StrataEstimator *remote_se;
825 struct StrataEstimatorMessage *msg = (void *) mh;
830 GNUNET_STATISTICS_update (_GSS_statistics,
831 "# bytes of SE received",
835 if (op->state->phase != PHASE_EXPECT_SE)
838 fail_union_operation (op);
839 return GNUNET_SYSERR;
841 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
842 if ( (GNUNET_NO == is_compressed) &&
843 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
845 fail_union_operation (op);
847 return GNUNET_SYSERR;
849 other_size = GNUNET_ntohll (msg->set_size);
850 remote_se = strata_estimator_create (SE_STRATA_COUNT,
853 if (NULL == remote_se)
855 /* insufficient resources, fail */
856 fail_union_operation (op);
857 return GNUNET_SYSERR;
860 strata_estimator_read (&msg[1],
865 /* decompression failed */
866 fail_union_operation (op);
867 strata_estimator_destroy (remote_se);
868 return GNUNET_SYSERR;
870 GNUNET_assert (NULL != op->state->se);
871 diff = strata_estimator_difference (remote_se,
873 strata_estimator_destroy (remote_se);
874 strata_estimator_destroy (op->state->se);
875 op->state->se = NULL;
876 LOG (GNUNET_ERROR_TYPE_DEBUG,
877 "got se diff=%d, using ibf size %d\n",
879 1<<get_order_from_difference (diff));
881 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
884 fail_union_operation (op);
885 return GNUNET_SYSERR;
889 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
891 LOG (GNUNET_ERROR_TYPE_INFO,
892 "Sending full set (diff=%d, own set=%u)\n",
894 op->state->initial_size);
895 if (op->state->initial_size <= other_size)
901 struct GNUNET_MQ_Envelope *ev;
902 op->state->phase = PHASE_EXPECT_IBF;
903 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
904 GNUNET_MQ_send (op->mq, ev);
911 get_order_from_difference (diff)))
913 /* Internal error, best we can do is shut the connection */
914 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
915 "Failed to send IBF, closing connection\n");
916 fail_union_operation (op);
917 return GNUNET_SYSERR;
926 * Iterator to send elements to a remote peer
928 * @param cls closure with the element key and the union operation
930 * @param value the key entry
933 send_offers_iterator (void *cls,
937 struct SendElementClosure *sec = cls;
938 struct Operation *op = sec->op;
939 struct KeyEntry *ke = value;
940 struct GNUNET_MQ_Envelope *ev;
941 struct GNUNET_MessageHeader *mh;
943 /* Detect 32-bit key collision for the 64-bit IBF keys. */
944 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
947 ev = GNUNET_MQ_msg_header_extra (mh,
948 sizeof (struct GNUNET_HashCode),
949 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
951 GNUNET_assert (NULL != ev);
952 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
953 LOG (GNUNET_ERROR_TYPE_DEBUG,
954 "[OP %x] sending element offer (%s) to peer\n",
956 GNUNET_h2s (&ke->element->element_hash));
957 GNUNET_MQ_send (op->mq, ev);
963 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
965 * @param op union operation
966 * @param ibf_key IBF key of interest
969 send_offers_for_key (struct Operation *op,
970 struct IBF_Key ibf_key)
972 struct SendElementClosure send_cls;
974 send_cls.ibf_key = ibf_key;
976 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
977 (uint32_t) ibf_key.key_val,
978 &send_offers_iterator,
/* NOTE(review): this listing has lost its short lines (braces, the loop
 * header, several conditions, short call arguments including one statistics
 * name, and the return statements), so the function is not compilable as
 * shown and is left untouched here.  What the visible lines establish:
 *  - it requires PHASE_INVENTORY_ACTIVE and rebuilds the local IBF at the
 *    size of the remote IBF via prepare_ibf();
 *  - it decodes the difference of a copy of the local IBF and the remote
 *    IBF, and releases the remote IBF right after the subtraction;
 *  - decoding counts as failed when ibf_decode() returns GNUNET_SYSERR or
 *    when a cycle is detected (more keys decoded than there are buckets, or
 *    the same key twice in a row); a next order is then derived from the
 *    difference IBF's size and, if it is at most MAX_IBF_ORDER, the sending
 *    salt is incremented and an IBF of that order is sent, otherwise the
 *    operation is failed;
 *  - when ibf_decode() returns GNUNET_NO a P2P_DONE message is sent;
 *  - for a decoded key, one value of `side` (condition line not visible)
 *    unsalts the key and sends offers for it, while `-1 == side` sends an
 *    INQUIRY message carrying the receive salt and an IBF key;
 *  - the difference IBF is destroyed on every visible exit path.
 * The paths that return GNUNET_SYSERR after fail_union_operation() must not
 * touch `op` afterwards; the visible code respects that. */
984 * Decode which elements are missing on each side, and
985 * send the appropriate offers and inquiries.
987 * @param op union operation
988 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
991 decode_and_send (struct Operation *op)
994 struct IBF_Key last_key;
996 unsigned int num_decoded;
997 struct InvertibleBloomFilter *diff_ibf;
999 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1002 prepare_ibf (op, op->state->remote_ibf->size))
1005 /* allocation failed */
1006 return GNUNET_SYSERR;
1008 diff_ibf = ibf_dup (op->state->local_ibf);
1009 ibf_subtract (diff_ibf, op->state->remote_ibf);
1011 ibf_destroy (op->state->remote_ibf);
1012 op->state->remote_ibf = NULL;
1014 LOG (GNUNET_ERROR_TYPE_DEBUG,
1015 "decoding IBF (size=%u)\n",
1019 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1024 int cycle_detected = GNUNET_NO;
1028 res = ibf_decode (diff_ibf, &side, &key);
1029 if (res == GNUNET_OK)
1031 LOG (GNUNET_ERROR_TYPE_DEBUG,
1032 "decoded ibf key %lx\n",
1033 (unsigned long) key.key_val);
1035 if ( (num_decoded > diff_ibf->size) ||
1036 ( (num_decoded > 1) &&
1037 (last_key.key_val == key.key_val) ) )
1039 LOG (GNUNET_ERROR_TYPE_DEBUG,
1040 "detected cyclic ibf (decoded %u/%u)\n",
1043 cycle_detected = GNUNET_YES;
1046 if ( (GNUNET_SYSERR == res) ||
1047 (GNUNET_YES == cycle_detected) )
1051 while (1<<next_order < diff_ibf->size)
1054 if (next_order <= MAX_IBF_ORDER)
1056 LOG (GNUNET_ERROR_TYPE_DEBUG,
1057 "decoding failed, sending larger ibf (size %u)\n",
1059 GNUNET_STATISTICS_update (_GSS_statistics,
1063 op->state->salt_send++;
1065 send_ibf (op, next_order))
1067 /* Internal error, best we can do is shut the connection */
1068 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1069 "Failed to send IBF, closing connection\n");
1070 fail_union_operation (op);
1071 ibf_destroy (diff_ibf);
1072 return GNUNET_SYSERR;
1077 GNUNET_STATISTICS_update (_GSS_statistics,
1078 "# of failed union operations (too large)",
1081 // XXX: Send the whole set, element-by-element
1082 LOG (GNUNET_ERROR_TYPE_ERROR,
1083 "set union failed: reached ibf limit\n");
1084 fail_union_operation (op);
1085 ibf_destroy (diff_ibf);
1086 return GNUNET_SYSERR;
1090 if (GNUNET_NO == res)
1092 struct GNUNET_MQ_Envelope *ev;
1094 LOG (GNUNET_ERROR_TYPE_DEBUG,
1095 "transmitted all values, sending DONE\n");
1096 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1097 GNUNET_MQ_send (op->mq, ev);
1098 /* We now wait until we get a DONE message back
1099 * and then wait for our MQ to be flushed and all our
1100 * demands be delivered. */
1105 struct IBF_Key unsalted_key;
1106 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1107 send_offers_for_key (op, unsalted_key);
1109 else if (-1 == side)
1111 struct GNUNET_MQ_Envelope *ev;
1112 struct InquiryMessage *msg;
1114 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1115 * the effort additional complexity. */
1116 ev = GNUNET_MQ_msg_extra (msg,
1117 sizeof (struct IBF_Key),
1118 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1119 msg->salt = htonl (op->state->salt_receive);
1120 GNUNET_memcpy (&msg[1],
1122 sizeof (struct IBF_Key));
1123 LOG (GNUNET_ERROR_TYPE_DEBUG,
1124 "sending element inquiry for IBF key %lx\n",
1125 (unsigned long) key.key_val);
1126 GNUNET_MQ_send (op->mq, ev);
1133 ibf_destroy (diff_ibf);
1139 * Handle an IBF message from a remote peer.
1141 * Reassemble the IBF from multiple pieces, and
1142 * process the whole IBF once possible.
1144 * @param cls the union operation
1145 * @param mh the header of the message
1146 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1147 * #GNUNET_OK otherwise
1150 handle_p2p_ibf (void *cls,
1151 const struct GNUNET_MessageHeader *mh)
1153 struct Operation *op = cls;
1154 const struct IBFMessage *msg;
1155 unsigned int buckets_in_message;
1157 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1159 GNUNET_break_op (0);
1160 fail_union_operation (op);
1161 return GNUNET_SYSERR;
1163 msg = (const struct IBFMessage *) mh;
1164 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1165 (op->state->phase == PHASE_EXPECT_IBF) )
1167 op->state->phase = PHASE_EXPECT_IBF_CONT;
1168 GNUNET_assert (NULL == op->state->remote_ibf);
1169 LOG (GNUNET_ERROR_TYPE_DEBUG,
1170 "Creating new ibf of size %u\n",
1172 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1173 op->state->salt_receive = ntohl (msg->salt);
1174 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1175 if (NULL == op->state->remote_ibf)
1177 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1178 "Failed to parse remote IBF, closing connection\n");
1179 fail_union_operation (op);
1180 return GNUNET_SYSERR;
1182 op->state->ibf_buckets_received = 0;
1183 if (0 != ntohl (msg->offset))
1185 GNUNET_break_op (0);
1186 fail_union_operation (op);
1187 return GNUNET_SYSERR;
1190 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1192 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1194 GNUNET_break_op (0);
1195 fail_union_operation (op);
1196 return GNUNET_SYSERR;
1198 if (1<<msg->order != op->state->remote_ibf->size)
1200 GNUNET_break_op (0);
1201 fail_union_operation (op);
1202 return GNUNET_SYSERR;
1204 if (ntohl (msg->salt) != op->state->salt_receive)
1206 GNUNET_break_op (0);
1207 fail_union_operation (op);
1208 return GNUNET_SYSERR;
1216 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1218 if (0 == buckets_in_message)
1220 GNUNET_break_op (0);
1221 fail_union_operation (op);
1222 return GNUNET_SYSERR;
1225 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1227 GNUNET_break_op (0);
1228 fail_union_operation (op);
1229 return GNUNET_SYSERR;
1232 GNUNET_assert (NULL != op->state->remote_ibf);
1234 ibf_read_slice (&msg[1],
1235 op->state->ibf_buckets_received,
1237 op->state->remote_ibf);
1238 op->state->ibf_buckets_received += buckets_in_message;
1240 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1242 LOG (GNUNET_ERROR_TYPE_DEBUG,
1243 "received full ibf\n");
1244 op->state->phase = PHASE_INVENTORY_ACTIVE;
1246 decode_and_send (op))
1248 /* Internal error, best we can do is shut down */
1249 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1250 "Failed to decode IBF, closing connection\n");
1251 return GNUNET_SYSERR;
1259 * Send a result message to the client indicating
1260 * that there is a new element.
1262 * @param op union operation
1263 * @param element element to send
1264 * @param status status to send with the new element
1267 send_client_element (struct Operation *op,
1268 struct GNUNET_SET_Element *element,
1271 struct GNUNET_MQ_Envelope *ev;
1272 struct GNUNET_SET_ResultMessage *rm;
1274 LOG (GNUNET_ERROR_TYPE_DEBUG,
1275 "sending element (size %u) to client\n",
1277 GNUNET_assert (0 != op->spec->client_request_id);
1278 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1281 GNUNET_MQ_discard (ev);
1285 rm->result_status = htons (status);
1286 rm->request_id = htonl (op->spec->client_request_id);
1287 rm->element_type = element->element_type;
1288 GNUNET_memcpy (&rm[1], element->data, element->size);
1289 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1294 * Signal to the client that the operation has finished and
1295 * destroy the operation.
1297 * @param cls operation to destroy
1300 send_done_and_destroy (void *cls)
1302 struct Operation *op = cls;
1303 struct GNUNET_MQ_Envelope *ev;
1304 struct GNUNET_SET_ResultMessage *rm;
1306 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1307 rm->request_id = htonl (op->spec->client_request_id);
1308 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1309 rm->element_type = htons (0);
1310 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1311 /* Will also call the union-specific cancel function. */
1312 _GSS_operation_destroy (op, GNUNET_YES);
1317 maybe_finish (struct Operation *op)
1319 unsigned int num_demanded;
1321 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1323 if (PHASE_FINISH_WAITING == op->state->phase)
1325 LOG (GNUNET_ERROR_TYPE_DEBUG,
1326 "In PHASE_FINISH_WAITING, pending %u demands\n",
1328 if (0 == num_demanded)
1330 struct GNUNET_MQ_Envelope *ev;
1332 op->state->phase = PHASE_DONE;
1333 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1334 GNUNET_MQ_send (op->mq, ev);
1336 /* We now wait until the other peer closes the channel
1337 * after it got all elements from us. */
1340 if (PHASE_FINISH_CLOSING == op->state->phase)
1342 LOG (GNUNET_ERROR_TYPE_DEBUG,
1343 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1345 if (0 == num_demanded)
1347 op->state->phase = PHASE_DONE;
1348 send_done_and_destroy (op);
1355 * Handle an element message from a remote peer.
1356 * Sent by the other peer either because we decoded an IBF and placed a demand,
1357 * or because the other peer switched to full set transmission.
1359 * @param cls the union operation
1360 * @param mh the message
1363 handle_p2p_elements (void *cls,
1364 const struct GNUNET_MessageHeader *mh)
1366 struct Operation *op = cls;
1367 struct ElementEntry *ee;
1368 const struct GNUNET_SET_ElementMessage *emsg;
1369 uint16_t element_size;
1371 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1373 GNUNET_break_op (0);
1374 fail_union_operation (op);
1377 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1379 GNUNET_break_op (0);
1380 fail_union_operation (op);
1384 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1386 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1387 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1388 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1389 ee->element.size = element_size;
1390 ee->element.data = &ee[1];
1391 ee->element.element_type = ntohs (emsg->element_type);
1392 ee->remote = GNUNET_YES;
1393 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1396 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1400 /* We got something we didn't demand, since it's not in our map. */
1401 GNUNET_break_op (0);
1403 fail_union_operation (op);
1407 LOG (GNUNET_ERROR_TYPE_DEBUG,
1408 "Got element (size %u, hash %s) from peer\n",
1409 (unsigned int) element_size,
1410 GNUNET_h2s (&ee->element_hash));
1412 GNUNET_STATISTICS_update (_GSS_statistics,
1413 "# received elements",
1416 GNUNET_STATISTICS_update (_GSS_statistics,
1417 "# exchanged elements",
1421 op->state->received_total += 1;
1423 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1427 /* Got repeated element. Should not happen since
1428 * we track demands. */
1429 GNUNET_STATISTICS_update (_GSS_statistics,
1430 "# repeated elements",
1433 ke->received = GNUNET_YES;
1438 LOG (GNUNET_ERROR_TYPE_DEBUG,
1439 "Registering new element from remote peer\n");
1440 op->state->received_fresh += 1;
1441 op_register_element (op, ee, GNUNET_YES);
1442 /* only send results immediately if the client wants it */
1443 switch (op->spec->result_mode)
1445 case GNUNET_SET_RESULT_ADDED:
1446 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1448 case GNUNET_SET_RESULT_SYMMETRIC:
1449 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1452 /* Result mode not supported, should have been caught earlier. */
1458 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1460 /* The other peer gave us lots of old elements, there's something wrong. */
1461 GNUNET_break_op (0);
1462 fail_union_operation (op);
1471 * Handle an element message from a remote peer.
1473 * @param cls the union operation
1474 * @param mh the message
1477 handle_p2p_full_element (void *cls,
1478 const struct GNUNET_MessageHeader *mh)
1480 struct Operation *op = cls;
1481 struct ElementEntry *ee;
1482 const struct GNUNET_SET_ElementMessage *emsg;
1483 uint16_t element_size;
1485 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1487 GNUNET_break_op (0);
1488 fail_union_operation (op);
1492 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1494 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1495 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1496 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1497 ee->element.size = element_size;
1498 ee->element.data = &ee[1];
1499 ee->element.element_type = ntohs (emsg->element_type);
1500 ee->remote = GNUNET_YES;
1501 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1503 LOG (GNUNET_ERROR_TYPE_DEBUG,
1504 "Got element (full diff, size %u, hash %s) from peer\n",
1505 (unsigned int) element_size,
1506 GNUNET_h2s (&ee->element_hash));
1508 GNUNET_STATISTICS_update (_GSS_statistics,
1509 "# received elements",
1512 GNUNET_STATISTICS_update (_GSS_statistics,
1513 "# exchanged elements",
1517 op->state->received_total += 1;
1519 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1523 /* Got repeated element. Should not happen since
1524 * we track demands. */
1525 GNUNET_STATISTICS_update (_GSS_statistics,
1526 "# repeated elements",
1529 ke->received = GNUNET_YES;
1534 LOG (GNUNET_ERROR_TYPE_DEBUG,
1535 "Registering new element from remote peer\n");
1536 op->state->received_fresh += 1;
1537 op_register_element (op, ee, GNUNET_YES);
1538 /* only send results immediately if the client wants it */
1539 switch (op->spec->result_mode)
1541 case GNUNET_SET_RESULT_ADDED:
1542 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1544 case GNUNET_SET_RESULT_SYMMETRIC:
1545 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1548 /* Result mode not supported, should have been caught earlier. */
1554 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1556 /* The other peer gave us lots of old elements, there's something wrong. */
1557 GNUNET_break_op (0);
1558 fail_union_operation (op);
1564 * Send offers (for GNUNET_Hash-es) in response
1565 * to inquiries (for IBF_Key-s).
1567 * @param cls the union operation
1568 * @param mh the message
1571 handle_p2p_inquiry (void *cls,
1572 const struct GNUNET_MessageHeader *mh)
1574 struct Operation *op = cls;
1575 const struct IBF_Key *ibf_key;
1576 unsigned int num_keys;
1577 struct InquiryMessage *msg;
1579 /* look up elements and send them */
1580 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1582 GNUNET_break_op (0);
1583 fail_union_operation (op);
1586 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1587 / sizeof (struct IBF_Key);
1588 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1589 != num_keys * sizeof (struct IBF_Key))
1591 GNUNET_break_op (0);
1592 fail_union_operation (op);
1596 msg = (struct InquiryMessage *) mh;
1598 ibf_key = (const struct IBF_Key *) &msg[1];
1599 while (0 != num_keys--)
1601 struct IBF_Key unsalted_key;
1602 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1603 send_offers_for_key (op, unsalted_key);
1610 * Iterator over hash map entries, called to
1611 * destroy the linked list of colliding ibf key entries.
1613 * @param cls closure
1614 * @param key current key code
1615 * @param value value in the hash map
1616 * @return #GNUNET_YES if we should continue to iterate,
1617 * #GNUNET_NO if not.
1620 send_missing_elements_iter (void *cls,
1624 struct Operation *op = cls;
1625 struct KeyEntry *ke = value;
1626 struct GNUNET_MQ_Envelope *ev;
1627 struct GNUNET_SET_ElementMessage *emsg;
1628 struct ElementEntry *ee = ke->element;
1630 if (GNUNET_YES == ke->received)
1633 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1634 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1635 emsg->reserved = htons (0);
1636 emsg->element_type = htons (ee->element.element_type);
1637 GNUNET_MQ_send (op->mq, ev);
1646 * @parem cls closure, a set union operation
1647 * @param mh the demand message
1650 handle_p2p_request_full (void *cls,
1651 const struct GNUNET_MessageHeader *mh)
1653 struct Operation *op = cls;
1655 if (PHASE_EXPECT_IBF != op->state->phase)
1657 fail_union_operation (op);
1658 GNUNET_break_op (0);
1662 // FIXME: we need to check that our set is larger than the
1663 // byzantine_lower_bound by some threshold
1669 * Handle a "full done" message.
1671 * @parem cls closure, a set union operation
1672 * @param mh the demand message
1675 handle_p2p_full_done (void *cls,
1676 const struct GNUNET_MessageHeader *mh)
1678 struct Operation *op = cls;
1680 if (PHASE_EXPECT_IBF == op->state->phase)
1682 struct GNUNET_MQ_Envelope *ev;
1684 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1686 /* send all the elements that did not come from the remote peer */
1687 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1688 &send_missing_elements_iter,
1691 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1692 GNUNET_MQ_send (op->mq, ev);
1693 op->state->phase = PHASE_DONE;
1695 /* we now wait until the other peer shuts the tunnel down*/
1697 else if (PHASE_FULL_SENDING == op->state->phase)
1699 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1700 /* We sent the full set, and got the response for that. We're done. */
1701 op->state->phase = PHASE_DONE;
1702 send_done_and_destroy (op);
1706 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1707 GNUNET_break_op (0);
1708 fail_union_operation (op);
1715 * Handle a demand by the other peer for elements based on a list
1716 * of GNUNET_HashCode-s.
1718 * @parem cls closure, a set union operation
1719 * @param mh the demand message
1722 handle_p2p_demand (void *cls,
1723 const struct GNUNET_MessageHeader *mh)
1725 struct Operation *op = cls;
1726 struct ElementEntry *ee;
1727 struct GNUNET_SET_ElementMessage *emsg;
1728 const struct GNUNET_HashCode *hash;
1729 unsigned int num_hashes;
1730 struct GNUNET_MQ_Envelope *ev;
1732 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1733 / sizeof (struct GNUNET_HashCode);
1734 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1735 != num_hashes * sizeof (struct GNUNET_HashCode))
1737 GNUNET_break_op (0);
1738 fail_union_operation (op);
1742 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1744 hash++, num_hashes--)
1746 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1749 /* Demand for non-existing element. */
1750 GNUNET_break_op (0);
1751 fail_union_operation (op);
1754 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1756 /* Probably confused lazily copied sets. */
1757 GNUNET_break_op (0);
1758 fail_union_operation (op);
1761 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1762 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1763 emsg->reserved = htons (0);
1764 emsg->element_type = htons (ee->element.element_type);
1765 LOG (GNUNET_ERROR_TYPE_DEBUG,
1766 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1768 (unsigned int) ee->element.size,
1769 GNUNET_h2s (&ee->element_hash));
1770 GNUNET_MQ_send (op->mq, ev);
1771 GNUNET_STATISTICS_update (_GSS_statistics,
1772 "# exchanged elements",
1776 switch (op->spec->result_mode)
1778 case GNUNET_SET_RESULT_ADDED:
1779 /* Nothing to do. */
1781 case GNUNET_SET_RESULT_SYMMETRIC:
1782 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1785 /* Result mode not supported, should have been caught earlier. */
1794 * Handle offers (of GNUNET_HashCode-s) and
1795 * respond with demands (of GNUNET_HashCode-s).
1797 * @param cls the union operation
1798 * @param mh the message
1801 handle_p2p_offer (void *cls,
1802 const struct GNUNET_MessageHeader *mh)
1804 struct Operation *op = cls;
1805 const struct GNUNET_HashCode *hash;
1806 unsigned int num_hashes;
1808 /* look up elements and send them */
1809 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1810 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1812 GNUNET_break_op (0);
1813 fail_union_operation (op);
1816 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1817 / sizeof (struct GNUNET_HashCode);
1818 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1819 != num_hashes * sizeof (struct GNUNET_HashCode))
1821 GNUNET_break_op (0);
1822 fail_union_operation (op);
1826 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1828 hash++, num_hashes--)
1830 struct ElementEntry *ee;
1831 struct GNUNET_MessageHeader *demands;
1832 struct GNUNET_MQ_Envelope *ev;
1834 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1837 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1841 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1844 LOG (GNUNET_ERROR_TYPE_DEBUG,
1845 "Skipped sending duplicate demand\n");
1849 GNUNET_assert (GNUNET_OK ==
1850 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1853 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1855 LOG (GNUNET_ERROR_TYPE_DEBUG,
1856 "[OP %x] Requesting element (hash %s)\n",
1857 (void *) op, GNUNET_h2s (hash));
1858 ev = GNUNET_MQ_msg_header_extra (demands,
1859 sizeof (struct GNUNET_HashCode),
1860 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1861 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1862 GNUNET_MQ_send (op->mq, ev);
1868 * Handle a done message from a remote peer
1870 * @param cls the union operation
1871 * @param mh the message
1874 handle_p2p_done (void *cls,
1875 const struct GNUNET_MessageHeader *mh)
1877 struct Operation *op = cls;
1879 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1881 /* We got all requests, but still have to send our elements in response. */
1883 op->state->phase = PHASE_FINISH_WAITING;
1885 LOG (GNUNET_ERROR_TYPE_DEBUG,
1886 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1887 /* The active peer is done sending offers
1888 * and inquiries. This means that all
1889 * our responses to that (demands and offers)
1890 * must be in flight (queued or in mesh).
1892 * We should notify the active peer once
1893 * all our demands are satisfied, so that the active
1894 * peer can quit if we gave him everything.
1899 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1901 LOG (GNUNET_ERROR_TYPE_DEBUG,
1902 "got DONE (as active partner), waiting to finish\n");
1903 /* All demands of the other peer are satisfied,
1904 * and we processed all offers, thus we know
1905 * exactly what our demands must be.
1907 * We'll close the channel
1908 * to the other peer once our demands are met.
1910 op->state->phase = PHASE_FINISH_CLOSING;
1914 GNUNET_break_op (0);
1915 fail_union_operation (op);
1920 * Initiate operation to evaluate a set union with a remote peer.
1922 * @param op operation to perform (to be initialized)
1923 * @param opaque_context message to be transmitted to the listener
1924 * to convince him to accept, may be NULL
1927 union_evaluate (struct Operation *op,
1928 const struct GNUNET_MessageHeader *opaque_context)
1930 struct GNUNET_MQ_Envelope *ev;
1931 struct OperationRequestMessage *msg;
1933 GNUNET_assert (NULL == op->state);
1934 op->state = GNUNET_new (struct OperationState);
1935 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1936 /* copy the current generation's strata estimator for this operation */
1937 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1938 /* we started the operation, thus we have to send the operation request */
1939 op->state->phase = PHASE_EXPECT_SE;
1940 op->state->salt_receive = op->state->salt_send = 42;
1941 LOG (GNUNET_ERROR_TYPE_DEBUG,
1942 "Initiating union operation evaluation\n");
1943 GNUNET_STATISTICS_update (_GSS_statistics,
1944 "# of total union operations",
1947 GNUNET_STATISTICS_update (_GSS_statistics,
1948 "# of initiated union operations",
1951 ev = GNUNET_MQ_msg_nested_mh (msg,
1952 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1956 /* the context message is too large */
1958 GNUNET_SERVICE_client_drop (op->spec->set->client);
1961 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1962 GNUNET_MQ_send (op->mq,
1965 if (NULL != opaque_context)
1966 LOG (GNUNET_ERROR_TYPE_DEBUG,
1967 "sent op request with context message\n");
1969 LOG (GNUNET_ERROR_TYPE_DEBUG,
1970 "sent op request without context message\n");
1972 op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
1973 initialize_key_to_element (op);
1978 * Accept an union operation request from a remote peer.
1979 * Only initializes the private operation state.
1981 * @param op operation that will be accepted as a union operation
1984 union_accept (struct Operation *op)
1986 LOG (GNUNET_ERROR_TYPE_DEBUG,
1987 "accepting set union operation\n");
1988 GNUNET_assert (NULL == op->state);
1990 GNUNET_STATISTICS_update (_GSS_statistics,
1991 "# of accepted union operations",
1994 GNUNET_STATISTICS_update (_GSS_statistics,
1995 "# of total union operations",
1999 op->state = GNUNET_new (struct OperationState);
2000 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2001 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2002 op->state->salt_receive = op->state->salt_send = 42;
2003 op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
2004 initialize_key_to_element (op);
2005 /* kick off the operation */
2006 send_strata_estimator (op);
2011 * Create a new set supporting the union operation
2013 * We maintain one strata estimator per set and then manipulate it over the
2014 * lifetime of the set, as recreating a strata estimator would be expensive.
2016 * @return the newly created set, NULL on error
2018 static struct SetState *
2019 union_set_create (void)
2021 struct SetState *set_state;
2023 LOG (GNUNET_ERROR_TYPE_DEBUG,
2024 "union set created\n");
2025 set_state = GNUNET_new (struct SetState);
2026 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2027 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2028 if (NULL == set_state->se)
2030 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2031 "Failed to allocate strata estimator\n");
2032 GNUNET_free (set_state);
2040 * Add the element from the given element message to the set.
2042 * @param set_state state of the set want to add to
2043 * @param ee the element to add to the set
2046 union_add (struct SetState *set_state, struct ElementEntry *ee)
2048 strata_estimator_insert (set_state->se,
2049 get_ibf_key (&ee->element_hash));
2054 * Remove the element given in the element message from the set.
2055 * Only marks the element as removed, so that older set operations can still exchange it.
2057 * @param set_state state of the set to remove from
2058 * @param ee set element to remove
2061 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2063 strata_estimator_remove (set_state->se,
2064 get_ibf_key (&ee->element_hash));
2069 * Destroy a set that supports the union operation.
2071 * @param set_state the set to destroy
2074 union_set_destroy (struct SetState *set_state)
2076 if (NULL != set_state->se)
2078 strata_estimator_destroy (set_state->se);
2079 set_state->se = NULL;
2081 GNUNET_free (set_state);
2086 * Dispatch messages for a union operation.
2088 * @param op the state of the union evaluate operation
2089 * @param mh the received message
2090 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2091 * #GNUNET_OK otherwise
2094 union_handle_p2p_message (struct Operation *op,
2095 const struct GNUNET_MessageHeader *mh)
2097 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2098 // "received p2p message (t: %u, s: %u)\n",
2099 // ntohs (mh->type),
2100 // ntohs (mh->size));
2101 switch (ntohs (mh->type))
2103 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2104 return handle_p2p_ibf (op, mh);
2105 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2106 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2107 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2108 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2109 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2110 handle_p2p_elements (op, mh);
2112 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2113 handle_p2p_full_element (op, mh);
2115 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2116 handle_p2p_inquiry (op, mh);
2118 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2119 handle_p2p_done (op, mh);
2121 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2122 handle_p2p_offer (op, mh);
2124 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2125 handle_p2p_demand (op, mh);
2127 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2128 handle_p2p_full_done (op, mh);
2130 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2131 handle_p2p_request_full (op, mh);
2134 /* Something wrong with cadet's message handlers? */
2142 * Handler for peer-disconnects, notifies the client
2143 * about the aborted operation in case the op was not concluded.
2145 * @param op the destroyed operation
2148 union_peer_disconnect (struct Operation *op)
2150 if (PHASE_DONE != op->state->phase)
2152 struct GNUNET_MQ_Envelope *ev;
2153 struct GNUNET_SET_ResultMessage *msg;
2155 ev = GNUNET_MQ_msg (msg,
2156 GNUNET_MESSAGE_TYPE_SET_RESULT);
2157 msg->request_id = htonl (op->spec->client_request_id);
2158 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2159 msg->element_type = htons (0);
2160 GNUNET_MQ_send (op->spec->set->client_mq,
2162 LOG (GNUNET_ERROR_TYPE_WARNING,
2163 "other peer disconnected prematurely, phase %u\n",
2165 _GSS_operation_destroy (op,
2169 // else: the session has already been concluded
2170 LOG (GNUNET_ERROR_TYPE_DEBUG,
2171 "other peer disconnected (finished)\n");
2172 if (GNUNET_NO == op->state->client_done_sent)
2173 send_done_and_destroy (op);
2178 * Copy union-specific set state.
2180 * @param set source set for copying the union state
2181 * @return a copy of the union-specific set state
2183 static struct SetState *
2184 union_copy_state (struct Set *set)
2186 struct SetState *new_state;
2188 new_state = GNUNET_new (struct SetState);
2189 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2190 new_state->se = strata_estimator_dup (set->state->se);
2197 * Get the table with implementing functions for
2200 * @return the operation specific VTable
2202 const struct SetVT *
2205 static const struct SetVT union_vt = {
2206 .create = &union_set_create,
2207 .msg_handler = &union_handle_p2p_message,
2209 .remove = &union_remove,
2210 .destroy_set = &union_set_destroy,
2211 .evaluate = &union_evaluate,
2212 .accept = &union_accept,
2213 .peer_disconnect = &union_peer_disconnect,
2214 .cancel = &union_op_cancel,
2215 .copy_state = &union_copy_state,