2 This file is part of GNUnet
3 Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
26 #include "gnunet_util_lib.h"
27 #include "gnunet_statistics_service.h"
28 #include "gnunet-service-set.h"
30 #include "gnunet-service-set_union_strata_estimator.h"
31 #include "gnunet-service-set_protocol.h"
35 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
39 * Number of IBFs in a strata estimator.
41 #define SE_STRATA_COUNT 32
44 * Size of the IBFs in the strata estimator.
46 #define SE_IBF_SIZE 80
49 * The hash num parameter for the difference digests and strata estimators.
51 #define SE_IBF_HASH_NUM 4
54 * Number of buckets that can be transmitted in one message.
56 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values.
63 #define MAX_IBF_ORDER (16)
66 * Number of buckets used in the ibf per estimated
73 * Current phase we are in for a union operation.
75 enum UnionOperationPhase
78 * We sent the request message, and expect a strata estimator.
83 * We sent the strata estimator, and expect an IBF. This phase is entered once
84 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86 * XXX: could use better wording.
88 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
93 * Continuation for multi part IBFs.
95 PHASE_EXPECT_IBF_CONT,
98 * We are decoding an IBF.
100 PHASE_INVENTORY_ACTIVE,
103 * The other peer is decoding the IBF we just sent.
105 PHASE_INVENTORY_PASSIVE,
108 * The protocol is almost finished, but we still have to flush our message
109 * queue and/or expect some elements.
111 PHASE_FINISH_CLOSING,
114 * In the penultimate phase,
115 * we wait until all our demands
116 * are satisfied. Then we send a done
117 * message, and wait for another done message.*/
118 PHASE_FINISH_WAITING,
121 * In the ultimate phase, we wait until
122 * our demands are satisfied and then
123 * quit (sending another DONE message). */
129 * State of an evaluate operation with another peer.
131 struct OperationState
134 * Copy of the set's strata estimator at the time of
135 * creation of this operation.
137 struct StrataEstimator *se;
140 * The IBF we currently receive.
142 struct InvertibleBloomFilter *remote_ibf;
145 * The IBF with the local set's element.
147 struct InvertibleBloomFilter *local_ibf;
150 * Maps IBF-Keys (specific to the current salt) to elements.
151 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
152 * Colliding IBF-Keys are linked.
154 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
157 * Current state of the operation.
159 enum UnionOperationPhase phase;
162 * Did we send the client that we are done?
164 int client_done_sent;
167 * Number of ibf buckets already received into the @a remote_ibf.
169 unsigned int ibf_buckets_received;
172 * Hashes for elements that we have demanded from the other peer.
174 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
179 * The key entry is used to associate an ibf key with an element.
184 * IBF key for the entry, derived from the current salt.
186 struct IBF_Key ibf_key;
189 * The actual element associated with the key.
191 * Only owned by the union operation if element->operation
194 struct ElementEntry *element;
199 * Used as a closure for sending elements
200 * with a specific IBF key.
202 struct SendElementClosure
205 * The IBF key whose matching elements should be
208 struct IBF_Key ibf_key;
211 * Operation for which the elements
214 struct Operation *op;
219 * Extra state required for efficient set union.
224 * The strata estimator is only generated once for
226 * The IBF keys are derived from the element hashes with
229 struct StrataEstimator *se;
234 * Iterator over hash map entries, called to
235 * destroy the linked list of colliding ibf key entries.
238 * @param key current key code
239 * @param value value in the hash map
240 * @return #GNUNET_YES if we should continue to iterate,
244 destroy_key_to_element_iter (void *cls,
248 struct KeyEntry *k = value;
250 GNUNET_assert (NULL != k);
251 if (GNUNET_YES == k->element->remote)
253 GNUNET_free (k->element);
262 * Destroy the union operation. Only things specific to the union
263 * operation are destroyed.
265 * @param op union operation to destroy
268 union_op_cancel (struct Operation *op)
270 LOG (GNUNET_ERROR_TYPE_DEBUG,
271 "destroying union op\n");
272 /* check if the op was canceled twice */
273 GNUNET_assert (NULL != op->state);
274 if (NULL != op->state->remote_ibf)
276 ibf_destroy (op->state->remote_ibf);
277 op->state->remote_ibf = NULL;
279 if (NULL != op->state->demanded_hashes)
281 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
282 op->state->demanded_hashes = NULL;
284 if (NULL != op->state->local_ibf)
286 ibf_destroy (op->state->local_ibf);
287 op->state->local_ibf = NULL;
289 if (NULL != op->state->se)
291 strata_estimator_destroy (op->state->se);
292 op->state->se = NULL;
294 if (NULL != op->state->key_to_element)
296 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
297 &destroy_key_to_element_iter,
299 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
300 op->state->key_to_element = NULL;
302 GNUNET_free (op->state);
304 LOG (GNUNET_ERROR_TYPE_DEBUG,
305 "destroying union op done\n");
310 * Inform the client that the union operation has failed,
311 * and proceed to destroy the evaluate operation.
313 * @param op the union operation to fail
316 fail_union_operation (struct Operation *op)
318 struct GNUNET_MQ_Envelope *ev;
319 struct GNUNET_SET_ResultMessage *msg;
321 LOG (GNUNET_ERROR_TYPE_ERROR,
322 "union operation failed\n");
323 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
324 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
325 msg->request_id = htonl (op->spec->client_request_id);
326 msg->element_type = htons (0);
327 GNUNET_MQ_send (op->spec->set->client_mq, ev);
328 _GSS_operation_destroy (op, GNUNET_YES);
333 * Derive the IBF key from a hash code and
336 * @param src the hash code
337 * @param salt salt to use
338 * @return the derived IBF key
340 static struct IBF_Key
341 get_ibf_key (const struct GNUNET_HashCode *src,
346 /* FIXME: Ensure that the salt is handled correctly.
347 This is a quick fix so that consensus works for now. */
350 GNUNET_CRYPTO_kdf (&key, sizeof (key),
352 &salt, sizeof (salt),
359 * Iterator over the mapping from IBF keys to element entries. Checks if we
360 * have an element with a given GNUNET_HashCode.
363 * @param key current key code
364 * @param value value in the hash map
365 * @return #GNUNET_YES if we should search further,
366 * #GNUNET_NO if we've found the element.
369 op_has_element_iterator (void *cls,
373 struct GNUNET_HashCode *element_hash = cls;
374 struct KeyEntry *k = value;
376 GNUNET_assert (NULL != k);
377 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
385 * Determine whether the given element is already in the operation's element
388 * @param op operation that should be tested for 'element_hash'
389 * @param element_hash hash of the element to look for
390 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
393 op_has_element (struct Operation *op,
394 const struct GNUNET_HashCode *element_hash)
397 struct IBF_Key ibf_key;
399 ibf_key = get_ibf_key (element_hash, op->spec->salt);
400 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
401 (uint32_t) ibf_key.key_val,
402 op_has_element_iterator,
403 (void *) element_hash);
405 /* was the iteration aborted because we found the element? */
406 if (GNUNET_SYSERR == ret)
413 * Insert an element into the union operation's
414 * key-to-element mapping. Takes ownership of 'ee'.
415 * Note that this does not insert the element in the set,
416 * only in the operation's key-element mapping.
417 * This is done to speed up re-tried operations, if some elements
418 * were transmitted, and then the IBF fails to decode.
420 * XXX: clarify ownership, doesn't sound right.
422 * @param op the union operation
423 * @param ee the element entry
426 op_register_element (struct Operation *op,
427 struct ElementEntry *ee)
429 struct IBF_Key ibf_key;
432 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
433 k = GNUNET_new (struct KeyEntry);
435 k->ibf_key = ibf_key;
436 GNUNET_assert (GNUNET_OK ==
437 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
438 (uint32_t) ibf_key.key_val,
440 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
445 * Insert a key into an ibf.
449 * @param value the key entry to get the key from
452 prepare_ibf_iterator (void *cls,
456 struct Operation *op = cls;
457 struct KeyEntry *ke = value;
459 LOG (GNUNET_ERROR_TYPE_DEBUG,
460 "[OP %x] inserting %lx (hash %s) into ibf\n",
462 (unsigned long) ke->ibf_key.key_val,
463 GNUNET_h2s (&ke->element->element_hash));
464 ibf_insert (op->state->local_ibf, ke->ibf_key);
470 * Iterator for initializing the
471 * key-to-element mapping of a union operation
473 * @param cls the union operation `struct Operation *`
475 * @param value the `struct ElementEntry *` to insert
476 * into the key-to-element mapping
477 * @return #GNUNET_YES (to continue iterating)
480 init_key_to_element_iterator (void *cls,
481 const struct GNUNET_HashCode *key,
484 struct Operation *op = cls;
485 struct ElementEntry *ee = value;
487 /* make sure that the element belongs to the set at the time
488 * of creating the operation */
489 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
492 GNUNET_assert (GNUNET_NO == ee->remote);
494 op_register_element (op, ee);
500 * Create an ibf with the operation's elements
501 * of the specified size
503 * @param op the union operation
504 * @param size size of the ibf to create
505 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
508 prepare_ibf (struct Operation *op,
511 if (NULL == op->state->key_to_element)
515 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
516 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
517 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
518 init_key_to_element_iterator, op);
520 if (NULL != op->state->local_ibf)
521 ibf_destroy (op->state->local_ibf);
522 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
523 if (NULL == op->state->local_ibf)
525 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526 "Failed to allocate local IBF\n");
527 return GNUNET_SYSERR;
529 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
530 &prepare_ibf_iterator,
537 * Send an ibf of appropriate size.
539 * Fragments the IBF into multiple messages if necessary.
541 * @param op the union operation
542 * @param ibf_order order of the ibf to send, size=2^order
543 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
546 send_ibf (struct Operation *op,
549 unsigned int buckets_sent = 0;
550 struct InvertibleBloomFilter *ibf;
553 prepare_ibf (op, 1<<ibf_order))
555 /* allocation failed */
556 return GNUNET_SYSERR;
559 LOG (GNUNET_ERROR_TYPE_DEBUG,
560 "sending ibf of size %u\n",
564 char name[64] = { 0 };
565 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
566 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
569 ibf = op->state->local_ibf;
571 while (buckets_sent < (1 << ibf_order))
573 unsigned int buckets_in_message;
574 struct GNUNET_MQ_Envelope *ev;
575 struct IBFMessage *msg;
577 buckets_in_message = (1 << ibf_order) - buckets_sent;
578 /* limit to maximum */
579 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
580 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
582 ev = GNUNET_MQ_msg_extra (msg,
583 buckets_in_message * IBF_BUCKET_SIZE,
584 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
586 msg->order = ibf_order;
587 msg->offset = htons (buckets_sent);
588 ibf_write_slice (ibf, buckets_sent,
589 buckets_in_message, &msg[1]);
590 buckets_sent += buckets_in_message;
591 LOG (GNUNET_ERROR_TYPE_DEBUG,
592 "ibf chunk size %u, %u/%u sent\n",
596 GNUNET_MQ_send (op->mq, ev);
599 /* The other peer must decode the IBF, so
601 op->state->phase = PHASE_INVENTORY_PASSIVE;
607 * Send a strata estimator to the remote peer.
609 * @param op the union operation with the remote peer
612 send_strata_estimator (struct Operation *op)
614 const struct StrataEstimator *se = op->state->se;
615 struct GNUNET_MQ_Envelope *ev;
616 struct GNUNET_MessageHeader *strata_msg;
621 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
622 len = strata_estimator_write (op->state->se,
624 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
625 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
627 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
628 ev = GNUNET_MQ_msg_header_extra (strata_msg,
631 memcpy (&strata_msg[1],
635 GNUNET_MQ_send (op->mq,
637 op->state->phase = PHASE_EXPECT_IBF;
638 LOG (GNUNET_ERROR_TYPE_DEBUG,
639 "sent SE, expecting IBF\n");
644 * Compute the necessary order of an ibf
645 * from the size of the symmetric set difference.
647 * @param diff the difference
648 * @return the required size of the ibf
651 get_order_from_difference (unsigned int diff)
653 unsigned int ibf_order;
656 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
657 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
659 if (ibf_order > MAX_IBF_ORDER)
660 ibf_order = MAX_IBF_ORDER;
666 * Handle a strata estimator from a remote peer
668 * @param cls the union operation
669 * @param mh the message
670 * @param is_compressed #GNUNET_YES if the estimator is compressed
671 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
672 * #GNUNET_OK otherwise
675 handle_p2p_strata_estimator (void *cls,
676 const struct GNUNET_MessageHeader *mh,
679 struct Operation *op = cls;
680 struct StrataEstimator *remote_se;
684 if (op->state->phase != PHASE_EXPECT_SE)
686 fail_union_operation (op);
688 return GNUNET_SYSERR;
690 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
691 if ( (GNUNET_NO == is_compressed) &&
692 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
694 fail_union_operation (op);
696 return GNUNET_SYSERR;
698 remote_se = strata_estimator_create (SE_STRATA_COUNT,
701 if (NULL == remote_se)
703 /* insufficient resources, fail */
704 fail_union_operation (op);
705 return GNUNET_SYSERR;
708 strata_estimator_read (&mh[1],
713 /* decompression failed */
714 fail_union_operation (op);
715 return GNUNET_SYSERR;
717 GNUNET_assert (NULL != op->state->se);
718 diff = strata_estimator_difference (remote_se,
720 strata_estimator_destroy (remote_se);
721 strata_estimator_destroy (op->state->se);
722 op->state->se = NULL;
723 LOG (GNUNET_ERROR_TYPE_DEBUG,
724 "got se diff=%d, using ibf size %d\n",
726 1<<get_order_from_difference (diff));
729 get_order_from_difference (diff)))
731 /* Internal error, best we can do is shut the connection */
732 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
733 "Failed to send IBF, closing connection\n");
734 fail_union_operation (op);
735 return GNUNET_SYSERR;
742 * Iterator to send elements to a remote peer
744 * @param cls closure with the element key and the union operation
746 * @param value the key entry
749 send_offers_iterator (void *cls,
753 struct SendElementClosure *sec = cls;
754 struct Operation *op = sec->op;
755 struct KeyEntry *ke = value;
756 struct GNUNET_MQ_Envelope *ev;
757 struct GNUNET_MessageHeader *mh;
759 /* Detect 32-bit key collision for the 64-bit IBF keys. */
760 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
763 ev = GNUNET_MQ_msg_header_extra (mh,
764 sizeof (struct GNUNET_HashCode),
765 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
767 GNUNET_assert (NULL != ev);
768 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
769 LOG (GNUNET_ERROR_TYPE_DEBUG,
770 "[OP %x] sending element offer (%s) to peer\n",
772 GNUNET_h2s (&ke->element->element_hash));
773 GNUNET_MQ_send (op->mq, ev);
779 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
781 * @param op union operation
782 * @param ibf_key IBF key of interest
785 send_offers_for_key (struct Operation *op,
786 struct IBF_Key ibf_key)
788 struct SendElementClosure send_cls;
790 send_cls.ibf_key = ibf_key;
792 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
793 (uint32_t) ibf_key.key_val,
794 &send_offers_iterator,
800 * Decode which elements are missing on each side, and
801 * send the appropriate offers and inquiries.
803 * @param op union operation
804 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
807 decode_and_send (struct Operation *op)
810 struct IBF_Key last_key;
812 unsigned int num_decoded;
813 struct InvertibleBloomFilter *diff_ibf;
815 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
818 prepare_ibf (op, op->state->remote_ibf->size))
820 /* allocation failed */
821 return GNUNET_SYSERR;
823 diff_ibf = ibf_dup (op->state->local_ibf);
824 ibf_subtract (diff_ibf, op->state->remote_ibf);
826 ibf_destroy (op->state->remote_ibf);
827 op->state->remote_ibf = NULL;
829 LOG (GNUNET_ERROR_TYPE_DEBUG,
830 "decoding IBF (size=%u)\n",
834 last_key.key_val = 0;
839 int cycle_detected = GNUNET_NO;
843 res = ibf_decode (diff_ibf, &side, &key);
844 if (res == GNUNET_OK)
846 LOG (GNUNET_ERROR_TYPE_DEBUG,
847 "decoded ibf key %lx\n",
848 (unsigned long) key.key_val);
850 if ( (num_decoded > diff_ibf->size) ||
851 (num_decoded > 1 && last_key.key_val == key.key_val) )
853 LOG (GNUNET_ERROR_TYPE_DEBUG,
854 "detected cyclic ibf (decoded %u/%u)\n",
857 cycle_detected = GNUNET_YES;
860 if ( (GNUNET_SYSERR == res) ||
861 (GNUNET_YES == cycle_detected) )
865 while (1<<next_order < diff_ibf->size)
868 if (next_order <= MAX_IBF_ORDER)
870 LOG (GNUNET_ERROR_TYPE_DEBUG,
871 "decoding failed, sending larger ibf (size %u)\n",
873 GNUNET_STATISTICS_update (_GSS_statistics,
878 send_ibf (op, next_order))
880 /* Internal error, best we can do is shut the connection */
881 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
882 "Failed to send IBF, closing connection\n");
883 fail_union_operation (op);
884 ibf_destroy (diff_ibf);
885 return GNUNET_SYSERR;
890 GNUNET_STATISTICS_update (_GSS_statistics,
891 "# of failed union operations (too large)",
894 // XXX: Send the whole set, element-by-element
895 LOG (GNUNET_ERROR_TYPE_ERROR,
896 "set union failed: reached ibf limit\n");
897 fail_union_operation (op);
898 ibf_destroy (diff_ibf);
899 return GNUNET_SYSERR;
903 if (GNUNET_NO == res)
905 struct GNUNET_MQ_Envelope *ev;
907 LOG (GNUNET_ERROR_TYPE_DEBUG,
908 "transmitted all values, sending DONE\n");
909 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
910 GNUNET_MQ_send (op->mq, ev);
911 /* We now wait until we get a DONE message back
912 * and then wait for our MQ to be flushed and all our
913 * demands be delivered. */
918 send_offers_for_key (op, key);
922 struct GNUNET_MQ_Envelope *ev;
923 struct GNUNET_MessageHeader *msg;
925 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
926 * the effort additional complexity. */
927 ev = GNUNET_MQ_msg_header_extra (msg,
928 sizeof (struct IBF_Key),
929 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
933 sizeof (struct IBF_Key));
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "sending element inquiry for IBF key %lx\n",
936 (unsigned long) key.key_val);
937 GNUNET_MQ_send (op->mq, ev);
944 ibf_destroy (diff_ibf);
950 * Handle an IBF message from a remote peer.
952 * Reassemble the IBF from multiple pieces, and
953 * process the whole IBF once possible.
955 * @param cls the union operation
956 * @param mh the header of the message
957 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
958 * #GNUNET_OK otherwise
961 handle_p2p_ibf (void *cls,
962 const struct GNUNET_MessageHeader *mh)
964 struct Operation *op = cls;
965 const struct IBFMessage *msg;
966 unsigned int buckets_in_message;
968 if (ntohs (mh->size) < sizeof (struct IBFMessage))
971 fail_union_operation (op);
972 return GNUNET_SYSERR;
974 msg = (const struct IBFMessage *) mh;
975 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
976 (op->state->phase == PHASE_EXPECT_IBF) )
978 op->state->phase = PHASE_EXPECT_IBF_CONT;
979 GNUNET_assert (NULL == op->state->remote_ibf);
980 LOG (GNUNET_ERROR_TYPE_DEBUG,
981 "Creating new ibf of size %u\n",
983 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
984 if (NULL == op->state->remote_ibf)
986 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
987 "Failed to parse remote IBF, closing connection\n");
988 fail_union_operation (op);
989 return GNUNET_SYSERR;
991 op->state->ibf_buckets_received = 0;
992 if (0 != ntohs (msg->offset))
995 fail_union_operation (op);
996 return GNUNET_SYSERR;
999 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1001 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
1002 (1<<msg->order != op->state->remote_ibf->size) )
1004 GNUNET_break_op (0);
1005 fail_union_operation (op);
1006 return GNUNET_SYSERR;
1014 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1016 if (0 == buckets_in_message)
1018 GNUNET_break_op (0);
1019 fail_union_operation (op);
1020 return GNUNET_SYSERR;
1023 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1025 GNUNET_break_op (0);
1026 fail_union_operation (op);
1027 return GNUNET_SYSERR;
1030 GNUNET_assert (NULL != op->state->remote_ibf);
1032 ibf_read_slice (&msg[1],
1033 op->state->ibf_buckets_received,
1035 op->state->remote_ibf);
1036 op->state->ibf_buckets_received += buckets_in_message;
1038 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1040 LOG (GNUNET_ERROR_TYPE_DEBUG,
1041 "received full ibf\n");
1042 op->state->phase = PHASE_INVENTORY_ACTIVE;
1044 decode_and_send (op))
1046 /* Internal error, best we can do is shut down */
1047 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1048 "Failed to decode IBF, closing connection\n");
1049 return GNUNET_SYSERR;
1057 * Send a result message to the client indicating
1058 * that there is a new element.
1060 * @param op union operation
1061 * @param element element to send
1062 * @param status status to send with the new element
1065 send_client_element (struct Operation *op,
1066 struct GNUNET_SET_Element *element,
1069 struct GNUNET_MQ_Envelope *ev;
1070 struct GNUNET_SET_ResultMessage *rm;
1072 LOG (GNUNET_ERROR_TYPE_DEBUG,
1073 "sending element (size %u) to client\n",
1075 GNUNET_assert (0 != op->spec->client_request_id);
1076 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1079 GNUNET_MQ_discard (ev);
1083 rm->result_status = htons (status);
1084 rm->request_id = htonl (op->spec->client_request_id);
1085 rm->element_type = element->element_type;
1086 memcpy (&rm[1], element->data, element->size);
1087 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1092 * Signal to the client that the operation has finished and
1093 * destroy the operation.
1095 * @param cls operation to destroy
1098 send_done_and_destroy (void *cls)
1100 struct Operation *op = cls;
1101 struct GNUNET_MQ_Envelope *ev;
1102 struct GNUNET_SET_ResultMessage *rm;
1104 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1105 rm->request_id = htonl (op->spec->client_request_id);
1106 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1107 rm->element_type = htons (0);
1108 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1109 /* Will also call the union-specific cancel function. */
1110 _GSS_operation_destroy (op, GNUNET_YES);
1115 maybe_finish (struct Operation *op)
1117 unsigned int num_demanded;
1119 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1121 if (PHASE_FINISH_WAITING == op->state->phase)
1123 LOG (GNUNET_ERROR_TYPE_DEBUG,
1124 "In PHASE_FINISH_WAITING, pending %u demands\n",
1126 if (0 == num_demanded)
1128 struct GNUNET_MQ_Envelope *ev;
1130 op->state->phase = PHASE_DONE;
1131 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1132 GNUNET_MQ_send (op->mq, ev);
1134 /* We now wait until the other peer closes the channel
1135 * after it got all elements from us. */
1138 if (PHASE_FINISH_CLOSING == op->state->phase)
1140 LOG (GNUNET_ERROR_TYPE_DEBUG,
1141 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1143 if (0 == num_demanded)
1145 op->state->phase = PHASE_DONE;
1146 send_done_and_destroy (op);
1153 * Handle an element message from a remote peer.
1155 * @param cls the union operation
1156 * @param mh the message
1159 handle_p2p_elements (void *cls,
1160 const struct GNUNET_MessageHeader *mh)
1162 struct Operation *op = cls;
1163 struct ElementEntry *ee;
1164 const struct GNUNET_SET_ElementMessage *emsg;
1165 uint16_t element_size;
1167 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1169 GNUNET_break_op (0);
1170 fail_union_operation (op);
1173 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1175 GNUNET_break_op (0);
1176 fail_union_operation (op);
1180 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1182 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1183 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1184 memcpy (&ee[1], &emsg[1], element_size);
1185 ee->element.size = element_size;
1186 ee->element.data = &ee[1];
1187 ee->element.element_type = ntohs (emsg->element_type);
1188 ee->remote = GNUNET_YES;
1189 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1192 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1196 /* We got something we didn't demand, since it's not in our map. */
1197 GNUNET_break_op (0);
1199 fail_union_operation (op);
1203 LOG (GNUNET_ERROR_TYPE_DEBUG,
1204 "Got element (size %u, hash %s) from peer\n",
1205 (unsigned int) element_size,
1206 GNUNET_h2s (&ee->element_hash));
1208 GNUNET_STATISTICS_update (_GSS_statistics,
1209 "# received elements",
1213 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1215 /* Got repeated element. Should not happen since
1216 * we track demands. */
1217 GNUNET_STATISTICS_update (_GSS_statistics,
1218 "# repeated elements",
1225 LOG (GNUNET_ERROR_TYPE_DEBUG,
1226 "Registering new element from remote peer\n");
1227 op_register_element (op, ee);
1228 /* only send results immediately if the client wants it */
1229 switch (op->spec->result_mode)
1231 case GNUNET_SET_RESULT_ADDED:
1232 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1234 case GNUNET_SET_RESULT_SYMMETRIC:
1235 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1238 /* Result mode not supported, should have been caught earlier. */
1249 * Send offers (for GNUNET_Hash-es) in response
1250 * to inquiries (for IBF_Key-s).
1252 * @param cls the union operation
1253 * @param mh the message
1256 handle_p2p_inquiry (void *cls,
1257 const struct GNUNET_MessageHeader *mh)
1259 struct Operation *op = cls;
1260 const struct IBF_Key *ibf_key;
1261 unsigned int num_keys;
1263 /* look up elements and send them */
1264 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1266 GNUNET_break_op (0);
1267 fail_union_operation (op);
1270 num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1271 / sizeof (struct IBF_Key);
1272 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1273 != num_keys * sizeof (struct IBF_Key))
1275 GNUNET_break_op (0);
1276 fail_union_operation (op);
1280 ibf_key = (const struct IBF_Key *) &mh[1];
1281 while (0 != num_keys--)
1283 send_offers_for_key (op, *ibf_key);
1293 handle_p2p_demand (void *cls,
1294 const struct GNUNET_MessageHeader *mh)
1296 struct Operation *op = cls;
1297 struct ElementEntry *ee;
1298 struct GNUNET_SET_ElementMessage *emsg;
1299 const struct GNUNET_HashCode *hash;
1300 unsigned int num_hashes;
1301 struct GNUNET_MQ_Envelope *ev;
1303 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1304 / sizeof (struct GNUNET_HashCode);
1305 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1306 != num_hashes * sizeof (struct GNUNET_HashCode))
1308 GNUNET_break_op (0);
1309 fail_union_operation (op);
1313 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1315 hash++, num_hashes--)
1317 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1320 /* Demand for non-existing element. */
1321 GNUNET_break_op (0);
1322 fail_union_operation (op);
1325 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1327 /* Probably confused lazily copied sets. */
1328 GNUNET_break_op (0);
1329 fail_union_operation (op);
1332 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1333 memcpy (&emsg[1], ee->element.data, ee->element.size);
1334 emsg->reserved = htons (0);
1335 emsg->element_type = htons (ee->element.element_type);
1336 LOG (GNUNET_ERROR_TYPE_DEBUG,
1337 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1339 (unsigned int) ee->element.size,
1340 GNUNET_h2s (&ee->element_hash));
1341 GNUNET_MQ_send (op->mq, ev);
1343 switch (op->spec->result_mode)
1345 case GNUNET_SET_RESULT_ADDED:
1346 /* Nothing to do. */
1348 case GNUNET_SET_RESULT_SYMMETRIC:
1349 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1352 /* Result mode not supported, should have been caught earlier. */
1361 * Handle offers (of GNUNET_HashCode-s) and
1362 * respond with demands (of GNUNET_HashCode-s).
1364 * @param cls the union operation
1365 * @param mh the message
1368 handle_p2p_offer (void *cls,
1369 const struct GNUNET_MessageHeader *mh)
1371 struct Operation *op = cls;
1372 const struct GNUNET_HashCode *hash;
1373 unsigned int num_hashes;
1375 /* look up elements and send them */
1376 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1377 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1379 GNUNET_break_op (0);
1380 fail_union_operation (op);
1383 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1384 / sizeof (struct GNUNET_HashCode);
1385 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1386 != num_hashes * sizeof (struct GNUNET_HashCode))
1388 GNUNET_break_op (0);
1389 fail_union_operation (op);
1393 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1395 hash++, num_hashes--)
1397 struct ElementEntry *ee;
1398 struct GNUNET_MessageHeader *demands;
1399 struct GNUNET_MQ_Envelope *ev;
1401 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1404 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1408 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1411 LOG (GNUNET_ERROR_TYPE_DEBUG,
1412 "Skipped sending duplicate demand\n");
1416 GNUNET_assert (GNUNET_OK ==
1417 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1420 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1422 LOG (GNUNET_ERROR_TYPE_DEBUG,
1423 "[OP %x] Requesting element (hash %s)\n",
1424 (void *) op, GNUNET_h2s (hash));
1425 ev = GNUNET_MQ_msg_header_extra (demands,
1426 sizeof (struct GNUNET_HashCode),
1427 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1428 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1429 GNUNET_MQ_send (op->mq, ev);
1435 * Handle a done message from a remote peer
1437 * @param cls the union operation
1438 * @param mh the message
1441 handle_p2p_done (void *cls,
1442 const struct GNUNET_MessageHeader *mh)
1444 struct Operation *op = cls;
1446 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1448 /* We got all requests, but still have to send our elements in response. */
1450 op->state->phase = PHASE_FINISH_WAITING;
1452 LOG (GNUNET_ERROR_TYPE_DEBUG,
1453 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1454 /* The active peer is done sending offers
1455 * and inquiries. This means that all
1456 * our responses to that (demands and offers)
1457 * must be in flight (queued or in mesh).
1459 * We should notify the active peer once
1460 * all our demands are satisfied, so that the active
1461 * peer can quit if we gave him everything.
1466 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1468 LOG (GNUNET_ERROR_TYPE_DEBUG,
1469 "got DONE (as active partner), waiting to finish\n");
1470 /* All demands of the other peer are satisfied,
1471 * and we processed all offers, thus we know
1472 * exactly what our demands must be.
1474 * We'll close the channel
1475 * to the other peer once our demands are met.
1477 op->state->phase = PHASE_FINISH_CLOSING;
1481 GNUNET_break_op (0);
1482 fail_union_operation (op);
1487 * Initiate operation to evaluate a set union with a remote peer.
1489 * @param op operation to perform (to be initialized)
1490 * @param opaque_context message to be transmitted to the listener
1491 * to convince him to accept, may be NULL
1494 union_evaluate (struct Operation *op,
1495 const struct GNUNET_MessageHeader *opaque_context)
1497 struct GNUNET_MQ_Envelope *ev;
1498 struct OperationRequestMessage *msg;
1500 GNUNET_assert (NULL == op->state);
1501 op->state = GNUNET_new (struct OperationState);
1502 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1503 /* copy the current generation's strata estimator for this operation */
1504 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1505 /* we started the operation, thus we have to send the operation request */
1506 op->state->phase = PHASE_EXPECT_SE;
1507 LOG (GNUNET_ERROR_TYPE_DEBUG,
1508 "Initiating union operation evaluation\n");
1509 GNUNET_STATISTICS_update (_GSS_statistics,
1510 "# of total union operations",
1513 GNUNET_STATISTICS_update (_GSS_statistics,
1514 "# of initiated union operations",
1517 ev = GNUNET_MQ_msg_nested_mh (msg,
1518 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1522 /* the context message is too large */
1524 GNUNET_SERVER_client_disconnect (op->spec->set->client);
1527 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1528 msg->app_id = op->spec->app_id;
1529 GNUNET_MQ_send (op->mq,
1532 if (NULL != opaque_context)
1533 LOG (GNUNET_ERROR_TYPE_DEBUG,
1534 "sent op request with context message\n");
1536 LOG (GNUNET_ERROR_TYPE_DEBUG,
1537 "sent op request without context message\n");
1542 * Accept an union operation request from a remote peer.
1543 * Only initializes the private operation state.
1545 * @param op operation that will be accepted as a union operation
1548 union_accept (struct Operation *op)
1550 LOG (GNUNET_ERROR_TYPE_DEBUG,
1551 "accepting set union operation\n");
1552 GNUNET_assert (NULL == op->state);
1554 GNUNET_STATISTICS_update (_GSS_statistics,
1555 "# of accepted union operations",
1558 GNUNET_STATISTICS_update (_GSS_statistics,
1559 "# of total union operations",
1563 op->state = GNUNET_new (struct OperationState);
1564 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1565 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1566 /* kick off the operation */
1567 send_strata_estimator (op);
1572 * Create a new set supporting the union operation
1574 * We maintain one strata estimator per set and then manipulate it over the
1575 * lifetime of the set, as recreating a strata estimator would be expensive.
1577 * @return the newly created set, NULL on error
1579 static struct SetState *
1580 union_set_create (void)
1582 struct SetState *set_state;
1584 LOG (GNUNET_ERROR_TYPE_DEBUG,
1585 "union set created\n");
1586 set_state = GNUNET_new (struct SetState);
1587 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1588 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1589 if (NULL == set_state->se)
1591 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1592 "Failed to allocate strata estimator\n");
1593 GNUNET_free (set_state);
1601 * Add the element from the given element message to the set.
1603 * @param set_state state of the set want to add to
1604 * @param ee the element to add to the set
1607 union_add (struct SetState *set_state, struct ElementEntry *ee)
1609 strata_estimator_insert (set_state->se,
1610 get_ibf_key (&ee->element_hash, 0));
1615 * Remove the element given in the element message from the set.
1616 * Only marks the element as removed, so that older set operations can still exchange it.
1618 * @param set_state state of the set to remove from
1619 * @param ee set element to remove
1622 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1624 strata_estimator_remove (set_state->se,
1625 get_ibf_key (&ee->element_hash, 0));
1630 * Destroy a set that supports the union operation.
1632 * @param set_state the set to destroy
1635 union_set_destroy (struct SetState *set_state)
1637 if (NULL != set_state->se)
1639 strata_estimator_destroy (set_state->se);
1640 set_state->se = NULL;
1642 GNUNET_free (set_state);
1647 * Dispatch messages for a union operation.
1649 * @param op the state of the union evaluate operation
1650 * @param mh the received message
1651 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1652 * #GNUNET_OK otherwise
1655 union_handle_p2p_message (struct Operation *op,
1656 const struct GNUNET_MessageHeader *mh)
1658 //LOG (GNUNET_ERROR_TYPE_DEBUG,
1659 // "received p2p message (t: %u, s: %u)\n",
1660 // ntohs (mh->type),
1661 // ntohs (mh->size));
1662 switch (ntohs (mh->type))
1664 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1665 return handle_p2p_ibf (op, mh);
1666 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1667 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1668 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1669 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1670 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1671 handle_p2p_elements (op, mh);
1673 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1674 handle_p2p_inquiry (op, mh);
1676 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1677 handle_p2p_done (op, mh);
1679 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1680 handle_p2p_offer (op, mh);
1682 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1683 handle_p2p_demand (op, mh);
1686 /* Something wrong with cadet's message handlers? */
1694 * Handler for peer-disconnects, notifies the client
1695 * about the aborted operation in case the op was not concluded.
1697 * @param op the destroyed operation
1700 union_peer_disconnect (struct Operation *op)
1702 if (PHASE_DONE != op->state->phase)
1704 struct GNUNET_MQ_Envelope *ev;
1705 struct GNUNET_SET_ResultMessage *msg;
1707 ev = GNUNET_MQ_msg (msg,
1708 GNUNET_MESSAGE_TYPE_SET_RESULT);
1709 msg->request_id = htonl (op->spec->client_request_id);
1710 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1711 msg->element_type = htons (0);
1712 GNUNET_MQ_send (op->spec->set->client_mq,
1714 LOG (GNUNET_ERROR_TYPE_WARNING,
1715 "other peer disconnected prematurely, phase %u\n",
1717 _GSS_operation_destroy (op,
1721 // else: the session has already been concluded
1722 LOG (GNUNET_ERROR_TYPE_DEBUG,
1723 "other peer disconnected (finished)\n");
1724 if (GNUNET_NO == op->state->client_done_sent)
1725 send_done_and_destroy (op);
1730 * Copy union-specific set state.
1732 * @param set source set for copying the union state
1733 * @return a copy of the union-specific set state
1735 static struct SetState *
1736 union_copy_state (struct Set *set)
1738 struct SetState *new_state;
1740 new_state = GNUNET_new (struct SetState);
1741 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1742 new_state->se = strata_estimator_dup (set->state->se);
1749 * Get the table with implementing functions for
1752 * @return the operation specific VTable
1754 const struct SetVT *
1757 static const struct SetVT union_vt = {
1758 .create = &union_set_create,
1759 .msg_handler = &union_handle_p2p_message,
1761 .remove = &union_remove,
1762 .destroy_set = &union_set_destroy,
1763 .evaluate = &union_evaluate,
1764 .accept = &union_accept,
1765 .peer_disconnect = &union_peer_disconnect,
1766 .cancel = &union_op_cancel,
1767 .copy_state = &union_copy_state,