2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file set/gnunet-service-set_intersection.c
22 * @brief two-peer set intersection
23 * @author Christian Fuchs
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
30 #include "gnunet_block_lib.h"
31 #include "gnunet-service-set_protocol.h"
32 #include "gnunet-service-set_intersection.h"
37 * Current phase we are in for an intersection operation.
39 enum IntersectionOperationPhase
42 * We are just starting.
47 * We have sent the number of our elements to the other
48 * peer, but did not set up our element set yet.
53 * We have initialized our set and are now reducing it by exchanging
54 * Bloom filters until one party notices that their element hashes are equal.
60 * We must next send the P2P DONE message (after finishing mostly
61 * with the local client). Then we will wait for the channel to close.
66 * We have received the P2P DONE message, and must finish with the
67 * local client before terminating the channel.
72 * The protocol is over. Results may still have to be sent to the
81 * State of an evaluate operation with another peer.
86 * The bf we currently receive
88 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
91 * BF of the set's element.
93 struct GNUNET_CONTAINER_BloomFilter *local_bf;
96 * Remaining elements in the intersection operation.
97 * Maps element-id-hashes to 'elements in our set'.
99 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
102 * Iterator for sending the final set of @e my_elements to the client.
104 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
107 * Evaluate operations are held in a linked list.
109 struct OperationState *next;
112 * Evaluate operations are held in a linked list.
114 struct OperationState *prev;
117 * For multipart BF transmissions, we have to store the
118 * bloomfilter-data until we fully received it.
123 * XOR of the keys of all of the elements (remaining) in my set.
124 * Always updated when elements are added to or removed from @e my_elements.
127 struct GNUNET_HashCode my_xor;
130 * XOR of the keys of all of the elements (remaining) in
131 * the other peer's set. Updated when we receive the
132 * other peer's Bloom filter.
134 struct GNUNET_HashCode other_xor;
137 * How many bytes of @e bf_data are valid?
139 uint32_t bf_data_offset;
142 * Current element count contained within @e my_elements.
143 * (May differ briefly during initialization.)
145 uint32_t my_element_count;
148 * size of the bloomfilter in @e bf_data.
150 uint32_t bf_data_size;
153 * Number of bits per element used by the bloomfilter in @e bf_data.
155 uint32_t bf_bits_per_element;
158 * Salt currently used for BF construction (by us or the other peer,
159 * depending on where we are in the code).
164 * Current state of the operation.
166 enum IntersectionOperationPhase phase;
169 * Generation in which the operation handle was created.
172 unsigned int generation_created;
175 * Did we send the client that we are done?
177 int client_done_sent;
180 * Set whenever we reach the state where the death of the
181 * channel is perfectly fine and should NOT result in the
182 * operation being cancelled.
184 int channel_death_expected;
189 * Extra state required for efficient set intersection.
190 * Merely tracks the total number of elements.
195 * Number of currently valid elements in the set which have not been removed.
198 uint32_t current_set_element_count;
203 * If applicable in the current operation mode, send a result message
204 * to the client indicating we removed an element.
206 * @param op intersection operation
207 * @param element element to send
210 send_client_removed_element (struct Operation *op,
211 struct GNUNET_SET_Element *element)
213 struct GNUNET_MQ_Envelope *ev;
214 struct GNUNET_SET_ResultMessage *rm;
216 if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
217 return; /* Wrong mode for transmitting removed elements */
218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
219 "Sending removed element (size %u) to client\n",
221 GNUNET_STATISTICS_update (_GSS_statistics,
222 "# Element removed messages sent",
225 GNUNET_assert (0 != op->client_request_id);
226 ev = GNUNET_MQ_msg_extra (rm,
228 GNUNET_MESSAGE_TYPE_SET_RESULT);
234 rm->result_status = htons (GNUNET_SET_STATUS_OK);
235 rm->request_id = htonl (op->client_request_id);
236 rm->element_type = element->element_type;
237 GNUNET_memcpy (&rm[1],
240 GNUNET_MQ_send (op->set->cs->mq,
246 * Fills the "my_elements" hashmap with all relevant elements.
248 * @param cls the `struct Operation *` we are performing
249 * @param key current key code
250 * @param value the `struct ElementEntry *` from the hash map
251 * @return #GNUNET_YES (we should continue to iterate)
254 filtered_map_initialization (void *cls,
255 const struct GNUNET_HashCode *key,
258 struct Operation *op = cls;
259 struct ElementEntry *ee = value;
260 struct GNUNET_HashCode mutated_hash;
263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
264 "FIMA called for %s:%u\n",
265 GNUNET_h2s (&ee->element_hash),
268 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
270 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
271 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
272 GNUNET_h2s (&ee->element_hash),
274 return GNUNET_YES; /* element not valid in our operation's generation */
277 /* Test if element is in other peer's bloomfilter */
278 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Testing mingled hash %s with salt %u\n",
283 GNUNET_h2s (&mutated_hash),
286 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
289 /* remove this element */
290 send_client_removed_element (op,
292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
293 "Reduced initialization, not starting with %s:%u\n",
294 GNUNET_h2s (&ee->element_hash),
298 op->state->my_element_count++;
299 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
303 "Filtered initialization of my_elements, adding %s:%u\n",
304 GNUNET_h2s (&ee->element_hash),
306 GNUNET_break (GNUNET_YES ==
307 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
310 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
317 * Removes elements from our hashmap if they are not contained within the
318 * provided remote bloomfilter.
320 * @param cls closure with the `struct Operation *`
321 * @param key current key code
322 * @param value value in the hash map
323 * @return #GNUNET_YES (we should continue to iterate)
326 iterator_bf_reduce (void *cls,
327 const struct GNUNET_HashCode *key,
330 struct Operation *op = cls;
331 struct ElementEntry *ee = value;
332 struct GNUNET_HashCode mutated_hash;
334 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338 "Testing mingled hash %s with salt %u\n",
339 GNUNET_h2s (&mutated_hash),
342 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
345 GNUNET_break (0 < op->state->my_element_count);
346 op->state->my_element_count--;
347 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
351 "Bloom filter reduction of my_elements, removing %s:%u\n",
352 GNUNET_h2s (&ee->element_hash),
354 GNUNET_assert (GNUNET_YES ==
355 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
358 send_client_removed_element (op,
363 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
364 "Bloom filter reduction of my_elements, keeping %s:%u\n",
365 GNUNET_h2s (&ee->element_hash),
373 * Create initial bloomfilter based on all the elements given.
375 * @param cls the `struct Operation *`
376 * @param key current key code
377 * @param value the `struct ElementEntry` to process
378 * @return #GNUNET_YES (we should continue to iterate)
381 iterator_bf_create (void *cls,
382 const struct GNUNET_HashCode *key,
385 struct Operation *op = cls;
386 struct ElementEntry *ee = value;
387 struct GNUNET_HashCode mutated_hash;
389 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "Initializing BF with hash %s with salt %u\n",
394 GNUNET_h2s (&mutated_hash),
396 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
403 * Inform the client that the intersection operation has failed,
404 * and proceed to destroy the evaluate operation.
406 * @param op the intersection operation to fail
409 fail_intersection_operation (struct Operation *op)
411 struct GNUNET_MQ_Envelope *ev;
412 struct GNUNET_SET_ResultMessage *msg;
414 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
415 "Intersection operation failed\n");
416 GNUNET_STATISTICS_update (_GSS_statistics,
417 "# Intersection operations failed",
420 if (NULL != op->state->my_elements)
422 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
423 op->state->my_elements = NULL;
425 ev = GNUNET_MQ_msg (msg,
426 GNUNET_MESSAGE_TYPE_SET_RESULT);
427 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
428 msg->request_id = htonl (op->client_request_id);
429 msg->element_type = htons (0);
430 GNUNET_MQ_send (op->set->cs->mq,
432 _GSS_operation_destroy (op,
438 * Send a bloomfilter to our peer. After the result done message has
439 * been sent to the client, destroy the evaluate operation.
441 * @param op intersection operation
444 send_bloomfilter (struct Operation *op)
446 struct GNUNET_MQ_Envelope *ev;
447 struct BFMessage *msg;
449 uint32_t bf_elementbits;
454 /* We consider the ratio of the set sizes to determine
455 the number of bits per element, as the smaller set
456 should use more bits to maximize its set reduction
457 potential and minimize overall bandwidth consumption. */
458 bf_elementbits = 2 + ceil (log2((double)
459 (op->remote_element_count /
460 (double) op->state->my_element_count)));
461 if (bf_elementbits < 1)
462 bf_elementbits = 1; /* make sure k is not 0 */
463 /* optimize BF-size to ~50% of bits set */
464 bf_size = ceil ((double) (op->state->my_element_count
465 * bf_elementbits / log(2)));
466 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
467 "Sending Bloom filter (%u) of size %u bytes\n",
468 (unsigned int) bf_elementbits,
469 (unsigned int) bf_size);
470 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
473 op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
475 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
479 /* send our Bloom filter */
480 GNUNET_STATISTICS_update (_GSS_statistics,
481 "# Intersection Bloom filters sent",
484 chunk_size = 60 * 1024 - sizeof (struct BFMessage);
485 if (bf_size <= chunk_size)
488 chunk_size = bf_size;
489 ev = GNUNET_MQ_msg_extra (msg,
491 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
492 GNUNET_assert (GNUNET_SYSERR !=
493 GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
496 msg->sender_element_count = htonl (op->state->my_element_count);
497 msg->bloomfilter_total_length = htonl (bf_size);
498 msg->bits_per_element = htonl (bf_elementbits);
499 msg->sender_mutator = htonl (op->state->salt);
500 msg->element_xor_hash = op->state->my_xor;
501 GNUNET_MQ_send (op->mq, ev);
506 bf_data = GNUNET_malloc (bf_size);
507 GNUNET_assert (GNUNET_SYSERR !=
508 GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
512 while (offset < bf_size)
514 if (bf_size - chunk_size < offset)
515 chunk_size = bf_size - offset;
516 ev = GNUNET_MQ_msg_extra (msg,
518 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
519 GNUNET_memcpy (&msg[1],
522 offset += chunk_size;
523 msg->sender_element_count = htonl (op->state->my_element_count);
524 msg->bloomfilter_total_length = htonl (bf_size);
525 msg->bits_per_element = htonl (bf_elementbits);
526 msg->sender_mutator = htonl (op->state->salt);
527 msg->element_xor_hash = op->state->my_xor;
528 GNUNET_MQ_send (op->mq, ev);
530 GNUNET_free (bf_data);
532 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
533 op->state->local_bf = NULL;
538 * Signal to the client that the operation has finished and
539 * destroy the operation.
541 * @param cls operation to destroy
544 send_client_done_and_destroy (void *cls)
546 struct Operation *op = cls;
547 struct GNUNET_MQ_Envelope *ev;
548 struct GNUNET_SET_ResultMessage *rm;
550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551 "Intersection succeeded, sending DONE to local client\n");
552 GNUNET_STATISTICS_update (_GSS_statistics,
553 "# Intersection operations succeeded",
556 ev = GNUNET_MQ_msg (rm,
557 GNUNET_MESSAGE_TYPE_SET_RESULT);
558 rm->request_id = htonl (op->client_request_id);
559 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
560 rm->element_type = htons (0);
561 GNUNET_MQ_send (op->set->cs->mq,
563 _GSS_operation_destroy (op,
569 * Remember that we are done dealing with the local client
570 * AND have sent the other peer our message that we are done,
571 * so we are not just waiting for the channel to die before
572 * telling the local client that we are done as our last act.
574 * @param cls the `struct Operation`.
577 finished_local_operations (void *cls)
579 struct Operation *op = cls;
581 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582 "DONE sent to other peer, now waiting for other end to close the channel\n");
583 op->state->phase = PHASE_FINISHED;
584 op->state->channel_death_expected = GNUNET_YES;
589 * Notify the other peer that we are done. Once this message
590 * is out, we still need to notify the local client that we
593 * @param op operation to notify for.
596 send_p2p_done (struct Operation *op)
598 struct GNUNET_MQ_Envelope *ev;
599 struct IntersectionDoneMessage *idm;
601 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
602 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
603 ev = GNUNET_MQ_msg (idm,
604 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
605 idm->final_element_count = htonl (op->state->my_element_count);
606 idm->element_xor_hash = op->state->my_xor;
607 GNUNET_MQ_notify_sent (ev,
608 &finished_local_operations,
610 GNUNET_MQ_send (op->mq,
616 * Send all elements in the full result iterator.
618 * @param cls the `struct Operation *`
621 send_remaining_elements (void *cls)
623 struct Operation *op = cls;
625 const struct ElementEntry *ee;
626 struct GNUNET_MQ_Envelope *ev;
627 struct GNUNET_SET_ResultMessage *rm;
628 const struct GNUNET_SET_Element *element;
631 res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
634 if (GNUNET_NO == res)
636 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637 "Sending done and destroy because iterator ran out\n");
638 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
639 op->state->full_result_iter = NULL;
640 if (PHASE_DONE_RECEIVED == op->state->phase)
642 op->state->phase = PHASE_FINISHED;
643 send_client_done_and_destroy (op);
645 else if (PHASE_MUST_SEND_DONE == op->state->phase)
656 element = &ee->element;
657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658 "Sending element %s:%u to client (full set)\n",
659 GNUNET_h2s (&ee->element_hash),
661 GNUNET_assert (0 != op->client_request_id);
662 ev = GNUNET_MQ_msg_extra (rm,
664 GNUNET_MESSAGE_TYPE_SET_RESULT);
665 GNUNET_assert (NULL != ev);
666 rm->result_status = htons (GNUNET_SET_STATUS_OK);
667 rm->request_id = htonl (op->client_request_id);
668 rm->element_type = element->element_type;
669 GNUNET_memcpy (&rm[1],
672 GNUNET_MQ_notify_sent (ev,
673 &send_remaining_elements,
675 GNUNET_MQ_send (op->set->cs->mq,
681 * Fills the "my_elements" hashmap with the initial set of
682 * (non-deleted) elements from the set of the specification.
684 * @param cls closure with the `struct Operation *`
685 * @param key current key code for the element
686 * @param value value in the hash map with the `struct ElementEntry *`
687 * @return #GNUNET_YES (we should continue to iterate)
690 initialize_map_unfiltered (void *cls,
691 const struct GNUNET_HashCode *key,
694 struct ElementEntry *ee = value;
695 struct Operation *op = cls;
697 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
698 return GNUNET_YES; /* element not live in operation's generation */
699 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
702 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
703 "Initial full initialization of my_elements, adding %s:%u\n",
704 GNUNET_h2s (&ee->element_hash),
706 GNUNET_break (GNUNET_YES ==
707 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
710 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
716 * Send our element count to the peer, in case our element count is
719 * @param op intersection operation
722 send_element_count (struct Operation *op)
724 struct GNUNET_MQ_Envelope *ev;
725 struct IntersectionElementInfoMessage *msg;
727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728 "Sending our element count (%u)\n",
729 op->state->my_element_count);
730 ev = GNUNET_MQ_msg (msg,
731 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
732 msg->sender_element_count = htonl (op->state->my_element_count);
733 GNUNET_MQ_send (op->mq, ev);
738 * We go first, initialize our map with all elements and
739 * send the first Bloom filter.
741 * @param op operation to start exchange for
744 begin_bf_exchange (struct Operation *op)
746 op->state->phase = PHASE_BF_EXCHANGE;
747 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
748 &initialize_map_unfiltered,
750 send_bloomfilter (op);
755 * Handle the initial `struct IntersectionElementInfoMessage` from a
758 * @param cls the intersection operation
759 * @param mh the header of the message
762 handle_intersection_p2p_element_info (void *cls,
763 const struct IntersectionElementInfoMessage *msg)
765 struct Operation *op = cls;
767 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
770 fail_intersection_operation(op);
773 op->remote_element_count = ntohl (msg->sender_element_count);
774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775 "Received remote element count (%u), I have %u\n",
776 op->remote_element_count,
777 op->state->my_element_count);
778 if ( ( (PHASE_INITIAL != op->state->phase) &&
779 (PHASE_COUNT_SENT != op->state->phase) ) ||
780 (op->state->my_element_count > op->remote_element_count) ||
781 (0 == op->state->my_element_count) ||
782 (0 == op->remote_element_count) )
785 fail_intersection_operation(op);
788 GNUNET_break (NULL == op->state->remote_bf);
789 begin_bf_exchange (op);
790 GNUNET_CADET_receive_done (op->channel);
795 * Process a Bloomfilter once we got all the chunks.
797 * @param op the intersection operation
800 process_bf (struct Operation *op)
802 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
803 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
805 op->remote_element_count,
806 op->state->my_element_count,
807 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
808 switch (op->state->phase)
812 fail_intersection_operation(op);
814 case PHASE_COUNT_SENT:
815 /* This is the first BF being sent, build our initial map with
816 filtering in place */
817 op->state->my_element_count = 0;
818 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
819 &filtered_map_initialization,
822 case PHASE_BF_EXCHANGE:
823 /* Update our set by reduction */
824 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
828 case PHASE_MUST_SEND_DONE:
830 fail_intersection_operation(op);
832 case PHASE_DONE_RECEIVED:
834 fail_intersection_operation(op);
838 fail_intersection_operation(op);
841 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
842 op->state->remote_bf = NULL;
844 if ( (0 == op->state->my_element_count) || /* fully disjoint */
845 ( (op->state->my_element_count == op->remote_element_count) &&
846 (0 == memcmp (&op->state->my_xor,
847 &op->state->other_xor,
848 sizeof (struct GNUNET_HashCode))) ) )
851 op->state->phase = PHASE_MUST_SEND_DONE;
852 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853 "Intersection succeeded, sending DONE to other peer\n");
854 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
855 op->state->local_bf = NULL;
856 if (GNUNET_SET_RESULT_FULL == op->result_mode)
858 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859 "Sending full result set (%u elements)\n",
860 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
861 op->state->full_result_iter
862 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
863 send_remaining_elements (op);
869 op->state->phase = PHASE_BF_EXCHANGE;
870 send_bloomfilter (op);
875 * Check an BF message from a remote peer.
877 * @param cls the intersection operation
878 * @param msg the header of the message
879 * @return #GNUNET_OK if @a msg is well-formed
882 check_intersection_p2p_bf (void *cls,
883 const struct BFMessage *msg)
885 struct Operation *op = cls;
887 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
890 return GNUNET_SYSERR;
897 * Handle an BF message from a remote peer.
899 * @param cls the intersection operation
900 * @param msg the header of the message
903 handle_intersection_p2p_bf (void *cls,
904 const struct BFMessage *msg)
906 struct Operation *op = cls;
909 uint32_t bf_bits_per_element;
911 switch (op->state->phase)
915 fail_intersection_operation (op);
917 case PHASE_COUNT_SENT:
918 case PHASE_BF_EXCHANGE:
919 bf_size = ntohl (msg->bloomfilter_total_length);
920 bf_bits_per_element = ntohl (msg->bits_per_element);
921 chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
922 op->state->other_xor = msg->element_xor_hash;
923 if (bf_size == chunk_size)
925 if (NULL != op->state->bf_data)
928 fail_intersection_operation (op);
931 /* single part, done here immediately */
933 = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
935 bf_bits_per_element);
936 op->state->salt = ntohl (msg->sender_mutator);
937 op->remote_element_count = ntohl (msg->sender_element_count);
941 /* multipart chunk */
942 if (NULL == op->state->bf_data)
944 /* first chunk, initialize */
945 op->state->bf_data = GNUNET_malloc (bf_size);
946 op->state->bf_data_size = bf_size;
947 op->state->bf_bits_per_element = bf_bits_per_element;
948 op->state->bf_data_offset = 0;
949 op->state->salt = ntohl (msg->sender_mutator);
950 op->remote_element_count = ntohl (msg->sender_element_count);
955 if ( (op->state->bf_data_size != bf_size) ||
956 (op->state->bf_bits_per_element != bf_bits_per_element) ||
957 (op->state->bf_data_offset + chunk_size > bf_size) ||
958 (op->state->salt != ntohl (msg->sender_mutator)) ||
959 (op->remote_element_count != ntohl (msg->sender_element_count)) )
962 fail_intersection_operation (op);
966 GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
967 (const char*) &msg[1],
969 op->state->bf_data_offset += chunk_size;
970 if (op->state->bf_data_offset == bf_size)
972 /* last chunk, run! */
974 = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
976 bf_bits_per_element);
977 GNUNET_free (op->state->bf_data);
978 op->state->bf_data = NULL;
979 op->state->bf_data_size = 0;
985 fail_intersection_operation (op);
988 GNUNET_CADET_receive_done (op->channel);
993 * Remove all elements from our hashmap.
995 * @param cls closure with the `struct Operation *`
996 * @param key current key code
997 * @param value value in the hash map
998 * @return #GNUNET_YES (we should continue to iterate)
1001 filter_all (void *cls,
1002 const struct GNUNET_HashCode *key,
1005 struct Operation *op = cls;
1006 struct ElementEntry *ee = value;
1008 GNUNET_break (0 < op->state->my_element_count);
1009 op->state->my_element_count--;
1010 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
1012 &op->state->my_xor);
1013 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014 "Final reduction of my_elements, removing %s:%u\n",
1015 GNUNET_h2s (&ee->element_hash),
1017 GNUNET_assert (GNUNET_YES ==
1018 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
1021 send_client_removed_element (op,
1028 * Handle a done message from a remote peer
1030 * @param cls the intersection operation
1031 * @param mh the message
1034 handle_intersection_p2p_done (void *cls,
1035 const struct IntersectionDoneMessage *idm)
1037 struct Operation *op = cls;
1039 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1041 GNUNET_break_op (0);
1042 fail_intersection_operation (op);
1045 if (PHASE_BF_EXCHANGE != op->state->phase)
1047 /* wrong phase to conclude? FIXME: Or should we allow this
1048 if the other peer has _initially_ already an empty set? */
1049 GNUNET_break_op (0);
1050 fail_intersection_operation (op);
1053 if (0 == ntohl (idm->final_element_count))
1055 /* other peer determined empty set is the intersection,
1056 remove all elements */
1057 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1061 if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
1062 (0 != memcmp (&op->state->my_xor,
1063 &idm->element_xor_hash,
1064 sizeof (struct GNUNET_HashCode))) )
1066 /* Other peer thinks we are done, but we disagree on the result! */
1067 GNUNET_break_op (0);
1068 fail_intersection_operation (op);
1071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1073 op->state->my_element_count);
1074 op->state->phase = PHASE_DONE_RECEIVED;
1075 GNUNET_CADET_receive_done (op->channel);
1077 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1078 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081 "Sending full result set to client (%u elements)\n",
1082 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1083 op->state->full_result_iter
1084 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1085 send_remaining_elements (op);
1088 op->state->phase = PHASE_FINISHED;
1089 send_client_done_and_destroy (op);
1094 * Initiate a set intersection operation with a remote peer.
1096 * @param op operation that is created, should be initialized to
1097 * begin the evaluation
1098 * @param opaque_context message to be transmitted to the listener
1099 * to convince it to accept, may be NULL
1100 * @return operation-specific state to keep in @a op
1102 static struct OperationState *
1103 intersection_evaluate (struct Operation *op,
1104 const struct GNUNET_MessageHeader *opaque_context)
1106 struct OperationState *state;
1107 struct GNUNET_MQ_Envelope *ev;
1108 struct OperationRequestMessage *msg;
1110 ev = GNUNET_MQ_msg_nested_mh (msg,
1111 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1115 /* the context message is too large!? */
1119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1120 "Initiating intersection operation evaluation\n");
1121 state = GNUNET_new (struct OperationState);
1122 /* we started the operation, thus we have to send the operation request */
1123 state->phase = PHASE_INITIAL;
1124 state->my_element_count = op->set->state->current_set_element_count;
1126 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1129 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1130 msg->element_count = htonl (state->my_element_count);
1131 GNUNET_MQ_send (op->mq,
1133 state->phase = PHASE_COUNT_SENT;
1134 if (NULL != opaque_context)
1135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136 "Sent op request with context message\n");
1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139 "Sent op request without context message\n");
1145 * Accept an intersection operation request from a remote peer. Only
1146 * initializes the private operation state.
1148 * @param op operation that will be accepted as an intersection operation
1150 static struct OperationState *
1151 intersection_accept (struct Operation *op)
1153 struct OperationState *state;
1155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1156 "Accepting set intersection operation\n");
1157 state = GNUNET_new (struct OperationState);
1158 state->phase = PHASE_INITIAL;
1159 state->my_element_count
1160 = op->set->state->current_set_element_count;
1162 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1163 op->remote_element_count),
1166 if (op->remote_element_count < state->my_element_count)
1168 /* If the other peer (Alice) has fewer elements than us (Bob),
1169 we just send the count as Alice should send the first BF */
1170 send_element_count (op);
1171 state->phase = PHASE_COUNT_SENT;
1174 /* We have fewer elements, so we start with the BF */
1175 begin_bf_exchange (op);
1181 * Destroy the intersection operation. Only things specific to the
1182 * intersection operation are destroyed.
1184 * @param op intersection operation to destroy
1187 intersection_op_cancel (struct Operation *op)
1189 /* check if the op was canceled twice */
1190 GNUNET_assert (NULL != op->state);
1191 if (NULL != op->state->remote_bf)
1193 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1194 op->state->remote_bf = NULL;
1196 if (NULL != op->state->local_bf)
1198 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1199 op->state->local_bf = NULL;
1201 if (NULL != op->state->my_elements)
1203 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1204 op->state->my_elements = NULL;
1206 if (NULL != op->state->full_result_iter)
1208 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1209 op->state->full_result_iter = NULL;
1211 GNUNET_free (op->state);
1213 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214 "Destroying intersection op state done\n");
1219 * Create a new set supporting the intersection operation.
1221 * @return the newly created set
1223 static struct SetState *
1224 intersection_set_create ()
1226 struct SetState *set_state;
1228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229 "Intersection set created\n");
1230 set_state = GNUNET_new (struct SetState);
1231 set_state->current_set_element_count = 0;
1238 * Add the element from the given element message to the set.
1240 * @param set_state state of the set want to add to
1241 * @param ee the element to add to the set
1244 intersection_add (struct SetState *set_state,
1245 struct ElementEntry *ee)
1247 set_state->current_set_element_count++;
/**
 * Release the intersection-specific state of a set.
 *
 * @param set_state the set state to destroy
 */
static void
intersection_set_destroy (struct SetState *set_state)
{
  /* the state owns no further allocations */
  GNUNET_free (set_state);
}
1264 * Remove the element given in the element message from the set.
1266 * @param set_state state of the set to remove from
1267 * @param element set element to remove
1270 intersection_remove (struct SetState *set_state,
1271 struct ElementEntry *element)
1273 GNUNET_assert (0 < set_state->current_set_element_count);
1274 set_state->current_set_element_count--;
1279 * Callback for channel death for the intersection operation.
1281 * @param op operation that lost the channel
1284 intersection_channel_death (struct Operation *op)
1286 if (GNUNET_YES == op->state->channel_death_expected)
1288 /* oh goodie, we are done! */
1289 send_client_done_and_destroy (op);
1293 /* sorry, channel went down early, too bad. */
1294 _GSS_operation_destroy (op,
1301 * Get the table with implementing functions for set intersection.
1303 * @return the operation specific VTable
1305 const struct SetVT *
1306 _GSS_intersection_vt ()
1308 static const struct SetVT intersection_vt = {
1309 .create = &intersection_set_create,
1310 .add = &intersection_add,
1311 .remove = &intersection_remove,
1312 .destroy_set = &intersection_set_destroy,
1313 .evaluate = &intersection_evaluate,
1314 .accept = &intersection_accept,
1315 .cancel = &intersection_op_cancel,
1316 .channel_death = &intersection_channel_death,
1319 return &intersection_vt;