2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
37 k = ceil(1 + log2((double) (2*B / (double) A)));\
38 if (k<1) k=1; /* k can be calculated as 0 */\
39 s = ceil((double) (A * k / log(2))); \
43 * Current phase we are in for a intersection operation.
45 enum IntersectionOperationPhase
48 * Alices has suggested an operation to bob,
49 * and is waiting for a bf or session end.
53 * Bob has accepted the operation, Bob and Alice are now exchanging bfs
54 * until one notices the their element count is equal
58 * if both peers have an equal peercount, they enter this state for
59 * one more turn, to see if they actually have agreed on a correct set.
60 * if a peer finds the same element count after the next iteration,
61 * it ends the the session
65 * The protocol is over.
66 * Results may still have to be sent to the client.
73 * State of an evaluate operation
79 * The bf we currently receive
81 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
84 * BF of the set's element.
86 struct GNUNET_CONTAINER_BloomFilter *local_bf;
89 * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
94 * size of the bloomfilter
96 uint32_t bf_data_size;
99 * size of the bloomfilter
101 uint32_t bf_bits_per_element;
104 * Current state of the operation.
106 enum IntersectionOperationPhase phase;
109 * Generation in which the operation handle
112 unsigned int generation_created;
115 * Maps element-id-hashes to 'elements in our set'.
117 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
120 * Current element count contained within contained_elements
122 uint32_t my_element_count;
125 * Iterator for sending elements on the key to element mapping to the client.
127 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
130 * Evaluate operations are held in
133 struct OperationState *next;
136 * Evaluate operations are held in
139 struct OperationState *prev;
142 * Did we send the client that we are done?
144 int client_done_sent;
149 * Extra state required for efficient set intersection.
154 * Number of currently valid elements in the set which have not been removed
156 uint32_t current_set_element_count;
161 * Send a result message to the client indicating
162 * we removed an element
164 * @param op union operation
165 * @param element element to send
168 send_client_element (struct Operation *op,
169 struct GNUNET_SET_Element *element)
171 struct GNUNET_MQ_Envelope *ev;
172 struct GNUNET_SET_ResultMessage *rm;
174 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending removed element (size %u) to client\n", element->size);
175 GNUNET_assert (0 != op->spec->client_request_id);
176 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
179 GNUNET_MQ_discard (ev);
183 rm->result_status = htons (GNUNET_SET_STATUS_OK);
184 rm->request_id = htonl (op->spec->client_request_id);
185 rm->element_type = element->type;
186 memcpy (&rm[1], element->data, element->size);
187 GNUNET_MQ_send (op->spec->set->client_mq, ev);
194 * fills the contained-elements hashmap with all relevant
195 * elements and adds their mutated hashes to our local bloomfilter with mutator+1
198 * @param key current key code
199 * @param value value in the hash map
200 * @return #GNUNET_YES if we should continue to
205 iterator_initialization_by_alice (void *cls,
206 const struct GNUNET_HashCode *key,
209 struct ElementEntry *ee = value;
210 struct Operation *op = cls;
211 struct GNUNET_HashCode mutated_hash;
213 //only consider this element, if it is valid for us
214 if ((op->generation_created < ee->generation_removed)
215 && (op->generation_created >= ee->generation_added))
218 // not contained according to bob's bloomfilter
219 GNUNET_BLOCK_mingle_hash(&ee->element_hash,
222 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
224 if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
225 send_client_element (op, &ee->element);
229 op->state->my_element_count++;
230 GNUNET_assert (GNUNET_YES ==
231 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
232 &ee->element_hash, ee,
233 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
239 * fills the contained-elements hashmap with all relevant
240 * elements and adds their mutated hashes to our local bloomfilter
243 * @param key current key code
244 * @param value value in the hash map
245 * @return #GNUNET_YES if we should continue to
250 iterator_initialization (void *cls,
251 const struct GNUNET_HashCode *key,
254 struct ElementEntry *ee = value;
255 struct Operation *op = cls;
257 //only consider this element, if it is valid for us
258 if ((op->generation_created < ee->generation_removed)
259 && (op->generation_created >= ee->generation_added))
262 GNUNET_assert (GNUNET_YES ==
263 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
264 &ee->element_hash, ee,
265 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
271 * removes element from a hashmap if it is not contained within the
272 * provided remote bloomfilter. Then, fill our new bloomfilter.
275 * @param key current key code
276 * @param value value in the hash map
277 * @return #GNUNET_YES if we should continue to
282 iterator_bf_reduce (void *cls,
283 const struct GNUNET_HashCode *key,
286 struct ElementEntry *ee = value;
287 struct Operation *op = cls;
288 struct GNUNET_HashCode mutated_hash;
290 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
293 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
296 op->state->my_element_count--;
297 GNUNET_assert (GNUNET_YES ==
298 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
301 if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
302 send_client_element (op, &ee->element);
309 * create a bloomfilter based on the elements given
312 * @param key current key code
313 * @param value value in the hash map
314 * @return #GNUNET_YES if we should continue to
319 iterator_bf_create (void *cls,
320 const struct GNUNET_HashCode *key,
323 struct ElementEntry *ee = value;
324 struct Operation *op = cls;
325 struct GNUNET_HashCode mutated_hash;
327 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
329 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
335 * Inform the client that the union operation has failed,
336 * and proceed to destroy the evaluate operation.
338 * @param op the intersection operation to fail
341 fail_intersection_operation (struct Operation *op)
343 struct GNUNET_MQ_Envelope *ev;
344 struct GNUNET_SET_ResultMessage *msg;
346 if (op->state->my_elements){
347 GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
348 op->state->my_elements = NULL;
350 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
352 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
353 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
354 msg->request_id = htonl (op->spec->client_request_id);
355 msg->element_type = htons (0);
356 GNUNET_MQ_send (op->spec->set->client_mq, ev);
357 _GSS_operation_destroy (op);
362 * Send a request for the evaluate operation to a remote peer
364 * @param op operation with the other peer
367 send_operation_request (struct Operation *op)
369 struct GNUNET_MQ_Envelope *ev;
370 struct OperationRequestMessage *msg;
372 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
373 op->spec->context_msg);
377 /* the context message is too large */
379 GNUNET_SERVER_client_disconnect (op->spec->set->client);
382 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
383 msg->app_id = op->spec->app_id;
384 msg->salt = htonl (op->spec->salt);
385 msg->element_count = htonl(op->state->my_element_count);
387 GNUNET_MQ_send (op->mq, ev);
389 if (NULL != op->spec->context_msg)
390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
394 if (NULL != op->spec->context_msg)
396 GNUNET_free (op->spec->context_msg);
397 op->spec->context_msg = NULL;
402 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
404 struct GNUNET_MQ_Envelope *ev;
406 uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
407 uint32_t todo_size = op->state->bf_data_size - offset;
409 if (todo_size < chunk_size)
410 chunk_size = todo_size;
412 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
414 msg->chunk_length = htonl (chunk_size);
415 msg->chunk_offset = htonl (offset);
416 memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
418 GNUNET_MQ_send (op->mq, ev);
420 if (op->state->bf_data_size == offset + chunk_size)
423 GNUNET_free(op->state->bf_data);
424 op->state->bf_data = NULL;
428 send_bloomfilter_multipart (op, offset + chunk_size);
432 * Send a bloomfilter to our peer.
433 * that the operation is over.
434 * After the result done message has been sent to the client,
435 * destroy the evaluate operation.
437 * @param op intersection operation
440 send_bloomfilter (struct Operation *op)
442 struct GNUNET_MQ_Envelope *ev;
443 struct BFMessage *msg;
445 uint32_t bf_elementbits;
447 struct GNUNET_CONTAINER_BloomFilter * local_bf;
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
451 CALCULATE_BF_SIZE(op->state->my_element_count,
452 op->spec->remote_element_count,
456 local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
461 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
465 // send our bloomfilter
466 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
468 chunk_size = bf_size;
469 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
470 GNUNET_assert (GNUNET_SYSERR !=
471 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
477 chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
478 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
479 op->state->bf_data = (char *) GNUNET_malloc (bf_size);
480 GNUNET_assert (GNUNET_SYSERR !=
481 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
484 memcpy (&msg[1], op->state->bf_data, chunk_size);
485 op->state->bf_data_size = bf_size;
487 GNUNET_CONTAINER_bloomfilter_free (local_bf);
489 msg->sender_element_count = htonl (op->state->my_element_count);
490 msg->bloomfilter_total_length = htonl (bf_size);
491 msg->bloomfilter_length = htonl (chunk_size);
492 msg->bits_per_element = htonl (bf_elementbits);
493 msg->sender_mutator = htonl (op->spec->salt);
495 GNUNET_MQ_send (op->mq, ev);
497 if (op->state->bf_data)
498 send_bloomfilter_multipart (op, chunk_size);
503 * Signal to the client that the operation has finished and
504 * destroy the operation.
506 * @param cls operation to destroy
509 send_client_done_and_destroy (void *cls)
511 struct Operation *op = cls;
512 struct GNUNET_MQ_Envelope *ev;
513 struct GNUNET_SET_ResultMessage *rm;
514 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
515 rm->request_id = htonl (op->spec->client_request_id);
516 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
517 rm->element_type = htons (0);
518 GNUNET_MQ_send (op->spec->set->client_mq, ev);
519 _GSS_operation_destroy (op);
524 * Send all elements in the full result iterator.
526 * @param cls operation
529 send_remaining_elements (void *cls)
531 struct Operation *op = cls;
532 struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
533 struct GNUNET_MQ_Envelope *ev;
534 struct GNUNET_SET_ResultMessage *rm;
535 struct GNUNET_SET_Element *element;
538 res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
539 if (GNUNET_NO == res) {
540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
541 send_client_done_and_destroy (op);
545 element = &remaining->element;
546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
547 GNUNET_assert (0 != op->spec->client_request_id);
549 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
550 GNUNET_assert (NULL != ev);
552 rm->result_status = htons (GNUNET_SET_STATUS_OK);
553 rm->request_id = htonl (op->spec->client_request_id);
554 rm->element_type = element->type;
555 memcpy (&rm[1], element->data, element->size);
557 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
558 GNUNET_MQ_send (op->spec->set->client_mq, ev);
563 * Inform the peer that this operation is complete.
565 * @param op the intersection operation to fail
568 send_peer_done (struct Operation *op)
570 struct GNUNET_MQ_Envelope *ev;
572 op->state->phase = PHASE_FINISHED;
573 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
574 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
575 op->state->local_bf = NULL;
577 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
578 GNUNET_MQ_send (op->mq, ev);
583 * Process a Bloomfilter once we got all the chunks
585 * @param op the intersection operation
588 process_bf (struct Operation *op){
589 uint32_t old_elements;
590 uint32_t peer_elements;
592 old_elements = op->state->my_element_count;
593 peer_elements = op->spec->remote_element_count;
594 switch (op->state->phase)
597 // If we are ot our first msg
598 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
600 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
601 &iterator_initialization_by_alice,
604 case PHASE_BF_EXCHANGE:
605 case PHASE_MAYBE_FINISHED:
606 // if we are bob or alice and are continuing operation
607 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
613 fail_intersection_operation(op);
615 // the iterators created a new BF with salt+1
616 // the peer needs this information for decoding the next BF
617 // this behavior can be modified at will later on.
620 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
621 op->state->remote_bf = NULL;
623 if ((0 == op->state->my_element_count) // fully disjoint
624 || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
625 && (old_elements == op->state->my_element_count)
626 && (op->state->my_element_count == peer_elements))) {
627 // In the last round we though we were finished, we now know this is correct
632 op->state->phase = PHASE_BF_EXCHANGE;
633 if (op->state->my_element_count == peer_elements)
634 // maybe we are finished, but we do one more round to make certain
635 // we don't have false positives ...
636 op->state->phase = PHASE_MAYBE_FINISHED;
638 send_bloomfilter (op);
643 * Handle an BF multipart message from a remote peer.
645 * @param cls the intersection operation
646 * @param mh the header of the message
649 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
651 struct Operation *op = cls;
652 const struct BFPart *msg = (const struct BFPart *) mh;
654 uint32_t chunk_offset;
656 chunk_size = ntohl(msg->chunk_length);
657 chunk_offset = ntohl(msg->chunk_offset);
659 if ((NULL == op->state->bf_data)
660 || (op->state->bf_data_size < chunk_size + chunk_offset)){
661 // unexpected multipart chunk
663 fail_intersection_operation(op);
667 memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
669 if (op->state->bf_data_size != chunk_offset + chunk_size)
670 // wait for next chunk
673 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
674 op->state->bf_data_size,
675 op->state->bf_bits_per_element);
677 GNUNET_free (op->state->bf_data);
678 op->state->bf_data = NULL;
685 * Handle an BF message from a remote peer.
687 * @param cls the intersection operation
688 * @param mh the header of the message
691 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
693 struct Operation *op = cls;
694 const struct BFMessage *msg = (const struct BFMessage *) mh;
697 uint32_t bf_bits_per_element;
699 switch (op->state->phase)
702 case PHASE_BF_EXCHANGE:
703 case PHASE_MAYBE_FINISHED:
704 if (NULL == op->state->bf_data) {
705 // no colliding multipart transaction going on currently
706 op->spec->salt = ntohl (msg->sender_mutator);
707 bf_size = ntohl (msg->bloomfilter_total_length);
708 bf_bits_per_element = ntohl (msg->bits_per_element);
709 chunk_size = ntohl (msg->bloomfilter_length);
710 op->spec->remote_element_count = ntohl(msg->sender_element_count);
711 if (bf_size == chunk_size) {
712 // single part, done here
713 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
715 bf_bits_per_element);
720 //first multipart chunk
721 op->state->bf_data = GNUNET_malloc (bf_size);
722 op->state->bf_data_size = bf_size;
723 op->state->bf_bits_per_element = bf_bits_per_element;
724 memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
729 fail_intersection_operation (op);
735 * Handle an BF message from a remote peer.
737 * @param cls the intersection operation
738 * @param mh the header of the message
741 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
743 struct Operation *op = cls;
744 struct BFMessage *msg = (struct BFMessage *) mh;
746 op->spec->remote_element_count = ntohl(msg->sender_element_count);
747 if ((op->state->phase != PHASE_INITIAL)
748 || (op->state->my_element_count > op->spec->remote_element_count)
749 || (0 == op->state->my_element_count)
750 || (0 == op->spec->remote_element_count)){
752 fail_intersection_operation(op);
756 op->state->phase = PHASE_BF_EXCHANGE;
757 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
759 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
760 &iterator_initialization,
763 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
764 op->state->remote_bf = NULL;
766 if (op->state->my_element_count == ntohl (msg->sender_element_count))
767 op->state->phase = PHASE_MAYBE_FINISHED;
769 send_bloomfilter (op);
774 * Send our element count to the peer, in case our element count is lower than his
776 * @param op intersection operation
779 send_element_count (struct Operation *op)
781 struct GNUNET_MQ_Envelope *ev;
782 struct BFMessage *msg;
784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
786 // just send our element count, as the other peer must start
787 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
788 msg->sender_element_count = htonl (op->state->my_element_count);
789 msg->bloomfilter_length = htonl (0);
790 msg->sender_mutator = htonl (0);
792 GNUNET_MQ_send (op->mq, ev);
797 * Send a result message to the client indicating
798 * that the operation is over.
799 * After the result done message has been sent to the client,
800 * destroy the evaluate operation.
802 * @param op intersection operation
805 finish_and_destroy (struct Operation *op)
807 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
809 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
812 op->state->full_result_iter =
813 GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
814 send_remaining_elements (op);
817 send_client_done_and_destroy (op);
822 * Handle a done message from a remote peer
824 * @param cls the union operation
825 * @param mh the message
828 handle_p2p_done (void *cls,
829 const struct GNUNET_MessageHeader *mh)
831 struct Operation *op = cls;
833 if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
834 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
836 finish_and_destroy (op);
841 fail_intersection_operation (op);
846 * Evaluate a union operation with
849 * @param op operation to evaluate
852 intersection_evaluate (struct Operation *op)
854 op->state = GNUNET_new (struct OperationState);
855 /* we started the operation, thus we have to send the operation request */
856 op->state->phase = PHASE_INITIAL;
857 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
858 op->state->my_element_count = op->spec->set->state->current_set_element_count;
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861 "evaluating intersection operation");
862 send_operation_request (op);
867 * Accept an union operation request from a remote peer.
868 * Only initializes the private operation state.
870 * @param op operation that will be accepted as a union operation
873 intersection_accept (struct Operation *op)
875 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
876 op->state = GNUNET_new (struct OperationState);
877 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
878 op->state->my_element_count = op->spec->set->state->current_set_element_count;
880 // if Alice (the peer) has more elements than Bob (us), she should start
881 if (op->spec->remote_element_count < op->state->my_element_count){
882 op->state->phase = PHASE_INITIAL;
883 send_element_count(op);
886 // create a new bloomfilter in case we have fewer elements
887 op->state->phase = PHASE_BF_EXCHANGE;
888 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
890 GNUNET_CONSTANTS_BLOOMFILTER_K);
891 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
892 &iterator_initialization,
894 send_bloomfilter (op);
899 * Create a new set supporting the intersection operation
901 * @return the newly created set
903 static struct SetState *
904 intersection_set_create ()
906 struct SetState *set_state;
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909 "intersection set created\n");
910 set_state = GNUNET_new (struct SetState);
911 set_state->current_set_element_count = 0;
918 * Add the element from the given element message to the set.
920 * @param set_state state of the set want to add to
921 * @param ee the element to add to the set
924 intersection_add (struct SetState *set_state,
925 struct ElementEntry *ee)
927 set_state->current_set_element_count++;
932 * Destroy a set that supports the intersection operation
934 * @param set_state the set to destroy
937 intersection_set_destroy (struct SetState *set_state)
939 GNUNET_free (set_state);
944 * Remove the element given in the element message from the set.
946 * @param set_state state of the set to remove from
947 * @param element set element to remove
950 intersection_remove (struct SetState *set_state,
951 struct ElementEntry *element)
953 GNUNET_assert(0 < set_state->current_set_element_count);
954 set_state->current_set_element_count--;
959 * Dispatch messages for a intersection operation.
961 * @param op the state of the intersection evaluate operation
962 * @param mh the received message
963 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
964 * #GNUNET_OK otherwise
967 intersection_handle_p2p_message (struct Operation *op,
968 const struct GNUNET_MessageHeader *mh)
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
971 ntohs (mh->type), ntohs (mh->size));
972 switch (ntohs (mh->type))
974 /* this message handler is not active until after we received an
975 * operation request message, thus the ops request is not handled here
977 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
978 handle_p2p_element_info (op, mh);
980 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
981 handle_p2p_bf (op, mh);
983 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
984 handle_p2p_bf_part (op, mh);
986 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
987 handle_p2p_done (op, mh);
990 /* something wrong with cadet's message handlers? */
998 * handler for peer-disconnects, notifies the client about the aborted operation
1000 * @param op the destroyed operation
1003 intersection_peer_disconnect (struct Operation *op)
1005 if (PHASE_FINISHED != op->state->phase)
1007 struct GNUNET_MQ_Envelope *ev;
1008 struct GNUNET_SET_ResultMessage *msg;
1010 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1011 msg->request_id = htonl (op->spec->client_request_id);
1012 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1013 msg->element_type = htons (0);
1014 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1015 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1016 _GSS_operation_destroy (op);
1019 // else: the session has already been concluded
1020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1021 if (GNUNET_NO == op->state->client_done_sent)
1022 finish_and_destroy (op);
1027 * Destroy the union operation. Only things specific to the union operation are destroyed.
1029 * @param op union operation to destroy
1032 intersection_op_cancel (struct Operation *op)
1034 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
1035 /* check if the op was canceled twice */
1036 GNUNET_assert (NULL != op->state);
1037 if (NULL != op->state->remote_bf)
1039 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1040 op->state->remote_bf = NULL;
1042 if (NULL != op->state->local_bf)
1044 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1045 op->state->local_bf = NULL;
1047 /* if (NULL != op->state->my_elements)
1049 // no need to free the elements, they are still part of the set
1050 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1051 op->state->my_elements = NULL;
1053 GNUNET_free (op->state);
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1058 const struct SetVT *
1059 _GSS_intersection_vt ()
1061 static const struct SetVT intersection_vt = {
1062 .create = &intersection_set_create,
1063 .msg_handler = &intersection_handle_p2p_message,
1064 .add = &intersection_add,
1065 .remove = &intersection_remove,
1066 .destroy_set = &intersection_set_destroy,
1067 .evaluate = &intersection_evaluate,
1068 .accept = &intersection_accept,
1069 .peer_disconnect = &intersection_peer_disconnect,
1070 .cancel = &intersection_op_cancel,
1073 return &intersection_vt;