2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
/**
 * Bloomfilter size constant for this file, defined as the length of a
 * GNUnet hash code.
 * NOTE(review): presumably the size in bytes handed to the bloomfilter
 * constructor -- confirm at the use sites.
 */
#define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
/**
 * Phases an intersection operation passes through.
 *
 * NOTE(review): the enumerator names are taken from their uses in this
 * file and ordered to match the descriptions -- confirm against the
 * original declaration.
 */
enum IntersectionOperationPhase
{
  /**
   * Alice has suggested an operation to Bob and is
   * waiting for a bloomfilter or for the session to end.
   */
  PHASE_INITIAL,

  /**
   * Bob has accepted the operation; Bob and Alice now exchange
   * bloomfilters until one of them notices that their element
   * counts are equal.
   */
  PHASE_BF_EXCHANGE,

  /**
   * Both peers reported the same element count.  They stay in this
   * state for one more turn to see whether they really agree on the
   * set: a peer that finds the same element count after the next
   * iteration ends the session.
   */
  PHASE_MAYBE_FINISHED,

  /**
   * The protocol is over.
   * Results may still have to be sent to the client.
   */
  PHASE_FINISHED
};
65 * State of an evaluate operation
71 * The bf we currently receive
73 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
76 * BF of the set's element.
78 struct GNUNET_CONTAINER_BloomFilter *local_bf;
81 * Current state of the operation.
83 enum IntersectionOperationPhase phase;
86 * Generation in which the operation handle
89 unsigned int generation_created;
92 * Maps element-id-hashes to 'elements in our set'.
94 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
97 * Current element count contained within contained_elements
99 uint32_t my_elements_count;
102 * Iterator for sending elements on the key to element mapping to the client.
104 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
107 * Evaluate operations are held in
110 struct OperationState *next;
113 * Evaluate operations are held in
116 struct OperationState *prev;
119 * Did we send the client that we are done?
121 int client_done_sent;
128 * fills the contained-elements hashmap with all relevant
129 * elements and adds their mutated hashes to our local bloomfilter with mutator+1
132 * @param key current key code
133 * @param value value in the hash map
134 * @return #GNUNET_YES if we should continue to
139 iterator_initialization_by_alice (void *cls,
140 const struct GNUNET_HashCode *key,
142 struct ElementEntry *ee = value;
143 struct Operation *op = cls;
144 struct GNUNET_HashCode mutated_hash;
146 //only consider this element, if it is valid for us
147 if ((op->generation_created >= ee->generation_removed)
148 || (op->generation_created < ee->generation_added))
151 // not contained according to bob's bloomfilter
152 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
153 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
157 op->state->my_elements_count++;
158 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
159 &ee->element_hash, ee,
160 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
162 // create our own bloomfilter with salt+1
163 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
164 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
171 * fills the contained-elements hashmap with all relevant
172 * elements and adds their mutated hashes to our local bloomfilter
175 * @param key current key code
176 * @param value value in the hash map
177 * @return #GNUNET_YES if we should continue to
182 iterator_initialization (void *cls,
183 const struct GNUNET_HashCode *key,
185 struct ElementEntry *ee = value;
186 struct Operation *op = cls;
187 struct GNUNET_HashCode mutated_hash;
189 //only consider this element, if it is valid for us
190 if ((op->generation_created >= ee->generation_removed)
191 || (op->generation_created < ee->generation_added))
194 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
195 &ee->element_hash, ee,
196 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
198 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
200 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
207 * Counts all valid elements in the hashmap
208 * (the ones that are valid in our generation)
211 * @param key current key code
212 * @param value value in the hash map
213 * @return #GNUNET_YES if we should continue to
218 iterator_element_count (void *cls,
219 const struct GNUNET_HashCode *key,
221 struct ElementEntry *ee = value;
222 struct Operation *op = cls;
224 //only consider this element, if it is valid for us
225 if ((op->generation_created >= ee->generation_removed)
226 || (op->generation_created < ee->generation_added))
229 op->state->my_elements_count++;
235 * removes element from a hashmap if it is not contained within the
236 * provided remote bloomfilter. Then, fill our new bloomfilter.
239 * @param key current key code
240 * @param value value in the hash map
241 * @return #GNUNET_YES if we should continue to
246 iterator_bf_round (void *cls,
247 const struct GNUNET_HashCode *key,
249 struct ElementEntry *ee = value;
250 struct Operation *op = cls;
251 struct GNUNET_HashCode mutated_hash;
253 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
255 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
257 op->state->my_elements_count--;
258 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
264 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
266 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
273 * Inform the client that the union operation has failed,
274 * and proceed to destroy the evaluate operation.
276 * @param op the intersection operation to fail
279 fail_intersection_operation (struct Operation *op)
281 struct GNUNET_MQ_Envelope *ev;
282 struct GNUNET_SET_ResultMessage *msg;
284 if (op->state->my_elements)
285 GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements);
287 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n");
289 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
290 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
291 msg->request_id = htonl (op->spec->client_request_id);
292 msg->element_type = htons (0);
293 GNUNET_MQ_send (op->spec->set->client_mq, ev);
294 _GSS_operation_destroy (op);
300 * Inform the peer that this operation is complete.
302 * @param eo the intersection operation to fail
305 send_peer_done (struct Operation *op)
307 struct GNUNET_MQ_Envelope *ev;
309 op->state->phase = PHASE_FINISHED;
310 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
311 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
312 op->state->local_bf = NULL;
314 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
315 GNUNET_MQ_send (op->mq, ev);
319 * Send a request for the evaluate operation to a remote peer
321 * @param eo operation with the other peer
324 send_operation_request (struct Operation *op)
326 struct GNUNET_MQ_Envelope *ev;
327 struct OperationRequestMessage *msg;
329 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
330 op->spec->context_msg);
334 /* the context message is too large */
336 GNUNET_SERVER_client_disconnect (op->spec->set->client);
339 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
340 msg->app_id = op->spec->app_id;
341 msg->salt = htonl (op->spec->salt);
342 msg->element_count = htonl(op->state->my_elements);
344 GNUNET_MQ_send (op->mq, ev);
346 if (NULL != op->spec->context_msg)
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
351 if (NULL != op->spec->context_msg)
353 GNUNET_free (op->spec->context_msg);
354 op->spec->context_msg = NULL;
360 * Handle an BF message from a remote peer.
362 * @param cls the intersection operation
363 * @param mh the header of the message
366 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
368 struct Operation *op = cls;
369 struct BFMessage *msg = (struct BFMessage *) mh;
372 old_count = op->state->my_elements_count;
373 op->spec->salt = ntohl (msg->sender_mutator);
375 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init (&msg[1],
377 ntohl (msg->bloomfilter_length));
378 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
380 GNUNET_CONSTANTS_BLOOMFILTER_K);
381 switch (op->state->phase)
384 // If we are ot our first msg
385 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_elements_count, GNUNET_YES);
387 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
388 &iterator_initialization_by_alice,
391 case PHASE_BF_EXCHANGE:
392 case PHASE_MAYBE_FINISHED:
393 // if we are bob or alice and are continuing operation
394 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
400 fail_intersection_operation(op);
402 // the iterators created a new BF with salt+1
403 // the peer needs this information for decoding the next BF
404 // this behavior can be modified at will later on.
407 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
408 op->state->remote_bf = NULL;
410 if ((op->state->phase == PHASE_MAYBE_FINISHED)
411 && (old_count == op->state->my_elements_count)){
412 // In the last round we though we were finished, we now know this is correct
417 op->state->phase = PHASE_BF_EXCHANGE;
418 // maybe we are finished, but we do one more round to make certain
419 // we don't have false positives ...
420 if (op->state->my_elements_count == ntohl (msg->sender_element_count))
421 op->state->phase = PHASE_MAYBE_FINISHED;
423 send_bloomfilter (op);
428 * Handle an BF message from a remote peer.
430 * @param cls the intersection operation
431 * @param mh the header of the message
434 handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
436 struct Operation *op = cls;
437 struct BFMessage *msg = (struct BFMessage *) mh;
438 uint32_t remote_element_count;
440 remote_element_count = ntohl(msg->sender_element_count);
441 if ((op->state->phase == PHASE_INITIAL)
442 || (op->state->my_elements_count > remote_element_count)){
444 fail_intersection_operation(op);
447 op->state->phase = PHASE_BF_EXCHANGE;
448 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
450 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
452 GNUNET_CONSTANTS_BLOOMFILTER_K);
453 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
454 &iterator_initialization,
457 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
458 op->state->remote_bf = NULL;
460 if (op->state->my_elements_count == ntohl (msg->sender_element_count))
461 op->state->phase = PHASE_MAYBE_FINISHED;
463 send_bloomfilter (op);
468 * Send a bloomfilter to our peer.
469 * that the operation is over.
470 * After the result done message has been sent to the client,
471 * destroy the evaluate operation.
473 * @param eo intersection operation
476 send_bloomfilter (struct Operation *op)
478 struct GNUNET_MQ_Envelope *ev;
479 struct BFMessage *msg;
482 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n",);
484 // send our bloomfilter
485 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
487 ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
489 msg->sender_element_count = htonl (op->state->my_elements_count);
490 msg->bloomfilter_length = htonl (bf_size);
491 msg->sender_mutator = htonl (op->spec->salt);
492 GNUNET_assert (GNUNET_SYSERR !=
493 GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
494 &msg->sender_bf_data,
496 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
497 op->state->local_bf = NULL;
498 GNUNET_MQ_send (op->mq, ev);
502 * Send our element to the peer, in case our element count is lower than his
504 * @param eo intersection operation
507 send_element_count (struct Operation *op)
509 struct GNUNET_MQ_Envelope *ev;
510 struct BFMessage *msg;
512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n");
514 // just send our element count, as the other peer must start
515 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
517 msg->sender_element_count = htonl (op->state->my_elements_count);
518 msg->bloomfilter_length = htonl (0);
519 msg->sender_mutator = htonl (0);
521 GNUNET_MQ_send (op->mq, ev);
525 * Send a result message to the client indicating
526 * that the operation is over.
527 * After the result done message has been sent to the client,
528 * destroy the evaluate operation.
530 * @param op intersection operation
533 finish_and_destroy (struct Operation *op)
535 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
537 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
540 op->state->full_result_iter =
541 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements);
542 send_remaining_elements (op);
545 send_client_done_and_destroy (op);
549 * Handle a done message from a remote peer
551 * @param cls the union operation
552 * @param mh the message
555 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
557 struct Operation *op = cls;
558 struct GNUNET_MQ_Envelope *ev;
560 if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
561 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
563 finish_and_destroy (op);
568 fail_intersection_operation (op);
573 * Evaluate a union operation with
576 * @param op operation to evaluate
579 intersection_evaluate (struct Operation *op)
581 op->state = GNUNET_new (struct OperationState);
582 /* we started the operation, thus we have to send the operation request */
583 op->state->phase = PHASE_INITIAL;
584 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
585 GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements,
586 &iterator_element_count,
589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
590 send_operation_request (op);
594 * Accept an union operation request from a remote peer.
595 * Only initializes the private operation state.
597 * @param op operation that will be accepted as a union operation
600 intersection_accept (struct Operation *op)
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
603 op->state = GNUNET_new (struct OperationState);
604 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
605 GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements,
606 &iterator_element_count,
608 // if Alice (the peer) has more elements than Bob (us), she should start
609 if (op->spec->element_count < op->state->my_elements_count){
610 op->state->phase = PHASE_INITIAL;
611 send_element_count(op);
614 // create a new bloomfilter in case we have fewer elements
615 op->state->phase = PHASE_BF_EXCHANGE;
616 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
618 GNUNET_CONSTANTS_BLOOMFILTER_K);
619 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
620 &iterator_initialization,
622 send_bloomfilter (op);
627 * Create a new set supporting the intersection operation
629 * @return the newly created set
631 static struct SetState *
632 intersection_set_create (void)
634 struct SetState *set_state;
636 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
638 set_state = GNUNET_new (struct SetState);
645 * Add the element from the given element message to the set.
647 * @param set_state state of the set want to add to
648 * @param ee the element to add to the set
651 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
/**
 * Release the state of a set that supports the intersection operation.
 *
 * @param set_state the set state to free
 */
static void
intersection_set_destroy (struct SetState *set_state)
{
  GNUNET_free (set_state);
}
670 * Remove the element given in the element message from the set.
672 * @param set_state state of the set to remove from
673 * @param element set element to remove
676 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
683 * Dispatch messages for a intersection operation.
685 * @param eo the state of the intersection evaluate operation
686 * @param mh the received message
687 * @return GNUNET_SYSERR if the tunnel should be disconnected,
688 * GNUNET_OK otherwise
691 intersection_handle_p2p_message (struct Operation *op,
692 const struct GNUNET_MessageHeader *mh)
694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
695 ntohs (mh->type), ntohs (mh->size));
696 switch (ntohs (mh->type))
698 /* this message handler is not active until after we received an
699 * operation request message, thus the ops request is not handled here
701 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
702 handle_p2p_element_info (op, mh);
704 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
705 handle_p2p_bf (op, mh);
707 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
708 handle_p2p_done (op, mh);
711 /* something wrong with mesh's message handlers? */
718 * Signal to the client that the operation has finished and
719 * destroy the operation.
721 * @param cls operation to destroy
724 send_client_done_and_destroy (void *cls)
726 struct Operation *op = cls;
727 struct GNUNET_MQ_Envelope *ev;
728 struct GNUNET_SET_ResultMessage *rm;
729 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
730 rm->request_id = htonl (op->spec->client_request_id);
731 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
732 rm->element_type = htons (0);
733 GNUNET_MQ_send (op->spec->set->client_mq, ev);
734 _GSS_operation_destroy (op);
737 * Send all elements in the full result iterator.
739 * @param cls operation
742 send_remaining_elements (void *cls)
744 struct Operation *op = cls;
745 struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
746 struct GNUNET_MQ_Envelope *ev;
747 struct GNUNET_SET_ResultMessage *rm;
748 struct GNUNET_SET_Element *element;
751 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
752 if (GNUNET_NO == res) {
753 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
754 send_client_done_and_destroy (op);
758 element = &remaining->element;
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
760 GNUNET_assert (0 != op->spec->client_request_id);
762 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
763 GNUNET_assert (NULL != ev);
765 rm->result_status = htons (GNUNET_SET_STATUS_OK);
766 rm->request_id = htonl (op->spec->client_request_id);
767 rm->element_type = element->type;
768 memcpy (&rm[1], element->data, element->size);
770 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
771 GNUNET_MQ_send (op->spec->set->client_mq, ev);
775 * handler for peer-disconnects, notifies the client about the aborted operation
777 * @param op the destroyed operation
780 intersection_peer_disconnect (struct Operation *op)
782 if (PHASE_FINISHED != op->state->phase)
784 struct GNUNET_MQ_Envelope *ev;
785 struct GNUNET_SET_ResultMessage *msg;
787 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
788 msg->request_id = htonl (op->spec->client_request_id);
789 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
790 msg->element_type = htons (0);
791 GNUNET_MQ_send (op->spec->set->client_mq, ev);
792 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
793 _GSS_operation_destroy (op);
796 // else: the session has already been concluded
797 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
798 if (GNUNET_NO == op->state->client_done_sent)
799 finish_and_destroy (op);
804 * Destroy the union operation. Only things specific to the union operation are destroyed.
806 * @param op union operation to destroy
809 intersection_op_cancel (struct Operation *op)
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
812 /* check if the op was canceled twice */
813 GNUNET_assert (NULL != op->state);
814 if (NULL != op->state->remote_bf)
816 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
817 op->state->remote_bf = NULL;
819 if (NULL != op->state->local_bf)
821 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
822 op->state->local_bf = NULL;
824 if (NULL != op->state->my_elements)
826 // no need to free the elements, they are still part of the set
827 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
828 op->state->my_elements = NULL;
830 GNUNET_free (op->state);
832 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
836 _GSS_intersection_vt ()
838 static const struct SetVT intersection_vt = {
839 .create = &intersection_set_create,
840 .msg_handler = &intersection_handle_p2p_message,
841 .add = &intersection_add,
842 .remove = &intersection_remove,
843 .destroy_set = &intersection_set_destroy,
844 .evaluate = &intersection_evaluate,
845 .accept = &intersection_accept,
846 .peer_disconnect = &intersection_peer_disconnect,
847 .cancel = &intersection_op_cancel,
850 return &intersection_vt;