2 This file is part of GNUnet
3 Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
/**
 * Log a message with the "set-union" component name.
 */
#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)


/**
 * Number of IBFs in a strata estimator.
 */
#define SE_STRATA_COUNT 32

/**
 * Size of the IBFs in the strata estimator.
 */
#define SE_IBF_SIZE 80

/**
 * The hash num parameter for the difference digests and strata estimators.
 */
#define SE_IBF_HASH_NUM 4

/**
 * Number of buckets that can be transmitted in one message.
 */
#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)

/**
 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
 * Choose this value so that computing the IBF is still cheaper
 * than transmitting all values.
 */
#define MAX_IBF_ORDER (16)
63 * Number of buckets used in the ibf per estimated
/**
 * Current phase we are in for a union operation.
 */
enum UnionOperationPhase
{
  /**
   * We sent the request message, and expect a strata estimator.
   */
  PHASE_EXPECT_SE,

  /**
   * We sent the strata estimator, and expect an IBF.
   *
   * The first chunk of an IBF is accepted in this phase and in
   * #PHASE_INVENTORY_PASSIVE; after the first chunk we are in
   * #PHASE_EXPECT_IBF_CONT, and after receiving the complete IBF
   * we enter #PHASE_INVENTORY_ACTIVE.
   */
  PHASE_EXPECT_IBF,

  /**
   * Continuation for multi part IBFs.
   */
  PHASE_EXPECT_IBF_CONT,

  /**
   * We are decoding an IBF.
   */
  PHASE_INVENTORY_ACTIVE,

  /**
   * The other peer is decoding the IBF we just sent.
   */
  PHASE_INVENTORY_PASSIVE,

  /**
   * The protocol is almost finished, but we still have to flush our message
   * queue and/or expect some elements.
   */
  PHASE_FINISH_CLOSING,

  /**
   * In the penultimate phase,
   * we wait until all our demands
   * are satisfied.  Then we send a done
   * message, and wait for another done message.
   */
  PHASE_FINISH_WAITING,

  /**
   * In the ultimate phase, we wait until
   * our demands are satisfied and then
   * quit (sending another DONE message).
   */
  PHASE_DONE
};
/**
 * State of an evaluate operation with another peer.
 */
struct OperationState
{
  /**
   * Copy of the set's strata estimator at the time of
   * creation of this operation.
   */
  struct StrataEstimator *se;

  /**
   * The IBF we currently receive.
   */
  struct InvertibleBloomFilter *remote_ibf;

  /**
   * The IBF with the local set's element.
   */
  struct InvertibleBloomFilter *local_ibf;

  /**
   * Maps IBF-Keys (specific to the current salt) to elements.
   * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
   * Colliding IBF-Keys are linked.
   */
  struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;

  /**
   * Current state of the operation.
   */
  enum UnionOperationPhase phase;

  /**
   * Did we send the client that we are done?
   */
  int client_done_sent;

  /**
   * Number of ibf buckets already received into the @a remote_ibf.
   */
  unsigned int ibf_buckets_received;

  /**
   * Hashes for elements that we have demanded from the other peer.
   */
  struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
};
/**
 * The key entry is used to associate an ibf key with an element.
 */
struct KeyEntry
{
  /**
   * IBF key for the entry, derived from the current salt.
   */
  struct IBF_Key ibf_key;

  /**
   * The actual element associated with the key.
   *
   * Only owned (and freed) by the union operation if element->remote
   * is #GNUNET_YES.
   */
  struct ElementEntry *element;
};
/**
 * Used as a closure for sending elements
 * with a specific IBF key.
 */
struct SendElementClosure
{
  /**
   * The IBF key whose matching elements should be
   * sent.
   */
  struct IBF_Key ibf_key;

  /**
   * Operation for which the elements
   * should be sent.
   */
  struct Operation *op;
};
/**
 * Extra state required for efficient set union.
 */
