2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
95 * Continuation for multi part IBFs.
97 PHASE_EXPECT_IBF_CONT,
100 * We are decoding an IBF.
102 PHASE_INVENTORY_ACTIVE,
105 * The other peer is decoding the IBF we just sent.
107 PHASE_INVENTORY_PASSIVE,
110 * The protocol is almost finished, but we still have to flush our message
111 * queue and/or expect some elements.
113 PHASE_FINISH_CLOSING,
116 * In the penultimate phase,
117 * we wait until all our demands
118 * are satisfied. Then we send a done
119 * message, and wait for another done message.
121 PHASE_FINISH_WAITING,
124 * In the ultimate phase, we wait until
125 * our demands are satisfied and then
126 * quit (sending another DONE message).
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
139 * State of an evaluate operation with another peer.
141 struct OperationState
144 * Copy of the set's strata estimator at the time of
145 * creation of this operation.
147 struct StrataEstimator *se;
150 * The IBF we currently receive.
152 struct InvertibleBloomFilter *remote_ibf;
155 * The IBF with the local set's element.
157 struct InvertibleBloomFilter *local_ibf;
160 * Maps unsalted IBF-Keys to elements.
161 * Used as a multihashmap, the keys being the lower 32 bits of the IBF-Key.
162 * Colliding IBF-Keys are linked.
164 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
167 * Current state of the operation.
169 enum UnionOperationPhase phase;
172 * Did we send the client that we are done?
174 int client_done_sent;
177 * Number of ibf buckets already received into the @a remote_ibf.
179 unsigned int ibf_buckets_received;
182 * Hashes for elements that we have demanded from the other peer.
184 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
187 * Salt that we're using for sending IBFs
192 * Salt for the IBF we've received and that we're currently decoding.
194 uint32_t salt_receive;
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
200 uint32_t received_fresh;
203 * Total number of elements received from the other peer.
205 uint32_t received_total;
208 * Initial size of our set, just before
209 * the operation started.
211 uint64_t initial_size;
216 * The key entry is used to associate an ibf key with an element.
221 * IBF key for the entry, derived from the current salt.
223 struct IBF_Key ibf_key;
226 * The actual element associated with the key.
228 * Only owned by the union operation if element->operation
231 struct ElementEntry *element;
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
244 * Used as a closure for sending elements
245 * with a specific IBF key.
247 struct SendElementClosure
250 * The IBF key whose matching elements should be
253 struct IBF_Key ibf_key;
256 * Operation for which the elements
259 struct Operation *op;
264 * Extra state required for efficient set union.
269 * The strata estimator is only generated once for
271 * The IBF keys are derived from the element hashes with
274 struct StrataEstimator *se;
279 * Iterator over hash map entries, called to
280 * destroy the linked list of colliding ibf key entries.
283 * @param key current key code
284 * @param value value in the hash map
285 * @return #GNUNET_YES if we should continue to iterate,
289 destroy_key_to_element_iter (void *cls,
293 struct KeyEntry *k = value;
295 GNUNET_assert (NULL != k);
296 if (GNUNET_YES == k->element->remote)
298 GNUNET_free (k->element);
307 * Destroy the union operation. Only things specific to the union
308 * operation are destroyed.
310 * @param op union operation to destroy
313 union_op_cancel (struct Operation *op)
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op\n");
317 /* check if the op was canceled twice */
318 GNUNET_assert (NULL != op->state);
319 if (NULL != op->state->remote_ibf)
321 ibf_destroy (op->state->remote_ibf);
322 op->state->remote_ibf = NULL;
324 if (NULL != op->state->demanded_hashes)
326 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327 op->state->demanded_hashes = NULL;
329 if (NULL != op->state->local_ibf)
331 ibf_destroy (op->state->local_ibf);
332 op->state->local_ibf = NULL;
334 if (NULL != op->state->se)
336 strata_estimator_destroy (op->state->se);
337 op->state->se = NULL;
339 if (NULL != op->state->key_to_element)
341 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342 &destroy_key_to_element_iter,
344 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345 op->state->key_to_element = NULL;
347 GNUNET_free (op->state);
349 LOG (GNUNET_ERROR_TYPE_DEBUG,
350 "destroying union op done\n");
355 * Inform the client that the union operation has failed,
356 * and proceed to destroy the evaluate operation.
358 * @param op the union operation to fail
361 fail_union_operation (struct Operation *op)
363 struct GNUNET_MQ_Envelope *ev;
364 struct GNUNET_SET_ResultMessage *msg;
366 LOG (GNUNET_ERROR_TYPE_ERROR,
367 "union operation failed\n");
368 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370 msg->request_id = htonl (op->spec->client_request_id);
371 msg->element_type = htons (0);
372 GNUNET_MQ_send (op->spec->set->client_mq, ev);
373 _GSS_operation_destroy (op, GNUNET_YES);
378 * Derive the IBF key from a hash code and
381 * @param src the hash code
382 * @return the derived IBF key
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
390 GNUNET_CRYPTO_kdf (&key, sizeof (key),
392 &salt, sizeof (salt),
399 * Context for #op_get_element_iterator
401 struct GetElementContext
403 struct GNUNET_HashCode hash;
409 * Iterator over the mapping from IBF keys to element entries. Checks if we
410 * have an element with a given GNUNET_HashCode.
413 * @param key current key code
414 * @param value value in the hash map
415 * @return #GNUNET_YES if we should search further,
416 * #GNUNET_NO if we've found the element.
419 op_get_element_iterator (void *cls,
423 struct GetElementContext *ctx = cls;
424 struct KeyEntry *k = value;
426 GNUNET_assert (NULL != k);
427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438 * Determine whether the given element is already in the operation's element
441 * @param op operation that should be tested for 'element_hash'
442 * @param element_hash hash of the element to look for
443 * @return the key entry of the element if it has been found, NULL otherwise
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447 const struct GNUNET_HashCode *element_hash)
450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = {{{ 0 }} , 0};
453 ctx.hash = *element_hash;
455 ibf_key = get_ibf_key (element_hash);
456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457 (uint32_t) ibf_key.key_val,
458 op_get_element_iterator,
461 /* was the iteration aborted because we found the element? */
462 if (GNUNET_SYSERR == ret)
464 GNUNET_assert (NULL != ctx.k);
472 * Insert an element into the union operation's
473 * key-to-element mapping. Takes ownership of 'ee'.
474 * Note that this does not insert the element in the set,
475 * only in the operation's key-element mapping.
476 * This is done to speed up re-tried operations, if some elements
477 * were transmitted, and then the IBF fails to decode.
479 * XXX: clarify ownership, doesn't sound right.
481 * @param op the union operation
482 * @param ee the element entry
483 * @param received was this element received from the remote peer?
486 op_register_element (struct Operation *op,
487 struct ElementEntry *ee,
490 struct IBF_Key ibf_key;
493 ibf_key = get_ibf_key (&ee->element_hash);
494 k = GNUNET_new (struct KeyEntry);
496 k->ibf_key = ibf_key;
497 k->received = received;
498 GNUNET_assert (GNUNET_OK ==
499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500 (uint32_t) ibf_key.key_val,
502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507 salt_key (const struct IBF_Key *k_in,
509 struct IBF_Key *k_out)
512 uint64_t x = k_in->key_val;
514 x = (x >> s) | (x << (64 - s));
520 unsalt_key (const struct IBF_Key *k_in,
522 struct IBF_Key *k_out)
525 uint64_t x = k_in->key_val;
526 x = (x << s) | (x >> (64 - s));
532 * Insert a key into an ibf.
536 * @param value the key entry to get the key from
539 prepare_ibf_iterator (void *cls,
543 struct Operation *op = cls;
544 struct KeyEntry *ke = value;
545 struct IBF_Key salted_key;
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "[OP %x] inserting %lx (hash %s) into ibf\n",
550 (unsigned long) ke->ibf_key.key_val,
551 GNUNET_h2s (&ke->element->element_hash));
552 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553 ibf_insert (op->state->local_ibf, salted_key);
559 * Iterator for initializing the
560 * key-to-element mapping of a union operation
562 * @param cls the union operation `struct Operation *`
564 * @param value the `struct ElementEntry *` to insert
565 * into the key-to-element mapping
566 * @return #GNUNET_YES (to continue iterating)
569 init_key_to_element_iterator (void *cls,
570 const struct GNUNET_HashCode *key,
573 struct Operation *op = cls;
574 struct ElementEntry *ee = value;
576 /* make sure that the element belongs to the set at the time
577 * of creating the operation */
578 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
581 GNUNET_assert (GNUNET_NO == ee->remote);
583 op_register_element (op, ee, GNUNET_NO);
589 * Initialize the IBF key to element mapping local to this set
592 * @param op the set union operation
595 initialize_key_to_element (struct Operation *op)
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
607 * Create an ibf with the operation's elements
608 * of the specified size
610 * @param op the union operation
611 * @param size size of the ibf to create
612 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
615 prepare_ibf (struct Operation *op,
618 GNUNET_assert (NULL != op->state->key_to_element);
620 if (NULL != op->state->local_ibf)
621 ibf_destroy (op->state->local_ibf);
622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623 if (NULL == op->state->local_ibf)
625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626 "Failed to allocate local IBF\n");
627 return GNUNET_SYSERR;
629 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630 &prepare_ibf_iterator,
637 * Send an ibf of appropriate size.
639 * Fragments the IBF into multiple messages if necessary.
641 * @param op the union operation
642 * @param ibf_order order of the ibf to send, size=2^order
643 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
646 send_ibf (struct Operation *op,
649 unsigned int buckets_sent = 0;
650 struct InvertibleBloomFilter *ibf;
653 prepare_ibf (op, 1<<ibf_order))
655 /* allocation failed */
656 return GNUNET_SYSERR;
659 LOG (GNUNET_ERROR_TYPE_DEBUG,
660 "sending ibf of size %u\n",
664 char name[64] = { 0 };
665 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
669 ibf = op->state->local_ibf;
671 while (buckets_sent < (1 << ibf_order))
673 unsigned int buckets_in_message;
674 struct GNUNET_MQ_Envelope *ev;
675 struct IBFMessage *msg;
677 buckets_in_message = (1 << ibf_order) - buckets_sent;
678 /* limit to maximum */
679 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
682 ev = GNUNET_MQ_msg_extra (msg,
683 buckets_in_message * IBF_BUCKET_SIZE,
684 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
687 msg->order = ibf_order;
688 msg->offset = htonl (buckets_sent);
689 msg->salt = htonl (op->state->salt_send);
690 ibf_write_slice (ibf, buckets_sent,
691 buckets_in_message, &msg[1]);
692 buckets_sent += buckets_in_message;
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "ibf chunk size %u, %u/%u sent\n",
698 GNUNET_MQ_send (op->mq, ev);
701 /* The other peer must decode the IBF, so
703 op->state->phase = PHASE_INVENTORY_PASSIVE;
709 * Send a strata estimator to the remote peer.
711 * @param op the union operation with the remote peer
714 send_strata_estimator (struct Operation *op)
716 const struct StrataEstimator *se = op->state->se;
717 struct GNUNET_MQ_Envelope *ev;
718 struct StrataEstimatorMessage *strata_msg;
723 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724 len = strata_estimator_write (op->state->se,
726 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730 ev = GNUNET_MQ_msg_extra (strata_msg,
733 GNUNET_memcpy (&strata_msg[1],
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738 GNUNET_MQ_send (op->mq,
740 op->state->phase = PHASE_EXPECT_IBF;
741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "sent SE, expecting IBF\n");
747 * Compute the necessary order of an ibf
748 * from the size of the symmetric set difference.
750 * @param diff the difference
751 * @return the required order of the ibf (size = 2^order)
754 get_order_from_difference (unsigned int diff)
756 unsigned int ibf_order;
759 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
762 if (ibf_order > MAX_IBF_ORDER)
763 ibf_order = MAX_IBF_ORDER;
769 * Send a set element.
771 * @param cls the union operation `struct Operation *`
773 * @param value the `struct ElementEntry *` to send
774 * to the remote peer
775 * @return #GNUNET_YES (to continue iterating)
778 send_element_iterator (void *cls,
779 const struct GNUNET_HashCode *key,
782 struct Operation *op = cls;
783 struct GNUNET_SET_ElementMessage *emsg;
784 struct ElementEntry *ee = value;
785 struct GNUNET_SET_Element *el = &ee->element;
786 struct GNUNET_MQ_Envelope *ev;
788 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
789 emsg->element_type = htonl (el->element_type);
790 GNUNET_memcpy (&emsg[1], el->data, el->size);
791 GNUNET_MQ_send (op->mq, ev);
797 send_full_set (struct Operation *op)
799 struct GNUNET_MQ_Envelope *ev;
801 op->state->phase = PHASE_FULL_SENDING;
803 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
804 &send_element_iterator, op);
805 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
806 GNUNET_MQ_send (op->mq, ev);
811 * Handle a strata estimator from a remote peer
813 * @param cls the union operation
814 * @param mh the message
815 * @param is_compressed #GNUNET_YES if the estimator is compressed
816 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
817 * #GNUNET_OK otherwise
820 handle_p2p_strata_estimator (void *cls,
821 const struct GNUNET_MessageHeader *mh,
824 struct Operation *op = cls;
825 struct StrataEstimator *remote_se;
826 struct StrataEstimatorMessage *msg = (void *) mh;
831 GNUNET_STATISTICS_update (_GSS_statistics,
832 "# bytes of SE received",
836 if (op->state->phase != PHASE_EXPECT_SE)
839 fail_union_operation (op);
840 return GNUNET_SYSERR;
842 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
843 if ( (GNUNET_NO == is_compressed) &&
844 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
846 fail_union_operation (op);
848 return GNUNET_SYSERR;
850 other_size = GNUNET_ntohll (msg->set_size);
851 remote_se = strata_estimator_create (SE_STRATA_COUNT,
854 if (NULL == remote_se)
856 /* insufficient resources, fail */
857 fail_union_operation (op);
858 return GNUNET_SYSERR;
861 strata_estimator_read (&msg[1],
866 /* decompression failed */
867 fail_union_operation (op);
868 strata_estimator_destroy (remote_se);
869 return GNUNET_SYSERR;
871 GNUNET_assert (NULL != op->state->se);
872 diff = strata_estimator_difference (remote_se,
874 strata_estimator_destroy (remote_se);
875 strata_estimator_destroy (op->state->se);
876 op->state->se = NULL;
877 LOG (GNUNET_ERROR_TYPE_DEBUG,
878 "got se diff=%d, using ibf size %d\n",
880 1<<get_order_from_difference (diff));
882 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
885 fail_union_operation (op);
886 return GNUNET_SYSERR;
890 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
892 LOG (GNUNET_ERROR_TYPE_INFO,
893 "Sending full set (diff=%d, own set=%u)\n",
895 op->state->initial_size);
896 if (op->state->initial_size <= other_size)
902 struct GNUNET_MQ_Envelope *ev;
903 op->state->phase = PHASE_EXPECT_IBF;
904 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
905 GNUNET_MQ_send (op->mq, ev);
912 get_order_from_difference (diff)))
914 /* Internal error, best we can do is shut the connection */
915 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
916 "Failed to send IBF, closing connection\n");
917 fail_union_operation (op);
918 return GNUNET_SYSERR;
927 * Iterator to send element offers (element hashes) to a remote peer
929 * @param cls closure with the element key and the union operation
931 * @param value the key entry
934 send_offers_iterator (void *cls,
938 struct SendElementClosure *sec = cls;
939 struct Operation *op = sec->op;
940 struct KeyEntry *ke = value;
941 struct GNUNET_MQ_Envelope *ev;
942 struct GNUNET_MessageHeader *mh;
944 /* Detect 32-bit key collision for the 64-bit IBF keys. */
945 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
948 ev = GNUNET_MQ_msg_header_extra (mh,
949 sizeof (struct GNUNET_HashCode),
950 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
952 GNUNET_assert (NULL != ev);
953 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
954 LOG (GNUNET_ERROR_TYPE_DEBUG,
955 "[OP %x] sending element offer (%s) to peer\n",
957 GNUNET_h2s (&ke->element->element_hash));
958 GNUNET_MQ_send (op->mq, ev);
964 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
966 * @param op union operation
967 * @param ibf_key IBF key of interest
970 send_offers_for_key (struct Operation *op,
971 struct IBF_Key ibf_key)
973 struct SendElementClosure send_cls;
975 send_cls.ibf_key = ibf_key;
977 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
978 (uint32_t) ibf_key.key_val,
979 &send_offers_iterator,
985 * Decode which elements are missing on each side, and
986 * send the appropriate offers and inquiries.
988 * @param op union operation
989 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
992 decode_and_send (struct Operation *op)
995 struct IBF_Key last_key;
997 unsigned int num_decoded;
998 struct InvertibleBloomFilter *diff_ibf;
1000 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1003 prepare_ibf (op, op->state->remote_ibf->size))
1006 /* allocation failed */
1007 return GNUNET_SYSERR;
1009 diff_ibf = ibf_dup (op->state->local_ibf);
1010 ibf_subtract (diff_ibf, op->state->remote_ibf);
1012 ibf_destroy (op->state->remote_ibf);
1013 op->state->remote_ibf = NULL;
1015 LOG (GNUNET_ERROR_TYPE_DEBUG,
1016 "decoding IBF (size=%u)\n",
1020 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1025 int cycle_detected = GNUNET_NO;
1029 res = ibf_decode (diff_ibf, &side, &key);
1030 if (res == GNUNET_OK)
1032 LOG (GNUNET_ERROR_TYPE_DEBUG,
1033 "decoded ibf key %lx\n",
1034 (unsigned long) key.key_val);
1036 if ( (num_decoded > diff_ibf->size) ||
1037 ( (num_decoded > 1) &&
1038 (last_key.key_val == key.key_val) ) )
1040 LOG (GNUNET_ERROR_TYPE_DEBUG,
1041 "detected cyclic ibf (decoded %u/%u)\n",
1044 cycle_detected = GNUNET_YES;
1047 if ( (GNUNET_SYSERR == res) ||
1048 (GNUNET_YES == cycle_detected) )
1052 while (1<<next_order < diff_ibf->size)
1055 if (next_order <= MAX_IBF_ORDER)
1057 LOG (GNUNET_ERROR_TYPE_DEBUG,
1058 "decoding failed, sending larger ibf (size %u)\n",
1060 GNUNET_STATISTICS_update (_GSS_statistics,
1064 op->state->salt_send++;
1066 send_ibf (op, next_order))
1068 /* Internal error, best we can do is shut the connection */
1069 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1070 "Failed to send IBF, closing connection\n");
1071 fail_union_operation (op);
1072 ibf_destroy (diff_ibf);
1073 return GNUNET_SYSERR;
1078 GNUNET_STATISTICS_update (_GSS_statistics,
1079 "# of failed union operations (too large)",
1082 // XXX: Send the whole set, element-by-element
1083 LOG (GNUNET_ERROR_TYPE_ERROR,
1084 "set union failed: reached ibf limit\n");
1085 fail_union_operation (op);
1086 ibf_destroy (diff_ibf);
1087 return GNUNET_SYSERR;
1091 if (GNUNET_NO == res)
1093 struct GNUNET_MQ_Envelope *ev;
1095 LOG (GNUNET_ERROR_TYPE_DEBUG,
1096 "transmitted all values, sending DONE\n");
1097 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1098 GNUNET_MQ_send (op->mq, ev);
1099 /* We now wait until we get a DONE message back
1100 * and then wait for our MQ to be flushed and all our
1101 * demands be delivered. */
1106 struct IBF_Key unsalted_key;
1107 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1108 send_offers_for_key (op, unsalted_key);
1110 else if (-1 == side)
1112 struct GNUNET_MQ_Envelope *ev;
1113 struct InquiryMessage *msg;
1115 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1116 * the effort and additional complexity. */
1117 ev = GNUNET_MQ_msg_extra (msg,
1118 sizeof (struct IBF_Key),
1119 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1120 msg->salt = htonl (op->state->salt_receive);
1121 GNUNET_memcpy (&msg[1],
1123 sizeof (struct IBF_Key));
1124 LOG (GNUNET_ERROR_TYPE_DEBUG,
1125 "sending element inquiry for IBF key %lx\n",
1126 (unsigned long) key.key_val);
1127 GNUNET_MQ_send (op->mq, ev);
1134 ibf_destroy (diff_ibf);
1140 * Handle an IBF message from a remote peer.
1142 * Reassemble the IBF from multiple pieces, and
1143 * process the whole IBF once possible.
1145 * @param cls the union operation
1146 * @param mh the header of the message
1147 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1148 * #GNUNET_OK otherwise
1151 handle_p2p_ibf (void *cls,
1152 const struct GNUNET_MessageHeader *mh)
1154 struct Operation *op = cls;
1155 const struct IBFMessage *msg;
1156 unsigned int buckets_in_message;
1158 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1160 GNUNET_break_op (0);
1161 fail_union_operation (op);
1162 return GNUNET_SYSERR;
1164 msg = (const struct IBFMessage *) mh;
1165 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1166 (op->state->phase == PHASE_EXPECT_IBF) )
1168 op->state->phase = PHASE_EXPECT_IBF_CONT;
1169 GNUNET_assert (NULL == op->state->remote_ibf);
1170 LOG (GNUNET_ERROR_TYPE_DEBUG,
1171 "Creating new ibf of size %u\n",
1173 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1174 op->state->salt_receive = ntohl (msg->salt);
1175 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1176 if (NULL == op->state->remote_ibf)
1178 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1179 "Failed to parse remote IBF, closing connection\n");
1180 fail_union_operation (op);
1181 return GNUNET_SYSERR;
1183 op->state->ibf_buckets_received = 0;
1184 if (0 != ntohl (msg->offset))
1186 GNUNET_break_op (0);
1187 fail_union_operation (op);
1188 return GNUNET_SYSERR;
1191 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1193 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1195 GNUNET_break_op (0);
1196 fail_union_operation (op);
1197 return GNUNET_SYSERR;
1199 if (1<<msg->order != op->state->remote_ibf->size)
1201 GNUNET_break_op (0);
1202 fail_union_operation (op);
1203 return GNUNET_SYSERR;
1205 if (ntohl (msg->salt) != op->state->salt_receive)
1207 GNUNET_break_op (0);
1208 fail_union_operation (op);
1209 return GNUNET_SYSERR;
1217 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1219 if (0 == buckets_in_message)
1221 GNUNET_break_op (0);
1222 fail_union_operation (op);
1223 return GNUNET_SYSERR;
1226 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1228 GNUNET_break_op (0);
1229 fail_union_operation (op);
1230 return GNUNET_SYSERR;
1233 GNUNET_assert (NULL != op->state->remote_ibf);
1235 ibf_read_slice (&msg[1],
1236 op->state->ibf_buckets_received,
1238 op->state->remote_ibf);
1239 op->state->ibf_buckets_received += buckets_in_message;
1241 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1243 LOG (GNUNET_ERROR_TYPE_DEBUG,
1244 "received full ibf\n");
1245 op->state->phase = PHASE_INVENTORY_ACTIVE;
1247 decode_and_send (op))
1249 /* Internal error, best we can do is shut down */
1250 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1251 "Failed to decode IBF, closing connection\n");
1252 return GNUNET_SYSERR;
1260 * Send a result message to the client indicating
1261 * that there is a new element.
1263 * @param op union operation
1264 * @param element element to send
1265 * @param status status to send with the new element
1268 send_client_element (struct Operation *op,
1269 struct GNUNET_SET_Element *element,
1272 struct GNUNET_MQ_Envelope *ev;
1273 struct GNUNET_SET_ResultMessage *rm;
1275 LOG (GNUNET_ERROR_TYPE_DEBUG,
1276 "sending element (size %u) to client\n",
1278 GNUNET_assert (0 != op->spec->client_request_id);
1279 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1282 GNUNET_MQ_discard (ev);
1286 rm->result_status = htons (status);
1287 rm->request_id = htonl (op->spec->client_request_id);
1288 rm->element_type = htons (element->element_type);
1289 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1290 GNUNET_memcpy (&rm[1], element->data, element->size);
1291 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1296 * Signal to the client that the operation has finished and
1297 * destroy the operation.
1299 * @param cls operation to destroy
1302 send_done_and_destroy (void *cls)
1304 struct Operation *op = cls;
1305 struct GNUNET_MQ_Envelope *ev;
1306 struct GNUNET_SET_ResultMessage *rm;
1308 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1309 rm->request_id = htonl (op->spec->client_request_id);
1310 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1311 rm->element_type = htons (0);
1312 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1313 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1314 /* Will also call the union-specific cancel function. */
1315 _GSS_operation_destroy (op, GNUNET_YES);
1320 maybe_finish (struct Operation *op)
1322 unsigned int num_demanded;
1324 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1326 if (PHASE_FINISH_WAITING == op->state->phase)
1328 LOG (GNUNET_ERROR_TYPE_DEBUG,
1329 "In PHASE_FINISH_WAITING, pending %u demands\n",
1331 if (0 == num_demanded)
1333 struct GNUNET_MQ_Envelope *ev;
1335 op->state->phase = PHASE_DONE;
1336 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1337 GNUNET_MQ_send (op->mq, ev);
1339 /* We now wait until the other peer closes the channel
1340 * after it got all elements from us. */
1343 if (PHASE_FINISH_CLOSING == op->state->phase)
1345 LOG (GNUNET_ERROR_TYPE_DEBUG,
1346 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1348 if (0 == num_demanded)
1350 op->state->phase = PHASE_DONE;
1351 send_done_and_destroy (op);
1358 * Handle an element message from a remote peer.
1359 * Sent by the other peer either because we decoded an IBF and placed a demand,
1360 * or because the other peer switched to full set transmission.
1362 * @param cls the union operation
1363 * @param mh the message
/* Process one element message from the remote peer: copy the payload into
 * a newly allocated ElementEntry, match its hash against demanded_hashes,
 * update the statistics and pass it to op_register_element on the path
 * that is logged as a new element. */
1366 handle_p2p_elements (void *cls,
1367 const struct GNUNET_MessageHeader *mh)
1369 struct Operation *op = cls;
1370 struct ElementEntry *ee;
1371 const struct GNUNET_SET_ElementMessage *emsg;
1372 uint16_t element_size;
/* With no outstanding demand, an element message is treated as a protocol
 * violation and the operation is failed. */
1374 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1376 GNUNET_break_op (0);
1377 fail_union_operation (op);
/* The message must be at least as large as the fixed ElementMessage part. */
1380 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1382 GNUNET_break_op (0);
1383 fail_union_operation (op);
1387 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
/* Everything after the fixed message part is element payload. It is copied
 * into the memory directly behind the ElementEntry, and element.data is
 * pointed at that copy. */
1389 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1390 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1391 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1392 ee->element.size = element_size;
1393 ee->element.data = &ee[1];
1394 ee->element.element_type = ntohs (emsg->element_type);
1395 ee->remote = GNUNET_YES;
1396 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
/* Removing the hash from demanded_hashes doubles as the check that this
 * element was actually demanded. */
1399 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1403 /* We got something we didn't demand, since it's not in our map. */
1404 GNUNET_break_op (0);
1406 fail_union_operation (op);
1410 LOG (GNUNET_ERROR_TYPE_DEBUG,
1411 "Got element (size %u, hash %s) from peer\n",
1412 (unsigned int) element_size,
1413 GNUNET_h2s (&ee->element_hash));
1415 GNUNET_STATISTICS_update (_GSS_statistics,
1416 "# received elements",
1419 GNUNET_STATISTICS_update (_GSS_statistics,
1420 "# exchanged elements",
/* received_total counts every element message that got this far;
 * received_fresh (below) counts only those registered as new. */
1424 op->state->received_total += 1;
1426 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1430 /* Got repeated element. Should not happen since
1431 * we track demands. */
1432 GNUNET_STATISTICS_update (_GSS_statistics,
1433 "# repeated elements",
1436 ke->received = GNUNET_YES;
1441 LOG (GNUNET_ERROR_TYPE_DEBUG,
1442 "Registering new element from remote peer\n");
1443 op->state->received_fresh += 1;
1444 op_register_element (op, ee, GNUNET_YES);
1445 /* only send results immediately if the client wants it */
1446 switch (op->spec->result_mode)
1448 case GNUNET_SET_RESULT_ADDED:
1449 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1451 case GNUNET_SET_RESULT_SYMMETRIC:
1452 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1455 /* Result mode not supported, should have been caught earlier. */
/* Stale-element heuristic: once more than 8 elements were received, fail if
 * fewer than a third of them (integer division) were new.
 * NOTE(review): handle_p2p_full_element guards the same test with
 * op->spec->byzantine; confirm the unguarded form here is intended. */
1461 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1463 /* The other peer gave us lots of old elements, there's something wrong. */
1464 GNUNET_break_op (0);
1465 fail_union_operation (op);
1474 * Handle an element message sent by a remote peer as part of a full set transmission.
1476 * @param cls the union operation
1477 * @param mh the message
/* Process one FULL_ELEMENT message from the remote peer: copy the payload
 * into a newly allocated ElementEntry, update the statistics and pass it to
 * op_register_element on the path that is logged as a new element.
 * Unlike handle_p2p_elements, the visible code does not consult
 * demanded_hashes. */
1480 handle_p2p_full_element (void *cls,
1481 const struct GNUNET_MessageHeader *mh)
1483 struct Operation *op = cls;
1484 struct ElementEntry *ee;
1485 const struct GNUNET_SET_ElementMessage *emsg;
1486 uint16_t element_size;
/* The message must be at least as large as the fixed ElementMessage part. */
1488 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1490 GNUNET_break_op (0);
1491 fail_union_operation (op);
1495 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
/* Everything after the fixed message part is element payload. It is copied
 * into the memory directly behind the ElementEntry, and element.data is
 * pointed at that copy. */
1497 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1498 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1499 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1500 ee->element.size = element_size;
1501 ee->element.data = &ee[1];
1502 ee->element.element_type = ntohs (emsg->element_type);
1503 ee->remote = GNUNET_YES;
1504 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1506 LOG (GNUNET_ERROR_TYPE_DEBUG,
1507 "Got element (full diff, size %u, hash %s) from peer\n",
1508 (unsigned int) element_size,
1509 GNUNET_h2s (&ee->element_hash));
1511 GNUNET_STATISTICS_update (_GSS_statistics,
1512 "# received elements",
1515 GNUNET_STATISTICS_update (_GSS_statistics,
1516 "# exchanged elements",
/* received_total counts every element message that got this far;
 * received_fresh (below) counts only those registered as new. */
1520 op->state->received_total += 1;
1522 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1526 /* Got repeated element. Should not happen since
1527 * we track demands. */
1528 GNUNET_STATISTICS_update (_GSS_statistics,
1529 "# repeated elements",
1532 ke->received = GNUNET_YES;
1537 LOG (GNUNET_ERROR_TYPE_DEBUG,
1538 "Registering new element from remote peer\n");
1539 op->state->received_fresh += 1;
1540 op_register_element (op, ee, GNUNET_YES);
1541 /* only send results immediately if the client wants it */
1542 switch (op->spec->result_mode)
1544 case GNUNET_SET_RESULT_ADDED:
1545 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1547 case GNUNET_SET_RESULT_SYMMETRIC:
1548 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1551 /* Result mode not supported, should have been caught earlier. */
/* Stale-element heuristic, applied only when the operation spec has
 * byzantine set to GNUNET_YES: once more than 8 elements were received,
 * fail if fewer than a third of them (integer division) were new. */
1557 if ( (GNUNET_YES == op->spec->byzantine) &&
1558 (op->state->received_total > 8) &&
1559 (op->state->received_fresh < op->state->received_total / 3) )
1561 /* The other peer gave us lots of old elements, there's something wrong. */
1562 GNUNET_break_op (0);
1563 fail_union_operation (op);
1569 * Send offers (for GNUNET_HashCode-s) in response
1570 * to inquiries (for IBF_Key-s).
1572 * @param cls the union operation
1573 * @param mh the message
/* Answer an inquiry: for every IBF key carried in the message, remove the
 * salt given in the message and call send_offers_for_key with the result. */
1576 handle_p2p_inquiry (void *cls,
1577 const struct GNUNET_MessageHeader *mh)
1579 struct Operation *op = cls;
1580 const struct IBF_Key *ibf_key;
1581 unsigned int num_keys;
1582 struct InquiryMessage *msg;
1584 /* look up elements and send them */
/* Inquiries are only accepted in PHASE_INVENTORY_PASSIVE. */
1585 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1587 GNUNET_break_op (0);
1588 fail_union_operation (op);
/* The payload behind the InquiryMessage part must be a whole number of
 * IBF keys.
 * NOTE(review): the subtraction is unsigned, so a message shorter than
 * struct InquiryMessage would wrap around; confirm that the minimum size
 * is enforced before this handler runs. */
1591 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1592 / sizeof (struct IBF_Key);
1593 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1594 != num_keys * sizeof (struct IBF_Key))
1596 GNUNET_break_op (0);
1597 fail_union_operation (op);
1601 msg = (struct InquiryMessage *) mh;
/* The keys start directly behind the fixed message part. */
1603 ibf_key = (const struct IBF_Key *) &msg[1];
1604 while (0 != num_keys--)
1606 struct IBF_Key unsalted_key;
1607 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1608 send_offers_for_key (op, unsalted_key);
1615 * Iterator over hash map entries, called to send each element
1616 * that was not received from the remote peer to that peer.
1618 * @param cls closure, the union operation
1619 * @param key current key code
1620 * @param value value in the hash map
1621 * @return #GNUNET_YES if we should continue to iterate,
1622 * #GNUNET_NO if not.
/* Map iterator used by handle_p2p_full_done: builds a FULL_ELEMENT message
 * from the element of one KeyEntry and queues it on the operation message
 * queue. The received flag of the entry is tested before anything is
 * built. */
1625 send_missing_elements_iter (void *cls,
1629 struct Operation *op = cls;
1630 struct KeyEntry *ke = value;
1631 struct GNUNET_MQ_Envelope *ev;
1632 struct GNUNET_SET_ElementMessage *emsg;
1633 struct ElementEntry *ee = ke->element;
1635 if (GNUNET_YES == ke->received)
/* The element bytes follow the fixed message part; the element type is sent
 * in network byte order. */
1638 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1639 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1640 emsg->reserved = htons (0);
1641 emsg->element_type = htons (ee->element.element_type);
1642 GNUNET_MQ_send (op->mq, ev);
1651 * @param cls closure, a set union operation
1652 * @param mh the request message
/* Handler for the REQUEST_FULL message. The visible code only validates the
 * phase: anything other than PHASE_EXPECT_IBF fails the operation. */
1655 handle_p2p_request_full (void *cls,
1656 const struct GNUNET_MessageHeader *mh)
1658 struct Operation *op = cls;
1660 if (PHASE_EXPECT_IBF != op->state->phase)
1662 fail_union_operation (op);
1663 GNUNET_break_op (0);
/* Open item left by the original author: no size check against
 * byzantine_lower_bound is performed yet. */
1667 // FIXME: we need to check that our set is larger than the
1668 // byzantine_lower_bound by some threshold
1674 * Handle a "full done" message.
1676 * @param cls closure, a set union operation
1677 * @param mh the full done message
/* Handler for the FULL_DONE message.
 * - In PHASE_EXPECT_IBF: iterate key_to_element with
 *   send_missing_elements_iter, send a FULL_DONE message and move to
 *   PHASE_DONE.
 * - In PHASE_FULL_SENDING: move to PHASE_DONE and call send_done_and_destroy.
 * - The trailing log / break_op / fail sequence reports the current phase
 *   and fails the operation. */
1680 handle_p2p_full_done (void *cls,
1681 const struct GNUNET_MessageHeader *mh)
1683 struct Operation *op = cls;
1685 if (PHASE_EXPECT_IBF == op->state->phase)
1687 struct GNUNET_MQ_Envelope *ev;
1689 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1691 /* send all the elements that did not come from the remote peer */
1692 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1693 &send_missing_elements_iter,
/* Tell the peer that our side of the full transmission is complete. */
1696 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1697 GNUNET_MQ_send (op->mq, ev);
1698 op->state->phase = PHASE_DONE;
1700 /* we now wait until the other peer shuts the tunnel down*/
1702 else if (PHASE_FULL_SENDING == op->state->phase)
1704 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1705 /* We sent the full set, and got the response for that. We're done. */
1706 op->state->phase = PHASE_DONE;
1707 send_done_and_destroy (op);
1711 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1712 GNUNET_break_op (0);
1713 fail_union_operation (op);
1720 * Handle a demand by the other peer for elements based on a list
1721 * of GNUNET_HashCode-s.
1723 * @param cls closure, a set union operation
1724 * @param mh the demand message
/* Answer a demand: for every hash in the message, look the element up in
 * the set content, send it to the peer as an element message and update
 * the statistics. In symmetric result mode the client is notified too. */
1727 handle_p2p_demand (void *cls,
1728 const struct GNUNET_MessageHeader *mh)
1730 struct Operation *op = cls;
1731 struct ElementEntry *ee;
1732 struct GNUNET_SET_ElementMessage *emsg;
1733 const struct GNUNET_HashCode *hash;
1734 unsigned int num_hashes;
1735 struct GNUNET_MQ_Envelope *ev;
/* The payload behind the message header must be a whole number of hash
 * codes; otherwise the operation is failed. */
1737 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1738 / sizeof (struct GNUNET_HashCode);
1739 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1740 != num_hashes * sizeof (struct GNUNET_HashCode))
1742 GNUNET_break_op (0);
1743 fail_union_operation (op);
/* The hashes start directly behind the message header. */
1747 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1749 hash++, num_hashes--)
1751 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1754 /* Demand for non-existing element. */
1755 GNUNET_break_op (0);
1756 fail_union_operation (op);
/* An element that exists in the set content but is not part of this
 * operation is also rejected. */
1759 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1761 /* Probably confused lazily copied sets. */
1762 GNUNET_break_op (0);
1763 fail_union_operation (op);
/* The element bytes follow the fixed message part; the element type is sent
 * in network byte order. */
1766 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1767 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1768 emsg->reserved = htons (0);
1769 emsg->element_type = htons (ee->element.element_type);
1770 LOG (GNUNET_ERROR_TYPE_DEBUG,
1771 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1773 (unsigned int) ee->element.size,
1774 GNUNET_h2s (&ee->element_hash));
1775 GNUNET_MQ_send (op->mq, ev);
1776 GNUNET_STATISTICS_update (_GSS_statistics,
1777 "# exchanged elements",
/* In symmetric result mode the client is told which elements were sent to
 * the remote peer (status ADD_REMOTE). */
1781 switch (op->spec->result_mode)
1783 case GNUNET_SET_RESULT_ADDED:
1784 /* Nothing to do. */
1786 case GNUNET_SET_RESULT_SYMMETRIC:
1787 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1790 /* Result mode not supported, should have been caught earlier. */
1799 * Handle offers (of GNUNET_HashCode-s) and
1800 * respond with demands (of GNUNET_HashCode-s).
1802 * @param cls the union operation
1803 * @param mh the message
/* Handle offers: every offered hash is checked against the local set content
 * and against demanded_hashes; for a hash that gets past both checks the
 * demand is recorded in demanded_hashes and a DEMAND message carrying that
 * single hash is sent. */
1806 handle_p2p_offer (void *cls,
1807 const struct GNUNET_MessageHeader *mh)
1809 struct Operation *op = cls;
1810 const struct GNUNET_HashCode *hash;
1811 unsigned int num_hashes;
1813 /* look up elements and send them */
/* Offers are accepted in both inventory phases (passive and active). */
1814 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1815 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1817 GNUNET_break_op (0);
1818 fail_union_operation (op);
/* The payload behind the message header must be a whole number of hash
 * codes; otherwise the operation is failed. */
1821 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1822 / sizeof (struct GNUNET_HashCode);
1823 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1824 != num_hashes * sizeof (struct GNUNET_HashCode))
1826 GNUNET_break_op (0);
1827 fail_union_operation (op);
/* The hashes start directly behind the message header. */
1831 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1833 hash++, num_hashes--)
1835 struct ElementEntry *ee;
1836 struct GNUNET_MessageHeader *demands;
1837 struct GNUNET_MQ_Envelope *ev;
1839 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1842 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1846 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1849 LOG (GNUNET_ERROR_TYPE_DEBUG,
1850 "Skipped sending duplicate demand\n");
/* Record the demand; handle_p2p_elements later removes the hash from
 * demanded_hashes when the matching element arrives. */
1854 GNUNET_assert (GNUNET_OK ==
1855 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1858 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
/* NOTE(review): the format below uses %x for a pointer argument; %p is the
 * matching conversion for a void pointer. */
1860 LOG (GNUNET_ERROR_TYPE_DEBUG,
1861 "[OP %x] Requesting element (hash %s)\n",
1862 (void *) op, GNUNET_h2s (hash));
/* The hash is the only payload of the DEMAND message. */
1863 ev = GNUNET_MQ_msg_header_extra (demands,
1864 sizeof (struct GNUNET_HashCode),
1865 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1866 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1867 GNUNET_MQ_send (op->mq, ev);
1873 * Handle a done message from a remote peer
1875 * @param cls the union operation
1876 * @param mh the message
/* Handler for the DONE message.
 * - In PHASE_INVENTORY_PASSIVE the phase becomes PHASE_FINISH_WAITING.
 * - In PHASE_INVENTORY_ACTIVE the phase becomes PHASE_FINISH_CLOSING.
 * - The trailing break_op / fail pair presumably covers a DONE that arrives
 *   in any other phase (TODO confirm against the complete source). */
1879 handle_p2p_done (void *cls,
1880 const struct GNUNET_MessageHeader *mh)
1882 struct Operation *op = cls;
1884 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1886 /* We got all requests, but still have to send our elements in response. */
1888 op->state->phase = PHASE_FINISH_WAITING;
1890 LOG (GNUNET_ERROR_TYPE_DEBUG,
1891 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1892 /* The active peer is done sending offers
1893 * and inquiries. This means that all
1894 * our responses to that (demands and offers)
1895 * must be in flight (queued or in mesh).
1897 * We should notify the active peer once
1898 * all our demands are satisfied, so that the active
1899 * peer can quit if we gave him everything.
1904 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1906 LOG (GNUNET_ERROR_TYPE_DEBUG,
1907 "got DONE (as active partner), waiting to finish\n");
1908 /* All demands of the other peer are satisfied,
1909 * and we processed all offers, thus we know
1910 * exactly what our demands must be.
1912 * We'll close the channel
1913 * to the other peer once our demands are met.
1915 op->state->phase = PHASE_FINISH_CLOSING;
1919 GNUNET_break_op (0);
1920 fail_union_operation (op);
1925 * Initiate operation to evaluate a set union with a remote peer.
1927 * @param op operation to perform (to be initialized)
1928 * @param opaque_context message to be transmitted to the listener
1929 * to convince him to accept, may be NULL
/* Start a union operation as the initiating peer: allocate the operation
 * state, send the operation request (built with GNUNET_MQ_msg_nested_mh),
 * then call initialize_key_to_element and record the initial map size. */
1932 union_evaluate (struct Operation *op,
1933 const struct GNUNET_MessageHeader *opaque_context)
1935 struct GNUNET_MQ_Envelope *ev;
1936 struct OperationRequestMessage *msg;
/* The operation must not have union state yet. */
1938 GNUNET_assert (NULL == op->state);
1939 op->state = GNUNET_new (struct OperationState);
1940 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1941 /* copy the current generation's strata estimator for this operation */
1942 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1943 /* we started the operation, thus we have to send the operation request */
1944 op->state->phase = PHASE_EXPECT_SE;
/* NOTE(review): both salts start at the fixed value 42, the same value
 * union_accept uses. */
1945 op->state->salt_receive = op->state->salt_send = 42;
1946 LOG (GNUNET_ERROR_TYPE_DEBUG,
1947 "Initiating union operation evaluation\n");
1948 GNUNET_STATISTICS_update (_GSS_statistics,
1949 "# of total union operations",
1952 GNUNET_STATISTICS_update (_GSS_statistics,
1953 "# of initiated union operations",
1956 ev = GNUNET_MQ_msg_nested_mh (msg,
1957 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1961 /* the context message is too large */
1963 GNUNET_SERVICE_client_drop (op->spec->set->client);
1966 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1967 GNUNET_MQ_send (op->mq,
1970 if (NULL != opaque_context)
1971 LOG (GNUNET_ERROR_TYPE_DEBUG,
1972 "sent op request with context message\n");
1974 LOG (GNUNET_ERROR_TYPE_DEBUG,
1975 "sent op request without context message\n");
/* Record how many entries key_to_element holds at the start of the
 * operation. */
1977 initialize_key_to_element (op);
1978 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1983 * Accept a union operation request from a remote peer.
1984 * Only initializes the private operation state.
1986 * @param op operation that will be accepted as a union operation
/* Accept a union operation as the listening peer: update the statistics,
 * allocate the operation state, call initialize_key_to_element and start
 * the exchange by sending our strata estimator. */
1989 union_accept (struct Operation *op)
1991 LOG (GNUNET_ERROR_TYPE_DEBUG,
1992 "accepting set union operation\n");
/* The operation must not have union state yet. */
1993 GNUNET_assert (NULL == op->state);
1995 GNUNET_STATISTICS_update (_GSS_statistics,
1996 "# of accepted union operations",
1999 GNUNET_STATISTICS_update (_GSS_statistics,
2000 "# of total union operations",
2004 op->state = GNUNET_new (struct OperationState);
/* The operation works on its own duplicate of the strata estimator of the
 * set. */
2005 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2006 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
/* NOTE(review): both salts start at the fixed value 42, the same value
 * union_evaluate uses. */
2007 op->state->salt_receive = op->state->salt_send = 42;
2008 initialize_key_to_element (op);
2009 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2010 /* kick off the operation */
2011 send_strata_estimator (op);
2016 * Create a new set supporting the union operation
2018 * We maintain one strata estimator per set and then manipulate it over the
2019 * lifetime of the set, as recreating a strata estimator would be expensive.
2021 * @return the newly created set, NULL on error
/* Allocate the union-specific state of a new set together with its strata
 * estimator. */
2023 static struct SetState *
2024 union_set_create (void)
2026 struct SetState *set_state;
2028 LOG (GNUNET_ERROR_TYPE_DEBUG,
2029 "union set created\n");
2030 set_state = GNUNET_new (struct SetState);
/* The estimator dimensions are the SE_* constants defined near the top of
 * this file. */
2031 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2032 SE_IBF_SIZE, SE_IBF_HASH_NUM);
/* If the estimator could not be created, log the failure and release the
 * partially built state. */
2033 if (NULL == set_state->se)
2035 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2036 "Failed to allocate strata estimator\n");
2037 GNUNET_free (set_state);
2045 * Add the element from the given element message to the set.
2047 * @param set_state state of the set to add to
2048 * @param ee the element to add to the set
/* Insert the IBF key derived from the element hash (via get_ibf_key) into
 * the strata estimator of the set. */
2051 union_add (struct SetState *set_state, struct ElementEntry *ee)
2053 strata_estimator_insert (set_state->se,
2054 get_ibf_key (&ee->element_hash));
2059 * Remove the element given in the element message from the set.
2060 * Only marks the element as removed, so that older set operations can still exchange it.
2062 * @param set_state state of the set to remove from
2063 * @param ee set element to remove
/* Remove the IBF key derived from the element hash (via get_ibf_key) from
 * the strata estimator of the set. */
2066 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2068 strata_estimator_remove (set_state->se,
2069 get_ibf_key (&ee->element_hash));
2074 * Destroy a set that supports the union operation.
2076 * @param set_state the set to destroy
/* Release the union-specific set state: destroy the strata estimator when
 * one is present, then free the state itself. */
2079 union_set_destroy (struct SetState *set_state)
2081 if (NULL != set_state->se)
2083 strata_estimator_destroy (set_state->se);
/* Clear the pointer after destroying the estimator. */
2084 set_state->se = NULL;
2086 GNUNET_free (set_state);
2091 * Dispatch messages for a union operation.
2093 * @param op the state of the union evaluate operation
2094 * @param mh the received message
2095 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2096 * #GNUNET_OK otherwise
/* Dispatch an incoming peer-to-peer message to the handler matching its
 * type (read from the header in network byte order). */
2099 union_handle_p2p_message (struct Operation *op,
2100 const struct GNUNET_MessageHeader *mh)
2102 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2103 // "received p2p message (t: %u, s: %u)\n",
2104 // ntohs (mh->type),
2105 // ntohs (mh->size));
/* The IBF and strata estimator handlers return a status that is passed
 * straight through to the caller; the remaining handlers are invoked
 * without using a return value. The last argument of the strata estimator
 * handler differs between the SE and SEC message types. */
2106 switch (ntohs (mh->type))
2108 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2109 return handle_p2p_ibf (op, mh);
2110 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2111 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2112 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2113 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2114 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2115 handle_p2p_elements (op, mh);
2117 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2118 handle_p2p_full_element (op, mh);
2120 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2121 handle_p2p_inquiry (op, mh);
2123 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2124 handle_p2p_done (op, mh);
2126 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2127 handle_p2p_offer (op, mh);
2129 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2130 handle_p2p_demand (op, mh);
2132 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2133 handle_p2p_full_done (op, mh);
2135 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2136 handle_p2p_request_full (op, mh);
2139 /* Something wrong with cadet's message handlers? */
2147 * Handler for peer-disconnects, notifies the client
2148 * about the aborted operation in case the op was not concluded.
2150 * @param op the destroyed operation
/* Called when the remote peer disconnects. If the operation has not reached
 * PHASE_DONE, a result message with status FAILURE is sent to the client,
 * a warning is logged and the operation is destroyed. Otherwise the
 * disconnect is treated as the normal end of the session. */
2153 union_peer_disconnect (struct Operation *op)
2155 if (PHASE_DONE != op->state->phase)
2157 struct GNUNET_MQ_Envelope *ev;
2158 struct GNUNET_SET_ResultMessage *msg;
/* Build the failure result for the client request this operation belongs
 * to; the fields are sent in network byte order. */
2160 ev = GNUNET_MQ_msg (msg,
2161 GNUNET_MESSAGE_TYPE_SET_RESULT);
2162 msg->request_id = htonl (op->spec->client_request_id);
2163 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2164 msg->element_type = htons (0);
2165 GNUNET_MQ_send (op->spec->set->client_mq,
2167 LOG (GNUNET_ERROR_TYPE_WARNING,
2168 "other peer disconnected prematurely, phase %u\n",
2170 _GSS_operation_destroy (op,
2174 // else: the session has already been concluded
2175 LOG (GNUNET_ERROR_TYPE_DEBUG,
2176 "other peer disconnected (finished)\n");
/* Call send_done_and_destroy only if client_done_sent is still GNUNET_NO. */
2177 if (GNUNET_NO == op->state->client_done_sent)
2178 send_done_and_destroy (op);
2183 * Copy union-specific set state.
2185 * @param set source set for copying the union state
2186 * @return a copy of the union-specific set state
/* Create a new SetState whose strata estimator is a duplicate of the one
 * belonging to the given set. */
2188 static struct SetState *
2189 union_copy_state (struct Set *set)
2191 struct SetState *new_state;
2193 new_state = GNUNET_new (struct SetState);
/* The source set must already have union state with a strata estimator;
 * the estimator is duplicated, not shared. */
2194 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2195 new_state->se = strata_estimator_dup (set->state->se);
2202 * Get the table with implementing functions for
2205 * @return the operation specific VTable
2207 const struct SetVT *
2210 static const struct SetVT union_vt = {
2211 .create = &union_set_create,
2212 .msg_handler = &union_handle_p2p_message,
2214 .remove = &union_remove,
2215 .destroy_set = &union_set_destroy,
2216 .evaluate = &union_evaluate,
2217 .accept = &union_accept,
2218 .peer_disconnect = &union_peer_disconnect,
2219 .cancel = &union_op_cancel,
2220 .copy_state = &union_copy_state,