2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
95 * Continuation for multi part IBFs.
97 PHASE_EXPECT_IBF_CONT,
100 * We are decoding an IBF.
102 PHASE_INVENTORY_ACTIVE,
105 * The other peer is decoding the IBF we just sent.
107 PHASE_INVENTORY_PASSIVE,
110 * The protocol is almost finished, but we still have to flush our message
111 * queue and/or expect some elements.
113 PHASE_FINISH_CLOSING,
116 * In the penultimate phase,
117 * we wait until all our demands
118 * are satisfied. Then we send a done
119 * message, and wait for another done message.
121 PHASE_FINISH_WAITING,
124 * In the ultimate phase, we wait until
125 * our demands are satisfied and then
126 * quit (sending another DONE message).
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
139 * State of an evaluate operation with another peer.
141 struct OperationState
144 * Copy of the set's strata estimator at the time of
145 * creation of this operation.
147 struct StrataEstimator *se;
150 * The IBF we currently receive.
152 struct InvertibleBloomFilter *remote_ibf;
155 * The IBF with the local set's element.
157 struct InvertibleBloomFilter *local_ibf;
160 * Maps unsalted IBF-Keys to elements.
161 * Used as a multihashmap, the keys being the lower 32 bits of the IBF-Key.
162 * Colliding IBF-Keys are linked.
164 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
167 * Current state of the operation.
169 enum UnionOperationPhase phase;
172 * Did we send the client that we are done?
174 int client_done_sent;
177 * Number of ibf buckets already received into the @a remote_ibf.
179 unsigned int ibf_buckets_received;
182 * Hashes for elements that we have demanded from the other peer.
184 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
187 * Salt that we're using for sending IBFs
192 * Salt for the IBF we've received and that we're currently decoding.
194 uint32_t salt_receive;
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
200 uint32_t received_fresh;
203 * Total number of elements received from the other peer.
205 uint32_t received_total;
208 * Initial size of our set, just before
209 * the operation started.
211 uint64_t initial_size;
216 * The key entry is used to associate an ibf key with an element.
221 * IBF key for the entry, derived from the current salt.
223 struct IBF_Key ibf_key;
226 * The actual element associated with the key.
228 * Only owned by the union operation if element->operation
231 struct ElementEntry *element;
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
244 * Used as a closure for sending elements
245 * with a specific IBF key.
247 struct SendElementClosure
250 * The IBF key whose matching elements should be
253 struct IBF_Key ibf_key;
256 * Operation for which the elements
259 struct Operation *op;
264 * Extra state required for efficient set union.
269 * The strata estimator is only generated once for
271 * The IBF keys are derived from the element hashes with
274 struct StrataEstimator *se;
279 * Iterator over hash map entries, called to
280 * destroy the linked list of colliding ibf key entries.
283 * @param key current key code
284 * @param value value in the hash map
285 * @return #GNUNET_YES if we should continue to iterate,
289 destroy_key_to_element_iter (void *cls,
293 struct KeyEntry *k = value;
295 GNUNET_assert (NULL != k);
296 if (GNUNET_YES == k->element->remote)
298 GNUNET_free (k->element);
307 * Destroy the union operation. Only things specific to the union
308 * operation are destroyed.
310 * @param op union operation to destroy
313 union_op_cancel (struct Operation *op)
315 LOG (GNUNET_ERROR_TYPE_DEBUG,
316 "destroying union op\n");
317 /* check if the op was canceled twice */
318 GNUNET_assert (NULL != op->state);
319 if (NULL != op->state->remote_ibf)
321 ibf_destroy (op->state->remote_ibf);
322 op->state->remote_ibf = NULL;
324 if (NULL != op->state->demanded_hashes)
326 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
327 op->state->demanded_hashes = NULL;
329 if (NULL != op->state->local_ibf)
331 ibf_destroy (op->state->local_ibf);
332 op->state->local_ibf = NULL;
334 if (NULL != op->state->se)
336 strata_estimator_destroy (op->state->se);
337 op->state->se = NULL;
339 if (NULL != op->state->key_to_element)
341 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
342 &destroy_key_to_element_iter,
344 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
345 op->state->key_to_element = NULL;
347 GNUNET_free (op->state);
349 LOG (GNUNET_ERROR_TYPE_DEBUG,
350 "destroying union op done\n");
355 * Inform the client that the union operation has failed,
356 * and proceed to destroy the evaluate operation.
358 * @param op the union operation to fail
361 fail_union_operation (struct Operation *op)
363 struct GNUNET_MQ_Envelope *ev;
364 struct GNUNET_SET_ResultMessage *msg;
366 LOG (GNUNET_ERROR_TYPE_ERROR,
367 "union operation failed\n");
368 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
369 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
370 msg->request_id = htonl (op->spec->client_request_id);
371 msg->element_type = htons (0);
372 GNUNET_MQ_send (op->spec->set->client_mq, ev);
373 _GSS_operation_destroy (op, GNUNET_YES);
378 * Derive the IBF key from a hash code and
381 * @param src the hash code
382 * @return the derived IBF key
384 static struct IBF_Key
385 get_ibf_key (const struct GNUNET_HashCode *src)
390 GNUNET_CRYPTO_kdf (&key, sizeof (key),
392 &salt, sizeof (salt),
399 * Context for #op_get_element_iterator
401 struct GetElementContext
403 struct GNUNET_HashCode hash;
409 * Iterator over the mapping from IBF keys to element entries. Checks if we
410 * have an element with a given GNUNET_HashCode.
413 * @param key current key code
414 * @param value value in the hash map
415 * @return #GNUNET_YES if we should search further,
416 * #GNUNET_NO if we've found the element.
419 op_get_element_iterator (void *cls,
423 struct GetElementContext *ctx = cls;
424 struct KeyEntry *k = value;
426 GNUNET_assert (NULL != k);
427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
438 * Determine whether the given element is already in the operation's element
441 * @param op operation that should be tested for 'element_hash'
442 * @param element_hash hash of the element to look for
443 * @return the key entry of the element if it has been found; presumably NULL otherwise (TODO confirm)
445 static struct KeyEntry *
446 op_get_element (struct Operation *op,
447 const struct GNUNET_HashCode *element_hash)
450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = {{{ 0 }} , 0};
453 ctx.hash = *element_hash;
455 ibf_key = get_ibf_key (element_hash);
456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
457 (uint32_t) ibf_key.key_val,
458 op_get_element_iterator,
461 /* was the iteration aborted because we found the element? */
462 if (GNUNET_SYSERR == ret)
464 GNUNET_assert (NULL != ctx.k);
472 * Insert an element into the union operation's
473 * key-to-element mapping. Takes ownership of 'ee'.
474 * Note that this does not insert the element in the set,
475 * only in the operation's key-element mapping.
476 * This is done to speed up re-tried operations, if some elements
477 * were transmitted, and then the IBF fails to decode.
479 * XXX: clarify ownership, doesn't sound right.
481 * @param op the union operation
482 * @param ee the element entry
483 * @param received was this element received from the remote peer?
486 op_register_element (struct Operation *op,
487 struct ElementEntry *ee,
490 struct IBF_Key ibf_key;
493 ibf_key = get_ibf_key (&ee->element_hash);
494 k = GNUNET_new (struct KeyEntry);
496 k->ibf_key = ibf_key;
497 k->received = received;
498 GNUNET_assert (GNUNET_OK ==
499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
500 (uint32_t) ibf_key.key_val,
502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507 salt_key (const struct IBF_Key *k_in,
509 struct IBF_Key *k_out)
512 uint64_t x = k_in->key_val;
514 x = (x >> s) | (x << (64 - s));
520 unsalt_key (const struct IBF_Key *k_in,
522 struct IBF_Key *k_out)
525 uint64_t x = k_in->key_val;
526 x = (x << s) | (x >> (64 - s));
532 * Insert a key into an ibf.
536 * @param value the key entry to get the key from
539 prepare_ibf_iterator (void *cls,
543 struct Operation *op = cls;
544 struct KeyEntry *ke = value;
545 struct IBF_Key salted_key;
547 LOG (GNUNET_ERROR_TYPE_DEBUG,
548 "[OP %x] inserting %lx (hash %s) into ibf\n",
550 (unsigned long) ke->ibf_key.key_val,
551 GNUNET_h2s (&ke->element->element_hash));
552 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
553 ibf_insert (op->state->local_ibf, salted_key);
559 * Iterator for initializing the
560 * key-to-element mapping of a union operation
562 * @param cls the union operation `struct Operation *`
564 * @param value the `struct ElementEntry *` to insert
565 * into the key-to-element mapping
566 * @return #GNUNET_YES (to continue iterating)
569 init_key_to_element_iterator (void *cls,
570 const struct GNUNET_HashCode *key,
573 struct Operation *op = cls;
574 struct ElementEntry *ee = value;
576 /* make sure that the element belongs to the set at the time
577 * of creating the operation */
578 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
581 GNUNET_assert (GNUNET_NO == ee->remote);
583 op_register_element (op, ee, GNUNET_NO);
589 * Initialize the IBF key to element mapping local to this set
592 * @param op the set union operation
595 initialize_key_to_element (struct Operation *op)
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
607 * Create an ibf with the operation's elements
608 * of the specified size
610 * @param op the union operation
611 * @param size size of the ibf to create
612 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
615 prepare_ibf (struct Operation *op,
618 GNUNET_assert (NULL != op->state->key_to_element);
620 if (NULL != op->state->local_ibf)
621 ibf_destroy (op->state->local_ibf);
622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
623 if (NULL == op->state->local_ibf)
625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
626 "Failed to allocate local IBF\n");
627 return GNUNET_SYSERR;
629 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
630 &prepare_ibf_iterator,
637 * Send an ibf of appropriate size.
639 * Fragments the IBF into multiple messages if necessary.
641 * @param op the union operation
642 * @param ibf_order order of the ibf to send, size=2^order
643 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
646 send_ibf (struct Operation *op,
649 unsigned int buckets_sent = 0;
650 struct InvertibleBloomFilter *ibf;
653 prepare_ibf (op, 1<<ibf_order))
655 /* allocation failed */
656 return GNUNET_SYSERR;
659 LOG (GNUNET_ERROR_TYPE_DEBUG,
660 "sending ibf of size %u\n",
664 char name[64] = { 0 };
665 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
666 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
669 ibf = op->state->local_ibf;
671 while (buckets_sent < (1 << ibf_order))
673 unsigned int buckets_in_message;
674 struct GNUNET_MQ_Envelope *ev;
675 struct IBFMessage *msg;
677 buckets_in_message = (1 << ibf_order) - buckets_sent;
678 /* limit to maximum */
679 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
680 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
682 ev = GNUNET_MQ_msg_extra (msg,
683 buckets_in_message * IBF_BUCKET_SIZE,
684 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
687 msg->order = ibf_order;
688 msg->offset = htonl (buckets_sent);
689 msg->salt = htonl (op->state->salt_send);
690 ibf_write_slice (ibf, buckets_sent,
691 buckets_in_message, &msg[1]);
692 buckets_sent += buckets_in_message;
693 LOG (GNUNET_ERROR_TYPE_DEBUG,
694 "ibf chunk size %u, %u/%u sent\n",
698 GNUNET_MQ_send (op->mq, ev);
701 /* The other peer must decode the IBF, so
703 op->state->phase = PHASE_INVENTORY_PASSIVE;
709 * Send a strata estimator to the remote peer.
711 * @param op the union operation with the remote peer
714 send_strata_estimator (struct Operation *op)
716 const struct StrataEstimator *se = op->state->se;
717 struct GNUNET_MQ_Envelope *ev;
718 struct StrataEstimatorMessage *strata_msg;
723 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
724 len = strata_estimator_write (op->state->se,
726 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
730 ev = GNUNET_MQ_msg_extra (strata_msg,
733 GNUNET_memcpy (&strata_msg[1],
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
738 GNUNET_MQ_send (op->mq,
740 op->state->phase = PHASE_EXPECT_IBF;
741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "sent SE, expecting IBF\n");
747 * Compute the necessary order of an ibf
748 * from the size of the symmetric set difference.
750 * @param diff the difference
751 * @return the required order of the ibf (size = 2^order)
754 get_order_from_difference (unsigned int diff)
756 unsigned int ibf_order;
759 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
760 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
762 if (ibf_order > MAX_IBF_ORDER)
763 ibf_order = MAX_IBF_ORDER;
764 // add one for correction
765 return ibf_order + 1;
770 * Send a set element.
772 * @param cls the union operation `struct Operation *`
774 * @param value the `struct ElementEntry *` whose element is sent
775 * to the remote peer
776 * @return #GNUNET_YES (to continue iterating)
779 send_element_iterator (void *cls,
780 const struct GNUNET_HashCode *key,
783 struct Operation *op = cls;
784 struct GNUNET_SET_ElementMessage *emsg;
785 struct ElementEntry *ee = value;
786 struct GNUNET_SET_Element *el = &ee->element;
787 struct GNUNET_MQ_Envelope *ev;
790 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791 emsg->element_type = htons (el->element_type);
792 GNUNET_memcpy (&emsg[1], el->data, el->size);
793 GNUNET_MQ_send (op->mq, ev);
799 send_full_set (struct Operation *op)
801 struct GNUNET_MQ_Envelope *ev;
803 op->state->phase = PHASE_FULL_SENDING;
805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806 &send_element_iterator, op);
807 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808 GNUNET_MQ_send (op->mq, ev);
813 * Handle a strata estimator from a remote peer
815 * @param cls the union operation
816 * @param mh the message
817 * @param is_compressed #GNUNET_YES if the estimator is compressed
818 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819 * #GNUNET_OK otherwise
822 handle_p2p_strata_estimator (void *cls,
823 const struct GNUNET_MessageHeader *mh,
826 struct Operation *op = cls;
827 struct StrataEstimator *remote_se;
828 struct StrataEstimatorMessage *msg = (void *) mh;
833 GNUNET_STATISTICS_update (_GSS_statistics,
834 "# bytes of SE received",
838 if (op->state->phase != PHASE_EXPECT_SE)
841 fail_union_operation (op);
842 return GNUNET_SYSERR;
844 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
845 if ( (GNUNET_NO == is_compressed) &&
846 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
848 fail_union_operation (op);
850 return GNUNET_SYSERR;
852 other_size = GNUNET_ntohll (msg->set_size);
853 remote_se = strata_estimator_create (SE_STRATA_COUNT,
856 if (NULL == remote_se)
858 /* insufficient resources, fail */
859 fail_union_operation (op);
860 return GNUNET_SYSERR;
863 strata_estimator_read (&msg[1],
868 /* decompression failed */
869 fail_union_operation (op);
870 strata_estimator_destroy (remote_se);
871 return GNUNET_SYSERR;
873 GNUNET_assert (NULL != op->state->se);
874 diff = strata_estimator_difference (remote_se,
876 strata_estimator_destroy (remote_se);
877 strata_estimator_destroy (op->state->se);
878 op->state->se = NULL;
879 LOG (GNUNET_ERROR_TYPE_DEBUG,
880 "got se diff=%d, using ibf size %d\n",
882 1<<get_order_from_difference (diff));
884 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
887 fail_union_operation (op);
888 return GNUNET_SYSERR;
892 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
894 LOG (GNUNET_ERROR_TYPE_INFO,
895 "Sending full set (diff=%d, own set=%u)\n",
897 op->state->initial_size);
898 if (op->state->initial_size <= other_size)
904 struct GNUNET_MQ_Envelope *ev;
905 op->state->phase = PHASE_EXPECT_IBF;
906 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
907 GNUNET_MQ_send (op->mq, ev);
914 get_order_from_difference (diff)))
916 /* Internal error, best we can do is shut the connection */
917 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
918 "Failed to send IBF, closing connection\n");
919 fail_union_operation (op);
920 return GNUNET_SYSERR;
929 * Iterator to send element offers (element hashes) to a remote peer
931 * @param cls closure with the element key and the union operation
933 * @param value the key entry
936 send_offers_iterator (void *cls,
940 struct SendElementClosure *sec = cls;
941 struct Operation *op = sec->op;
942 struct KeyEntry *ke = value;
943 struct GNUNET_MQ_Envelope *ev;
944 struct GNUNET_MessageHeader *mh;
946 /* Detect 32-bit key collision for the 64-bit IBF keys. */
947 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
950 ev = GNUNET_MQ_msg_header_extra (mh,
951 sizeof (struct GNUNET_HashCode),
952 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
954 GNUNET_assert (NULL != ev);
955 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
956 LOG (GNUNET_ERROR_TYPE_DEBUG,
957 "[OP %x] sending element offer (%s) to peer\n",
959 GNUNET_h2s (&ke->element->element_hash));
960 GNUNET_MQ_send (op->mq, ev);
966 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
968 * @param op union operation
969 * @param ibf_key IBF key of interest
972 send_offers_for_key (struct Operation *op,
973 struct IBF_Key ibf_key)
975 struct SendElementClosure send_cls;
977 send_cls.ibf_key = ibf_key;
979 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
980 (uint32_t) ibf_key.key_val,
981 &send_offers_iterator,
987 * Decode which elements are missing on each side, and
988 * send the appropriate offers and inquiries.
990 * @param op union operation
991 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
994 decode_and_send (struct Operation *op)
997 struct IBF_Key last_key;
999 unsigned int num_decoded;
1000 struct InvertibleBloomFilter *diff_ibf;
1002 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1005 prepare_ibf (op, op->state->remote_ibf->size))
1008 /* allocation failed */
1009 return GNUNET_SYSERR;
1011 diff_ibf = ibf_dup (op->state->local_ibf);
1012 ibf_subtract (diff_ibf, op->state->remote_ibf);
1014 ibf_destroy (op->state->remote_ibf);
1015 op->state->remote_ibf = NULL;
1017 LOG (GNUNET_ERROR_TYPE_DEBUG,
1018 "decoding IBF (size=%u)\n",
1022 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1027 int cycle_detected = GNUNET_NO;
1031 res = ibf_decode (diff_ibf, &side, &key);
1032 if (res == GNUNET_OK)
1034 LOG (GNUNET_ERROR_TYPE_DEBUG,
1035 "decoded ibf key %lx\n",
1036 (unsigned long) key.key_val);
1038 if ( (num_decoded > diff_ibf->size) ||
1039 ( (num_decoded > 1) &&
1040 (last_key.key_val == key.key_val) ) )
1042 LOG (GNUNET_ERROR_TYPE_DEBUG,
1043 "detected cyclic ibf (decoded %u/%u)\n",
1046 cycle_detected = GNUNET_YES;
1049 if ( (GNUNET_SYSERR == res) ||
1050 (GNUNET_YES == cycle_detected) )
1054 while (1<<next_order < diff_ibf->size)
1057 if (next_order <= MAX_IBF_ORDER)
1059 LOG (GNUNET_ERROR_TYPE_DEBUG,
1060 "decoding failed, sending larger ibf (size %u)\n",
1062 GNUNET_STATISTICS_update (_GSS_statistics,
1066 op->state->salt_send++;
1068 send_ibf (op, next_order))
1070 /* Internal error, best we can do is shut the connection */
1071 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1072 "Failed to send IBF, closing connection\n");
1073 fail_union_operation (op);
1074 ibf_destroy (diff_ibf);
1075 return GNUNET_SYSERR;
1080 GNUNET_STATISTICS_update (_GSS_statistics,
1081 "# of failed union operations (too large)",
1084 // XXX: Send the whole set, element-by-element
1085 LOG (GNUNET_ERROR_TYPE_ERROR,
1086 "set union failed: reached ibf limit\n");
1087 fail_union_operation (op);
1088 ibf_destroy (diff_ibf);
1089 return GNUNET_SYSERR;
1093 if (GNUNET_NO == res)
1095 struct GNUNET_MQ_Envelope *ev;
1097 LOG (GNUNET_ERROR_TYPE_DEBUG,
1098 "transmitted all values, sending DONE\n");
1099 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1100 GNUNET_MQ_send (op->mq, ev);
1101 /* We now wait until we get a DONE message back
1102 * and then wait for our MQ to be flushed and all our
1103 * demands to be delivered. */
1108 struct IBF_Key unsalted_key;
1109 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
1110 send_offers_for_key (op, unsalted_key);
1112 else if (-1 == side)
1114 struct GNUNET_MQ_Envelope *ev;
1115 struct InquiryMessage *msg;
1117 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1118 * the additional complexity. */
1119 ev = GNUNET_MQ_msg_extra (msg,
1120 sizeof (struct IBF_Key),
1121 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1122 msg->salt = htonl (op->state->salt_receive);
1123 GNUNET_memcpy (&msg[1],
1125 sizeof (struct IBF_Key));
1126 LOG (GNUNET_ERROR_TYPE_DEBUG,
1127 "sending element inquiry for IBF key %lx\n",
1128 (unsigned long) key.key_val);
1129 GNUNET_MQ_send (op->mq, ev);
1136 ibf_destroy (diff_ibf);
1142 * Handle an IBF message from a remote peer.
1144 * Reassemble the IBF from multiple pieces, and
1145 * process the whole IBF once possible.
1147 * @param cls the union operation
1148 * @param mh the header of the message
1149 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1150 * #GNUNET_OK otherwise
1153 handle_p2p_ibf (void *cls,
1154 const struct GNUNET_MessageHeader *mh)
1156 struct Operation *op = cls;
1157 const struct IBFMessage *msg;
1158 unsigned int buckets_in_message;
1160 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1162 GNUNET_break_op (0);
1163 fail_union_operation (op);
1164 return GNUNET_SYSERR;
1166 msg = (const struct IBFMessage *) mh;
1167 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1168 (op->state->phase == PHASE_EXPECT_IBF) )
1170 op->state->phase = PHASE_EXPECT_IBF_CONT;
1171 GNUNET_assert (NULL == op->state->remote_ibf);
1172 LOG (GNUNET_ERROR_TYPE_DEBUG,
1173 "Creating new ibf of size %u\n",
1175 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1176 op->state->salt_receive = ntohl (msg->salt);
1177 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1178 if (NULL == op->state->remote_ibf)
1180 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1181 "Failed to parse remote IBF, closing connection\n");
1182 fail_union_operation (op);
1183 return GNUNET_SYSERR;
1185 op->state->ibf_buckets_received = 0;
1186 if (0 != ntohl (msg->offset))
1188 GNUNET_break_op (0);
1189 fail_union_operation (op);
1190 return GNUNET_SYSERR;
1193 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1195 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1197 GNUNET_break_op (0);
1198 fail_union_operation (op);
1199 return GNUNET_SYSERR;
1201 if (1<<msg->order != op->state->remote_ibf->size)
1203 GNUNET_break_op (0);
1204 fail_union_operation (op);
1205 return GNUNET_SYSERR;
1207 if (ntohl (msg->salt) != op->state->salt_receive)
1209 GNUNET_break_op (0);
1210 fail_union_operation (op);
1211 return GNUNET_SYSERR;
1219 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1221 if (0 == buckets_in_message)
1223 GNUNET_break_op (0);
1224 fail_union_operation (op);
1225 return GNUNET_SYSERR;
1228 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1230 GNUNET_break_op (0);
1231 fail_union_operation (op);
1232 return GNUNET_SYSERR;
1235 GNUNET_assert (NULL != op->state->remote_ibf);
1237 ibf_read_slice (&msg[1],
1238 op->state->ibf_buckets_received,
1240 op->state->remote_ibf);
1241 op->state->ibf_buckets_received += buckets_in_message;
1243 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1245 LOG (GNUNET_ERROR_TYPE_DEBUG,
1246 "received full ibf\n");
1247 op->state->phase = PHASE_INVENTORY_ACTIVE;
1249 decode_and_send (op))
1251 /* Internal error, best we can do is shut down */
1252 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1253 "Failed to decode IBF, closing connection\n");
1254 return GNUNET_SYSERR;
1262 * Send a result message to the client indicating
1263 * that there is a new element.
1265 * @param op union operation
1266 * @param element element to send
1267 * @param status status to send with the new element
1270 send_client_element (struct Operation *op,
1271 struct GNUNET_SET_Element *element,
1274 struct GNUNET_MQ_Envelope *ev;
1275 struct GNUNET_SET_ResultMessage *rm;
1277 LOG (GNUNET_ERROR_TYPE_DEBUG,
1278 "sending element (size %u) to client\n",
1280 GNUNET_assert (0 != op->spec->client_request_id);
1281 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1284 GNUNET_MQ_discard (ev);
1288 rm->result_status = htons (status);
1289 rm->request_id = htonl (op->spec->client_request_id);
1290 rm->element_type = htons (element->element_type);
1291 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1292 GNUNET_memcpy (&rm[1], element->data, element->size);
1293 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1298 * Signal to the client that the operation has finished and
1299 * destroy the operation.
1301 * @param cls operation to destroy
1304 send_done_and_destroy (void *cls)
1306 struct Operation *op = cls;
1307 struct GNUNET_MQ_Envelope *ev;
1308 struct GNUNET_SET_ResultMessage *rm;
1310 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1311 rm->request_id = htonl (op->spec->client_request_id);
1312 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1313 rm->element_type = htons (0);
1314 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1315 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1316 /* Will also call the union-specific cancel function. */
1317 _GSS_operation_destroy (op, GNUNET_YES);
1322 maybe_finish (struct Operation *op)
1324 unsigned int num_demanded;
1326 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1328 if (PHASE_FINISH_WAITING == op->state->phase)
1330 LOG (GNUNET_ERROR_TYPE_DEBUG,
1331 "In PHASE_FINISH_WAITING, pending %u demands\n",
1333 if (0 == num_demanded)
1335 struct GNUNET_MQ_Envelope *ev;
1337 op->state->phase = PHASE_DONE;
1338 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1339 GNUNET_MQ_send (op->mq, ev);
1341 /* We now wait until the other peer closes the channel
1342 * after it got all elements from us. */
1345 if (PHASE_FINISH_CLOSING == op->state->phase)
1347 LOG (GNUNET_ERROR_TYPE_DEBUG,
1348 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1350 if (0 == num_demanded)
1352 op->state->phase = PHASE_DONE;
1353 send_done_and_destroy (op);
1360 * Handle an element message from a remote peer.
1361 * Sent by the other peer either because we decoded an IBF and placed a demand,
1362 * or because the other peer switched to full set transmission.
1364 * @param cls the union operation
1365 * @param mh the message
1368 handle_p2p_elements (void *cls,
1369 const struct GNUNET_MessageHeader *mh)
1371 struct Operation *op = cls;
1372 struct ElementEntry *ee;
1373 const struct GNUNET_SET_ElementMessage *emsg;
1374 uint16_t element_size;
1376 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1378 GNUNET_break_op (0);
1379 fail_union_operation (op);
1382 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1384 GNUNET_break_op (0);
1385 fail_union_operation (op);
1389 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1391 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1392 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1393 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1394 ee->element.size = element_size;
1395 ee->element.data = &ee[1];
1396 ee->element.element_type = ntohs (emsg->element_type);
1397 ee->remote = GNUNET_YES;
1398 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1401 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1405 /* We got something we didn't demand, since it's not in our map. */
1406 GNUNET_break_op (0);
1408 fail_union_operation (op);
1412 LOG (GNUNET_ERROR_TYPE_DEBUG,
1413 "Got element (size %u, hash %s) from peer\n",
1414 (unsigned int) element_size,
1415 GNUNET_h2s (&ee->element_hash));
1417 GNUNET_STATISTICS_update (_GSS_statistics,
1418 "# received elements",
1421 GNUNET_STATISTICS_update (_GSS_statistics,
1422 "# exchanged elements",
1426 op->state->received_total += 1;
1428 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1432 /* Got repeated element. Should not happen since
1433 * we track demands. */
1434 GNUNET_STATISTICS_update (_GSS_statistics,
1435 "# repeated elements",
1438 ke->received = GNUNET_YES;
1443 LOG (GNUNET_ERROR_TYPE_DEBUG,
1444 "Registering new element from remote peer\n");
1445 op->state->received_fresh += 1;
1446 op_register_element (op, ee, GNUNET_YES);
1447 /* only send results immediately if the client wants it */
1448 switch (op->spec->result_mode)
1450 case GNUNET_SET_RESULT_ADDED:
1451 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1453 case GNUNET_SET_RESULT_SYMMETRIC:
1454 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1457 /* Result mode not supported, should have been caught earlier. */
1463 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1465 /* The other peer gave us lots of old elements, there's something wrong. */
1466 GNUNET_break_op (0);
1467 fail_union_operation (op);
1476 * Handle an element message from a remote peer.
1478 * @param cls the union operation
1479 * @param mh the message
1482 handle_p2p_full_element (void *cls,
1483 const struct GNUNET_MessageHeader *mh)
1485 struct Operation *op = cls;
1486 struct ElementEntry *ee;
1487 const struct GNUNET_SET_ElementMessage *emsg;
1488 uint16_t element_size;
1490 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1492 GNUNET_break_op (0);
1493 fail_union_operation (op);
1497 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1499 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1500 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1501 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1502 ee->element.size = element_size;
1503 ee->element.data = &ee[1];
1504 ee->element.element_type = ntohs (emsg->element_type);
1505 ee->remote = GNUNET_YES;
1506 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1508 LOG (GNUNET_ERROR_TYPE_DEBUG,
1509 "Got element (full diff, size %u, hash %s) from peer\n",
1510 (unsigned int) element_size,
1511 GNUNET_h2s (&ee->element_hash));
1513 GNUNET_STATISTICS_update (_GSS_statistics,
1514 "# received elements",
1517 GNUNET_STATISTICS_update (_GSS_statistics,
1518 "# exchanged elements",
1522 op->state->received_total += 1;
1524 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1528 /* Got repeated element. Should not happen since
1529 * we track demands. */
1530 GNUNET_STATISTICS_update (_GSS_statistics,
1531 "# repeated elements",
1534 ke->received = GNUNET_YES;
1539 LOG (GNUNET_ERROR_TYPE_DEBUG,
1540 "Registering new element from remote peer\n");
1541 op->state->received_fresh += 1;
1542 op_register_element (op, ee, GNUNET_YES);
1543 /* only send results immediately if the client wants it */
1544 switch (op->spec->result_mode)
1546 case GNUNET_SET_RESULT_ADDED:
1547 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1549 case GNUNET_SET_RESULT_SYMMETRIC:
1550 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1553 /* Result mode not supported, should have been caught earlier. */
1559 if ( (GNUNET_YES == op->spec->byzantine) &&
1560 (op->state->received_total > 128) &&
1561 (op->state->received_fresh < op->state->received_total / 3) )
1563 /* The other peer gave us lots of old elements, there's something wrong. */
1564 LOG (GNUNET_ERROR_TYPE_ERROR,
1565 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1566 (unsigned long long) op->state->received_fresh,
1567 (unsigned long long) op->state->received_total);
1568 GNUNET_break_op (0);
1569 fail_union_operation (op);
1575 * Send offers (for GNUNET_Hash-es) in response
1576 * to inquiries (for IBF_Key-s).
1578 * @param cls the union operation
1579 * @param mh the message
1582 handle_p2p_inquiry (void *cls,
1583 const struct GNUNET_MessageHeader *mh)
1585 struct Operation *op = cls;
1586 const struct IBF_Key *ibf_key;
1587 unsigned int num_keys;
1588 struct InquiryMessage *msg;
1590 /* look up elements and send them */
1591 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1593 GNUNET_break_op (0);
1594 fail_union_operation (op);
1597 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1598 / sizeof (struct IBF_Key);
1599 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1600 != num_keys * sizeof (struct IBF_Key))
1602 GNUNET_break_op (0);
1603 fail_union_operation (op);
1607 msg = (struct InquiryMessage *) mh;
1609 ibf_key = (const struct IBF_Key *) &msg[1];
1610 while (0 != num_keys--)
1612 struct IBF_Key unsalted_key;
1613 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1614 send_offers_for_key (op, unsalted_key);
1621 * Iterator over hash map entries, called to
1622 * destroy the linked list of colliding ibf key entries.
1624 * @param cls closure
1625 * @param key current key code
1626 * @param value value in the hash map
1627 * @return #GNUNET_YES if we should continue to iterate,
1628 * #GNUNET_NO if not.
1631 send_missing_elements_iter (void *cls,
1635 struct Operation *op = cls;
1636 struct KeyEntry *ke = value;
1637 struct GNUNET_MQ_Envelope *ev;
1638 struct GNUNET_SET_ElementMessage *emsg;
1639 struct ElementEntry *ee = ke->element;
1641 if (GNUNET_YES == ke->received)
1644 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1645 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1646 emsg->reserved = htons (0);
1647 emsg->element_type = htons (ee->element.element_type);
1648 GNUNET_MQ_send (op->mq, ev);
1657 * @parem cls closure, a set union operation
1658 * @param mh the demand message
1661 handle_p2p_request_full (void *cls,
1662 const struct GNUNET_MessageHeader *mh)
1664 struct Operation *op = cls;
1666 if (PHASE_EXPECT_IBF != op->state->phase)
1668 fail_union_operation (op);
1669 GNUNET_break_op (0);
1673 // FIXME: we need to check that our set is larger than the
1674 // byzantine_lower_bound by some threshold
1680 * Handle a "full done" message.
1682 * @parem cls closure, a set union operation
1683 * @param mh the demand message
1686 handle_p2p_full_done (void *cls,
1687 const struct GNUNET_MessageHeader *mh)
1689 struct Operation *op = cls;
1691 if (PHASE_EXPECT_IBF == op->state->phase)
1693 struct GNUNET_MQ_Envelope *ev;
1695 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1697 /* send all the elements that did not come from the remote peer */
1698 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1699 &send_missing_elements_iter,
1702 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1703 GNUNET_MQ_send (op->mq, ev);
1704 op->state->phase = PHASE_DONE;
1706 /* we now wait until the other peer shuts the tunnel down*/
1708 else if (PHASE_FULL_SENDING == op->state->phase)
1710 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1711 /* We sent the full set, and got the response for that. We're done. */
1712 op->state->phase = PHASE_DONE;
1713 send_done_and_destroy (op);
1717 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1718 GNUNET_break_op (0);
1719 fail_union_operation (op);
1726 * Handle a demand by the other peer for elements based on a list
1727 * of GNUNET_HashCode-s.
1729 * @parem cls closure, a set union operation
1730 * @param mh the demand message
1733 handle_p2p_demand (void *cls,
1734 const struct GNUNET_MessageHeader *mh)
1736 struct Operation *op = cls;
1737 struct ElementEntry *ee;
1738 struct GNUNET_SET_ElementMessage *emsg;
1739 const struct GNUNET_HashCode *hash;
1740 unsigned int num_hashes;
1741 struct GNUNET_MQ_Envelope *ev;
1743 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1744 / sizeof (struct GNUNET_HashCode);
1745 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1746 != num_hashes * sizeof (struct GNUNET_HashCode))
1748 GNUNET_break_op (0);
1749 fail_union_operation (op);
1753 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1755 hash++, num_hashes--)
1757 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1760 /* Demand for non-existing element. */
1761 GNUNET_break_op (0);
1762 fail_union_operation (op);
1765 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1767 /* Probably confused lazily copied sets. */
1768 GNUNET_break_op (0);
1769 fail_union_operation (op);
1772 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1773 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1774 emsg->reserved = htons (0);
1775 emsg->element_type = htons (ee->element.element_type);
1776 LOG (GNUNET_ERROR_TYPE_DEBUG,
1777 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1779 (unsigned int) ee->element.size,
1780 GNUNET_h2s (&ee->element_hash));
1781 GNUNET_MQ_send (op->mq, ev);
1782 GNUNET_STATISTICS_update (_GSS_statistics,
1783 "# exchanged elements",
1787 switch (op->spec->result_mode)
1789 case GNUNET_SET_RESULT_ADDED:
1790 /* Nothing to do. */
1792 case GNUNET_SET_RESULT_SYMMETRIC:
1793 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1796 /* Result mode not supported, should have been caught earlier. */
1805 * Handle offers (of GNUNET_HashCode-s) and
1806 * respond with demands (of GNUNET_HashCode-s).
1808 * @param cls the union operation
1809 * @param mh the message
1812 handle_p2p_offer (void *cls,
1813 const struct GNUNET_MessageHeader *mh)
1815 struct Operation *op = cls;
1816 const struct GNUNET_HashCode *hash;
1817 unsigned int num_hashes;
1819 /* look up elements and send them */
1820 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1821 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1823 GNUNET_break_op (0);
1824 fail_union_operation (op);
1827 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1828 / sizeof (struct GNUNET_HashCode);
1829 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1830 != num_hashes * sizeof (struct GNUNET_HashCode))
1832 GNUNET_break_op (0);
1833 fail_union_operation (op);
1837 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1839 hash++, num_hashes--)
1841 struct ElementEntry *ee;
1842 struct GNUNET_MessageHeader *demands;
1843 struct GNUNET_MQ_Envelope *ev;
1845 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1848 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1852 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1855 LOG (GNUNET_ERROR_TYPE_DEBUG,
1856 "Skipped sending duplicate demand\n");
1860 GNUNET_assert (GNUNET_OK ==
1861 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1864 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1866 LOG (GNUNET_ERROR_TYPE_DEBUG,
1867 "[OP %x] Requesting element (hash %s)\n",
1868 (void *) op, GNUNET_h2s (hash));
1869 ev = GNUNET_MQ_msg_header_extra (demands,
1870 sizeof (struct GNUNET_HashCode),
1871 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1872 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1873 GNUNET_MQ_send (op->mq, ev);
1879 * Handle a done message from a remote peer
1881 * @param cls the union operation
1882 * @param mh the message
1885 handle_p2p_done (void *cls,
1886 const struct GNUNET_MessageHeader *mh)
1888 struct Operation *op = cls;
1890 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1892 /* We got all requests, but still have to send our elements in response. */
1894 op->state->phase = PHASE_FINISH_WAITING;
1896 LOG (GNUNET_ERROR_TYPE_DEBUG,
1897 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1898 /* The active peer is done sending offers
1899 * and inquiries. This means that all
1900 * our responses to that (demands and offers)
1901 * must be in flight (queued or in mesh).
1903 * We should notify the active peer once
1904 * all our demands are satisfied, so that the active
1905 * peer can quit if we gave him everything.
1910 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1912 LOG (GNUNET_ERROR_TYPE_DEBUG,
1913 "got DONE (as active partner), waiting to finish\n");
1914 /* All demands of the other peer are satisfied,
1915 * and we processed all offers, thus we know
1916 * exactly what our demands must be.
1918 * We'll close the channel
1919 * to the other peer once our demands are met.
1921 op->state->phase = PHASE_FINISH_CLOSING;
1925 GNUNET_break_op (0);
1926 fail_union_operation (op);
1931 * Initiate operation to evaluate a set union with a remote peer.
1933 * @param op operation to perform (to be initialized)
1934 * @param opaque_context message to be transmitted to the listener
1935 * to convince him to accept, may be NULL
1938 union_evaluate (struct Operation *op,
1939 const struct GNUNET_MessageHeader *opaque_context)
1941 struct GNUNET_MQ_Envelope *ev;
1942 struct OperationRequestMessage *msg;
1944 GNUNET_assert (NULL == op->state);
1945 op->state = GNUNET_new (struct OperationState);
1946 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1947 /* copy the current generation's strata estimator for this operation */
1948 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1949 /* we started the operation, thus we have to send the operation request */
1950 op->state->phase = PHASE_EXPECT_SE;
1951 op->state->salt_receive = op->state->salt_send = 42;
1952 LOG (GNUNET_ERROR_TYPE_DEBUG,
1953 "Initiating union operation evaluation\n");
1954 GNUNET_STATISTICS_update (_GSS_statistics,
1955 "# of total union operations",
1958 GNUNET_STATISTICS_update (_GSS_statistics,
1959 "# of initiated union operations",
1962 ev = GNUNET_MQ_msg_nested_mh (msg,
1963 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1967 /* the context message is too large */
1969 GNUNET_SERVICE_client_drop (op->spec->set->client);
1972 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1973 GNUNET_MQ_send (op->mq,
1976 if (NULL != opaque_context)
1977 LOG (GNUNET_ERROR_TYPE_DEBUG,
1978 "sent op request with context message\n");
1980 LOG (GNUNET_ERROR_TYPE_DEBUG,
1981 "sent op request without context message\n");
1983 initialize_key_to_element (op);
1984 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1989 * Accept an union operation request from a remote peer.
1990 * Only initializes the private operation state.
1992 * @param op operation that will be accepted as a union operation
1995 union_accept (struct Operation *op)
1997 LOG (GNUNET_ERROR_TYPE_DEBUG,
1998 "accepting set union operation\n");
1999 GNUNET_assert (NULL == op->state);
2001 GNUNET_STATISTICS_update (_GSS_statistics,
2002 "# of accepted union operations",
2005 GNUNET_STATISTICS_update (_GSS_statistics,
2006 "# of total union operations",
2010 op->state = GNUNET_new (struct OperationState);
2011 op->state->se = strata_estimator_dup (op->spec->set->state->se);
2012 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
2013 op->state->salt_receive = op->state->salt_send = 42;
2014 initialize_key_to_element (op);
2015 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
2016 /* kick off the operation */
2017 send_strata_estimator (op);
2022 * Create a new set supporting the union operation
2024 * We maintain one strata estimator per set and then manipulate it over the
2025 * lifetime of the set, as recreating a strata estimator would be expensive.
2027 * @return the newly created set, NULL on error
2029 static struct SetState *
2030 union_set_create (void)
2032 struct SetState *set_state;
2034 LOG (GNUNET_ERROR_TYPE_DEBUG,
2035 "union set created\n");
2036 set_state = GNUNET_new (struct SetState);
2037 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2038 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2039 if (NULL == set_state->se)
2041 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2042 "Failed to allocate strata estimator\n");
2043 GNUNET_free (set_state);
2051 * Add the element from the given element message to the set.
2053 * @param set_state state of the set want to add to
2054 * @param ee the element to add to the set
2057 union_add (struct SetState *set_state, struct ElementEntry *ee)
2059 strata_estimator_insert (set_state->se,
2060 get_ibf_key (&ee->element_hash));
2065 * Remove the element given in the element message from the set.
2066 * Only marks the element as removed, so that older set operations can still exchange it.
2068 * @param set_state state of the set to remove from
2069 * @param ee set element to remove
2072 union_remove (struct SetState *set_state, struct ElementEntry *ee)
2074 strata_estimator_remove (set_state->se,
2075 get_ibf_key (&ee->element_hash));
2080 * Destroy a set that supports the union operation.
2082 * @param set_state the set to destroy
2085 union_set_destroy (struct SetState *set_state)
2087 if (NULL != set_state->se)
2089 strata_estimator_destroy (set_state->se);
2090 set_state->se = NULL;
2092 GNUNET_free (set_state);
2097 * Dispatch messages for a union operation.
2099 * @param op the state of the union evaluate operation
2100 * @param mh the received message
2101 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2102 * #GNUNET_OK otherwise
2105 union_handle_p2p_message (struct Operation *op,
2106 const struct GNUNET_MessageHeader *mh)
2108 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2109 // "received p2p message (t: %u, s: %u)\n",
2110 // ntohs (mh->type),
2111 // ntohs (mh->size));
2112 switch (ntohs (mh->type))
2114 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2115 return handle_p2p_ibf (op, mh);
2116 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2117 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2118 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2119 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2120 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2121 handle_p2p_elements (op, mh);
2123 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2124 handle_p2p_full_element (op, mh);
2126 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2127 handle_p2p_inquiry (op, mh);
2129 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2130 handle_p2p_done (op, mh);
2132 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2133 handle_p2p_offer (op, mh);
2135 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2136 handle_p2p_demand (op, mh);
2138 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2139 handle_p2p_full_done (op, mh);
2141 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2142 handle_p2p_request_full (op, mh);
2145 /* Something wrong with cadet's message handlers? */
2153 * Handler for peer-disconnects, notifies the client
2154 * about the aborted operation in case the op was not concluded.
2156 * @param op the destroyed operation
2159 union_peer_disconnect (struct Operation *op)
2161 if (PHASE_DONE != op->state->phase)
2163 struct GNUNET_MQ_Envelope *ev;
2164 struct GNUNET_SET_ResultMessage *msg;
2166 ev = GNUNET_MQ_msg (msg,
2167 GNUNET_MESSAGE_TYPE_SET_RESULT);
2168 msg->request_id = htonl (op->spec->client_request_id);
2169 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2170 msg->element_type = htons (0);
2171 GNUNET_MQ_send (op->spec->set->client_mq,
2173 LOG (GNUNET_ERROR_TYPE_WARNING,
2174 "other peer disconnected prematurely, phase %u\n",
2176 _GSS_operation_destroy (op,
2180 // else: the session has already been concluded
2181 LOG (GNUNET_ERROR_TYPE_DEBUG,
2182 "other peer disconnected (finished)\n");
2183 if (GNUNET_NO == op->state->client_done_sent)
2184 send_done_and_destroy (op);
2189 * Copy union-specific set state.
2191 * @param set source set for copying the union state
2192 * @return a copy of the union-specific set state
2194 static struct SetState *
2195 union_copy_state (struct Set *set)
2197 struct SetState *new_state;
2199 new_state = GNUNET_new (struct SetState);
2200 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
2201 new_state->se = strata_estimator_dup (set->state->se);
2208 * Get the table with implementing functions for
2211 * @return the operation specific VTable
2213 const struct SetVT *
2216 static const struct SetVT union_vt = {
2217 .create = &union_set_create,
2218 .msg_handler = &union_handle_p2p_message,
2220 .remove = &union_remove,
2221 .destroy_set = &union_set_destroy,
2222 .evaluate = &union_evaluate,
2223 .accept = &union_accept,
2224 .peer_disconnect = &union_peer_disconnect,
2225 .cancel = &union_op_cancel,
2226 .copy_state = &union_copy_state,