2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
35 #define CALCULATE_BF_SIZE(A, B, s, k) \
37 k = ceil(1 + log2((double) (2*B / (double) A)));\
38 s = ceil((double) (A * k / log(2))); \
42 * Current phase we are in for an intersection operation.
44 enum IntersectionOperationPhase
47 * Alice has suggested an operation to Bob,
48 * and is waiting for a bloomfilter (bf) or the end of the session.
52 * Bob has accepted the operation; Bob and Alice are now exchanging bfs
53 * until one of them notices that their element counts are equal.
57 * If both peers have an equal element count, they enter this state for
58 * one more turn, to see whether they have actually agreed on the same set.
59 * If a peer finds the same element count after the next iteration,
60 * it ends the session.
64 * The protocol is over.
65 * Results may still have to be sent to the client.
72 * State of an evaluate operation
78 * The bf we currently receive
80 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
83 * BF of the set's element.
85 struct GNUNET_CONTAINER_BloomFilter *local_bf;
88 * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
93 * size of the bloomfilter
95 uint32_t bf_data_size;
98 * number of bits set per element in the bloomfilter we are receiving
100 uint32_t bf_bits_per_element;
103 * Current state of the operation.
105 enum IntersectionOperationPhase phase;
108 * Generation in which the operation handle was created.
111 unsigned int generation_created;
114 * Maps element-id-hashes to 'elements in our set'.
116 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
119 * Current number of elements contained within my_elements
121 uint32_t my_element_count;
124 * Iterator for sending elements on the key to element mapping to the client.
126 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
129 * Evaluate operations are held in a linked list (next entry).
132 struct OperationState *next;
135 * Evaluate operations are held in a linked list (previous entry).
138 struct OperationState *prev;
141 * Did we send the client that we are done?
143 int client_done_sent;
148 * Extra state required for efficient set intersection.
153 * Number of currently valid elements in the set which have not been removed
155 uint32_t current_set_element_count;
160 * Send a result message to the client indicating
161 * we removed an element
163 * @param op union operation
164 * @param element element to send
167 send_client_element (struct Operation *op,
168 struct GNUNET_SET_Element *element)
170 struct GNUNET_MQ_Envelope *ev;
171 struct GNUNET_SET_ResultMessage *rm;
173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending removed element (size %u) to client\n", element->size);
174 GNUNET_assert (0 != op->spec->client_request_id);
175 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
178 GNUNET_MQ_discard (ev);
182 rm->result_status = htons (GNUNET_SET_STATUS_OK);
183 rm->request_id = htonl (op->spec->client_request_id);
184 rm->element_type = element->type;
185 memcpy (&rm[1], element->data, element->size);
186 GNUNET_MQ_send (op->spec->set->client_mq, ev);
193 * fills the contained-elements hashmap with all relevant
194 * elements and adds their mutated hashes to our local bloomfilter with mutator+1
197 * @param key current key code
198 * @param value value in the hash map
199 * @return #GNUNET_YES if we should continue to
204 iterator_initialization_by_alice (void *cls,
205 const struct GNUNET_HashCode *key,
208 struct ElementEntry *ee = value;
209 struct Operation *op = cls;
210 struct GNUNET_HashCode mutated_hash;
212 //only consider this element, if it is valid for us
213 if ((op->generation_created < ee->generation_removed)
214 && (op->generation_created >= ee->generation_added))
217 // not contained according to bob's bloomfilter
218 GNUNET_BLOCK_mingle_hash(&ee->element_hash,
221 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
223 if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
224 send_client_element (op, &ee->element);
228 op->state->my_element_count++;
229 GNUNET_assert (GNUNET_YES ==
230 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
231 &ee->element_hash, ee,
232 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
238 * fills the contained-elements hashmap with all relevant
239 * elements and adds their mutated hashes to our local bloomfilter
242 * @param key current key code
243 * @param value value in the hash map
244 * @return #GNUNET_YES if we should continue to
249 iterator_initialization (void *cls,
250 const struct GNUNET_HashCode *key,
253 struct ElementEntry *ee = value;
254 struct Operation *op = cls;
256 //only consider this element, if it is valid for us
257 if ((op->generation_created < ee->generation_removed)
258 && (op->generation_created >= ee->generation_added))
261 GNUNET_assert (GNUNET_YES ==
262 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
263 &ee->element_hash, ee,
264 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
270 * removes element from a hashmap if it is not contained within the
271 * provided remote bloomfilter. Then, fill our new bloomfilter.
274 * @param key current key code
275 * @param value value in the hash map
276 * @return #GNUNET_YES if we should continue to
281 iterator_bf_reduce (void *cls,
282 const struct GNUNET_HashCode *key,
285 struct ElementEntry *ee = value;
286 struct Operation *op = cls;
287 struct GNUNET_HashCode mutated_hash;
289 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
292 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
295 op->state->my_element_count--;
296 GNUNET_assert (GNUNET_YES ==
297 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
300 if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode)
301 send_client_element (op, &ee->element);
308 * create a bloomfilter based on the elements given
311 * @param key current key code
312 * @param value value in the hash map
313 * @return #GNUNET_YES if we should continue to
318 iterator_bf_create (void *cls,
319 const struct GNUNET_HashCode *key,
322 struct ElementEntry *ee = value;
323 struct Operation *op = cls;
324 struct GNUNET_HashCode mutated_hash;
326 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
328 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
334 * Inform the client that the union operation has failed,
335 * and proceed to destroy the evaluate operation.
337 * @param op the intersection operation to fail
340 fail_intersection_operation (struct Operation *op)
342 struct GNUNET_MQ_Envelope *ev;
343 struct GNUNET_SET_ResultMessage *msg;
345 if (op->state->my_elements)
346 GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
348 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
350 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
351 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
352 msg->request_id = htonl (op->spec->client_request_id);
353 msg->element_type = htons (0);
354 GNUNET_MQ_send (op->spec->set->client_mq, ev);
355 _GSS_operation_destroy (op);
360 * Send a request for the evaluate operation to a remote peer
362 * @param op operation with the other peer
365 send_operation_request (struct Operation *op)
367 struct GNUNET_MQ_Envelope *ev;
368 struct OperationRequestMessage *msg;
370 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
371 op->spec->context_msg);
375 /* the context message is too large */
377 GNUNET_SERVER_client_disconnect (op->spec->set->client);
380 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
381 msg->app_id = op->spec->app_id;
382 msg->salt = htonl (op->spec->salt);
383 msg->element_count = htonl(op->state->my_element_count);
385 GNUNET_MQ_send (op->mq, ev);
387 if (NULL != op->spec->context_msg)
388 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
392 if (NULL != op->spec->context_msg)
394 GNUNET_free (op->spec->context_msg);
395 op->spec->context_msg = NULL;
400 send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
402 struct GNUNET_MQ_Envelope *ev;
404 uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
405 uint32_t todo_size = op->state->bf_data_size - offset;
407 if (todo_size < chunk_size)
408 chunk_size = todo_size;
410 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
412 msg->chunk_length = htonl (chunk_size);
413 msg->chunk_offset = htonl (offset);
414 memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
416 GNUNET_MQ_send (op->mq, ev);
418 if (op->state->bf_data_size == offset + chunk_size)
421 GNUNET_free(op->state->bf_data);
422 op->state->bf_data = NULL;
426 send_bloomfilter_multipart (op, offset + chunk_size);
430 * Send a bloomfilter to our peer.
431 * that the operation is over.
432 * After the result done message has been sent to the client,
433 * destroy the evaluate operation.
435 * @param op intersection operation
438 send_bloomfilter (struct Operation *op)
440 struct GNUNET_MQ_Envelope *ev;
441 struct BFMessage *msg;
443 uint32_t bf_elementbits;
445 struct GNUNET_CONTAINER_BloomFilter * local_bf;
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
449 CALCULATE_BF_SIZE(op->state->my_element_count,
450 op->spec->remote_element_count,
454 local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
459 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
463 // send our bloomfilter
464 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) {
466 chunk_size = bf_size;
467 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
468 GNUNET_assert (GNUNET_SYSERR !=
469 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
475 chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
476 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
477 op->state->bf_data = (char *) GNUNET_malloc (bf_size);
478 GNUNET_assert (GNUNET_SYSERR !=
479 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
482 memcpy (&msg[1], op->state->bf_data, chunk_size);
483 op->state->bf_data_size = bf_size;
485 GNUNET_CONTAINER_bloomfilter_free (local_bf);
487 msg->sender_element_count = htonl (op->state->my_element_count);
488 msg->bloomfilter_total_length = htonl (bf_size);
489 msg->bloomfilter_length = htonl (chunk_size);
490 msg->bits_per_element = htonl (bf_elementbits);
491 msg->sender_mutator = htonl (op->spec->salt);
493 GNUNET_MQ_send (op->mq, ev);
495 if (op->state->bf_data)
496 send_bloomfilter_multipart (op, chunk_size);
501 * Signal to the client that the operation has finished and
502 * destroy the operation.
504 * @param cls operation to destroy
507 send_client_done_and_destroy (void *cls)
509 struct Operation *op = cls;
510 struct GNUNET_MQ_Envelope *ev;
511 struct GNUNET_SET_ResultMessage *rm;
512 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
513 rm->request_id = htonl (op->spec->client_request_id);
514 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
515 rm->element_type = htons (0);
516 GNUNET_MQ_send (op->spec->set->client_mq, ev);
517 _GSS_operation_destroy (op);
522 * Send all elements in the full result iterator.
524 * @param cls operation
527 send_remaining_elements (void *cls)
529 struct Operation *op = cls;
530 struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
531 struct GNUNET_MQ_Envelope *ev;
532 struct GNUNET_SET_ResultMessage *rm;
533 struct GNUNET_SET_Element *element;
536 res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
537 if (GNUNET_NO == res) {
538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
539 send_client_done_and_destroy (op);
543 element = &remaining->element;
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
545 GNUNET_assert (0 != op->spec->client_request_id);
547 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
548 GNUNET_assert (NULL != ev);
550 rm->result_status = htons (GNUNET_SET_STATUS_OK);
551 rm->request_id = htonl (op->spec->client_request_id);
552 rm->element_type = element->type;
553 memcpy (&rm[1], element->data, element->size);
555 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
556 GNUNET_MQ_send (op->spec->set->client_mq, ev);
561 * Inform the peer that this operation is complete.
563 * @param op the intersection operation to fail
566 send_peer_done (struct Operation *op)
568 struct GNUNET_MQ_Envelope *ev;
570 op->state->phase = PHASE_FINISHED;
571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
572 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
573 op->state->local_bf = NULL;
575 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
576 GNUNET_MQ_send (op->mq, ev);
581 * Process a Bloomfilter once we got all the chunks
583 * @param op the intersection operation
586 process_bf (struct Operation *op){
587 uint32_t old_elements;
588 uint32_t peer_elements;
590 old_elements = op->state->my_element_count;
591 peer_elements = op->spec->remote_element_count;
592 switch (op->state->phase)
595 // If we are ot our first msg
596 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
598 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
599 &iterator_initialization_by_alice,
602 case PHASE_BF_EXCHANGE:
603 case PHASE_MAYBE_FINISHED:
604 // if we are bob or alice and are continuing operation
605 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
611 fail_intersection_operation(op);
613 // the iterators created a new BF with salt+1
614 // the peer needs this information for decoding the next BF
615 // this behavior can be modified at will later on.
618 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
619 op->state->remote_bf = NULL;
621 if ((0 == op->state->my_element_count) // fully disjoint
622 || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements
623 && (old_elements == op->state->my_element_count)
624 && (op->state->my_element_count == peer_elements))) {
625 // In the last round we though we were finished, we now know this is correct
630 op->state->phase = PHASE_BF_EXCHANGE;
631 if (op->state->my_element_count == peer_elements)
632 // maybe we are finished, but we do one more round to make certain
633 // we don't have false positives ...
634 op->state->phase = PHASE_MAYBE_FINISHED;
636 send_bloomfilter (op);
641 * Handle an BF multipart message from a remote peer.
643 * @param cls the intersection operation
644 * @param mh the header of the message
647 handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
649 struct Operation *op = cls;
650 const struct BFPart *msg = (const struct BFPart *) mh;
652 uint32_t chunk_offset;
654 chunk_size = ntohl(msg->chunk_length);
655 chunk_offset = ntohl(msg->chunk_offset);
657 if ((NULL == op->state->bf_data)
658 || (op->state->bf_data_size < chunk_size + chunk_offset)){
659 // unexpected multipart chunk
661 fail_intersection_operation(op);
665 memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
667 if (op->state->bf_data_size != chunk_offset + chunk_size)
668 // wait for next chunk
671 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
672 op->state->bf_data_size,
673 op->state->bf_bits_per_element);
675 GNUNET_free (op->state->bf_data);
676 op->state->bf_data = NULL;
683 * Handle an BF message from a remote peer.
685 * @param cls the intersection operation
686 * @param mh the header of the message
689 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
691 struct Operation *op = cls;
692 const struct BFMessage *msg = (const struct BFMessage *) mh;
695 uint32_t bf_bits_per_element;
697 switch (op->state->phase)
700 case PHASE_BF_EXCHANGE:
701 case PHASE_MAYBE_FINISHED:
702 if (NULL == op->state->bf_data) {
703 // no colliding multipart transaction going on currently
704 op->spec->salt = ntohl (msg->sender_mutator);
705 bf_size = ntohl (msg->bloomfilter_total_length);
706 bf_bits_per_element = ntohl (msg->bits_per_element);
707 chunk_size = ntohl (msg->bloomfilter_length);
708 op->spec->remote_element_count = ntohl(msg->sender_element_count);
709 if (bf_size == chunk_size) {
710 // single part, done here
711 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
713 bf_bits_per_element);
718 //first multipart chunk
719 op->state->bf_data = GNUNET_malloc (bf_size);
720 op->state->bf_data_size = bf_size;
721 op->state->bf_bits_per_element = bf_bits_per_element;
722 memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
727 fail_intersection_operation (op);
733 * Handle an BF message from a remote peer.
735 * @param cls the intersection operation
736 * @param mh the header of the message
739 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
741 struct Operation *op = cls;
742 struct BFMessage *msg = (struct BFMessage *) mh;
744 op->spec->remote_element_count = ntohl(msg->sender_element_count);
745 if ((op->state->phase != PHASE_INITIAL)
746 || (op->state->my_element_count > op->spec->remote_element_count)){
748 fail_intersection_operation(op);
751 op->state->phase = PHASE_BF_EXCHANGE;
752 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
754 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
755 &iterator_initialization,
758 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
759 op->state->remote_bf = NULL;
761 if (op->state->my_element_count == ntohl (msg->sender_element_count))
762 op->state->phase = PHASE_MAYBE_FINISHED;
764 send_bloomfilter (op);
769 * Send our element to the peer, in case our element count is lower than his
771 * @param op intersection operation
774 send_element_count (struct Operation *op)
776 struct GNUNET_MQ_Envelope *ev;
777 struct BFMessage *msg;
779 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
781 // just send our element count, as the other peer must start
782 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
783 msg->sender_element_count = htonl (op->state->my_element_count);
784 msg->bloomfilter_length = htonl (0);
785 msg->sender_mutator = htonl (0);
787 GNUNET_MQ_send (op->mq, ev);
792 * Send a result message to the client indicating
793 * that the operation is over.
794 * After the result done message has been sent to the client,
795 * destroy the evaluate operation.
797 * @param op intersection operation
800 finish_and_destroy (struct Operation *op)
802 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
804 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
807 op->state->full_result_iter =
808 GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
809 send_remaining_elements (op);
812 send_client_done_and_destroy (op);
817 * Handle a done message from a remote peer
819 * @param cls the union operation
820 * @param mh the message
823 handle_p2p_done (void *cls,
824 const struct GNUNET_MessageHeader *mh)
826 struct Operation *op = cls;
828 if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
829 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
831 finish_and_destroy (op);
836 fail_intersection_operation (op);
841 * Evaluate a union operation with
844 * @param op operation to evaluate
847 intersection_evaluate (struct Operation *op)
849 op->state = GNUNET_new (struct OperationState);
850 /* we started the operation, thus we have to send the operation request */
851 op->state->phase = PHASE_INITIAL;
852 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
853 op->state->my_element_count = op->spec->set->state->current_set_element_count;
855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856 "evaluating intersection operation");
857 send_operation_request (op);
862 * Accept an union operation request from a remote peer.
863 * Only initializes the private operation state.
865 * @param op operation that will be accepted as a union operation
868 intersection_accept (struct Operation *op)
870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
871 op->state = GNUNET_new (struct OperationState);
872 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
873 op->state->my_element_count = op->spec->set->state->current_set_element_count;
875 // if Alice (the peer) has more elements than Bob (us), she should start
876 if (op->spec->remote_element_count < op->state->my_element_count){
877 op->state->phase = PHASE_INITIAL;
878 send_element_count(op);
881 // create a new bloomfilter in case we have fewer elements
882 op->state->phase = PHASE_BF_EXCHANGE;
883 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
885 GNUNET_CONSTANTS_BLOOMFILTER_K);
886 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
887 &iterator_initialization,
889 send_bloomfilter (op);
894 * Create a new set supporting the intersection operation
896 * @return the newly created set
898 static struct SetState *
899 intersection_set_create ()
901 struct SetState *set_state;
903 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904 "intersection set created\n");
905 set_state = GNUNET_new (struct SetState);
906 set_state->current_set_element_count = 0;
913 * Add the element from the given element message to the set.
915 * @param set_state state of the set want to add to
916 * @param ee the element to add to the set
919 intersection_add (struct SetState *set_state,
920 struct ElementEntry *ee)
922 set_state->current_set_element_count++;
/**
 * Release the state of a set that supports the intersection
 * operation.
 *
 * @param set_state the set state to free
 */
static void
intersection_set_destroy (struct SetState *set_state)
{
  /* the state holds nothing but the element counter */
  GNUNET_free (set_state);
}
939 * Remove the element given in the element message from the set.
941 * @param set_state state of the set to remove from
942 * @param element set element to remove
945 intersection_remove (struct SetState *set_state,
946 struct ElementEntry *element)
948 GNUNET_assert(0 < set_state->current_set_element_count);
949 set_state->current_set_element_count--;
954 * Dispatch messages for a intersection operation.
956 * @param op the state of the intersection evaluate operation
957 * @param mh the received message
958 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
959 * #GNUNET_OK otherwise
962 intersection_handle_p2p_message (struct Operation *op,
963 const struct GNUNET_MessageHeader *mh)
965 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
966 ntohs (mh->type), ntohs (mh->size));
967 switch (ntohs (mh->type))
969 /* this message handler is not active until after we received an
970 * operation request message, thus the ops request is not handled here
972 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
973 handle_p2p_element_info (op, mh);
975 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
976 handle_p2p_bf (op, mh);
978 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART:
979 handle_p2p_bf_part (op, mh);
981 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
982 handle_p2p_done (op, mh);
985 /* something wrong with mesh's message handlers? */
993 * handler for peer-disconnects, notifies the client about the aborted operation
995 * @param op the destroyed operation
998 intersection_peer_disconnect (struct Operation *op)
1000 if (PHASE_FINISHED != op->state->phase)
1002 struct GNUNET_MQ_Envelope *ev;
1003 struct GNUNET_SET_ResultMessage *msg;
1005 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1006 msg->request_id = htonl (op->spec->client_request_id);
1007 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1008 msg->element_type = htons (0);
1009 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1010 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1011 _GSS_operation_destroy (op);
1014 // else: the session has already been concluded
1015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1016 if (GNUNET_NO == op->state->client_done_sent)
1017 finish_and_destroy (op);
1022 * Destroy the union operation. Only things specific to the union operation are destroyed.
1024 * @param op union operation to destroy
1027 intersection_op_cancel (struct Operation *op)
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
1030 /* check if the op was canceled twice */
1031 GNUNET_assert (NULL != op->state);
1032 if (NULL != op->state->remote_bf)
1034 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1035 op->state->remote_bf = NULL;
1037 if (NULL != op->state->local_bf)
1039 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1040 op->state->local_bf = NULL;
1042 if (NULL != op->state->my_elements)
1044 // no need to free the elements, they are still part of the set
1045 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1046 op->state->my_elements = NULL;
1048 GNUNET_free (op->state);
1050 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
1053 const struct SetVT *
1054 _GSS_intersection_vt ()
1056 static const struct SetVT intersection_vt = {
1057 .create = &intersection_set_create,
1058 .msg_handler = &intersection_handle_p2p_message,
1059 .add = &intersection_add,
1060 .remove = &intersection_remove,
1061 .destroy_set = &intersection_set_destroy,
1062 .evaluate = &intersection_evaluate,
1063 .accept = &intersection_accept,
1064 .peer_disconnect = &intersection_peer_disconnect,
1065 .cancel = &intersection_op_cancel,
1068 return &intersection_vt;