2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
95 * Continuation for multi part IBFs.
97 PHASE_EXPECT_IBF_CONT,
100 * We are decoding an IBF.
102 PHASE_INVENTORY_ACTIVE,
105 * The other peer is decoding the IBF we just sent.
107 PHASE_INVENTORY_PASSIVE,
110 * The protocol is almost finished, but we still have to flush our message
111 * queue and/or expect some elements.
113 PHASE_FINISH_CLOSING,
116 * In the penultimate phase,
117 * we wait until all our demands
118 * are satisfied. Then we send a done
119 * message, and wait for another done message.
121 PHASE_FINISH_WAITING,
124 * In the ultimate phase, we wait until
125 * our demands are satisfied and then
126 * quit (sending another DONE message).
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
139 * State of an evaluate operation with another peer.
141 struct OperationState
144 * Copy of the set's strata estimator at the time of
145 * creation of this operation.
147 struct StrataEstimator *se;
150 * The IBF we currently receive.
152 struct InvertibleBloomFilter *remote_ibf;
155 * The IBF with the local set's element.
157 struct InvertibleBloomFilter *local_ibf;
160 * Maps unsalted IBF-Keys to elements.
161 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162 * Colliding IBF-Keys are linked.
164 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
167 * Current state of the operation.
169 enum UnionOperationPhase phase;
172 * Did we send the client that we are done?
174 int client_done_sent;
177 * Number of ibf buckets already received into the @a remote_ibf.
179 unsigned int ibf_buckets_received;
182 * Hashes for elements that we have demanded from the other peer.
184 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
187 * Salt that we're using for sending IBFs
192 * Salt for the IBF we've received and that we're currently decoding.
194 uint32_t salt_receive;
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
200 uint32_t received_fresh;
203 * Total number of elements received from the other peer.
205 uint32_t received_total;
208 * Initial size of our set, just before
209 * the operation started.
211 uint64_t initial_size;
216 * The key entry is used to associate an ibf key with an element.
221 * IBF key for the entry, derived from the current salt.
223 struct IBF_Key ibf_key;
226 * The actual element associated with the key.
228 * Only owned by the union operation if element->operation
231 struct ElementEntry *element;
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
244 * Used as a closure for sending elements
245 * with a specific IBF key.
247 struct SendElementClosure
250 * The IBF key whose matching elements should be
253 struct IBF_Key ibf_key;
256 * Operation for which the elements
259 struct Operation *op;
264 * Extra state required for efficient set union.
269 * The strata estimator is only generated once for
271 * The IBF keys are derived from the element hashes with
274 struct StrataEstimator *se;
279 * Iterator over hash map entries, called to
280 * destroy the linked list of colliding ibf key entries.
283 * @param key current key code
284 * @param value value in the hash map
285 * @return #GNUNET_YES if we should continue to iterate,
289 destroy_key_to_element_iter (void *cls,
293 struct KeyEntry *k = value;
295 GNUNET_assert (NULL != k);
296 if (GNUNET_YES == k->element->remote)
298 GNUNET_free (k->element);
307 * Destroy the union operation. Only things specific to the union
308 * operation are destroyed.
310 * @param op union operation to destroy
313 union_op_cancel (struct Operation *op)
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op\n");
317 /* check if the op was canceled twice */
318 GNUNET_assert (NULL != op->state);
319 if (NULL != op->state->remote_ibf)
321 ibf_destroy (op->state->remote_ibf);
322 op->state->remote_ibf = NULL;
324 if (NULL != op->state->demanded_hashes)
326 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327 op->state->demanded_hashes = NULL;
329 if (NULL != op->state->local_ibf)
331 ibf_destroy (op->state->local_ibf);
332 op->state->local_ibf = NULL;
334 if (NULL != op->state->se)
336 strata_estimator_destroy (op->state->se);
337 op->state->se = NULL;
339 if (NULL != op->state->key_to_element)
341 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342 &destroy_key_to_element_iter,
344 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345 op->state->key_to_element = NULL;
347 GNUNET_free (op->state);
349 LOG (GNUNET_ERROR_TYPE_DEBUG,
350 "destroying union op done\n");
355 * Inform the client that the union operation has failed,
356 * and proceed to destroy the evaluate operation.
358 * @param op the union operation to fail
361 fail_union_operation (struct Operation *op)
363 struct GNUNET_MQ_Envelope *ev;
364 struct GNUNET_SET_ResultMessage *msg;
366 LOG (GNUNET_ERROR_TYPE_ERROR,
367 "union operation failed\n");
368 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370 msg->request_id = htonl (op->spec->client_request_id);
371 msg->element_type = htons (0);
372 GNUNET_MQ_send (op->spec->set->client_mq, ev);
373 _GSS_operation_destroy (op, GNUNET_YES);
378 * Derive the IBF key from a hash code and
381 * @param src the hash code
382 * @return the derived IBF key
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
390 GNUNET_CRYPTO_kdf (&key, sizeof (key),
392 &salt, sizeof (salt),
399 * Context for #op_get_element_iterator
401 struct GetElementContext
403 struct GNUNET_HashCode hash;
409 * Iterator over the mapping from IBF keys to element entries. Checks if we
410 * have an element with a given GNUNET_HashCode.
413 * @param key current key code
414 * @param value value in the hash map
415 * @return #GNUNET_YES if we should search further,
416 * #GNUNET_NO if we've found the element.
419 op_get_element_iterator (void *cls,
423 struct GetElementContext *ctx = cls;
424 struct KeyEntry *k = value;
426 GNUNET_assert (NULL != k);
427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438 * Determine whether the given element is already in the operation's element
441 * @param op operation that should be tested for 'element_hash'
442 * @param element_hash hash of the element to look for
443 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447 const struct GNUNET_HashCode *element_hash)
450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = {{{ 0 }} , 0};
453 ctx.hash = *element_hash;
455 ibf_key = get_ibf_key (element_hash);
456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457 (uint32_t) ibf_key.key_val,
458 op_get_element_iterator,
461 /* was the iteration aborted because we found the element? */
462 if (GNUNET_SYSERR == ret)
464 GNUNET_assert (NULL != ctx.k);
472 * Insert an element into the union operation's
473 * key-to-element mapping. Takes ownership of 'ee'.
474 * Note that this does not insert the element in the set,
475 * only in the operation's key-element mapping.
476 * This is done to speed up re-tried operations, if some elements
477 * were transmitted, and then the IBF fails to decode.
479 * XXX: clarify ownership, doesn't sound right.
481 * @param op the union operation
482 * @param ee the element entry
483 * @parem received was this element received from the remote peer?
486 op_register_element (struct Operation *op,
487 struct ElementEntry *ee,
490 struct IBF_Key ibf_key;
493 ibf_key = get_ibf_key (&ee->element_hash);
494 k = GNUNET_new (struct KeyEntry);
496 k->ibf_key = ibf_key;
497 k->received = received;
498 GNUNET_assert (GNUNET_OK ==
499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500 (uint32_t) ibf_key.key_val,
502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507 salt_key (const struct IBF_Key *k_in,
509 struct IBF_Key *k_out)
512 uint64_t x = k_in->key_val;
514 x = (x >> s) | (x << (64 - s));
520 unsalt_key (const struct IBF_Key *k_in,
522 struct IBF_Key *k_out)
525 uint64_t x = k_in->key_val;
526 x = (x << s) | (x >> (64 - s));
532 * Insert a key into an ibf.
536 * @param value the key entry to get the key from
539 prepare_ibf_iterator (void *cls,
543 struct Operation *op = cls;
544 struct KeyEntry *ke = value;
545 struct IBF_Key salted_key;
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "[OP %x] inserting %lx (hash %s) into ibf\n",
550 (unsigned long) ke->ibf_key.key_val,
551 GNUNET_h2s (&ke->element->element_hash));
552 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553 ibf_insert (op->state->local_ibf, salted_key);
559 * Iterator for initializing the
560 * key-to-element mapping of a union operation
562 * @param cls the union operation `struct Operation *`
564 * @param value the `struct ElementEntry *` to insert
565 * into the key-to-element mapping
566 * @return #GNUNET_YES (to continue iterating)
569 init_key_to_element_iterator (void *cls,
570 const struct GNUNET_HashCode *key,
573 struct Operation *op = cls;
574 struct ElementEntry *ee = value;
576 /* make sure that the element belongs to the set at the time
577 * of creating the operation */
578 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
581 GNUNET_assert (GNUNET_NO == ee->remote);
583 op_register_element (op, ee, GNUNET_NO);
589 * Initialize the IBF key to element mapping local to this set
592 * @param op the set union operation
595 initialize_key_to_element (struct Operation *op)
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
607 * Create an ibf with the operation's elements
608 * of the specified size
610 * @param op the union operation
611 * @param size size of the ibf to create
612 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
615 prepare_ibf (struct Operation *op,
618 GNUNET_assert (NULL != op->state->key_to_element);
620 if (NULL != op->state->local_ibf)
621 ibf_destroy (op->state->local_ibf);
622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623 if (NULL == op->state->local_ibf)
625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626 "Failed to allocate local IBF\n");
627 return GNUNET_SYSERR;
629 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630 &prepare_ibf_iterator,
637 * Send an ibf of appropriate size.
639 * Fragments the IBF into multiple messages if necessary.
641 * @param op the union operation
642 * @param ibf_order order of the ibf to send, size=2^order
643 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
646 send_ibf (struct Operation *op,
649 unsigned int buckets_sent = 0;
650 struct InvertibleBloomFilter *ibf;
653 prepare_ibf (op, 1<<ibf_order))
655 /* allocation failed */
656 return GNUNET_SYSERR;
659 LOG (GNUNET_ERROR_TYPE_DEBUG,
660 "sending ibf of size %u\n",
664 char name[64] = { 0 };
665 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
669 ibf = op->state->local_ibf;
671 while (buckets_sent < (1 << ibf_order))
673 unsigned int buckets_in_message;
674 struct GNUNET_MQ_Envelope *ev;
675 struct IBFMessage *msg;
677 buckets_in_message = (1 << ibf_order) - buckets_sent;
678 /* limit to maximum */
679 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
682 ev = GNUNET_MQ_msg_extra (msg,
683 buckets_in_message * IBF_BUCKET_SIZE,
684 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
687 msg->order = ibf_order;
688 msg->offset = htonl (buckets_sent);
689 msg->salt = htonl (op->state->salt_send);
690 ibf_write_slice (ibf, buckets_sent,
691 buckets_in_message, &msg[1]);
692 buckets_sent += buckets_in_message;
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "ibf chunk size %u, %u/%u sent\n",
698 GNUNET_MQ_send (op->mq, ev);
701 /* The other peer must decode the IBF, so
703 op->state->phase = PHASE_INVENTORY_PASSIVE;
709 * Send a strata estimator to the remote peer.
711 * @param op the union operation with the remote peer
714 send_strata_estimator (struct Operation *op)
716 const struct StrataEstimator *se = op->state->se;
717 struct GNUNET_MQ_Envelope *ev;
718 struct StrataEstimatorMessage *strata_msg;
723 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724 len = strata_estimator_write (op->state->se,
726 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730 ev = GNUNET_MQ_msg_extra (strata_msg,
733 GNUNET_memcpy (&strata_msg[1],
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738 GNUNET_MQ_send (op->mq,
740 op->state->phase = PHASE_EXPECT_IBF;
741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "sent SE, expecting IBF\n");
747 * Compute the necessary order of an ibf
748 * from the size of the symmetric set difference.
750 * @param diff the difference
751 * @return the required size of the ibf
754 get_order_from_difference (unsigned int diff)
756 unsigned int ibf_order;
759 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
762 if (ibf_order > MAX_IBF_ORDER)
763 ibf_order = MAX_IBF_ORDER;
764 // add one for correction
765 return ibf_order + 1;
770 * Send a set element.
772 * @param cls the union operation `struct Operation *`
774 * @param value the `struct ElementEntry *` to insert
775 * into the key-to-element mapping
776 * @return #GNUNET_YES (to continue iterating)
779 send_element_iterator (void *cls,
780 const struct GNUNET_HashCode *key,
783 struct Operation *op = cls;
784 struct GNUNET_SET_ElementMessage *emsg;
785 struct ElementEntry *ee = value;
786 struct GNUNET_SET_Element *el = &ee->element;
787 struct GNUNET_MQ_Envelope *ev;
790 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791 emsg->element_type = htons (el->element_type);
792 GNUNET_memcpy (&emsg[1], el->data, el->size);
793 GNUNET_MQ_send (op->mq, ev);
799 send_full_set (struct Operation *op)
801 struct GNUNET_MQ_Envelope *ev;
803 op->state->phase = PHASE_FULL_SENDING;
805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806 &send_element_iterator, op);
807 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808 GNUNET_MQ_send (op->mq, ev);
813 * Handle a strata estimator from a remote peer
815 * @param cls the union operation
816 * @param mh the message
817 * @param is_compressed #GNUNET_YES if the estimator is compressed
818 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819 * #GNUNET_OK otherwise
822 handle_p2p_strata_estimator (void *cls,
823 const struct GNUNET_MessageHeader *mh,
826 struct Operation *op = cls;
827 struct StrataEstimator *remote_se;
828 struct StrataEstimatorMessage *msg = (void *) mh;
833 GNUNET_STATISTICS_update (_GSS_statistics,
834 "# bytes of SE received",
838 if (op->state->phase != PHASE_EXPECT_SE)
841 fail_union_operation (op);
842 return GNUNET_SYSERR;
844 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
845 if ( (GNUNET_NO == is_compressed) &&
846 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
848 fail_union_operation (op);
850 return GNUNET_SYSERR;
852 other_size = GNUNET_ntohll (msg->set_size);
853 remote_se = strata_estimator_create (SE_STRATA_COUNT,
856 if (NULL == remote_se)
858 /* insufficient resources, fail */
859 fail_union_operation (op);
860 return GNUNET_SYSERR;
863 strata_estimator_read (&msg[1],
868 /* decompression failed */
869 fail_union_operation (op);
870 strata_estimator_destroy (remote_se);
871 return GNUNET_SYSERR;
873 GNUNET_assert (NULL != op->state->se);
874 diff = strata_estimator_difference (remote_se,
880 strata_estimator_destroy (remote_se);
881 strata_estimator_destroy (op->state->se);
882 op->state->se = NULL;
883 LOG (GNUNET_ERROR_TYPE_DEBUG,
884 "got se diff=%d, using ibf size %d\n",
886 1<<get_order_from_difference (diff));
888 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
891 fail_union_operation (op);
892 return GNUNET_SYSERR;
896 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
898 LOG (GNUNET_ERROR_TYPE_INFO,
899 "Sending full set (diff=%d, own set=%u)\n",
901 op->state->initial_size);
902 GNUNET_STATISTICS_update (_GSS_statistics,
906 if (op->state->initial_size <= other_size)
912 struct GNUNET_MQ_Envelope *ev;
913 op->state->phase = PHASE_EXPECT_IBF;
914 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
915 GNUNET_MQ_send (op->mq, ev);
920 GNUNET_STATISTICS_update (_GSS_statistics,
926 get_order_from_difference (diff)))
928 /* Internal error, best we can do is shut the connection */
929 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
930 "Failed to send IBF, closing connection\n");
931 fail_union_operation (op);
932 return GNUNET_SYSERR;
941 * Iterator to send elements to a remote peer
943 * @param cls closure with the element key and the union operation
945 * @param value the key entry
948 send_offers_iterator (void *cls,
952 struct SendElementClosure *sec = cls;
953 struct Operation *op = sec->op;
954 struct KeyEntry *ke = value;
955 struct GNUNET_MQ_Envelope *ev;
956 struct GNUNET_MessageHeader *mh;
958 /* Detect 32-bit key collision for the 64-bit IBF keys. */
959 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
962 ev = GNUNET_MQ_msg_header_extra (mh,
963 sizeof (struct GNUNET_HashCode),
964 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
966 GNUNET_assert (NULL != ev);
967 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
968 LOG (GNUNET_ERROR_TYPE_DEBUG,
969 "[OP %x] sending element offer (%s) to peer\n",
971 GNUNET_h2s (&ke->element->element_hash));
972 GNUNET_MQ_send (op->mq, ev);
978 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
980 * @param op union operation
981 * @param ibf_key IBF key of interest
984 send_offers_for_key (struct Operation *op,
985 struct IBF_Key ibf_key)
987 struct SendElementClosure send_cls;
989 send_cls.ibf_key = ibf_key;
991 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
992 (uint32_t) ibf_key.key_val,
993 &send_offers_iterator,
999 * Decode which elements are missing on each side, and
1000 * send the appropriate offers and inquiries.
1002 * @param op union operation
1003 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1006 decode_and_send (struct Operation *op)
1009 struct IBF_Key last_key;
1011 unsigned int num_decoded;
1012 struct InvertibleBloomFilter *diff_ibf;
1014 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1017 prepare_ibf (op, op->state->remote_ibf->size))
1020 /* allocation failed */
1021 return GNUNET_SYSERR;
1023 diff_ibf = ibf_dup (op->state->local_ibf);
1024 ibf_subtract (diff_ibf, op->state->remote_ibf);
1026 ibf_destroy (op->state->remote_ibf);
1027 op->state->remote_ibf = NULL;
1029 LOG (GNUNET_ERROR_TYPE_DEBUG,
1030 "decoding IBF (size=%u)\n",
1034 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1039 int cycle_detected = GNUNET_NO;
1043 res = ibf_decode (diff_ibf, &side, &key);
1044 if (res == GNUNET_OK)
1046 LOG (GNUNET_ERROR_TYPE_DEBUG,
1047 "decoded ibf key %lx\n",
1048 (unsigned long) key.key_val);
1050 if ( (num_decoded > diff_ibf->size) ||
1051 ( (num_decoded > 1) &&
1052 (last_key.key_val == key.key_val) ) )
1054 LOG (GNUNET_ERROR_TYPE_DEBUG,
1055 "detected cyclic ibf (decoded %u/%u)\n",
1058 cycle_detected = GNUNET_YES;
1061 if ( (GNUNET_SYSERR == res) ||
1062 (GNUNET_YES == cycle_detected) )
1066 while (1<<next_order < diff_ibf->size)
1069 if (next_order <= MAX_IBF_ORDER)
1071 LOG (GNUNET_ERROR_TYPE_DEBUG,
1072 "decoding failed, sending larger ibf (size %u)\n",
1074 GNUNET_STATISTICS_update (_GSS_statistics,
1078 op->state->salt_send++;
1080 send_ibf (op, next_order))
1082 /* Internal error, best we can do is shut the connection */
1083 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1084 "Failed to send IBF, closing connection\n");
1085 fail_union_operation (op);
1086 ibf_destroy (diff_ibf);
1087 return GNUNET_SYSERR;
1092 GNUNET_STATISTICS_update (_GSS_statistics,
1093 "# of failed union operations (too large)",
1096 // XXX: Send the whole set, element-by-element
1097 LOG (GNUNET_ERROR_TYPE_ERROR,
1098 "set union failed: reached ibf limit\n");
1099 fail_union_operation (op);
1100 ibf_destroy (diff_ibf);
1101 return GNUNET_SYSERR;
1105 if (GNUNET_NO == res)
1107 struct GNUNET_MQ_Envelope *ev;
1109 LOG (GNUNET_ERROR_TYPE_DEBUG,
1110 "transmitted all values, sending DONE\n");
1111 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1112 GNUNET_MQ_send (op->mq, ev);
1113 /* We now wait until we get a DONE message back
1114 * and then wait for our MQ to be flushed and all our
1115 * demands be delivered. */
1120 struct IBF_Key unsalted_key;
1121 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1122 send_offers_for_key (op, unsalted_key);
1124 else if (-1 == side)
1126 struct GNUNET_MQ_Envelope *ev;
1127 struct InquiryMessage *msg;
1129 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1130 * the effort additional complexity. */
1131 ev = GNUNET_MQ_msg_extra (msg,
1132 sizeof (struct IBF_Key),
1133 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1134 msg->salt = htonl (op->state->salt_receive);
1135 GNUNET_memcpy (&msg[1],
1137 sizeof (struct IBF_Key));
1138 LOG (GNUNET_ERROR_TYPE_DEBUG,
1139 "sending element inquiry for IBF key %lx\n",
1140 (unsigned long) key.key_val);
1141 GNUNET_MQ_send (op->mq, ev);
1148 ibf_destroy (diff_ibf);
1154 * Handle an IBF message from a remote peer.
1156 * Reassemble the IBF from multiple pieces, and
1157 * process the whole IBF once possible.
1159 * @param cls the union operation
1160 * @param mh the header of the message
1161 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1162 * #GNUNET_OK otherwise
1165 handle_p2p_ibf (void *cls,
1166 const struct GNUNET_MessageHeader *mh)
1168 struct Operation *op = cls;
1169 const struct IBFMessage *msg;
1170 unsigned int buckets_in_message;
1172 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1174 GNUNET_break_op (0);
1175 fail_union_operation (op);
1176 return GNUNET_SYSERR;
1178 msg = (const struct IBFMessage *) mh;
1179 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1180 (op->state->phase == PHASE_EXPECT_IBF) )
1182 op->state->phase = PHASE_EXPECT_IBF_CONT;
1183 GNUNET_assert (NULL == op->state->remote_ibf);
1184 LOG (GNUNET_ERROR_TYPE_DEBUG,
1185 "Creating new ibf of size %u\n",
1187 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1188 op->state->salt_receive = ntohl (msg->salt);
1189 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1190 if (NULL == op->state->remote_ibf)
1192 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1193 "Failed to parse remote IBF, closing connection\n");
1194 fail_union_operation (op);
1195 return GNUNET_SYSERR;
1197 op->state->ibf_buckets_received = 0;
1198 if (0 != ntohl (msg->offset))
1200 GNUNET_break_op (0);
1201 fail_union_operation (op);
1202 return GNUNET_SYSERR;
1205 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1207 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1209 GNUNET_break_op (0);
1210 fail_union_operation (op);
1211 return GNUNET_SYSERR;
1213 if (1<<msg->order != op->state->remote_ibf->size)
1215 GNUNET_break_op (0);
1216 fail_union_operation (op);
1217 return GNUNET_SYSERR;
1219 if (ntohl (msg->salt) != op->state->salt_receive)
1221 GNUNET_break_op (0);
1222 fail_union_operation (op);
1223 return GNUNET_SYSERR;
1231 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1233 if (0 == buckets_in_message)
1235 GNUNET_break_op (0);
1236 fail_union_operation (op);
1237 return GNUNET_SYSERR;
1240 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1242 GNUNET_break_op (0);
1243 fail_union_operation (op);
1244 return GNUNET_SYSERR;
1247 GNUNET_assert (NULL != op->state->remote_ibf);
1249 ibf_read_slice (&msg[1],
1250 op->state->ibf_buckets_received,
1252 op->state->remote_ibf);
1253 op->state->ibf_buckets_received += buckets_in_message;
1255 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1257 LOG (GNUNET_ERROR_TYPE_DEBUG,
1258 "received full ibf\n");
1259 op->state->phase = PHASE_INVENTORY_ACTIVE;
1261 decode_and_send (op))
1263 /* Internal error, best we can do is shut down */
1264 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1265 "Failed to decode IBF, closing connection\n");
1266 return GNUNET_SYSERR;
1274 * Send a result message to the client indicating
1275 * that there is a new element.
1277 * @param op union operation
1278 * @param element element to send
1279 * @param status status to send with the new element
1282 send_client_element (struct Operation *op,
1283 struct GNUNET_SET_Element *element,
1286 struct GNUNET_MQ_Envelope *ev;
1287 struct GNUNET_SET_ResultMessage *rm;
1289 LOG (GNUNET_ERROR_TYPE_DEBUG,
1290 "sending element (size %u) to client\n",
1292 GNUNET_assert (0 != op->spec->client_request_id);
1293 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1296 GNUNET_MQ_discard (ev);
1300 rm->result_status = htons (status);
1301 rm->request_id = htonl (op->spec->client_request_id);
1302 rm->element_type = htons (element->element_type);
1303 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1304 GNUNET_memcpy (&rm[1], element->data, element->size);
1305 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1310 * Signal to the client that the operation has finished and
1311 * destroy the operation.
1313 * @param cls operation to destroy
1316 send_done_and_destroy (void *cls)
1318 struct Operation *op = cls;
1319 struct GNUNET_MQ_Envelope *ev;
1320 struct GNUNET_SET_ResultMessage *rm;
1322 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1323 rm->request_id = htonl (op->spec->client_request_id);
1324 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1325 rm->element_type = htons (0);
1326 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1327 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1328 /* Will also call the union-specific cancel function. */
1329 _GSS_operation_destroy (op, GNUNET_YES);
1334 maybe_finish (struct Operation *op)
1336 unsigned int num_demanded;
1338 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1340 if (PHASE_FINISH_WAITING == op->state->phase)
1342 LOG (GNUNET_ERROR_TYPE_DEBUG,
1343 "In PHASE_FINISH_WAITING, pending %u demands\n",
1345 if (0 == num_demanded)
1347 struct GNUNET_MQ_Envelope *ev;
1349 op->state->phase = PHASE_DONE;
1350 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1351 GNUNET_MQ_send (op->mq, ev);
1353 /* We now wait until the other peer closes the channel
1354 * after it got all elements from us. */
1357 if (PHASE_FINISH_CLOSING == op->state->phase)
1359 LOG (GNUNET_ERROR_TYPE_DEBUG,
1360 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1362 if (0 == num_demanded)
1364 op->state->phase = PHASE_DONE;
1365 send_done_and_destroy (op);
1372 * Handle an element message from a remote peer.
1373 * Sent by the other peer either because we decoded an IBF and placed a demand,
1374 * or because the other peer switched to full set transmission.
1376 * @param cls the union operation
1377 * @param mh the message
1380 handle_p2p_elements (void *cls,
1381 const struct GNUNET_MessageHeader *mh)
1383 struct Operation *op = cls;
1384 struct ElementEntry *ee;
1385 const struct GNUNET_SET_ElementMessage *emsg;
1386 uint16_t element_size;
1388 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1390 GNUNET_break_op (0);
1391 fail_union_operation (op);
1394 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1396 GNUNET_break_op (0);
1397 fail_union_operation (op);
1401 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1403 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1404 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1405 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1406 ee->element.size = element_size;
1407 ee->element.data = &ee[1];
1408 ee->element.element_type = ntohs (emsg->element_type);
1409 ee->remote = GNUNET_YES;
1410 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1413 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1417 /* We got something we didn't demand, since it's not in our map. */
1418 GNUNET_break_op (0);
1420 fail_union_operation (op);
1424 LOG (GNUNET_ERROR_TYPE_DEBUG,
1425 "Got element (size %u, hash %s) from peer\n",
1426 (unsigned int) element_size,
1427 GNUNET_h2s (&ee->element_hash));
1429 GNUNET_STATISTICS_update (_GSS_statistics,
1430 "# received elements",
1433 GNUNET_STATISTICS_update (_GSS_statistics,
1434 "# exchanged elements",
1438 op->state->received_total += 1;
1440 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1444 /* Got repeated element. Should not happen since
1445 * we track demands. */
1446 GNUNET_STATISTICS_update (_GSS_statistics,
1447 "# repeated elements",
1450 ke->received = GNUNET_YES;
1455 LOG (GNUNET_ERROR_TYPE_DEBUG,
1456 "Registering new element from remote peer\n");
1457 op->state->received_fresh += 1;
1458 op_register_element (op, ee, GNUNET_YES);
1459 /* only send results immediately if the client wants it */
1460 switch (op->spec->result_mode)
1462 case GNUNET_SET_RESULT_ADDED:
1463 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1465 case GNUNET_SET_RESULT_SYMMETRIC:
1466 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1469 /* Result mode not supported, should have been caught earlier. */
1475 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1477 /* The other peer gave us lots of old elements, there's something wrong. */
1478 GNUNET_break_op (0);
1479 fail_union_operation (op);
1488 * Handle an element message from a remote peer.
1490 * @param cls the union operation
1491 * @param mh the message
1494 handle_p2p_full_element (void *cls,
1495 const struct GNUNET_MessageHeader *mh)
1497 struct Operation *op = cls;
1498 struct ElementEntry *ee;
1499 const struct GNUNET_SET_ElementMessage *emsg;
1500 uint16_t element_size;
1502 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1504 GNUNET_break_op (0);
1505 fail_union_operation (op);
1509 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1511 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1512 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1513 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1514 ee->element.size = element_size;
1515 ee->element.data = &ee[1];
1516 ee->element.element_type = ntohs (emsg->element_type);
1517 ee->remote = GNUNET_YES;
1518 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1520 LOG (GNUNET_ERROR_TYPE_DEBUG,
1521 "Got element (full diff, size %u, hash %s) from peer\n",
1522 (unsigned int) element_size,
1523 GNUNET_h2s (&ee->element_hash));
1525 GNUNET_STATISTICS_update (_GSS_statistics,
1526 "# received elements",
1529 GNUNET_STATISTICS_update (_GSS_statistics,
1530 "# exchanged elements",
1534 op->state->received_total += 1;
1536 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1540 /* Got repeated element. Should not happen since
1541 * we track demands. */
1542 GNUNET_STATISTICS_update (_GSS_statistics,
1543 "# repeated elements",
1546 ke->received = GNUNET_YES;
1551 LOG (GNUNET_ERROR_TYPE_DEBUG,
1552 "Registering new element from remote peer\n");
1553 op->state->received_fresh += 1;
1554 op_register_element (op, ee, GNUNET_YES);
1555 /* only send results immediately if the client wants it */
1556 switch (op->spec->result_mode)
1558 case GNUNET_SET_RESULT_ADDED:
1559 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1561 case GNUNET_SET_RESULT_SYMMETRIC:
1562 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1565 /* Result mode not supported, should have been caught earlier. */
1571 if ( (GNUNET_YES == op->spec->byzantine) &&
1572 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1573 (op->state->received_fresh < op->state->received_total / 6) )
1575 /* The other peer gave us lots of old elements, there's something wrong. */
1576 LOG (GNUNET_ERROR_TYPE_ERROR,
1577 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1578 (unsigned long long) op->state->received_fresh,
1579 (unsigned long long) op->state->received_total);
1580 GNUNET_break_op (0);
1581 fail_union_operation (op);
1587 * Send offers (for GNUNET_Hash-es) in response
1588 * to inquiries (for IBF_Key-s).
1590 * @param cls the union operation
1591 * @param mh the message
1594 handle_p2p_inquiry (void *cls,
1595 const struct GNUNET_MessageHeader *mh)
1597 struct Operation *op = cls;
1598 const struct IBF_Key *ibf_key;
1599 unsigned int num_keys;
1600 struct InquiryMessage *msg;
1602 /* look up elements and send them */
1603 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1605 GNUNET_break_op (0);
1606 fail_union_operation (op);
1609 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1610 / sizeof (struct IBF_Key);
1611 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1612 != num_keys * sizeof (struct IBF_Key))
1614 GNUNET_break_op (0);
1615 fail_union_operation (op);
1619 msg = (struct InquiryMessage *) mh;
1621 ibf_key = (const struct IBF_Key *) &msg[1];
1622 while (0 != num_keys--)
1624 struct IBF_Key unsalted_key;
1625 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1626 send_offers_for_key (op, unsalted_key);
1633 * Iterator over hash map entries, called to
1634 * destroy the linked list of colliding ibf key entries.
1636 * @param cls closure
1637 * @param key current key code
1638 * @param value value in the hash map
1639 * @return #GNUNET_YES if we should continue to iterate,
1640 * #GNUNET_NO if not.
1643 send_missing_elements_iter (void *cls,
1647 struct Operation *op = cls;
1648 struct KeyEntry *ke = value;
1649 struct GNUNET_MQ_Envelope *ev;
1650 struct GNUNET_SET_ElementMessage *emsg;
1651 struct ElementEntry *ee = ke->element;
1653 if (GNUNET_YES == ke->received)
1656 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1657 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1658 emsg->reserved = htons (0);
1659 emsg->element_type = htons (ee->element.element_type);
1660 GNUNET_MQ_send (op->mq, ev);
1669 * @parem cls closure, a set union operation
1670 * @param mh the demand message
1673 handle_p2p_request_full (void *cls,
1674 const struct GNUNET_MessageHeader *mh)
1676 struct Operation *op = cls;
1678 if (PHASE_EXPECT_IBF != op->state->phase)
1680 fail_union_operation (op);
1681 GNUNET_break_op (0);
1685 // FIXME: we need to check that our set is larger than the
1686 // byzantine_lower_bound by some threshold
1692 * Handle a "full done" message.
1694 * @parem cls closure, a set union operation
1695 * @param mh the demand message
1698 handle_p2p_full_done (void *cls,
1699 const struct GNUNET_MessageHeader *mh)
1701 struct Operation *op = cls;
1703 if (PHASE_EXPECT_IBF == op->state->phase)
1705 struct GNUNET_MQ_Envelope *ev;
1707 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1709 /* send all the elements that did not come from the remote peer */
1710 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1711 &send_missing_elements_iter,
1714 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1715 GNUNET_MQ_send (op->mq, ev);
1716 op->state->phase = PHASE_DONE;
1718 /* we now wait until the other peer shuts the tunnel down*/
1720 else if (PHASE_FULL_SENDING == op->state->phase)
1722 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1723 /* We sent the full set, and got the response for that. We're done. */
1724 op->state->phase = PHASE_DONE;
1725 send_done_and_destroy (op);
1729 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1730 GNUNET_break_op (0);
1731 fail_union_operation (op);
1738 * Handle a demand by the other peer for elements based on a list
1739 * of GNUNET_HashCode-s.
1741 * @parem cls closure, a set union operation
1742 * @param mh the demand message
1745 handle_p2p_demand (void *cls,
1746 const struct GNUNET_MessageHeader *mh)
1748 struct Operation *op = cls;
1749 struct ElementEntry *ee;
1750 struct GNUNET_SET_ElementMessage *emsg;
1751 const struct GNUNET_HashCode *hash;
1752 unsigned int num_hashes;
1753 struct GNUNET_MQ_Envelope *ev;
1755 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1756 / sizeof (struct GNUNET_HashCode);
1757 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1758 != num_hashes * sizeof (struct GNUNET_HashCode))
1760 GNUNET_break_op (0);
1761 fail_union_operation (op);
1765 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1767 hash++, num_hashes--)
1769 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1772 /* Demand for non-existing element. */
1773 GNUNET_break_op (0);
1774 fail_union_operation (op);
1777 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1779 /* Probably confused lazily copied sets. */
1780 GNUNET_break_op (0);
1781 fail_union_operation (op);
1784 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1785 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1786 emsg->reserved = htons (0);
1787 emsg->element_type = htons (ee->element.element_type);
1788 LOG (GNUNET_ERROR_TYPE_DEBUG,
1789 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1791 (unsigned int) ee->element.size,
1792 GNUNET_h2s (&ee->element_hash));
1793 GNUNET_MQ_send (op->mq, ev);
1794 GNUNET_STATISTICS_update (_GSS_statistics,
1795 "# exchanged elements",
1799 switch (op->spec->result_mode)
1801 case GNUNET_SET_RESULT_ADDED:
1802 /* Nothing to do. */
1804 case GNUNET_SET_RESULT_SYMMETRIC:
1805 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1808 /* Result mode not supported, should have been caught earlier. */
1817 * Handle offers (of GNUNET_HashCode-s) and
1818 * respond with demands (of GNUNET_HashCode-s).
1820 * @param cls the union operation
1821 * @param mh the message
1824 handle_p2p_offer (void *cls,
1825 const struct GNUNET_MessageHeader *mh)
1827 struct Operation *op = cls;
1828 const struct GNUNET_HashCode *hash;
1829 unsigned int num_hashes;
1831 /* look up elements and send them */
1832 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1833 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1835 GNUNET_break_op (0);
1836 fail_union_operation (op);
1839 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1840 / sizeof (struct GNUNET_HashCode);
1841 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1842 != num_hashes * sizeof (struct GNUNET_HashCode))
1844 GNUNET_break_op (0);
1845 fail_union_operation (op);
1849 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1851 hash++, num_hashes--)
1853 struct ElementEntry *ee;
1854 struct GNUNET_MessageHeader *demands;
1855 struct GNUNET_MQ_Envelope *ev;
1857 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1860 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1864 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1867 LOG (GNUNET_ERROR_TYPE_DEBUG,
1868 "Skipped sending duplicate demand\n");
1872 GNUNET_assert (GNUNET_OK ==
1873 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1876 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1878 LOG (GNUNET_ERROR_TYPE_DEBUG,
1879 "[OP %x] Requesting element (hash %s)\n",
1880 (void *) op, GNUNET_h2s (hash));
1881 ev = GNUNET_MQ_msg_header_extra (demands,
1882 sizeof (struct GNUNET_HashCode),
1883 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1884 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1885 GNUNET_MQ_send (op->mq, ev);
1891 * Handle a done message from a remote peer
1893 * @param cls the union operation
1894 * @param mh the message
1897 handle_p2p_done (void *cls,
1898 const struct GNUNET_MessageHeader *mh)
1900 struct Operation *op = cls;
1902 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1904 /* We got all requests, but still have to send our elements in response. */
1906 op->state->phase = PHASE_FINISH_WAITING;
1908 LOG (GNUNET_ERROR_TYPE_DEBUG,
1909 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1910 /* The active peer is done sending offers
1911 * and inquiries. This means that all
1912 * our responses to that (demands and offers)
1913 * must be in flight (queued or in mesh).
1915 * We should notify the active peer once
1916 * all our demands are satisfied, so that the active
1917 * peer can quit if we gave him everything.
1922 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1924 LOG (GNUNET_ERROR_TYPE_DEBUG,
1925 "got DONE (as active partner), waiting to finish\n");
1926 /* All demands of the other peer are satisfied,
1927 * and we processed all offers, thus we know
1928 * exactly what our demands must be.
1930 * We'll close the channel
1931 * to the other peer once our demands are met.
1933 op->state->phase = PHASE_FINISH_CLOSING;
1937 GNUNET_break_op (0);
1938 fail_union_operation (op);
1943 * Initiate operation to evaluate a set union with a remote peer.
1945 * @param op operation to perform (to be initialized)
1946 * @param opaque_context message to be transmitted to the listener
1947 * to convince him to accept, may be NULL
1950 union_evaluate (struct Operation *op,
1951 const struct GNUNET_MessageHeader *opaque_context)
1953 struct GNUNET_MQ_Envelope *ev;
1954 struct OperationRequestMessage *msg;
1956 GNUNET_assert (NULL == op->state);
1957 op->state = GNUNET_new (struct OperationState);
1958 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1959 /* copy the current generation's strata estimator for this operation */
1960 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1961 /* we started the operation, thus we have to send the operation request */
1962 op->state->phase = PHASE_EXPECT_SE;
1963 op->state->salt_receive = op->state->salt_send = 42;
1964 LOG (GNUNET_ERROR_TYPE_DEBUG,
1965 "Initiating union operation evaluation\n");
1966 GNUNET_STATISTICS_update (_GSS_statistics,
1967 "# of total union operations",
1970 GNUNET_STATISTICS_update (_GSS_statistics,
1971 "# of initiated union operations",
1974 ev = GNUNET_MQ_msg_nested_mh (msg,
1975 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1979 /* the context message is too large */
1981 GNUNET_SERVICE_client_drop (op->spec->set->client);
1984 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1985 GNUNET_MQ_send (op->mq,
1988 if (NULL != opaque_context)
1989 LOG (GNUNET_ERROR_TYPE_DEBUG,
1990 "sent op request with context message\n");
1992 LOG (GNUNET_ERROR_TYPE_DEBUG,
1993 "sent op request without context message\n");
1995 initialize_key_to_element (op);
1996 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2001 * Accept an union operation request from a remote peer.
2002 * Only initializes the private operation state.
2004 * @param op operation that will be accepted as a union operation
2007 union_accept (struct Operation *op)
2009 LOG (GNUNET_ERROR_TYPE_DEBUG,
2010 "accepting set union operation\n");
2011 GNUNET_assert (NULL == op->state);
2013 GNUNET_STATISTICS_update (_GSS_statistics,
2014 "# of accepted union operations",
2017 GNUNET_STATISTICS_update (_GSS_statistics,
2018 "# of total union operations",
2022 op->state = GNUNET_new (struct OperationState);
2023 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2024 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2025 op->state->salt_receive = op->state->salt_send = 42;
2026 initialize_key_to_element (op);
2027 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2028 /* kick off the operation */
2029 send_strata_estimator (op);
2034 * Create a new set supporting the union operation
2036 * We maintain one strata estimator per set and then manipulate it over the
2037 * lifetime of the set, as recreating a strata estimator would be expensive.
2039 * @return the newly created set, NULL on error
2041 static struct SetState *
2042 union_set_create (void)
2044 struct SetState *set_state;
2046 LOG (GNUNET_ERROR_TYPE_DEBUG,
2047 "union set created\n");
2048 set_state = GNUNET_new (struct SetState);
2049 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2050 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2051 if (NULL == set_state->se)
2053 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2054 "Failed to allocate strata estimator\n");
2055 GNUNET_free (set_state);
2063 * Add the element from the given element message to the set.
2065 * @param set_state state of the set want to add to
2066 * @param ee the element to add to the set
2069 union_add (struct SetState *set_state, struct ElementEntry *ee)
2071 strata_estimator_insert (set_state->se,
2072 get_ibf_key (&ee->element_hash));
2077 * Remove the element given in the element message from the set.
2078 * Only marks the element as removed, so that older set operations can still exchange it.
2080 * @param set_state state of the set to remove from
2081 * @param ee set element to remove
2084 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2086 strata_estimator_remove (set_state->se,
2087 get_ibf_key (&ee->element_hash));
2092 * Destroy a set that supports the union operation.
2094 * @param set_state the set to destroy
2097 union_set_destroy (struct SetState *set_state)
2099 if (NULL != set_state->se)
2101 strata_estimator_destroy (set_state->se);
2102 set_state->se = NULL;
2104 GNUNET_free (set_state);
2109 * Dispatch messages for a union operation.
2111 * @param op the state of the union evaluate operation
2112 * @param mh the received message
2113 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2114 * #GNUNET_OK otherwise
2117 union_handle_p2p_message (struct Operation *op,
2118 const struct GNUNET_MessageHeader *mh)
2120 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2121 // "received p2p message (t: %u, s: %u)\n",
2122 // ntohs (mh->type),
2123 // ntohs (mh->size));
2124 switch (ntohs (mh->type))
2126 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2127 return handle_p2p_ibf (op, mh);
2128 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2129 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2130 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2131 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2132 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2133 handle_p2p_elements (op, mh);
2135 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2136 handle_p2p_full_element (op, mh);
2138 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2139 handle_p2p_inquiry (op, mh);
2141 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2142 handle_p2p_done (op, mh);
2144 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2145 handle_p2p_offer (op, mh);
2147 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2148 handle_p2p_demand (op, mh);
2150 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2151 handle_p2p_full_done (op, mh);
2153 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2154 handle_p2p_request_full (op, mh);
2157 /* Something wrong with cadet's message handlers? */
2165 * Handler for peer-disconnects, notifies the client
2166 * about the aborted operation in case the op was not concluded.
2168 * @param op the destroyed operation
2171 union_peer_disconnect (struct Operation *op)
2173 if (PHASE_DONE != op->state->phase)
2175 struct GNUNET_MQ_Envelope *ev;
2176 struct GNUNET_SET_ResultMessage *msg;
2178 ev = GNUNET_MQ_msg (msg,
2179 GNUNET_MESSAGE_TYPE_SET_RESULT);
2180 msg->request_id = htonl (op->spec->client_request_id);
2181 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2182 msg->element_type = htons (0);
2183 GNUNET_MQ_send (op->spec->set->client_mq,
2185 LOG (GNUNET_ERROR_TYPE_WARNING,
2186 "other peer disconnected prematurely, phase %u\n",
2188 _GSS_operation_destroy (op,
2192 // else: the session has already been concluded
2193 LOG (GNUNET_ERROR_TYPE_DEBUG,
2194 "other peer disconnected (finished)\n");
2195 if (GNUNET_NO == op->state->client_done_sent)
2196 send_done_and_destroy (op);
2201 * Copy union-specific set state.
2203 * @param set source set for copying the union state
2204 * @return a copy of the union-specific set state
2206 static struct SetState *
2207 union_copy_state (struct Set *set)
2209 struct SetState *new_state;
2211 new_state = GNUNET_new (struct SetState);
2212 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2213 new_state->se = strata_estimator_dup (set->state->se);
2220 * Get the table with implementing functions for
2223 * @return the operation specific VTable
2225 const struct SetVT *
2228 static const struct SetVT union_vt = {
2229 .create = &union_set_create,
2230 .msg_handler = &union_handle_p2p_message,
2232 .remove = &union_remove,
2233 .destroy_set = &union_set_destroy,
2234 .evaluate = &union_evaluate,
2235 .accept = &union_accept,
2236 .peer_disconnect = &union_peer_disconnect,
2237 .cancel = &union_op_cancel,
2238 .copy_state = &union_copy_state,