struct SetState
{
  /**
   * The strata estimator is only generated once for
   * each set.
   * The IBF keys are derived from the element hashes with
   * salt=0.
   */
  struct StrataEstimator *se;
};
231 * Iterator over hash map entries, called to
232 * destroy the linked list of colliding ibf key entries.
235 * @param key current key code
236 * @param value value in the hash map
237 * @return #GNUNET_YES if we should continue to iterate,
241 destroy_key_to_element_iter (void *cls,
245 struct KeyEntry *k = value;
247 GNUNET_assert (NULL != k);
248 if (GNUNET_YES == k->element->remote)
250 GNUNET_free (k->element);
259 * Destroy the union operation. Only things specific to the union
260 * operation are destroyed.
262 * @param op union operation to destroy
265 union_op_cancel (struct Operation *op)
267 LOG (GNUNET_ERROR_TYPE_DEBUG,
268 "destroying union op\n");
269 /* check if the op was canceled twice */
270 GNUNET_assert (NULL != op->state);
271 if (NULL != op->state->remote_ibf)
273 ibf_destroy (op->state->remote_ibf);
274 op->state->remote_ibf = NULL;
276 if (NULL != op->state->demanded_hashes)
278 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
279 op->state->demanded_hashes = NULL;
281 if (NULL != op->state->local_ibf)
283 ibf_destroy (op->state->local_ibf);
284 op->state->local_ibf = NULL;
286 if (NULL != op->state->se)
288 strata_estimator_destroy (op->state->se);
289 op->state->se = NULL;
291 if (NULL != op->state->key_to_element)
293 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
294 &destroy_key_to_element_iter,
296 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
297 op->state->key_to_element = NULL;
299 GNUNET_free (op->state);
301 LOG (GNUNET_ERROR_TYPE_DEBUG,
302 "destroying union op done\n");
307 * Inform the client that the union operation has failed,
308 * and proceed to destroy the evaluate operation.
310 * @param op the union operation to fail
313 fail_union_operation (struct Operation *op)
315 struct GNUNET_MQ_Envelope *ev;
316 struct GNUNET_SET_ResultMessage *msg;
318 LOG (GNUNET_ERROR_TYPE_ERROR,
319 "union operation failed\n");
320 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
321 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
322 msg->request_id = htonl (op->spec->client_request_id);
323 msg->element_type = htons (0);
324 GNUNET_MQ_send (op->spec->set->client_mq, ev);
325 _GSS_operation_destroy (op, GNUNET_YES);
330 * Derive the IBF key from a hash code and
333 * @param src the hash code
334 * @param salt salt to use
335 * @return the derived IBF key
337 static struct IBF_Key
338 get_ibf_key (const struct GNUNET_HashCode *src,
343 GNUNET_CRYPTO_kdf (&key, sizeof (key),
345 &salt, sizeof (salt),
352 * Iterator over the mapping from IBF keys to element entries. Checks if we
353 * have an element with a given GNUNET_HashCode.
356 * @param key current key code
357 * @param value value in the hash map
358 * @return #GNUNET_YES if we should search further,
359 * #GNUNET_NO if we've found the element.
362 op_has_element_iterator (void *cls,
366 struct GNUNET_HashCode *element_hash = cls;
367 struct KeyEntry *k = value;
369 GNUNET_assert (NULL != k);
370 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
378 * Determine whether the given element is already in the operation's element
381 * @param op operation that should be tested for 'element_hash'
382 * @param element_hash hash of the element to look for
383 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
386 op_has_element (struct Operation *op,
387 const struct GNUNET_HashCode *element_hash)
390 struct IBF_Key ibf_key;
392 ibf_key = get_ibf_key (element_hash, op->spec->salt);
393 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
394 (uint32_t) ibf_key.key_val,
395 op_has_element_iterator,
396 (void *) element_hash);
398 /* was the iteration aborted because we found the element? */
399 if (GNUNET_SYSERR == ret)
406 * Insert an element into the union operation's
407 * key-to-element mapping. Takes ownership of 'ee'.
408 * Note that this does not insert the element in the set,
409 * only in the operation's key-element mapping.
410 * This is done to speed up re-tried operations, if some elements
411 * were transmitted, and then the IBF fails to decode.
413 * XXX: clarify ownership, doesn't sound right.
415 * @param op the union operation
416 * @param ee the element entry
419 op_register_element (struct Operation *op,
420 struct ElementEntry *ee)
422 struct IBF_Key ibf_key;
425 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
426 k = GNUNET_new (struct KeyEntry);
428 k->ibf_key = ibf_key;
429 GNUNET_assert (GNUNET_OK ==
430 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
431 (uint32_t) ibf_key.key_val,
433 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
438 * Insert a key into an ibf.
442 * @param value the key entry to get the key from
445 prepare_ibf_iterator (void *cls,
449 struct Operation *op = cls;
450 struct KeyEntry *ke = value;
452 LOG (GNUNET_ERROR_TYPE_DEBUG,
453 "[OP %x] inserting %lx (hash %s) into ibf\n",
455 (unsigned long) ke->ibf_key.key_val,
456 GNUNET_h2s (&ke->element->element_hash));
457 ibf_insert (op->state->local_ibf, ke->ibf_key);
463 * Iterator for initializing the
464 * key-to-element mapping of a union operation
466 * @param cls the union operation `struct Operation *`
468 * @param value the `struct ElementEntry *` to insert
469 * into the key-to-element mapping
470 * @return #GNUNET_YES (to continue iterating)
473 init_key_to_element_iterator (void *cls,
474 const struct GNUNET_HashCode *key,
477 struct Operation *op = cls;
478 struct ElementEntry *ee = value;
480 /* make sure that the element belongs to the set at the time
481 * of creating the operation */
482 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
485 GNUNET_assert (GNUNET_NO == ee->remote);
487 op_register_element (op, ee);
493 * Create an ibf with the operation's elements
494 * of the specified size
496 * @param op the union operation
497 * @param size size of the ibf to create
500 prepare_ibf (struct Operation *op,
503 if (NULL == op->state->key_to_element)
507 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
508 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
509 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
510 init_key_to_element_iterator, op);
512 if (NULL != op->state->local_ibf)
513 ibf_destroy (op->state->local_ibf);
514 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
515 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
516 &prepare_ibf_iterator,
522 * Send an ibf of appropriate size.
524 * Fragments the IBF into multiple messages if necessary.
526 * @param op the union operation
527 * @param ibf_order order of the ibf to send, size=2^order
530 send_ibf (struct Operation *op,
533 unsigned int buckets_sent = 0;
534 struct InvertibleBloomFilter *ibf;
536 prepare_ibf (op, 1<<ibf_order);
538 LOG (GNUNET_ERROR_TYPE_DEBUG,
539 "sending ibf of size %u\n",
542 ibf = op->state->local_ibf;
544 while (buckets_sent < (1 << ibf_order))
546 unsigned int buckets_in_message;
547 struct GNUNET_MQ_Envelope *ev;
548 struct IBFMessage *msg;
550 buckets_in_message = (1 << ibf_order) - buckets_sent;
551 /* limit to maximum */
552 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
553 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
555 ev = GNUNET_MQ_msg_extra (msg,
556 buckets_in_message * IBF_BUCKET_SIZE,
557 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
559 msg->order = ibf_order;
560 msg->offset = htons (buckets_sent);
561 ibf_write_slice (ibf, buckets_sent,
562 buckets_in_message, &msg[1]);
563 buckets_sent += buckets_in_message;
564 LOG (GNUNET_ERROR_TYPE_DEBUG,
565 "ibf chunk size %u, %u/%u sent\n",
569 GNUNET_MQ_send (op->mq, ev);
572 /* The other peer must decode the IBF, so
574 op->state->phase = PHASE_INVENTORY_PASSIVE;
579 * Send a strata estimator to the remote peer.
581 * @param op the union operation with the remote peer
584 send_strata_estimator (struct Operation *op)
586 struct GNUNET_MQ_Envelope *ev;
587 struct GNUNET_MessageHeader *strata_msg;
589 ev = GNUNET_MQ_msg_header_extra (strata_msg,
590 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
591 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
592 strata_estimator_write (op->state->se, &strata_msg[1]);
593 GNUNET_MQ_send (op->mq,
595 op->state->phase = PHASE_EXPECT_IBF;
596 LOG (GNUNET_ERROR_TYPE_DEBUG,
597 "sent SE, expecting IBF\n");
602 * Compute the necessary order of an ibf
603 * from the size of the symmetric set difference.
605 * @param diff the difference
606 * @return the required size of the ibf
609 get_order_from_difference (unsigned int diff)
611 unsigned int ibf_order;
614 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
615 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
617 if (ibf_order > MAX_IBF_ORDER)
618 ibf_order = MAX_IBF_ORDER;
624 * Handle a strata estimator from a remote peer
626 * @param cls the union operation
627 * @param mh the message
628 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
629 * #GNUNET_OK otherwise
632 handle_p2p_strata_estimator (void *cls,
633 const struct GNUNET_MessageHeader *mh)
635 struct Operation *op = cls;
636 struct StrataEstimator *remote_se;
639 if (op->state->phase != PHASE_EXPECT_SE)
641 fail_union_operation (op);
643 return GNUNET_SYSERR;
645 if (ntohs (mh->size) !=
646 SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE +
647 sizeof (struct GNUNET_MessageHeader))
649 fail_union_operation (op);
651 return GNUNET_SYSERR;
653 remote_se = strata_estimator_create (SE_STRATA_COUNT,
656 strata_estimator_read (&mh[1], remote_se);
657 GNUNET_assert (NULL != op->state->se);
658 diff = strata_estimator_difference (remote_se,
660 strata_estimator_destroy (remote_se);
661 strata_estimator_destroy (op->state->se);
662 op->state->se = NULL;
663 LOG (GNUNET_ERROR_TYPE_DEBUG,
664 "got se diff=%d, using ibf size %d\n",
666 1<<get_order_from_difference (diff));
668 get_order_from_difference (diff));
674 * Iterator to send elements to a remote peer
676 * @param cls closure with the element key and the union operation
678 * @param value the key entry
681 send_offers_iterator (void *cls,
685 struct SendElementClosure *sec = cls;
686 struct Operation *op = sec->op;
687 struct KeyEntry *ke = value;
688 struct GNUNET_MQ_Envelope *ev;
689 struct GNUNET_MessageHeader *mh;
691 /* Detect 32-bit key collision for the 64-bit IBF keys. */
692 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
695 ev = GNUNET_MQ_msg_header_extra (mh,
696 sizeof (struct GNUNET_HashCode),
697 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
699 GNUNET_assert (NULL != ev);
700 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
701 LOG (GNUNET_ERROR_TYPE_DEBUG,
702 "[OP %x] sending element offer (%s) to peer\n",
704 GNUNET_h2s (&ke->element->element_hash));
705 GNUNET_MQ_send (op->mq, ev);
711 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
713 * @param op union operation
714 * @param ibf_key IBF key of interest
717 send_offers_for_key (struct Operation *op,
718 struct IBF_Key ibf_key)
720 struct SendElementClosure send_cls;
722 send_cls.ibf_key = ibf_key;
724 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
725 (uint32_t) ibf_key.key_val,
726 &send_offers_iterator,
/**
 * Decode which elements are missing on each side, and
 * send the appropriate offers and inquiries.
 *
 * Subtracts the remote IBF from a freshly built local IBF of the same
 * size and decodes the difference key by key.  The remote IBF is
 * released here.
 *
 * NOTE(review): several short lines of this function (local
 * declarations, the loop header, the dispatch on `side`) are not
 * visible in this view and were restored; confirm against the full
 * file.
 *
 * @param op union operation
 */
static void
decode_and_send (struct Operation *op)
{
  struct IBF_Key key;
  struct IBF_Key last_key;
  int side;
  unsigned int num_decoded;
  struct InvertibleBloomFilter *diff_ibf;

  GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);

  prepare_ibf (op, op->state->remote_ibf->size);
  diff_ibf = ibf_dup (op->state->local_ibf);
  ibf_subtract (diff_ibf, op->state->remote_ibf);

  ibf_destroy (op->state->remote_ibf);
  op->state->remote_ibf = NULL;

  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "decoding IBF (size=%u)\n",
       diff_ibf->size);

  num_decoded = 0;
  last_key.key_val = 0;

  while (1)
  {
    int res;
    int cycle_detected = GNUNET_NO;

    last_key = key;

    res = ibf_decode (diff_ibf, &side, &key);
    if (res == GNUNET_OK)
    {
      LOG (GNUNET_ERROR_TYPE_DEBUG,
           "decoded ibf key %lx\n",
           (unsigned long) key.key_val);
      num_decoded += 1;
      /* more keys than buckets, or the same key twice in a row,
       * is treated as a decoding cycle */
      if ( (num_decoded > diff_ibf->size) ||
           (num_decoded > 1 && last_key.key_val == key.key_val) )
      {
        LOG (GNUNET_ERROR_TYPE_DEBUG,
             "detected cyclic ibf (decoded %u/%u)\n",
             num_decoded,
             diff_ibf->size);
        cycle_detected = GNUNET_YES;
      }
    }
    if ( (GNUNET_SYSERR == res) ||
         (GNUNET_YES == cycle_detected) )
    {
      int next_order;
      next_order = 0;
      /* retry with an IBF of twice the current size */
      while (1<<next_order < diff_ibf->size)
        next_order++;
      next_order++;
      if (next_order <= MAX_IBF_ORDER)
      {
        LOG (GNUNET_ERROR_TYPE_DEBUG,
             "decoding failed, sending larger ibf (size %u)\n",
             1<<next_order);
        send_ibf (op, next_order);
      }
      else
      {
        // XXX: Send the whole set, element-by-element
        LOG (GNUNET_ERROR_TYPE_ERROR,
             "set union failed: reached ibf limit\n");
      }
      break;
    }
    if (GNUNET_NO == res)
    {
      struct GNUNET_MQ_Envelope *ev;

      LOG (GNUNET_ERROR_TYPE_DEBUG,
           "transmitted all values, sending DONE\n");
      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
      GNUNET_MQ_send (op->mq, ev);
      /* We now wait until we get a DONE message back
       * and then wait for our MQ to be flushed and all our
       * demands be delivered. */
      break;
    }
    if (1 == side)
    {
      /* key decoded from our side of the difference: offer it */
      send_offers_for_key (op, key);
    }
    else if (-1 == side)
    {
      struct GNUNET_MQ_Envelope *ev;
      struct GNUNET_MessageHeader *msg;

      /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
       * the effort additional complexity. */
      ev = GNUNET_MQ_msg_header_extra (msg,
                                       sizeof (struct IBF_Key),
                                       GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);

      memcpy (&msg[1],
              &key,
              sizeof (struct IBF_Key));
      LOG (GNUNET_ERROR_TYPE_DEBUG,
           "sending element inquiry for IBF key %lx\n",
           (unsigned long) key.key_val);
      GNUNET_MQ_send (op->mq, ev);
    }
    else
    {
      GNUNET_assert (0);
    }
  }
  ibf_destroy (diff_ibf);
}
855 * Handle an IBF message from a remote peer.
857 * Reassemble the IBF from multiple pieces, and
858 * process the whole IBF once possible.
860 * @param cls the union operation
861 * @param mh the header of the message
862 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
863 * #GNUNET_OK otherwise
866 handle_p2p_ibf (void *cls,
867 const struct GNUNET_MessageHeader *mh)
869 struct Operation *op = cls;
870 const struct IBFMessage *msg;
871 unsigned int buckets_in_message;
873 if (ntohs (mh->size) < sizeof (struct IBFMessage))
876 fail_union_operation (op);
877 return GNUNET_SYSERR;
879 msg = (const struct IBFMessage *) mh;
880 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
881 (op->state->phase == PHASE_EXPECT_IBF) )
883 op->state->phase = PHASE_EXPECT_IBF_CONT;
884 GNUNET_assert (NULL == op->state->remote_ibf);
885 LOG (GNUNET_ERROR_TYPE_DEBUG,
886 "Creating new ibf of size %u\n",
888 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
889 op->state->ibf_buckets_received = 0;
890 if (0 != ntohs (msg->offset))
893 fail_union_operation (op);
894 return GNUNET_SYSERR;
897 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
899 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
900 (1<<msg->order != op->state->remote_ibf->size) )
903 fail_union_operation (op);
904 return GNUNET_SYSERR;
912 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
914 if (0 == buckets_in_message)
917 fail_union_operation (op);
918 return GNUNET_SYSERR;
921 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
924 fail_union_operation (op);
925 return GNUNET_SYSERR;
928 GNUNET_assert (NULL != op->state->remote_ibf);
930 ibf_read_slice (&msg[1],
931 op->state->ibf_buckets_received,
933 op->state->remote_ibf);
934 op->state->ibf_buckets_received += buckets_in_message;
936 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
938 LOG (GNUNET_ERROR_TYPE_DEBUG,
939 "received full ibf\n");
940 op->state->phase = PHASE_INVENTORY_ACTIVE;
941 decode_and_send (op);
948 * Send a result message to the client indicating
949 * that there is a new element.
951 * @param op union operation
952 * @param element element to send
953 * @param status status to send with the new element
956 send_client_element (struct Operation *op,
957 struct GNUNET_SET_Element *element,
960 struct GNUNET_MQ_Envelope *ev;
961 struct GNUNET_SET_ResultMessage *rm;
963 LOG (GNUNET_ERROR_TYPE_DEBUG,
964 "sending element (size %u) to client\n",
966 GNUNET_assert (0 != op->spec->client_request_id);
967 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
970 GNUNET_MQ_discard (ev);
974 rm->result_status = htons (status);
975 rm->request_id = htonl (op->spec->client_request_id);
976 rm->element_type = element->element_type;
977 memcpy (&rm[1], element->data, element->size);
978 GNUNET_MQ_send (op->spec->set->client_mq, ev);
983 * Signal to the client that the operation has finished and
984 * destroy the operation.
986 * @param cls operation to destroy
989 send_done_and_destroy (void *cls)
991 struct Operation *op = cls;
992 struct GNUNET_MQ_Envelope *ev;
993 struct GNUNET_SET_ResultMessage *rm;
995 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996 rm->request_id = htonl (op->spec->client_request_id);
997 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
998 rm->element_type = htons (0);
999 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1000 /* Will also call the union-specific cancel function. */
1001 _GSS_operation_destroy (op, GNUNET_YES);
1006 maybe_finish (struct Operation *op)
1008 unsigned int num_demanded;
1010 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1012 if (PHASE_FINISH_WAITING == op->state->phase)
1014 LOG (GNUNET_ERROR_TYPE_DEBUG,
1015 "In PHASE_FINISH_WAITING, pending %u demands\n",
1017 if (0 == num_demanded)
1019 struct GNUNET_MQ_Envelope *ev;
1021 op->state->phase = PHASE_DONE;
1022 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1023 GNUNET_MQ_send (op->mq, ev);
1025 /* We now wait until the other peer closes the channel
1026 * after it got all elements from us. */
1029 if (PHASE_FINISH_CLOSING == op->state->phase)
1031 LOG (GNUNET_ERROR_TYPE_DEBUG,
1032 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1034 if (0 == num_demanded)
1036 op->state->phase = PHASE_DONE;
1037 send_done_and_destroy (op);
1044 * Handle an element message from a remote peer.
1046 * @param cls the union operation
1047 * @param mh the message
1050 handle_p2p_elements (void *cls,
1051 const struct GNUNET_MessageHeader *mh)
1053 struct Operation *op = cls;
1054 struct ElementEntry *ee;
1055 const struct GNUNET_SET_ElementMessage *emsg;
1056 uint16_t element_size;
1058 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1060 GNUNET_break_op (0);
1061 fail_union_operation (op);
1065 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1067 GNUNET_break_op (0);
1068 fail_union_operation (op);
1072 emsg = (struct GNUNET_SET_ElementMessage *) mh;
1074 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1075 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1076 memcpy (&ee[1], &emsg[1], element_size);
1077 ee->element.size = element_size;
1078 ee->element.data = &ee[1];
1079 ee->element.element_type = ntohs (emsg->element_type);
1080 ee->remote = GNUNET_YES;
1081 GNUNET_CRYPTO_hash (ee->element.data,
1085 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, NULL))
1087 /* We got something we didn't demand, since it's not in our map. */
1088 GNUNET_break_op (0);
1090 fail_union_operation (op);
1094 LOG (GNUNET_ERROR_TYPE_DEBUG,
1095 "Got element (size %u, hash %s) from peer\n",
1096 (unsigned int) element_size,
1097 GNUNET_h2s (&ee->element_hash));
1099 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1101 /* Got repeated element. Should not happen since
1102 * we track demands. */
1108 LOG (GNUNET_ERROR_TYPE_DEBUG,
1109 "Registering new element from remote peer\n");
1110 op_register_element (op, ee);
1111 /* only send results immediately if the client wants it */
1112 switch (op->spec->result_mode)
1114 case GNUNET_SET_RESULT_ADDED:
1115 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1117 case GNUNET_SET_RESULT_SYMMETRIC:
1118 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1121 /* Result mode not supported, should have been caught earlier. */
1132 * Send offers (for GNUNET_Hash-es) in response
1133 * to inquiries (for IBF_Key-s).
1135 * @param cls the union operation
1136 * @param mh the message
1139 handle_p2p_inquiry (void *cls,
1140 const struct GNUNET_MessageHeader *mh)
1142 struct Operation *op = cls;
1143 const struct IBF_Key *ibf_key;
1144 unsigned int num_keys;
1146 /* look up elements and send them */
1147 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1149 GNUNET_break_op (0);
1150 fail_union_operation (op);
1153 num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1154 / sizeof (struct IBF_Key);
1155 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1156 != num_keys * sizeof (struct IBF_Key))
1158 GNUNET_break_op (0);
1159 fail_union_operation (op);
1163 ibf_key = (const struct IBF_Key *) &mh[1];
1164 while (0 != num_keys--)
1166 send_offers_for_key (op, *ibf_key);
1174 handle_p2p_demand (void *cls,
1175 const struct GNUNET_MessageHeader *mh)
1177 struct Operation *op = cls;
1178 struct ElementEntry *ee;
1179 struct GNUNET_SET_ElementMessage *emsg;
1180 const struct GNUNET_HashCode *hash;
1181 unsigned int num_hashes;
1182 struct GNUNET_MQ_Envelope *ev;
1184 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1185 / sizeof (struct GNUNET_HashCode);
1186 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1187 != num_hashes * sizeof (struct GNUNET_HashCode))
1189 GNUNET_break_op (0);
1190 fail_union_operation (op);
1194 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1196 hash++, num_hashes--)
1198 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1201 /* Demand for non-existing element. */
1202 GNUNET_break_op (0);
1203 fail_union_operation (op);
1206 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1208 /* Probably confused lazily copied sets. */
1209 GNUNET_break_op (0);
1210 fail_union_operation (op);
1213 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1214 memcpy (&emsg[1], ee->element.data, ee->element.size);
1215 emsg->reserved = htons (0);
1216 emsg->element_type = htons (ee->element.element_type);
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1220 (unsigned int) ee->element.size,
1221 GNUNET_h2s (&ee->element_hash));
1222 GNUNET_MQ_send (op->mq, ev);
1224 switch (op->spec->result_mode)
1226 case GNUNET_SET_RESULT_ADDED:
1227 /* Nothing to do. */
1229 case GNUNET_SET_RESULT_SYMMETRIC:
1230 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1233 /* Result mode not supported, should have been caught earlier. */
1242 * Handle offers (of GNUNET_HashCode-s) and
1243 * respond with demands (of GNUNET_HashCode-s).
1245 * @param cls the union operation
1246 * @param mh the message
1249 handle_p2p_offer (void *cls,
1250 const struct GNUNET_MessageHeader *mh)
1252 struct Operation *op = cls;
1253 const struct GNUNET_HashCode *hash;
1254 unsigned int num_hashes;
1256 /* look up elements and send them */
1257 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1258 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1260 GNUNET_break_op (0);
1261 fail_union_operation (op);
1264 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1265 / sizeof (struct GNUNET_HashCode);
1266 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1267 != num_hashes * sizeof (struct GNUNET_HashCode))
1269 GNUNET_break_op (0);
1270 fail_union_operation (op);
1274 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1276 hash++, num_hashes--)
1278 struct ElementEntry *ee;
1279 struct GNUNET_MessageHeader *demands;
1280 struct GNUNET_MQ_Envelope *ev;
1281 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1283 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1286 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, hash))
1288 LOG (GNUNET_ERROR_TYPE_DEBUG,
1289 "Skipped sending duplicate demand\n");
1293 GNUNET_assert (GNUNET_OK ==
1294 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1297 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1299 LOG (GNUNET_ERROR_TYPE_DEBUG,
1300 "[OP %x] Requesting element (hash %s)\n",
1301 (void *) op, GNUNET_h2s (hash));
1302 ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1303 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1304 GNUNET_MQ_send (op->mq, ev);
1310 * Handle a done message from a remote peer
1312 * @param cls the union operation
1313 * @param mh the message
1316 handle_p2p_done (void *cls,
1317 const struct GNUNET_MessageHeader *mh)
1319 struct Operation *op = cls;
1321 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1323 /* We got all requests, but still have to send our elements in response. */
1325 op->state->phase = PHASE_FINISH_WAITING;
1327 LOG (GNUNET_ERROR_TYPE_DEBUG,
1328 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1329 /* The active peer is done sending offers
1330 * and inquiries. This means that all
1331 * our responses to that (demands and offers)
1332 * must be in flight (queued or in mesh).
1334 * We should notify the active peer once
1335 * all our demands are satisfied, so that the active
1336 * peer can quit if we gave him everything.
1341 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1343 LOG (GNUNET_ERROR_TYPE_DEBUG,
1344 "got DONE (as active partner), waiting to finish\n");
1345 /* All demands of the other peer are satisfied,
1346 * and we processed all offers, thus we know
1347 * exactly what our demands must be.
1349 * We'll close the channel
1350 * to the other peer once our demands are met.
1352 op->state->phase = PHASE_FINISH_CLOSING;
1356 GNUNET_break_op (0);
1357 fail_union_operation (op);
1362 * Initiate operation to evaluate a set union with a remote peer.
1364 * @param op operation to perform (to be initialized)
1365 * @param opaque_context message to be transmitted to the listener
1366 * to convince him to accept, may be NULL
/* Sets up the per-operation union state on the initiating side and
 * queues the operation request on the operation's message queue.
 */
1369 union_evaluate (struct Operation *op,
1370 const struct GNUNET_MessageHeader *opaque_context)
1372 struct GNUNET_MQ_Envelope *ev;
1373 struct OperationRequestMessage *msg;
/* The operation must not carry union-specific state yet. */
1375 GNUNET_assert (NULL == op->state);
1376 op->state = GNUNET_new (struct OperationState);
/* Starts out as an empty hash map.
 * NOTE(review): 32 is presumably only the initial size hint and
 * GNUNET_NO the key-copy flag -- confirm against the multihashmap API.
 */
1377 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1378 /* copy the current generation's strata estimator for this operation */
1379 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1380 /* we started the operation, thus we have to send the operation request */
1381 op->state->phase = PHASE_EXPECT_SE;
1382 LOG (GNUNET_ERROR_TYPE_DEBUG,
1383 "Initiating union operation evaluation\n");
/* NOTE(review): the remaining argument(s) of this call are not visible
 * in this listing; presumably opaque_context is the nested message --
 * confirm.
 */
1384 ev = GNUNET_MQ_msg_nested_mh (msg,
1385 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1389 /* the context message is too large */
/* The client owning the set is disconnected here.
 * NOTE(review): the condition guarding this call is not visible in
 * this listing; presumably it is the failure to build the envelope.
 */
1391 GNUNET_SERVER_client_disconnect (op->spec->set->client);
/* The operation type is converted to network byte order, while
 * app_id is copied over unchanged.
 */
1394 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1395 msg->app_id = op->spec->app_id;
1396 GNUNET_MQ_send (op->mq,
/* Diagnostics only: record whether a context message was supplied. */
1399 if (NULL != opaque_context)
1400 LOG (GNUNET_ERROR_TYPE_DEBUG,
1401 "sent op request with context message\n");
1403 LOG (GNUNET_ERROR_TYPE_DEBUG,
1404 "sent op request without context message\n");
1409 * Accept a union operation request from a remote peer.
1410 * Only initializes the private operation state.
1412 * @param op operation that will be accepted as a union operation
/* Sets up the per-operation union state on the accepting side and
 * then sends out the strata estimator.
 */
1415 union_accept (struct Operation *op)
1417 LOG (GNUNET_ERROR_TYPE_DEBUG,
1418 "accepting set union operation\n");
/* The operation must not carry union-specific state yet. */
1419 GNUNET_assert (NULL == op->state);
1420 op->state = GNUNET_new (struct OperationState);
/* The operation works on its own copy of the set's strata estimator. */
1421 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1422 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
/* NOTE(review): no phase is assigned in the lines visible here;
 * presumably send_strata_estimator takes care of that -- confirm.
 */
1423 /* kick off the operation */
1424 send_strata_estimator (op);
1429 * Create a new set supporting the union operation
1431 * We maintain one strata estimator per set and then manipulate it over the
1432 * lifetime of the set, as recreating a strata estimator would be expensive.
1434 * @return the newly created set
/* Allocates the union-specific set state together with its strata
 * estimator.
 */
1436 static struct SetState *
1437 union_set_create (void)
1439 struct SetState *set_state;
1441 LOG (GNUNET_ERROR_TYPE_DEBUG,
1442 "union set created\n");
1443 set_state = GNUNET_new (struct SetState);
/* The estimator is dimensioned by the file-level constants: number of
 * IBFs (strata), size of each IBF, and the hash num parameter.
 * NOTE(review): the statement returning set_state is not visible in
 * this listing.
 */
1444 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1445 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1451 * Add the element from the given element message to the set.
1453 * @param set_state state of the set we want to add to
1454 * @param ee the element to add to the set
1457 union_add (struct SetState *set_state, struct ElementEntry *ee)
/* Only the set's strata estimator is updated here: the IBF key derived
 * from the element hash is inserted into it.
 * NOTE(review): the second argument 0 is presumably the salt for the
 * key derivation -- confirm against get_ibf_key.
 */
1459 strata_estimator_insert (set_state->se,
1460 get_ibf_key (&ee->element_hash, 0));
1465 * Remove the element given in the element message from the set.
1466 * Only marks the element as removed, so that older set operations can still exchange it.
1468 * @param set_state state of the set to remove from
1469 * @param ee set element to remove
1472 union_remove (struct SetState *set_state, struct ElementEntry *ee)
/* Removes from the strata estimator the key obtained with the same
 * get_ibf_key call (element hash, second argument 0) that union_add
 * uses for insertion.
 */
1474 strata_estimator_remove (set_state->se,
1475 get_ibf_key (&ee->element_hash, 0));
1480 * Destroy a set that supports the union operation.
1482 * @param set_state the set to destroy
1485 union_set_destroy (struct SetState *set_state)
/* A state without a strata estimator is tolerated; the estimator is
 * only destroyed when one is present.
 */
1487 if (NULL != set_state->se)
1489 strata_estimator_destroy (set_state->se);
/* Clear the pointer so no stale reference remains before the free. */
1490 set_state->se = NULL;
/* Finally release the state structure itself. */
1492 GNUNET_free (set_state);
1497 * Dispatch messages for a union operation.
1499 * @param op the state of the union evaluate operation
1500 * @param mh the received message
1501 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1502 * #GNUNET_OK otherwise
/* Routes an incoming P2P message to the handler registered for its
 * message type.
 */
1505 union_handle_p2p_message (struct Operation *op,
1506 const struct GNUNET_MessageHeader *mh)
1508 //LOG (GNUNET_ERROR_TYPE_DEBUG,
1509 // "received p2p message (t: %u, s: %u)\n",
1510 // ntohs (mh->type),
1511 // ntohs (mh->size));
/* The type field is converted from network byte order before it is
 * compared against the message type constants.
 */
1512 switch (ntohs (mh->type))
/* Only the IBF and strata estimator handlers have their result
 * passed straight back to the caller.
 */
1514 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1515 return handle_p2p_ibf (op, mh);
1516 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1517 return handle_p2p_strata_estimator (op, mh);
/* The remaining handlers are invoked as plain statements; no result
 * of theirs is used in the lines visible here.
 */
1518 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1519 handle_p2p_elements (op, mh);
1521 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1522 handle_p2p_inquiry (op, mh);
1524 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1525 handle_p2p_done (op, mh);
1527 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1528 handle_p2p_offer (op, mh);
1530 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1531 handle_p2p_demand (op, mh);
/* NOTE(review): the statement executed for an unexpected message type
 * and the final return value are not visible in this listing.
 */
1534 /* Something wrong with cadet's message handlers? */
1542 * Handler for peer-disconnects, notifies the client
1543 * about the aborted operation in case the op was not concluded.
1545 * @param op the destroyed operation
/* Reacts to the remote peer going away, distinguishing an unfinished
 * operation from one that already reached PHASE_DONE.
 */
1548 union_peer_disconnect (struct Operation *op)
/* Any phase other than PHASE_DONE means the operation had not been
 * concluded when the peer disconnected.
 */
1550 if (PHASE_DONE != op->state->phase)
1552 struct GNUNET_MQ_Envelope *ev;
1553 struct GNUNET_SET_ResultMessage *msg;
/* Report the failure to the client; all fields are converted to
 * network byte order and the element type is set to zero.
 */
1555 ev = GNUNET_MQ_msg (msg,
1556 GNUNET_MESSAGE_TYPE_SET_RESULT);
1557 msg->request_id = htonl (op->spec->client_request_id);
1558 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1559 msg->element_type = htons (0);
1560 GNUNET_MQ_send (op->spec->set->client_mq,
1562 LOG (GNUNET_ERROR_TYPE_WARNING,
1563 "other peer disconnected prematurely, phase %u\n",
/* NOTE(review): the second argument of this call, and whether the
 * function returns right after it, are not visible in this listing.
 */
1565 _GSS_operation_destroy (op,
1569 // else: the session has already been concluded
1570 LOG (GNUNET_ERROR_TYPE_DEBUG,
1571 "other peer disconnected (finished)\n");
/* send_done_and_destroy runs only while client_done_sent is still
 * GNUNET_NO.
 */
1572 if (GNUNET_NO == op->state->client_done_sent)
1573 send_done_and_destroy (op);
1578 * Copy union-specific set state.
1580 * @param set source set for copying the union state
1581 * @return a copy of the union-specific set state
/* Builds a new union set state whose strata estimator is a duplicate
 * of the source set's estimator.
 */
1583 static struct SetState *
1584 union_copy_state (struct Set *set)
1586 struct SetState *new_state;
1588 new_state = GNUNET_new (struct SetState);
/* The source set must already have union state with an estimator. */
1589 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
/* NOTE(review): the statement returning new_state is not visible in
 * this listing.
 */
1590 new_state->se = strata_estimator_dup (set->state->se);
1597 * Get the table with implementing functions for the set union operation.
1600 * @return the operation specific VTable
1602 const struct SetVT *
1605 static const struct SetVT union_vt = {
1606 .create = &union_set_create,
1607 .msg_handler = &union_handle_p2p_message,
1609 .remove = &union_remove,
1610 .destroy_set = &union_set_destroy,
1611 .evaluate = &union_evaluate,
1612 .accept = &union_accept,
1613 .peer_disconnect = &union_peer_disconnect,
1614 .cancel = &union_op_cancel,
1615 .copy_state = &union_copy_state,