2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "gnunet_block_lib.h"
30 #include "set_protocol.h"
33 #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
35 * Current phase we are in for a intersection operation.
37 enum IntersectionOperationPhase
40 * Alice has suggested an operation to Bob,
41 * and is waiting for a bf or session end.
45 * Bob has accepted the operation, Bob and Alice are now exchanging bfs
46 * until one notices that their element counts are equal
50 * if both peers have an equal element count, they enter this state for
51 * one more turn, to see if they actually have agreed on a correct set.
52 * if a peer finds the same element count after the next iteration,
53 * it ends the session
57 * The protocol is over.
58 * Results may still have to be sent to the client.
/* Members of the per-operation state reached through op->state in the
 * functions of this file: the received and the locally built bloomfilter,
 * the protocol phase, the map of still-candidate elements plus its
 * counter, the iterator used while streaming the full result to the
 * client, list links, and the flag tested by finish_and_destroy().
 * NOTE(review): the struct header, its braces and several short comment
 * lines are missing from this view.
 * NOTE(review): the comment on my_element_count names contained_elements;
 * no such member is visible, presumably my_elements is meant - confirm.
 * NOTE(review): the element iterators read op->generation_created rather
 * than the generation_created member declared here - confirm which one is
 * authoritative. */
65 * State of an evaluate operation
71 * The bf we currently receive
73 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
76 * BF of the set's element.
78 struct GNUNET_CONTAINER_BloomFilter *local_bf;
81 * Current state of the operation.
83 enum IntersectionOperationPhase phase;
86 * Generation in which the operation handle
89 unsigned int generation_created;
92 * Maps element-id-hashes to 'elements in our set'.
94 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
97 * Current element count contained within contained_elements
99 uint32_t my_element_count;
102 * Iterator for sending elements on the key to element mapping to the client.
104 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
107 * Evaluate operations are held in
110 struct OperationState *next;
113 * Evaluate operations are held in
116 struct OperationState *prev;
119 * Did we send the client that we are done?
121 int client_done_sent;
/* Per-set state: a single counter that intersection_set_create() zeroes,
 * intersection_add() increments and intersection_remove() decrements.
 * NOTE(review): the struct header and braces are missing from this view. */
126 * Extra state required for efficient set intersection.
131 * Number of currently valid elements in the set which have not been removed
133 uint32_t current_set_element_count;
140 * fills the contained-elements hashmap with all relevant
141 * elements and adds their mutated hashes to our local bloomfilter with mutator+1
144 * @param key current key code
145 * @param value value in the hash map
146 * @return #GNUNET_YES if we should continue to
/* Hash-map iterator callback, passed to
 * GNUNET_CONTAINER_multihashmap_iterate() from handle_p2p_bf().
 * cls is the struct Operation, value the struct ElementEntry.
 * NOTE(review): the return type line, the third parameter, braces, the
 * return statements and the trailing arguments of several calls are
 * missing from this view. */
