2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
95 * Continuation for multi part IBFs.
97 PHASE_EXPECT_IBF_CONT,
100 * We are decoding an IBF.
102 PHASE_INVENTORY_ACTIVE,
105 * The other peer is decoding the IBF we just sent.
107 PHASE_INVENTORY_PASSIVE,
110 * The protocol is almost finished, but we still have to flush our message
111 * queue and/or expect some elements.
113 PHASE_FINISH_CLOSING,
116 * In the penultimate phase,
117 * we wait until all our demands
118 * are satisfied. Then we send a done
119 * message, and wait for another done message.
121 PHASE_FINISH_WAITING,
124 * In the ultimate phase, we wait until
125 * our demands are satisfied and then
126 * quit (sending another DONE message).
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
139 * State of an evaluate operation with another peer.
141 struct OperationState
144 * Copy of the set's strata estimator at the time of
145 * creation of this operation.
147 struct StrataEstimator *se;
150 * The IBF we currently receive.
152 struct InvertibleBloomFilter *remote_ibf;
155 * The IBF with the local set's element.
157 struct InvertibleBloomFilter *local_ibf;
160 * Maps unsalted IBF-Keys to elements.
161 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
162 * Colliding IBF-Keys are linked.
164 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
167 * Current state of the operation.
169 enum UnionOperationPhase phase;
172 * Did we send the client that we are done?
174 int client_done_sent;
177 * Number of ibf buckets already received into the @a remote_ibf.
179 unsigned int ibf_buckets_received;
182 * Hashes for elements that we have demanded from the other peer.
184 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
187 * Salt that we're using for sending IBFs
192 * Salt for the IBF we've received and that we're currently decoding.
194 uint32_t salt_receive;
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
200 uint32_t received_fresh;
203 * Total number of elements received from the other peer.
205 uint32_t received_total;
208 * Initial size of our set, just before
209 * the operation started.
211 uint64_t initial_size;
216 * The key entry is used to associate an ibf key with an element.
221 * IBF key for the entry, derived from the current salt.
223 struct IBF_Key ibf_key;
226 * The actual element associated with the key.
228 * Only owned by the union operation if element->operation
231 struct ElementEntry *element;
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
244 * Used as a closure for sending elements
245 * with a specific IBF key.
247 struct SendElementClosure
250 * The IBF key whose matching elements should be
253 struct IBF_Key ibf_key;
256 * Operation for which the elements
259 struct Operation *op;
264 * Extra state required for efficient set union.
269 * The strata estimator is only generated once for
271 * The IBF keys are derived from the element hashes with
274 struct StrataEstimator *se;
279 * Iterator over hash map entries, called to
280 * destroy the linked list of colliding ibf key entries.
283 * @param key current key code
284 * @param value value in the hash map
285 * @return #GNUNET_YES if we should continue to iterate,
289 destroy_key_to_element_iter (void *cls,
293 struct KeyEntry *k = value;
295 GNUNET_assert (NULL != k);
296 if (GNUNET_YES == k->element->remote)
298 GNUNET_free (k->element);
307 * Destroy the union operation. Only things specific to the union
308 * operation are destroyed.
310 * @param op union operation to destroy
313 union_op_cancel (struct Operation *op)
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op\n");
317 /* check if the op was canceled twice */
318 GNUNET_assert (NULL != op->state);
319 if (NULL != op->state->remote_ibf)
321 ibf_destroy (op->state->remote_ibf);
322 op->state->remote_ibf = NULL;
324 if (NULL != op->state->demanded_hashes)
326 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327 op->state->demanded_hashes = NULL;
329 if (NULL != op->state->local_ibf)
331 ibf_destroy (op->state->local_ibf);
332 op->state->local_ibf = NULL;
334 if (NULL != op->state->se)
336 strata_estimator_destroy (op->state->se);
337 op->state->se = NULL;
339 if (NULL != op->state->key_to_element)
341 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342 &destroy_key_to_element_iter,
344 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345 op->state->key_to_element = NULL;
347 GNUNET_free (op->state);
349 LOG (GNUNET_ERROR_TYPE_DEBUG,
350 "destroying union op done\n");
355 * Inform the client that the union operation has failed,
356 * and proceed to destroy the evaluate operation.
358 * @param op the union operation to fail
361 fail_union_operation (struct Operation *op)
363 struct GNUNET_MQ_Envelope *ev;
364 struct GNUNET_SET_ResultMessage *msg;
366 LOG (GNUNET_ERROR_TYPE_ERROR,
367 "union operation failed\n");
368 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370 msg->request_id = htonl (op->spec->client_request_id);
371 msg->element_type = htons (0);
372 GNUNET_MQ_send (op->spec->set->client_mq, ev);
373 _GSS_operation_destroy (op, GNUNET_YES);
378 * Derive the IBF key from a hash code and
381 * @param src the hash code
382 * @return the derived IBF key
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
390 GNUNET_CRYPTO_kdf (&key, sizeof (key),
392 &salt, sizeof (salt),
399 * Context for #op_get_element_iterator
401 struct GetElementContext
403 struct GNUNET_HashCode hash;
409 * Iterator over the mapping from IBF keys to element entries. Checks if we
410 * have an element with a given GNUNET_HashCode.
413 * @param key current key code
414 * @param value value in the hash map
415 * @return #GNUNET_YES if we should search further,
416 * #GNUNET_NO if we've found the element.
419 op_get_element_iterator (void *cls,
423 struct GetElementContext *ctx = cls;
424 struct KeyEntry *k = value;
426 GNUNET_assert (NULL != k);
427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438 * Determine whether the given element is already in the operation's element
441 * @param op operation that should be tested for 'element_hash'
442 * @param element_hash hash of the element to look for
443 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447 const struct GNUNET_HashCode *element_hash)
450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = {{{ 0 }} , 0};
453 ctx.hash = *element_hash;
455 ibf_key = get_ibf_key (element_hash);
456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457 (uint32_t) ibf_key.key_val,
458 op_get_element_iterator,
461 /* was the iteration aborted because we found the element? */
462 if (GNUNET_SYSERR == ret)
464 GNUNET_assert (NULL != ctx.k);
472 * Insert an element into the union operation's
473 * key-to-element mapping. Takes ownership of 'ee'.
474 * Note that this does not insert the element in the set,
475 * only in the operation's key-element mapping.
476 * This is done to speed up re-tried operations, if some elements
477 * were transmitted, and then the IBF fails to decode.
479 * XXX: clarify ownership, doesn't sound right.
481 * @param op the union operation
482 * @param ee the element entry
483 * @parem received was this element received from the remote peer?
486 op_register_element (struct Operation *op,
487 struct ElementEntry *ee,
490 struct IBF_Key ibf_key;
493 ibf_key = get_ibf_key (&ee->element_hash);
494 k = GNUNET_new (struct KeyEntry);
496 k->ibf_key = ibf_key;
497 k->received = received;
498 GNUNET_assert (GNUNET_OK ==
499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500 (uint32_t) ibf_key.key_val,
502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507 salt_key (const struct IBF_Key *k_in,
509 struct IBF_Key *k_out)
512 uint64_t x = k_in->key_val;
514 x = (x >> s) | (x << (64 - s));
520 unsalt_key (const struct IBF_Key *k_in,
522 struct IBF_Key *k_out)
525 uint64_t x = k_in->key_val;
526 x = (x << s) | (x >> (64 - s));
532 * Insert a key into an ibf.
536 * @param value the key entry to get the key from
539 prepare_ibf_iterator (void *cls,
543 struct Operation *op = cls;
544 struct KeyEntry *ke = value;
545 struct IBF_Key salted_key;
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "[OP %x] inserting %lx (hash %s) into ibf\n",
550 (unsigned long) ke->ibf_key.key_val,
551 GNUNET_h2s (&ke->element->element_hash));
552 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553 ibf_insert (op->state->local_ibf, salted_key);
559 * Iterator for initializing the
560 * key-to-element mapping of a union operation
562 * @param cls the union operation `struct Operation *`
564 * @param value the `struct ElementEntry *` to insert
565 * into the key-to-element mapping
566 * @return #GNUNET_YES (to continue iterating)
569 init_key_to_element_iterator (void *cls,
570 const struct GNUNET_HashCode *key,
573 struct Operation *op = cls;
574 struct ElementEntry *ee = value;
576 /* make sure that the element belongs to the set at the time
577 * of creating the operation */
578 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
581 GNUNET_assert (GNUNET_NO == ee->remote);
583 op_register_element (op, ee, GNUNET_NO);
589 * Initialize the IBF key to element mapping local to this set
592 * @param op the set union operation
595 initialize_key_to_element (struct Operation *op)
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
607 * Create an ibf with the operation's elements
608 * of the specified size
610 * @param op the union operation
611 * @param size size of the ibf to create
612 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
615 prepare_ibf (struct Operation *op,
618 GNUNET_assert (NULL != op->state->key_to_element);
620 if (NULL != op->state->local_ibf)
621 ibf_destroy (op->state->local_ibf);
622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623 if (NULL == op->state->local_ibf)
625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626 "Failed to allocate local IBF\n");
627 return GNUNET_SYSERR;
629 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630 &prepare_ibf_iterator,
637 * Send an ibf of appropriate size.
639 * Fragments the IBF into multiple messages if necessary.
641 * @param op the union operation
642 * @param ibf_order order of the ibf to send, size=2^order
643 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
646 send_ibf (struct Operation *op,
649 unsigned int buckets_sent = 0;
650 struct InvertibleBloomFilter *ibf;
653 prepare_ibf (op, 1<<ibf_order))
655 /* allocation failed */
656 return GNUNET_SYSERR;
659 LOG (GNUNET_ERROR_TYPE_DEBUG,
660 "sending ibf of size %u\n",
664 char name[64] = { 0 };
665 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
669 ibf = op->state->local_ibf;
671 while (buckets_sent < (1 << ibf_order))
673 unsigned int buckets_in_message;
674 struct GNUNET_MQ_Envelope *ev;
675 struct IBFMessage *msg;
677 buckets_in_message = (1 << ibf_order) - buckets_sent;
678 /* limit to maximum */
679 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
682 ev = GNUNET_MQ_msg_extra (msg,
683 buckets_in_message * IBF_BUCKET_SIZE,
684 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
687 msg->order = ibf_order;
688 msg->offset = htonl (buckets_sent);
689 msg->salt = htonl (op->state->salt_send);
690 ibf_write_slice (ibf, buckets_sent,
691 buckets_in_message, &msg[1]);
692 buckets_sent += buckets_in_message;
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "ibf chunk size %u, %u/%u sent\n",
698 GNUNET_MQ_send (op->mq, ev);
701 /* The other peer must decode the IBF, so
703 op->state->phase = PHASE_INVENTORY_PASSIVE;
709 * Send a strata estimator to the remote peer.
711 * @param op the union operation with the remote peer
714 send_strata_estimator (struct Operation *op)
716 const struct StrataEstimator *se = op->state->se;
717 struct GNUNET_MQ_Envelope *ev;
718 struct StrataEstimatorMessage *strata_msg;
723 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724 len = strata_estimator_write (op->state->se,
726 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730 ev = GNUNET_MQ_msg_extra (strata_msg,
733 GNUNET_memcpy (&strata_msg[1],
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738 GNUNET_MQ_send (op->mq,
740 op->state->phase = PHASE_EXPECT_IBF;
741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "sent SE, expecting IBF\n");
747 * Compute the necessary order of an ibf
748 * from the size of the symmetric set difference.
750 * @param diff the difference
751 * @return the required size of the ibf
754 get_order_from_difference (unsigned int diff)
756 unsigned int ibf_order;
759 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
762 if (ibf_order > MAX_IBF_ORDER)
763 ibf_order = MAX_IBF_ORDER;
764 // add one for correction
765 return ibf_order + 1;
770 * Send a set element.
772 * @param cls the union operation `struct Operation *`
774 * @param value the `struct ElementEntry *` to insert
775 * into the key-to-element mapping
776 * @return #GNUNET_YES (to continue iterating)
779 send_element_iterator (void *cls,
780 const struct GNUNET_HashCode *key,
783 struct Operation *op = cls;
784 struct GNUNET_SET_ElementMessage *emsg;
785 struct ElementEntry *ee = value;
786 struct GNUNET_SET_Element *el = &ee->element;
787 struct GNUNET_MQ_Envelope *ev;
790 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791 emsg->element_type = htons (el->element_type);
792 GNUNET_memcpy (&emsg[1], el->data, el->size);
793 GNUNET_MQ_send (op->mq, ev);
799 send_full_set (struct Operation *op)
801 struct GNUNET_MQ_Envelope *ev;
803 op->state->phase = PHASE_FULL_SENDING;
805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806 &send_element_iterator, op);
807 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808 GNUNET_MQ_send (op->mq, ev);
813 * Handle a strata estimator from a remote peer
815 * @param cls the union operation
816 * @param mh the message
817 * @param is_compressed #GNUNET_YES if the estimator is compressed
818 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819 * #GNUNET_OK otherwise
822 handle_p2p_strata_estimator (void *cls,
823 const struct GNUNET_MessageHeader *mh,
826 struct Operation *op = cls;
827 struct StrataEstimator *remote_se;
828 struct StrataEstimatorMessage *msg = (void *) mh;
833 GNUNET_STATISTICS_update (_GSS_statistics,
834 "# bytes of SE received",
838 if (op->state->phase != PHASE_EXPECT_SE)
841 fail_union_operation (op);
842 return GNUNET_SYSERR;
844 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
845 if ( (GNUNET_NO == is_compressed) &&
846 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
848 fail_union_operation (op);
850 return GNUNET_SYSERR;
852 other_size = GNUNET_ntohll (msg->set_size);
853 remote_se = strata_estimator_create (SE_STRATA_COUNT,
856 if (NULL == remote_se)
858 /* insufficient resources, fail */
859 fail_union_operation (op);
860 return GNUNET_SYSERR;
863 strata_estimator_read (&msg[1],
868 /* decompression failed */
869 fail_union_operation (op);
870 strata_estimator_destroy (remote_se);
871 return GNUNET_SYSERR;
873 GNUNET_assert (NULL != op->state->se);
874 diff = strata_estimator_difference (remote_se,
876 strata_estimator_destroy (remote_se);
877 strata_estimator_destroy (op->state->se);
878 op->state->se = NULL;
879 LOG (GNUNET_ERROR_TYPE_DEBUG,
880 "got se diff=%d, using ibf size %d\n",
882 1<<get_order_from_difference (diff));
884 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
887 fail_union_operation (op);
888 return GNUNET_SYSERR;
892 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
894 LOG (GNUNET_ERROR_TYPE_INFO,
895 "Sending full set (diff=%d, own set=%u)\n",
897 op->state->initial_size);
898 GNUNET_STATISTICS_update (_GSS_statistics,
902 if (op->state->initial_size <= other_size)
908 struct GNUNET_MQ_Envelope *ev;
909 op->state->phase = PHASE_EXPECT_IBF;
910 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
911 GNUNET_MQ_send (op->mq, ev);
916 GNUNET_STATISTICS_update (_GSS_statistics,
922 get_order_from_difference (diff)))
924 /* Internal error, best we can do is shut the connection */
925 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
926 "Failed to send IBF, closing connection\n");
927 fail_union_operation (op);
928 return GNUNET_SYSERR;
937 * Iterator to send elements to a remote peer
939 * @param cls closure with the element key and the union operation
941 * @param value the key entry
944 send_offers_iterator (void *cls,
948 struct SendElementClosure *sec = cls;
949 struct Operation *op = sec->op;
950 struct KeyEntry *ke = value;
951 struct GNUNET_MQ_Envelope *ev;
952 struct GNUNET_MessageHeader *mh;
954 /* Detect 32-bit key collision for the 64-bit IBF keys. */
955 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
958 ev = GNUNET_MQ_msg_header_extra (mh,
959 sizeof (struct GNUNET_HashCode),
960 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
962 GNUNET_assert (NULL != ev);
963 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
964 LOG (GNUNET_ERROR_TYPE_DEBUG,
965 "[OP %x] sending element offer (%s) to peer\n",
967 GNUNET_h2s (&ke->element->element_hash));
968 GNUNET_MQ_send (op->mq, ev);
974 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
976 * @param op union operation
977 * @param ibf_key IBF key of interest
980 send_offers_for_key (struct Operation *op,
981 struct IBF_Key ibf_key)
983 struct SendElementClosure send_cls;
985 send_cls.ibf_key = ibf_key;
987 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
988 (uint32_t) ibf_key.key_val,
989 &send_offers_iterator,
995 * Decode which elements are missing on each side, and
996 * send the appropriate offers and inquiries.
998 * @param op union operation
999 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1002 decode_and_send (struct Operation *op)
1005 struct IBF_Key last_key;
1007 unsigned int num_decoded;
1008 struct InvertibleBloomFilter *diff_ibf;
1010 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1013 prepare_ibf (op, op->state->remote_ibf->size))
1016 /* allocation failed */
1017 return GNUNET_SYSERR;
1019 diff_ibf = ibf_dup (op->state->local_ibf);
1020 ibf_subtract (diff_ibf, op->state->remote_ibf);
1022 ibf_destroy (op->state->remote_ibf);
1023 op->state->remote_ibf = NULL;
1025 LOG (GNUNET_ERROR_TYPE_DEBUG,
1026 "decoding IBF (size=%u)\n",
1030 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1035 int cycle_detected = GNUNET_NO;
1039 res = ibf_decode (diff_ibf, &side, &key);
1040 if (res == GNUNET_OK)
1042 LOG (GNUNET_ERROR_TYPE_DEBUG,
1043 "decoded ibf key %lx\n",
1044 (unsigned long) key.key_val);
1046 if ( (num_decoded > diff_ibf->size) ||
1047 ( (num_decoded > 1) &&
1048 (last_key.key_val == key.key_val) ) )
1050 LOG (GNUNET_ERROR_TYPE_DEBUG,
1051 "detected cyclic ibf (decoded %u/%u)\n",
1054 cycle_detected = GNUNET_YES;
1057 if ( (GNUNET_SYSERR == res) ||
1058 (GNUNET_YES == cycle_detected) )
1062 while (1<<next_order < diff_ibf->size)
1065 if (next_order <= MAX_IBF_ORDER)
1067 LOG (GNUNET_ERROR_TYPE_DEBUG,
1068 "decoding failed, sending larger ibf (size %u)\n",
1070 GNUNET_STATISTICS_update (_GSS_statistics,
1074 op->state->salt_send++;
1076 send_ibf (op, next_order))
1078 /* Internal error, best we can do is shut the connection */
1079 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1080 "Failed to send IBF, closing connection\n");
1081 fail_union_operation (op);
1082 ibf_destroy (diff_ibf);
1083 return GNUNET_SYSERR;
1088 GNUNET_STATISTICS_update (_GSS_statistics,
1089 "# of failed union operations (too large)",
1092 // XXX: Send the whole set, element-by-element
1093 LOG (GNUNET_ERROR_TYPE_ERROR,
1094 "set union failed: reached ibf limit\n");
1095 fail_union_operation (op);
1096 ibf_destroy (diff_ibf);
1097 return GNUNET_SYSERR;
1101 if (GNUNET_NO == res)
1103 struct GNUNET_MQ_Envelope *ev;
1105 LOG (GNUNET_ERROR_TYPE_DEBUG,
1106 "transmitted all values, sending DONE\n");
1107 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1108 GNUNET_MQ_send (op->mq, ev);
1109 /* We now wait until we get a DONE message back
1110 * and then wait for our MQ to be flushed and all our
1111 * demands be delivered. */
1116 struct IBF_Key unsalted_key;
1117 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1118 send_offers_for_key (op, unsalted_key);
1120 else if (-1 == side)
1122 struct GNUNET_MQ_Envelope *ev;
1123 struct InquiryMessage *msg;
1125 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1126 * the effort additional complexity. */
1127 ev = GNUNET_MQ_msg_extra (msg,
1128 sizeof (struct IBF_Key),
1129 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1130 msg->salt = htonl (op->state->salt_receive);
1131 GNUNET_memcpy (&msg[1],
1133 sizeof (struct IBF_Key));
1134 LOG (GNUNET_ERROR_TYPE_DEBUG,
1135 "sending element inquiry for IBF key %lx\n",
1136 (unsigned long) key.key_val);
1137 GNUNET_MQ_send (op->mq, ev);
1144 ibf_destroy (diff_ibf);
1150 * Handle an IBF message from a remote peer.
1152 * Reassemble the IBF from multiple pieces, and
1153 * process the whole IBF once possible.
1155 * @param cls the union operation
1156 * @param mh the header of the message
1157 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1158 * #GNUNET_OK otherwise
1161 handle_p2p_ibf (void *cls,
1162 const struct GNUNET_MessageHeader *mh)
1164 struct Operation *op = cls;
1165 const struct IBFMessage *msg;
1166 unsigned int buckets_in_message;
1168 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1170 GNUNET_break_op (0);
1171 fail_union_operation (op);
1172 return GNUNET_SYSERR;
1174 msg = (const struct IBFMessage *) mh;
1175 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1176 (op->state->phase == PHASE_EXPECT_IBF) )
1178 op->state->phase = PHASE_EXPECT_IBF_CONT;
1179 GNUNET_assert (NULL == op->state->remote_ibf);
1180 LOG (GNUNET_ERROR_TYPE_DEBUG,
1181 "Creating new ibf of size %u\n",
1183 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1184 op->state->salt_receive = ntohl (msg->salt);
1185 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1186 if (NULL == op->state->remote_ibf)
1188 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1189 "Failed to parse remote IBF, closing connection\n");
1190 fail_union_operation (op);
1191 return GNUNET_SYSERR;
1193 op->state->ibf_buckets_received = 0;
1194 if (0 != ntohl (msg->offset))
1196 GNUNET_break_op (0);
1197 fail_union_operation (op);
1198 return GNUNET_SYSERR;
1201 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1203 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1205 GNUNET_break_op (0);
1206 fail_union_operation (op);
1207 return GNUNET_SYSERR;
1209 if (1<<msg->order != op->state->remote_ibf->size)
1211 GNUNET_break_op (0);
1212 fail_union_operation (op);
1213 return GNUNET_SYSERR;
1215 if (ntohl (msg->salt) != op->state->salt_receive)
1217 GNUNET_break_op (0);
1218 fail_union_operation (op);
1219 return GNUNET_SYSERR;
1227 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1229 if (0 == buckets_in_message)
1231 GNUNET_break_op (0);
1232 fail_union_operation (op);
1233 return GNUNET_SYSERR;
1236 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1238 GNUNET_break_op (0);
1239 fail_union_operation (op);
1240 return GNUNET_SYSERR;
1243 GNUNET_assert (NULL != op->state->remote_ibf);
1245 ibf_read_slice (&msg[1],
1246 op->state->ibf_buckets_received,
1248 op->state->remote_ibf);
1249 op->state->ibf_buckets_received += buckets_in_message;
1251 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1253 LOG (GNUNET_ERROR_TYPE_DEBUG,
1254 "received full ibf\n");
1255 op->state->phase = PHASE_INVENTORY_ACTIVE;
1257 decode_and_send (op))
1259 /* Internal error, best we can do is shut down */
1260 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1261 "Failed to decode IBF, closing connection\n");
1262 return GNUNET_SYSERR;
1270 * Send a result message to the client indicating
1271 * that there is a new element.
1273 * @param op union operation
1274 * @param element element to send
1275 * @param status status to send with the new element
1278 send_client_element (struct Operation *op,
1279 struct GNUNET_SET_Element *element,
1282 struct GNUNET_MQ_Envelope *ev;
1283 struct GNUNET_SET_ResultMessage *rm;
1285 LOG (GNUNET_ERROR_TYPE_DEBUG,
1286 "sending element (size %u) to client\n",
1288 GNUNET_assert (0 != op->spec->client_request_id);
1289 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1292 GNUNET_MQ_discard (ev);
1296 rm->result_status = htons (status);
1297 rm->request_id = htonl (op->spec->client_request_id);
1298 rm->element_type = htons (element->element_type);
1299 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1300 GNUNET_memcpy (&rm[1], element->data, element->size);
1301 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1306 * Signal to the client that the operation has finished and
1307 * destroy the operation.
1309 * @param cls operation to destroy
1312 send_done_and_destroy (void *cls)
1314 struct Operation *op = cls;
1315 struct GNUNET_MQ_Envelope *ev;
1316 struct GNUNET_SET_ResultMessage *rm;
1318 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1319 rm->request_id = htonl (op->spec->client_request_id);
1320 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1321 rm->element_type = htons (0);
1322 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1323 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1324 /* Will also call the union-specific cancel function. */
1325 _GSS_operation_destroy (op, GNUNET_YES);
1330 maybe_finish (struct Operation *op)
1332 unsigned int num_demanded;
1334 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1336 if (PHASE_FINISH_WAITING == op->state->phase)
1338 LOG (GNUNET_ERROR_TYPE_DEBUG,
1339 "In PHASE_FINISH_WAITING, pending %u demands\n",
1341 if (0 == num_demanded)
1343 struct GNUNET_MQ_Envelope *ev;
1345 op->state->phase = PHASE_DONE;
1346 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1347 GNUNET_MQ_send (op->mq, ev);
1349 /* We now wait until the other peer closes the channel
1350 * after it got all elements from us. */
1353 if (PHASE_FINISH_CLOSING == op->state->phase)
1355 LOG (GNUNET_ERROR_TYPE_DEBUG,
1356 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1358 if (0 == num_demanded)
1360 op->state->phase = PHASE_DONE;
1361 send_done_and_destroy (op);
1368 * Handle an element message from a remote peer.
1369 * Sent by the other peer either because we decoded an IBF and placed a demand,
1370 * or because the other peer switched to full set transmission.
1372 * @param cls the union operation
1373 * @param mh the message
1376 handle_p2p_elements (void *cls,
1377 const struct GNUNET_MessageHeader *mh)
1379 struct Operation *op = cls;
1380 struct ElementEntry *ee;
1381 const struct GNUNET_SET_ElementMessage *emsg;
1382 uint16_t element_size;
1384 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1386 GNUNET_break_op (0);
1387 fail_union_operation (op);
1390 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1392 GNUNET_break_op (0);
1393 fail_union_operation (op);
1397 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1399 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1400 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1401 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1402 ee->element.size = element_size;
1403 ee->element.data = &ee[1];
1404 ee->element.element_type = ntohs (emsg->element_type);
1405 ee->remote = GNUNET_YES;
1406 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1409 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1413 /* We got something we didn't demand, since it's not in our map. */
1414 GNUNET_break_op (0);
1416 fail_union_operation (op);
1420 LOG (GNUNET_ERROR_TYPE_DEBUG,
1421 "Got element (size %u, hash %s) from peer\n",
1422 (unsigned int) element_size,
1423 GNUNET_h2s (&ee->element_hash));
1425 GNUNET_STATISTICS_update (_GSS_statistics,
1426 "# received elements",
1429 GNUNET_STATISTICS_update (_GSS_statistics,
1430 "# exchanged elements",
1434 op->state->received_total += 1;
1436 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1440 /* Got repeated element. Should not happen since
1441 * we track demands. */
1442 GNUNET_STATISTICS_update (_GSS_statistics,
1443 "# repeated elements",
1446 ke->received = GNUNET_YES;
1451 LOG (GNUNET_ERROR_TYPE_DEBUG,
1452 "Registering new element from remote peer\n");
1453 op->state->received_fresh += 1;
1454 op_register_element (op, ee, GNUNET_YES);
1455 /* only send results immediately if the client wants it */
1456 switch (op->spec->result_mode)
1458 case GNUNET_SET_RESULT_ADDED:
1459 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1461 case GNUNET_SET_RESULT_SYMMETRIC:
1462 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1465 /* Result mode not supported, should have been caught earlier. */
1471 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1473 /* The other peer gave us lots of old elements, there's something wrong. */
1474 GNUNET_break_op (0);
1475 fail_union_operation (op);
1484 * Handle an element message from a remote peer.
1486 * @param cls the union operation
1487 * @param mh the message
1490 handle_p2p_full_element (void *cls,
1491 const struct GNUNET_MessageHeader *mh)
1493 struct Operation *op = cls;
1494 struct ElementEntry *ee;
1495 const struct GNUNET_SET_ElementMessage *emsg;
1496 uint16_t element_size;
1498 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1500 GNUNET_break_op (0);
1501 fail_union_operation (op);
1505 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1507 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1508 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1509 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1510 ee->element.size = element_size;
1511 ee->element.data = &ee[1];
1512 ee->element.element_type = ntohs (emsg->element_type);
1513 ee->remote = GNUNET_YES;
1514 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1516 LOG (GNUNET_ERROR_TYPE_DEBUG,
1517 "Got element (full diff, size %u, hash %s) from peer\n",
1518 (unsigned int) element_size,
1519 GNUNET_h2s (&ee->element_hash));
1521 GNUNET_STATISTICS_update (_GSS_statistics,
1522 "# received elements",
1525 GNUNET_STATISTICS_update (_GSS_statistics,
1526 "# exchanged elements",
1530 op->state->received_total += 1;
1532 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1536 /* Got repeated element. Should not happen since
1537 * we track demands. */
1538 GNUNET_STATISTICS_update (_GSS_statistics,
1539 "# repeated elements",
1542 ke->received = GNUNET_YES;
1547 LOG (GNUNET_ERROR_TYPE_DEBUG,
1548 "Registering new element from remote peer\n");
1549 op->state->received_fresh += 1;
1550 op_register_element (op, ee, GNUNET_YES);
1551 /* only send results immediately if the client wants it */
1552 switch (op->spec->result_mode)
1554 case GNUNET_SET_RESULT_ADDED:
1555 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1557 case GNUNET_SET_RESULT_SYMMETRIC:
1558 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1561 /* Result mode not supported, should have been caught earlier. */
1567 if ( (GNUNET_YES == op->spec->byzantine) &&
1568 (op->state->received_total > 128) &&
1569 (op->state->received_fresh < op->state->received_total / 3) )
1571 /* The other peer gave us lots of old elements, there's something wrong. */
1572 LOG (GNUNET_ERROR_TYPE_ERROR,
1573 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1574 (unsigned long long) op->state->received_fresh,
1575 (unsigned long long) op->state->received_total);
1576 GNUNET_break_op (0);
1577 fail_union_operation (op);
1583 * Send offers (for GNUNET_Hash-es) in response
1584 * to inquiries (for IBF_Key-s).
1586 * @param cls the union operation
1587 * @param mh the message
1590 handle_p2p_inquiry (void *cls,
1591 const struct GNUNET_MessageHeader *mh)
1593 struct Operation *op = cls;
1594 const struct IBF_Key *ibf_key;
1595 unsigned int num_keys;
1596 struct InquiryMessage *msg;
1598 /* look up elements and send them */
1599 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1601 GNUNET_break_op (0);
1602 fail_union_operation (op);
1605 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1606 / sizeof (struct IBF_Key);
1607 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1608 != num_keys * sizeof (struct IBF_Key))
1610 GNUNET_break_op (0);
1611 fail_union_operation (op);
1615 msg = (struct InquiryMessage *) mh;
1617 ibf_key = (const struct IBF_Key *) &msg[1];
1618 while (0 != num_keys--)
1620 struct IBF_Key unsalted_key;
1621 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1622 send_offers_for_key (op, unsalted_key);
1629 * Iterator over hash map entries, called to
1630 * destroy the linked list of colliding ibf key entries.
1632 * @param cls closure
1633 * @param key current key code
1634 * @param value value in the hash map
1635 * @return #GNUNET_YES if we should continue to iterate,
1636 * #GNUNET_NO if not.
1639 send_missing_elements_iter (void *cls,
1643 struct Operation *op = cls;
1644 struct KeyEntry *ke = value;
1645 struct GNUNET_MQ_Envelope *ev;
1646 struct GNUNET_SET_ElementMessage *emsg;
1647 struct ElementEntry *ee = ke->element;
1649 if (GNUNET_YES == ke->received)
1652 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1653 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1654 emsg->reserved = htons (0);
1655 emsg->element_type = htons (ee->element.element_type);
1656 GNUNET_MQ_send (op->mq, ev);
1665 * @parem cls closure, a set union operation
1666 * @param mh the demand message
1669 handle_p2p_request_full (void *cls,
1670 const struct GNUNET_MessageHeader *mh)
1672 struct Operation *op = cls;
1674 if (PHASE_EXPECT_IBF != op->state->phase)
1676 fail_union_operation (op);
1677 GNUNET_break_op (0);
1681 // FIXME: we need to check that our set is larger than the
1682 // byzantine_lower_bound by some threshold
1688 * Handle a "full done" message.
1690 * @parem cls closure, a set union operation
1691 * @param mh the demand message
1694 handle_p2p_full_done (void *cls,
1695 const struct GNUNET_MessageHeader *mh)
1697 struct Operation *op = cls;
1699 if (PHASE_EXPECT_IBF == op->state->phase)
1701 struct GNUNET_MQ_Envelope *ev;
1703 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1705 /* send all the elements that did not come from the remote peer */
1706 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1707 &send_missing_elements_iter,
1710 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1711 GNUNET_MQ_send (op->mq, ev);
1712 op->state->phase = PHASE_DONE;
1714 /* we now wait until the other peer shuts the tunnel down*/
1716 else if (PHASE_FULL_SENDING == op->state->phase)
1718 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1719 /* We sent the full set, and got the response for that. We're done. */
1720 op->state->phase = PHASE_DONE;
1721 send_done_and_destroy (op);
1725 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1726 GNUNET_break_op (0);
1727 fail_union_operation (op);
1734 * Handle a demand by the other peer for elements based on a list
1735 * of GNUNET_HashCode-s.
1737 * @parem cls closure, a set union operation
1738 * @param mh the demand message
1741 handle_p2p_demand (void *cls,
1742 const struct GNUNET_MessageHeader *mh)
1744 struct Operation *op = cls;
1745 struct ElementEntry *ee;
1746 struct GNUNET_SET_ElementMessage *emsg;
1747 const struct GNUNET_HashCode *hash;
1748 unsigned int num_hashes;
1749 struct GNUNET_MQ_Envelope *ev;
1751 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1752 / sizeof (struct GNUNET_HashCode);
1753 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1754 != num_hashes * sizeof (struct GNUNET_HashCode))
1756 GNUNET_break_op (0);
1757 fail_union_operation (op);
1761 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1763 hash++, num_hashes--)
1765 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1768 /* Demand for non-existing element. */
1769 GNUNET_break_op (0);
1770 fail_union_operation (op);
1773 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1775 /* Probably confused lazily copied sets. */
1776 GNUNET_break_op (0);
1777 fail_union_operation (op);
1780 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1781 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1782 emsg->reserved = htons (0);
1783 emsg->element_type = htons (ee->element.element_type);
1784 LOG (GNUNET_ERROR_TYPE_DEBUG,
1785 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1787 (unsigned int) ee->element.size,
1788 GNUNET_h2s (&ee->element_hash));
1789 GNUNET_MQ_send (op->mq, ev);
1790 GNUNET_STATISTICS_update (_GSS_statistics,
1791 "# exchanged elements",
1795 switch (op->spec->result_mode)
1797 case GNUNET_SET_RESULT_ADDED:
1798 /* Nothing to do. */
1800 case GNUNET_SET_RESULT_SYMMETRIC:
1801 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1804 /* Result mode not supported, should have been caught earlier. */
1813 * Handle offers (of GNUNET_HashCode-s) and
1814 * respond with demands (of GNUNET_HashCode-s).
1816 * @param cls the union operation
1817 * @param mh the message
1820 handle_p2p_offer (void *cls,
1821 const struct GNUNET_MessageHeader *mh)
1823 struct Operation *op = cls;
1824 const struct GNUNET_HashCode *hash;
1825 unsigned int num_hashes;
1827 /* look up elements and send them */
1828 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1829 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1831 GNUNET_break_op (0);
1832 fail_union_operation (op);
1835 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1836 / sizeof (struct GNUNET_HashCode);
1837 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1838 != num_hashes * sizeof (struct GNUNET_HashCode))
1840 GNUNET_break_op (0);
1841 fail_union_operation (op);
1845 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1847 hash++, num_hashes--)
1849 struct ElementEntry *ee;
1850 struct GNUNET_MessageHeader *demands;
1851 struct GNUNET_MQ_Envelope *ev;
1853 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1856 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1860 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1863 LOG (GNUNET_ERROR_TYPE_DEBUG,
1864 "Skipped sending duplicate demand\n");
1868 GNUNET_assert (GNUNET_OK ==
1869 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1872 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1874 LOG (GNUNET_ERROR_TYPE_DEBUG,
1875 "[OP %x] Requesting element (hash %s)\n",
1876 (void *) op, GNUNET_h2s (hash));
1877 ev = GNUNET_MQ_msg_header_extra (demands,
1878 sizeof (struct GNUNET_HashCode),
1879 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1880 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1881 GNUNET_MQ_send (op->mq, ev);
1887 * Handle a done message from a remote peer
1889 * @param cls the union operation
1890 * @param mh the message
1893 handle_p2p_done (void *cls,
1894 const struct GNUNET_MessageHeader *mh)
1896 struct Operation *op = cls;
1898 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1900 /* We got all requests, but still have to send our elements in response. */
1902 op->state->phase = PHASE_FINISH_WAITING;
1904 LOG (GNUNET_ERROR_TYPE_DEBUG,
1905 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1906 /* The active peer is done sending offers
1907 * and inquiries. This means that all
1908 * our responses to that (demands and offers)
1909 * must be in flight (queued or in mesh).
1911 * We should notify the active peer once
1912 * all our demands are satisfied, so that the active
1913 * peer can quit if we gave him everything.
1918 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1920 LOG (GNUNET_ERROR_TYPE_DEBUG,
1921 "got DONE (as active partner), waiting to finish\n");
1922 /* All demands of the other peer are satisfied,
1923 * and we processed all offers, thus we know
1924 * exactly what our demands must be.
1926 * We'll close the channel
1927 * to the other peer once our demands are met.
1929 op->state->phase = PHASE_FINISH_CLOSING;
1933 GNUNET_break_op (0);
1934 fail_union_operation (op);
1939 * Initiate operation to evaluate a set union with a remote peer.
1941 * @param op operation to perform (to be initialized)
1942 * @param opaque_context message to be transmitted to the listener
1943 * to convince him to accept, may be NULL
1946 union_evaluate (struct Operation *op,
1947 const struct GNUNET_MessageHeader *opaque_context)
1949 struct GNUNET_MQ_Envelope *ev;
1950 struct OperationRequestMessage *msg;
1952 GNUNET_assert (NULL == op->state);
1953 op->state = GNUNET_new (struct OperationState);
1954 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1955 /* copy the current generation's strata estimator for this operation */
1956 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1957 /* we started the operation, thus we have to send the operation request */
1958 op->state->phase = PHASE_EXPECT_SE;
1959 op->state->salt_receive = op->state->salt_send = 42;
1960 LOG (GNUNET_ERROR_TYPE_DEBUG,
1961 "Initiating union operation evaluation\n");
1962 GNUNET_STATISTICS_update (_GSS_statistics,
1963 "# of total union operations",
1966 GNUNET_STATISTICS_update (_GSS_statistics,
1967 "# of initiated union operations",
1970 ev = GNUNET_MQ_msg_nested_mh (msg,
1971 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1975 /* the context message is too large */
1977 GNUNET_SERVICE_client_drop (op->spec->set->client);
1980 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1981 GNUNET_MQ_send (op->mq,
1984 if (NULL != opaque_context)
1985 LOG (GNUNET_ERROR_TYPE_DEBUG,
1986 "sent op request with context message\n");
1988 LOG (GNUNET_ERROR_TYPE_DEBUG,
1989 "sent op request without context message\n");
1991 initialize_key_to_element (op);
1992 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1997 * Accept an union operation request from a remote peer.
1998 * Only initializes the private operation state.
2000 * @param op operation that will be accepted as a union operation
2003 union_accept (struct Operation *op)
2005 LOG (GNUNET_ERROR_TYPE_DEBUG,
2006 "accepting set union operation\n");
2007 GNUNET_assert (NULL == op->state);
2009 GNUNET_STATISTICS_update (_GSS_statistics,
2010 "# of accepted union operations",
2013 GNUNET_STATISTICS_update (_GSS_statistics,
2014 "# of total union operations",
2018 op->state = GNUNET_new (struct OperationState);
2019 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2020 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2021 op->state->salt_receive = op->state->salt_send = 42;
2022 initialize_key_to_element (op);
2023 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2024 /* kick off the operation */
2025 send_strata_estimator (op);
2030 * Create a new set supporting the union operation
2032 * We maintain one strata estimator per set and then manipulate it over the
2033 * lifetime of the set, as recreating a strata estimator would be expensive.
2035 * @return the newly created set, NULL on error
2037 static struct SetState *
2038 union_set_create (void)
2040 struct SetState *set_state;
2042 LOG (GNUNET_ERROR_TYPE_DEBUG,
2043 "union set created\n");
2044 set_state = GNUNET_new (struct SetState);
2045 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2046 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2047 if (NULL == set_state->se)
2049 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2050 "Failed to allocate strata estimator\n");
2051 GNUNET_free (set_state);
2059 * Add the element from the given element message to the set.
2061 * @param set_state state of the set want to add to
2062 * @param ee the element to add to the set
2065 union_add (struct SetState *set_state, struct ElementEntry *ee)
2067 strata_estimator_insert (set_state->se,
2068 get_ibf_key (&ee->element_hash));
2073 * Remove the element given in the element message from the set.
2074 * Only marks the element as removed, so that older set operations can still exchange it.
2076 * @param set_state state of the set to remove from
2077 * @param ee set element to remove
2080 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2082 strata_estimator_remove (set_state->se,
2083 get_ibf_key (&ee->element_hash));
2088 * Destroy a set that supports the union operation.
2090 * @param set_state the set to destroy
2093 union_set_destroy (struct SetState *set_state)
2095 if (NULL != set_state->se)
2097 strata_estimator_destroy (set_state->se);
2098 set_state->se = NULL;
2100 GNUNET_free (set_state);
2105 * Dispatch messages for a union operation.
2107 * @param op the state of the union evaluate operation
2108 * @param mh the received message
2109 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2110 * #GNUNET_OK otherwise
2113 union_handle_p2p_message (struct Operation *op,
2114 const struct GNUNET_MessageHeader *mh)
2116 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2117 // "received p2p message (t: %u, s: %u)\n",
2118 // ntohs (mh->type),
2119 // ntohs (mh->size));
2120 switch (ntohs (mh->type))
2122 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2123 return handle_p2p_ibf (op, mh);
2124 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2125 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2126 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2127 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2128 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2129 handle_p2p_elements (op, mh);
2131 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2132 handle_p2p_full_element (op, mh);
2134 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2135 handle_p2p_inquiry (op, mh);
2137 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2138 handle_p2p_done (op, mh);
2140 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2141 handle_p2p_offer (op, mh);
2143 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2144 handle_p2p_demand (op, mh);
2146 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2147 handle_p2p_full_done (op, mh);
2149 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2150 handle_p2p_request_full (op, mh);
2153 /* Something wrong with cadet's message handlers? */
2161 * Handler for peer-disconnects, notifies the client
2162 * about the aborted operation in case the op was not concluded.
2164 * @param op the destroyed operation
2167 union_peer_disconnect (struct Operation *op)
2169 if (PHASE_DONE != op->state->phase)
2171 struct GNUNET_MQ_Envelope *ev;
2172 struct GNUNET_SET_ResultMessage *msg;
2174 ev = GNUNET_MQ_msg (msg,
2175 GNUNET_MESSAGE_TYPE_SET_RESULT);
2176 msg->request_id = htonl (op->spec->client_request_id);
2177 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2178 msg->element_type = htons (0);
2179 GNUNET_MQ_send (op->spec->set->client_mq,
2181 LOG (GNUNET_ERROR_TYPE_WARNING,
2182 "other peer disconnected prematurely, phase %u\n",
2184 _GSS_operation_destroy (op,
2188 // else: the session has already been concluded
2189 LOG (GNUNET_ERROR_TYPE_DEBUG,
2190 "other peer disconnected (finished)\n");
2191 if (GNUNET_NO == op->state->client_done_sent)
2192 send_done_and_destroy (op);
2197 * Copy union-specific set state.
2199 * @param set source set for copying the union state
2200 * @return a copy of the union-specific set state
2202 static struct SetState *
2203 union_copy_state (struct Set *set)
2205 struct SetState *new_state;
2207 new_state = GNUNET_new (struct SetState);
2208 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2209 new_state->se = strata_estimator_dup (set->state->se);
2216 * Get the table with implementing functions for
2219 * @return the operation specific VTable
2221 const struct SetVT *
2224 static const struct SetVT union_vt = {
2225 .create = &union_set_create,
2226 .msg_handler = &union_handle_p2p_message,
2228 .remove = &union_remove,
2229 .destroy_set = &union_set_destroy,
2230 .evaluate = &union_evaluate,
2231 .accept = &union_accept,
2232 .peer_disconnect = &union_peer_disconnect,
2233 .cancel = &union_op_cancel,
2234 .copy_state = &union_copy_state,