151 iterator_initialization_by_alice (void *cls,
152 const struct GNUNET_HashCode *key,
155 struct ElementEntry *ee = value;
156 struct Operation *op = cls;
157 struct GNUNET_HashCode mutated_hash;
159 //only consider this element, if it is valid for us
/* the element is skipped when the operation was created at or after the
 * generation that removed it, or before the generation that added it */
160 if ((op->generation_created >= ee->generation_removed)
161 || (op->generation_created < ee->generation_added))
164 // not contained according to bob's bloomfilter
165 GNUNET_BLOCK_mingle_hash(&ee->element_hash,
168 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
/* element survived the remote filter: count it and remember it */
172 op->state->my_element_count++;
/* UNIQUE_ONLY combined with the assertion aborts if the same element
 * hash is inserted twice */
173 GNUNET_assert (GNUNET_YES ==
174 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
175 &ee->element_hash, ee,
176 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
178 /* create our own bloomfilter with salt+1 */
179 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
182 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
189 * fills the contained-elements hashmap with all relevant
190 * elements and adds their mutated hashes to our local bloomfilter
193 * @param key current key code
194 * @param value value in the hash map
195 * @return #GNUNET_YES if we should continue to
/* Hash-map iterator callback used by intersection_accept() and
 * handle_p2p_element_info(): stores every element that is valid for this
 * operation in op->state->my_elements and adds its mingled hash to
 * op->state->local_bf. Unlike iterator_initialization_by_alice() it tests
 * no remote bloomfilter and does not touch my_element_count.
 * NOTE(review): the return type line, the third parameter, braces, the
 * return statements and the trailing call arguments (including the salt
 * passed to GNUNET_BLOCK_mingle_hash) are missing from this view. */
200 iterator_initialization (void *cls,
201 const struct GNUNET_HashCode *key,
204 struct ElementEntry *ee = value;
205 struct Operation *op = cls;
206 struct GNUNET_HashCode mutated_hash;
208 //only consider this element, if it is valid for us
209 if ((op->generation_created >= ee->generation_removed)
210 || (op->generation_created < ee->generation_added))
/* UNIQUE_ONLY combined with the assertion aborts on a duplicate hash */
213 GNUNET_assert (GNUNET_YES ==
214 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
215 &ee->element_hash, ee,
216 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
217 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
220 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
227 * removes element from a hashmap if it is not contained within the
228 * provided remote bloomfilter. Then, fill our new bloomfilter.
231 * @param key current key code
232 * @param value value in the hash map
233 * @return #GNUNET_YES if we should continue to
/* Hash-map iterator callback for one bloomfilter round: mingles the
 * element hash with op->spec->salt, tests it against
 * op->state->remote_bf, and has a path that decrements my_element_count
 * and removes the element from op->state->my_elements; it also adds a
 * mingled hash to op->state->local_bf.
 * NOTE(review): the condition line around the bloomfilter test, braces,
 * return statements and trailing call arguments are missing from this
 * view, so the exact branch structure cannot be confirmed here. */
238 iterator_bf_round (void *cls,
239 const struct GNUNET_HashCode *key,
242 struct ElementEntry *ee = value;
243 struct Operation *op = cls;
244 struct GNUNET_HashCode mutated_hash;
246 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
249 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
252 op->state->my_element_count--;
/* the assertion aborts if the element was not present in my_elements */
253 GNUNET_assert (GNUNET_YES ==
254 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
259 GNUNET_BLOCK_mingle_hash(&ee->element_hash,
262 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
269 * Inform the client that the union operation has failed,
270 * and proceed to destroy the evaluate operation.
272 * @param op the intersection operation to fail
275 fail_intersection_operation (struct Operation *op)
277 struct GNUNET_MQ_Envelope *ev;
278 struct GNUNET_SET_ResultMessage *msg;
280 if (op->state->my_elements)
281 GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
283 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
285 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
286 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
287 msg->request_id = htonl (op->spec->client_request_id);
288 msg->element_type = htons (0);
289 GNUNET_MQ_send (op->spec->set->client_mq, ev);
290 _GSS_operation_destroy (op);
295 * Send a request for the evaluate operation to a remote peer
297 * @param op operation with the other peer
/* Builds an operation-request message that nests op->spec->context_msg,
 * fills in operation type, application id, salt and our element count,
 * and sends it on op->mq. Afterwards the context message is freed and the
 * pointer cleared.
 * NOTE(review): the return type line, braces, the condition guarding the
 * "too large" branch, its return, and the else between the two log calls
 * are missing from this view. */
300 send_operation_request (struct Operation *op)
302 struct GNUNET_MQ_Envelope *ev;
303 struct OperationRequestMessage *msg;
305 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
306 op->spec->context_msg);
310 /* the context message is too large */
/* the client is disconnected on this path */
312 GNUNET_SERVER_client_disconnect (op->spec->set->client);
/* multi-byte fields are converted to network byte order; app_id is
 * copied as-is */
315 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
316 msg->app_id = op->spec->app_id;
317 msg->salt = htonl (op->spec->salt);
318 msg->element_count = htonl(op->state->my_element_count);
320 GNUNET_MQ_send (op->mq, ev);
322 if (NULL != op->spec->context_msg)
323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
/* the context message has been copied into the envelope above, so it is
 * released here */
327 if (NULL != op->spec->context_msg)
329 GNUNET_free (op->spec->context_msg);
330 op->spec->context_msg = NULL;
336 * Send a bloomfilter to our peer.
337 * that the operation is over.
338 * After the result done message has been sent to the client,
339 * destroy the evaluate operation.
341 * @param eo intersection operation
344 send_bloomfilter (struct Operation *op)
346 struct GNUNET_MQ_Envelope *ev;
347 struct BFMessage *msg;
350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
352 // send our bloomfilter
353 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
355 ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
357 msg->sender_element_count = htonl (op->state->my_element_count);
358 msg->bloomfilter_length = htonl (bf_size);
359 msg->sender_mutator = htonl (op->spec->salt);
360 GNUNET_assert (GNUNET_SYSERR !=
361 GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
364 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
365 op->state->local_bf = NULL;
366 GNUNET_MQ_send (op->mq, ev);
371 * Signal to the client that the operation has finished and
372 * destroy the operation.
374 * @param cls operation to destroy
/* Sends a result message with status GNUNET_SET_STATUS_DONE for the
 * request id of the operation to the client queue of the set, then
 * destroys the operation via _GSS_operation_destroy(). Takes a void
 * pointer because it is also used as a continuation-style callback target
 * from send_remaining_elements().
 * NOTE(review): client_done_sent is not set here although
 * finish_and_destroy() asserts on it - confirm whether that is intended.
 * NOTE(review): return type line and braces are missing from this view. */
377 send_client_done_and_destroy (void *cls)
379 struct Operation *op = cls;
380 struct GNUNET_MQ_Envelope *ev;
381 struct GNUNET_SET_ResultMessage *rm;
382 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
383 rm->request_id = htonl (op->spec->client_request_id);
384 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
385 rm->element_type = htons (0);
386 GNUNET_MQ_send (op->spec->set->client_mq, ev);
387 _GSS_operation_destroy (op);
392 * Send all elements in the full result iterator.
394 * @param cls operation
/* Sends one element of the full result to the client per invocation and
 * re-arms itself through GNUNET_MQ_notify_sent(), so the next element is
 * sent once the previous message has left the queue. When the iterator
 * is exhausted it calls send_client_done_and_destroy().
 * NOTE(review): the return type line, the declaration of res, braces and
 * the return after the exhausted-iterator branch are missing from this
 * view. No call destroying full_result_iter is visible in this file. */
397 send_remaining_elements (void *cls)
399 struct Operation *op = cls;
400 struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
401 struct GNUNET_MQ_Envelope *ev;
402 struct GNUNET_SET_ResultMessage *rm;
403 struct GNUNET_SET_Element *element;
406 res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
407 if (GNUNET_NO == res) {
408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
409 send_client_done_and_destroy (op);
413 element = &remaining->element;
414 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
415 GNUNET_assert (0 != op->spec->client_request_id);
/* the element payload is appended directly after the result header */
417 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
418 GNUNET_assert (NULL != ev);
420 rm->result_status = htons (GNUNET_SET_STATUS_OK);
421 rm->request_id = htonl (op->spec->client_request_id);
/* NOTE(review): element_type is copied without a byte-order conversion
 * while the other result messages in this file use htons (0) - confirm
 * the byte order in which element->type is stored */
422 rm->element_type = element->type;
423 memcpy (&rm[1], element->data, element->size);
425 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
426 GNUNET_MQ_send (op->spec->set->client_mq, ev);
431 * Inform the peer that this operation is complete.
433 * @param op the intersection operation that completed
/* Marks the operation as PHASE_FINISHED, frees the local bloomfilter and
 * sends a header-only GNUNET_MESSAGE_TYPE_SET_P2P_DONE message on op->mq.
 * NOTE(review): return type line and braces are missing from this view. */
436 send_peer_done (struct Operation *op)
438 struct GNUNET_MQ_Envelope *ev;
440 op->state->phase = PHASE_FINISHED;
441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
442 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
443 op->state->local_bf = NULL;
445 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
446 GNUNET_MQ_send (op->mq, ev);
451 * Handle an BF message from a remote peer.
453 * @param cls the intersection operation
454 * @param mh the header of the message
/* Handles a bloomfilter message from the peer: adopts the salt of the
 * sender, builds remote_bf from the payload and a fresh local_bf, filters
 * our elements through an iterator chosen by the current phase, frees
 * remote_bf, then either finishes or replies with our own bloomfilter.
 * NOTE(review): the return type line, braces, case labels, break/return
 * statements, one argument of each bloomfilter_init call and the iterator
 * arguments are missing from this view.
 * NOTE(review): no check of the message size against bloomfilter_length
 * is visible before the payload is read - the message comes from the
 * remote peer, so confirm it is validated somewhere.
 * NOTE(review): as visible, ntohl (msg->bloomfilter_length) is the final
 * argument of the first bloomfilter_init call, where the second call
 * passes GNUNET_CONSTANTS_BLOOMFILTER_K - confirm the argument order. */
457 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
459 struct Operation *op = cls;
460 const struct BFMessage *msg = (const struct BFMessage *) mh;
461 uint32_t old_elements;
462 uint32_t peer_elements;
/* remember the count before this round so a no-change round is visible */
464 old_elements = op->state->my_element_count;
465 op->spec->salt = ntohl (msg->sender_mutator);
467 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
469 ntohl (msg->bloomfilter_length));
470 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
472 GNUNET_CONSTANTS_BLOOMFILTER_K);
473 switch (op->state->phase)
476 // If we are ot our first msg
/* NOTE(review): intersection_evaluate() and intersection_accept() already
 * created my_elements; no destroy of the previous map is visible before
 * this assignment. my_element_count is also not reset here although
 * iterator_initialization_by_alice() increments it - confirm. */
477 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
479 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
480 &iterator_initialization_by_alice,
483 case PHASE_BF_EXCHANGE:
484 case PHASE_MAYBE_FINISHED:
485 // if we are bob or alice and are continuing operation
/* NOTE(review): this iterates the complete element map of the set; the
 * iterator argument is not visible (presumably iterator_bf_round, which
 * asserts that removal from my_elements succeeds) - confirm that
 * elements already filtered out cannot trip that assertion. */
486 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
492 fail_intersection_operation(op);
494 // the iterators created a new BF with salt+1
495 // the peer needs this information for decoding the next BF
496 // this behavior can be modified at will later on.
499 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
500 op->state->remote_bf = NULL;
502 peer_elements = ntohl(msg->sender_element_count);
/* finished only if we were already in PHASE_MAYBE_FINISHED, this round
 * removed nothing, and both sides report the same element count */
503 if ((op->state->phase == PHASE_MAYBE_FINISHED)
504 && (old_elements == op->state->my_element_count)
505 && (op->state->my_element_count == peer_elements)){
506 // In the last round we though we were finished, we now know this is correct
511 op->state->phase = PHASE_BF_EXCHANGE;
512 // maybe we are finished, but we do one more round to make certain
513 // we don't have false positives ...
514 if (op->state->my_element_count == peer_elements)
515 op->state->phase = PHASE_MAYBE_FINISHED;
517 send_bloomfilter (op);
522 * Handle an element-info (element count) message from a remote peer.
524 * @param cls the intersection operation
525 * @param mh the header of the message
/* Handles the element-count message sent by send_element_count(): stores
 * the element count of the peer, fails the operation unless we are in
 * PHASE_INITIAL and hold no more elements than the peer, otherwise builds
 * our first bloomfilter over the set and sends it.
 * NOTE(review): the return type line, braces, the return after the
 * failure call and some call arguments are missing from this view.
 * NOTE(review): my_elements is assigned a new map here; no destroy of the
 * map created in intersection_evaluate() is visible - confirm. */
528 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
530 struct Operation *op = cls;
531 struct BFMessage *msg = (struct BFMessage *) mh;
533 op->spec->remote_element_count = ntohl(msg->sender_element_count);
534 if ((op->state->phase != PHASE_INITIAL)
535 || (op->state->my_element_count > op->spec->remote_element_count)){
537 fail_intersection_operation(op);
540 op->state->phase = PHASE_BF_EXCHANGE;
541 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
543 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
545 GNUNET_CONSTANTS_BLOOMFILTER_K);
546 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
547 &iterator_initialization,
/* no assignment to remote_bf is visible in this function before it is
 * freed and cleared here */
550 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
551 op->state->remote_bf = NULL;
/* equal counts on both sides: the next round may already confirm the
 * result */
553 if (op->state->my_element_count == ntohl (msg->sender_element_count))
554 op->state->phase = PHASE_MAYBE_FINISHED;
556 send_bloomfilter (op);
561 * Send our element count to the peer, in case our element count is lower than theirs
563 * @param op intersection operation
/* Sends a BFMessage-shaped element-info message that carries only our
 * element count; the bloomfilter length and mutator fields are zero.
 * Called from intersection_accept().
 * NOTE(review): return type line and braces are missing from this view. */
566 send_element_count (struct Operation *op)
568 struct GNUNET_MQ_Envelope *ev;
569 struct BFMessage *msg;
571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
573 // just send our element count, as the other peer must start
574 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
576 msg->sender_element_count = htonl (op->state->my_element_count);
577 msg->bloomfilter_length = htonl (0);
578 msg->sender_mutator = htonl (0);
580 GNUNET_MQ_send (op->mq, ev);
585 * Send a result message to the client indicating
586 * that the operation is over.
587 * After the result done message has been sent to the client,
588 * destroy the evaluate operation.
590 * @param op intersection operation
/* Concludes the operation towards the client. In full-result mode an
 * iterator over my_elements is created and send_remaining_elements()
 * streams the elements before the done message; otherwise the done
 * message is sent right away via send_client_done_and_destroy().
 * Asserts that the done message has not been sent before.
 * NOTE(review): return type line, braces and the return that presumably
 * ends the full-result branch are missing from this view. */
593 finish_and_destroy (struct Operation *op)
595 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
597 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
599 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
600 op->state->full_result_iter =
601 GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
602 send_remaining_elements (op);
605 send_client_done_and_destroy (op);
610 * Handle a done message from a remote peer
612 * @param cls the union operation
613 * @param mh the message
616 handle_p2p_done (void *cls,
617 const struct GNUNET_MessageHeader *mh)
619 struct Operation *op = cls;
621 if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
624 finish_and_destroy (op);
629 fail_intersection_operation (op);
634 * Evaluate a union operation with
637 * @param op operation to evaluate
640 intersection_evaluate (struct Operation *op)
642 op->state = GNUNET_new (struct OperationState);
643 /* we started the operation, thus we have to send the operation request */
644 op->state->phase = PHASE_INITIAL;
645 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
646 op->state->my_element_count = op->spec->set->state->current_set_element_count;
648 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
649 "evaluating intersection operation");
650 send_operation_request (op);
655 * Accept an union operation request from a remote peer.
656 * Only initializes the private operation state.
658 * @param op operation that will be accepted as a union operation
661 intersection_accept (struct Operation *op)
663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
664 op->state = GNUNET_new (struct OperationState);
665 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
666 op->state->my_element_count = op->spec->set->state->current_set_element_count;
668 // if Alice (the peer) has more elements than Bob (us), she should start
669 if (op->spec->remote_element_count < op->state->my_element_count){
670 op->state->phase = PHASE_INITIAL;
671 send_element_count(op);
674 // create a new bloomfilter in case we have fewer elements
675 op->state->phase = PHASE_BF_EXCHANGE;
676 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
678 GNUNET_CONSTANTS_BLOOMFILTER_K);
679 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
680 &iterator_initialization,
682 send_bloomfilter (op);
687 * Create a new set supporting the intersection operation
689 * @return the newly created set
/* Allocates the intersection-specific state of a new set and starts its
 * element counter at zero. Registered as the create entry of the
 * function table returned by _GSS_intersection_vt().
 * NOTE(review): braces and the return of set_state are missing from this
 * view. */
691 static struct SetState *
692 intersection_set_create ()
694 struct SetState *set_state;
696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697 "intersection set created\n");
698 set_state = GNUNET_new (struct SetState);
699 set_state->current_set_element_count = 0;
706 * Add the element from the given element message to the set.
708 * @param set_state state of the set want to add to
709 * @param ee the element to add to the set
712 intersection_add (struct SetState *set_state,
713 struct ElementEntry *ee)
715 GNUNET_assert(0 < set_state->current_set_element_count);
716 set_state->current_set_element_count++;
721 * Destroy a set that supports the intersection operation
723 * @param set_state the set to destroy
/* Releases the state allocated by intersection_set_create(); nothing
 * else is freed here.
 * NOTE(review): return type line and braces are missing from this view. */
726 intersection_set_destroy (struct SetState *set_state)
728 GNUNET_free (set_state);
733 * Remove the element given in the element message from the set.
735 * @param set_state state of the set to remove from
736 * @param element set element to remove
/* Bookkeeping for an element removal: decrements the element counter of
 * the set. The element argument is not used in the visible lines.
 * NOTE(review): return type line and braces are missing from this view. */
739 intersection_remove (struct SetState *set_state,
740 struct ElementEntry *element)
/* the counter is unsigned; the assertion prevents a wrap below zero */
742 GNUNET_assert(0 < set_state->current_set_element_count);
743 set_state->current_set_element_count--;
748 * Dispatch messages for a intersection operation.
750 * @param op the state of the intersection evaluate operation
751 * @param mh the received message
752 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
753 * #GNUNET_OK otherwise
/* Logs type and size of an incoming peer message and dispatches on the
 * message type: element-info messages go to handle_p2p_element_info(),
 * bloomfilter messages to handle_p2p_bf() and done messages to
 * handle_p2p_done(). Registered as the msg_handler entry of the function
 * table returned by _GSS_intersection_vt().
 * NOTE(review): the return type line, braces, break statements, the
 * default label and the return statements are missing from this view, so
 * the value returned for unknown message types cannot be confirmed here.
 * The block comment inside the switch has also lost its terminator. */
756 intersection_handle_p2p_message (struct Operation *op,
757 const struct GNUNET_MessageHeader *mh)
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
760 ntohs (mh->type), ntohs (mh->size));
761 switch (ntohs (mh->type))
763 /* this message handler is not active until after we received an
764 * operation request message, thus the ops request is not handled here
766 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
767 handle_p2p_element_info (op, mh);
769 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
770 handle_p2p_bf (op, mh);
772 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
773 handle_p2p_done (op, mh);
776 /* something wrong with mesh's message handlers? */
784 * handler for peer-disconnects, notifies the client about the aborted operation
786 * @param op the destroyed operation
/* Peer-disconnect handler. If the operation has not reached
 * PHASE_FINISHED the client receives a failure result and the operation
 * is destroyed; otherwise the disconnect is treated as the regular end of
 * the session and the result is delivered unless the done message was
 * already sent.
 * NOTE(review): return type line, braces and the return that presumably
 * ends the failure branch are missing from this view. */
789 intersection_peer_disconnect (struct Operation *op)
791 if (PHASE_FINISHED != op->state->phase)
793 struct GNUNET_MQ_Envelope *ev;
794 struct GNUNET_SET_ResultMessage *msg;
796 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
797 msg->request_id = htonl (op->spec->client_request_id);
798 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
799 msg->element_type = htons (0);
800 GNUNET_MQ_send (op->spec->set->client_mq, ev);
801 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
802 _GSS_operation_destroy (op);
805 // else: the session has already been concluded
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
807 if (GNUNET_NO == op->state->client_done_sent)
808 finish_and_destroy (op);
813 * Destroy the intersection operation. Only things specific to the intersection operation are destroyed.
815 * @param op intersection operation to destroy
/* Releases the intersection-specific state of an operation: both
 * bloomfilters, the my_elements map (not the elements it refers to) and
 * the state structure itself. Registered as the cancel entry of the
 * function table returned by _GSS_intersection_vt().
 * NOTE(review): return type line, braces and any statement on the lost
 * line after GNUNET_free are missing from this view.
 * NOTE(review): no release of full_result_iter is visible here - confirm
 * whether the iterator created in finish_and_destroy() needs destroying. */
818 intersection_op_cancel (struct Operation *op)
820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
821 /* check if the op was canceled twice */
822 GNUNET_assert (NULL != op->state);
823 if (NULL != op->state->remote_bf)
825 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
826 op->state->remote_bf = NULL;
828 if (NULL != op->state->local_bf)
830 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
831 op->state->local_bf = NULL;
833 if (NULL != op->state->my_elements)
835 // no need to free the elements, they are still part of the set
836 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
837 op->state->my_elements = NULL;
839 GNUNET_free (op->state);
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
/* Returns the function table that plugs the intersection implementation
 * of this file into the set service. The table has static storage
 * duration, so the returned pointer stays valid after the call.
 * NOTE(review): the return type line, braces and the closing of the
 * initializer are missing from this view. */
845 _GSS_intersection_vt ()
847 static const struct SetVT intersection_vt = {
848 .create = &intersection_set_create,
849 .msg_handler = &intersection_handle_p2p_message,
850 .add = &intersection_add,
851 .remove = &intersection_remove,
852 .destroy_set = &intersection_set_destroy,
853 .evaluate = &intersection_evaluate,
854 .accept = &intersection_accept,
855 .peer_disconnect = &intersection_peer_disconnect,
856 .cancel = &intersection_op_cancel,
859 return &intersection_vt;